summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/imptcp/imptcp.c28
1 files changed, 23 insertions, 5 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index d3551225..e9a20c1c 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -251,6 +251,8 @@ struct ptcplstn_s {
sbool bSuppOctetFram;
epolld_t *epd;
statsobj_t *stats; /* listener stats */
+ intctr_t rcvdBytes;
+ intctr_t rcvdDecompressed;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
};
@@ -879,10 +881,9 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
pThis->bzInitDone = RSTRUE;
}
- /* now doing the compression */
pThis->zstrm.next_in = (Bytef*) buf;
pThis->zstrm.avail_in = len;
- /* run deflate() on buffer until everything has been compressed */
+ /* run inflate() on buffer until everything has been uncompressed */
do {
DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
pThis->zstrm.avail_out = sizeof(zipBuf);
@@ -892,6 +893,7 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
if(outavail != 0) {
outtotal += outavail;
+ pThis->pLstn->rcvdDecompressed += outavail;
CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, ttGenTime));
}
} while (pThis->zstrm.avail_out == 0);
@@ -905,6 +907,7 @@ static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
DEFiRet;
+ pThis->pLstn->rcvdBytes += iLen;
if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS)
iRet = DataRcvdCompressed(pThis, pData, iLen);
else
@@ -1012,6 +1015,14 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pLstn->ctrSubmit)));
+ /* the following counters are not protected by mutexes; we accept
+ * that they may not be 100% correct */
+ pLstn->rcvdBytes = 0,
+ pLstn->rcvdDecompressed = 0;
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"),
+ ctrType_IntCtr, &(pLstn->rcvdBytes)));
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"),
+ ctrType_IntCtr, &(pLstn->rcvdDecompressed)));
CHKiRet(statsobj.ConstructFinalize(pLstn->stats));
/* add to start of server's listener list */
@@ -1024,6 +1035,7 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd);
finalize_it:
+dbgprintf("DDDD: addLstn return %d\n", iRet);
RETiRet;
}
@@ -1078,7 +1090,6 @@ doZipFinish(ptcpsess_t *pSess)
if(!pSess->bzInitDone)
goto done;
-dbgprintf("DDDD: enter doZipFinish\n");
pSess->zstrm.avail_in = 0;
/* run inflate() on buffer until everything has been compressed */
do {
@@ -1089,6 +1100,7 @@ dbgprintf("DDDD: enter doZipFinish\n");
DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out);
outavail = sizeof(zipBuf) - pSess->zstrm.avail_out;
if(outavail != 0) {
+ pSess->pLstn->rcvdDecompressed += outavail;
CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, 0)); // TODO: query time!
}
} while (pSess->zstrm.avail_out == 0);
@@ -1102,6 +1114,7 @@ finalize_it:
pSess->bzInitDone = 0;
done: RETiRet;
}
+
/* close/remove a session
* NOTE: we must first remove the fd from the epoll set and then close it -- else we
* get an error "bad file descriptor" from epoll.
@@ -1112,7 +1125,9 @@ closeSess(ptcpsess_t *pSess)
int sock;
DEFiRet;
- doZipFinish(pSess);
+ if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ doZipFinish(pSess);
+
sock = pSess->sock;
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
@@ -1790,6 +1805,7 @@ shutdownSrv(ptcpsrv_t *pSrv)
ptcplstn_t *pLstn, *lstnDel;
ptcpsess_t *pSess, *sessDel;
+dbgprintf("DDDD: enter shutdownSrv\n");
/* listeners */
pLstn = pSrv->pLstn;
while(pLstn != NULL) {
@@ -1798,7 +1814,9 @@ shutdownSrv(ptcpsrv_t *pSrv)
/* now unlink listner */
lstnDel = pLstn;
pLstn = pLstn->next;
- DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock);
+ DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, "
+ "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes,
+ lstnDel->rcvdDecompressed);
free(lstnDel->epd);
free(lstnDel);
}