summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c39
1 files changed, 33 insertions, 6 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index c091df0b..4d0c864f 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -36,8 +36,8 @@
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
-
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -92,6 +92,7 @@ DEFobjCurrIf(netstrm)
DEFobjCurrIf(nssel)
DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
+DEFobjCurrIf(statsobj)
/* The following structure controls the worker threads. Global data is
@@ -118,6 +119,7 @@ static inline rsRetVal
addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
{
tcpLstnPortList_t *pEntry;
+ uchar statname[64];
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcpsrv);
@@ -137,6 +139,15 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort)
pEntry->pNext = pThis->pLstnPorts;
pThis->pLstnPorts = pEntry;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(pEntry->stats)));
+ snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pEntry->stats, statname));
+ CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(pEntry->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
+
finalize_it:
RETiRet;
}
@@ -415,6 +426,10 @@ SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess,
ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
}
+ if(pThis->bUseKeepAlive) {
+ CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
+ }
+
/* we found a free spot and can construct our session object */
CHKiRet(tcps_sess.Construct(&pSess));
CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
@@ -530,9 +545,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n",
(*ppSess)->pStrm, pszPeer);
}
- //pthread_mutex_lock(&mut);
CHKiRet(closeSess(pThis, ppSess, pPoll));
- //pthread_mutex_unlock(&mut);
break;
case RS_RET_RETRY:
/* we simply ignore retry - this is not an error, but we also have not received anything */
@@ -566,16 +579,18 @@ finalize_it:
static inline rsRetVal
processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr)
{
- tcps_sess_t *pNewSess;
+ tcps_sess_t *pNewSess = NULL;
DEFiRet;
- dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr);
+ DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
if(pUsr == pThis->ppLstn) {
DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
if(iRet == RS_RET_OK) {
- if(pPoll != NULL)
+ if(pPoll != NULL) {
+ dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n");
CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
+ }
DBGPRINTF("New session created with NSD %p.\n", pNewSess);
} else {
DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
@@ -1025,6 +1040,15 @@ SetUsrP(tcpsrv_t *pThis, void *pUsr)
}
static rsRetVal
+SetKeepAlive(tcpsrv_t *pThis, int iVal)
+{
+ DEFiRet;
+ dbgprintf("tcpsrv: keep-alive set to %d\n", iVal);
+ pThis->bUseKeepAlive = iVal;
+ RETiRet;
+}
+
+static rsRetVal
SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
{
DEFiRet;
@@ -1203,6 +1227,7 @@ CODESTARTobjQueryInterface(tcpsrv)
pIf->create_tcp_socket = create_tcp_socket;
pIf->Run = Run;
+ pIf->SetKeepAlive = SetKeepAlive;
pIf->SetUsrP = SetUsrP;
pIf->SetInputName = SetInputName;
pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
@@ -1240,6 +1265,7 @@ CODESTARTObjClassExit(tcpsrv)
objRelease(tcps_sess, DONT_LOAD_LIB);
objRelease(conf, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
@@ -1266,6 +1292,7 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
CHKiRet(objUse(conf, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
/* set our own handlers */