diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 9d7a9058..9c7f96d0 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -685,7 +685,6 @@ qqueueHaveQIF(qqueue_t *pThis) { DEFiRet; uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); @@ -694,8 +693,8 @@ qqueueHaveQIF(qqueue_t *pThis) ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", + (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); /* check if the file exists */ if(stat((char*) pszQIFNam, &stat_buf) == -1) { @@ -1028,7 +1027,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); } @@ -1057,7 +1056,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); + ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize); } dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", @@ -1345,6 +1344,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread break; } + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP RETiRet; @@ -2065,6 +2066,8 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -2206,6 +2209,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } + dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); } /* and finally enqueue the message */ @@ -2292,6 +2296,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } + dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); } /* and finally enqueue the message */ @@ -2315,6 +2320,7 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -2326,8 +2332,9 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) } for(i = 0 ; i < pMultiSub->nElem ; ++i) { -dbgprintf("queueMultiEnq: %d\n", i); - CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); + localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]); + if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) + ABORT_FINALIZE(localRet); } finalize_it: |