diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 57 |
1 files changed, 43 insertions, 14 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 288670b6..9343f5c5 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -39,11 +39,6 @@ #include <pthread.h> #include <errno.h> -/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 -//#ifdef OS_SOLARIS -//# include <sched.h> -//#endif - #include "rsyslog.h" #include "stringbuf.h" #include "srUtils.h" @@ -79,10 +74,10 @@ wtiGetDbgHdr(wti_t *pThis) /* return the current worker processing state. For the sake of * simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17 */ -bool +sbool wtiGetState(wti_t *pThis) { - return ATOMIC_FETCH_32BIT(pThis->bIsRunning); + return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning); } @@ -102,17 +97,39 @@ wtiSetAlwaysRunning(wti_t *pThis) * is inside wti). -- rgerhards, 2009-07-17 */ rsRetVal -wtiSetState(wti_t *pThis, bool bNewVal) +wtiSetState(wti_t *pThis, sbool bNewVal) { ISOBJ_TYPE_assert(pThis, wti); - if(bNewVal) - ATOMIC_STORE_1_TO_INT(pThis->bIsRunning); - else - ATOMIC_STORE_0_TO_INT(pThis->bIsRunning); + if(bNewVal) { + ATOMIC_STORE_1_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning); + } else { + ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning); + } return RS_RET_OK; } +/* advise all workers to start by interrupting them. That should unblock all srSleep() + * calls. + */ +rsRetVal +wtiWakeupThrd(wti_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, wti); + + + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); + } + + RETiRet; +} + + /* Cancel the thread. If the thread is not running. But it is save and legal to * call wtiCancelThrd() in such situations. This function only returns when the * thread has terminated. Else we may get race conditions all over the code... @@ -129,7 +146,16 @@ wtiCancelThrd(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); + + if(wtiGetState(pThis)) { + /* we first try the cooperative "cancel" interface */ + pthread_kill(pThis->thrdID, SIGTTIN); + dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + srSleep(0, 10000); + } + if(wtiGetState(pThis)) { + dbgprintf("cooperative worker termination failed, using cancellation...\n"); dbgoprint((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ @@ -146,7 +172,9 @@ wtiCancelThrd(wti_t *pThis) BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(wti) /* actual destruction */ - free(pThis->batch.pElem); + batchFree(&pThis->batch); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -154,6 +182,7 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ + INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); ENDobjConstruct(wti) @@ -175,7 +204,7 @@ wtiConstructFinalize(wti_t *pThis) /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); - CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t))); + CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); finalize_it: RETiRet; |