diff options
-rw-r--r-- | doc/omfwd.html | 18 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 13 | ||||
-rw-r--r-- | tools/omfwd.c | 35 |
3 files changed, 56 insertions, 10 deletions
diff --git a/doc/omfwd.html b/doc/omfwd.html index 51aa58b5..85d3aad8 100644 --- a/doc/omfwd.html +++ b/doc/omfwd.html @@ -43,7 +43,7 @@ compression mode, so pre 7.5.1 configuration will continue to work as expected. <br>The compression level is specified via the usual factor of 0 to 9, with 9 being the strongest compression (taking up most processing time) and 0 being no compression at all (taking up no extra processing time). <br></li><br> - <li><b>compression.mode</b><i>mode</i><br> + <li><b>compression.mode</b> <i>mode</i><br> <i>mode</i> is one of "none", "single", or "stream:always". The default is "none", in which no compression happens at all. <br>In "single" compression mode, Rsyslog implements a proprietary @@ -68,6 +68,22 @@ data.</b> <br></li><br> + <li><b>compression.stream.flushOnTXEnd</b> <i>[<b>on</b>/off</i>] (requires 7.5.3+)<br> + This setting affects stream compression mode, only. If enabled (the default), the compression + buffer will by emptied at the end of a rsyslog batch. If set to "off", + end of batch will not affect compression at all.<br> + While setting it to "off" can potentially greatly improve compression + ratio, it will also introduce severe delay between when a message is being processed + by rsyslog and actually sent out to the network. We have seen cases where for + several thousand message not a single byte was sent. This is good in the sense that + it can happen only if we have a great compression ratio. This is most probably + a very good mode for busy machines which will process several thousand messages + per second and te resulting short delay will not pose any problems. However, + the default is more conservative, while it works more "naturally" with even low + message traffic. Even in flush mode, notable compression should be achivable + (but we do not yet have practice reports on actual compression ratios). + <br></li><br> + <li><strong>RebindInterval </strong>integer<br> Permits to specify an interval at which the current connection is broken and re-established. This setting is primarily an aid to load balancers. After the configured number of messages has been transmitted, the current connection is terminated and a new one started. Note that this setting applies to both TCP and UDP traffic. For UDP, the new ``connection'' uses a different source port (ports are cycled and not reused too frequently). This usually is perceived as a ``new connection'' by load balancers, which in turn forward messages to another physical target system. <br></li><br> diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 1fecaee5..a568e5fe 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -830,6 +830,12 @@ DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, size_t iLen, struct syslogT assert(pData != NULL); assert(iLen > 0); +{size_t i; +dbgprintf("Data received(%u): '", (unsigned) iLen); +for(i=0;i<iLen;++i) + dbgprintf("%c", pData[i]); +dbgprintf("'\n"); +} if(ttGenTime == 0) datetime.getCurrTime(stTime, &ttGenTime); multiSub.ppMsgs = pMsgs; @@ -861,12 +867,16 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) // TODO: can we do stats counters? Even if they are not 100% correct under all cases, // by simply updating the input and output sizes? uint64_t outtotal; +dbgprintf("DDDD: in DataRcvdCompressed, init done %d\n", pThis->bzInitDone); datetime.getCurrTime(&stTime, &ttGenTime); outtotal = 0; if(!pThis->bzInitDone) { +dbgprintf("DDDD; inside zlib init code\n"); /* allocate deflate state */ + pThis->zstrm.next_in = (Bytef*) buf; + pThis->zstrm.avail_in = 0; pThis->zstrm.zalloc = Z_NULL; pThis->zstrm.zfree = Z_NULL; pThis->zstrm.opaque = Z_NULL; @@ -885,7 +895,8 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); pThis->zstrm.avail_out = sizeof(zipBuf); pThis->zstrm.next_out = zipBuf; - zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */ + zRet = inflate(&pThis->zstrm, Z_SYNC_FLUSH); /* no bad return value */ + //zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out); outavail = sizeof(zipBuf) - pThis->zstrm.avail_out; if(outavail != 0) { diff --git a/tools/omfwd.c b/tools/omfwd.c index a75f04c4..c7c3e32b 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -75,6 +75,10 @@ DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(tcpclt) +/* some local constants (just) for better readybility */ +#define IS_FLUSH 1 +#define NO_FLUSH 0 + typedef struct _instanceData { uchar *tplName; /* name of assigned template */ netstrms_t *pNS; /* netstream subsystem */ @@ -103,6 +107,7 @@ typedef struct _instanceData { /* all other settings are for stream-compression */ # define COMPRESS_STREAM_ALWAYS 2 uint8_t compressionMode; + sbool strmCompFlushOnTxEnd; /* flush stream compression on transaction end? */ sbool bzInitDone; /* did we do an init of zstrm already? */ z_stream zstrm; /* zip stream to use for tcp compression */ uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ @@ -141,6 +146,7 @@ static struct cnfparamdescr actpdescr[] = { { "tcp_framing", eCmdHdlrGetWord, 0 }, { "ziplevel", eCmdHdlrInt, 0 }, { "compression.mode", eCmdHdlrGetWord, 0 }, + { "compression.stream.flushontxend", eCmdHdlrBinary, 0 }, { "rebindinterval", eCmdHdlrInt, 0 }, { "streamdriver", eCmdHdlrGetWord, 0 }, { "streamdrivermode", eCmdHdlrInt, 0 }, @@ -249,6 +255,7 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether static inline void DestructTCPInstanceData(instanceData *pData) { +dbgprintf("DDDD: in DestructTCPInstanceData\n"); assert(pData != NULL); doZipFinish(pData); if(pData->pNetstrm != NULL) @@ -462,11 +469,12 @@ finalize_it: } static rsRetVal -TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len) +TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) { int zRet; /* zlib return state */ unsigned outavail; uchar zipBuf[32*1024]; + int op; DEFiRet; if(!pData->bzInitDone) { @@ -486,12 +494,17 @@ TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len) /* now doing the compression */ pData->zstrm.next_in = (Bytef*) buf; pData->zstrm.avail_in = len; + if(pData->strmCompFlushOnTxEnd && bIsFlush) + op = Z_SYNC_FLUSH; + else + op = Z_NO_FLUSH; +dbgprintf("DDDD: op: %d (SYNC_FLUSH %d)\n", op, Z_SYNC_FLUSH); /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in); + DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pData->zstrm.avail_in, pData->zstrm.total_in, bIsFlush); pData->zstrm.avail_out = sizeof(zipBuf); pData->zstrm.next_out = zipBuf; - zRet = deflate(&pData->zstrm, Z_NO_FLUSH); /* no bad return value */ + zRet = deflate(&pData->zstrm, op); /* no bad return value */ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out); outavail = sizeof(zipBuf) - pData->zstrm.avail_out; if(outavail != 0) { @@ -504,11 +517,11 @@ finalize_it: } static rsRetVal -TCPSendBuf(instanceData *pData, uchar *buf, unsigned len) +TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) { DEFiRet; if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS) - iRet = TCPSendBufCompressed(pData, buf, len); + iRet = TCPSendBufCompressed(pData, buf, len, bIsFlush); else iRet = TCPSendBufUncompressed(pData, buf, len); RETiRet; @@ -566,14 +579,14 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) (unsigned) len, pData->offsSndBuf); if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { /* no buffer space left, need to commit previous records */ - CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf)); + CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, NO_FLUSH)); pData->offsSndBuf = 0; iRet = RS_RET_PREVIOUS_COMMITTED; } /* check if the message is too large to fit into buffer */ if(len > sizeof(pData->sndBuf)) { - CHKiRet(TCPSendBuf(pData, (uchar*)msg, len)); + CHKiRet(TCPSendBuf(pData, (uchar*)msg, len, NO_FLUSH)); ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ } @@ -777,6 +790,9 @@ CODESTARTdoAction DestructTCPInstanceData(pData); iRet = RS_RET_SUSPENDED; } + if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS && pData->strmCompFlushOnTxEnd) + /* mimic not committed, as we need the EndTx entry point to be called */ + iRet = RS_RET_DEFER_COMMIT; } finalize_it: # ifdef USE_NETZIP @@ -789,7 +805,7 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH); pData->offsSndBuf = 0; } ENDendTransaction @@ -855,6 +871,7 @@ setInstParamDefaults(instanceData *pData) pData->bResendLastOnRecon = 0; pData->pPermPeers = NULL; pData->compressionLevel = 9; + pData->strmCompFlushOnTxEnd = 1; pData->compressionMode = COMPRESS_NEVER; } @@ -978,6 +995,8 @@ CODESTARTnewActInst pData->bResendLastOnRecon = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "compression.stream.flushontxend")) { + pData->strmCompFlushOnTxEnd = (sbool) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "compression.mode")) { cstr = es_str2cstr(pvals[i].val.d.estr, NULL); if(!strcasecmp(cstr, "stream:always")) { |