diff options
Diffstat (limited to 'runtime/wti.c')
-rw-r--r-- | runtime/wti.c | 32 |
1 files changed, 12 insertions, 20 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index 53b695b0..288670b6 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -119,7 +119,7 @@ wtiSetState(wti_t *pThis, bool bNewVal) * Note that when waiting for the thread to terminate, we do a busy wait, checking * progress every 10ms. It is very unlikely that we will ever cancel a thread * and, if so, it will only happen at the end of the rsyslog run. So doing this - * kind of not optimal wait is considered preferable over using condition variables. + * kind of non-optimal wait is considered preferable over using condition variables. * rgerhards, 2008-02-26 */ rsRetVal @@ -134,7 +134,6 @@ wtiCancelThrd(wti_t *pThis) pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { -//fprintf(stderr, "sleep loop for getState\n"); srSleep(0, 10000); } } @@ -184,7 +183,7 @@ finalize_it: /* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. + * Most importantly, it must bring back the batch into a consistent state. * Keep in mind that cancellation is disabled if we run into * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 @@ -201,10 +200,9 @@ wtiWorkerCancelCleanup(void *arg) ISOBJ_TYPE_assert(pWtp, wtp); DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); + pWtp->pfObjProcessed(pWtp->pUsr, pThis); + DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis)); - /* call user supplied handler */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - ENDfunc } @@ -222,11 +220,8 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - if(pThis->bAlwaysRunning) { /* never shut down any started worker */ -dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -235,11 +230,16 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } -/* generic worker thread framework +/* generic worker thread framework. Note that we prohibit cancellation + * during almost all times, because it can have very undesired side effects. + * However, we may need to cancel a thread if the consumer blocks for too + * long (during shutdown). So what we do is block cancellation, and every + * consumer must enable it during the periods where it is safe. */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal @@ -258,8 +258,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - - pWtp->pfOnWorkerStartup(pWtp->pUsr); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -267,10 +266,8 @@ wtiWorker(wti_t *pThis) pWtp->pfRateLimiter(pWtp->pUsr); } -dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); -dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -288,7 +285,6 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); -dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); @@ -305,12 +301,8 @@ dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); } /* indicate termination */ - d_pthread_mutex_lock(pWtp->pmutUsr); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); pthread_cleanup_pop(0); /* remove cleanup handler */ - pWtp->pfOnWorkerShutdown(pWtp->pUsr); pthread_setcancelstate(iCancelStateSave, NULL); - d_pthread_mutex_unlock(pWtp->pmutUsr); RETiRet; } @@ -340,7 +332,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) free(pThis->pszDbgHdr); } - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) + if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ |