From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- runtime/queue.c | 51 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 12 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..00bbd15f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1041,7 +1041,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batchObj.pUsrp = (obj_t*) pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); objDestruct(pUsr); RETiRet; @@ -1180,6 +1180,9 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } + } + + if(pThis->pWtpDA != NULL) { /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting * the queue, it is restarted at a later stage. We don't care here if a timeout happens. */ @@ -1210,6 +1213,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; +RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ @@ -1218,6 +1222,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 */ pThis->bEnqOnly = 1; + pThis->bShutdownImmediate = 1; /* need to set this so that the DA queue begins shutdown in parallel! */ if(pThis->pqDA != NULL) { pThis->pqDA->bEnqOnly = 1; @@ -1247,6 +1252,9 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } + } + + if(pThis->pWtpDA != NULL) { /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This * is necessary because we may be in a situation where the DA queue regular worker and the * main queue worker stopped rather quickly. In this case, there is almost no time (and @@ -1279,6 +1287,7 @@ static rsRetVal cancelWorkers(qqueue_t *pThis) { rsRetVal iRetLocal; + struct timespec tTimeout; DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied @@ -1300,13 +1309,31 @@ cancelWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } + } - /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be - * restarted later to persist the queue. But we stop it, because otherwise we get into - * big trouble when resetting the logical dequeue pointer. This operation can only be - * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + */ + if(pThis->pWtpDA != NULL) { + /* but because of the potentially harsh consequences of cancelling, we try one last + * (and short) time to shut down the DA worker in a normal fashion. The idea here + * is that it may be willing to do so, but we did not yet have a task switch so + * that it could not terminate but will do immediately when it gets time. + * rgerhards, 2009-10-13 */ - DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + timeoutComp(&tTimeout, 50); + DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " + "- this is not good, need to cancel now...\n"); + } else { + DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n"); + } + + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1349,6 +1376,7 @@ ShutdownWorkers(qqueue_t *pThis) pThis->iLowWtrMrk = 0; CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); +dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1375,7 +1403,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) { DEFiRet; qqueue_t *pThis; @@ -1835,7 +1863,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1880,7 +1908,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 @@ -1925,7 +1953,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; RUNLOG_STR("XXX: re-start reg worker"); -qqueueAdviseMaxWorkers(pThis); +if(!pThis->bShutdownImmediate) + qqueueAdviseMaxWorkers(pThis); RUNLOG_STR("XXX: done re-start reg worker"); } } else { @@ -2276,8 +2305,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) -- cgit v1.2.3 From c5408da3d8f17691fb91282d031757ed041fec55 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 14 Oct 2009 11:01:21 +0200 Subject: new queue engine - initial commit (probably not 100% working!) simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changes, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care. --- runtime/queue.c | 509 +++++++++++--------------------------------------------- 1 file changed, 101 insertions(+), 408 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 00bbd15f..dacf1f13 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -55,6 +55,7 @@ #include "wti.h" #include "msg.h" #include "atomic.h" +#include "unicode-helper.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -68,11 +69,9 @@ DEFobjCurrIf(strm) /* forward-definitions */ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); -static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); -static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); @@ -228,7 +227,8 @@ static inline void queueDrain(qqueue_t *pThis) * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) +static inline rsRetVal +qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; @@ -236,48 +236,20 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { - if(pThis->bRunsDA) { - /* if we have not yet reached the high water mark, there is no need to start a - * worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - } - } - /* regular workers always run */ - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; +dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n", + getLogicalQueueSize(pThis) , pThis->iHighWtrMrk); + if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ - } - - RETiRet; -} - - -/* Destruct DA queue. This is the last part of DA-to-normal-mode - * transistion. This is called asynchronously and some time quite a - * while after the actual transistion. The key point is that we need to - * do it at some later time, because we need to destruct the DA queue. That, - * however, can not be done in a thread that has been signalled - * This is to be called when we revert back to our own queue. - * This function must be called with the queue mutex locked (the wti - * class ensures this). - * rgerhards, 2008-01-15 - */ -static rsRetVal -TurnOffDAMode(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - if(getLogicalQueueSize(pThis->pqDA) == 0) { - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - iRet); } RETiRet; @@ -348,33 +320,18 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); - // experimental: XXX - CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); - - if(pThis->toQShutdown == 0) { - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ - } else { - /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND - * have an obviously large backlog, we can't finish it in any case. So there is no point - * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 - */ - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); - } - iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ - - DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", + DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA)); finalize_it: @@ -412,91 +369,35 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 * NOTE: this is the DA worker *pool*, not the DA queue! */ - if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); - CHKiRet(wtpConstruct (&pThis->pWtpDA)); - CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); - CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); - CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); - CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); - CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); - CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); - CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); - } + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); /* if we reach this point, we have a "good" DA worker pool */ - /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->bDAEnqOnly = bEnqOnly; - /* now construct the actual queue (if it does not already exist) */ if(pThis->pqDA == NULL) { CHKiRet(StartDA(pThis)); } + pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing! pThis->bRunsDA = 1; - /* now we must now adivse the wtp that we need one worker. If none is yet active, - * that will also start one up. If we forgot that step, everything would be stalled - * until the next enqueue request. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ - finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; } -/* check if we need to start disk assisted mode and send some signals to - * keep it running if we are already in it. It also checks if DA mode is - * partially initialized, in which case it waits for initialization to - * complete. - * rgerhards, 2008-01-14 - */ -static rsRetVal -ChkStrtDA(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* if we do not hit the high water mark, we have nothing to do */ - if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk) - ABORT_FINALIZE(RS_RET_OK); - - if(pThis->bRunsDA) { - /* then we need to signal that we are at the high water mark again. If that happens - * on our way down the queue, that doesn't matter, because then nobody is waiting - * on the condition variable. - * (Remember that a DA queue stops draining the queue once it has reached the low - * water mark and restarts it when the high water mark is reached again - this is - * what this code here is responsible for. Please note that all workers may have been - * terminated due to the inactivity timeout, thus we need to advise the pool that - * we need at least one). - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - getPhysicalQueueSize(pThis)); - qqueueAdviseMaxWorkers(pThis); - } else { - /* this is the case when we are currently not running in DA mode. So it is time - * to turn it back on. - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - getPhysicalQueueSize(pThis)); - InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ - } - -finalize_it: - RETiRet; -} - - /* --------------- end code for disk-assisted queue modes -------------------- */ @@ -733,44 +634,6 @@ finalize_it: } -/* This method checks if we have a QIF file for the current queue (no matter of - * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise. - * rgerhards, 2008-01-15 - */ -static rsRetVal -qqueueHaveQIF(qqueue_t *pThis) -{ - DEFiRet; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); - - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { - if(errno == ENOENT) { - DBGOPRINT((obj_t*) pThis, "no .qi file found\n"); - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); - } else { - DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno); - ABORT_FINALIZE(RS_RET_IO_ERROR); - } - } - /* If we reach this point, we have a .qi file */ - -finalize_it: - RETiRet; -} - - /* The method loads the persistent queue information. * rgerhards, 2008-01-11 */ @@ -1112,15 +975,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) } -/* Try to terminate queue worker threads within the regular shutdown interval. - * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. - * After this function returns, the workers must either be finished or some force - * to finish them must be applied. - * This function also instructs the DA worker pool (if it exists) to terminate. This is done - * in preparation of final queue shutdown. - * rgerhards, 2009-05-27 +/* Try to shut down regular and DA queue workers, within the queue timeout + * period. That means processing continues as usual. This is the expected + * usual case, where during shutdown those messages remaining are being + * processed. At this point, it is acceptable that the queue can not be + * fully depleted, that case is handled in the next step. During this phase, + * we first shut down the main queue DA worker to prevent new data to arrive + * at the DA queue, and then we ask the regular workers of both the Regular + * and DA queue to try complete processing. + * rgerhards, 2009-10-14 */ -static rsRetVal +static inline rsRetVal tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { struct timespec tTimeout; @@ -1130,30 +995,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ - if(getPhysicalQueueSize(pThis) > 0) { - if(pThis->bRunsDA) { - /* We may have waited on the low water mark. As it may have changed, we - * see if we reactivate the worker. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); - } + if(pThis->bIsDA) { + /* We need to lock the mutex, as otherwise we may have a race that prevents + * us from awaking the DA worker. */ + d_pthread_mutex_lock(pThis->mut); + + /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); + DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n"); + + /* also tell the DA queue worker to shut down, so that it already knows... */ + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN); + wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */ + DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n"); + + d_pthread_mutex_unlock(pThis->mut); } - d_pthread_mutex_unlock(pThis->mut); - /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found - * out there are no active workers - that doesn't matter: the wtp knows about that and so will - * return immediately. - * We do not yet care about the DA worker - that will be handled down later in the process. - * Note that we must not request shutdown right now - that may introduce a race: if the regular queue - * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA - * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead - * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way - * (from a performance point of view). So we don't do anything right now. The DA queue will continue to - * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything - * by not requesting shutdown now. - * rgerhards, 2008-01-25 - */ + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate * shutdown of both the regular and DA queue on *the same* timeout. */ @@ -1182,29 +1043,17 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } } - if(pThis->pWtpDA != NULL) { - /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting - * the queue, it is restarted at a later stage. We don't care here if a timeout happens. - */ - DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); - } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n"); - } - } - RETiRet; } /* Try to shut down regular and DA queue workers, within the action timeout - * period. Note that the main queue DA worker is still unaffected (and may shuffle - * data to the disk queue while we terminate the other workers). Not finishing - * processing all messages is now OK (but they may be preserved later, depending - * on bSaveOnShutdown setting). - * rgerhards, 2009-05-27 + * period. This aborts processing, but at the end of the current action, in + * a well-defined manner. During this phase, we terminate all three worker + * pools, including the regular queue DA worker if it not yet has terminated. + * Not finishing processing all messages is OK (and expected) at this stage + * (they may be preserved later, depending * on bSaveOnShutdown setting). + * rgerhards, 2009-10-14 */ static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) @@ -1218,17 +1067,10 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ - /* note that we modify bEnqOnly directly, because going through the method would - * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 - */ pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; - /* need to set this so that the DA queue begins shutdown in parallel! */ - if(pThis->pqDA != NULL) { - pThis->pqDA->bEnqOnly = 1; - wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); - } +// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); @@ -1252,20 +1094,14 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - } - if(pThis->pWtpDA != NULL) { - /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This - * is necessary because we may be in a situation where the DA queue regular worker and the - * main queue worker stopped rather quickly. In this case, there is almost no time (and - * probably no thread switch!) between the point where we instructed the main queue DA - * worker to shutdown and this code location. In consequence, it may not even have - * noticed that it should should down, less acutally done this. So we provide it with a - * fixed 100ms timeout to try complete its work, what usually should be sufficient. - * rgerhards, 2009-10-06 + /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout, + * which should be sufficient and usually not be required (it is expected to have finished + * long before while we were processing the queue timeout in shutdown phase 1). + * rgerhards, 2009-10-14 */ timeoutComp(&tTimeout, 100); - DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " @@ -1280,14 +1116,13 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); /* This function cancels all remaining regular workers for both the main and the DA - * queue. The main queue's DA worker pool continues to run (if it exists and is active). + * queue. * rgerhards, 2009-05-29 */ static rsRetVal cancelWorkers(qqueue_t *pThis) { rsRetVal iRetLocal; - struct timespec tTimeout; DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied @@ -1309,30 +1144,12 @@ cancelWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } - } - /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be - * restarted later to persist the queue. But we stop it, because otherwise we get into - * big trouble when resetting the logical dequeue pointer. This operation can only be - * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 - */ - if(pThis->pWtpDA != NULL) { - /* but because of the potentially harsh consequences of cancelling, we try one last - * (and short) time to shut down the DA worker in a normal fashion. The idea here - * is that it may be willing to do so, but we did not yet have a task switch so - * that it could not terminate but will do immediately when it gets time. - * rgerhards, 2009-10-13 + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 */ - timeoutComp(&tTimeout, 50); - DBGOPRINT((obj_t*) pThis, "one ultimately last try for regular shutdown of main queue DA worker pool\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " - "- this is not good, need to cancel now...\n"); - } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down!\n"); - } - DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1368,13 +1185,6 @@ ShutdownWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); - /* we reduce the low water mark in any case. This is not absolutely necessary, but - * it is useful because we enable DA mode at several spots below and so we do not need - * to think about the low water mark each time. - */ - pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ - pThis->iLowWtrMrk = 0; - CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); @@ -1412,9 +1222,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))); /* we have an object, so let's fill the properties */ objConstructSetObjInfo(pThis); @@ -1425,7 +1233,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ - pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); + pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->nLogDeq = 0; @@ -1814,7 +1622,7 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("YYY: deqeueu for consumer"); +dbgprintf("YYY: dequeue for consumer\n"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1908,11 +1716,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ +dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); } @@ -1941,6 +1751,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; +#if 0 } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1961,6 +1772,7 @@ RUNLOG_STR("XXX: done re-start reg worker"); // experimental iRet = RS_RET_TERMINATE_NOW; ; } +#endif } RETiRet; @@ -1997,60 +1809,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; -if(pThis->pqParent != NULL) +if(pThis->pqParent != NULL) // TODO: check why we actually do this! *pVal = 16; RETiRet; } -/* must only be called when the queue mutex is locked, else results - * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) - */ -static int -qqueueIsIdleDA(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); -} -/* must only be called when the queue mutex is locked, else results - * are not stable! Regular worker version. - */ -static int -IsIdleReg(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) == 0); -} - - -/* This function is called when a worker thread for the regular queue is shut down. - * If we are the primary queue, this is not really interesting to us. If, however, - * we are the DA (child) queue, that means the DA queue is empty. In that case, we - * need to signal the parent queue's DA worker, so that it can terminate DA mode. - * rgerhards, 2008-01-26 - * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool - * has already been terminated and destructed. This *is* a legal condition and happens - * from time to time in practice. So we need to signal only if there still is a - * parent DA worker queue. Please keep in mind that the the parent's DA worker - * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's - * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running. - * I am telling this, because I, too, always get confused by those... - */ -static rsRetVal -RegOnWrkrShutdown(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pqParent != NULL) { - if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ - } - } - - RETiRet; -} - - /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -2058,8 +1822,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; - rsRetVal iRetLocal; - int bInitialized = 0; /* is queue already initialized? */ uchar pszBuf[64]; size_t lenBuf; @@ -2101,8 +1863,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ - /* create worker thread pools for regular operation. The DA pool is created on an as-needed - * basis, which potentially means never under most circumstances. + /* create worker thread pools for regular and DA operation. */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); @@ -2110,10 +1871,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -2121,27 +1880,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); - /* initialize worker thread instances */ - if(pThis->bIsDA) { - /* If we are disk-assisted, we need to check if there is a QIF file - * which we need to load. -- rgerhards, 2008-01-15 - */ - iRetLocal = qqueueHaveQIF(pThis); - if(iRetLocal == RS_RET_OK) { - DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - bInitialized = 1; /* we are done */ - } else { - /* TODO: use logerror? -- rgerhards, 2008-01-16 */ - DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " - "Some data may be lost\n", iRetLocal); - } - } + /* set up DA system if we have a disk-assisted queue */ + if(pThis->bIsDA) + InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - if(Debug && !bInitialized) { - DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " - "queue itself!)\n"); - } + DBGOPRINT((obj_t*) pThis, "queue finished initialization\n"); /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. @@ -2284,10 +2027,16 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ -dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); - /* make sure we do not timeout before we are done */ - DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); + /* we reduce the low water mark, otherwise the DA worker would terminate when + * it is reached. + */ + DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n"); + pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */ + pThis->iLowWtrMrk = 0; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */ + + DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); @@ -2442,10 +2191,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); - /* then check if we need to add an assistance disk queue */ - if(pThis->bIsDA) - CHKiRet(ChkStrtDA(pThis)); - /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. * Basic flow control has always been implemented and protects the queue structures @@ -2489,6 +2234,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); +// TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); objDestruct(pUsr); @@ -2527,7 +2273,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) } for(i = 0 ; i < pMultiSub->nElem ; ++i) { -dbgprintf("queueMultiEnq: %d\n", i); CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } @@ -2582,58 +2327,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); } -/* set queue mode to enqueue only or not - * There is one subtle issue: this method may be called during queue - * construction or while it is running. In the former case, the queue - * mutex does not yet exist (it is NULL), while in the later case it - * must be locked. The function detects the state and operates as - * required. - * rgerhards, 2008-01-16 - */ -static rsRetVal -SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* for simplicity, we do one big mutex lock. This method is extremely seldom - * called, so that doesn't matter... -- rgerhards, 2008-01-16 - */ - if(pThis->mut != NULL) { - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - } - - if(bEnqOnly == pThis->bEnqOnly) - FINALIZE; /* no change, nothing to do */ - - if(pThis->bQueueStarted) { - /* we need to adjust queue operation only if we are not during initial param setup */ - if(bEnqOnly == 1) { - /* switch to enqueue-only mode */ - /* this means we need to terminate all workers - that's it... */ - DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); - if(pThis->pWtpReg != NULL) - wtpWakeupAllWrkr(pThis->pWtpReg); - if(pThis->pWtpDA != NULL) - wtpWakeupAllWrkr(pThis->pWtpDA); - } else { - /* switch back to regular mode */ - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ - } - } - - pThis->bEnqOnly = bEnqOnly; - -finalize_it: - if(pThis->mut != NULL) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - } - RETiRet; -} - - /* some simple object access methods */ DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) -- cgit v1.2.3 From 90e8475260cf8ac54519b3d964d879489af879f6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Oct 2009 09:41:45 +0200 Subject: bugfix: message processing states were not set correctly in all cases however, this had no negative effect, as the message processing state was not evaluated when a batch was deleted, and that was the only case where the state could be wrong. --- runtime/queue.c | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index dacf1f13..62fb339b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -280,15 +280,7 @@ qqueueChkIsDA(qqueue_t *pThis) } -/* Start disk-assisted queue mode. All internal settings are changed. This is supposed - * to be called from the DA worker, which must have been started before. The most important - * chore of this function is to create the DA queue object. If that function fails, - * the DA worker should return with an appropriate state, which in turn should lead to - * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a number of races will happen. - * Please note that this function may be called *while* we in DA mode. This is due to the - * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due - * to inactivity timeouts. +/* Start disk-assisted queue mode. * rgerhards, 2008-01-15 */ static rsRetVal @@ -354,7 +346,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +InitDA(qqueue_t *pThis, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -389,7 +381,6 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(StartDA(pThis)); } - pThis->bEnqOnly = bEnqOnly; // TODO: I think this is not needed, but first clean up shutdown processing! pThis->bRunsDA = 1; finalize_it: @@ -1409,6 +1400,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { +dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; objDestruct(pUsr); } @@ -1645,7 +1637,6 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); @@ -1882,7 +1873,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* set up DA system if we have a disk-assisted queue */ if(pThis->bIsDA) - InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */ DBGOPRINT((obj_t*) pThis, "queue finished initialization\n"); -- cgit v1.2.3 From ff0912a8b74282b0b420425fc27df33cb71d96d0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 20 Oct 2009 10:51:36 +0200 Subject: bugfix: segfault when starting up with an invalid .qi file for a disk queue Failed for both pure disk as well as DA queues. Now, we emit an error message and disable disk queueing facility. --- runtime/queue.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..0c9d1eea 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -55,6 +55,7 @@ #include "wti.h" #include "msg.h" #include "atomic.h" +#include "errmsg.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -65,6 +66,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(strm) +DEFobjCurrIf(errmsg) /* forward-definitions */ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); @@ -369,8 +371,12 @@ StartDA(qqueue_t *pThis) iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ - if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) + if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) { + errno = 0; /* else an errno is shown in errmsg! */ + errmsg.LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode"); + pThis->bIsDA = 0; /* disable memory mode */ FINALIZE; /* something is wrong */ + } //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ @@ -920,9 +926,12 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) ASSERT(pThis != NULL); - strm.Destruct(&pThis->tVars.disk.pWrite); - strm.Destruct(&pThis->tVars.disk.pReadDeq); - strm.Destruct(&pThis->tVars.disk.pReadDel); + if(pThis->tVars.disk.pWrite != NULL) + strm.Destruct(&pThis->tVars.disk.pWrite); + if(pThis->tVars.disk.pReadDeq != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDeq); + if(pThis->tVars.disk.pReadDel != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDel); RETiRet; } @@ -2165,7 +2174,8 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ - CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + if(pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */ + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); FINALIZE; /* nothing left to do, so be happy */ } @@ -2670,6 +2680,7 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); -- cgit v1.2.3 From e04e1b50025f5fa9c26abd946190dce8f797d08f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 11:33:38 +0200 Subject: enhanced test environment (including testbench) support for enhancing probability of memory addressing failure by using non-NULL default value for malloced memory (optional, only if requested by configure option). This helps to track down some otherwise undetected issues within the testbench and is expected to be very useful in the future. --- runtime/queue.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 85acdb8e..4bbcc2b8 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -144,7 +144,7 @@ static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) ISOBJ_TYPE_assert(pQueue, qqueue); assert(pQueue->toDeleteLst != NULL); - CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); + CHKmalloc(pNew = MALLOC(sizeof(toDeleteLst_t))); pNew->deqID = deqID; pNew->nElemDeq = nElemDeq; @@ -414,7 +414,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis) if(pThis->iMaxQueueSize == 0) ABORT_FINALIZE(RS_RET_QSIZE_ZERO); - if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) { + if((pThis->tVars.farray.pBuf = MALLOC(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -542,7 +542,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) qLinkedList_t *pEntry; DEFiRet; - CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); + CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; pEntry->pUsr = pUsr; @@ -1835,7 +1835,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * influenced by properties which might have been set after queueConstruct () */ if(pThis->pqParent == NULL) { - pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); + pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t)); pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ @@ -2148,7 +2148,7 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) if(pszPrefix == NULL) /* just unset the prefix! */ ABORT_FINALIZE(RS_RET_OK); - if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + if((pThis->pszFilePrefix = MALLOC(sizeof(uchar) * iLenPrefix + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); pThis->lenFilePrefix = iLenPrefix; -- cgit v1.2.3 From 33e216daf7f89542cc6c91f1e97da6fdb71eecf8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 14:57:34 +0200 Subject: Begun to work on partial batch deletes... ... but this brings a lot of problems with it. The issue is that we still have a sequential store and we do not know how we could delete the one entry right in the middle of processing. I keep this branch if we intend to move on with it - but for now I look into a different solution... --- runtime/queue.c | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 4bbcc2b8..d9dc599a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1362,7 +1362,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue * picking up things from the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1370,10 +1370,11 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); + DoDeleteBatchFromQStore(pThis, nDeleted); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1386,7 +1387,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); } finalize_it: @@ -1395,7 +1396,10 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. + * destructing the objects themself. It is assumed that batches + * are processed in sequential order, that is if we find one unprocessed entry, + * that indicates the end of the delete operation. Note that this function MUST + * be called only for non-empty batches! * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1408,13 +1412,17 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - for(i = 0 ; i < pBatch->nElem ; ++i) { +dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); + for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; objDestruct(pUsr); } - iRet = DeleteBatchFromQStore(pThis, pBatch); +dbgprintf("we deleted %d objects\n", i); + + if(i > 0) + iRet = DeleteBatchFromQStore(pThis, pBatch, i); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ @@ -1423,7 +1431,11 @@ dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch /* dequeue as many user pointers as are available, until we hit the configured - * upper limit of pointers. + * upper limit of pointers. Note that this function also deletes all processed + * objects from the previous batch. However, it is perfectly valid that the + * previous batch contained NO objects at all. For example, this happens + * immediately after system startup or when a queue was exhausted and the queue + * worker needed to wait for new data. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ @@ -1716,8 +1728,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_unlock(pThis->mut); /* iterate over returned results and enqueue them in DA queue */ - //for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { + //for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 -- cgit v1.2.3 From 6be07a8f8a3b9b7baf77e42639473e9b1a990e29 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 10:09:04 +0100 Subject: bugfix: potential abort if inputname property was not set primarily a problem of imdiag. Also added some fix for a potential situation during cancel processing. That one is not considered vital and may later be removed again. --- runtime/queue.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 4bbcc2b8..67bc40c2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1612,18 +1612,21 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch. +/* This dequeues the next batch. Note that this function must not be + * cancelled, else it will leave back an inconsistent state. * rgerhards, 2009-05-20 */ static inline rsRetVal DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); dbgprintf("YYY: dequeue for consumer\n"); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1631,6 +1634,7 @@ dbgprintf("YYY: dequeue for consumer\n"); finalize_it: + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } -- cgit v1.2.3 From da53802c96a59a990859706219398dce709ba1b3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 12:21:07 +0100 Subject: implemented solution for cancel at shutdown/unprocessed entries We do now enqueue those objects that are left unprocessed. This enables us to delete the full batch, what is exactly what we need to do. --- runtime/queue.c | 58 +++++++++++++++++++++++---------------------------------- 1 file changed, 23 insertions(+), 35 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index d9dc599a..d1eefde6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -70,6 +70,7 @@ DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) /* forward-definitions */ +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -1396,10 +1397,8 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. It is assumed that batches - * are processed in sequential order, that is if we find one unprocessed entry, - * that indicates the end of the delete operation. Note that this function MUST - * be called only for non-empty batches! + * destructing the objects themself. Any entries not marked as finally + * processed are enqueued again. The new enqueue is necessary because we have a * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1407,19 +1406,34 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; void *pUsr; + int nEnqueued = 0; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); - for(i = 0 ; i < (pBatch->nElem) && (pBatch->pElem[i].state != BATCH_STATE_RDY); ++i) { + for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; + if( pBatch->pElem[i].state == BATCH_STATE_RDY + || pBatch->pElem[i].state == BATCH_STATE_SUB) { +RUNLOG_STR("we need to requeue the entry"); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*) pUsr)); + ++nEnqueued; + if(localRet != RS_RET_OK) { + DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + } + } objDestruct(pUsr); } -dbgprintf("we deleted %d objects\n", i); +dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + + if(nEnqueued > 0) + qqueueChkPersist(pThis, nEnqueued); if(i > 0) iRet = DeleteBatchFromQStore(pThis, pBatch, i); @@ -1735,7 +1749,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * rgerhards, 2009-05-28 */ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } /* now we are done, but need to re-aquire the mutex */ @@ -1749,12 +1765,6 @@ finalize_it: /* must only be called when the queue mutex is locked, else results * are not stable! - * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue. - * If our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. */ static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) @@ -1763,28 +1773,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -#if 0 - } else { - if(pThis->bRunsDA) { - ASSERT(pThis->pqDA != NULL); - if( pThis->pqDA->bEnqOnly - && pThis->pqDA->sizeOnDiskMax > 0 - && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { - /* this queue can never grow, so we can give up... */ - iRet = RS_RET_TERMINATE_NOW; - } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); - iRet = RS_RET_TERMINATE_NOW; -RUNLOG_STR("XXX: re-start reg worker"); -if(!pThis->bShutdownImmediate) - qqueueAdviseMaxWorkers(pThis); -RUNLOG_STR("XXX: done re-start reg worker"); - } - } else { - // experimental iRet = RS_RET_TERMINATE_NOW; - ; - } -#endif } RETiRet; -- cgit v1.2.3 From b585a4e90940e9f4d2d288d462d1c273ae5ffa09 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 18:53:01 +0100 Subject: addressed some race issues during queue shutdown these occured in very unusual scenarios where we had a DA-queue running in parallel and very lengthy actions. Then, in some situations, the shutdown could hang. The code needs some addition lab time, but is believed to be much better than any previous version. --- runtime/queue.c | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index b4f00446..d9942365 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1070,6 +1070,11 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); /* instruct workers to finish ASAP, even if still work exists */ pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; + /* now DA queue */ + if(pThis->bIsDA) { + pThis->pqDA->bEnqOnly = 1; + pThis->pqDA->bShutdownImmediate = 1; + } // TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ @@ -1356,14 +1361,10 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue /* remove messages from the physical queue store that are fully processed. This is - * controlled via the to-delete list. We can only delete those elements, that are - * at the current physical tail of the queue. If the batch is from another position, - * we schedule it for deletion, but actual deletion will happen at a later call - * of this function here. We always delete as much as possible, which includes - * picking up things from the to-delete list. + * controlled via the to-delete list. */ static inline rsRetVal -DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) { toDeleteLst_t *pTdl; qDeqID deqIDDel; @@ -1371,11 +1372,10 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - assert(nDeleted > 0); pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, nDeleted); + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1385,10 +1385,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch, int nDeleted) ++deqIDDel; pTdl = tdlPeek(pThis); } + /* old entries deleted, now delete current ones... */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, nDeleted)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } finalize_it: @@ -1408,20 +1410,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: deleteProcessedBatch delete entry %d with state %d\n", i, pBatch->pElem[i].state); +dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", +i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -RUNLOG_STR("we need to requeue the entry"); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); +dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1435,11 +1440,11 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); - if(i > 0) - iRet = DeleteBatchFromQStore(pThis, pBatch, i); + iRet = DeleteBatchFromQStore(pThis, pBatch); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ + pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -2072,7 +2077,8 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ - CHKiRet(pThis->qUnDeqAll(pThis)); +RUNLOG_STR("XXX: NOT undequeueing entries!"); + //CHKiRet(pThis->qUnDeqAll(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); -- cgit v1.2.3 From a5cddbdbce76f14b4216aa74698bbc168ca5409f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 26 Oct 2009 20:24:28 +0100 Subject: shuffled cancelability state to different spot ... but in anticipation of changing cancel processing altogether... --- runtime/queue.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index d9942365..be169be2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1650,14 +1650,12 @@ RateLimiter(qqueue_t *pThis) static inline rsRetVal DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) { - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); dbgprintf("YYY: dequeue for consumer\n"); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1665,7 +1663,6 @@ dbgprintf("YYY: dequeue for consumer\n"); finalize_it: - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1696,16 +1693,20 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) static rsRetVal ConsumerReg(qqueue_t *pThis, wti_t *pWti) { + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit @@ -1740,16 +1741,20 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { //for(i = 0 ; i < pWti->batch.nElem ; i++) { -- cgit v1.2.3 From 24cd5aee4720a98e321b69d2d9b5948348abd571 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:00:23 +0100 Subject: fixed race condition during queue shutdown Problems could happen if the queue worker needed to be cancelled and this cancellation happened inside queue-code (including wtp, wti). We have now solved this by disabling cancellation while in this code and only enabling it when working inside the user consumer. This exactly matches the use case for which cancellation may be needed. --- runtime/queue.c | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index be169be2..1539db6d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1410,13 +1410,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; int nEnqueued = 0; rsRetVal localRet; - int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", @@ -1444,7 +1442,6 @@ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnque pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ - pthread_setcancelstate(iCancelStateSave, NULL); RETiRet; } @@ -1699,13 +1696,13 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); @@ -1719,6 +1716,9 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -1747,13 +1747,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); CHKiRet(DequeueForConsumer(pThis, pWti)); /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { @@ -1768,6 +1768,9 @@ dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp) pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -2332,7 +2335,6 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); -dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); -- cgit v1.2.3 From 553d1880d47b57b2f4e023c2017675f010afd9a0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:36:53 +0100 Subject: some cleanup --- runtime/queue.c | 72 --------------------------------------------------------- 1 file changed, 72 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 1539db6d..781d115f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -239,7 +239,6 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { -dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n", getLogicalQueueSize(pThis) , pThis->iHighWtrMrk); if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ @@ -486,26 +485,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllFixedArray(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", - pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); - - pThis->tVars.farray.deqhead = pThis->tVars.farray.head; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- linked list -------------------- */ @@ -597,26 +576,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllLinkedList(qqueue_t *pThis) -{ - DEFiRet; - - ASSERT(pThis != NULL); - - DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", - pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); - - pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- disk -------------------- */ @@ -863,16 +822,6 @@ finalize_it: } -/* This is a dummy function for disks - we do not need to reset anything - * because everything is already persisted... - */ -static rsRetVal -qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - - /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -917,12 +866,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal -qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - /* --------------- end type-specific handlers -------------------- */ @@ -1192,7 +1135,6 @@ ShutdownWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); -dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis)); if(getPhysicalQueueSize(pThis) > 0) { CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); @@ -1260,7 +1202,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; - pThis->qUnDeqAll = qUnDeqAllFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1268,7 +1209,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; - pThis->qUnDeqAll = qUnDeqAllLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1276,7 +1216,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; - pThis->qUnDeqAll = qUnDeqAllDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1285,7 +1224,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; - pThis->qUnDeqAll = qUnDeqAllDirect; break; } @@ -1471,7 +1409,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ @@ -1652,7 +1589,6 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("YYY: dequeue for consumer\n"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -2080,14 +2016,6 @@ CODESTARTobjDestruct(qqueue) if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) ShutdownWorkers(pThis); - /* now all workers are terminated. Messages may exist. Also, some logically dequeued - * messages may never have been processed because their worker was terminated. So - * we need to reset the logical dequeue pointer, persist the queue if configured to do - * so and then destruct everything. -- rgerhards, 2009-05-26 - */ -RUNLOG_STR("XXX: NOT undequeueing entries!"); - //CHKiRet(pThis->qUnDeqAll(pThis)); - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); } -- cgit v1.2.3 From 796b01036db027077b19b8c183d51bcd93c3948d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:37:22 +0100 Subject: fix compile bug with last commit --- runtime/queue.c | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 781d115f..d1a97ba6 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -239,7 +239,6 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { - getLogicalQueueSize(pThis) , pThis->iHighWtrMrk); if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { -- cgit v1.2.3 From 386b7cd2f2ae6f9ac8e0b9c8b49934398c159ea4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:44:55 +0100 Subject: removed no longer needed flag variable --- runtime/queue.c | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index d1a97ba6..d219d74d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -386,8 +386,6 @@ InitDA(qqueue_t *pThis, int bLockMutex) CHKiRet(StartDA(pThis)); } - pThis->bRunsDA = 1; - finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; @@ -1235,7 +1233,7 @@ finalize_it: /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. - * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required. + * Note: cached copies of iQueueSize is provided so that no mutex locks are required. * The caller must have obtained them while the mutex was locked. Of course, these values may no * longer be current, but that is OK for the discard check. At worst, the message is either processed * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic, @@ -1245,7 +1243,7 @@ finalize_it: * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) { DEFiRet; rsRetVal iRetLocal; @@ -1254,7 +1252,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); - if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", @@ -1411,7 +1409,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1734,9 +1732,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) /* must only be called when the queue mutex is locked, else results * are not stable! * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue + * we can terminate. Version for the regular worker thread. */ static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) @@ -2135,7 +2131,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. -- cgit v1.2.3 From b1db196953713dd09c499a3edf81347bd903c19e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 3 Nov 2009 18:44:02 +0100 Subject: one step closer to dynamically loadable parsers This is a milestone commit, which adds new code that breaks nothing, but also does not add any visible change. Just prep work... --- runtime/queue.c | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index d219d74d..ed4ba83e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1690,7 +1690,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - //for(i = 0 ; i < pWti->batch.nElem ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 -- cgit v1.2.3 From 1b7f5c54684db29c096e09238648a45dce78ebee Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 4 Nov 2009 10:40:27 +0100 Subject: moved rfc3164/5424 code to new parser modules another milestone commit: the program works, the new interface is used, some more cleanup is needed and the per-ruleset config options are still missing. But we are getting closer... --- runtime/queue.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index ed4ba83e..1c885925 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1350,16 +1350,12 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); -dbgprintf("XXX: deleteProcessedBatch total entries %d with state[0] %d\n", pBatch->nElem, pBatch->pElem[0].state); for(i = 0 ; i < pBatch->nElem ; ++i) { -dbgprintf("XXX: deleteProcessedBatch delete entry %d, ptr %p, refcnt %d with state %d\n", -i, pBatch->pElem[i].pUsrp, ((msg_t*)pBatch->pElem[i].pUsrp)->iRefCount, pBatch->pElem[i].state); pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); -dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRefCount); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1368,7 +1364,7 @@ dbgprintf("we need to requeue the entry, refcnt now %d\n", ((msg_t*) pUsr)->iRef objDestruct(pUsr); } -dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -1656,7 +1652,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) d_pthread_mutex_lock(pThis->mut); finalize_it: -dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + dbgprintf("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } -- cgit v1.2.3 From 8b246de2a587454f9260ff91192d27a2e168ea2d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 12 Nov 2009 17:12:10 +0100 Subject: some light performance enhancement ...by replacing time() call with much faster (at least under linux) gettimeofday() calls. --- runtime/queue.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 1c885925..b29ec7ac 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -56,6 +56,7 @@ #include "msg.h" #include "atomic.h" #include "errmsg.h" +#include "datetime.h" #include "unicode-helper.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ @@ -68,6 +69,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(strm) DEFobjCurrIf(errmsg) +DEFobjCurrIf(datetime) /* forward-definitions */ static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); @@ -1526,7 +1528,7 @@ RateLimiter(qqueue_t *pThis) iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ /* time calls are expensive, so only do them when needed */ - time(&tCurr); + datetime.GetTime(&tCurr); localtime_r(&tCurr, &m); iHrCurr = m.tm_hour; @@ -2327,6 +2329,7 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* now set our own handlers */ -- cgit v1.2.3