summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rw-r--r--tools/omfwd.c35
1 files changed, 27 insertions, 8 deletions
diff --git a/tools/omfwd.c b/tools/omfwd.c
index a75f04c4..c7c3e32b 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -75,6 +75,10 @@ DEFobjCurrIf(netstrms)
DEFobjCurrIf(netstrm)
DEFobjCurrIf(tcpclt)
+/* some local constants (just) for better readybility */
+#define IS_FLUSH 1
+#define NO_FLUSH 0
+
typedef struct _instanceData {
uchar *tplName; /* name of assigned template */
netstrms_t *pNS; /* netstream subsystem */
@@ -103,6 +107,7 @@ typedef struct _instanceData {
/* all other settings are for stream-compression */
# define COMPRESS_STREAM_ALWAYS 2
uint8_t compressionMode;
+ sbool strmCompFlushOnTxEnd; /* flush stream compression on transaction end? */
sbool bzInitDone; /* did we do an init of zstrm already? */
z_stream zstrm; /* zip stream to use for tcp compression */
uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */
@@ -141,6 +146,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "tcp_framing", eCmdHdlrGetWord, 0 },
{ "ziplevel", eCmdHdlrInt, 0 },
{ "compression.mode", eCmdHdlrGetWord, 0 },
+ { "compression.stream.flushontxend", eCmdHdlrBinary, 0 },
{ "rebindinterval", eCmdHdlrInt, 0 },
{ "streamdriver", eCmdHdlrGetWord, 0 },
{ "streamdrivermode", eCmdHdlrInt, 0 },
@@ -249,6 +255,7 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether
static inline void
DestructTCPInstanceData(instanceData *pData)
{
+dbgprintf("DDDD: in DestructTCPInstanceData\n");
assert(pData != NULL);
doZipFinish(pData);
if(pData->pNetstrm != NULL)
@@ -462,11 +469,12 @@ finalize_it:
}
static rsRetVal
-TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len)
+TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush)
{
int zRet; /* zlib return state */
unsigned outavail;
uchar zipBuf[32*1024];
+ int op;
DEFiRet;
if(!pData->bzInitDone) {
@@ -486,12 +494,17 @@ TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len)
/* now doing the compression */
pData->zstrm.next_in = (Bytef*) buf;
pData->zstrm.avail_in = len;
+ if(pData->strmCompFlushOnTxEnd && bIsFlush)
+ op = Z_SYNC_FLUSH;
+ else
+ op = Z_NO_FLUSH;
+dbgprintf("DDDD: op: %d (SYNC_FLUSH %d)\n", op, Z_SYNC_FLUSH);
/* run deflate() on buffer until everything has been compressed */
do {
- DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in);
+ DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pData->zstrm.avail_in, pData->zstrm.total_in, bIsFlush);
pData->zstrm.avail_out = sizeof(zipBuf);
pData->zstrm.next_out = zipBuf;
- zRet = deflate(&pData->zstrm, Z_NO_FLUSH); /* no bad return value */
+ zRet = deflate(&pData->zstrm, op); /* no bad return value */
DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out);
outavail = sizeof(zipBuf) - pData->zstrm.avail_out;
if(outavail != 0) {
@@ -504,11 +517,11 @@ finalize_it:
}
static rsRetVal
-TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
+TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush)
{
DEFiRet;
if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS)
- iRet = TCPSendBufCompressed(pData, buf, len);
+ iRet = TCPSendBufCompressed(pData, buf, len, bIsFlush);
else
iRet = TCPSendBufUncompressed(pData, buf, len);
RETiRet;
@@ -566,14 +579,14 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
(unsigned) len, pData->offsSndBuf);
if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) {
/* no buffer space left, need to commit previous records */
- CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf));
+ CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, NO_FLUSH));
pData->offsSndBuf = 0;
iRet = RS_RET_PREVIOUS_COMMITTED;
}
/* check if the message is too large to fit into buffer */
if(len > sizeof(pData->sndBuf)) {
- CHKiRet(TCPSendBuf(pData, (uchar*)msg, len));
+ CHKiRet(TCPSendBuf(pData, (uchar*)msg, len, NO_FLUSH));
ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */
}
@@ -777,6 +790,9 @@ CODESTARTdoAction
DestructTCPInstanceData(pData);
iRet = RS_RET_SUSPENDED;
}
+ if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS && pData->strmCompFlushOnTxEnd)
+ /* mimic not committed, as we need the EndTx entry point to be called */
+ iRet = RS_RET_DEFER_COMMIT;
}
finalize_it:
# ifdef USE_NETZIP
@@ -789,7 +805,7 @@ BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf);
if(pData->offsSndBuf != 0) {
- iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf);
+ iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH);
pData->offsSndBuf = 0;
}
ENDendTransaction
@@ -855,6 +871,7 @@ setInstParamDefaults(instanceData *pData)
pData->bResendLastOnRecon = 0;
pData->pPermPeers = NULL;
pData->compressionLevel = 9;
+ pData->strmCompFlushOnTxEnd = 1;
pData->compressionMode = COMPRESS_NEVER;
}
@@ -978,6 +995,8 @@ CODESTARTnewActInst
pData->bResendLastOnRecon = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "compression.stream.flushontxend")) {
+ pData->strmCompFlushOnTxEnd = (sbool) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "compression.mode")) {
cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
if(!strcasecmp(cstr, "stream:always")) {