From 772cba6f2e34660fa74a7e2f5ec2823a2cdb6b04 Mon Sep 17 00:00:00 2001 From: Pavel Levshin Date: Wed, 6 Nov 2013 18:37:47 +0100 Subject: improve worker thread pool handling among others, make possible that workers really timeout and the pool thus shrinks --- runtime/wti.c | 6 ++++-- runtime/wti.h | 1 + runtime/wtp.c | 12 +++++++++--- runtime/wtp.h | 2 +- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/runtime/wti.c b/runtime/wti.c index f91fb5a9..77197a95 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -171,6 +171,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -181,6 +182,7 @@ ENDobjDestruct(wti) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + pthread_cond_init(&pThis->pcondBusy, NULL); ENDobjConstruct(wti) @@ -249,10 +251,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) { DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } diff --git a/runtime/wti.h b/runtime/wti.h index 014251f0..b0dc6c96 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -37,6 +37,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! 
*/ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; diff --git a/runtime/wtp.c b/runtime/wtp.c index 19151e7c..0326d5dc 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -233,9 +233,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ /* awake workers in retry loop */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); wtiWakeupThrd(pThis->pWrkr[i]); } d_pthread_mutex_unlock(pThis->pmutUsr); @@ -455,7 +455,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; int nMissing; /* number workers missing to run */ - int i; + int i, nRunning; ISOBJ_TYPE_assert(pThis, wtp); @@ -475,7 +475,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { - pthread_cond_signal(pThis->pcondBusy); + /* we have needed number of workers, but they may be sleeping */ + for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) { + if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); + nRunning++; + } + } } diff --git a/runtime/wtp.h b/runtime/wtp.h index 25992f7f..697722af 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -56,7 +56,7 @@ struct wtp_s { void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */ pthread_mutex_t *pmutUsr; - pthread_cond_t *pcondBusy; /* condition the user 
will signal "busy again, keep runing" on (awakes worker) */ + pthread_cond_t *pcondBusy; /* unused condition variable, was used to signal threads to wake up */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ -- cgit v1.2.3 From 0a0f68c6fe93519e718d4d10c6cf47a0b3a9b890 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 6 Nov 2013 18:40:08 +0100 Subject: doc: mention Pavel's improvement in ChangeLog --- ChangeLog | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ChangeLog b/ChangeLog index 2d5acf1a..a3a47b50 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,10 @@ +--------------------------------------------------------------------------- Version 7.4.7 [v7.4-stable] 2013-11-?? +- worker thread pool handling has been improved + Among others, permits pool to actually shrink (was quite hard with + previous implementation. This will also improve performance and/or + lower system overhead on busy systems. + Thanks to Pavel Levshin for the enhancement. 
- improved checking of queue config parameters on startup - bugfix: call to ruleset with async queue did not use the queue closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 -- cgit v1.2.3 From 4f88aa20b4e5ace523a2fb4b5f221d9e70e26d13 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 6 Nov 2013 19:06:27 +0100 Subject: do not complain for queue size on DA queues --- runtime/queue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/queue.c b/runtime/queue.c index 30269152..7ad2b849 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2106,7 +2106,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ break; } - if(pThis->iMaxQueueSize < 100) { + if(pThis->iMaxQueueSize < 100 + && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) { errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very " "low and can lead to unpredictable results. See also " "http://www.rsyslog.com/lower-bound-for-queue-sizes/", -- cgit v1.2.3 From 3f258be41684ff149e24adc3a3f80a47820614fa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 Nov 2013 08:57:16 +0100 Subject: do not adjust batch size for disk queues --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/queue.c b/runtime/queue.c index 7ad2b849..30516876 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2123,7 +2123,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) pThis->iLightDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ - if(pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; /* finalize some initializations that could not yet be done because it is -- cgit v1.2.3 From 00d2e47530f3b124f18036fe9d446fb8eb90e0d9 Mon Sep 17 00:00:00 2001 
From: Rainer Gerhards Date: Thu, 7 Nov 2013 09:07:44 +0100 Subject: cosmetic: add test headers to some tests actually a bit more than cosmetic - makes it easier to see what failed --- tests/daqueue-persist.sh | 1 + tests/diskqueue.sh | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/daqueue-persist.sh b/tests/daqueue-persist.sh index feb2a347..0781a7dc 100755 --- a/tests/daqueue-persist.sh +++ b/tests/daqueue-persist.sh @@ -2,6 +2,7 @@ # to carry out multiple tests with different queue modes # added 2009-05-27 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== echo \[daqueue-persist.sh\]: test data persisting at shutdown source $srcdir/daqueue-persist-drvr.sh LinkedList source $srcdir/daqueue-persist-drvr.sh FixedArray diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index b871e9eb..853a836a 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -5,6 +5,7 @@ # added 2009-04-17 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 # uncomment for debugging support: +echo =============================================================================== echo \[diskqueue.sh\]: testing queue disk-only mode # uncomment for debugging support: #export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" -- cgit v1.2.3 From 6a27cd816c6972c553f1e3f38dd8f9ea3900e709 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 6 Nov 2013 19:06:27 +0100 Subject: do not complain for queue size on DA queues --- runtime/queue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runtime/queue.c b/runtime/queue.c index 409424e7..925ef71e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2084,7 +2084,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ break; } - if(pThis->iMaxQueueSize < 100) { + if(pThis->iMaxQueueSize < 100 + && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == 
QUEUETYPE_FIXED_ARRAY)) { errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very " "low and can lead to unpredictable results. See also " "http://www.rsyslog.com/lower-bound-for-queue-sizes/", -- cgit v1.2.3 From 8752ce5a2e8fd50325364bb733619e7b8beb4876 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 Nov 2013 08:57:16 +0100 Subject: do not adjust batch size for disk queues --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/queue.c b/runtime/queue.c index 925ef71e..073c6823 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2101,7 +2101,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) pThis->iLightDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ - if(pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; /* finalize some initializations that could not yet be done because it is -- cgit v1.2.3 From 248e6bf819312e6ab43df0e46269080a8518a2f9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 Nov 2013 12:25:46 +0100 Subject: cleanup no longer needed code --- runtime/queue.c | 4 ---- runtime/queue.h | 2 +- runtime/wtp.c | 1 - runtime/wtp.h | 2 -- 4 files changed, 1 insertion(+), 8 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index 30516876..ec9470c7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -484,7 +484,6 @@ InitDA(qqueue_t *pThis, int bLockMutex) CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); 
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); @@ -2140,7 +2139,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pthread_mutex_init(&pThis->mutThrdMgmt, NULL); pthread_cond_init (&pThis->notFull, NULL); - pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); @@ -2181,7 +2179,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); @@ -2446,7 +2443,6 @@ CODESTARTobjDestruct(qqueue) } pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); - pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); diff --git a/runtime/queue.h b/runtime/queue.h index 844523ad..1918e502 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -122,7 +122,7 @@ struct queue_s { /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ - pthread_cond_t notFull, notEmpty; + pthread_cond_t notFull; pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 
0326d5dc..895c1ffe 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -496,7 +496,6 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t) DEFpropSetMeth(wtp, iNumWorkerThreads, int) DEFpropSetMeth(wtp, pUsr, void*) DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) -DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) diff --git a/runtime/wtp.h b/runtime/wtp.h index 697722af..4bc284cb 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -56,7 +56,6 @@ struct wtp_s { void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */ pthread_mutex_t *pmutUsr; - pthread_cond_t *pcondBusy; /* unused condition variable, was used to signal threads to wake up */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ @@ -95,6 +94,5 @@ PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int); PROTOTYPEpropSetMeth(wtp, pUsr, void*); PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int); PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); -PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); #endif /* #ifndef WTP_H_INCLUDED */ -- cgit v1.2.3 From ded6957225ae23f096dfa842caf28c0f34e9512c Mon Sep 17 00:00:00 2001 From: Pavel Levshin Date: Thu, 7 Nov 2013 12:42:37 +0100 Subject: bugfix: regular workers are no longer started if DA worker is active from mail: Look, below, if it is time to activate DA worker, we call it explicitly. But in this case we do not advise regular workers. They are likely already running at this point, but it is not guaranteed. 
What if, for example, the system is set to start additional workers when the queue is going over high watermark? What if HighWatermark is set to 1? Regular workers will not be started, and DA worker may fail. Thus, it is reasonable to advise regular workers even if we are going DA. --- runtime/queue.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index ec9470c7..e0d60249 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -351,16 +351,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(getLogicalQueueSize(pThis) == 0) { - iMaxWorkers = 0; - } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } RETiRet; -- cgit v1.2.3 From 9035771b7dbea738b8a22949a7f0a6514525fafb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 Nov 2013 12:47:33 +0100 Subject: doc: maintain ChangeLog --- ChangeLog | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/ChangeLog b/ChangeLog index a3a47b50..454deee5 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,16 @@ --------------------------------------------------------------------------- -Version 7.4.7 [v7.4-stable] 2013-11-?? +Version 7.5.7 [devel] 2013-11-?? 
+- bugfix: ommysql lost configfile/section parameters after first close + This means that when a connection was broken, it was probably + re-instantiated with different parameters than configured. +- bugfix: regular worker threads are not properly (re)started if DA + mode is active. + This occurs only under rare conditions, but definitely is a bug that + needed to be addressed. It probably is present since version 4. + Note that this patch has not been applied to v7.4-stable, as it + is very unlikely to happen and the fix itself has some regression + potential (the fix looks very solid, but it addresses a core component). + Thanks to Pavel Levshin for the fix - worker thread pool handling has been improved Among others, permits pool to actually shrink (was quite hard with previous implementation. This will also improve performance and/or @@ -9,11 +20,6 @@ Version 7.4.7 [v7.4-stable] 2013-11-?? - bugfix: call to ruleset with async queue did not use the queue closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 --------------------------------------------------------------------------- -Version 7.5.7 [devel] 2013-11-?? -- bugfix: ommysql lost configfile/section parameters after first close - This means that when a connection was broken, it was probably - re-instantiated with different parameters than configured. ---------------------------------------------------------------------------- Version 7.5.6 [devel] 2013-10-29 - impstats: add capability to bind to a ruleset - improved performance of RainerScript variable access -- cgit v1.2.3