summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c57
1 files changed, 43 insertions, 14 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 288670b6..9343f5c5 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -39,11 +39,6 @@
#include <pthread.h>
#include <errno.h>
-/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20
-//#ifdef OS_SOLARIS
-//# include <sched.h>
-//#endif
-
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
@@ -79,10 +74,10 @@ wtiGetDbgHdr(wti_t *pThis)
/* return the current worker processing state. For the sake of
* simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
-bool
+sbool
wtiGetState(wti_t *pThis)
{
- return ATOMIC_FETCH_32BIT(pThis->bIsRunning);
+ return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning);
}
@@ -102,17 +97,39 @@ wtiSetAlwaysRunning(wti_t *pThis)
* is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal
-wtiSetState(wti_t *pThis, bool bNewVal)
+wtiSetState(wti_t *pThis, sbool bNewVal)
{
ISOBJ_TYPE_assert(pThis, wti);
- if(bNewVal)
- ATOMIC_STORE_1_TO_INT(pThis->bIsRunning);
- else
- ATOMIC_STORE_0_TO_INT(pThis->bIsRunning);
+ if(bNewVal) {
+ ATOMIC_STORE_1_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ } else {
+ ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
+ }
return RS_RET_OK;
}
+/* advise all workers to start by interrupting them. That should unblock all srSleep()
+ * calls.
+ */
+rsRetVal
+wtiWakeupThrd(wti_t *pThis)
+{
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, wti);
+
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID);
+ }
+
+ RETiRet;
+}
+
+
/* Cancel the thread. If the thread is not running. But it is save and legal to
* call wtiCancelThrd() in such situations. This function only returns when the
* thread has terminated. Else we may get race conditions all over the code...
@@ -129,7 +146,16 @@ wtiCancelThrd(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
+
+ if(wtiGetState(pThis)) {
+ /* we first try the cooperative "cancel" interface */
+ pthread_kill(pThis->thrdID, SIGTTIN);
+ dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID);
+ srSleep(0, 10000);
+ }
+
if(wtiGetState(pThis)) {
+ dbgprintf("cooperative worker termination failed, using cancellation...\n");
dbgoprint((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
@@ -146,7 +172,9 @@ wtiCancelThrd(wti_t *pThis)
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
/* actual destruction */
- free(pThis->batch.pElem);
+ batchFree(&pThis->batch);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -154,6 +182,7 @@ ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
+ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
ENDobjConstruct(wti)
@@ -175,7 +204,7 @@ wtiConstructFinalize(wti_t *pThis)
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
- CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+ CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
finalize_it:
RETiRet;