diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 131 |
1 files changed, 107 insertions, 24 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index b29ec7ac..60d17086 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -214,7 +216,7 @@ static inline void queueDrain(qqueue_t *pThis) BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { + while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -769,8 +771,8 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) */ objDestruct(pUsr); - DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets\n", - nWriteCount, pThis->tVars.disk.sizeOnDisk); + DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", + nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); finalize_it: RETiRet; @@ -839,6 +841,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batch_obj_t batchObj; DEFiRet; + //TODO: init batchObj (states _OK and new fields -- CHECK) ASSERT(pThis != NULL); /* calling the consumer is quite different here than it is from a worker thread */ @@ -849,8 +852,11 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ + memset(&batchObj, 0, sizeof(batch_obj_t)); + memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; batchObj.pUsrp = (obj_t*) pUsr; + batchObj.bFilterOK = 1; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); @@ -859,6 +865,28 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) RETiRet; } +/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead + * otherwise incured. -- rgerhards, ~2010-06-23 + */ +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + /* calling the consumer is quite different here than it is from a worker thread */ + /* we need to provide the consumer's return value back to the caller because in direct + * mode the consumer probably has a lot to convey (which get's lost in the other modes + * because they are asynchronous. But direct mode is deliberately synchronous. + * rgerhards, 2008-02-12 + * We use our knowledge about the batch_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 + */ + iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + + RETiRet; +} + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -884,7 +912,7 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) CHKiRet(pThis->qAdd(pThis, pUsr)); if(pThis->qType != QUEUETYPE_DIRECT) { - ATOMIC_INC(pThis->iQueueSize); + ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -909,7 +937,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ iRet = pThis->qDeq(pThis, ppUsr); - ATOMIC_INC(pThis->nLogDeq); + ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", // getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -944,6 +972,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) d_pthread_mutex_lock(pThis->mut); /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode for DA worker\n"); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); @@ -1010,6 +1039,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; /* now DA queue */ @@ -1201,6 +1231,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1208,6 +1239,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1215,6 +1247,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1223,9 +1256,13 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; break; } + INIT_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + INIT_ATOMIC_HELPER_MUT(pThis->mutLogDeq); + finalize_it: OBJCONSTRUCT_CHECK_SUCCESS_AND_CLEANUP RETiRet; @@ -1288,8 +1325,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - ATOMIC_SUB(pThis->iQueueSize, nElem); - ATOMIC_SUB(pThis->nLogDeq, nElem); + ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); + ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1356,6 +1393,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { +dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1373,7 +1411,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) iRet = DeleteBatchFromQStore(pThis, pBatch); - pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ // TODO: more fine init, new fields! 2010-06-14 RETiRet; } @@ -1418,6 +1456,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.pElem[nDequeued].bFilterOK = 1; // TODO: think again if we can handle that with more performance ++nDequeued; } @@ -1597,6 +1636,12 @@ finalize_it: /* This is called when a batch is processed and the worker does not * ask for another batch (e.g. because it is to be terminated) + * Note that we must not be terminated while we delete a processed + * batch. Otherwise, we may not complete it, and then the cancel + * handler also tries to delete the batch. But then it finds some of + * the messages already destructed. This was a bug we have seen, especially + * with disk mode, where a delete takes rather long. Anyhow, the coneptual + * problem exists in all queue modes. * rgerhards, 2009-05-27 */ static rsRetVal @@ -1607,8 +1652,12 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + int iCancelStateSave; + /* at this spot, we must not be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1693,7 +1742,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ -dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2063,6 +2111,9 @@ CODESTARTobjDestruct(qqueue) pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutQueueSize); + DESTROY_ATOMIC_HELPER_MUT(pThis->mutLogDeq); + /* type-specific destructor */ iRet = pThis->qDestruct(pThis); @@ -2189,6 +2240,7 @@ finalize_it: RETiRet; } +/* ------------------------------ multi-enqueue functions ------------------------------ */ /* enqueue multiple user data elements at once. The aim is to provide a faster interface * for object submission. Uses the multi_submit_t helper object. * Please note that this function is not cancel-safe and consequently @@ -2196,9 +2248,12 @@ finalize_it: * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 */ -rsRetVal -qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; @@ -2207,27 +2262,55 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) ISOBJ_TYPE_assert(pThis, qqueue); assert(pMultiSub != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); for(i = 0 ; i < pMultiSub->nElem ; ++i) { CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } - qqueueChkPersist(pThis, pMultiSub->nElem); finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); } +finalize_it: + RETiRet; +} +/* ------------------------------ END multi-enqueue functions ------------------------------ */ + + +/* enqueue a new user data element in direct mode + * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better + * code later on (like multi submit!) 2010-06-10 + * Enqueues the new element and awakes worker thread. + */ +rsRetVal +qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +{ + DEFiRet; + ISOBJ_TYPE_assert(pThis, qqueue); + iRet = qAddDirect(pThis, pUsr); RETiRet; } |