path: root/runtime/queue.c
author     Rainer Gerhards <rgerhards@adiscon.com>    2009-11-13 10:00:23 +0100
committer  Rainer Gerhards <rgerhards@adiscon.com>    2009-11-13 10:00:23 +0100
commit     2a3c25a5c68db66ec35c155f7ecbe65079aed0e0 (patch)
tree       b899a3cd256d60f3415753aa925c66fc0504373e /runtime/queue.c
parent     adb880f17bee807df6058ab3772eed3d35bd8d37 (diff)
parent     8b246de2a587454f9260ff91192d27a2e168ea2d (diff)
download   rsyslog-2a3c25a5c68db66ec35c155f7ecbe65079aed0e0.tar.gz
           rsyslog-2a3c25a5c68db66ec35c155f7ecbe65079aed0e0.tar.bz2
           rsyslog-2a3c25a5c68db66ec35c155f7ecbe65079aed0e0.zip
Begin new beta branch
Merge branch 'master' into beta

Conflicts:
	ChangeLog
	tests/Makefile.am
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--    runtime/queue.c    705
1 file changed, 183 insertions, 522 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 101052a1..b29ec7ac 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -55,6 +55,9 @@
#include "wti.h"
#include "msg.h"
#include "atomic.h"
+#include "errmsg.h"
+#include "datetime.h"
+#include "unicode-helper.h"
#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */
#ifdef OS_SOLARIS
@@ -65,14 +68,15 @@
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
DEFobjCurrIf(strm)
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(datetime)
/* forward-definitions */
+static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr);
static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
-static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
-static int qqueueIsIdleDA(qqueue_t *pThis);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
@@ -143,7 +147,7 @@ static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq)
ISOBJ_TYPE_assert(pQueue, qqueue);
assert(pQueue->toDeleteLst != NULL);
- CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t)));
+ CHKmalloc(pNew = MALLOC(sizeof(toDeleteLst_t)));
pNew->deqID = deqID;
pNew->nElemDeq = nElemDeq;
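
Throughout this patch, bare malloc() calls are replaced by an upper-case MALLOC() macro. The macro's definition is not part of this diff; a minimal sketch of such a wrapper, assuming it simply forwards to malloc() so that all allocation sites can later be instrumented in one place, could look like this:

    #include <stdlib.h>

    /* hypothetical stand-in for the MALLOC() wrapper used in this patch:
     * forwards to malloc() today, but keeps a single spot where accounting
     * or debugging can be added later. */
    #define MALLOC(size) malloc(size)
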
@@ -228,7 +232,8 @@ static inline void queueDrain(qqueue_t *pThis)
* this point in time. The mutex must be locked when
* this function is called. -- rgerhards, 2008-01-25
*/
-static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
+static inline rsRetVal
+qqueueAdviseMaxWorkers(qqueue_t *pThis)
{
DEFiRet;
int iMaxWorkers;
@@ -236,48 +241,18 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
if(!pThis->bEnqOnly) {
- if(pThis->bRunsDA) {
- /* if we have not yet reached the high water mark, there is no need to start a
- * worker. -- rgerhards, 2008-01-26
- */
- if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) {
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
- }
- }
- /* regular workers always run */
- if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
+ if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues always have just one worker */
} else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
+ } else {
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
+ }
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
- }
-
- RETiRet;
-}
-
-
-/* Destruct DA queue. This is the last part of DA-to-normal-mode
- * transistion. This is called asynchronously and some time quite a
- * while after the actual transistion. The key point is that we need to
- * do it at some later time, because we need to destruct the DA queue. That,
- * however, can not be done in a thread that has been signalled
- * This is to be called when we revert back to our own queue.
- * This function must be called with the queue mutex locked (the wti
- * class ensures this).
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-TurnOffDAMode(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(pThis->bRunsDA);
- if(getLogicalQueueSize(pThis->pqDA) == 0) {
- pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */
- DBGOPRINT((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n",
- iRet);
}
RETiRet;
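
The reworked advising logic boils down to a small calculation over the logical queue size. The following standalone sketch mirrors that arithmetic with names borrowed from the patch; the real function additionally hands the result to wtpAdviseMaxWorkers(), which applies the configured worker limit:

    /* sketch: how many regular workers to advise for a given backlog
     * qsize          - current logical queue size
     * minMsgsPerWrkr - configured messages per worker (0 = single worker)
     * isDiskQueue    - disk queues always run exactly one worker
     */
    static int adviseWorkerCount(int qsize, int minMsgsPerWrkr, int isDiskQueue)
    {
        if (qsize == 0)
            return 0;                      /* nothing queued, no worker needed */
        if (isDiskQueue || minMsgsPerWrkr == 0)
            return 1;                      /* strictly sequential dequeue */
        return qsize / minMsgsPerWrkr + 1; /* e.g. 1000 msgs, 100 per worker -> 11 */
    }
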
@@ -308,15 +283,7 @@ qqueueChkIsDA(qqueue_t *pThis)
}
-/* Start disk-assisted queue mode. All internal settings are changed. This is supposed
- * to be called from the DA worker, which must have been started before. The most important
- * chore of this function is to create the DA queue object. If that function fails,
- * the DA worker should return with an appropriate state, which in turn should lead to
- * a re-set to non-DA mode in the Enq process. The queue mutex must be locked when this
- * function is called, else a number of races will happen.
- * Please note that this function may be called *while* we in DA mode. This is due to the
- * fact that the DA worker calls it and the DA worker may be suspended (and restarted) due
- * to inactivity timeouts.
+/* Start disk-assisted queue mode.
* rgerhards, 2008-01-15
*/
static rsRetVal
@@ -348,33 +315,22 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq));
- CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED));
CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr));
CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr));
+ CHKiRet(qqueueSettoQShutdown(pThis->pqDA, pThis->toQShutdown));
CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0));
CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0));
- // experimental: XXX
- CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0));
-
- if(pThis->toQShutdown == 0) {
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */
- } else {
- /* we use the shortest possible shutdown (0 is endless!) because when we run on disk AND
- * have an obviously large backlog, we can't finish it in any case. So there is no point
- * in holding shutdown longer than necessary. -- rgerhards, 2008-01-15
- */
- CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 1));
- }
-
iRet = qqueueStart(pThis->pqDA);
/* file not found is expected, that means no previous QIF is available */
- if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND)
+ if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) {
+ errno = 0; /* else an errno is shown in errmsg! */
+ errmsg.LogError(errno, iRet, "error starting up disk queue, using pure in-memory mode");
+ pThis->bIsDA = 0; /* fall back to pure in-memory mode */
FINALIZE; /* something is wrong */
+ }
- //pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */
-
- DBGOPRINT((obj_t*) pThis, "is now running in disk assisted mode, disk queue 0x%lx\n",
+ DBGOPRINT((obj_t*) pThis, "DA queue initialized, disk queue 0x%lx\n",
qqueueGetID(pThis->pqDA));
finalize_it:
@@ -397,7 +353,7 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
+InitDA(qqueue_t *pThis, int bLockMutex)
{
DEFiRet;
DEFVARS_mutexProtection;
@@ -412,91 +368,32 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
* rgerhards, 2008-01-24
* NOTE: this is the DA worker *pool*, not the DA queue!
*/
- if(pThis->pWtpDA == NULL) {
- lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
- CHKiRet(wtpConstruct (&pThis->pWtpDA));
- CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
- CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
- CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode));
- CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
- CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
- CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
- CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
- CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
- }
+ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis));
+ CHKiRet(wtpConstruct (&pThis->pWtpDA));
+ CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
+ CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
+ CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
+ CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
+ CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
+ CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
+ CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
+ CHKiRet(wtpConstructFinalize (pThis->pWtpDA));
/* if we reach this point, we have a "good" DA worker pool */
- /* indicate we now run in DA mode - this is reset by the DA worker if it fails */
- pThis->bDAEnqOnly = bEnqOnly;
-
/* now construct the actual queue (if it does not already exist) */
if(pThis->pqDA == NULL) {
CHKiRet(StartDA(pThis));
}
- pThis->bRunsDA = 1;
-
- /* now we must now adivse the wtp that we need one worker. If none is yet active,
- * that will also start one up. If we forgot that step, everything would be stalled
- * until the next enqueue request.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */
-
finalize_it:
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
RETiRet;
}
-/* check if we need to start disk assisted mode and send some signals to
- * keep it running if we are already in it. It also checks if DA mode is
- * partially initialized, in which case it waits for initialization to
- * complete.
- * rgerhards, 2008-01-14
- */
-static rsRetVal
-ChkStrtDA(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* if we do not hit the high water mark, we have nothing to do */
- if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk)
- ABORT_FINALIZE(RS_RET_OK);
-
- if(pThis->bRunsDA) {
- /* then we need to signal that we are at the high water mark again. If that happens
- * on our way down the queue, that doesn't matter, because then nobody is waiting
- * on the condition variable.
- * (Remember that a DA queue stops draining the queue once it has reached the low
- * water mark and restarts it when the high water mark is reached again - this is
- * what this code here is responsible for. Please note that all workers may have been
- * terminated due to the inactivity timeout, thus we need to advise the pool that
- * we need at least one).
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n",
- getPhysicalQueueSize(pThis));
- qqueueAdviseMaxWorkers(pThis);
- } else {
- /* this is the case when we are currently not running in DA mode. So it is time
- * to turn it back on.
- */
- DBGOPRINT((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n",
- getPhysicalQueueSize(pThis));
- InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* --------------- end code for disk-assisted queue modes -------------------- */
@@ -516,7 +413,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis)
if(pThis->iMaxQueueSize == 0)
ABORT_FINALIZE(RS_RET_QSIZE_ZERO);
- if((pThis->tVars.farray.pBuf = malloc(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
+ if((pThis->tVars.farray.pBuf = MALLOC(sizeof(void *) * pThis->iMaxQueueSize)) == NULL) {
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
@@ -587,26 +484,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
}
-/* reset the logical dequeue pointer to the physical dequeue position.
- * This is only needed after we cancelled workers (during queue shutdown).
- */
-static rsRetVal
-qUnDeqAllFixedArray(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
- pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
-
- pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
- pThis->nLogDeq = 0;
-
- RETiRet;
-}
-
-
/* -------------------- linked list -------------------- */
@@ -644,7 +521,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
qLinkedList_t *pEntry;
DEFiRet;
- CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
+ CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pUsr = pUsr;
@@ -698,26 +575,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
}
-/* reset the logical dequeue pointer to the physical dequeue position.
- * This is only needed after we cancelled workers (during queue shutdown).
- */
-static rsRetVal
-qUnDeqAllLinkedList(qqueue_t *pThis)
-{
- DEFiRet;
-
- ASSERT(pThis != NULL);
-
- DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
- pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
-
- pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
- pThis->nLogDeq = 0;
-
- RETiRet;
-}
-
-
/* -------------------- disk -------------------- */
@@ -733,44 +590,6 @@ finalize_it:
}
-/* This method checks if we have a QIF file for the current queue (no matter of
- * queue mode). Returns RS_RET_OK if we have a QIF file or an error status otherwise.
- * rgerhards, 2008-01-15
- */
-static rsRetVal
-qqueueHaveQIF(qqueue_t *pThis)
-{
- DEFiRet;
- uchar pszQIFNam[MAXFNAME];
- size_t lenQIFNam;
- struct stat stat_buf;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pszFilePrefix == NULL)
- ABORT_FINALIZE(RS_RET_NO_FILEPREFIX);
-
- /* Construct file name */
- lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
- (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
-
- /* check if the file exists */
- if(stat((char*) pszQIFNam, &stat_buf) == -1) {
- if(errno == ENOENT) {
- DBGOPRINT((obj_t*) pThis, "no .qi file found\n");
- ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
- } else {
- DBGOPRINT((obj_t*) pThis, "error %d trying to access .qi file\n", errno);
- ABORT_FINALIZE(RS_RET_IO_ERROR);
- }
- }
- /* If we reach this point, we have a .qi file */
-
-finalize_it:
- RETiRet;
-}
-
-
/* The method loads the persistent queue information.
* rgerhards, 2008-01-11
*/
@@ -920,9 +739,12 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
ASSERT(pThis != NULL);
- strm.Destruct(&pThis->tVars.disk.pWrite);
- strm.Destruct(&pThis->tVars.disk.pReadDeq);
- strm.Destruct(&pThis->tVars.disk.pReadDel);
+ if(pThis->tVars.disk.pWrite != NULL)
+ strm.Destruct(&pThis->tVars.disk.pWrite);
+ if(pThis->tVars.disk.pReadDeq != NULL)
+ strm.Destruct(&pThis->tVars.disk.pReadDeq);
+ if(pThis->tVars.disk.pReadDel != NULL)
+ strm.Destruct(&pThis->tVars.disk.pReadDel);
RETiRet;
}
@@ -999,16 +821,6 @@ finalize_it:
}
-/* This is a dummy function for disks - we do not need to reset anything
- * because everything is already persisted...
- */
-static rsRetVal
-qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis)
-{
- return RS_RET_OK;
-}
-
-
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1041,7 +853,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
batchObj.pUsrp = (obj_t*) pUsr;
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
- iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate);
objDestruct(pUsr);
RETiRet;
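
As the changed qAddDirect() call shows (and as the qqueueConstruct() prototype further below confirms), the consumer callback now receives a third argument: a pointer to the queue's bShutdownImmediate flag. A sketch of a consumer honouring the new signature; processOne() is illustrative only, and the rsyslog types (rsRetVal, batch_t, RS_RET_OK) are assumed to be in scope via the usual headers:

    /* sketch of a consumer matching rsRetVal (*)(void*, batch_t*, int*):
     * the int* points at the owning queue's bShutdownImmediate flag, which a
     * long-running consumer polls between elements so it can abort mid-batch. */
    static rsRetVal
    exampleConsumer(void *pUsr, batch_t *pBatch, int *pbShutdownImmediate)
    {
        int i;
        for (i = 0; i < pBatch->nElem && !*pbShutdownImmediate; ++i) {
            processOne(pUsr, pBatch->pElem[i].pUsrp); /* hypothetical per-element work */
        }
        return RS_RET_OK;
    }
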
@@ -1053,12 +865,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal
-qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis)
-{
- return RS_RET_OK;
-}
-
/* --------------- end type-specific handlers -------------------- */
@@ -1112,15 +918,17 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr)
}
-/* Try to terminate queue worker threads within the regular shutdown interval.
- * Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
- * After this function returns, the workers must either be finished or some force
- * to finish them must be applied.
- * This function also instructs the DA worker pool (if it exists) to terminate. This is done
- * in preparation of final queue shutdown.
- * rgerhards, 2009-05-27
+/* Try to shut down regular and DA queue workers within the queue timeout
+ * period. During this phase, processing continues as usual; this is the
+ * expected case, where the messages remaining at shutdown are still being
+ * processed. It is acceptable that the queue cannot be fully depleted here,
+ * as that case is handled in the next step. We first shut down the main
+ * queue's DA worker to prevent new data from arriving at the DA queue, and
+ * then ask the regular workers of both the regular and the DA queue to try
+ * to complete processing.
+ * rgerhards, 2009-10-14
*/
-static rsRetVal
+static inline rsRetVal
tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
{
struct timespec tTimeout;
@@ -1130,30 +938,26 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
- d_pthread_mutex_lock(pThis->mut); /* some workers may be running in parallel! */
- if(getPhysicalQueueSize(pThis) > 0) {
- if(pThis->bRunsDA) {
- /* We may have waited on the low water mark. As it may have changed, we
- * see if we reactivate the worker.
- */
- wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
- }
+ if(pThis->bIsDA) {
+ /* We need to lock the mutex, as otherwise we may have a race that prevents
+ * us from waking up the DA worker. */
+ d_pthread_mutex_lock(pThis->mut);
+
+ /* tell regular queue DA worker to stop shuffling messages to DA queue... */
+ pThis->pqDA->bEnqOnly = 1;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE);
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
+ DBGOPRINT((obj_t*) pThis, "awoke DA worker, told it to shut down.\n");
+
+ /* also tell the DA queue worker to shut down, so that it already knows... */
+ wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN);
+ wtpAdviseMaxWorkers(pThis->pqDA->pWtpReg, 1); /* awake its lone worker */
+ DBGOPRINT((obj_t*) pThis, "awoke DA queue regular worker, told it to shut down when done.\n");
+
+ d_pthread_mutex_unlock(pThis->mut);
}
- d_pthread_mutex_unlock(pThis->mut);
- /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found
- * out there are no active workers - that doesn't matter: the wtp knows about that and so will
- * return immediately.
- * We do not yet care about the DA worker - that will be handled down later in the process.
- * Note that we must not request shutdown right now - that may introduce a race: if the regular queue
- * still runs DA assisted and the DA worker gets scheduled first, it will terminate itself (if the DA
- * queue happens to be empty at that instant). Then the regular worker enqueues messages, what will lead
- * to a restart of the worker. Of course, everything will continue to run, but in a bit sub-optimal way
- * (from a performance point of view). So we don't do anything right now. The DA queue will continue to
- * process messages and shutdown itself in any case if there is nothing to do. So we don't loose anything
- * by not requesting shutdown now.
- * rgerhards, 2008-01-25
- */
+
/* first calculate absolute timeout - we need the absolute value here, because we need to coordinate
* shutdown of both the regular and DA queue on *the same* timeout.
*/
@@ -1180,16 +984,6 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
} else {
DBGOPRINT((obj_t*) pThis, "DA queue worker shut down.\n");
}
- /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
- * the queue, it is restarted at a later stage. We don't care here if a timeout happens.
- */
- DBGOPRINT((obj_t*) pThis, "trying shutdown of main queue DA worker pool\n");
- iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
- if(iRetLocal == RS_RET_TIMED_OUT) {
- DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
- } else {
- DBGOPRINT((obj_t*) pThis, "main queue DA worker pool shut down on first try.\n");
- }
}
RETiRet;
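
The comment above stresses that timeoutComp() must produce an absolute deadline so that the regular and the DA queue are shut down against the same point in time. A self-contained sketch of such a helper, under the assumption (mine, not stated in this diff) that the relative timeout is given in milliseconds:

    #include <sys/time.h>
    #include <time.h>

    /* compute an absolute deadline: "now" plus a relative timeout in ms,
     * returned as a struct timespec suitable for pthread_cond_timedwait() */
    static void computeAbsTimeout(struct timespec *pt, long ms)
    {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        pt->tv_sec  = tv.tv_sec + ms / 1000;
        pt->tv_nsec = (tv.tv_usec + (ms % 1000) * 1000) * 1000;
        if (pt->tv_nsec >= 1000000000L) { /* carry the overflow into seconds */
            pt->tv_sec++;
            pt->tv_nsec -= 1000000000L;
        }
    }
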
@@ -1197,11 +991,12 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
/* Try to shut down regular and DA queue workers, within the action timeout
- * period. Note that the main queue DA worker is still unaffected (and may shuffle
- * data to the disk queue while we terminate the other workers). Not finishing
- * processing all messages is now OK (but they may be preserved later, depending
- * on bSaveOnShutdown setting).
- * rgerhards, 2009-05-27
+ * period. This aborts processing, but does so at the end of the current
+ * action, in a well-defined manner. During this phase, we terminate all
+ * three worker pools, including the regular queue's DA worker if it has not
+ * yet terminated. Not finishing processing of all messages is OK (and
+ * expected) at this stage (they may be preserved later, depending on the
+ * bSaveOnShutdown setting).
+ * rgerhards, 2009-10-14
*/
static rsRetVal
tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
@@ -1210,20 +1005,20 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
+RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if work still exists */
- /* note that we modify bEnqOnly directly, because going through the method would
- * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28
- */
pThis->bEnqOnly = 1;
- /* need to set this so that the DA queue begins shutdown in parallel! */
- if(pThis->pqDA != NULL) {
+ pThis->bShutdownImmediate = 1;
+ /* now DA queue */
+ if(pThis->bIsDA) {
pThis->pqDA->bEnqOnly = 1;
- wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE);
+ pThis->pqDA->bShutdownImmediate = 1;
}
+// TODO: make sure we have at minimum a 10ms timeout - workers deserve a chance...
/* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */
timeoutComp(&tTimeout, pThis->toActShutdown);
DBGOPRINT((obj_t*) pThis, "trying immediate shutdown of regular workers (if any)\n");
@@ -1247,17 +1042,14 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA "
"queue in disk save mode. Continuing, but results are unpredictable\n", iRetLocal);
}
- /* and now we need to check the DA worker itself (the one that shuffles data to the disk). This
- * is necessary because we may be in a situation where the DA queue regular worker and the
- * main queue worker stopped rather quickly. In this case, there is almost no time (and
- * probably no thread switch!) between the point where we instructed the main queue DA
- * worker to shutdown and this code location. In consequence, it may not even have
- * noticed that it should should down, less acutally done this. So we provide it with a
- * fixed 100ms timeout to try complete its work, what usually should be sufficient.
- * rgerhards, 2009-10-06
+
+ /* and now we need to terminate the DA worker itself. We always grant it a 100ms timeout,
+ * which should be sufficient and usually is not even required (it is expected to have
+ * finished long before, while we were processing the queue timeout in shutdown phase 1).
+ * rgerhards, 2009-10-14
*/
timeoutComp(&tTimeout, 100);
- DBGOPRINT((obj_t*) pThis, "last try for regular shutdown of main queue DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "trying regular shutdown of main queue DA worker pool\n");
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
if(iRetLocal == RS_RET_TIMED_OUT) {
DBGOPRINT((obj_t*) pThis, "shutdown timed out on main queue DA worker pool "
@@ -1272,7 +1064,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
/* This function cancels all remaining regular workers for both the main and the DA
- * queue. The main queue's DA worker pool continues to run (if it exists and is active).
+ * queue.
* rgerhards, 2009-05-29
*/
static rsRetVal
@@ -1306,7 +1098,7 @@ cancelWorkers(qqueue_t *pThis)
* big trouble when resetting the logical dequeue pointer. This operation can only be
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
*/
- DBGOPRINT((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
+ DBGOPRINT((obj_t*) pThis, "checking to see if main queue DA worker pool needs to be cancelled\n");
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
}
@@ -1341,13 +1133,6 @@ ShutdownWorkers(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
- /* we reduce the low water mark in any case. This is not absolutely necessary, but
- * it is useful because we enable DA mode at several spots below and so we do not need
- * to think about the low water mark each time.
- */
- pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */
- pThis->iLowWtrMrk = 0;
-
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
if(getPhysicalQueueSize(pThis) > 0) {
@@ -1375,7 +1160,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1384,9 +1169,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
ASSERT(pConsumer != NULL);
ASSERT(iWorkerThreads >= 0);
- if((pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pThis = (qqueue_t *)calloc(1, sizeof(qqueue_t)));
/* we have an object, so let's fill the properties */
objConstructSetObjInfo(pThis);
@@ -1397,7 +1180,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->iFullDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 3; /* default 97% */
pThis->iLightDlyMrk = iMaxQueueSize - (iMaxQueueSize / 100) * 30; /* default 70% */
- pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir);
+ pThis->lenSpoolDir = ustrlen(pThis->pszSpoolDir);
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
@@ -1418,7 +1201,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddFixedArray;
pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
- pThis->qUnDeqAll = qUnDeqAllFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
@@ -1426,7 +1208,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddLinkedList;
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
- pThis->qUnDeqAll = qUnDeqAllLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
@@ -1434,7 +1215,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
- pThis->qUnDeqAll = qUnDeqAllDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
break;
@@ -1443,7 +1223,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qDestruct = qDestructDirect;
pThis->qAdd = qAddDirect;
pThis->qDel = qDelDirect;
- pThis->qUnDeqAll = qUnDeqAllDirect;
break;
}
@@ -1456,7 +1235,7 @@ finalize_it:
/* This function checks if the provided message shall be discarded and does so, if needed.
* In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
* provide real-time creation of spool files.
- * Note: cached copies of iQueueSize and bRunsDA are provided so that no mutex locks are required.
+ * Note: a cached copy of iQueueSize is provided so that no mutex locks are required.
* The caller must have obtained it while the mutex was locked. Of course, the value may no
* longer be current, but that is OK for the discard check. At worst, the message is either processed
* or discarded when it should not have been. As discarding is in itself somewhat racy and erratic,
@@ -1466,7 +1245,7 @@ finalize_it:
* the return state!
* rgerhards, 2008-01-24
*/
-static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, void *pUsr)
+static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr)
{
DEFiRet;
rsRetVal iRetLocal;
@@ -1475,7 +1254,7 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, int bRunsDA, voi
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_assert(pUsr);
- if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk && bRunsDA == 0) {
+ if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) {
iRetLocal = objGetSeverity(pUsr, &iSeverity);
if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) {
DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n",
@@ -1519,11 +1298,7 @@ dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQue
/* remove messages from the physical queue store that are fully processed. This is
- * controlled via the to-delete list. We can only delete those elements, that are
- * at the current physical tail of the queue. If the batch is from another position,
- * we schedule it for deletion, but actual deletion will happen at a later call
- * of this function here. We always delete as much as possible, which includes
- * picking up things from the to-delete list.
+ * controlled via the to-delete list.
*/
static inline rsRetVal
DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
@@ -1537,7 +1312,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
pTdl = tdlPeek(pThis); /* get current head element */
if(pTdl == NULL) { /* to-delete list empty */
- DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq);
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else if(pBatch->deqID == pThis->deqIDDel) {
deqIDDel = pThis->deqIDDel;
pTdl = tdlPeek(pThis);
@@ -1547,10 +1322,12 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
++deqIDDel;
pTdl = tdlPeek(pThis);
}
+ /* old entries deleted, now delete current ones... */
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
} else {
/* can not delete, insert into to-delete list */
dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID);
- CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq));
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
}
finalize_it:
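
Batches can complete out of order, so DeleteBatchFromQStore() only removes elements physically when the batch at the head of the dequeue sequence is done; anything else is parked on the to-delete list, keyed by dequeue ID. A sketch of that bookkeeping record (field names are illustrative; the real toDeleteLst_t is defined elsewhere in the queue code):

    /* parked deletion record: a completed batch that is not yet at the
     * physical tail of the queue store. Entries are drained in deqID order
     * once the missing predecessors have completed as well. */
    typedef struct toDelete_s {
        unsigned deqID;           /* dequeue ID of the completed batch */
        int nElemDeq;             /* number of elements to delete from the store */
        struct toDelete_s *pNext; /* singly linked, ordered by deqID */
    } toDelete_t;
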
@@ -1559,7 +1336,8 @@ finalize_it:
/* Delete a batch of processed user objects from the queue, which includes
- * destructing the objects themself.
+ * destructing the objects themselves. Any entries not marked as finally
+ * processed are enqueued again, so that they are not lost when the batch is
+ * deleted from the queue store.
* rgerhards, 2009-05-13
*/
static inline rsRetVal
@@ -1567,6 +1345,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
{
int i;
void *pUsr;
+ int nEnqueued = 0;
+ rsRetVal localRet;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -1574,9 +1354,23 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
for(i = 0 ; i < pBatch->nElem ; ++i) {
pUsr = pBatch->pElem[i].pUsrp;
+ if( pBatch->pElem[i].state == BATCH_STATE_RDY
+ || pBatch->pElem[i].state == BATCH_STATE_SUB) {
+ localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*) pUsr));
+ ++nEnqueued;
+ if(localRet != RS_RET_OK) {
+ DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet);
+ }
+ }
objDestruct(pUsr);
}
+ dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued);
+
+ if(nEnqueued > 0)
+ qqueueChkPersist(pThis, nEnqueued);
+
iRet = DeleteBatchFromQStore(pThis, pBatch);
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
@@ -1586,7 +1380,11 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
/* dequeue as many user pointers as are available, until we hit the configured
- * upper limit of pointers.
+ * upper limit of pointers. Note that this function also deletes all processed
+ * objects from the previous batch. However, it is perfectly valid that the
+ * previous batch contained NO objects at all. For example, this happens
+ * immediately after system startup or when a queue was exhausted and the queue
+ * worker needed to wait for new data.
* This must only be called when the queue mutex is LOCKED, otherwise serious
* malfunction will happen.
*/
@@ -1606,11 +1404,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
nDequeued = nDiscarded = 0;
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
-dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
CHKiRet(qqueueDeq(pThis, &pUsr));
/* check if we should discard this element */
- localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr);
if(localRet == RS_RET_QUEUE_FULL) {
++nDiscarded;
continue;
@@ -1731,7 +1528,7 @@ RateLimiter(qqueue_t *pThis)
iDelay = 0;
if(pThis->iDeqtWinToHr != 25) { /* 25 means disabled */
/* time calls are expensive, so only do them when needed */
- time(&tCurr);
+ datetime.GetTime(&tCurr);
localtime_r(&tCurr, &m);
iHrCurr = m.tm_hour;
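
RateLimiter() only fetches the current time when a dequeue window is configured (an iDeqtWinToHr of 25 disables it). The window test itself is not visible in this hunk; a sketch of how such a check might look, where the handling of a window that wraps past midnight is my assumption:

    /* is hour hrCurr inside the dequeue window [fromHr, toHr)?
     * toHr == 25 means the window feature is disabled. */
    static int insideDeqWindow(int hrCurr, int fromHr, int toHr)
    {
        if (toHr == 25)
            return 1;                                 /* disabled: always dequeue */
        if (fromHr <= toHr)
            return hrCurr >= fromHr && hrCurr < toHr; /* same-day window */
        return hrCurr >= fromHr || hrCurr < toHr;     /* wraps past midnight */
    }
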
@@ -1775,7 +1572,8 @@ RateLimiter(qqueue_t *pThis)
}
-/* This dequeues the next batch.
+/* This dequeues the next batch. Note that this function must not be
+ * cancelled, else it will leave behind an inconsistent state.
* rgerhards, 2009-05-20
*/
static inline rsRetVal
@@ -1786,7 +1584,6 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("YYY: deqeueu for consumer");
CHKiRet(DequeueConsumable(pThis, pWti));
if(pWti->batch.nElem == 0)
@@ -1809,7 +1606,6 @@ batchProcessed(qqueue_t *pThis, wti_t *pWti)
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
-dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
DeleteProcessedBatch(pThis, &pWti->batch);
qqueueChkPersist(pThis, pWti->batch.nElemDeq);
@@ -1825,6 +1621,7 @@ dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq);
static rsRetVal
ConsumerReg(qqueue_t *pThis, wti_t *pWti)
{
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -1835,7 +1632,10 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
- CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
+
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -1847,11 +1647,15 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
srSleep(pThis->iDeqSlowdown / 1000000, pThis->iDeqSlowdown % 1000000);
}
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
/* now we are done, but need to re-acquire the mutex */
d_pthread_mutex_lock(pThis->mut);
finalize_it:
-dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
+ dbgprintf("regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet,
+ getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
RETiRet;
}
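
ConsumerReg() now opens a cancellation window only around the consumer callback and closes it again before re-taking the queue mutex, so a pthread_cancel() can never hit the thread while it manipulates queue-internal state. The general pattern, as a standalone sketch with a placeholder work function:

    #include <pthread.h>

    /* run caller-supplied work inside a cancellation window: cancellation is
     * enabled only while doWork() executes and disabled again afterwards. */
    static void runCancellableSection(void (*doWork)(void *), void *arg)
    {
        int oldstate;
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
        doWork(arg);                        /* a pending cancel may fire in here */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
    }
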
@@ -1869,6 +1673,7 @@ static rsRetVal
ConsumerDA(qqueue_t *pThis, wti_t *pWti)
{
int i;
+ int iCancelStateSave;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -1879,15 +1684,24 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* we now have a non-idle batch of work, so we can release the queue mutex and process it */
d_pthread_mutex_unlock(pThis->mut);
+ /* at this spot, we may be cancelled */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
+
/* iterate over returned results and enqueue them in DA queue */
- for(i = 0 ; i < pWti->batch.nElem ; i++) {
+ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
* the message. So far, we simply assume we always have msg_t, which currently is always the case.
* rgerhards, 2009-05-28
*/
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg);
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY,
+ (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
+ pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* committed to other queue! */
}
+ /* but now cancellation is no longer permitted */
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+
/* now we are done, but need to re-acquire the mutex */
d_pthread_mutex_lock(pThis->mut);
@@ -1899,12 +1713,6 @@ finalize_it:
/* must only be called when the queue mutex is locked, else results
* are not stable!
- * If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue.
- * If our queue is in destruction, we drain to the DA queue and so we shall not terminate
- * until we have done so.
*/
static rsRetVal
qqueueChkStopWrkrDA(qqueue_t *pThis)
@@ -1913,25 +1721,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
if(pThis->bEnqOnly) {
iRet = RS_RET_TERMINATE_WHEN_IDLE;
- } else {
- if(pThis->bRunsDA) {
- ASSERT(pThis->pqDA != NULL);
- if( pThis->pqDA->bEnqOnly
- && pThis->pqDA->sizeOnDiskMax > 0
- && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) {
- /* this queue can never grow, so we can give up... */
- iRet = RS_RET_TERMINATE_NOW;
- } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) {
-dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk);
- iRet = RS_RET_TERMINATE_NOW;
-RUNLOG_STR("XXX: re-start reg worker");
-qqueueAdviseMaxWorkers(pThis);
-RUNLOG_STR("XXX: done re-start reg worker");
- }
- } else {
- // experimental iRet = RS_RET_TERMINATE_NOW;
- ;
- }
}
RETiRet;
@@ -1941,9 +1730,7 @@ RUNLOG_STR("XXX: done re-start reg worker");
/* must only be called when the queue mutex is locked, else results
* are not stable!
* If we are a child, we have done our duty when the queue is empty. In that case,
- * we can terminate.
- * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from
- * the DA queue
+ * we can terminate. Version for the regular worker thread.
*/
static rsRetVal
ChkStopWrkrReg(qqueue_t *pThis)
@@ -1968,60 +1755,12 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal)
DEFiRet;
assert(pVal != NULL);
*pVal = pThis->iDeqBatchSize;
-if(pThis->pqParent != NULL)
+if(pThis->pqParent != NULL) // TODO: check why we actually do this!
*pVal = 16;
RETiRet;
}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! DA worker version (pThis *is* the *main* queue, not DA!)
- */
-static int
-qqueueIsIdleDA(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk);
-}
-/* must only be called when the queue mutex is locked, else results
- * are not stable! Regular worker version.
- */
-static int
-IsIdleReg(qqueue_t *pThis)
-{
- return(getPhysicalQueueSize(pThis) == 0);
-}
-
-
-/* This function is called when a worker thread for the regular queue is shut down.
- * If we are the primary queue, this is not really interesting to us. If, however,
- * we are the DA (child) queue, that means the DA queue is empty. In that case, we
- * need to signal the parent queue's DA worker, so that it can terminate DA mode.
- * rgerhards, 2008-01-26
- * rgerhards, 2008-02-27: HOWEVER, in a shutdown condition, it may be that the parent's worker thread pool
- * has already been terminated and destructed. This *is* a legal condition and happens
- * from time to time in practice. So we need to signal only if there still is a
- * parent DA worker queue. Please keep in mind that the the parent's DA worker
- * pool is DIFFERENT from our (DA queue) regular worker pool. So when the parent's
- * pWtpDA is destructed, there can still be some of our (DAq/wtp) threads be running.
- * I am telling this, because I, too, always get confused by those...
- */
-static rsRetVal
-RegOnWrkrShutdown(qqueue_t *pThis)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- if(pThis->pqParent != NULL) {
- if(pThis->pqParent->pWtpDA != NULL) { /* see comment in function header from 2008-02-27 */
- wtpAdviseMaxWorkers(pThis->pqParent->pWtpDA, 1); /* reactivate DA worker (always 1) */
- }
- }
-
- RETiRet;
-}
-
-
/* start up the queue - it must have been constructed and parameters defined
* before.
*/
@@ -2029,8 +1768,6 @@ rsRetVal
qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
{
DEFiRet;
- rsRetVal iRetLocal;
- int bInitialized = 0; /* is queue already initialized? */
uchar pszBuf[64];
size_t lenBuf;
@@ -2044,7 +1781,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
* influenced by properties which might have been set after queueConstruct ()
*/
if(pThis->pqParent == NULL) {
- pThis->mut = (pthread_mutex_t *) malloc (sizeof (pthread_mutex_t));
+ pThis->mut = (pthread_mutex_t *) MALLOC (sizeof (pthread_mutex_t));
pthread_mutex_init(pThis->mut, NULL);
} else {
/* child queue, we need to use parent's mutex */
@@ -2072,8 +1809,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
- /* create worker thread pools for regular operation. The DA pool is created on an as-needed
- * basis, which potentially means never under most circumstances.
+ /* create worker thread pools for regular and DA operation.
*/
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
@@ -2081,10 +1817,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg));
CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg));
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -2092,27 +1826,11 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
CHKiRet(wtpConstructFinalize (pThis->pWtpReg));
- /* initialize worker thread instances */
- if(pThis->bIsDA) {
- /* If we are disk-assisted, we need to check if there is a QIF file
- * which we need to load. -- rgerhards, 2008-01-15
- */
- iRetLocal = qqueueHaveQIF(pThis);
- if(iRetLocal == RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n");
- InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */
- bInitialized = 1; /* we are done */
- } else {
- /* TODO: use logerror? -- rgerhards, 2008-01-16 */
- DBGOPRINT((obj_t*) pThis, "error %d trying to access on-disk queue files, starting without them. "
- "Some data may be lost\n", iRetLocal);
- }
- }
+ /* set up DA system if we have a disk-assisted queue */
+ if(pThis->bIsDA)
+ InitDA(pThis, LOCK_MUTEX); /* initiate DA mode */
- if(Debug && !bInitialized) {
- DBGOPRINT((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA "
- "queue itself!)\n");
- }
+ DBGOPRINT((obj_t*) pThis, "queue finished initialization\n");
/* if the queue already contains data, we need to start the correct number of worker threads. This can be
* the case when a disk queue has been loaded. If we did not start it here, it would never start.
@@ -2165,7 +1883,8 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
- CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
+ if(pThis->tVars.disk.pReadDel != NULL) /* may be NULL if we had a startup failure! */
+ CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -2255,10 +1974,16 @@ DoSaveOnShutdown(qqueue_t *pThis)
ISOBJ_TYPE_assert(pThis, qqueue);
- InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
-dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
- /* make sure we do not timeout before we are done */
- DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
+ /* we reduce the low water mark, otherwise the DA worker would terminate when
+ * it is reached.
+ */
+ DBGOPRINT((obj_t*) pThis, "bSaveOnShutdown set, restarting DA worker...\n");
+ pThis->bShutdownImmediate = 0; /* would terminate the DA worker! */
+ pThis->iLowWtrMrk = 0;
+ wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN); /* shutdown worker (only) when done (was _IMMEDIATE!) */
+ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* restart DA worker */
+
+ DBGOPRINT((obj_t*) pThis, "waiting for DA worker to terminate...\n");
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
/* and run the primary queue's DA worker to drain the queue */
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
@@ -2276,8 +2001,6 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
/* destructor for the queue object */
BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(qqueue)
- pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
-
/* shut down all workers
* We do not need to shut down workers when we are in enqueue-only mode or we are a
* direct queue - because in both cases we have none... ;)
@@ -2286,13 +2009,6 @@ CODESTARTobjDestruct(qqueue)
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
ShutdownWorkers(pThis);
- /* now all workers are terminated. Messages may exist. Also, some logically dequeued
- * messages may never have been processed because their worker was terminated. So
- * we need to reset the logical dequeue pointer, persist the queue if configured to do
- * so and then destruct everything. -- rgerhards, 2009-05-26
- */
- CHKiRet(pThis->qUnDeqAll(pThis));
-
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
CHKiRet(DoSaveOnShutdown(pThis));
}
@@ -2371,7 +2087,7 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
if(pszPrefix == NULL) /* just unset the prefix! */
ABORT_FINALIZE(RS_RET_OK);
- if((pThis->pszFilePrefix = malloc(sizeof(uchar) * iLenPrefix + 1)) == NULL)
+ if((pThis->pszFilePrefix = MALLOC(sizeof(uchar) * iLenPrefix + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszFilePrefix, pszPrefix, iLenPrefix + 1);
pThis->lenFilePrefix = iLenPrefix;
@@ -2413,12 +2129,8 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* first check if we need to discard this message (which will cause CHKiRet() to exit)
*/
- CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr));
+ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr));
- /* then check if we need to add an assistance disk queue */
- if(pThis->bIsDA)
- CHKiRet(ChkStrtDA(pThis));
-
/* handle flow control
* There are two different flow control mechanisms: basic and advanced flow control.
* Basic flow control has always been implemented and protects the queue structures
@@ -2462,6 +2174,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
&& pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n");
timeoutComp(&t, pThis->toEnq);
+// TODO : handle enqOnly => discard!
if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) {
DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n");
objDestruct(pUsr);
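
When the queue is full, doEnqSingleObj() blocks on the notFull condition with an absolute deadline and drops the message on timeout. A standalone sketch of the underlying wait pattern; the real code structures its fullness check and error handling differently:

    #include <pthread.h>
    #include <time.h>

    /* wait until isFull() clears or the absolute deadline expires.
     * The mutex must already be held by the caller, as with the queue mutex
     * in doEnqSingleObj(). Returns 0 on success, -1 on timeout. */
    static int waitForSpace(pthread_mutex_t *mut, pthread_cond_t *notFull,
                            const struct timespec *deadline,
                            int (*isFull)(void *), void *q)
    {
        while (isFull(q)) {
            if (pthread_cond_timedwait(notFull, mut, deadline) != 0)
                return -1; /* timed out while still full: caller drops the msg */
        }
        return 0;          /* space available, enqueue may proceed */
    }
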
@@ -2500,7 +2213,6 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub)
}
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
-dbgprintf("queueMultiEnq: %d\n", i);
CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i]));
}
@@ -2544,7 +2256,6 @@ finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
/* make sure at least one worker is running. */
qqueueAdviseMaxWorkers(pThis);
-dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
/* and release the mutex */
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
@@ -2555,58 +2266,6 @@ dbgprintf("YYY: call advise with mutex %p locked \n", pThis->mut);
}
-/* set queue mode to enqueue only or not
- * There is one subtle issue: this method may be called during queue
- * construction or while it is running. In the former case, the queue
- * mutex does not yet exist (it is NULL), while in the later case it
- * must be locked. The function detects the state and operates as
- * required.
- * rgerhards, 2008-01-16
- */
-static rsRetVal
-SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
-
- /* for simplicity, we do one big mutex lock. This method is extremely seldom
- * called, so that doesn't matter... -- rgerhards, 2008-01-16
- */
- if(pThis->mut != NULL) {
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- }
-
- if(bEnqOnly == pThis->bEnqOnly)
- FINALIZE; /* no change, nothing to do */
-
- if(pThis->bQueueStarted) {
- /* we need to adjust queue operation only if we are not during initial param setup */
- if(bEnqOnly == 1) {
- /* switch to enqueue-only mode */
- /* this means we need to terminate all workers - that's it... */
- DBGOPRINT((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n");
- if(pThis->pWtpReg != NULL)
- wtpWakeupAllWrkr(pThis->pWtpReg);
- if(pThis->pWtpDA != NULL)
- wtpWakeupAllWrkr(pThis->pWtpDA);
- } else {
- /* switch back to regular mode */
- ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */
- }
- }
-
- pThis->bEnqOnly = bEnqOnly;
-
-finalize_it:
- if(pThis->mut != NULL) {
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
- }
- RETiRet;
-}
-
-
/* some simple object access methods */
DEFpropSetMeth(qqueue, bSyncQueueFiles, int)
DEFpropSetMeth(qqueue, iPersistUpdCnt, int)
@@ -2670,6 +2329,8 @@ BEGINObjClassInit(qqueue, 1, OBJ_IS_CORE_MODULE)
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
/* now set our own handlers */
OBJSetMethodHandler(objMethod_SETPROPERTY, qqueueSetProperty);