summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c23
1 files changed, 15 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 9d7a9058..9c7f96d0 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -110,7 +110,7 @@ static inline void queueDrain(qqueue_t *pThis)
ASSERT(pThis != NULL);
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
- while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) {
+ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) {
pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
@@ -685,7 +685,6 @@ qqueueHaveQIF(qqueue_t *pThis)
{
DEFiRet;
uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
struct stat stat_buf;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -694,8 +693,8 @@ qqueueHaveQIF(qqueue_t *pThis)
ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
/* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
+ (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
/* check if the file exists */
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
@@ -1028,7 +1027,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr)
CHKiRet(pThis->qAdd(pThis, pUsr));
if(pThis->qType != QUEUETYPE_DIRECT) {
- ATOMIC_INC(pThis->iQueueSize);
+ ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize);
dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize);
}
@@ -1057,7 +1056,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
+ ATOMIC_DEC(&pThis->iQueueSize, &pThis->mutQueueSize);
}
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
@@ -1345,6 +1344,8 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
break;
}
+ INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
finalize_it:
OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP
RETiRet;
@@ -2065,6 +2066,8 @@ CODESTARTobjDestruct(qqueue)
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
+ DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize);
+
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
@@ -2206,6 +2209,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
+ dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
}
/* and finally enqueue the message */
@@ -2292,6 +2296,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
objDestruct(pUsr);
ABORT_FINALIZE(RS_RET_QUEUE_FULL);
}
+ dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n");
}
/* and finally enqueue the message */
@@ -2315,6 +2320,7 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int iCancelStateSave;
int i;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -2326,8 +2332,9 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
}
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
- CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
+ localRet = doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]);
+ if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL)
+ ABORT_FINALIZE(localRet);
}
finalize_it: