author      Rainer Gerhards <rgerhards@adiscon.com>   2009-11-13 10:00:23 +0100
committer   Rainer Gerhards <rgerhards@adiscon.com>   2009-11-13 10:00:23 +0100
commit      2a3c25a5c68db66ec35c155f7ecbe65079aed0e0 (patch)
tree        b899a3cd256d60f3415753aa925c66fc0504373e /runtime/queue.c
parent      adb880f17bee807df6058ab3772eed3d35bd8d37 (diff)
parent      8b246de2a587454f9260ff91192d27a2e168ea2d (diff)
Begin new beta branch
Merge branch 'master' into beta
Conflicts:
ChangeLog
tests/Makefile.am
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--  runtime/queue.c | 705
1 file changed, 183 insertions(+), 522 deletions(-)
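
A note on the most visible interface change in this patch: the consumer callback signature. qqueueConstruct() now expects rsRetVal (*pConsumer)(void*, batch_t*, int*), and both the direct-mode path (qAddDirect) and the regular worker (ConsumerReg) pass &pThis->bShutdownImmediate as the new third argument, so a consumer can stop working on a batch once an immediate shutdown has been requested (ConsumerDA performs the same check in its own loop). The following is a minimal, hypothetical sketch of a consumer honoring that contract; the type names are simplified stand-ins, not the real rsyslog batch_t definitions:

/* illustrative sketch only, NOT rsyslog code: models the new consumer
 * contract in which the queue hands the consumer a pointer to its
 * bShutdownImmediate flag as a third parameter. my_batch_elem_t and
 * my_batch_t are hypothetical stand-ins for rsyslog's batch types.
 */
typedef struct { void *pUsrp; } my_batch_elem_t;
typedef struct { int nElem; my_batch_elem_t *pElem; } my_batch_t;

typedef int (*consumer_cb_t)(void *pUsr, my_batch_t *pBatch, int *pbShutdownImmediate);

static int example_consumer(void *pUsr, my_batch_t *pBatch, int *pbShutdownImmediate)
{
	int i;
	(void) pUsr;
	/* stop mid-batch as soon as the queue requests immediate shutdown */
	for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; ++i) {
		/* process pBatch->pElem[i].pUsrp here */
	}
	return 0; /* stands in for RS_RET_OK */
}

A queue owner would register such a function at construction time and later flip the flag from its shutdown path, which is what tryShutdownWorkersWithinActionTimeout() does with bShutdownImmediate in the patch below.
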
diff --git a/runtime/queue.c b/runtime/queue.c index 101052a1..b29ec7ac 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -55,6 +55,9 @@ #include "wti.h" #include "msg.h" #include "atomic.h" +#include "errmsg.h" +#include "datetime.h" +#include "unicode-helper.h" #include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS @@ -65,14 +68,15 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) DEFobjCurrIf(strm) +DEFobjCurrIf(errmsg) +DEFobjCurrIf(datetime) /* forward-definitions */ +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); -static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); -static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); @@ -143,7 +147,7 @@ static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) ISOBJ_TYPE_assert(pQueue, qqueue); assert(pQueue->toDeleteLst != NULL); - CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); + CHKmalloc(pNew = MALLOC(sizeof(toDeleteLst_t))); pNew->deqID = deqID; pNew->nElemDeq = nElemDeq; @@ -228,7 +232,8 @@ static inline void queueDrain(qqueue_t *pThis) * this point in time. The mutex must be locked when * ths function is called. -- rgerhards, 2008-01-25 */ -static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) +static inline rsRetVal +qqueueAdviseMaxWorkers(qqueue_t *pThis) { DEFiRet; int iMaxWorkers; @@ -236,48 +241,18 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); if(!pThis->bEnqOnly) { - if(pThis->bRunsDA) { - /* if we have not yet reached the high water mark, there is no need to start a - * worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ - } - } - /* regular workers always run */ - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; + if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; + } else { + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ - } - - RETiRet; -} - - -/* Destruct DA queue. This is the last part of DA-to-normal-mode - * transistion. This is called asynchronously and some time quite a - * while after the actual transistion. The key point is that we need to - * do it at some later time, because we need to destruct the DA queue. That, - * however, can not be done in a thread that has been signalled - * This is to be called when we revert back to our own queue. - * This function must be called with the queue mutex locked (the wti - * class ensures this). 
- * rgerhards, 2008-01-15 - */ -static rsRetVal -TurnOffDAMode(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - if(getLogicalQueueSize(pThis->pqDA) == 0) { - pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ - DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", - iRet); } RETiRet; @@ -308,15 +283,7 @@ qqueueChkIsDA(qqueue_t *pThis) } -/* Start disk-assisted queue mode. All internal settings are changed. This is supposed - * to be called from the DA worker, which must have been started before. The most important - * chore of this function is to create the DA queue object. If that function fails, - * the DA worker should return with an appropriate state, which in turn should lead to - * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this - * function is called, else a number of races will happen. - * Please note that this function may be called *while* we in DA mode. This is due to the - * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due - * to inactivity timeouts. +/* Start disk-assisted queue mode. * rgerhards, 2008-01-15 */ static rsRetVal @@ -348,33 +315,22 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); + CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); - // experimental: XXX - CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); - - if(pThis->toQShutdown == 0) { - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ - } else { - /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND - * have an obviously large backlog, we can't finish it in any case. So there is no point - * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15 - */ - CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1)); - } - iRet = qqueueStart(pThis->pqDA); /* file not found is expected, that means it is no previous QIF available */ - if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) + if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) { + errno = 0; /* else an errno is shown in errmsg! */ + errmsg.LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode"); + pThis->bIsDA = 0; /* disable memory mode */ FINALIZE; /* something is wrong */ + } - //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ - - DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n", + DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n", qqueueGetID(pThis->pqDA)); finalize_it: @@ -397,7 +353,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +InitDA(qqueue_t *pThis, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -412,91 +368,32 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * rgerhards, 2008-01-24 * NOTE: this is the DA worker *pool*, not the DA queue! 
*/ - if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); - CHKiRet(wtpConstruct (&pThis->pWtpDA)); - CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); - CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); - CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); - CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); - CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); - CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); - CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); - } + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); + CHKiRet(wtpConstruct (&pThis->pWtpDA)); + CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); + CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); + CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); + CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); + CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); + CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); + CHKiRet(wtpConstructFinalize (pThis->pWtpDA)); /* if we reach this point, we have a "good" DA worker pool */ - /* indicate we now run in DA mode - this is reset by the DA worker if it fails */ - pThis->bDAEnqOnly = bEnqOnly; - /* now construct the actual queue (if it does not already exist) */ if(pThis->pqDA == NULL) { CHKiRet(StartDA(pThis)); } - pThis->bRunsDA = 1; - - /* now we must now adivse the wtp that we need one worker. If none is yet active, - * that will also start one up. If we forgot that step, everything would be stalled - * until the next enqueue request. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ - finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); RETiRet; } -/* check if we need to start disk assisted mode and send some signals to - * keep it running if we are already in it. It also checks if DA mode is - * partially initialized, in which case it waits for initialization to - * complete. - * rgerhards, 2008-01-14 - */ -static rsRetVal -ChkStrtDA(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* if we do not hit the high water mark, we have nothing to do */ - if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk) - ABORT_FINALIZE(RS_RET_OK); - - if(pThis->bRunsDA) { - /* then we need to signal that we are at the high water mark again. If that happens - * on our way down the queue, that doesn't matter, because then nobody is waiting - * on the condition variable. 
- * (Remember that a DA queue stops draining the queue once it has reached the low - * water mark and restarts it when the high water mark is reached again - this is - * what this code here is responsible for. Please note that all workers may have been - * terminated due to the inactivity timeout, thus we need to advise the pool that - * we need at least one). - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - getPhysicalQueueSize(pThis)); - qqueueAdviseMaxWorkers(pThis); - } else { - /* this is the case when we are currently not running in DA mode. So it is time - * to turn it back on. - */ - DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - getPhysicalQueueSize(pThis)); - InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ - } - -finalize_it: - RETiRet; -} - - /* --------------- end code for disk-assisted queue modes -------------------- */ @@ -516,7 +413,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis) if(pThis->iMaxQueueSize == 0) ABORT_FINALIZE(RS_RET_QSIZE_ZERO); - if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) { + if((pThis->tVars.farray.pBuf = MALLOC(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } @@ -587,26 +484,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllFixedArray(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", - pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); - - pThis->tVars.farray.deqhead = pThis->tVars.farray.head; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- linked list -------------------- */ @@ -644,7 +521,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) qLinkedList_t *pEntry; DEFiRet; - CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); + CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; pEntry->pUsr = pUsr; @@ -698,26 +575,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) } -/* reset the logical dequeue pointer to the physical dequeue position. - * This is only needed after we cancelled workers (during queue shutdown). - */ -static rsRetVal -qUnDeqAllLinkedList(qqueue_t *pThis) -{ - DEFiRet; - - ASSERT(pThis != NULL); - - DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", - pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); - - pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; - pThis->nLogDeq = 0; - - RETiRet; -} - - /* -------------------- disk -------------------- */ @@ -733,44 +590,6 @@ finalize_it: } -/* This method checks if we have a QIF file for the current queue (no matter of - * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise. 
- * rgerhards, 2008-01-15 - */ -static rsRetVal -qqueueHaveQIF(qqueue_t *pThis) -{ - DEFiRet; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; - struct stat stat_buf; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pszFilePrefix == NULL) - ABORT_FINALIZE(RS_RET_NO_FILEPREFIX); - - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { - if(errno == ENOENT) { - DBGOPRINT((obj_t*) pThis, "no .qi file found\n"); - ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); - } else { - DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno); - ABORT_FINALIZE(RS_RET_IO_ERROR); - } - } - /* If we reach this point, we have a .qi file */ - -finalize_it: - RETiRet; -} - - /* The method loads the persistent queue information. * rgerhards, 2008-01-11 */ @@ -920,9 +739,12 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) ASSERT(pThis != NULL); - strm.Destruct(&pThis->tVars.disk.pWrite); - strm.Destruct(&pThis->tVars.disk.pReadDeq); - strm.Destruct(&pThis->tVars.disk.pReadDel); + if(pThis->tVars.disk.pWrite != NULL) + strm.Destruct(&pThis->tVars.disk.pWrite); + if(pThis->tVars.disk.pReadDeq != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDeq); + if(pThis->tVars.disk.pReadDel != NULL) + strm.Destruct(&pThis->tVars.disk.pReadDel); RETiRet; } @@ -999,16 +821,6 @@ finalize_it: } -/* This is a dummy function for disks - we do not need to reset anything - * because everything is already persisted... - */ -static rsRetVal -qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - - /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1041,7 +853,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) batchObj.pUsrp = (obj_t*) pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); objDestruct(pUsr); RETiRet; @@ -1053,12 +865,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal -qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis) -{ - return RS_RET_OK; -} - /* --------------- end type-specific handlers -------------------- */ @@ -1112,15 +918,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) } -/* Try to terminate queue worker threads within the regular shutdown interval. - * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. - * After this function returns, the workers must either be finished or some force - * to finish them must be applied. - * This function also instructs the DA worker pool (if it exists) to terminate. This is done - * in preparation of final queue shutdown. - * rgerhards, 2009-05-27 +/* Try to shut down regular and DA queue workers, within the queue timeout + * period. That means processing continues as usual. This is the expected + * usual case, where during shutdown those messages remaining are being + * processed. At this point, it is acceptable that the queue can not be + * fully depleted, that case is handled in the next step. 
During this phase, + * we first shut down the main queue DA worker to prevent new data to arrive + * at the DA queue, and then we ask the regular workers of both the Regular + * and DA queue to try complete processing. + * rgerhards, 2009-10-14 */ -static rsRetVal +static inline rsRetVal tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { struct timespec tTimeout; @@ -1130,30 +938,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */ - if(getPhysicalQueueSize(pThis) > 0) { - if(pThis->bRunsDA) { - /* We may have waited on the low water mark. As it may have changed, we - * see if we reactivate the worker. - */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); - } + if(pThis->bIsDA) { + /* We need to lock the mutex, as otherwise we may have a race that prevents + * us from awaking the DA worker. */ + d_pthread_mutex_lock(pThis->mut); + + /* tell regular queue DA worker to stop shuffling messages to DA queue... */ + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); + DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n"); + + /* also tell the DA queue worker to shut down, so that it already knows... */ + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN); + wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */ + DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n"); + + d_pthread_mutex_unlock(pThis->mut); } - d_pthread_mutex_unlock(pThis->mut); - /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found - * out there are no active workers - that doesn't matter: the wtp knows about that and so will - * return immediately. - * We do not yet care about the DA worker - that will be handled down later in the process. - * Note that we must not request shutdown right now - that may introduce a race: if the regular queue - * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA - * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead - * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way - * (from a performance point of view). So we don't do anything right now. The DA queue will continue to - * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything - * by not requesting shutdown now. - * rgerhards, 2008-01-25 - */ + /* first calculate absolute timeout - we need the absolute value here, because we need to coordinate * shutdown of both the regular and DA queue on *the same* timeout. */ @@ -1180,16 +984,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n"); } - /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting - * the queue, it is restarted at a later stage. We don't care here if a timeout happens. 
- */ - DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); - } else { - DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n"); - } } RETiRet; @@ -1197,11 +991,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) /* Try to shut down regular and DA queue workers, within the action timeout - * period. Note that the main queue DA worker is still unaffected (and may shuffle - * data to the disk queue while we terminate the other workers). Not finishing - * processing all messages is now OK (but they may be preserved later, depending - * on bSaveOnShutdown setting). - * rgerhards, 2009-05-27 + * period. This aborts processing, but at the end of the current action, in + * a well-defined manner. During this phase, we terminate all three worker + * pools, including the regular queue DA worker if it not yet has terminated. + * Not finishing processing all messages is OK (and expected) at this stage + * (they may be preserved later, depending * on bSaveOnShutdown setting). + * rgerhards, 2009-10-14 */ static rsRetVal tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) @@ -1210,20 +1005,20 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; +RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ - /* note that we modify bEnqOnly directly, because going through the method would - * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 - */ pThis->bEnqOnly = 1; - /* need to set this so that the DA queue begins shutdown in parallel! */ - if(pThis->pqDA != NULL) { + pThis->bShutdownImmediate = 1; + /* now DA queue */ + if(pThis->bIsDA) { pThis->pqDA->bEnqOnly = 1; - wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); + pThis->pqDA->bShutdownImmediate = 1; } +// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance... /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ timeoutComp(&tTimeout, pThis->toActShutdown); DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n"); @@ -1247,17 +1042,14 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA " "queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This - * is necessary because we may be in a situation where the DA queue regular worker and the - * main queue worker stopped rather quickly. In this case, there is almost no time (and - * probably no thread switch!) between the point where we instructed the main queue DA - * worker to shutdown and this code location. In consequence, it may not even have - * noticed that it should should down, less acutally done this. So we provide it with a - * fixed 100ms timeout to try complete its work, what usually should be sufficient. - * rgerhards, 2009-10-06 + + /* and now we need to terminate the DA worker itself. 
We always grant it a 100ms timeout, + * which should be sufficient and usually not be required (it is expected to have finished + * long before while we were processing the queue timeout in shutdown phase 1). + * rgerhards, 2009-10-14 */ timeoutComp(&tTimeout, 100); - DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n"); iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); if(iRetLocal == RS_RET_TIMED_OUT) { DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool " @@ -1272,7 +1064,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) /* This function cancels all remaining regular workers for both the main and the DA - * queue. The main queue's DA worker pool continues to run (if it exists and is active). + * queue. * rgerhards, 2009-05-29 */ static rsRetVal @@ -1306,7 +1098,7 @@ cancelWorkers(qqueue_t *pThis) * big trouble when resetting the logical dequeue pointer. This operation can only be * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 */ - DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n"); iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } @@ -1341,13 +1133,6 @@ ShutdownWorkers(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); - /* we reduce the low water mark in any case. This is not absolutely necessary, but - * it is useful because we enable DA mode at several spots below and so we do not need - * to think about the low water mark each time. - */ - pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ - pThis->iLowWtrMrk = 0; - CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); if(getPhysicalQueueSize(pThis) > 0) { @@ -1375,7 +1160,7 @@ finalize_it: * to modify some parameters before the queue is actually started. 
*/ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) { DEFiRet; qqueue_t *pThis; @@ -1384,9 +1169,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread ASSERT(pConsumer != NULL); ASSERT(iWorkerThreads >= 0); - if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))); /* we have an object, so let's fill the properties */ objConstructSetObjInfo(pThis); @@ -1397,7 +1180,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */ pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */ - pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); + pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->nLogDeq = 0; @@ -1418,7 +1201,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; - pThis->qUnDeqAll = qUnDeqAllFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1426,7 +1208,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; - pThis->qUnDeqAll = qUnDeqAllLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1434,7 +1215,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; - pThis->qUnDeqAll = qUnDeqAllDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1443,7 +1223,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; - pThis->qUnDeqAll = qUnDeqAllDirect; break; } @@ -1456,7 +1235,7 @@ finalize_it: /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. - * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required. + * Note: cached copies of iQueueSize is provided so that no mutex locks are required. * The caller must have obtained them while the mutex was locked. Of course, these values may no * longer be current, but that is OK for the discard check. At worst, the message is either processed * or discarded when it should not have been. As discarding is in itself somewhat racy and erratic, @@ -1466,7 +1245,7 @@ finalize_it: * the return state! 
* rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) { DEFiRet; rsRetVal iRetLocal; @@ -1475,7 +1254,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_assert(pUsr); - if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) { + if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", @@ -1519,11 +1298,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue /* remove messages from the physical queue store that are fully processed. This is - * controlled via the to-delete list. We can only delete those elements, that are - * at the current physical tail of the queue. If the batch is from another position, - * we schedule it for deletion, but actual deletion will happen at a later call - * of this function here. We always delete as much as possible, which includes - * picking up things from the to-delete list. + * controlled via the to-delete list. */ static inline rsRetVal DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) @@ -1537,7 +1312,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) pTdl = tdlPeek(pThis); /* get current head element */ if(pTdl == NULL) { /* to-delete list empty */ - DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); @@ -1547,10 +1322,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ++deqIDDel; pTdl = tdlPeek(pThis); } + /* old entries deleted, now delete current ones... */ + DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } finalize_it: @@ -1559,7 +1336,8 @@ finalize_it: /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. + * destructing the objects themself. Any entries not marked as finally + * processed are enqueued again. 
The new enqueue is necessary because we have a * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1567,6 +1345,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; void *pUsr; + int nEnqueued = 0; + rsRetVal localRet; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -1574,9 +1354,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) for(i = 0 ; i < pBatch->nElem ; ++i) { pUsr = pBatch->pElem[i].pUsrp; + if( pBatch->pElem[i].state == BATCH_STATE_RDY + || pBatch->pElem[i].state == BATCH_STATE_SUB) { + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*) pUsr)); + ++nEnqueued; + if(localRet != RS_RET_OK) { + DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + } + } objDestruct(pUsr); } + dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + + if(nEnqueued > 0) + qqueueChkPersist(pThis, nEnqueued); + iRet = DeleteBatchFromQStore(pThis, pBatch); pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ @@ -1586,7 +1380,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) /* dequeue as many user pointers as are available, until we hit the configured - * upper limit of pointers. + * upper limit of pointers. Note that this function also deletes all processed + * objects from the previous batch. However, it is perfectly valid that the + * previous batch contained NO objects at all. For example, this happens + * immediately after system startup or when a queue was exhausted and the queue + * worker needed to wait for new data. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ @@ -1606,11 +1404,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1731,7 +1528,7 @@ RateLimiter(qqueue_t *pThis) iDelay = 0; if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */ /* time calls are expensive, so only do them when needed */ - time(&tCurr); + datetime.GetTime(&tCurr); localtime_r(&tCurr, &m); iHrCurr = m.tm_hour; @@ -1775,7 +1572,8 @@ RateLimiter(qqueue_t *pThis) } -/* This dequeues the next batch. +/* This dequeues the next batch. Note that this function must not be + * cancelled, else it will leave back an inconsistent state. 
* rgerhards, 2009-05-20 */ static inline rsRetVal @@ -1786,7 +1584,6 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("YYY: deqeueu for consumer"); CHKiRet(DequeueConsumable(pThis, pWti)); if(pWti->batch.nElem == 0) @@ -1809,7 +1606,6 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); @@ -1825,6 +1621,7 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); static rsRetVal ConsumerReg(qqueue_t *pThis, wti_t *pWti) { + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -1835,7 +1632,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); + + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1847,11 +1647,15 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000); } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); finalize_it: -dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + dbgprintf("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1869,6 +1673,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti) { int i; + int iCancelStateSave; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -1879,15 +1684,24 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* we now have a non-idle batch of work, so we can release the queue mutex and process it */ d_pthread_mutex_unlock(pThis->mut); + /* at this spot, we may be cancelled */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); + /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) { + for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); +dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, + (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! 
*/ } + /* but now cancellation is no longer permitted */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + /* now we are done, but need to re-aquire the mutex */ d_pthread_mutex_lock(pThis->mut); @@ -1899,12 +1713,6 @@ finalize_it: /* must only be called when the queue mutex is locked, else results * are not stable! - * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue. - * If our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. */ static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) @@ -1913,25 +1721,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; - } else { - if(pThis->bRunsDA) { - ASSERT(pThis->pqDA != NULL); - if( pThis->pqDA->bEnqOnly - && pThis->pqDA->sizeOnDiskMax > 0 - && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { - /* this queue can never grow, so we can give up... */ - iRet = RS_RET_TERMINATE_NOW; - } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); - iRet = RS_RET_TERMINATE_NOW; -RUNLOG_STR("XXX: re-start reg worker"); -qqueueAdviseMaxWorkers(pThis); -RUNLOG_STR("XXX: done re-start reg worker"); - } - } else { - // experimental iRet = RS_RET_TERMINATE_NOW; - ; - } } RETiRet; @@ -1941,9 +1730,7 @@ RUNLOG_STR("XXX: done re-start reg worker"); /* must only be called when the queue mutex is locked, else results * are not stable! * If we are a child, we have done our duty when the queue is empty. In that case, - * we can terminate. - * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue + * we can terminate. Version for the regular worker thread. */ static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) @@ -1968,60 +1755,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; -if(pThis->pqParent != NULL) +if(pThis->pqParent != NULL) // TODO: check why we actually do this! *pVal = 16; RETiRet; } -/* must only be called when the queue mutex is locked, else results - * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) - */ -static int -qqueueIsIdleDA(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); -} -/* must only be called when the queue mutex is locked, else results - * are not stable! Regular worker version. - */ -static int -IsIdleReg(qqueue_t *pThis) -{ - return(getPhysicalQueueSize(pThis) == 0); -} - - -/* This function is called when a worker thread for the regular queue is shut down. - * If we are the primary queue, this is not really interesting to us. If, however, - * we are the DA (child) queue, that means the DA queue is empty. In that case, we - * need to signal the parent queue's DA worker, so that it can terminate DA mode. - * rgerhards, 2008-01-26 - * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool - * has already been terminated and destructed. This *is* a legal condition and happens - * from time to time in practice. So we need to signal only if there still is a - * parent DA worker queue. Please keep in mind that the the parent's DA worker - * pool is DIFFERENT from our (DA queue) regular worker pool. 
So when the parent's - * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running. - * I am telling this, because I, too, always get confused by those... - */ -static rsRetVal -RegOnWrkrShutdown(qqueue_t *pThis) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - - if(pThis->pqParent != NULL) { - if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */ - wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */ - } - } - - RETiRet; -} - - /* start up the queue - it must have been constructed and parameters defined * before. */ @@ -2029,8 +1768,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; - rsRetVal iRetLocal; - int bInitialized = 0; /* is queue already initialized? */ uchar pszBuf[64]; size_t lenBuf; @@ -2044,7 +1781,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ * influenced by properties which might have been set after queueConstruct () */ if(pThis->pqParent == NULL) { - pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t)); + pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t)); pthread_mutex_init(pThis->mut, NULL); } else { /* child queue, we need to use parent's mutex */ @@ -2072,8 +1809,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ - /* create worker thread pools for regular operation. The DA pool is created on an as-needed - * basis, which potentially means never under most circumstances. + /* create worker thread pools for regular and DA operation. */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); @@ -2081,10 +1817,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -2092,27 +1826,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); CHKiRet(wtpConstructFinalize (pThis->pWtpReg)); - /* initialize worker thread instances */ - if(pThis->bIsDA) { - /* If we are disk-assisted, we need to check if there is a QIF file - * which we need to load. -- rgerhards, 2008-01-15 - */ - iRetLocal = qqueueHaveQIF(pThis); - if(iRetLocal == RS_RET_OK) { - DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ - bInitialized = 1; /* we are done */ - } else { - /* TODO: use logerror? 
-- rgerhards, 2008-01-16 */ - DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. " - "Some data may be lost\n", iRetLocal); - } - } + /* set up DA system if we have a disk-assisted queue */ + if(pThis->bIsDA) + InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */ - if(Debug && !bInitialized) { - DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " - "queue itself!)\n"); - } + DBGOPRINT((obj_t*) pThis, "queue finished initialization\n"); /* if the queue already contains data, we need to start the correct number of worker threads. This can be * the case when a disk queue has been loaded. If we did not start it here, it would never start. @@ -2165,7 +1883,8 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ - CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + if(pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */ + CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); FINALIZE; /* nothing left to do, so be happy */ } @@ -2255,10 +1974,16 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ -dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); - /* make sure we do not timeout before we are done */ - DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); + /* we reduce the low water mark, otherwise the DA worker would terminate when + * it is reached. + */ + DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n"); + pThis->bShutdownImmediate = 0; /* would termiante the DA worker! */ + pThis->iLowWtrMrk = 0; + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */ + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */ + + DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); @@ -2276,8 +2001,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) - pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2286,13 +2009,6 @@ CODESTARTobjDestruct(qqueue) if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) ShutdownWorkers(pThis); - /* now all workers are terminated. Messages may exist. Also, some logically dequeued - * messages may never have been processed because their worker was terminated. So - * we need to reset the logical dequeue pointer, persist the queue if configured to do - * so and then destruct everything. 
-- rgerhards, 2009-05-26 - */ - CHKiRet(pThis->qUnDeqAll(pThis)); - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); } @@ -2371,7 +2087,7 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) if(pszPrefix == NULL) /* just unset the prefix! */ ABORT_FINALIZE(RS_RET_OK); - if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL) + if((pThis->pszFilePrefix = MALLOC(sizeof(uchar) * iLenPrefix + 1)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1); pThis->lenFilePrefix = iLenPrefix; @@ -2413,12 +2129,8 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); - /* then check if we need to add an assistance disk queue */ - if(pThis->bIsDA) - CHKiRet(ChkStrtDA(pThis)); - /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. * Basic flow control has always been implemented and protects the queue structures @@ -2462,6 +2174,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); timeoutComp(&t, pThis->toEnq); +// TODO : handle enqOnly => discard! if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); objDestruct(pUsr); @@ -2500,7 +2213,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) } for(i = 0 ; i < pMultiSub->nElem ; ++i) { -dbgprintf("queueMultiEnq: %d\n", i); CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } @@ -2544,7 +2256,6 @@ finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. */ qqueueAdviseMaxWorkers(pThis); -dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); /* and release the mutex */ d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -2555,58 +2266,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut); } -/* set queue mode to enqueue only or not - * There is one subtle issue: this method may be called during queue - * construction or while it is running. In the former case, the queue - * mutex does not yet exist (it is NULL), while in the later case it - * must be locked. The function detects the state and operates as - * required. - * rgerhards, 2008-01-16 - */ -static rsRetVal -SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, qqueue); - - /* for simplicity, we do one big mutex lock. This method is extremely seldom - * called, so that doesn't matter... -- rgerhards, 2008-01-16 - */ - if(pThis->mut != NULL) { - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - } - - if(bEnqOnly == pThis->bEnqOnly) - FINALIZE; /* no change, nothing to do */ - - if(pThis->bQueueStarted) { - /* we need to adjust queue operation only if we are not during initial param setup */ - if(bEnqOnly == 1) { - /* switch to enqueue-only mode */ - /* this means we need to terminate all workers - that's it... 
*/ - DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); - if(pThis->pWtpReg != NULL) - wtpWakeupAllWrkr(pThis->pWtpReg); - if(pThis->pWtpDA != NULL) - wtpWakeupAllWrkr(pThis->pWtpDA); - } else { - /* switch back to regular mode */ - ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ - } - } - - pThis->bEnqOnly = bEnqOnly; - -finalize_it: - if(pThis->mut != NULL) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - } - RETiRet; -} - - /* some simple object access methods */ DEFpropSetMeth(qqueue, bSyncQueueFiles, int) DEFpropSetMeth(qqueue, iPersistUpdCnt, int) @@ -2670,6 +2329,8 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE) /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); /* now set our own handlers */ OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty); |
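
Another pattern worth calling out from this diff is the explicit cancellation window added around the consumer calls in ConsumerReg and ConsumerDA: cancellation is enabled only while the (potentially long-running) consumer runs outside the queue mutex, and it is disabled again before the mutex is re-acquired, so a thread cancel can never land while queue state is being modified. Below is a simplified stand-alone sketch of that pattern, assuming a plain pthread worker loop rather than rsyslog's wtp/wti machinery; queue_mut and the helper functions are hypothetical placeholders:

/* stand-alone sketch of the cancellation-window pattern used by
 * ConsumerReg/ConsumerDA in this patch; not rsyslog code.
 */
#include <pthread.h>

static pthread_mutex_t queue_mut = PTHREAD_MUTEX_INITIALIZER;

static void dequeue_batch_locked(void)       { /* take a batch off the queue */ }
static void consume_batch(void)              { /* may block for a long time  */ }
static void mark_batch_processed_locked(void){ /* update queue bookkeeping   */ }

static void worker_iteration(void)
{
	int iCancelStateSave;

	pthread_mutex_lock(&queue_mut);
	dequeue_batch_locked();
	pthread_mutex_unlock(&queue_mut);

	/* cancellation is permitted only while we work outside the mutex */
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
	consume_batch();
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);

	pthread_mutex_lock(&queue_mut);
	mark_batch_processed_locked();
	pthread_mutex_unlock(&queue_mut);
}
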