diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 145 |
1 files changed, 33 insertions, 112 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index 596ff866..470e0b03 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -44,9 +44,10 @@ # include <sys/prctl.h> #endif -#ifdef OS_SOLARIS -# include <sched.h> -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include <sched.h> +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -82,21 +83,23 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_OK; } +static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ pthread_mutex_init(&pThis->mut, NULL); - pthread_mutex_init(&pThis->mutThrdShutdwn, NULL); pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; + pThis->pfObjProcessed = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; +dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -120,7 +123,7 @@ wtpConstructFinalize(wtp_t *pThis) */ if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -140,8 +143,6 @@ finalize_it: BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODESTART macros! */ int i; CODESTARTobjDestruct(wtp) - wtpProcessThrdChanges(pThis); /* process thread changes one last time */ - /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) wtiDestruct(&pThis->pWrkr[i]); @@ -152,7 +153,6 @@ CODESTARTobjDestruct(wtp) /* actual destruction */ pthread_cond_destroy(&pThis->condThrdTrm); pthread_mutex_destroy(&pThis->mut); - pthread_mutex_destroy(&pThis->mutThrdShutdwn); free(pThis->pszDbgHdr); ENDobjDestruct(wtp) @@ -188,51 +188,6 @@ wtpWakeupAllWrkr(wtp_t *pThis) } -/* check if we had any worker thread changes and, if so, act - * on them. At a minimum, terminated threads are harvested (joined). - * This function MUST NEVER block on the queue mutex! - */ -rsRetVal -wtpProcessThrdChanges(wtp_t *pThis) -{ - DEFiRet; - int i; - - ISOBJ_TYPE_assert(pThis, wtp); - - if(pThis->bThrdStateChanged == 0) - FINALIZE; - - if(d_pthread_mutex_trylock(&(pThis->mutThrdShutdwn)) != 0) { - /* another thread is already in the loop */ - FINALIZE; - } - - /* Note: there is a left-over potential race condition below: - * pThis->bThrdStateChanged may be re-set by another thread while - * we work on it and thus the loop may terminate too early. However, - * there are no really bad effects from that so I perfer - for this - * version - to live with the problem as is. Not a good idea to - * introduce that large change into the stable branch without very - * good reason. -- rgerhards, 2009-04-02 - */ - do { - /* reset the change marker */ - ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged); - /* go through all threads */ - for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { - wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); - } - /* restart if another change occured while we were processing the changes */ - } while(pThis->bThrdStateChanged != 0); - - d_pthread_mutex_unlock(&(pThis->mutThrdShutdwn)); - -finalize_it: - RETiRet; -} - - /* Sent a specific state for the worker thread pool. * rgerhards, 2008-01-21 */ @@ -250,7 +205,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) /* check if the worker shall shutdown (1 = yes, 0 = no) - * TODO: check if we can use atomic operations to enhance performance * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" * (e.g. the queue clas) * rgerhards, 2008-01-21 @@ -264,16 +218,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) - iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); + if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + ABORT_FINALIZE(RS_RET_TERMINATE_NOW); + } else if(pThis->wtpState == wtpState_SHUTDOWN) { + ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); + } /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } +finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -295,12 +252,7 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout wtpSetState(pThis, tShutdownCmd); wtpWakeupAllWrkr(pThis); - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - - /* and wait for their termination */ + /* wait for worker thread termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pThis->mut); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); @@ -320,40 +272,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout if(bTimedOut) iRet = RS_RET_TIMED_OUT; - /* see if we need to harvest (join) any terminated threads (even in timeout case, - * some may have terminated... - */ - wtpProcessThrdChanges(pThis); - RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" -/* indicate that a thread has terminated and awake anyone waiting on it - * rgerhards, 2008-01-23 - */ -rsRetVal wtpSignalWrkrTermination(wtp_t *pThis) -{ - DEFiRet; - /* I leave the mutex code here out as it gives us deadlocks. I think it is not really - * needed and we are on the safe side. I leave this comment in if practice proves us - * wrong. The whole thing should be removed after half a year or year if we see there - * actually is no issue (or revisit it from a theoretical POV). - * rgerhards, 2008-01-28 - * revisited 2008-09-30, still a bit unclear, leave in - */ - /*TODO: mutex or not mutex, that's the question ;)DEFVARS_mutexProtection;*/ - - ISOBJ_TYPE_assert(pThis, wtp); - - /*BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, LOCK_MUTEX);*/ - pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ - /*END_MTX_PROTECTED_OPERATIONS(&pThis->mut);*/ - RETiRet; -} - - /* Unconditionally cancel all running worker threads. * rgerhards, 2008-01-14 */ @@ -365,9 +288,6 @@ wtpCancelAll(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - /* process any pending thread requests so that we know who actually is still running */ - wtpProcessThrdChanges(pThis); - /* go through all workers and cancel those that are active */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { dbgprintf("%s: try canceling worker thread %d\n", wtpGetDbgHdr(pThis), i); @@ -400,7 +320,7 @@ wtpSetInactivityGuard(wtp_t *pThis, int bNewState, int bLockMutex) * decrements the worker counter * rgerhards, 2008-01-20 */ -void +static void wtpWrkrExecCancelCleanup(void *arg) { wtp_t *pThis = (wtp_t*) arg; @@ -408,8 +328,7 @@ wtpWrkrExecCancelCleanup(void *arg) BEGINfunc ISOBJ_TYPE_assert(pThis, wtp); pThis->iCurNumWrkThrd--; - wtpSignalWrkrTermination(pThis); - + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ dbgprintf("%s: thread CANCELED with %d workers running.\n", wtpGetDbgHdr(pThis), pThis->iCurNumWrkThrd); ENDfunc } @@ -457,7 +376,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in * because someone may have requested us to shut down even before we got a chance to do * our init. That would be a bad race... -- rgerhards, 2008-01-16 */ - wtiSetState(pWti, eWRKTHRD_RUNNING, 0, MUTEX_ALREADY_LOCKED); /* we are running now! */ + wtiSetState(pWti, eWRKTHRD_RUNNING, MUTEX_ALREADY_LOCKED); /* we are running now! */ do { END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -474,7 +393,7 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_pop(0); pThis->iCurNumWrkThrd--; - wtpSignalWrkrTermination(pThis); + pthread_cond_signal(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ dbgprintf("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, pThis->iCurNumWrkThrd); @@ -491,23 +410,20 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in static rsRetVal wtpStartWrkr(wtp_t *pThis, int bLockMutex) { - DEFiRet; DEFVARS_mutexProtection; wti_t *pWti; int i; int iState; + pthread_attr_t attr; + DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls - BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); pThis->iCurNumWrkThrd++; - /* find free spot in thread table. If we find at least one worker that is in initialization, - * we do NOT start a new one. Let's give the other one a chance, first. - */ + /* find free spot in thread table. */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { if(wtiGetState(pThis->pWrkr[i], LOCK_MUTEX) == eWRKTHRD_STOPPED) { break; @@ -518,8 +434,11 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ABORT_FINALIZE(RS_RET_NO_MORE_THREADS); pWti = pThis->pWrkr[i]; - wtiSetState(pWti, eWRKTHRD_RUN_CREATED, 0, LOCK_MUTEX); - iState = pthread_create(&(pWti->thrdID), NULL, wtpWorker, (void*) pWti); + wtiSetState(pWti, eWRKTHRD_RUN_CREATED, LOCK_MUTEX); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + iState = pthread_create(&(pWti->thrdID), &attr, wtpWorker, (void*) pWti); + pthread_attr_destroy(&attr); /* TODO: we could globally reuse such an attribute 2009-07-08 */ dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); @@ -589,8 +508,10 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) +DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) +DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) |