diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-04-18 12:40:39 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-04-18 12:40:39 +0200 |
commit | 7f846c53d6519476b6118282ba6b3bad625e43a3 (patch) | |
tree | b72e1e1dec5794455144a1650a3650d841c8110f /plugins/imptcp/imptcp.c | |
parent | e3da34ca5cfe5a70d829be364e815f837e5c47ba (diff) | |
download | rsyslog-7f846c53d6519476b6118282ba6b3bad625e43a3.tar.gz rsyslog-7f846c53d6519476b6118282ba6b3bad625e43a3.tar.bz2 rsyslog-7f846c53d6519476b6118282ba6b3bad625e43a3.zip |
tcp strm compression: add stats counters
Diffstat (limited to 'plugins/imptcp/imptcp.c')
-rw-r--r-- | plugins/imptcp/imptcp.c | 28 |
1 files changed, 23 insertions, 5 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index d3551225..e9a20c1c 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -251,6 +251,8 @@ struct ptcplstn_s { sbool bSuppOctetFram; epolld_t *epd; statsobj_t *stats; /* listener stats */ + intctr_t rcvdBytes; + intctr_t rcvdDecompressed; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) }; @@ -879,10 +881,9 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) pThis->bzInitDone = RSTRUE; } - /* now doing the compression */ pThis->zstrm.next_in = (Bytef*) buf; pThis->zstrm.avail_in = len; - /* run deflate() on buffer until everything has been compressed */ + /* run inflate() on buffer until everything has been uncompressed */ do { DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in); pThis->zstrm.avail_out = sizeof(zipBuf); @@ -892,6 +893,7 @@ DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len) outavail = sizeof(zipBuf) - pThis->zstrm.avail_out; if(outavail != 0) { outtotal += outavail; + pThis->pLstn->rcvdDecompressed += outavail; CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, ttGenTime)); } } while (pThis->zstrm.avail_out == 0); @@ -905,6 +907,7 @@ static rsRetVal DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen) { DEFiRet; + pThis->pLstn->rcvdBytes += iLen; if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS) iRet = DataRcvdCompressed(pThis, pData, iLen); else @@ -1012,6 +1015,14 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(pLstn->ctrSubmit))); + /* the following counters are not protected by mutexes; we accept + * that they may not be 100% correct */ + pLstn->rcvdBytes = 0, + pLstn->rcvdDecompressed = 0; + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"), + ctrType_IntCtr, &(pLstn->rcvdBytes))); + CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"), + ctrType_IntCtr, &(pLstn->rcvdDecompressed))); CHKiRet(statsobj.ConstructFinalize(pLstn->stats)); /* add to start of server's listener list */ @@ -1024,6 +1035,7 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6) iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd); finalize_it: +dbgprintf("DDDD: addLstn return %d\n", iRet); RETiRet; } @@ -1078,7 +1090,6 @@ doZipFinish(ptcpsess_t *pSess) if(!pSess->bzInitDone) goto done; -dbgprintf("DDDD: enter doZipFinish\n"); pSess->zstrm.avail_in = 0; /* run inflate() on buffer until everything has been compressed */ do { @@ -1089,6 +1100,7 @@ dbgprintf("DDDD: enter doZipFinish\n"); DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out); outavail = sizeof(zipBuf) - pSess->zstrm.avail_out; if(outavail != 0) { + pSess->pLstn->rcvdDecompressed += outavail; CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, 0)); // TODO: query time! } } while (pSess->zstrm.avail_out == 0); @@ -1102,6 +1114,7 @@ finalize_it: pSess->bzInitDone = 0; done: RETiRet; } + /* close/remove a session * NOTE: we must first remove the fd from the epoll set and then close it -- else we * get an error "bad file descriptor" from epoll. @@ -1112,7 +1125,9 @@ closeSess(ptcpsess_t *pSess) int sock; DEFiRet; - doZipFinish(pSess); + if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS) + doZipFinish(pSess); + sock = pSess->sock; CHKiRet(removeEPollSock(sock, pSess->epd)); close(sock); @@ -1790,6 +1805,7 @@ shutdownSrv(ptcpsrv_t *pSrv) ptcplstn_t *pLstn, *lstnDel; ptcpsess_t *pSess, *sessDel; +dbgprintf("DDDD: enter shutdownSrv\n"); /* listeners */ pLstn = pSrv->pLstn; while(pLstn != NULL) { @@ -1798,7 +1814,9 @@ shutdownSrv(ptcpsrv_t *pSrv) /* now unlink listner */ lstnDel = pLstn; pLstn = pLstn->next; - DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock); + DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, " + "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes, + lstnDel->rcvdDecompressed); free(lstnDel->epd); free(lstnDel); } |