summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-07-15 15:27:18 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2013-07-15 15:27:18 +0200
commitbb4a8c26f156fc4995fa768191d01645a849ec34 (patch)
tree2a2103b215abb8509b1cc62293408dc6426db37a /runtime/queue.c
parent01225486c213ae2fe2406208eaaaec9bcc250512 (diff)
parentb6d350ab9c212923dcfcf3b160af83b3c078f1bc (diff)
downloadrsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.tar.gz
rsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.tar.bz2
rsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.zip
Merge branch 'v7-stable'
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c18
1 files changed, 15 insertions, 3 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 728eb133..0f71ff13 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1935,8 +1935,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
- MsgAddRef(pWti->batch.pElem[i].pMsg)));
+ iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg));
+ if(iRet != RS_RET_OK) {
+ if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) {
+ /* Queue emergency error occured */
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY, aborting loop.\n");
+ FINALIZE;
+ } else {
+ DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned with error state: '%d'\n", i, iRet);
+ }
+ }
pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */
}
@@ -1944,10 +1952,14 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(iCancelStateSave, NULL);
finalize_it:
+ if(iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY) {
+ iRet = RS_RET_OK;
+ }
+
/* now we are done, but potentially need to re-aquire the mutex */
if(bNeedReLock)
d_pthread_mutex_lock(pThis->mut);
- DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
+
RETiRet;
}