diff options
Diffstat (limited to 'tcpsrv.c')
-rw-r--r-- | tcpsrv.c | 240 |
1 files changed, 207 insertions, 33 deletions
@@ -47,6 +47,7 @@ #include <ctype.h> #include <netinet/in.h> #include <netdb.h> +#include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #if HAVE_FCNTL_H @@ -70,6 +71,7 @@ #include "ruleset.h" #include "unicode-helper.h" + MODULE_TYPE_LIB /* defines */ @@ -91,6 +93,23 @@ DEFobjCurrIf(nspoll) DEFobjCurrIf(prop) +/* The following structure controls the worker threads. Global data is + * needed for their access. + */ +static struct wrkrInfo_s { + pthread_t tid; /* the worker's thread ID */ + pthread_cond_t run; + int idx; + tcpsrv_t *pSrv; /* pSrv == NULL -> idle */ + nspoll_t *pPoll; + void *pUsr; + long long unsigned numCalled; /* how often was this called */ +} wrkrInfo[4]; +static pthread_mutex_t wrkrMut; +static pthread_cond_t wrkrIdle; +static int wrkrMax = 4; +static int wrkrRunning; + /* add new listener port to listener port list * rgerhards, 2009-05-21 */ @@ -477,7 +496,9 @@ closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) { CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL)); } pThis->pOnRegularClose(*ppSess); +dbgprintf("XXX: pre destruct *ppSess = %p\n", *ppSess); tcps_sess.Destruct(ppSess); +dbgprintf("XXX: post destruct *ppSess = %p\n", *ppSess); finalize_it: RETiRet; } @@ -496,6 +517,7 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) rsRetVal localRet; DEFiRet; +//printf("doReceive %p/%p\n", pThis, *ppSess); ISOBJ_TYPE_assert(pThis, tcpsrv); DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm); /* Receive message */ @@ -510,7 +532,9 @@ doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote peer %s.\n", (*ppSess)->pStrm, pszPeer); } + //pthread_mutex_lock(&mut); CHKiRet(closeSess(pThis, ppSess, pPoll)); + //pthread_mutex_unlock(&mut); break; case RS_RET_RETRY: /* we simply ignore retry - this is not an error, but we also have not received anything */ @@ -538,7 +562,6 @@ finalize_it: RETiRet; } - /* process a single workset item */ static inline rsRetVal @@ -549,13 +572,23 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) dbgprintf("tcpsrv: processing item %d, pUsr %p\n", idx, pUsr); if(pUsr == pThis->ppLstn) { +//printf("work item %p: connect\n", pUsr); DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]); - SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); + iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); + if(iRet == RS_RET_OK) { + if(pPoll != NULL) + CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); + DBGPRINTF("New session created with NSD %p.\n", pNewSess); + } else { + DBGPRINTF("tcpsrv: error %d during accept\n", iRet); + } } else { +//printf("work item %p: receive\n", pUsr); pNewSess = (tcps_sess_t*) pUsr; doReceive(pThis, &pNewSess, pPoll); + if(pPoll == NULL && pNewSess == NULL) { + pThis->pSessions[idx] = NULL; + } } finalize_it: @@ -563,22 +596,110 @@ finalize_it: } +/* worker to process incoming requests + */ +static void * +wrkr(void *myself) +{ + struct wrkrInfo_s *me = (struct wrkrInfo_s*) myself; + + pthread_mutex_lock(&wrkrMut); + while(1) { + while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) { + pthread_cond_wait(&me->run, &wrkrMut); + } + if(glbl.GetGlobalInputTermState() == 1) + break; + pthread_mutex_unlock(&wrkrMut); + + ++me->numCalled; + processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr); + + pthread_mutex_lock(&wrkrMut); + me->pSrv = NULL; /* indicate we are free again */ + --wrkrRunning; + pthread_cond_signal(&wrkrIdle); + } + pthread_mutex_unlock(&wrkrMut); + + return NULL; +} + + /* Process a workset, that is handle io. We become activated * from either select or epoll handler. We split the workload * out to a pool of threads, but try to avoid context switches * as much as possible. */ -static rsRetVal processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) +static rsRetVal +processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[]) { int i; + int origEntries = numEntries; DEFiRet; +#if 0 +{ /* chck workset for dupes */ +int k, j; +for(k = 0 ; k < numEntries ; ++k) { + //printf("work item %d: %p\n", k, workset[k].pUsr); + for(j = k+1 ; j < numEntries ; ++j) { + if(workset[k].pUsr == workset[j].pUsr) { + printf(stderr, "workset duplicate %d:%d:%p\n", k, j, workset[k].pUsr); + flush(stderr); + } + } +} +} +#endif dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); - for(i = 0 ; i < numEntries ; i++) { + while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); - CHKiRet(processWorksetItem(pThis, pPoll, workset[i].id, workset[i].pUsr)); + if(numEntries == 1) { + /* process self, save context switch */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr); + } else { + pthread_mutex_lock(&wrkrMut); + /* check if there is a free worker */ + for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i) + /*do search*/; + if(i < wrkrMax) { + /* worker free -> use it! */ + wrkrInfo[i].pSrv = pThis; + wrkrInfo[i].pPoll = pPoll; + wrkrInfo[i].idx = workset[numEntries -1].id; + wrkrInfo[i].pUsr = workset[numEntries -1].pUsr; +dbgprintf("XXX: activating worker %d\n", i); + /* Note: we must increment wrkrRunning HERE and not inside the worker's + * code. This is because a worker may actually never start, and thus + * increment wrkrRunning, before we finish and check the running worker + * count. We can only avoid this by incrementing it here. + */ + ++wrkrRunning; + pthread_cond_signal(&wrkrInfo[i].run); + pthread_mutex_unlock(&wrkrMut); + } else { + pthread_mutex_unlock(&wrkrMut); + /* no free worker, so we process this one ourselfs */ + processWorksetItem(pThis, pPoll, workset[numEntries-1].id, + workset[numEntries-1].pUsr); + } + } + --numEntries; + } + + if(origEntries > 1) { + /* we now need to wait until all workers finish. This is because the + * rest of this module can not handle the concurrency introduced + * by workers running during the epoll call. + */ + pthread_mutex_lock(&wrkrMut); + while(wrkrRunning > 0) { + pthread_cond_wait(&wrkrIdle, &wrkrMut); + } + pthread_mutex_unlock(&wrkrMut); } finalize_it: @@ -592,11 +713,13 @@ finalize_it: */ #pragma GCC diagnostic ignored "-Wempty-body" static inline rsRetVal -RunSelect(tcpsrv_t *pThis) +RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset) { DEFiRet; int nfds; int i; + int iWorkset; + int numEntries; int iTCPSess; int bIsReady; tcps_sess_t *pNewSess; @@ -622,6 +745,10 @@ RunSelect(tcpsrv_t *pThis) /* do the sessions */ iTCPSess = TCPSessGetNxtSess(pThis, -1); while(iTCPSess != -1) { +//dbgprintf("Added sessions to select set, pSel %p\n", pSel); +//dbgprintf("Added sessions to select set, iTCPSess %d\n", iTCPSess); +//dbgprintf("Added sessions to select set, ptr to strm %p\n", pThis->pSessions[iTCPSess]); +//dbgprintf("Added sessions to select set, strm %p\n", pThis->pSessions[iTCPSess]->pStrm); /* TODO: access to pNsd is NOT really CLEAN, use method... */ CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD)); /* now get next... */ @@ -633,13 +760,21 @@ RunSelect(tcpsrv_t *pThis) if(glbl.GetGlobalInputTermState() == 1) break; /* terminate input! */ + iWorkset = 0; for(i = 0 ; i < pThis->iLstnCurr ; ++i) { if(glbl.GetGlobalInputTermState() == 1) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); - SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); + workset[iWorkset].id = i; + workset[iWorkset].pUsr = (void*) pThis->ppLstn; /* this is a flag to indicate listen sock */ + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } + //DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[i]); + //SessAccept(pThis, pThis->ppLstnPort[i], &pNewSess, pThis->ppLstn[i]); --nfds; /* indicate we have processed one */ } } @@ -651,11 +786,23 @@ RunSelect(tcpsrv_t *pThis) ABORT_FINALIZE(RS_RET_FORCE_TERM); CHKiRet(nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD, &bIsReady, &nfds)); if(bIsReady) { - doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + //doReceive(pThis, &pThis->pSessions[iTCPSess], NULL); + workset[iWorkset].id = iTCPSess; + workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess]; + ++iWorkset; + if(iWorkset >= (int) sizeWorkset) { + processWorkset(pThis, NULL, iWorkset, workset); + iWorkset = 0; + } --nfds; /* indicate we have processed one */ } iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess); } + + if(iWorkset > 0) + processWorkset(pThis, NULL, iWorkset, workset); + + /* we need to copy back close descriptors */ CHKiRet(nssel.Destruct(&pSel)); finalize_it: /* this is a very special case - this time only we do not exit the function, * because that would not help us either. So we simply retry it. Let's see @@ -690,6 +837,10 @@ Run(tcpsrv_t *pThis) rsRetVal localRet; ISOBJ_TYPE_assert(pThis, tcpsrv); +#if 0 +iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); +FINALIZE; +#endif /* this is an endless loop - it is terminated by the framework canelling * this thread. Thus, we also need to instantiate a cancel cleanup handler @@ -702,7 +853,7 @@ Run(tcpsrv_t *pThis) if(localRet != RS_RET_OK) { /* fall back to select */ dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); - iRet = RunSelect(pThis); + iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } @@ -732,26 +883,6 @@ Run(tcpsrv_t *pThis) continue; processWorkset(pThis, pPoll, numEntries, workset); -#if 0 - dbgprintf("poll returned with %d entries.\n", numEntries); - - for(i = 0 ; i < numEntries ; i++) { - if(glbl.GetGlobalInputTermState() == 1) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - currIdx = workset[i].id; - dbgprintf("tcpsrv processing i %d, pUsr %p\n", currIdx, workset[i].pUsr); -dbgprintf("tcpsrv processing pUsr %p, ppLstn[0] %p, ppLstn[%d] %p\n", workset[i].pUsr, pThis->ppLstn[0], currIdx, pThis->ppLstn[currIdx]); - if(workset[i].pUsr == pThis->ppLstn) { - DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[currIdx]); - SessAccept(pThis, pThis->ppLstnPort[currIdx], &pNewSess, pThis->ppLstn[currIdx]); - CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); - DBGPRINTF("New session created with NSD %p.\n", pNewSess); - } else { - pNewSess = (tcps_sess_t*) workset[i].pUsr; - doReceive(pThis, &pNewSess, pPoll); - } - } -#endif } /* remove the tcp listen sockets from the epoll set */ @@ -1155,11 +1286,51 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE ENDObjClassInit(tcpsrv) -/* --------------- here now comes the plumbing that makes as a library module --------------- */ +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +startWorkerPool(void) +{ + int i; + wrkrRunning = 0; + pthread_mutex_init(&wrkrMut, NULL); + pthread_cond_init(&wrkrIdle, NULL); + for(i = 0 ; i < wrkrMax ; ++i) { + /* init worker info structure! */ + pthread_cond_init(&wrkrInfo[i].run, NULL); + wrkrInfo[i].pSrv = NULL; + wrkrInfo[i].numCalled = 0; + pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i])); + } + +} + +/* destroy worker pool structures and wait for workers to terminate + */ +static inline void +stopWorkerPool(void) +{ + int i; + for(i = 0 ; i < wrkrMax ; ++i) { + pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */ + pthread_join(wrkrInfo[i].tid, NULL); + DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); +printf("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled); + pthread_cond_destroy(&wrkrInfo[i].run); + } + pthread_cond_destroy(&wrkrIdle); + pthread_mutex_destroy(&wrkrMut); + +} + +/* --------------- here now comes the plumbing that makes as a library module --------------- */ BEGINmodExit CODESTARTmodExit +dbgprintf("tcpsrv: modExit\n"); + stopWorkerPool(); + /* de-init in reverse order! */ tcpsrvClassExit(); tcps_sessClassExit(); @@ -1179,6 +1350,9 @@ CODESTARTmodInit /* Initialize all classes that are in our module - this includes ourselfs */ CHKiRet(tcps_sessClassInit(pModInfo)); CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */ + + startWorkerPool(); + ENDmodInit /* vim:set ai: |