From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- runtime/wti.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index 53b695b0..c3ab0aba 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -273,7 +273,9 @@ dbgprintf("YYY/ZZZ: pre lock mutex\n"); dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); +RUNLOG; if(terminateRet == RS_RET_TERMINATE_NOW) { +RUNLOG; /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", @@ -281,6 +283,7 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); d_pthread_mutex_unlock(pWtp->pmutUsr); break; } +RUNLOG; /* try to execute and process whatever we have */ /* Note that this function releases and re-aquires the mutex. 
The returned @@ -290,27 +293,39 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { +RUNLOG; if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } +RUNLOG; doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); +RUNLOG; continue; /* request next iteration */ } +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); bInactivityTOOccured = 0; /* reset for next run */ } /* indicate termination */ +RUNLOG; d_pthread_mutex_lock(pWtp->pmutUsr); +RUNLOG; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); +RUNLOG; pthread_cleanup_pop(0); /* remove cleanup handler */ +RUNLOG; pWtp->pfOnWorkerShutdown(pWtp->pUsr); +RUNLOG; pthread_setcancelstate(iCancelStateSave, NULL); +RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); +RUNLOG; RETiRet; } -- cgit v1.2.3 From c5408da3d8f17691fb91282d031757ed041fec55 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 14 Oct 2009 11:01:21 +0200 Subject: new queue engine - initial commit (probably not 100% working!) simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changed, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care. --- runtime/wti.c | 34 +++------------------------------- 1 file changed, 3 insertions(+), 31 deletions(-) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index c3ab0aba..24988cbe 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -189,6 +189,7 @@ finalize_it: * the cancel cleanup handler (and have been cancelled). 
* rgerhards, 2008-01-16 */ +// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14 static void wtiWorkerCancelCleanup(void *arg) { @@ -202,9 +203,6 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); - /* call user supplied handler */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp); - ENDfunc } @@ -222,8 +220,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) BEGINfunc DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis)); - pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED); - if(pThis->bAlwaysRunning) { /* never shut down any started worker */ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); @@ -235,6 +231,7 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } + dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } @@ -249,7 +246,6 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -259,23 +255,18 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); - pWtp->pfOnWorkerStartup(pWtp->pUsr); - /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } -dbgprintf("YYY/ZZZ: pre lock mutex\n"); d_pthread_mutex_lock(pWtp->pmutUsr); -dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr); /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); -RUNLOG; +RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { -RUNLOG; /* we now need 
to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", @@ -283,7 +274,6 @@ RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); break; } -RUNLOG; /* try to execute and process whatever we have */ /* Note that this function releases and re-aquires the mutex. The returned @@ -291,41 +281,23 @@ RUNLOG; */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); -dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr); if(localRet == RS_RET_IDLE) { -RUNLOG; if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } -RUNLOG; doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); -RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); -RUNLOG; continue; /* request next iteration */ } -RUNLOG; d_pthread_mutex_unlock(pWtp->pmutUsr); bInactivityTOOccured = 0; /* reset for next run */ } /* indicate termination */ -RUNLOG; - d_pthread_mutex_lock(pWtp->pmutUsr); -RUNLOG; - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); -RUNLOG; pthread_cleanup_pop(0); /* remove cleanup handler */ -RUNLOG; - pWtp->pfOnWorkerShutdown(pWtp->pUsr); -RUNLOG; - pthread_setcancelstate(iCancelStateSave, NULL); -RUNLOG; - d_pthread_mutex_unlock(pWtp->pmutUsr); -RUNLOG; RETiRet; } -- cgit v1.2.3 From e04e1b50025f5fa9c26abd946190dce8f797d08f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 11:33:38 +0200 Subject: enhanced test environment (including testbench) support for enhancing probability of memory addressing failure by using non-NULL default value for malloced memory (optional, only if requested by configure option). This helps to track down some otherwise undetected issues within the testbench and is expected to be very useful in the future. 
--- runtime/wti.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index 24988cbe..3d98b4c4 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -327,7 +327,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg) free(pThis->pszDbgHdr); } - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) + if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ -- cgit v1.2.3 From b585a4e90940e9f4d2d288d462d1c273ae5ffa09 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 18:53:01 +0100 Subject: addressed some race issues during queue shutdown these occurred in very unusual scenarios where we had a DA-queue running in parallel and very lengthy actions. Then, in some situations, the shutdown could hang. The code needs some additional lab time, but is believed to be much better than any previous version. --- runtime/wti.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index 3d98b4c4..e5237885 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -202,6 +202,8 @@ wtiWorkerCancelCleanup(void *arg) ISOBJ_TYPE_assert(pWtp, wtp); DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); + pWtp->pfObjProcessed(pWtp->pUsr, pThis); + DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis)); ENDfunc } -- cgit v1.2.3 From 24cd5aee4720a98e321b69d2d9b5948348abd571 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:00:23 +0100 Subject: fixed race condition during queue shutdown Problems could happen if the queue worker needed to be cancelled and this cancellation happened inside queue-code (including wtp, wti). We have now solved this by disabling cancellation while in this code and only enabling it when working inside the user consumer. 
This exactly matches the use case for which cancellation may be needed. --- runtime/wti.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index e5237885..aade156e 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -184,12 +184,11 @@ finalize_it: /* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. + * Most importantly, it must bring back the batch into a consistent state. * Keep in mind that cancellation is disabled if we run into * the cancel cleanup handler (and have been cancelled). * rgerhards, 2008-01-16 */ -// TODO: REMOVE THIS FUNCTION, CURRENTLY ONLY PRESENT TO PROVIDE DEBUG OUTPUT -- rgerhards, 2009-10-14 static void wtiWorkerCancelCleanup(void *arg) { @@ -224,7 +223,6 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ -dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ @@ -238,7 +236,11 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr); } -/* generic worker thread framework +/* generic worker thread framework. Note that we prohibit cancellation + * during almost all times, because it can have very undesired side effects. + * However, we may need to cancel a thread if the consumer blocks for too + * long (during shutdown). So what we do is block cancellation, and every + * consumer must enable it during the periods where it is safe. 
*/ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal @@ -248,6 +250,7 @@ wtiWorker(wti_t *pThis) int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -256,6 +259,7 @@ wtiWorker(wti_t *pThis) dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); /* now we have our identity, on to real processing */ while(1) { /* loop will be broken below - need to do mutex locks */ @@ -300,6 +304,7 @@ RUNLOG_VAR("%d", terminateRet); /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } -- cgit v1.2.3 From 553d1880d47b57b2f4e023c2017675f010afd9a0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:36:53 +0100 Subject: some cleanup --- runtime/wti.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'runtime/wti.c') diff --git a/runtime/wti.c b/runtime/wti.c index aade156e..288670b6 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -119,7 +119,7 @@ wtiSetState(wti_t *pThis, bool bNewVal) * Note that when waiting for the thread to terminate, we do a busy wait, checking * progress every 10ms. It is very unlikely that we will ever cancel a thread * and, if so, it will only happen at the end of the rsyslog run. So doing this - * kind of not optimal wait is considered preferable over using condition variables. + * kind of non-optimal wait is considered preferable over using condition variables. * rgerhards, 2008-02-26 */ rsRetVal @@ -134,7 +134,6 @@ wtiCancelThrd(wti_t *pThis) pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... 
*/ while(wtiGetState(pThis)) { -//fprintf(stderr, "sleep loop for getState\n"); srSleep(0, 10000); } } @@ -271,7 +270,6 @@ wtiWorker(wti_t *pThis) /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); -RUNLOG_VAR("%d", terminateRet); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); -- cgit v1.2.3