summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/wti.c6
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c12
-rw-r--r--runtime/wtp.h2
4 files changed, 15 insertions, 6 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index f91fb5a9..77197a95 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -171,6 +171,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
+ pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
@@ -181,6 +182,7 @@ ENDobjDestruct(wti)
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+ pthread_cond_init(&pThis->pcondBusy, NULL);
ENDobjConstruct(wti)
@@ -249,10 +251,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) {
DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
diff --git a/runtime/wti.h b/runtime/wti.h
index 014251f0..b0dc6c96 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -37,6 +37,7 @@ struct wti_s {
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
+ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
};
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 19151e7c..0326d5dc 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -233,9 +233,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
/* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
/* awake workers in retry loop */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
wtiWakeupThrd(pThis->pWrkr[i]);
}
d_pthread_mutex_unlock(pThis->pmutUsr);
@@ -455,7 +455,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
{
DEFiRet;
int nMissing; /* number workers missing to run */
- int i;
+ int i, nRunning;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -475,7 +475,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
CHKiRet(wtpStartWrkr(pThis));
}
} else {
- pthread_cond_signal(pThis->pcondBusy);
+ /* we have needed number of workers, but they may be sleeping */
+ for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) {
+ if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) {
+ pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
+ nRunning++;
+ }
+ }
}
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 25992f7f..697722af 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -56,7 +56,7 @@ struct wtp_s {
void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */
pthread_mutex_t *pmutUsr;
- pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
+ pthread_cond_t *pcondBusy; /* unused condition variable, was used to signal threads to wake up */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */