summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/msg.c96
-rw-r--r--runtime/msg.h2
-rw-r--r--runtime/ruleset.c3
-rw-r--r--runtime/stream.c140
-rw-r--r--runtime/stream.h7
5 files changed, 156 insertions, 92 deletions
diff --git a/runtime/msg.c b/runtime/msg.c
index d874178b..b0b93f98 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -2373,40 +2373,38 @@ char *textpri(char *pRes, size_t pResLen, int pri)
*/
typedef enum ENOWType { NOW_NOW, NOW_YEAR, NOW_MONTH, NOW_DAY, NOW_HOUR, NOW_HHOUR, NOW_QHOUR, NOW_MINUTE } eNOWType;
#define tmpBUFSIZE 16 /* size of formatting buffer */
-static uchar *getNOW(eNOWType eNow)
+static uchar *getNOW(eNOWType eNow, struct syslogTime *t)
{
uchar *pBuf;
- struct syslogTime t;
if((pBuf = (uchar*) MALLOC(sizeof(uchar) * tmpBUFSIZE)) == NULL) {
return NULL;
}
- datetime.getCurrTime(&t, NULL);
switch(eNow) {
case NOW_NOW:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t.year, t.month, t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d-%2.2d-%2.2d", t->year, t->month, t->day);
break;
case NOW_YEAR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t.year);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%4.4d", t->year);
break;
case NOW_MONTH:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.month);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->month);
break;
case NOW_DAY:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.day);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->day);
break;
case NOW_HOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.hour);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->hour);
break;
case NOW_HHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 30);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 30);
break;
case NOW_QHOUR:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute / 15);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute / 15);
break;
case NOW_MINUTE:
- snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t.minute);
+ snprintf((char*) pBuf, tmpBUFSIZE, "%2.2d", t->minute);
break;
}
@@ -2673,7 +2671,7 @@ finalize_it:
* Parameter "bMustBeFreed" is set by this function. It tells the
* caller whether or not the string returned must be freed by the
* caller itself. It is is 0, the caller MUST NOT free it. If it is
- * 1, the caller MUST free 1. Handling this wrongly leads to either
+ * 1, the caller MUST free it. Handling this wrongly leads to either
* a memory leak of a program abort (do to double-frees or frees on
* the constant memory pool). So be careful to do it right.
* rgerhards 2004-11-23
@@ -2690,7 +2688,7 @@ finalize_it:
return(UCHAR_CONSTANT("**OUT OF MEMORY**"));}
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName, rs_size_t *pPropLen,
- unsigned short *pbMustBeFreed)
+ unsigned short *pbMustBeFreed, struct syslogTime *ttNow)
{
uchar *pRes; /* result pointer */
rs_size_t bufLen = -1; /* length of string or -1, if not known */
@@ -2803,52 +2801,68 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
pRes = (uchar*)getParseSuccess(pMsg);
break;
case PROP_SYS_NOW:
- if((pRes = getNOW(NOW_NOW)) == NULL) {
+ if((pRes = getNOW(NOW_NOW, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 10;
+ }
break;
case PROP_SYS_YEAR:
- if((pRes = getNOW(NOW_YEAR)) == NULL) {
+ if((pRes = getNOW(NOW_YEAR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 4;
+ }
break;
case PROP_SYS_MONTH:
- if((pRes = getNOW(NOW_MONTH)) == NULL) {
+ if((pRes = getNOW(NOW_MONTH, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_DAY:
- if((pRes = getNOW(NOW_DAY)) == NULL) {
+ if((pRes = getNOW(NOW_DAY, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HOUR:
- if((pRes = getNOW(NOW_HOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_HHOUR:
- if((pRes = getNOW(NOW_HHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_HHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_QHOUR:
- if((pRes = getNOW(NOW_QHOUR)) == NULL) {
+ if((pRes = getNOW(NOW_QHOUR, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MINUTE:
- if((pRes = getNOW(NOW_MINUTE)) == NULL) {
+ if((pRes = getNOW(NOW_MINUTE, ttNow)) == NULL) {
RET_OUT_OF_MEMORY;
- } else
- *pbMustBeFreed = 1; /* all of these functions allocate dyn. memory */
+ } else {
+ *pbMustBeFreed = 1;
+ bufLen = 2;
+ }
break;
case PROP_SYS_MYHOSTNAME:
pRes = glbl.GetLocalHostName();
@@ -2907,7 +2921,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
/* If we did not receive a template pointer, we are already done... */
- if(pTpe == NULL) {
+ if(pTpe == NULL || !pTpe->bComplexProcessing) {
*pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
return pRes;
}
@@ -3494,9 +3508,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen);
}
- if(bufLen == -1)
- bufLen = ustrlen(pRes);
- *pPropLen = bufLen;
+ *pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen;
ENDfunc
return(pRes);
@@ -3553,7 +3565,7 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name)
/* always call MsgGetProp() without a template specifier */
/* TODO: optimize propNameToID() call -- rgerhards, 2009-06-26 */
propNameStrToID(name, &propid);
- pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed);
+ pszProp = (uchar*) MsgGetProp(pThis, NULL, propid, NULL, &propLen, &bMustBeFreed, NULL);
estr = es_newStrFromCStr((char*)pszProp, propLen);
if(bMustBeFreed)
diff --git a/runtime/msg.h b/runtime/msg.h
index 396e861f..172ae0da 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -173,7 +173,7 @@ void MsgSetRawMsg(msg_t *pMsg, char* pszRawMsg, size_t lenMsg);
rsRetVal MsgReplaceMSG(msg_t *pThis, uchar* pszMSG, int lenMSG);
uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
propid_t propid, es_str_t *propName,
- rs_size_t *pPropLen, unsigned short *pbMustBeFreed);
+ rs_size_t *pPropLen, unsigned short *pbMustBeFreed, struct syslogTime *ttNow);
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 8d2bb924..24d8279c 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -378,7 +378,8 @@ evalPROPFILT(struct cnfstmt *stmt, msg_t *pMsg)
goto done;
pszPropVal = MsgGetProp(pMsg, NULL, stmt->d.s_propfilt.propID,
- stmt->d.s_propfilt.propName, &propLen, &pbMustBeFreed);
+ stmt->d.s_propfilt.propName, &propLen,
+ &pbMustBeFreed, NULL);
/* Now do the compares (short list currently ;)) */
switch(stmt->d.s_propfilt.operation ) {
diff --git a/runtime/stream.c b/runtime/stream.c
index 742799d2..064cb42a 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -74,11 +74,12 @@ DEFobjStaticHelpers
DEFobjCurrIf(zlibw)
/* forward definitions */
-static rsRetVal strmFlushInternal(strm_t *pThis);
+static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
-static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
+static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
+static rsRetVal doZipFinish(strm_t *pThis);
static rsRetVal strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf);
@@ -341,7 +342,7 @@ static rsRetVal strmCloseFile(strm_t *pThis)
(pThis->pszFName == NULL) ? "N/A" : (char*)pThis->pszFName);
if(pThis->tOperationsMode != STREAMMODE_READ) {
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
if(pThis->bAsyncWrite) {
strmWaitAsyncWriterDone(pThis);
}
@@ -675,6 +676,7 @@ BEGINobjConstruct(strm) /* be sure to specify the object type also in END macro!
pThis->fd = -1;
pThis->fdDir = -1;
pThis->iUngetC = -1;
+ pThis->bVeryReliableZip = 0;
pThis->sType = STREAMTYPE_FILE_SINGLE;
pThis->sIOBufSize = glblGetIOBufSize();
pThis->tOpenMode = 0600;
@@ -777,6 +779,10 @@ stopWriter(strm_t *pThis)
BEGINobjDestruct(strm) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(strm)
+ /* we need to stop the ZIP writer */
+ if(pThis->iZipLevel) {
+ doZipFinish(pThis);
+ }
if(pThis->bAsyncWrite)
/* Note: mutex will be unlocked in stopWriter! */
d_pthread_mutex_lock(&pThis->mut);
@@ -919,14 +925,14 @@ finalize_it:
/* write memory buffer to a stream object.
*/
static inline rsRetVal
-doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doWriteInternal(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->iZipLevel) {
- CHKiRet(doZipWrite(pThis, pBuf, lenBuf));
+ CHKiRet(doZipWrite(pThis, pBuf, lenBuf, bFlush));
} else {
/* write without zipping */
CHKiRet(strmPhysWrite(pThis, pBuf, lenBuf));
@@ -971,7 +977,7 @@ doAsyncWriteInternal(strm_t *pThis, size_t lenBuf)
* the background thread. -- rgerhards, 2009-07-07
*/
static rsRetVal
-strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlushZip)
{
DEFiRet;
@@ -990,7 +996,7 @@ strmSchedWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->bAsyncWrite) {
CHKiRet(doAsyncWriteInternal(pThis, lenBuf));
} else {
- CHKiRet(doWriteInternal(pThis, pBuf, lenBuf));
+ CHKiRet(doWriteInternal(pThis, pBuf, lenBuf, bFlushZip));
}
@@ -1030,7 +1036,7 @@ asyncWriterThread(void *pPtr)
}
if(bTimedOut && pThis->iBufPtr > 0) {
/* if we timed out, we need to flush pending data */
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
bTimedOut = 0;
continue; /* now we should have data */
}
@@ -1056,7 +1062,7 @@ asyncWriterThread(void *pPtr)
bTimedOut = 0; /* we may have timed out, but there *is* work to do... */
iDeq = pThis->iDeq++ % STREAM_ASYNC_NUMBUFS;
- doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf);
+ doWriteInternal(pThis, pThis->asyncBuf[iDeq].pBuf, pThis->asyncBuf[iDeq].lenBuf, 0); // TODO: flush state
// TODO: error check????? 2009-07-06
--pThis->iCnt;
@@ -1166,63 +1172,101 @@ finalize_it:
* rgerhards, 2009-06-04
*/
static rsRetVal
-doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
+doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush)
{
- z_stream zstrm;
int zRet; /* zlib return state */
sbool bzInitDone = RSFALSE;
DEFiRet;
+ unsigned outavail;
assert(pThis != NULL);
assert(pBuf != NULL);
- /* allocate deflate state */
- zstrm.zalloc = Z_NULL;
- zstrm.zfree = Z_NULL;
- zstrm.opaque = Z_NULL;
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- /* see note in file header for the params we use with deflateInit2() */
- zRet = zlibw.DeflateInit2(&zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
- ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = zlibw.DeflateInit2(&pThis->zstrm, pThis->iZipLevel, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit2()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
}
bzInitDone = RSTRUE;
/* now doing the compression */
- zstrm.next_in = (Bytef*) pBuf; /* as of zlib doc, this must be set BEFORE DeflateInit2 */
- zstrm.avail_in = lenBuf;
+ pThis->zstrm.next_in = (Bytef*) pBuf;
+ pThis->zstrm.avail_in = lenBuf;
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", zstrm.avail_in, zstrm.total_in);
- zstrm.avail_out = pThis->sIOBufSize;
- zstrm.next_out = pThis->pZipBuf;
- zRet = zlibw.Deflate(&zstrm, Z_FINISH); /* no bad return value */
- DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, zstrm.avail_out);
- assert(zRet != Z_STREAM_ERROR); /* state not clobbered */
- if(zstrm.avail_out == pThis->sIOBufSize)
- break; /* this is valid, indicates end of compression --> see zlib howto */
- CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, pThis->sIOBufSize - zstrm.avail_out));
- } while (zstrm.avail_out == 0);
- assert(zstrm.avail_in == 0); /* all input will be used */
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, bFlush ? Z_SYNC_FLUSH : Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail =pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
+ }
+ } while (pThis->zstrm.avail_out == 0);
finalize_it:
- if(bzInitDone) {
- zRet = zlibw.DeflateEnd(&zstrm);
- if(zRet != Z_OK) {
- DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ if(pThis->bzInitDone && pThis->bVeryReliableZip) {
+ doZipFinish(pThis);
+ }
+ RETiRet;
+}
+
+
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(strm_t *pThis)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ assert(pThis != NULL);
+
+ if(!pThis->bzInitDone) {
+ FINALIZE;
+ }
+
+dbgprintf("AAAA: doZipFinish() called\n");
+ pThis->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = pThis->sIOBufSize;
+ pThis->zstrm.next_out = pThis->pZipBuf;
+ zRet = zlibw.Deflate(&pThis->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = pThis->sIOBufSize - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(strmPhysWrite(pThis, (uchar*)pThis->pZipBuf, outavail));
}
+ } while (pThis->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = zlibw.DeflateEnd(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
}
+ pThis->bzInitDone = 0;
RETiRet;
}
-
/* flush stream output buffer to persistent storage. This can be called at any time
* and is automatically called when the output buffer is full.
* rgerhards, 2008-01-10
*/
static rsRetVal
-strmFlushInternal(strm_t *pThis)
+strmFlushInternal(strm_t *pThis, int bFlushZip)
{
DEFiRet;
@@ -1232,7 +1276,7 @@ strmFlushInternal(strm_t *pThis)
(long) pThis->iBufPtr, (pThis->iBufPtr == 0) ? " (no need to flush)" : "");
if(pThis->tOperationsMode != STREAMMODE_READ && pThis->iBufPtr > 0) {
- iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr);
+ iRet = strmSchedWrite(pThis, pThis->pIOBuf, pThis->iBufPtr, bFlushZip);
}
RETiRet;
@@ -1254,7 +1298,7 @@ strmFlush(strm_t *pThis)
if(pThis->bAsyncWrite)
d_pthread_mutex_lock(&pThis->mut);
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 1));
finalize_it:
if(pThis->bAsyncWrite)
@@ -1277,7 +1321,7 @@ static rsRetVal strmSeek(strm_t *pThis, off64_t offs)
if(pThis->fd == -1) {
CHKiRet(strmOpenFile(pThis));
} else {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
long long i;
DBGOPRINT((obj_t*) pThis, "file %d seek, pos %llu\n", pThis->fd, (long long unsigned) offs);
@@ -1320,7 +1364,7 @@ static rsRetVal strmWriteChar(strm_t *pThis, uchar c)
/* if the buffer is full, we need to flush before we can write */
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis));
+ CHKiRet(strmFlushInternal(pThis, 0));
}
/* we now always have space for one character, so we simply copy it */
*(pThis->pIOBuf + pThis->iBufPtr) = c;
@@ -1390,7 +1434,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
iOffset = 0;
do {
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
iWrite = pThis->sIOBufSize - pThis->iBufPtr; /* this fits in current buf */
if(iWrite > lenBuf)
@@ -1405,7 +1449,7 @@ strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
* write it. This seems more natural than waiting (hours?) for the next message...
*/
if(pThis->iBufPtr == pThis->sIOBufSize) {
- CHKiRet(strmFlushInternal(pThis)); /* get a new buffer for rest of data */
+ CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
finalize_it:
@@ -1433,6 +1477,7 @@ DEFpropSetMeth(strm, tOperationsMode, int)
DEFpropSetMeth(strm, tOpenMode, mode_t)
DEFpropSetMeth(strm, sType, strmType_t)
DEFpropSetMeth(strm, iZipLevel, int)
+DEFpropSetMeth(strm, bVeryReliableZip, int)
DEFpropSetMeth(strm, bSync, int)
DEFpropSetMeth(strm, sIOBufSize, size_t)
DEFpropSetMeth(strm, iSizeLimit, off_t)
@@ -1564,7 +1609,7 @@ static rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
ISOBJ_TYPE_assert(pThis, strm);
ISOBJ_TYPE_assert(pStrm, strm);
- strmFlushInternal(pThis);
+ strmFlushInternal(pThis, 0);
CHKiRet(obj.BeginSerialize(pStrm, (obj_t*) pThis));
objSerializeSCALAR(pStrm, iCurrFNum, INT);
@@ -1757,6 +1802,7 @@ CODESTARTobjQueryInterface(strm)
pIf->SettOpenMode = strmSettOpenMode;
pIf->SetsType = strmSetsType;
pIf->SetiZipLevel = strmSetiZipLevel;
+ pIf->SetbVeryReliableZip = strmSetbVeryReliableZip;
pIf->SetbSync = strmSetbSync;
pIf->SetsIOBufSize = strmSetsIOBufSize;
pIf->SetiSizeLimit = strmSetiSizeLimit;
diff --git a/runtime/stream.h b/runtime/stream.h
index a01929f2..fdfefaa3 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -124,6 +124,8 @@ typedef struct strm_s {
sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */
sbool bStopWriter; /* shall writer thread terminate? */
sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
int iFlushInterval; /* flush in which interval - 0, no flushing */
pthread_mutex_t mut;/* mutex for flush in async mode */
pthread_cond_t notFull;
@@ -132,6 +134,7 @@ typedef struct strm_s {
unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
short iCnt; /* current nbr of elements in buffer */
+ z_stream zstrm; /* zip stream to use */
struct {
uchar *pBuf;
size_t lenBuf;
@@ -181,8 +184,10 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
/* v6 added */
rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, int mode);
+ /* v7 added 2012-09-14 */
+ INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
ENDinterface(strm)
-#define strmCURR_IF_VERSION 6 /* increment whenever you change the interface structure! */
+#define strmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
/* prototypes */