From 7667845bd72b6f92eabc975318a4f288a77f2630 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 15:06:45 +0200 Subject: first attempt at dequeueing multiple batches inside the queue ... but this code has serious problems when terminating the queue, also it is far from being optimal. I will commit a series of patches (hopefully) as I am on the path to the final implementation. --- runtime/wtp.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 04eb974f..e1966099 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -88,6 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; + pThis->pfGetDeqMaxAtOnce = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; @@ -117,7 +118,7 @@ wtpConstructFinalize(wtp_t *pThis) */ if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -584,6 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) +DEFpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -- cgit v1.2.3 From 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 12:50:07 +0200 Subject: added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives --- runtime/wtp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index e1966099..8bb55cf7 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -88,7 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; - pThis->pfGetDeqMaxAtOnce = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; @@ -585,7 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfGetDeqMaxAtOnce, rsRetVal(*pVal)(void*, int*)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -- cgit v1.2.3 From 6c5264159c099ddc4d06590508980ee53a83b67b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 14:29:41 +0200 Subject: fixing a small (newly-introduced) memory leak ... plus simplifying free() calls after agreement on mailing list that we no longer need to check if the pointer is non-NULL --- runtime/wtp.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 8bb55cf7..a23e85f9 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -152,8 +152,7 @@ CODESTARTobjDestruct(wtp) pthread_mutex_destroy(&pThis->mut); pthread_mutex_destroy(&pThis->mutThrdShutdwn); - if(pThis->pszDbgHdr != NULL) - free(pThis->pszDbgHdr); + free(pThis->pszDbgHdr); ENDobjDestruct(wtp) -- cgit v1.2.3 From 8159d0a117837ed573e80cf86f4f013bf9534ab2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 16:02:42 +0200 Subject: fixed abort condition in DA mode --- runtime/wtp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index a23e85f9..9891a55c 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -78,7 +78,7 @@ wtpGetDbgHdr(wtp_t *pThis) /* Not implemented dummy function for constructor */ -static rsRetVal NotImplementedDummy() { return RS_RET_OK; } +static rsRetVal NotImplementedDummy() { return RS_RET_NOT_IMPLEMENTED; } /* Standard-Constructor for the wtp object */ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! */ -- cgit v1.2.3 From ad7ccabe5ec616a4bf9fda1472d8041eaf1bf815 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 20 May 2009 11:11:02 +0200 Subject: yield() no longer needed on uniproc thanks to new algorithms --- runtime/wtp.c | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 9891a55c..dab59562 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -40,10 +40,10 @@ #include #include -#ifdef OS_SOLARIS -# include -# define pthread_yield() sched_yield() -#endif +/// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 +//#ifdef OS_SOLARIS +//# include +//#endif #include "rsyslog.h" #include "stringbuf.h" @@ -510,14 +510,6 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) dbgprintf("%s: started with state %d, num workers now %d\n", wtpGetDbgHdr(pThis), iState, pThis->iCurNumWrkThrd); - /* we try to give the starting worker a little boost. It won't help much as we still - * hold the queue's mutex, but at least it has a chance to start on a single-CPU system. - */ -# if !defined(__hpux) /* pthread_yield is missing there! */ - if(pThis->bOptimizeUniProc) - pthread_yield(); -# endif - /* indicate we just started a worker and would like to see it running */ wtpSetInactivityGuard(pThis, 1, MUTEX_ALREADY_LOCKED); -- cgit v1.2.3 From 9f45b80ea9ea86d516c895d97fd8670df37e319e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 20 May 2009 15:12:49 +0200 Subject: free last processed message in all cases so far, the last processed message was only freed when the next one was processed. This has been changed now. More precisely, a better algorithm has been selected for the queue worker process, which also involves less overhead than the previous one. The fix for "free last processed message" as then more or less a side-effect (easy to do) of the new algorithm. --- runtime/wtp.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index dab59562..40a9095b 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -248,7 +248,6 @@ wtpSetState(wtp_t *pThis, wtpState_t iNewState) /* check if the worker shall shutdown (1 = yes, 0 = no) - * TODO: check if we can use atomic operations to enhance performance * Note: there may be two mutexes locked, the bLockUsrMutex is the one in our "user" * (e.g. the queue clas) * rgerhards, 2008-01-21 @@ -263,14 +262,14 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, bLockUsrMutex))) + || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis))) iRet = RS_RET_TERMINATE_NOW; - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); /* try customer handler if one was set and we do not yet have a definite result */ if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } @@ -577,7 +576,7 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) +DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -- cgit v1.2.3 From aa9426f683fa6af9280bc63050ee0187ba4c57e1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 May 2009 12:43:43 +0200 Subject: solved design issue with queue termination ... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice. --- runtime/wtp.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 40a9095b..41fcd8d9 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -261,16 +261,19 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); - if( (pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) - || ((pThis->wtpState == wtpState_SHUTDOWN) && pThis->pfIsIdle(pThis->pUsr, pThis))) - iRet = RS_RET_TERMINATE_NOW; + if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { + ABORT_FINALIZE(RS_RET_TERMINATE_NOW); + } else if(pThis->wtpState == wtpState_SHUTDOWN) { + ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); + } /* try customer handler if one was set and we do not yet have a definite result */ - if(iRet == RS_RET_OK && pThis->pfChkStopWrkr != NULL) { + if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } - END_MTX_PROTECTED_OPERATIONS(&pThis->mut); +finalize_it: + END_MTX_PROTECTED_OPERATIONS(&pThis->mut); RETiRet; } -- cgit v1.2.3 From d4564f8399f4362c7e79066370049f909cef996c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 27 May 2009 19:43:28 +0200 Subject: interim commit: working on failure cases slightly improved situation, would like to save it before carrying on --- runtime/wtp.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 41fcd8d9..7786a656 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -91,10 +91,12 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; + pThis->pfObjProcessed = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; pThis->pfOnWorkerCancel = NotImplementedDummy; pThis->pfOnWorkerStartup = NotImplementedDummy; pThis->pfOnWorkerShutdown = NotImplementedDummy; +dbgprintf("XXX: wtpConstruct: %d\n", pThis->wtpState); ENDobjConstruct(wtp) @@ -139,6 +141,7 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE int i; CODESTARTobjDestruct(wtp) wtpProcessThrdChanges(pThis); /* process thread changes one last time */ +RUNLOG_STR("wtpDestruct"); /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) @@ -260,17 +263,22 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); +RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { +RUNLOG; ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(pThis->wtpState == wtpState_SHUTDOWN) { ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); +RUNLOG; } +RUNLOG_VAR("%d", iRet); /* try customer handler if one was set and we do not yet have a definite result */ if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } +RUNLOG_VAR("%d", iRet); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); @@ -292,13 +300,17 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); +dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState); wtpSetState(pThis, tShutdownCmd); +dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState); wtpWakeupAllWrkr(pThis); +dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState); /* see if we need to harvest (join) any terminated threads (even in timeout case, * some may have terminated... */ wtpProcessThrdChanges(pThis); +dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); /* and wait for their termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -306,7 +318,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; +dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { +dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); @@ -581,6 +595,7 @@ DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) +DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -- cgit v1.2.3 From 9517e19b6427c295e206ece9562ce70f4a6d7044 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 09:59:45 +0200 Subject: preserving current changes ... in preparation for some larger changes - I need to apply some serious design changes, as the current system does not play well at all with ultra-reliable queues. Will do that in a totally new version. --- runtime/wtp.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 7786a656..a5836da3 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -141,7 +141,6 @@ BEGINobjDestruct(wtp) /* be sure to specify the object type also in END and CODE int i; CODESTARTobjDestruct(wtp) wtpProcessThrdChanges(pThis); /* process thread changes one last time */ -RUNLOG_STR("wtpDestruct"); /* destruct workers */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) @@ -263,22 +262,17 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockMutex, int bLockUsrMutex) ISOBJ_TYPE_assert(pThis, wtp); -RUNLOG; BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); if(pThis->wtpState == wtpState_SHUTDOWN_IMMEDIATE) { -RUNLOG; ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(pThis->wtpState == wtpState_SHUTDOWN) { ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); -RUNLOG; } -RUNLOG_VAR("%d", iRet); /* try customer handler if one was set and we do not yet have a definite result */ if(pThis->pfChkStopWrkr != NULL) { iRet = pThis->pfChkStopWrkr(pThis->pUsr, bLockUsrMutex); } -RUNLOG_VAR("%d", iRet); finalize_it: END_MTX_PROTECTED_OPERATIONS(&pThis->mut); -- cgit v1.2.3 From 13d4a23e92996e24d6a833ca75d06428c5387aa4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 14:24:37 +0200 Subject: some more fixes for queue engine The enhanced testbench now runs without failures, again --- runtime/wtp.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index a5836da3..6b39793b 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -294,17 +294,13 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); -dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState); wtpSetState(pThis, tShutdownCmd); -dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState); wtpWakeupAllWrkr(pThis); -dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState); /* see if we need to harvest (join) any terminated threads (even in timeout case, * some may have terminated... */ wtpProcessThrdChanges(pThis); -dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); /* and wait for their termination */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); @@ -312,9 +308,7 @@ dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState); pthread_cleanup_push(mutexCancelCleanup, &pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); bTimedOut = 0; -dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState); while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) { -dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState); dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n", wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd); -- cgit v1.2.3 From 46024834449840dabf399dda196c9dd11cf78ace Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 25 Jun 2009 11:06:42 +0200 Subject: added a few atomic operations mostly to get thread debugger errors clean (plus, of course, it makes things more deterministic) --- runtime/wtp.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 267555cd..f5769a72 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -39,6 +39,7 @@ #include #include #include +#include /// TODO: check on solaris if this is any longer needed - I don't think so - rgerhards, 2009-09-20 //#ifdef OS_SOLARIS @@ -217,7 +218,7 @@ wtpProcessThrdChanges(wtp_t *pThis) */ do { /* reset the change marker */ - pThis->bThrdStateChanged = 0; + ATOMIC_STORE_0_TO_INT(pThis->bThrdStateChanged); /* go through all threads */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { wtiProcessThrdChanges(pThis->pWrkr[i], LOCK_MUTEX); -- cgit v1.2.3