diff options
Diffstat (limited to 'tools')
-rw-r--r-- | tools/omfwd.c | 35 |
1 files changed, 27 insertions, 8 deletions
diff --git a/tools/omfwd.c b/tools/omfwd.c index a75f04c4..c7c3e32b 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -75,6 +75,10 @@ DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(tcpclt) +/* some local constants (just) for better readybility */ +#define IS_FLUSH 1 +#define NO_FLUSH 0 + typedef struct _instanceData { uchar *tplName; /* name of assigned template */ netstrms_t *pNS; /* netstream subsystem */ @@ -103,6 +107,7 @@ typedef struct _instanceData { /* all other settings are for stream-compression */ # define COMPRESS_STREAM_ALWAYS 2 uint8_t compressionMode; + sbool strmCompFlushOnTxEnd; /* flush stream compression on transaction end? */ sbool bzInitDone; /* did we do an init of zstrm already? */ z_stream zstrm; /* zip stream to use for tcp compression */ uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ @@ -141,6 +146,7 @@ static struct cnfparamdescr actpdescr[] = { { "tcp_framing", eCmdHdlrGetWord, 0 }, { "ziplevel", eCmdHdlrInt, 0 }, { "compression.mode", eCmdHdlrGetWord, 0 }, + { "compression.stream.flushontxend", eCmdHdlrBinary, 0 }, { "rebindinterval", eCmdHdlrInt, 0 }, { "streamdriver", eCmdHdlrGetWord, 0 }, { "streamdrivermode", eCmdHdlrInt, 0 }, @@ -249,6 +255,7 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether static inline void DestructTCPInstanceData(instanceData *pData) { +dbgprintf("DDDD: in DestructTCPInstanceData\n"); assert(pData != NULL); doZipFinish(pData); if(pData->pNetstrm != NULL) @@ -462,11 +469,12 @@ finalize_it: } static rsRetVal -TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len) +TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) { int zRet; /* zlib return state */ unsigned outavail; uchar zipBuf[32*1024]; + int op; DEFiRet; if(!pData->bzInitDone) { @@ -486,12 +494,17 @@ TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len) /* now doing the compression */ pData->zstrm.next_in = (Bytef*) buf; pData->zstrm.avail_in = len; + if(pData->strmCompFlushOnTxEnd && bIsFlush) + op = Z_SYNC_FLUSH; + else + op = Z_NO_FLUSH; +dbgprintf("DDDD: op: %d (SYNC_FLUSH %d)\n", op, Z_SYNC_FLUSH); /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in); + DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pData->zstrm.avail_in, pData->zstrm.total_in, bIsFlush); pData->zstrm.avail_out = sizeof(zipBuf); pData->zstrm.next_out = zipBuf; - zRet = deflate(&pData->zstrm, Z_NO_FLUSH); /* no bad return value */ + zRet = deflate(&pData->zstrm, op); /* no bad return value */ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out); outavail = sizeof(zipBuf) - pData->zstrm.avail_out; if(outavail != 0) { @@ -504,11 +517,11 @@ finalize_it: } static rsRetVal -TCPSendBuf(instanceData *pData, uchar *buf, unsigned len) +TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) { DEFiRet; if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS) - iRet = TCPSendBufCompressed(pData, buf, len); + iRet = TCPSendBufCompressed(pData, buf, len, bIsFlush); else iRet = TCPSendBufUncompressed(pData, buf, len); RETiRet; @@ -566,14 +579,14 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) (unsigned) len, pData->offsSndBuf); if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { /* no buffer space left, need to commit previous records */ - CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf)); + CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, NO_FLUSH)); pData->offsSndBuf = 0; iRet = RS_RET_PREVIOUS_COMMITTED; } /* check if the message is too large to fit into buffer */ if(len > sizeof(pData->sndBuf)) { - CHKiRet(TCPSendBuf(pData, (uchar*)msg, len)); + CHKiRet(TCPSendBuf(pData, (uchar*)msg, len, NO_FLUSH)); ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ } @@ -777,6 +790,9 @@ CODESTARTdoAction DestructTCPInstanceData(pData); iRet = RS_RET_SUSPENDED; } + if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS && pData->strmCompFlushOnTxEnd) + /* mimic not committed, as we need the EndTx entry point to be called */ + iRet = RS_RET_DEFER_COMMIT; } finalize_it: # ifdef USE_NETZIP @@ -789,7 +805,7 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf); + iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH); pData->offsSndBuf = 0; } ENDendTransaction @@ -855,6 +871,7 @@ setInstParamDefaults(instanceData *pData) pData->bResendLastOnRecon = 0; pData->pPermPeers = NULL; pData->compressionLevel = 9; + pData->strmCompFlushOnTxEnd = 1; pData->compressionMode = COMPRESS_NEVER; } @@ -978,6 +995,8 @@ CODESTARTnewActInst pData->bResendLastOnRecon = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "compression.stream.flushontxend")) { + pData->strmCompFlushOnTxEnd = (sbool) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "compression.mode")) { cstr = es_str2cstr(pvals[i].val.d.estr, NULL); if(!strcasecmp(cstr, "stream:always")) { |