diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-07 12:57:49 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-07 12:57:49 +0100 |
commit | 23278699e35173f46effda964fd80b8f868e8b3e (patch) | |
tree | 35b8370799ac9ca9705a26bdee4c9f8309d07f31 | |
parent | e3dc6041424a779ddce3da8d55af9bc71c99aa61 (diff) | |
parent | fb81cf202c0277f48c75f777fa47e4db4e59f0a3 (diff) | |
download | rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.gz rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.bz2 rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.zip |
Merge branch 'master-ruleeng' into master-ruleeng-simd
-rw-r--r-- | ChangeLog | 20 | ||||
-rw-r--r-- | runtime/queue.c | 24 | ||||
-rw-r--r-- | runtime/queue.h | 2 | ||||
-rw-r--r-- | runtime/wti.c | 6 | ||||
-rw-r--r-- | runtime/wti.h | 1 | ||||
-rw-r--r-- | runtime/wtp.c | 13 | ||||
-rw-r--r-- | runtime/wtp.h | 2 | ||||
-rwxr-xr-x | tests/daqueue-persist.sh | 1 | ||||
-rwxr-xr-x | tests/diskqueue.sh | 1 |
9 files changed, 43 insertions, 27 deletions
@@ -1,7 +1,3 @@ -Version 7.4.7 [v7.4-stable] 2013-11-?? -- improved checking of queue config parameters on startup -- bugfix: call to ruleset with async queue did not use the queue - closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 --------------------------------------------------------------------------- Version 8.1.0 [devel] 2013-11-?? - module omruleset is no longer enabled by default. @@ -13,6 +9,22 @@ Version 7.5.7 [devel] 2013-11-?? - bugfix: ommysql lost configfile/section parameters after first close This means that when a connection was broken, it was probably re-instantiated with different parameters than configured. +- bugfix: regular worker threads are not properly (re)started if DA + mode is active. + This occurs only under rare conditions, but definitely is a bug that + needed to be addressed. It probably is present since version 4. + Note that this patch has not been applied to v7.4-stable, as it + is very unlikely to happen and the fix itself has some regression + potential (the fix looks very solid, but it addresses a core component). + Thanks to Pavel Levshin for the fix +- worker thread pool handling has been improved + Among others, permits pool to actually shrink (was quite hard with + previous implementation. This will also improve performance and/or + lower system overhead on busy systems. + Thanks to Pavel Levshin for the enhancement. +- improved checking of queue config parameters on startup +- bugfix: call to ruleset with async queue did not use the queue + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 --------------------------------------------------------------------------- Version 7.5.6 [devel] 2013-10-29 - impstats: add capability to bind to a ruleset diff --git a/runtime/queue.c b/runtime/queue.c index 989b29ca..8fb734e7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -350,16 +350,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(getLogicalQueueSize(pThis) == 0) { - iMaxWorkers = 0; - } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } RETiRet; @@ -483,7 +482,6 @@ InitDA(qqueue_t *pThis, int bLockMutex) CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); @@ -2095,7 +2093,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ break; } - if(pThis->iMaxQueueSize < 100) { + if(pThis->iMaxQueueSize < 100 + && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) { errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very " "low and can lead to unpredictable results. See also " "http://www.rsyslog.com/lower-bound-for-queue-sizes/", @@ -2111,7 +2110,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) pThis->iLightDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ - if(pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; /* finalize some initializations that could not yet be done because it is @@ -2128,7 +2127,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pthread_mutex_init(&pThis->mutThrdMgmt, NULL); pthread_cond_init (&pThis->notFull, NULL); - pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); @@ -2169,7 +2167,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); @@ -2434,7 +2431,6 @@ CODESTARTobjDestruct(qqueue) } pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); - pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); diff --git a/runtime/queue.h b/runtime/queue.h index 07491f21..86107d24 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -121,7 +121,7 @@ struct queue_s { /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ - pthread_cond_t notFull, notEmpty; + pthread_cond_t notFull; pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ diff --git a/runtime/wti.c b/runtime/wti.c index 4642b526..6b5d82c8 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -175,6 +175,7 @@ CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); free(pThis->actWrkrInfo); + pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -185,6 +186,7 @@ ENDobjDestruct(wti) */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning); + pthread_cond_init(&pThis->pcondBusy, NULL); ENDobjConstruct(wti) @@ -262,10 +264,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) if(pThis->bAlwaysRunning) { /* never shut down any started worker */ - d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr); + d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr); } else { timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */ - if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) { + if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) { DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis)); *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } diff --git a/runtime/wti.h b/runtime/wti.h index 9ac61c2d..adc7897c 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -78,6 +78,7 @@ struct wti_s { batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ + pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); struct { uint8_t bPrevWasSuspended; diff --git a/runtime/wtp.c b/runtime/wtp.c index 1b960eb9..66942e6b 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -235,9 +235,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout /* lock mutex to prevent races (may otherwise happen during idle processing and such...) */ d_pthread_mutex_lock(pThis->pmutUsr); wtpSetState(pThis, tShutdownCmd); - pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */ /* awake workers in retry loop */ for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); wtiWakeupThrd(pThis->pWrkr[i]); } d_pthread_mutex_unlock(pThis->pmutUsr); @@ -457,7 +457,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) { DEFiRet; int nMissing; /* number workers missing to run */ - int i; + int i, nRunning; ISOBJ_TYPE_assert(pThis, wtp); @@ -477,7 +477,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr) CHKiRet(wtpStartWrkr(pThis)); } } else { - pthread_cond_signal(pThis->pcondBusy); + /* we have needed number of workers, but they may be sleeping */ + for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) { + if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) { + pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy); + nRunning++; + } + } } @@ -492,7 +498,6 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t) DEFpropSetMeth(wtp, iNumWorkerThreads, int) DEFpropSetMeth(wtp, pUsr, void*) DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) -DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) diff --git a/runtime/wtp.h b/runtime/wtp.h index 25992f7f..4bc284cb 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -56,7 +56,6 @@ struct wtp_s { void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */ pthread_mutex_t *pmutUsr; - pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */ @@ -95,6 +94,5 @@ PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int); PROTOTYPEpropSetMeth(wtp, pUsr, void*); PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int); PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t); -PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t); #endif /* #ifndef WTP_H_INCLUDED */ diff --git a/tests/daqueue-persist.sh b/tests/daqueue-persist.sh index feb2a347..0781a7dc 100755 --- a/tests/daqueue-persist.sh +++ b/tests/daqueue-persist.sh @@ -2,6 +2,7 @@ # to carry out multiple tests with different queue modes # added 2009-05-27 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 +echo =============================================================================== echo \[daqueue-persist.sh\]: test data persisting at shutdown source $srcdir/daqueue-persist-drvr.sh LinkedList source $srcdir/daqueue-persist-drvr.sh FixedArray diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh index b871e9eb..853a836a 100755 --- a/tests/diskqueue.sh +++ b/tests/diskqueue.sh @@ -5,6 +5,7 @@ # added 2009-04-17 by Rgerhards # This file is part of the rsyslog project, released under GPLv3 # uncomment for debugging support: +echo =============================================================================== echo \[diskqueue.sh\]: testing queue disk-only mode # uncomment for debugging support: #export RSYSLOG_DEBUG="debug nostdout noprintmutexaction" |