summaryrefslogtreecommitdiffstats
path: root/plugins/imptcp/imptcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r--plugins/imptcp/imptcp.c124
1 files changed, 114 insertions, 10 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 5c8bb67a..7481611d 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -50,6 +50,7 @@
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
+#include <zlib.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -207,11 +208,12 @@ struct ptcpsrv_s {
* includes support for doubly-linked list.
*/
struct ptcpsess_s {
-// ptcpsrv_t *pSrv; /* our server TODO: check remove! */
ptcplstn_t *pLstn; /* our listener */
ptcpsess_t *prev, *next;
int sock;
epolld_t *epd;
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ z_stream zstrm; /* zip stream to use for tcp compression */
//--- from tcps_sess.h
int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
@@ -806,19 +808,19 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* EXTRACT from tcps_sess.c
*/
static rsRetVal
-DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, size_t iLen, time_t ttGenTime)
{
multi_submit_t multiSub;
msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
- time_t ttGenTime;
char *pEnd;
DEFiRet;
assert(pData != NULL);
assert(iLen > 0);
- datetime.getCurrTime(&stTime, &ttGenTime);
+ if(ttGenTime == 0)
+ datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
@@ -836,6 +838,65 @@ finalize_it:
RETiRet;
}
+static rsRetVal
+DataRcvdCompress(ptcpsess_t *pThis, char *buf, size_t len)
+{
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ int zRet; /* zlib return state */
+ unsigned outavail;
+ uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!)
+ DEFiRet;
+ // TODO: can we do stats counters? Even if they are not 100% correct under all cases,
+ // by simply updating the input and output sizes?
+ uint64_t outtotal;
+
+ assert(iLen > 0);
+
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ outtotal = 0;
+
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ zRet = inflateInit(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
+ }
+
+ /* now doing the compression */
+ pThis->zstrm.next_in = (Bytef*) buf;
+ pThis->zstrm.avail_in = len;
+ /* run deflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = sizeof(zipBuf);
+ pThis->zstrm.next_out = zipBuf;
+ zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ outtotal += outavail;
+ CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, ttGenTime));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+ dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len, outtotal);
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+{
+ return DataRcvdCompress(pThis, pData, iLen);
+}
+
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -988,6 +1049,42 @@ finalize_it:
}
+/* finish zlib buffer, to be called before closing the session.
+ */
+static rsRetVal
+doZipFinish(ptcpsess_t *pSess)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ uchar zipBuf[32*1024]; // TODO: use "global" one from pSess
+
+ if(!pSess->bzInitDone)
+ goto done;
+
+ pSess->zstrm.avail_in = 0;
+ /* run inflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in, pSess->zstrm.total_in);
+ pSess->zstrm.avail_out = sizeof(zipBuf);
+ pSess->zstrm.next_out = zipBuf;
+ zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pSess->zstrm.avail_out;
+ if(outavail != 0) {
+ CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, 0)); // TODO: query time!
+ }
+ } while (pSess->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = deflateEnd(&pSess->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet);
+ }
+
+ pSess->bzInitDone = 0;
+done: RETiRet;
+}
/* close/remove a session
* NOTE: we must first remove the fd from the epoll set and then close it -- else we
* get an error "bad file descriptor" from epoll.
@@ -998,6 +1095,7 @@ closeSess(ptcpsess_t *pSess)
int sock;
DEFiRet;
+ doZipFinish(pSess);
sock = pSess->sock;
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
@@ -1269,6 +1367,10 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ uchar *peerName;
+ int lenPeer;
+ int remsock = 0; /* init just to keep compiler happy... :-( */
+ sbool bEmitOnClose = 0;
char rcvBuf[128*1024];
DEFiRet;
@@ -1285,13 +1387,15 @@ sessActivity(ptcpsess_t *pSess)
} else if (lenRcv == 0) {
/* session was closed, do clean-up */
if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
- uchar *peerName;
- int lenPeer;
- prop.GetString(pSess->peerName, &peerName, &lenPeer);
- errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by remote peer %s.\n",
- pSess->sock, peerName);
+ prop.GetString(pSess->peerName, &peerName, &lenPeer),
+ remsock = pSess->sock;
+ bEmitOnClose = 1;
+ }
+ CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */
+ if(bEmitOnClose) {
+ errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by "
+ "remote peer %s.\n", remsock, peerName);
}
- CHKiRet(closeSess(pSess));
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK)