summaryrefslogtreecommitdiffstats
path: root/tools/omfwd.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/omfwd.c')
-rw-r--r--tools/omfwd.c100
1 files changed, 93 insertions, 7 deletions
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 129392d2..61a5d57b 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -97,6 +97,8 @@ typedef struct _instanceData {
TCPFRAMINGMODE tcp_framing;
int bResendLastOnRecon; /* should the last message be re-sent on a successful reconnect? */
tcpclt_t *pTCPClt; /* our tcpclt object */
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ z_stream zstrm; /* zip stream to use for tcp compression */
uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */
unsigned offsSndBuf; /* next free spot in send buffer */
} instanceData;
@@ -169,6 +171,7 @@ ENDinitConfVars
static rsRetVal doTryResume(instanceData *pData);
+static rsRetVal doZipFinish(instanceData *pData);
/* this function gets the default template. It coordinates action between
* old-style and new-style configuration parts.
@@ -240,6 +243,7 @@ static inline void
DestructTCPInstanceData(instanceData *pData)
{
assert(pData != NULL);
+ doZipFinish(pData);
if(pData->pNetstrm != NULL)
netstrm.Destruct(&pData->pNetstrm);
if(pData->pNS != NULL)
@@ -423,14 +427,8 @@ finalize_it:
/* CODE FOR SENDING TCP MESSAGES */
-
-/* Send a buffer via TCP. Usually, this is used to send the current
- * send buffer, but if a message is larger than the buffer, we need to
- * have the capability to send the message buffer directly.
- * rgerhards, 2011-04-04
- */
static rsRetVal
-TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
+TCPSendUncompressedBuf(instanceData *pData, uchar *buf, unsigned len)
{
DEFiRet;
unsigned alreadySent;
@@ -438,6 +436,7 @@ TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
alreadySent = 0;
CHKiRet(netstrm.CheckConnection(pData->pNetstrm)); /* hack for plain tcp syslog - see ptcp driver for details */
+
while(alreadySent != len) {
lenSend = len - alreadySent;
CHKiRet(netstrm.Send(pData->pNetstrm, buf+alreadySent, &lenSend));
@@ -455,6 +454,93 @@ finalize_it:
RETiRet;
}
+static rsRetVal
+TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len)
+{
+ int zRet; /* zlib return state */
+ unsigned outavail;
+ uchar zipBuf[32*1024];
+ DEFiRet;
+
+ if(!pData->bzInitDone) {
+ /* allocate deflate state */
+ pData->zstrm.zalloc = Z_NULL;
+ pData->zstrm.zfree = Z_NULL;
+ pData->zstrm.opaque = Z_NULL;
+ /* see note in file header for the params we use with deflateInit2() */
+ zRet = deflateInit(&pData->zstrm, 9);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateInit()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pData->bzInitDone = RSTRUE;
+ }
+
+ /* now doing the compression */
+ pData->zstrm.next_in = (Bytef*) buf;
+ pData->zstrm.avail_in = len;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in);
+ pData->zstrm.avail_out = sizeof(zipBuf);
+ pData->zstrm.next_out = zipBuf;
+ zRet = deflate(&pData->zstrm, Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pData->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(TCPSendUncompressedBuf(pData, zipBuf, outavail));
+ }
+ } while (pData->zstrm.avail_out == 0);
+
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+TCPSendBuf(instanceData *pData, uchar *buf, unsigned len)
+{
+ return TCPSendBufCompressed(pData, buf, len);
+}
+
+/* finish zlib buffer, to be called before closing the ZIP file (if
+ * running in stream mode).
+ */
+static rsRetVal
+doZipFinish(instanceData *pData)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ uchar zipBuf[32*1024];
+
+ if(!pData->bzInitDone)
+ goto done;
+
+dbgprintf("DDDD: in doZipFinish()\n");
+ pData->zstrm.avail_in = 0;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in);
+ pData->zstrm.avail_out = sizeof(zipBuf);
+ pData->zstrm.next_out = zipBuf;
+ zRet = deflate(&pData->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pData->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(TCPSendUncompressedBuf(pData, zipBuf, outavail));
+ }
+ } while (pData->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = deflateEnd(&pData->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ }
+
+ pData->bzInitDone = 0;
+done: RETiRet;
+}
+
/* Add frame to send buffer (or send, if requried)
*/