summaryrefslogtreecommitdiffstats
path: root/tcpsrv.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcpsrv.c')
-rw-r--r--tcpsrv.c65
1 files changed, 52 insertions, 13 deletions
diff --git a/tcpsrv.c b/tcpsrv.c
index b5b64f07..0e6e13d2 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -97,6 +97,7 @@ DEFobjCurrIf(nspoll)
DEFobjCurrIf(prop)
DEFobjCurrIf(statsobj)
+static void startWorkerPool(void);
/* The following structure controls the worker threads. Global data is
* needed for their access.
@@ -108,8 +109,10 @@ static struct wrkrInfo_s {
tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
nspoll_t *pPoll;
void *pUsr;
+ sbool enabled;
long long unsigned numCalled; /* how often was this called */
} wrkrInfo[4];
+static sbool bWrkrRunning; /* are the worker threads running? */
static pthread_mutex_t wrkrMut;
static pthread_cond_t wrkrIdle;
static int wrkrMax = 4;
@@ -625,8 +628,10 @@ wrkr(void *myself)
while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
pthread_cond_wait(&me->run, &wrkrMut);
}
- if(glbl.GetGlobalInputTermState() == 1)
+ if(glbl.GetGlobalInputTermState() == 1) {
+ --wrkrRunning;
break;
+ }
pthread_mutex_unlock(&wrkrMut);
++me->numCalled;
@@ -637,6 +642,7 @@ wrkr(void *myself)
--wrkrRunning;
pthread_cond_signal(&wrkrIdle);
}
+ me->enabled = 0; /* indicate we are no longer available */
pthread_mutex_unlock(&wrkrMut);
return NULL;
@@ -666,7 +672,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t
} else {
pthread_mutex_lock(&wrkrMut);
/* check if there is a free worker */
- for(i = 0 ; (i < wrkrMax) && (wrkrInfo[i].pSrv != NULL) ; ++i)
+ for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i)
/*do search*/;
if(i < wrkrMax) {
/* worker free -> use it! */
@@ -836,6 +842,16 @@ Run(tcpsrv_t *pThis)
ISOBJ_TYPE_assert(pThis, tcpsrv);
+ /* check if we need to start the worker pool. Once it is running, all is
+ * well. Shutdown is done on modExit.
+ */
+ d_pthread_mutex_lock(&wrkrMut);
+ if(!bWrkrRunning) {
+ bWrkrRunning = 1;
+ startWorkerPool();
+ }
+ d_pthread_mutex_unlock(&wrkrMut);
+
/* this is an endless loop - it is terminated by the framework canelling
* this thread. Thus, we also need to instantiate a cancel cleanup handler
* to prevent us from leaking anything. -- rgerhards, 20080-04-24
@@ -1306,28 +1322,42 @@ BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE
ENDObjClassInit(tcpsrv)
-/* destroy worker pool structures and wait for workers to terminate
+/* start worker threads
+ * Important: if we fork, this MUST be done AFTER forking
*/
-static inline void
+static void
startWorkerPool(void)
{
int i;
+ int r;
+ pthread_attr_t sessThrdAttr;
+
wrkrRunning = 0;
- pthread_mutex_init(&wrkrMut, NULL);
pthread_cond_init(&wrkrIdle, NULL);
+ pthread_attr_init(&sessThrdAttr);
+ pthread_attr_setstacksize(&sessThrdAttr, 200*1024);
for(i = 0 ; i < wrkrMax ; ++i) {
/* init worker info structure! */
pthread_cond_init(&wrkrInfo[i].run, NULL);
wrkrInfo[i].pSrv = NULL;
wrkrInfo[i].numCalled = 0;
- pthread_create(&wrkrInfo[i].tid, NULL, wrkr, &(wrkrInfo[i]));
+ r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i]));
+ if(r == 0) {
+ wrkrInfo[i].enabled = 1;
+ } else {
+ char errStr[1024];
+ wrkrInfo[i].enabled = 0;
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, NO_ERRCODE, "tcpsrv error creating thread %d: "
+ "%s", i, errStr);
+ }
}
-
+ pthread_attr_destroy(&sessThrdAttr);
}
/* destroy worker pool structures and wait for workers to terminate
*/
-static inline void
+static void
stopWorkerPool(void)
{
int i;
@@ -1347,9 +1377,7 @@ stopWorkerPool(void)
BEGINmodExit
CODESTARTmodExit
-dbgprintf("tcpsrv: modExit\n");
stopWorkerPool();
-
/* de-init in reverse order! */
tcpsrvClassExit();
tcps_sessClassExit();
@@ -1365,13 +1393,24 @@ ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+ /* we just init the worker mutex, but do not start the workers themselves. This is deferred
+ * to the first call of Run(). Reasons for this:
+ * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE
+ * it forks, in which case the workers would be running in the then-killed parent,
+ * leading to a defuncnt child (we actually had this bug).
+ * 2. depending on circumstances, Run() would possibly never be called, in which case
+ * the worker threads would be totally useless.
+ * Note that in order to guarantee a non-racy worker start, we need to guard the
+ * startup sequence by a mutex, which is why we init it here (no problem with fork()
+ * in this case as the mutex is a pure-memory structure).
+ * rgerhards, 2012-05-18
+ */
+ pthread_mutex_init(&wrkrMut, NULL);
+ bWrkrRunning = 0;
/* Initialize all classes that are in our module - this includes ourselfs */
CHKiRet(tcps_sessClassInit(pModInfo));
CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
-
- startWorkerPool();
-
ENDmodInit
/* vim:set ai: