diff options
Diffstat (limited to 'runtime/stream.c')
-rw-r--r-- | runtime/stream.c | 142 |
1 files changed, 93 insertions, 49 deletions
diff --git a/runtime/stream.c b/runtime/stream.c index 906a45fc..b8dd5150 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -74,11 +74,12 @@ DEFobjStaticHelpers DEFobjCurrIf(zlibw) /* forward definitions */ -static rsRetVal strmFlushInternal(strm_t *pThis); +static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip); static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); -static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush); +static rsRetVal doZipFinish(strm_t *pThis); static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); @@ -341,7 +342,7 @@ static rsRetVal strmCloseFile(strm_t *pThis) (pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName); if(pThis->tOperationsMode != STREAMMODE_READ) { - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); if(pThis->bAsyncWrite) { strmWaitAsyncWriterDone(pThis); } @@ -675,6 +676,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro! pThis->fd = -1; pThis->fdDir = -1; pThis->iUngetC = -1; + pThis->bVeryReliableZip = 0; pThis->sType = STREAMTYPE_FILE_SINGLE; pThis->sIOBufSize = glblGetIOBufSize(); pThis->tOpenMode = 0600; @@ -777,6 +779,10 @@ stopWriter(strm_t *pThis) BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(strm) + /* we need to stop the ZIP writer */ + if(pThis->iZipLevel) { + doZipFinish(pThis); + } if(pThis->bAsyncWrite) /* Note: mutex will be unlocked in stopWriter! */ d_pthread_mutex_lock(&pThis->mut); @@ -919,14 +925,14 @@ finalize_it: /* write memory buffer to a stream object. */ static inline rsRetVal -doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush) { DEFiRet; ASSERT(pThis != NULL); if(pThis->iZipLevel) { - CHKiRet(doZipWrite(pThis, pBuf, lenBuf)); + CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush)); } else { /* write without zipping */ CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf)); @@ -971,7 +977,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf) * the background thread. -- rgerhards, 2009-07-07 */ static rsRetVal -strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip) { DEFiRet; @@ -990,7 +996,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) if(pThis->bAsyncWrite) { CHKiRet(doAsyncWriteInternal(pThis, lenBuf)); } else { - CHKiRet(doWriteInternal(pThis, pBuf, lenBuf)); + CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip)); } @@ -1030,7 +1036,7 @@ asyncWriterThread(void *pPtr) } if(bTimedOut && pThis->iBufPtr > 0) { /* if we timed out, we need to flush pending data */ - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); bTimedOut = 0; continue; /* now we should have data */ } @@ -1056,7 +1062,7 @@ asyncWriterThread(void *pPtr) bTimedOut = 0; /* we may have timed out, but there *is* work to do... */ iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS; - doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf); + doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state // TODO: error check????? 2009-07-06 --pThis->iCnt; @@ -1167,63 +1173,99 @@ finalize_it: * rgerhards, 2009-06-04 */ static rsRetVal -doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush) { - z_stream zstrm; int zRet; /* zlib return state */ - sbool bzInitDone = RSFALSE; DEFiRet; + unsigned outavail; assert(pThis != NULL); assert(pBuf != NULL); - /* allocate deflate state */ - zstrm.zalloc = Z_NULL; - zstrm.zfree = Z_NULL; - zstrm.opaque = Z_NULL; - zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ - /* see note in file header for the params we use with deflateInit2() */ - zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); - if(zRet != Z_OK) { - DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); - ABORT_FINALIZE(RS_RET_ZLIB_ERR); + if(!pThis->bzInitDone) { + /* allocate deflate state */ + pThis->zstrm.zalloc = Z_NULL; + pThis->zstrm.zfree = Z_NULL; + pThis->zstrm.opaque = Z_NULL; + /* see note in file header for the params we use with deflateInit2() */ + zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet); + ABORT_FINALIZE(RS_RET_ZLIB_ERR); + } + pThis->bzInitDone = RSTRUE; } - bzInitDone = RSTRUE; /* now doing the compression */ - zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */ - zstrm.avail_in = lenBuf; + pThis->zstrm.next_in = (Bytef*) pBuf; + pThis->zstrm.avail_in = lenBuf; /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in); - zstrm.avail_out = pThis->sIOBufSize; - zstrm.next_out = pThis->pZipBuf; - zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */ - DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out); - assert(zRet != Z_STREAM_ERROR); /* state not clobbered */ - if(zstrm.avail_out == pThis->sIOBufSize) - break; /* this is valid, indicates end of compression --> see zlib howto */ - CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out)); - } while (zstrm.avail_out == 0); - assert(zstrm.avail_in == 0); /* all input will be used */ + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); + pThis->zstrm.avail_out = pThis->sIOBufSize; + pThis->zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); + outavail =pThis->sIOBufSize - pThis->zstrm.avail_out; + if(outavail != 0) { + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail)); + } + } while (pThis->zstrm.avail_out == 0); finalize_it: - if(bzInitDone) { - zRet = zlibw.DeflateEnd(&zstrm); - if(zRet != Z_OK) { - DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); + if(pThis->bzInitDone && pThis->bVeryReliableZip) { + doZipFinish(pThis); + } + RETiRet; +} + + + +/* finish zlib buffer, to be called before closing the ZIP file (if + * running in stream mode). + */ +static rsRetVal +doZipFinish(strm_t *pThis) +{ + int zRet; /* zlib return state */ + DEFiRet; + unsigned outavail; + assert(pThis != NULL); + + if(!pThis->bzInitDone) { + FINALIZE; + } + +dbgprintf("AAAA: doZipFinish() called\n"); + pThis->zstrm.avail_in = 0; + /* run deflate() on buffer until everything has been compressed */ + do { + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); + pThis->zstrm.avail_out = pThis->sIOBufSize; + pThis->zstrm.next_out = pThis->pZipBuf; + zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); + outavail = pThis->sIOBufSize - pThis->zstrm.avail_out; + if(outavail != 0) { + CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail)); } + } while (pThis->zstrm.avail_out == 0); + +finalize_it: + zRet = zlibw.DeflateEnd(&pThis->zstrm); + if(zRet != Z_OK) { + DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); } + pThis->bzInitDone = 0; RETiRet; } - /* flush stream output buffer to persistent storage. This can be called at any time * and is automatically called when the output buffer is full. * rgerhards, 2008-01-10 */ static rsRetVal -strmFlushInternal(strm_t *pThis) +strmFlushInternal(strm_t *pThis, int bFlushZip) { DEFiRet; @@ -1233,7 +1275,7 @@ strmFlushInternal(strm_t *pThis) (long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : ""); if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) { - iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr); + iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip); } RETiRet; @@ -1255,7 +1297,7 @@ strmFlush(strm_t *pThis) if(pThis->bAsyncWrite) d_pthread_mutex_lock(&pThis->mut); - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 1)); finalize_it: if(pThis->bAsyncWrite) @@ -1278,7 +1320,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs) if(pThis->fd == -1) { CHKiRet(strmOpenFile(pThis)); } else { - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 0)); } long long i; DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs); @@ -1321,7 +1363,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c) /* if the buffer is full, we need to flush before we can write */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); + CHKiRet(strmFlushInternal(pThis, 0)); } /* we now always have space for one character, so we simply copy it */ *(pThis->pIOBuf + pThis->iBufPtr) = c; @@ -1391,7 +1433,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) iOffset = 0; do { if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ } iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */ if(iWrite > lenBuf) @@ -1406,7 +1448,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) * write it. This seems more natural than waiting (hours?) for the next message... */ if(pThis->iBufPtr == pThis->sIOBufSize) { - CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */ + CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */ } finalize_it: @@ -1434,6 +1476,7 @@ DEFpropSetMeth(strm, tOperationsMode, int) DEFpropSetMeth(strm, tOpenMode, mode_t) DEFpropSetMeth(strm, sType, strmType_t) DEFpropSetMeth(strm, iZipLevel, int) +DEFpropSetMeth(strm, bVeryReliableZip, int) DEFpropSetMeth(strm, bSync, int) DEFpropSetMeth(strm, sIOBufSize, size_t) DEFpropSetMeth(strm, iSizeLimit, off_t) @@ -1565,7 +1608,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm) ISOBJ_TYPE_assert(pThis, strm); ISOBJ_TYPE_assert(pStrm, strm); - strmFlushInternal(pThis); + strmFlushInternal(pThis, 0); CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis)); objSerializeSCALAR(pStrm, iCurrFNum, INT); @@ -1758,6 +1801,7 @@ CODESTARTobjQueryInterface(strm) pIf->SettOpenMode = strmSettOpenMode; pIf->SetsType = strmSetsType; pIf->SetiZipLevel = strmSetiZipLevel; + pIf->SetbVeryReliableZip = strmSetbVeryReliableZip; pIf->SetbSync = strmSetbSync; pIf->SetsIOBufSize = strmSetsIOBufSize; pIf->SetiSizeLimit = strmSetiSizeLimit; |