diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-07-15 15:27:18 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-07-15 15:27:18 +0200 |
commit | bb4a8c26f156fc4995fa768191d01645a849ec34 (patch) | |
tree | 2a2103b215abb8509b1cc62293408dc6426db37a /runtime/queue.c | |
parent | 01225486c213ae2fe2406208eaaaec9bcc250512 (diff) | |
parent | b6d350ab9c212923dcfcf3b160af83b3c078f1bc (diff) | |
download | rsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.tar.gz rsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.tar.bz2 rsyslog-bb4a8c26f156fc4995fa768191d01645a849ec34.zip |
Merge branch 'v7-stable'
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 728eb133..0f71ff13 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1935,8 +1935,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, - MsgAddRef(pWti->batch.pElem[i].pMsg))); + iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg)); + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) { + /* Queue emergency error occured */ + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY, aborting loop.\n"); + FINALIZE; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned with error state: '%d'\n", i, iRet); + } + } pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -1944,10 +1952,14 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(iCancelStateSave, NULL); finalize_it: + if(iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY) { + iRet = RS_RET_OK; + } + /* now we are done, but potentially need to re-aquire the mutex */ if(bNeedReLock) d_pthread_mutex_lock(pThis->mut); - DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); + RETiRet; } |