From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- runtime/wtp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 40d031dc..93234819 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -169,9 +169,9 @@ wtpWakeupAllWrkr(wtp_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, wtp); - d_pthread_mutex_lock(pThis->pmutUsr); + //d_pthread_mutex_lock(pThis->pmutUsr); pthread_cond_broadcast(pThis->pcondBusy); - d_pthread_mutex_unlock(pThis->pmutUsr); + //d_pthread_mutex_unlock(pThis->pmutUsr); RETiRet; } -- cgit v1.2.3 From c5408da3d8f17691fb91282d031757ed041fec55 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 14 Oct 2009 11:01:21 +0200 Subject: new queue engine - initial commit (probably not 100% working!) simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changed, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care. --- runtime/wtp.c | 35 +++++++---------------------------- 1 file changed, 7 insertions(+), 28 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 93234819..3e76bb56 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -94,13 +94,8 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! 
/* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; pThis->pfGetDeqBatchSize = NotImplementedDummy; - pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfObjProcessed = NotImplementedDummy; - pThis->pfOnIdle = NotImplementedDummy; - pThis->pfOnWorkerCancel = NotImplementedDummy; - pThis->pfOnWorkerStartup = NotImplementedDummy; - pThis->pfOnWorkerShutdown = NotImplementedDummy; ENDobjConstruct(wtp) @@ -160,22 +155,6 @@ CODESTARTobjDestruct(wtp) ENDobjDestruct(wtp) -/* wake up all worker threads. - * rgerhards, 2008-01-16 - */ -rsRetVal -wtpWakeupAllWrkr(wtp_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, wtp); - //d_pthread_mutex_lock(pThis->pmutUsr); - pthread_cond_broadcast(pThis->pcondBusy); - //d_pthread_mutex_unlock(pThis->pmutUsr); - RETiRet; -} - - /* Sent a specific state for the worker thread pool. -- rgerhards, 2008-01-21 * We do not need to do atomic instructions as set operations are only * called when terminating the pool, and then in strict sequence. So we @@ -211,8 +190,10 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW"); ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(wtpState == wtpState_SHUTDOWN) { +RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE"); ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); } @@ -241,8 +222,11 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout ISOBJ_TYPE_assert(pThis, wtp); + /* lock mutex to prevent races (may otherwise happen during idle processing and such...) 
*/ + d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - wtpWakeupAllWrkr(pThis); + pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ + d_pthread_mutex_unlock(pThis->pmutUsr); /* wait for worker thread termination */ d_pthread_mutex_lock(&pThis->mutWtp); @@ -430,7 +414,7 @@ dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, p CHKiRet(wtpStartWrkr(pThis)); } } else { -dbgprintf("YYY: adivse signal cond busy"); +dbgprintf("YYY: wtpAdviseMaxWorkers, sufficient workers, just doing adivse signal cond busy\n"); pthread_cond_signal(pThis->pcondBusy); } @@ -450,13 +434,8 @@ DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) -DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*)) DEFpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*)) -DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) -DEFpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*, void*)) -DEFpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*)) -DEFpropSetMethFP(wtp, pfOnWorkerShutdown, rsRetVal(*pVal)(void*)) /* set the debug header message -- cgit v1.2.3 From e04e1b50025f5fa9c26abd946190dce8f797d08f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 11:33:38 +0200 Subject: enhanced test environment (including testbench) support for enhancing probability of memory addressing failure by using non-NULL default value for malloced memory (optional, only if requested by configure option). This helps to track down some otherwise undetected issues within the testbench and is expected to be very useful in the future. 
--- runtime/wtp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 3e76bb56..08cf5c3d 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -117,7 +117,7 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ - if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) + if((pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { @@ -459,7 +459,7 @@ wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg) pThis->pszDbgHdr = NULL; } - if((pThis->pszDbgHdr = malloc(sizeof(uchar) * lenMsg + 1)) == NULL) + if((pThis->pszDbgHdr = MALLOC(sizeof(uchar) * lenMsg + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */ -- cgit v1.2.3 From b585a4e90940e9f4d2d288d462d1c273ae5ffa09 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 18:53:01 +0100 Subject: addressed some race issues during queue shutdown these occurred in very unusual scenarios where we had a DA-queue running in parallel and very lengthy actions. Then, in some situations, the shutdown could hang. The code needs some additional lab time, but is believed to be much better than any previous version. --- runtime/wtp.c | 48 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 8 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index 08cf5c3d..e075e5b8 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -271,14 +271,14 @@ wtpCancelAll(wtp_t *pThis) } -/* cancellation cleanup handler for executing worker decrements the worker counter. - * This is also called when the the worker is normally shut down. 
- * rgerhards, 2009-07-20 +/* this function contains shared code for both regular worker shutdown as + * well as shutdown via cancellation. We can not simply use pthread_cleanup_pop(1) + * as this introduces a race in the debug system (RETiRet system). + * rgerhards, 2009-10-26 */ -static void -wtpWrkrExecCancelCleanup(void *arg) +static inline void +wtpWrkrExecCleanup(wti_t *pWti) { - wti_t *pWti = (wti_t*) arg; wtp_t *pThis; BEGINfunc @@ -293,11 +293,37 @@ wtpWrkrExecCancelCleanup(void *arg) DBGPRINTF("%s: Worker thread %lx, terminated, num workers now %d\n", wtpGetDbgHdr(pThis), (unsigned long) pWti, ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd)); - pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ ENDfunc } +/* cancellation cleanup handler for executing worker decrements the worker counter. + * rgerhards, 2009-07-20 + */ +static void +wtpWrkrExecCancelCleanup(void *arg) +{ + wti_t *pWti = (wti_t*) arg; + wtp_t *pThis; + + BEGINfunc + ISOBJ_TYPE_assert(pWti, wti); + pThis = pWti->pWtp; + ISOBJ_TYPE_assert(pThis, wtp); + DBGPRINTF("%s: Worker thread %lx requested to be cancelled.\n", + wtpGetDbgHdr(pThis), (unsigned long) pWti); + + wtpWrkrExecCleanup(pWti); + + ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ +} + + /* wtp worker shell. This is started and calls into the actual * wti worker. 
* rgerhards, 2008-01-21 @@ -331,9 +357,15 @@ wtpWorker(void *arg) /* the arg is actually a wti object, even though we are in pthread_cleanup_push(wtpWrkrExecCancelCleanup, pWti); wtiWorker(pWti); - pthread_cleanup_pop(1); + pthread_cleanup_pop(0); + wtpWrkrExecCleanup(pWti); ENDfunc + /* NOTE: we must call ENDfunc FIRST, because otherwise the schedule may activate the main + * thread after the broadcast, which could destroy the debug class, resulting in a potential + * segfault. So we need to do the broadcast as actually the last action in our processing + */ + pthread_cond_broadcast(&pThis->condThrdTrm); /* activate anyone waiting on thread shutdown */ pthread_exit(0); } #pragma GCC diagnostic warning "-Wempty-body" -- cgit v1.2.3 From 553d1880d47b57b2f4e023c2017675f010afd9a0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:36:53 +0100 Subject: some cleanup --- runtime/wtp.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'runtime/wtp.c') diff --git a/runtime/wtp.c b/runtime/wtp.c index e075e5b8..060e6627 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -117,8 +117,7 @@ wtpConstructFinalize(wtp_t *pThis) /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ - if((pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)); for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); @@ -190,10 +189,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex) wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState); if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) { -RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW"); ABORT_FINALIZE(RS_RET_TERMINATE_NOW); } else if(wtpState == wtpState_SHUTDOWN) { -RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE"); ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE); 
} @@ -429,7 +426,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) ISOBJ_TYPE_assert(pThis, wtp); -int nMaxWrkrTmp = nMaxWrkr; if(nMaxWrkr == 0) FINALIZE; @@ -437,10 +433,10 @@ int nMaxWrkrTmp = nMaxWrkr; nMaxWrkr = pThis->iNumWorkerThreads; nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd); -dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing); if(nMissing > 0) { - DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing); + DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", + wtpGetDbgHdr(pThis), nMissing); /* start the rqtd nbr of workers */ for(i = 0 ; i < nMissing ; ++i) { CHKiRet(wtpStartWrkr(pThis)); -- cgit v1.2.3