summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-11-07 12:57:49 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-11-07 12:57:49 +0100
commit23278699e35173f46effda964fd80b8f868e8b3e (patch)
tree35b8370799ac9ca9705a26bdee4c9f8309d07f31 /runtime/queue.c
parente3dc6041424a779ddce3da8d55af9bc71c99aa61 (diff)
parentfb81cf202c0277f48c75f777fa47e4db4e59f0a3 (diff)
downloadrsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.gz
rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.tar.bz2
rsyslog-23278699e35173f46effda964fd80b8f868e8b3e.zip
Merge branch 'master-ruleeng' into master-ruleeng-simd
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c24
1 files changed, 10 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 989b29ca..8fb734e7 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -350,16 +350,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ }
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
} else {
- if(getLogicalQueueSize(pThis) == 0) {
- iMaxWorkers = 0;
- } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
- }
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
RETiRet;
@@ -483,7 +482,6 @@ InitDA(qqueue_t *pThis, int bLockMutex)
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
@@ -2095,7 +2093,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
break;
}
- if(pThis->iMaxQueueSize < 100) {
+ if(pThis->iMaxQueueSize < 100
+ && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) {
errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very "
"low and can lead to unpredictable results. See also "
"http://www.rsyslog.com/lower-bound-for-queue-sizes/",
@@ -2111,7 +2110,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
pThis->iLightDlyMrk = pThis->iMaxQueueSize
- (pThis->iMaxQueueSize / 100) * 30; /* default 70% */
- if(pThis->iDeqBatchSize > pThis->iMaxQueueSize)
+ if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
/* finalize some initializations that could not yet be done because it is
@@ -2128,7 +2127,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
pthread_cond_init (&pThis->notFull, NULL);
- pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL);
@@ -2169,7 +2167,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
@@ -2434,7 +2431,6 @@ CODESTARTobjDestruct(qqueue)
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
- pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);