diff options
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 39 |
1 files changed, 33 insertions, 6 deletions
@@ -39,8 +39,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" +#include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> @@ -95,6 +95,7 @@ DEFobjCurrIf(netstrm) DEFobjCurrIf(nssel) DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +DEFobjCurrIf(statsobj) /* The following structure controls the worker threads. Global data is @@ -121,6 +122,7 @@ static inline rsRetVal addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) { tcpLstnPortList_t *pEntry; + uchar statname[64]; DEFiRet; ISOBJ_TYPE_assert(pThis, tcpsrv); @@ -140,6 +142,15 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort) pEntry->pNext = pThis->pLstnPorts; pThis->pLstnPorts = pEntry; + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&(pEntry->stats))); + snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); + statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(statsobj.SetName(pEntry->stats, statname)); + CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), + ctrType_IntCtr, &(pEntry->ctrSubmit))); + CHKiRet(statsobj.ConstructFinalize(pEntry->stats)); + finalize_it: RETiRet; } @@ -418,6 +429,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED); } + if(pThis->bUseKeepAlive) { + CHKiRet(netstrm.EnableKeepAlive(pNewStrm)); + } + /* we found a free spot and can construct our session object */ CHKiRet(tcps_sess.Construct(&pSess)); CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis)); @@ -533,9 +548,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } - //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); - //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -569,16 +582,18 @@ finalize_it: static inline rsRetVal processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) { - tcps_sess_t *pNewSess; + tcps_sess_t *pNewSess = NULL; DEFiRet; - dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); + DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr); if(pUsr == pThis->ppLstn) { DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { - if(pPoll != NULL) + if(pPoll != NULL) { + dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n"); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + } DBGPRINTF("New session created with NSD %p.\n", pNewSess); } else { DBGPRINTF("tcpsrv: error %d during accept\n", iRet); @@ -1028,6 +1043,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr) } static rsRetVal +SetKeepAlive(tcpsrv_t *pThis, int iVal) +{ + DEFiRet; + dbgprintf("tcpsrv: keep-alive set to %d\n", iVal); + pThis->bUseKeepAlive = iVal; + RETiRet; +} + +static rsRetVal SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int)) { DEFiRet; @@ -1206,6 +1230,7 @@ CODESTARTobjQueryInterface(tcpsrv) pIf->create_tcp_socket = create_tcp_socket; pIf->Run = Run; + pIf->SetKeepAlive = SetKeepAlive; pIf->SetUsrP = SetUsrP; pIf->SetInputName = SetInputName; pIf->SetAddtlFrameDelim = SetAddtlFrameDelim; @@ -1243,6 +1268,7 @@ CODESTARTObjClassExit(tcpsrv) objRelease(tcps_sess, DONT_LOAD_LIB); objRelease(conf, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); objRelease(ruleset, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); @@ -1269,6 +1295,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE CHKiRet(objUse(conf, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); /* set our own handlers */ |