diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/wti.c | 6 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 12 | ||||
-rw-r--r-- | runtime/wtp.h | 2 |
4 files changed, 15 insertions, 6 deletions
diff --git a/runtime/wti.c b/runtime/wti.c index f91fb5a9..77197a95 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -171,6 +171,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -181,6 +182,7 @@ ENDobjDestruct(wti) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + pthread_cond_init(&pThis->pcondBusy, NULL); ENDobjConstruct(wti) @@ -249,10 +251,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) { DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } diff --git a/runtime/wti.h b/runtime/wti.h index 014251f0..b0dc6c96 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -37,6 +37,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 19151e7c..0326d5dc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -233,9 +233,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ /* awake workers in retry loop */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); wtiWakeupThrd(pThis->pWrkr[i]); } d_pthread_mutex_unlock(pThis->pmutUsr); @@ -455,7 +455,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; int nMissing; /* number workers missing to run */ - int i; + int i, nRunning; ISOBJ_TYPE_assert(pThis, wtp); @@ -475,7 +475,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { - pthread_cond_signal(pThis->pcondBusy); + /* we have needed number of workers, but they may be sleeping */ + for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) { + if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); + nRunning++; + } + } } diff --git a/runtime/wtp.h b/runtime/wtp.h index 25992f7f..697722af 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -56,7 +56,7 @@ struct wtp_s { void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */ pthread_mutex_t *pmutUsr; - pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ + pthread_cond_t *pcondBusy; /* unused condition variable, was used to signal threads to wake up */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ |