summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c32
1 files changed, 12 insertions, 20 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 53b695b0..288670b6 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -119,7 +119,7 @@ wtiSetState(wti_t *pThis, bool bNewVal)
* Note that when waiting for the thread to terminate, we do a busy wait, checking
* progress every 10ms. It is very unlikely that we will ever cancel a thread
* and, if so, it will only happen at the end of the rsyslog run. So doing this
- * kind of not optimal wait is considered preferable over using condition variables.
+ * kind of non-optimal wait is considered preferable over using condition variables.
* rgerhards, 2008-02-26
*/
rsRetVal
@@ -134,7 +134,6 @@ wtiCancelThrd(wti_t *pThis)
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
while(wtiGetState(pThis)) {
-//fprintf(stderr, "sleep loop for getState\n");
srSleep(0, 10000);
}
}
@@ -184,7 +183,7 @@ finalize_it:
/* cancellation cleanup handler for queueWorker ()
- * Updates admin structure and frees ressources.
+ * Most importantly, it must bring back the batch into a consistent state.
* Keep in mind that cancellation is disabled if we run into
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
@@ -201,10 +200,9 @@ wtiWorkerCancelCleanup(void *arg)
ISOBJ_TYPE_assert(pWtp, wtp);
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
+ pWtp->pfObjProcessed(pWtp->pUsr, pThis);
+ DBGPRINTF("%s: done cancelation cleanup handler.\n", wtiGetDbgHdr(pThis));
- /* call user supplied handler */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
-
ENDfunc
}
@@ -222,11 +220,8 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
BEGINfunc
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
- pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
-
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
-dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
@@ -235,11 +230,16 @@ dbgprintf("YYY/ZZZ: wti Idle wait cond busy, mutex %p\n", pWtp->pmutUsr);
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
}
+ dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n");
ENDfunc
}
-/* generic worker thread framework
+/* generic worker thread framework. Note that we prohibit cancellation
+ * during almost all times, because it can have very undesired side effects.
+ * However, we may need to cancel a thread if the consumer blocks for too
+ * long (during shutdown). So what we do is block cancellation, and every
+ * consumer must enable it during the periods where it is safe.
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
@@ -258,8 +258,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
-
- pWtp->pfOnWorkerStartup(pWtp->pUsr);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -267,10 +266,8 @@ wtiWorker(wti_t *pThis)
pWtp->pfRateLimiter(pWtp->pUsr);
}
-dbgprintf("YYY/ZZZ: pre lock mutex\n");
d_pthread_mutex_lock(pWtp->pmutUsr);
-dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
@@ -288,7 +285,6 @@ dbgprintf("YYY/ZZZ: wti locks mutex %p\n", pWtp->pmutUsr);
*/
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
-dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
d_pthread_mutex_unlock(pWtp->pmutUsr);
@@ -305,12 +301,8 @@ dbgprintf("YYY/ZZZ: wti loop locked mutex %p again\n", pWtp->pmutUsr);
}
/* indicate termination */
- d_pthread_mutex_lock(pWtp->pmutUsr);
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
pthread_cleanup_pop(0); /* remove cleanup handler */
- pWtp->pfOnWorkerShutdown(pWtp->pUsr);
pthread_setcancelstate(iCancelStateSave, NULL);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
RETiRet;
}
@@ -340,7 +332,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
free(pThis->pszDbgHdr);
}
- if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL)
+ if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */