summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-11-07 12:57:49 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-11-07 12:57:49 +0100
commit23278699e35173f46effda964fd80b8f868e8b3e (patch)
tree35b8370799ac9ca9705a26bdee4c9f8309d07f31
parente3dc6041424a779ddce3da8d55af9bc71c99aa61 (diff)
parentfb81cf202c0277f48c75f777fa47e4db4e59f0a3 (diff)
downloadrsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.gz
rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.bz2
rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.zip
Merge branch 'master-ruleeng' into master-ruleeng-simd
-rw-r--r--ChangeLog20
-rw-r--r--runtime/queue.c24
-rw-r--r--runtime/queue.h2
-rw-r--r--runtime/wti.c6
-rw-r--r--runtime/wti.h1
-rw-r--r--runtime/wtp.c13
-rw-r--r--runtime/wtp.h2
-rwxr-xr-xtests/daqueue-persist.sh1
-rwxr-xr-xtests/diskqueue.sh1
9 files changed, 43 insertions, 27 deletions
diff --git a/ChangeLog b/ChangeLog
index 9a56e5f8..b7734f23 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,7 +1,3 @@
-Version 7.4.7 [v7.4-stable] 2013-11-??
-- improved checking of queue config parameters on startup
-- bugfix: call to ruleset with async queue did not use the queue
- closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443
---------------------------------------------------------------------------
Version 8.1.0 [devel] 2013-11-??
- module omruleset is no longer enabled by default.
@@ -13,6 +9,22 @@ Version 7.5.7 [devel] 2013-11-??
- bugfix: ommysql lost configfile/section parameters after first close
This means that when a connection was broken, it was probably
re-instantiated with different parameters than configured.
+- bugfix: regular worker threads are not properly (re)started if DA
+ mode is active.
+ This occurs only under rare conditions, but definitely is a bug that
+ needed to be addressed. It probably is present since version 4.
+ Note that this patch has not been applied to v7.4-stable, as it
+ is very unlikely to happen and the fix itself has some regression
+ potential (the fix looks very solid, but it addresses a core component).
+ Thanks to Pavel Levshin for the fix
+- worker thread pool handling has been improved
+ Among others, permits pool to actually shrink (was quite hard with
+ previous implementation. This will also improve performance and/or
+ lower system overhead on busy systems.
+ Thanks to Pavel Levshin for the enhancement.
+- improved checking of queue config parameters on startup
+- bugfix: call to ruleset with async queue did not use the queue
+ closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443
---------------------------------------------------------------------------
Version 7.5.6 [devel] 2013-10-29
- impstats: add capability to bind to a ruleset
diff --git a/runtime/queue.c b/runtime/queue.c
index 989b29ca..8fb734e7 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -350,16 +350,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ }
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
} else {
- if(getLogicalQueueSize(pThis) == 0) {
- iMaxWorkers = 0;
- } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
- }
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
RETiRet;
@@ -483,7 +482,6 @@ InitDA(qqueue_t *pThis, int bLockMutex)
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
@@ -2095,7 +2093,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
break;
}
- if(pThis->iMaxQueueSize < 100) {
+ if(pThis->iMaxQueueSize < 100
+ && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) {
errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very "
"low and can lead to unpredictable results. See also "
"http://www.rsyslog.com/lower-bound-for-queue-sizes/",
@@ -2111,7 +2110,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
pThis->iLightDlyMrk = pThis->iMaxQueueSize
- (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
- if(pThis->iDeqBatchSize > pThis->iMaxQueueSize)
+ if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
/* finalize some initializations that could not yet be done because it is
@@ -2128,7 +2127,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
pthread_cond_init (&pThis->notFull, NULL);
- pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL);
@@ -2169,7 +2167,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
@@ -2434,7 +2431,6 @@ CODESTARTobjDestruct(qqueue)
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
- pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
diff --git a/runtime/queue.h b/runtime/queue.h
index 07491f21..86107d24 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -121,7 +121,7 @@ struct queue_s {
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */
- pthread_cond_t notFull, notEmpty;
+ pthread_cond_t notFull;
pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */
int bThrdStateChanged; /* at least one thread state has changed if 1 */
diff --git a/runtime/wti.c b/runtime/wti.c
index 4642b526..6b5d82c8 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -175,6 +175,7 @@ CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
free(pThis->actWrkrInfo);
+ pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
@@ -185,6 +186,7 @@ ENDobjDestruct(wti)
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
+ pthread_cond_init(&pThis->pcondBusy, NULL);
ENDobjConstruct(wti)
@@ -262,10 +264,10 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured)
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
- d_pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
+ d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
- if(d_pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
+ if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) {
DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
*pbInactivityTOOccured = 1; /* indicate we had a timeout */
}
diff --git a/runtime/wti.h b/runtime/wti.h
index 9ac61c2d..adc7897c 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -78,6 +78,7 @@ struct wti_s {
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */
+ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
struct {
uint8_t bPrevWasSuspended;
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 1b960eb9..66942e6b 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -235,9 +235,9 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
/* lock mutex to prevent races (may otherwise happen during idle processing and such...) */
d_pthread_mutex_lock(pThis->pmutUsr);
wtpSetState(pThis, tShutdownCmd);
- pthread_cond_broadcast(pThis->pcondBusy); /* wake up all workers */
/* awake workers in retry loop */
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
+ pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
wtiWakeupThrd(pThis->pWrkr[i]);
}
d_pthread_mutex_unlock(pThis->pmutUsr);
@@ -457,7 +457,7 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
{
DEFiRet;
int nMissing; /* number workers missing to run */
- int i;
+ int i, nRunning;
ISOBJ_TYPE_assert(pThis, wtp);
@@ -477,7 +477,13 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
CHKiRet(wtpStartWrkr(pThis));
}
} else {
- pthread_cond_signal(pThis->pcondBusy);
+ /* we have needed number of workers, but they may be sleeping */
+ for(i = 0, nRunning = 0; i < pThis->iNumWorkerThreads && nRunning < nMaxWrkr; ++i) {
+ if (wtiGetState(pThis->pWrkr[i]) != WRKTHRD_STOPPED) {
+ pthread_cond_signal(&pThis->pWrkr[i]->pcondBusy);
+ nRunning++;
+ }
+ }
}
@@ -492,7 +498,6 @@ DEFpropSetMeth(wtp, wtpState, wtpState_t)
DEFpropSetMeth(wtp, iNumWorkerThreads, int)
DEFpropSetMeth(wtp, pUsr, void*)
DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t)
-DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t)
DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int))
DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*))
DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*))
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 25992f7f..4bc284cb 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -56,7 +56,6 @@ struct wtp_s {
void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */
pthread_mutex_t *pmutUsr;
- pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
@@ -95,6 +94,5 @@ PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);
PROTOTYPEpropSetMeth(wtp, pUsr, void*);
PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int);
PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
-PROTOTYPEpropSetMethPTR(wtp, pcondBusy, pthread_cond_t);
#endif /* #ifndef WTP_H_INCLUDED */
diff --git a/tests/daqueue-persist.sh b/tests/daqueue-persist.sh
index feb2a347..0781a7dc 100755
--- a/tests/daqueue-persist.sh
+++ b/tests/daqueue-persist.sh
@@ -2,6 +2,7 @@
# to carry out multiple tests with different queue modes
# added 2009-05-27 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
+echo ===============================================================================
echo \[daqueue-persist.sh\]: test data persisting at shutdown
source $srcdir/daqueue-persist-drvr.sh LinkedList
source $srcdir/daqueue-persist-drvr.sh FixedArray
diff --git a/tests/diskqueue.sh b/tests/diskqueue.sh
index b871e9eb..853a836a 100755
--- a/tests/diskqueue.sh
+++ b/tests/diskqueue.sh
@@ -5,6 +5,7 @@
# added 2009-04-17 by Rgerhards
# This file is part of the rsyslog project, released under GPLv3
# uncomment for debugging support:
+echo ===============================================================================
echo \[diskqueue.sh\]: testing queue disk-only mode
# uncomment for debugging support:
#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"