summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r--runtime/wtp.c145
1 files changed, 33 insertions, 112 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 596ff866..470e0b03 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -44,9 +44,10 @@
# include <sys/prctl.h>
#endif
-#ifdef OS_SOLARIS
-# include <sched.h>
-#endif
+/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
+//#ifdef OS_SOLARIS
+//# include <sched.h>
+//#endif
#include "rsyslog.h"
#include "stringbuf.h"
@@ -82,21 +83,23 @@ wtpGetDbgHdr(wtp_t *pThis)
/* Not implemented dummy function for constructor */
-static rsRetVal NotImplementedDummy() { return RS_RET_OK; }
+static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; }
/* Standard-Constructor for the wtp object
*/
BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */
pthread_mutex_init(&pThis->mut, NULL);
- pthread_mutex_init(&pThis->mutThrdShutdwn, NULL);
pthread_cond_init(&pThis->condThrdTrm, NULL);
/* set all function pointers to "not implemented" dummy so that we can safely call them */
pThis->pfChkStopWrkr = NotImplementedDummy;
+ pThis->pfGetDeqBatchSize = NotImplementedDummy;
pThis->pfIsIdle = NotImplementedDummy;
pThis->pfDoWork = NotImplementedDummy;
+ pThis->pfObjProcessed = NotImplementedDummy;
pThis->pfOnIdle = NotImplementedDummy;
pThis->pfOnWorkerCancel = NotImplementedDummy;
pThis->pfOnWorkerStartup = NotImplementedDummy;
pThis->pfOnWorkerShutdown = NotImplementedDummy;
+dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState);
ENDobjConstruct(wtp)
@@ -120,7 +123,7 @@ wtpConstructFinalize(wtp_t *pThis)
*/
if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
+
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
pWti = pThis->pWrkr[i];
@@ -140,8 +143,6 @@ finalize_it:
BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */
int i;
CODESTARTobjDestruct(wtp)
- wtpProcessThrdChanges(pThis); /* process thread changes one last time */
-
/* destruct workers */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i)
wtiDestruct(&pThis->pWrkr[i]);
@@ -152,7 +153,6 @@ CODESTARTobjDestruct(wtp)
/* actual destruction */
pthread_cond_destroy(&pThis->condThrdTrm);
pthread_mutex_destroy(&pThis->mut);
- pthread_mutex_destroy(&pThis->mutThrdShutdwn);
free(pThis->pszDbgHdr);
ENDobjDestruct(wtp)
@@ -188,51 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis)
}
-/* check if we had any worker thread changes and, if so, act
- * on them. At a minimum, terminated threads are harvested (joined).
- * This function MUST NEVER block on the queue mutex!
- */
-rsRetVal
-wtpProcessThrdChanges(wtp_t *pThis)
-{
- DEFiRet;
- int i;
-
- ISOBJ_TYPE_assert(pThis, wtp);
-
- if(pThis->bThrdStateChanged == 0)
- FINALIZE;
-
- if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) {
- /* another thread is already in the loop */
- FINALIZE;
- }
-
- /* Note: there is a left-over potential race condition below:
- * pThis->bThrdStateChanged may be re-set by another thread while
- * we work on it and thus the loop may terminate too early. However,
- * there are no really bad effects from that so I perfer - for this
- * version - to live with the problem as is. Not a good idea to
- * introduce that large change into the stable branch without very
- * good reason. -- rgerhards, 2009-04-02
- */
- do {
- /* reset the change marker */
- ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged);
- /* go through all threads */
- for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
- wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX);
- }
- /* restart if another change occured while we were processing the changes */
- } while(pThis->bThrdStateChanged != 0);
-
- d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn));
-
-finalize_it:
- RETiRet;
-}
-
-
/* Sent a specific state for the worker thread pool.
* rgerhards, 2008-01-21
*/
@@ -250,7 +205,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState)
/* check if the worker shall shutdown (1 = yes, 0 = no)
- * TODO: check if we can use atomic operations to enhance performance
* Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user"
* (e.g. the queue clas)
* rgerhards, 2008-01-21
@@ -264,16 +218,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex)
ISOBJ_TYPE_assert(pThis, wtp);
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
- if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE)
- || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex)))
- iRet = RS_RET_TERMINATE_NOW;
- END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
+ if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
+ } else if(pThis->wtpState == wtpState_SHUTDOWN) {
+ ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
+ }
/* try customer handler if one was set and we do not yet have a definite result */
- if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) {
+ if(pThis->pfChkStopWrkr != NULL) {
iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex);
}
+finalize_it:
+ END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
RETiRet;
}
@@ -295,12 +252,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
wtpSetState(pThis, tShutdownCmd);
wtpWakeupAllWrkr(pThis);
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
- /* and wait for their termination */
+ /* wait for worker thread termination */
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pThis->mut);
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
@@ -320,40 +272,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
if(bTimedOut)
iRet = RS_RET_TIMED_OUT;
- /* see if we need to harvest (join) any terminated threads (even in timeout case,
- * some may have terminated...
- */
- wtpProcessThrdChanges(pThis);
-
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
-/* indicate that a thread has terminated and awake anyone waiting on it
- * rgerhards, 2008-01-23
- */
-rsRetVal wtpSignalWrkrTermination(wtp_t *pThis)
-{
- DEFiRet;
- /* I leave the mutex code here out as it gives us deadlocks. I think it is not really
- * needed and we are on the safe side. I leave this comment in if practice proves us
- * wrong. The whole thing should be removed after half a year or year if we see there
- * actually is no issue (or revisit it from a theoretical POV).
- * rgerhards, 2008-01-28
- * revisited 2008-09-30, still a bit unclear, leave in
- */
- /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/
-
- ISOBJ_TYPE_assert(pThis, wtp);
-
- /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/
- pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
- /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/
- RETiRet;
-}
-
-
/* Unconditionally cancel all running worker threads.
* rgerhards, 2008-01-14
*/
@@ -365,9 +288,6 @@ wtpCancelAll(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- /* process any pending thread requests so that we know who actually is still running */
- wtpProcessThrdChanges(pThis);
-
/* go through all workers and cancel those that are active */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i);
@@ -400,7 +320,7 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex)
* decrements the worker counter
* rgerhards, 2008-01-20
*/
-void
+static void
wtpWrkrExecCancelCleanup(void *arg)
{
wtp_t *pThis = (wtp_t*) arg;
@@ -408,8 +328,7 @@ wtpWrkrExecCancelCleanup(void *arg)
BEGINfunc
ISOBJ_TYPE_assert(pThis, wtp);
pThis->iCurNumWrkThrd--;
- wtpSignalWrkrTermination(pThis);
-
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd);
ENDfunc
}
@@ -457,7 +376,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
* because someone may have requested us to shut down even before we got a chance to do
* our init. That would be a bad race... -- rgerhards, 2008-01-16
*/
- wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */
+ wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */
do {
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
@@ -474,7 +393,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
pthread_cleanup_pop(0);
pThis->iCurNumWrkThrd--;
- wtpSignalWrkrTermination(pThis);
+ pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */
dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n",
wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd);
@@ -491,23 +410,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in
static rsRetVal
wtpStartWrkr(wtp_t *pThis, int bLockMutex)
{
- DEFiRet;
DEFVARS_mutexProtection;
wti_t *pWti;
int i;
int iState;
+ pthread_attr_t attr;
+ DEFiRet;
ISOBJ_TYPE_assert(pThis, wtp);
- wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls
-
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
pThis->iCurNumWrkThrd++;
- /* find free spot in thread table. If we find at least one worker that is in initialization,
- * we do NOT start a new one. Let's give the other one a chance, first.
- */
+ /* find free spot in thread table. */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) {
break;
@@ -518,8 +434,11 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex)
ABORT_FINALIZE(RS_RET_NO_MORE_THREADS);
pWti = pThis->pWrkr[i];
- wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX);
- iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti);
+ wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX);
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti);
+ pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */
dbgprintf("%s: started with state %d, num workers now %d\n",
wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd);
@@ -589,8 +508,10 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
-DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int))
+DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
+DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*))
DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int))
+DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*))
DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*))
DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*))