diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-07 12:57:36 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-07 12:57:36 +0100 |
commit | fb81cf202c0277f48c75f777fa47e4db4e59f0a3 (patch) | |
tree | 804c7265d82e0bb7127b31a9a7d68cc4b616e279 /runtime/queue.c | |
parent | ec32919ca05598f11f538060c406d8668a35e229 (diff) | |
parent | 9035771b7dbea738b8a22949a7f0a6514525fafb (diff) | |
download | rsyslog-fb81cf202c0277f48c75f777fa47e4db4e59f0a3.tar.gz rsyslog-fb81cf202c0277f48c75f777fa47e4db4e59f0a3.tar.bz2 rsyslog-fb81cf202c0277f48c75f777fa47e4db4e59f0a3.zip |
Merge branch 'master' into master-ruleeng
Conflicts:
runtime/wti.c
runtime/wti.h
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 24 |
1 files changed, 10 insertions, 14 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index e0f4481f..8c610b11 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -351,16 +351,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(getLogicalQueueSize(pThis) == 0) { - iMaxWorkers = 0; - } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } RETiRet; @@ -484,7 +483,6 @@ InitDA(qqueue_t *pThis, int bLockMutex) CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); @@ -2106,7 +2104,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ break; } - if(pThis->iMaxQueueSize < 100) { + if(pThis->iMaxQueueSize < 100 + && (pThis->qType == QUEUETYPE_LINKEDLIST || pThis->qType == QUEUETYPE_FIXED_ARRAY)) { errmsg.LogError(0, RS_RET_OK_WARN, "Note: queue.size=\"%d\" is very " "low and can lead to unpredictable results. See also " "http://www.rsyslog.com/lower-bound-for-queue-sizes/", @@ -2122,7 +2121,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) pThis->iLightDlyMrk = pThis->iMaxQueueSize - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ - if(pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) pThis->iDeqBatchSize = pThis->iMaxQueueSize; /* finalize some initializations that could not yet be done because it is @@ -2139,7 +2138,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pthread_mutex_init(&pThis->mutThrdMgmt, NULL); pthread_cond_init (&pThis->notFull, NULL); - pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); @@ -2180,7 +2178,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); @@ -2445,7 +2442,6 @@ CODESTARTobjDestruct(qqueue) } pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); - pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); |