summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/omfwd.html18
-rw-r--r--plugins/imptcp/imptcp.c13
-rw-r--r--tools/omfwd.c35
3 files changed, 56 insertions, 10 deletions
diff --git a/doc/omfwd.html b/doc/omfwd.html
index 51aa58b5..85d3aad8 100644
--- a/doc/omfwd.html
+++ b/doc/omfwd.html
@@ -43,7 +43,7 @@
compression mode, so pre 7.5.1 configuration will continue to work
as expected.
<br>The compression level is specified via the usual factor of 0 to 9, with 9 being the strongest compression (taking up most processing time) and 0 being no compression at all (taking up no extra processing time). <br></li><br>
- <li><b>compression.mode</b><i>mode</i><br>
+ <li><b>compression.mode</b> <i>mode</i><br>
<i>mode</i> is one of "none", "single", or "stream:always". The
default is "none", in which no compression happens at all.
<br>In "single" compression mode, Rsyslog implements a proprietary
@@ -68,6 +68,22 @@
data.</b>
<br></li><br>
+ <li><b>compression.stream.flushOnTXEnd</b> <i>[<b>on</b>/off</i>] (requires 7.5.3+)<br>
+ This setting affects stream compression mode, only. If enabled (the default), the compression
+ buffer will by emptied at the end of a rsyslog batch. If set to "off",
+ end of batch will not affect compression at all.<br>
+ While setting it to "off" can potentially greatly improve compression
+ ratio, it will also introduce severe delay between when a message is being processed
+ by rsyslog and actually sent out to the network. We have seen cases where for
+ several thousand message not a single byte was sent. This is good in the sense that
+ it can happen only if we have a great compression ratio. This is most probably
+ a very good mode for busy machines which will process several thousand messages
+ per second and te resulting short delay will not pose any problems. However,
+ the default is more conservative, while it works more "naturally" with even low
+ message traffic. Even in flush mode, notable compression should be achivable
+ (but we do not yet have practice reports on actual compression ratios).
+ <br></li><br>
+
<li><strong>RebindInterval </strong>integer<br>
Permits to specify an interval at which the current connection is broken and re-established. This setting is primarily an aid to load balancers. After the configured number of messages has been transmitted, the current connection is terminated and a new one started. Note that this setting applies to both TCP and UDP traffic. For UDP, the new ``connection'' uses a different source port (ports are cycled and not reused too frequently). This usually is perceived as a ``new connection'' by load balancers, which in turn forward messages to another physical target system. <br></li><br>
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 1fecaee5..a568e5fe 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -830,6 +830,12 @@ DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, size_t iLen, struct syslogT
assert(pData != NULL);
assert(iLen > 0);
+{size_t i;
+dbgprintf("Data received(%u): '", (unsigned) iLen);
+for(i=0;i<iLen;++i)
+ dbgprintf("%c", pData[i]);
+dbgprintf("'\n");
+}
if(ttGenTime == 0)
datetime.getCurrTime(stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
@@ -861,12 +867,16 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
// TODO: can we do stats counters? Even if they are not 100% correct under all cases,
// by simply updating the input and output sizes?
uint64_t outtotal;
+dbgprintf("DDDD: in DataRcvdCompressed, init done %d\n", pThis->bzInitDone);
datetime.getCurrTime(&stTime, &ttGenTime);
outtotal = 0;
if(!pThis->bzInitDone) {
+dbgprintf("DDDD; inside zlib init code\n");
/* allocate deflate state */
+ pThis->zstrm.next_in = (Bytef*) buf;
+ pThis->zstrm.avail_in = 0;
pThis->zstrm.zalloc = Z_NULL;
pThis->zstrm.zfree = Z_NULL;
pThis->zstrm.opaque = Z_NULL;
@@ -885,7 +895,8 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
pThis->zstrm.avail_out = sizeof(zipBuf);
pThis->zstrm.next_out = zipBuf;
- zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
+ zRet = inflate(&pThis->zstrm, Z_SYNC_FLUSH); /* no bad return value */
+ //zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
if(outavail != 0) {
diff --git a/tools/omfwd.c b/tools/omfwd.c
index a75f04c4..c7c3e32b 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -75,6 +75,10 @@ DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(tcpclt)
+/* some local constants (just) for better readybility */
+#define IS_FLUSH 1
+#define NO_FLUSH 0
+
typedef struct _instanceData {
uchar *tplName; /* name of assigned template */
netstrms_t *pNS; /* netstream subsystem */
@@ -103,6 +107,7 @@ typedef struct _instanceData {
/* all other settings are for stream-compression */
# define COMPRESS_STREAM_ALWAYS 2
uint8_t compressionMode;
+ sbool strmCompFlushOnTxEnd; /* flush stream compression on transaction end? */
sbool bzInitDone; /* did we do an init of zstrm already? */
z_stream zstrm; /* zip stream to use for tcp compression */
uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */
@@ -141,6 +146,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "tcp_framing", eCmdHdlrGetWord, 0 },
{ "ziplevel", eCmdHdlrInt, 0 },
{ "compression.mode", eCmdHdlrGetWord, 0 },
+ { "compression.stream.flushontxend", eCmdHdlrBinary, 0 },
{ "rebindinterval", eCmdHdlrInt, 0 },
{ "streamdriver", eCmdHdlrGetWord, 0 },
{ "streamdrivermode", eCmdHdlrInt, 0 },
@@ -249,6 +255,7 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether
static inline void
DestructTCPInstanceData(instanceData *pData)
{
+dbgprintf("DDDD: in DestructTCPInstanceData\n");
assert(pData != NULL);
doZipFinish(pData);
if(pData->pNetstrm != NULL)
@@ -462,11 +469,12 @@ finalize_it:
}
static rsRetVal
-TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len)
+TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush)
{
int zRet; /* zlib return state */
unsigned outavail;
uchar zipBuf[32*1024];
+ int op;
DEFiRet;
if(!pData->bzInitDone) {
@@ -486,12 +494,17 @@ TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len)
/* now doing the compression */
pData->zstrm.next_in = (Bytef*) buf;
pData->zstrm.avail_in = len;
+ if(pData->strmCompFlushOnTxEnd && bIsFlush)
+ op = Z_SYNC_FLUSH;
+ else
+ op = Z_NO_FLUSH;
+dbgprintf("DDDD: op: %d (SYNC_FLUSH %d)\n", op, Z_SYNC_FLUSH);
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in);
+ DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pData->zstrm.avail_in, pData->zstrm.total_in, bIsFlush);
pData->zstrm.avail_out = sizeof(zipBuf);
pData->zstrm.next_out = zipBuf;
- zRet = deflate(&pData->zstrm, Z_NO_FLUSH); /* no bad return value */
+ zRet = deflate(&pData->zstrm, op); /* no bad return value */
DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out);
outavail = sizeof(zipBuf) - pData->zstrm.avail_out;
if(outavail != 0) {
@@ -504,11 +517,11 @@ finalize_it:
}
static rsRetVal
-TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
+TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush)
{
DEFiRet;
if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS)
- iRet = TCPSendBufCompressed(pData, buf, len);
+ iRet = TCPSendBufCompressed(pData, buf, len, bIsFlush);
else
iRet = TCPSendBufUncompressed(pData, buf, len);
RETiRet;
@@ -566,14 +579,14 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
(unsigned) len, pData->offsSndBuf);
if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) {
/* no buffer space left, need to commit previous records */
- CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf));
+ CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, NO_FLUSH));
pData->offsSndBuf = 0;
iRet = RS_RET_PREVIOUS_COMMITTED;
}
/* check if the message is too large to fit into buffer */
if(len > sizeof(pData->sndBuf)) {
- CHKiRet(TCPSendBuf(pData, (uchar*)msg, len));
+ CHKiRet(TCPSendBuf(pData, (uchar*)msg, len, NO_FLUSH));
ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */
}
@@ -777,6 +790,9 @@ CODESTARTdoAction
DestructTCPInstanceData(pData);
iRet = RS_RET_SUSPENDED;
}
+ if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS && pData->strmCompFlushOnTxEnd)
+ /* mimic not committed, as we need the EndTx entry point to be called */
+ iRet = RS_RET_DEFER_COMMIT;
}
finalize_it:
# ifdef USE_NETZIP
@@ -789,7 +805,7 @@ BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf);
if(pData->offsSndBuf != 0) {
- iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf);
+ iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH);
pData->offsSndBuf = 0;
}
ENDendTransaction
@@ -855,6 +871,7 @@ setInstParamDefaults(instanceData *pData)
pData->bResendLastOnRecon = 0;
pData->pPermPeers = NULL;
pData->compressionLevel = 9;
+ pData->strmCompFlushOnTxEnd = 1;
pData->compressionMode = COMPRESS_NEVER;
}
@@ -978,6 +995,8 @@ CODESTARTnewActInst
pData->bResendLastOnRecon = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "compression.stream.flushontxend")) {
+ pData->strmCompFlushOnTxEnd = (sbool) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "compression.mode")) {
cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
if(!strcasecmp(cstr, "stream:always")) {