diff options
Diffstat (limited to 'runtime/wtp.c')
-rw-r--r-- | runtime/wtp.c | 89 |
1 files changed, 49 insertions, 40 deletions
diff --git a/runtime/wtp.c b/runtime/wtp.c index 47b99fe8..060e6627 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfGetDeqBatchSize = NotImplementedDummy; - pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; - pThis->pfOnIdle = NotImplementedDummy; - pThis->pfOnWorkerCancel = NotImplementedDummy; - pThis->pfOnWorkerStartup = NotImplementedDummy; - pThis->pfOnWorkerShutdown = NotImplementedDummy; ENDobjConstruct(wtp) @@ -122,8 +117,7 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ - if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); @@ -160,20 +154,6 @@ CODESTARTobjDestruct(wtp) ENDobjDestruct(wtp) -/* wake up all worker threads. - * rgerhards, 2008-01-16 - */ -rsRetVal -wtpWakeupAllWrkr(wtp_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wtp); - pthread_cond_broadcast(pThis->pcondBusy); - RETiRet; -} - - /* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 * We do not need to do atomic instructions as set operations are only * called when terminating the pool, and then in strict sequence. So we @@ -239,8 +219,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); + /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ + d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - wtpWakeupAllWrkr(pThis); + pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + d_pthread_mutex_unlock(pThis->pmutUsr); /* wait for worker thread termination */ d_pthread_mutex_lock(&pThis->mutWtp); @@ -285,14 +268,14 @@ wtpCancelAll(wtp_t *pThis) } -/* cancellation cleanup handler for executing worker decrements the worker counter. - * This is also called when the the worker is normally shut down. - * rgerhards, 2009-07-20 +/* this function contains shared code for both regular worker shutdown as + * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1) + * as this introduces a race in the debug system (RETiRet system). + * rgerhards, 2009-10-26 */ -static void -wtpWrkrExecCancelCleanup(void *arg) +static inline void +wtpWrkrExecCleanup(wti_t *pWti) { - wti_t *pWti = (wti_t*) arg; wtp_t *pThis; BEGINfunc @@ -307,11 +290,37 @@ wtpWrkrExecCancelCleanup(void *arg) DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); - pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ ENDfunc } +/* cancellation cleanup handler for executing worker decrements the worker counter. + * rgerhards, 2009-07-20 + */ +static void +wtpWrkrExecCancelCleanup(void *arg) +{ + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; + + BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; + ISOBJ_TYPE_assert(pThis, wtp); + DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti); + + wtpWrkrExecCleanup(pWti); + + ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ +} + + /* wtp worker shell. This is started and calls into the actual * wti worker. * rgerhards, 2008-01-21 @@ -345,9 +354,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); wtiWorker(pWti); - pthread_cleanup_pop(1); + pthread_cleanup_pop(0); + wtpWrkrExecCleanup(pWti); ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ pthread_exit(0); } #pragma GCC diagnostic warning "-Wempty-body" @@ -411,7 +426,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); -int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -419,16 +433,16 @@ int nMaxWrkrTmp = nMaxWrkr; nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); -dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { - DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", + wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { CHKiRet(wtpStartWrkr(pThis)); } } else { -dbgprintf("YYY: adivse signal cond busy"); +dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n"); pthread_cond_signal(pThis->pcondBusy); } @@ -448,13 +462,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) -DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)) /* set the debug header message @@ -478,7 +487,7 @@ wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg) pThis->pszDbgHdr = NULL; } - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) + if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ |