From 83c40e8838a2b64a3fdb02196944302533079d2a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 17 Apr 2009 12:37:53 +0200 Subject: some cleanup --- runtime/queue.c | 39 --------------------------------------- 1 file changed, 39 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c4a0fad2..e5db866c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -607,28 +607,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - - pEntry->pNext = NULL; - pEntry->pUsr = pUsr; - - if(pThis->tVars.linklist.pRoot == NULL) { - pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry; - } else { - pThis->tVars.linklist.pLast->pNext = pEntry; - pThis->tVars.linklist.pLast = pEntry; - } - -finalize_it: -#endif RETiRet; } @@ -636,24 +615,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - ASSERT(pThis->tVars.linklist.pRoot != NULL); - - pEntry = pThis->tVars.linklist.pRoot; - *ppUsr = pEntry->pUsr; - - if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) { - pThis->tVars.linklist.pRoot = NULL; - pThis->tVars.linklist.pLast = NULL; - } else { - pThis->tVars.linklist.pRoot = pEntry->pNext; - } - free(pEntry); - -#endif RETiRet; } -- cgit v1.2.3 From 7667845bd72b6f92eabc975318a4f288a77f2630 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 15:06:45 +0200 Subject: first attempt at dequeueing multiple batches inside the queue ... but this code has serious problems when terminating the queue, also it is far from being optimal. I will commit a series of patches (hopefully) as I am on the path to the final implementation. --- runtime/queue.c | 182 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 111 insertions(+), 71 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index e5db866c..c48eb724 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -8,7 +8,11 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static + * function names - this makes it really hard to read and does not provide much + * benefit, at least I (now) think so... + * + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. 
* @@ -63,12 +67,13 @@ DEFobjCurrIf(glbl) /* forward-definitions */ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); -static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -369,9 +374,11 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + // MULTIQUEUE: TODO: this should be DA-specific! + CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); @@ -721,7 +728,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } @@ -922,7 +929,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_ * rgerhards, 2008-01-20 */ static rsRetVal -qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) +UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -1266,6 +1273,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1315,7 +1323,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -qqueueConsumerCancelCleanup(void *arg1, void *arg2) +ConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; @@ -1327,7 +1335,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2) if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation 
cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1376,38 +1384,68 @@ finalize_it: } +/* dequeue as many user points as are available, until we hit the configured + * upper limit of pointers. + * This must only be called when the queue mutex is LOOKED, otherwise serious + * malfunction will happen. + */ +static inline rsRetVal +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +{ + int nDequeued; + int iQueueSize; + void *pUsr; + rsRetVal localRet; + DEFiRet; + + nDequeued = 0; + do { +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); + CHKiRet(qqueueDel(pThis, &pUsr)); + qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */ + iQueueSize = qqueueGetOverallQueueSize(pThis); + + /* check if we should discard this element */ + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + if(localRet == RS_RET_QUEUE_FULL) + continue; + else if(localRet != RS_RET_OK) + ABORT_FINALIZE(localRet); + + /* all well, use this element */ + pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce); + + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ + pWti->paUsrp->nElem = nDequeued; + *iRemainingQueueSize = iQueueSize; + +finalize_it: + RETiRet; +} + + /* dequeue the queued object for the queue consumers. * rgerhards, 2008-10-21 + * I made a radical change - we now dequeue multiple elements, and store these objects in + * an array of user pointers. We expect that this increases performance. + * rgerhards, 2009-04-22 */ static rsRetVal -qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - void *pUsr; - int iQueueSize; - int bRunsDA; /* cache for early mutex release */ + int iQueueSize = 0; /* keep the compiler happy... */ - /* dequeue element (still protected from mutex) */ - iRet = qqueueDel(pThis, &pUsr); - qqueueChkPersist(pThis); - iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ - bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - - /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY - * if we could successfully obtain a user pointer. Otherwise, we would bring the - * cancel cleanup handler into big troubles (and we did ;)). Note that we can - * NOT set the variable further below, as this may lead to an object leak. We - * may get cancelled before we reach that part of the code, so the only - * solution is to do it here. -- rgerhards, 2008-02-27 - */ - if(iRet == RS_RET_OK) { - pWti->pUsrp = pUsr; - } + /* dequeue element batch (still protected from mutex) */ + iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize); /* awake some flow-controlled sources if we can do this right now */ /* TODO: this could be done better from a performance point of view -- do it only if * we have someone waiting for the condition (or only when we hit the watermark right * on the nail [exact value]) -- rgerhards, 2008-03-14 + * now that we dequeue batches of pointers, this is much less an issue... 
+ * rgerhards, 2009-04-22 */ if(iQueueSize < pThis->iFullDlyMrk) { pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); @@ -1417,37 +1455,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock - * as of the pthreads recommendation on predictable scheduling behaviour. I don't see - * any problems caused by this, but I add this comment in case some will be seen - * in the next time. - */ pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet != RS_RET_OK) - FINALIZE; - - /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue. - * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to - * provide real-time creation of spool files. - * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. - */ - CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); - -finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " "may happen\n", iRet); } + RETiRet; } @@ -1490,7 +1507,7 @@ finalize_it: * but you get the idea from the code above. */ static rsRetVal -qqueueRateLimiter(qqueue_t *pThis) +RateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1553,15 +1570,18 @@ qqueueRateLimiter(qqueue_t *pThis) * rgerhards, 2008-01-21 */ static rsRetVal -qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); +// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably + int i; + for(i = 0 ; i < pWti->paUsrp->nElem ; i++) + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i])); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. 
-- rgerhards, 2008-01-30 @@ -1587,15 +1607,19 @@ finalize_it: * rgerhards, 2008-01-14 */ static rsRetVal -qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); +// MULTIQUEUE: + int i; + for(i = 0 ; i < pWti->paUsrp->nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); + finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1653,12 +1677,27 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * the DA queue */ static int -qqueueChkStopWrkrReg(qqueue_t *pThis) +ChkStooWrkrReg(qqueue_t *pThis) { return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } +/* return the configured "deq max at once" interval + * rgerhards, 2009-04-22 + */ +static rsRetVal +GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal) +{ + DEFiRet; + assert(pVal != NULL); +RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done + + *pVal = pThis->iDeqMaxAtOnce; + + RETiRet; +} + /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1673,7 +1712,7 @@ qqueueIsIdleDA(qqueue_t *pThis) * are not stable! Regular queue version */ static int -qqueueIsIdleReg(qqueue_t *pThis) +IsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; @@ -1701,7 +1740,7 @@ qqueueIsIdleReg(qqueue_t *pThis) * I am telling this, because I, too, always get confused by those... */ static rsRetVal -qqueueRegOnWrkrShutdown(qqueue_t *pThis) +RegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; @@ -1722,7 +1761,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis) * hook to indicate in the parent queue (if we are a child) that we are not done yet. 
*/ static rsRetVal -qqueueRegOnWrkrStartup(qqueue_t *pThis) +RegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; @@ -1788,13 +1827,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); + CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); -- cgit v1.2.3 From e4b3f6d287d74b34d27b4e296c33cb3f1294a58c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 16:39:58 +0200 Subject: now batches are handed down to the actual consumer ... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer. --- runtime/queue.c | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c48eb724..ea8567ea 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { + aUsrp_t aUsrp; + obj_t *pMsgp; DEFiRet; ASSERT(pThis != NULL); @@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 + * We use our knowledge about the aUsrp_t structure below, but without that, we + * pay a too-large performance toll... 
-- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pUsr); + pMsgp = (obj_t*) pUsr; + aUsrp.nElem = 1; /* there always is only one in direct mode */ + aUsrp.pUsrp = &pMsgp; + iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); RETiRet; } @@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) +GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; @@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) { DEFiRet; qqueue_t *pThis; @@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably - int i; - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i])); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 */ +//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); @@ -1597,7 +1602,7 @@ finalize_it: } -/* This is a special consumer to feed the disk-queue in disk-assited mode. +/* This is a special consumer to feed the disk-queue in disk-assisted mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries * to the disk queue. The disk queue will then call the actual consumer from @@ -1609,18 +1614,17 @@ finalize_it: static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); -// MULTIQUEUE: - int i; + /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->paUsrp->nElem ; i++) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); - finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); RETiRet; @@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) * to the regular files. 
-- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); + CHKiRet(GetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } -- cgit v1.2.3 From feddb7ea18769399ce53e3958d5c4a0b68e964bc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 11:19:32 +0200 Subject: improving debugging info a bit --- runtime/queue.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index ea8567ea..4e37a0c2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1818,9 +1818,11 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d starting\n", + dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " + "full delay %d, light delay %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1); + qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, + pThis->iFullDlyMrk, pThis->iLightDlyMrk); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ -- cgit v1.2.3 From 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 12:50:07 +0200 Subject: added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives --- runtime/queue.c | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c5f9df81..f3d3fe71 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -69,7 +69,7 @@ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); -static rsRetVal GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); @@ -375,7 +375,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); // MULTIQUEUE: TODO: this should be DA-specific! 
- CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); @@ -1280,7 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ - pThis->iDeqMaxAtOnce = 8; /* conservative default, should still provide good performance */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1421,7 +1421,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); /* all well, use this element */ pWti->paUsrp->pUsrp[nDequeued++] = pUsr; - } while(iQueueSize > 0 && nDequeued < pThis->iDeqMaxAtOnce); + } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->paUsrp->nElem = nDequeued; @@ -1691,14 +1691,11 @@ ChkStooWrkrReg(qqueue_t *pThis) * rgerhards, 2009-04-22 */ static rsRetVal -GetDeqMaxAtOnce(qqueue_t *pThis, int *pVal) +GetDeqBatchSize(qqueue_t *pThis, int *pVal) { DEFiRet; assert(pVal != NULL); -RUNLOG_VAR("%d", pThis->iDeqMaxAtOnce); // MULTIQUEUE: delete this when done - - *pVal = pThis->iDeqMaxAtOnce; - + *pVal = pThis->iDeqBatchSize; RETiRet; } @@ -1819,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " - "full delay %d, light delay %d starting\n", + "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk); + pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... 
*/ @@ -1835,7 +1832,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); - CHKiRet(wtpSetpfGetDeqMaxAtOnce (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqMaxAtOnce)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); @@ -2301,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pUsr, void*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) -- cgit v1.2.3 From 6c5264159c099ddc4d06590508980ee53a83b67b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 14:29:41 +0200 Subject: fixing a small (newly-introduced) memory leak ... plus simplifying free() calls after agreement on mailing list that we no longer need to check if the pointer is non-NULL --- runtime/queue.c | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index f3d3fe71..a5feef3d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -490,9 +490,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) ASSERT(pThis != NULL); queueDrain(pThis); /* discard any remaining queue entries */ - - if(pThis->tVars.farray.pBuf != NULL) - free(pThis->tVars.farray.pBuf); + free(pThis->tVars.farray.pBuf); RETiRet; } @@ -2063,11 +2061,8 @@ CODESTARTobjDestruct(qqueue) /* type-specific destructor */ iRet = pThis->qDestruct(pThis); - if(pThis->pszFilePrefix != NULL) - free(pThis->pszFilePrefix); - - if(pThis->pszSpoolDir != NULL) - free(pThis->pszSpoolDir); + free(pThis->pszFilePrefix); + free(pThis->pszSpoolDir); ENDobjDestruct(qqueue) @@ -2081,8 +2076,8 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix) { DEFiRet; - if(pThis->pszFilePrefix != NULL) - free(pThis->pszFilePrefix); + free(pThis->pszFilePrefix); + pThis->pszFilePrefix = NULL; if(pszPrefix == NULL) /* just unset the prefix! */ ABORT_FINALIZE(RS_RET_OK); -- cgit v1.2.3 From 8159d0a117837ed573e80cf86f4f013bf9534ab2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 16:02:42 +0200 Subject: fixed abort condition in DA mode --- runtime/queue.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index a5feef3d..c2df928b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -374,8 +374,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); - // MULTIQUEUE: TODO: this should be DA-specific! 
- CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); -- cgit v1.2.3 From bb79e96dc300fa5a2182e7c047afb3b15c5dc870 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 12 May 2009 15:27:40 +0200 Subject: moving to a cleaner implementation of batches ... now that we know what we need from a theoretical POV. --- runtime/queue.c | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c2df928b..c3a8e9d4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -65,7 +65,7 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) /* forward-definitions */ -rsRetVal qqueueChkPersist(qqueue_t *pThis); +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -896,8 +896,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { - aUsrp_t aUsrp; - obj_t *pMsgp; + batch_t singleBatch; + batch_obj_t batchObj; DEFiRet; ASSERT(pThis != NULL); @@ -907,17 +907,19 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 - * We use our knowledge about the aUsrp_t structure below, but without that, we + * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - pMsgp = (obj_t*) pUsr; - aUsrp.nElem = 1; /* there always is only one in direct mode */ - aUsrp.pUsrp = &pMsgp; - iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); + batchObj.state = BATCH_STATE_RDY; + batchObj.pUsrp = (obj_t*) pUsr; + singleBatch.nElem = 1; /* there always is only one in direct mode */ + singleBatch.pElem = &batchObj; + iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); RETiRet; } + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) { return RS_RET_OK; @@ -1247,7 +1249,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*)) { DEFiRet; qqueue_t *pThis; @@ -1402,11 +1404,12 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize rsRetVal localRet; DEFiRet; + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + nDequeued = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDel(pThis, &pUsr)); - qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... 
*/ iQueueSize = qqueueGetOverallQueueSize(pThis); /* check if we should discard this element */ @@ -1417,11 +1420,15 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); ABORT_FINALIZE(localRet); /* all well, use this element */ - pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + ++nDequeued; } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - pWti->paUsrp->nElem = nDequeued; + pWti->batch.nElem = nDequeued; *iRemainingQueueSize = iQueueSize; finalize_it: @@ -1582,7 +1589,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pWti, wti); CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); + CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1619,8 +1626,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->paUsrp->nElem ; i++) - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); + for(i = 0 ; i < pWti->batch.nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1974,17 +1981,21 @@ finalize_it: /* check if we need to persist the current queue info. If an - * error occurs, thus should be ignored by caller (but we still + * error occurs, this should be ignored by caller (but we still * abide to our regular call interface)... * rgerhards, 2008-01-13 + * nUpdates is the number of updates since the last call to this function. + * It may be > 1 due to batches. -- rgerhards, 2009-05-12 */ -rsRetVal qqueueChkPersist(qqueue_t *pThis) +static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); + assert(nUpdates > 0); - if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { + pThis->iUpdsSincePersist += nUpdates; + if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { qqueuePersist(pThis, QUEUE_CHECKPOINT); pThis->iUpdsSincePersist = 0; } @@ -2199,7 +2210,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); - qqueueChkPersist(pThis); + qqueueChkPersist(pThis, 1); finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { -- cgit v1.2.3 From 4a8c02870a55e19c1bebfae5cb70d1ec5aa7c203 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 May 2009 16:00:15 +0200 Subject: moved user object destruction to queue itself So far, the consumer was responsible for destroying objects. However, this does not work well with ultra-reliable queues. This is the first move to support them. 
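
[Editor's note] The batch handover introduced in the commits above changes the consumer contract: a consumer now receives a whole batch_t and must leave destruction of the user objects to the queue (see DeleteProcessedBatch in the following patch). A minimal sketch of what a batch-aware consumer could look like, assuming the batch_t/batch_obj_t layout shown in these diffs; the msgConsumer name is a placeholder for illustration, not one of rsyslog's actual consumers:

    /* illustrative only -- not part of the patches below */
    static rsRetVal msgConsumer(void *pMyUsr, batch_t *pBatch)
    {
    	int i;
    	DEFiRet;

    	for(i = 0 ; i < pBatch->nElem ; ++i) {
    		/* process pBatch->pElem[i].pUsrp here, but do NOT destruct it:
    		 * the queue's DeleteProcessedBatch() (or the DA worker, in DA
    		 * mode) disposes of processed elements on the next dequeue cycle.
    		 */
    		dbgprintf("consuming batch element %d of %d\n", i, pBatch->nElem);
    	}

    	RETiRet;
    }
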
--- runtime/queue.c | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c3a8e9d4..6bea338a 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1390,13 +1390,47 @@ finalize_it: } +/* Delete a batch of processed user objects from the queue, which includes + * destructing the objects themself. The pointer piRemainingQueu + * rgerhards, 2009-05-13 + */ +static inline rsRetVal +DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) +{ + int i; + void *pUsr; + DEFiRet; + + /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + /* if the queue runs in DA mode, the DA worker already deleted the message. But + * in regular mode, we need to do it ourselfs. We differentiate between the two cases, + * because it is actually the easiest way to handle the destruct-Problem in a simple + * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). + */ + if(!pThis->bRunsDA) { + for(i = 0 ; i < pBatch->nElem ; ++i) { + /* TODO: pull msgs off the queue (not yet necessary) */ + pUsr = pBatch->pElem[i].pUsrp; + objDestruct(pUsr); + } + } + pBatch->nElem = 0; + + RETiRet; +} + + /* dequeue as many user points as are available, until we hit the configured * upper limit of pointers. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. */ static inline rsRetVal -DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize) { int nDequeued; int iQueueSize; @@ -1405,6 +1439,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize DEFiRet; /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ + DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = 0; do { @@ -1427,9 +1462,8 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ - //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ pWti->batch.nElem = nDequeued; - *iRemainingQueueSize = iQueueSize; + *piRemainingQueueSize = iQueueSize; finalize_it: RETiRet; -- cgit v1.2.3 From 93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 18 May 2009 17:28:34 +0200 Subject: t-delete list implemented, queue store drivers updated... ... on the way to the ultra-reliable queue modes (redesign doc). This version does not really work, but is a good commit point. Next comes queue size calculation. DA mode does not yet work. --- runtime/queue.c | 353 ++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 306 insertions(+), 47 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 6bea338a..dc399066 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,93 @@ static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); #define QUEUE_CHECKPOINT 1 #define QUEUE_NO_CHECKPOINT 0 +/*********************************************************************** + * we need a private data structure, the "to-delete" list. As C does + * not provide any partly private data structures, we implement this + * structure right here inside the module. 
+ * Note that this list must always be kept sorted based on a unique + * dequeue ID (which is monotonically increasing). + * rgerhards, 2009-05-18 + ***********************************************************************/ + +/* generate next uniqueue dequeue ID. Note that uniqueness is only required + * on a per-queue basis and while this instance runs. So a stricly monotonically + * increasing counter is sufficient (if enough bits are used). + */ +static inline qDeqID getNextDeqID(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->deqIDAdd++; +} + + +/* return the top element of the to-delete list or NULL, if the + * list is empty. + */ +static inline toDeleteLst_t *tdlPeek(qqueue_t *pQueue) +{ + ISOBJ_TYPE_assert(pQueue, qqueue); + return pQueue->toDeleteLst; +} + + +/* remove the top element of the to-delete list. Nothing but the + * element itself is destroyed. Must not be called when the list + * is empty. + */ +static inline rsRetVal tdlPop(qqueue_t *pQueue) +{ + toDeleteLst_t *pRemove; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + pRemove = pQueue->toDeleteLst; + pQueue->toDeleteLst = pQueue->toDeleteLst->pNext; + free(pRemove); + + RETiRet; +} + + +/* Add a new to-delete list entry. The function allocates the data + * structure, populates it with the values provided and links the new + * element into the correct place inside the list. + */ +static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) +{ + toDeleteLst_t *pNew; + toDeleteLst_t *pPrev; + DEFiRet; + + ISOBJ_TYPE_assert(pQueue, qqueue); + assert(pQueue->toDeleteLst != NULL); + + CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); + pNew->deqID = deqID; + pNew->nElem = nElem; + + /* now find right spot */ + for( pPrev = pQueue->toDeleteLst + ; pPrev != NULL && deqID > pPrev->deqID + ; pPrev = pPrev->pNext) { + /*JUST SEARCH*/; + } + + if(pPrev == NULL) { + pNew->pNext = pQueue->toDeleteLst; + pQueue->toDeleteLst = pNew; + } else { + pNew->pNext = pPrev->pNext; + pPrev->pNext = pNew; + } + +finalize_it: + RETiRet; +} + + /* methods */ @@ -114,7 +201,9 @@ static inline void queueDrain(qqueue_t *pThis) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { - pThis->qDel(pThis, &pUsr); + pThis->qDeq(pThis, &pUsr); +// TODO: ULTRA + //pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } @@ -472,6 +561,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } + pThis->tVars.farray.deqhead = 0; pThis->tVars.farray.head = 0; pThis->tVars.farray.tail = 0; @@ -508,13 +598,29 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) RETiRet; } -static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) + +static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) { DEFiRet; ASSERT(pThis != NULL); - *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head]; + *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead]; +//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); + pThis->tVars.farray.deqhead++; + if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) + pThis->tVars.farray.deqhead = 0; + + RETiRet; +} + +static rsRetVal qDelFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + +//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, 
pThis->tVars.farray.tail); pThis->tVars.farray.head++; if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; @@ -529,15 +635,13 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out) static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) { - DEFiRet; qLinkedList_t *pEntry; + DEFiRet; ASSERT(ppRoot != NULL); ASSERT(ppLast != NULL); - if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; pEntry->pUsr = pUsr; @@ -586,8 +690,9 @@ static rsRetVal qConstructLinkedList(qqueue_t *pThis) ASSERT(pThis != NULL); - pThis->tVars.linklist.pRoot = 0; - pThis->tVars.linklist.pLast = 0; + pThis->tVars.linklist.pDeqRoot = NULL; + pThis->tVars.linklist.pDelRoot = NULL; + pThis->tVars.linklist.pLast = NULL; qqueueChkIsDA(pThis); @@ -610,15 +715,60 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { + qLinkedList_t *pEntry; + DEFiRet; + + CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); + + pEntry->pNext = NULL; + pEntry->pUsr = pUsr; + + if(pThis->tVars.linklist.pDelRoot == NULL) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; + } else { + pThis->tVars.linklist.pLast->pNext = pEntry; + pThis->tVars.linklist.pLast = pEntry; + } + + if(pThis->tVars.linklist.pDeqRoot == NULL) { + pThis->tVars.linklist.pDeqRoot = pEntry; + } +RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +{ + qLinkedList_t *pEntry; DEFiRet; - iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); + +RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); + pEntry = pThis->tVars.linklist.pDeqRoot; + *ppUsr = pEntry->pUsr; + pThis->tVars.linklist.pDeqRoot = pEntry->pNext; + RETiRet; } -static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) + +static rsRetVal qDelLinkedList(qqueue_t *pThis) { + qLinkedList_t *pEntry; DEFiRet; - iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); + + pEntry = pThis->tVars.linklist.pDelRoot; + + if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) { + pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL; + } else { + pThis->tVars.linklist.pDelRoot = pEntry->pNext; + } + + free(pEntry); + RETiRet; } @@ -732,11 +882,29 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* and now the stream objects (some order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF, + CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); + + /* we now need to take care of the Deq handle. It is not persisted, so we can create + * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function! 
+ */ + + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); + + /* TODO: dirty, need stream methods --> */ + pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum; + pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs; + /* <-- dirty, need stream methods :TODO */ + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq)); /* OK, we could successfully read the file, so we now can request that it be * deleted when we are done with the persisted information. @@ -787,17 +955,25 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite)); - CHKiRet(strmConstruct(&pThis->tVars.disk.pRead)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); - CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR)); - CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead)); - - - CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); - CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); + + CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDel)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); + CHKiRet(strmSetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); + CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); + CHKiRet(strmSetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDel)); + + CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(strmSetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix)); } /* now we set (and overwrite in case of a persisted restart) some parameters which @@ -806,7 +982,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) * ability to read existing queue files. 
-- rgerhards, 2008-01-12 */ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize)); - CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize)); + CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize)); finalize_it: RETiRet; @@ -820,7 +997,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) ASSERT(pThis != NULL); strmDestruct(&pThis->tVars.disk.pWrite); - strmDestruct(&pThis->tVars.disk.pRead); + strmDestruct(&pThis->tVars.disk.pReadDeq); + strmDestruct(&pThis->tVars.disk.pReadDel); RETiRet; } @@ -852,16 +1030,30 @@ finalize_it: RETiRet; } -static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr) + +static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) { DEFiRet; + CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL)); + +finalize_it: + RETiRet; +} + + +static rsRetVal qDelDisk(qqueue_t *pThis) +{ + obj_t *pDummyObj; /* we need to deserialize it... */ + DEFiRet; + int64 offsIn; int64 offsOut; - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn)); - CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL)); - CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut)); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); + CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); + objDestruct(pDummyObj); + CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need * to keep track of what we have read until we get an out-offset that is lower than the @@ -882,6 +1074,7 @@ finalize_it: RETiRet; } + /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -920,7 +1113,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out) +static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; } @@ -1023,7 +1216,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr) if(pThis->iUngottenObjs > 0) { iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { - iRet = pThis->qDel(pThis, pUsr); + iRet = pThis->qDeq(pThis, pUsr); + // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); } @@ -1290,18 +1484,21 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qConstruct = qConstructFixedArray; pThis->qDestruct = qDestructFixedArray; pThis->qAdd = qAddFixedArray; + pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList; + pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; + pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; pThis->qDestruct = qDestructDisk; pThis->qAdd = qAddDisk; + pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ @@ -1390,8 +1587,68 @@ finalize_it: } +/* Finally remove n elements from the queue store. 
+ */ +static inline rsRetVal +DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + /* now send delete request to storage driver */ + for(i = 0 ; i < nElem ; ++i) { + pThis->qDel(pThis); + } + + ++pThis->deqIDDel; /* one more batch dequeued */ + + RETiRet; +} + + +/* remove messages from the physical queue store that are fully processed. This is + * controlled via the to-delete list. We can only delete those elements, that are + * at the current physical tail of the queue. If the batch is from another position, + * we schedule it for deletion, but actual deletion will happen at a later call + * of this function here. We always delete as much as possible, which includes + * picking up things from the to-delete list. + */ +static inline rsRetVal +DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) +{ + toDeleteLst_t *pTdl; + qDeqID deqIDDel; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pBatch != NULL); + + pTdl = tdlPeek(pThis); + if(pTdl == NULL) { + DoDeleteBatchFromQStore(pThis, pBatch->nElem); + } else if(pBatch->deqID == pThis->deqIDDel) { + deqIDDel = pThis->deqIDDel; + pTdl = tdlPeek(pThis); + while(pTdl != NULL && deqIDDel == pTdl->deqID) { + DoDeleteBatchFromQStore(pThis, pTdl->nElem); + tdlPop(pThis); + ++deqIDDel; + pTdl = tdlPeek(pThis); + } + } else { + /* can not delete, insert into to-delete list */ + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); + } + +finalize_it: + RETiRet; +} + + /* Delete a batch of processed user objects from the queue, which includes - * destructing the objects themself. The pointer piRemainingQueu + * destructing the objects themself. * rgerhards, 2009-05-13 */ static inline rsRetVal @@ -1401,30 +1658,31 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) void *pUsr; DEFiRet; - /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ - ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); +// TODO: ULTRA: lock qaueue mutex if instructed to do so - /* if the queue runs in DA mode, the DA worker already deleted the message. But - * in regular mode, we need to do it ourselfs. We differentiate between the two cases, - * because it is actually the easiest way to handle the destruct-Problem in a simple - * and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). + /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation + * of the message. But in regular mode, we need to do it ourselfs. We differentiate between + * the two cases, because it is actually the easiest way to handle the destruct-Problem in + * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). */ if(!pThis->bRunsDA) { for(i = 0 ; i < pBatch->nElem ; ++i) { - /* TODO: pull msgs off the queue (not yet necessary) */ pUsr = pBatch->pElem[i].pUsrp; objDestruct(pUsr); } } - pBatch->nElem = 0; + + iRet = DeleteBatchFromQStore(pThis, pBatch); + + pBatch->nElem = 0; /* reset batch */ RETiRet; } -/* dequeue as many user points as are available, until we hit the configured +/* dequeue as many user pointers as are available, until we hit the configured * upper limit of pointers. * This must only be called when the queue mutex is LOOKED, otherwise serious * malfunction will happen. 
@@ -1463,6 +1721,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ pWti->batch.nElem = nDequeued; + pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; finalize_it: @@ -1957,7 +2216,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); FINALIZE; /* nothing left to do, so be happy */ } @@ -1992,13 +2251,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) /* now persist the stream info */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); - CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF)); + CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF)); /* tell the input file object that it must not delete the file on close if the queue * is non-empty - but only if we are not during a simple checkpoint */ if(bIsCheckpoint != QUEUE_CHECKPOINT) { - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0)); } /* we have persisted the queue object. So whenever it comes to an empty queue, -- cgit v1.2.3 From fe5bea77ac2533faab3b7b73bc253c4dc7d702bc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 18 May 2009 18:39:52 +0200 Subject: removed queue's UngetObj() call ... which is no longer needed thanks to the new queue design. --- runtime/queue.c | 110 ++++++-------------------------------------------------- 1 file changed, 11 insertions(+), 99 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index dc399066..8ef3e7db 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -73,7 +73,6 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -169,7 +168,7 @@ finalize_it: /* methods */ -/* get the overall queue size, which includes ungotten objects. Must only be called +/* get the overall queue size. Must only be called * while mutex is locked! 
* rgerhards, 2008-01-29 */ @@ -178,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis) { #if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ BEGINfunc -dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n", - pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs); +dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n", + pThis->iQueueSize, pThis->iQueueSize); ENDfunc #endif - return pThis->iQueueSize + pThis->iUngottenObjs; + return pThis->iQueueSize; } @@ -837,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; struct stat stat_buf; - int iUngottenObjs; - obj_t *pUsr; ISOBJ_TYPE_assert(pThis, qqueue); @@ -868,18 +865,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) /* first, we try to read the property bag for ourselfs */ CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF)); - /* then the ungotten object queue */ - iUngottenObjs = pThis->iUngottenObjs; - pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */ - - while(iUngottenObjs > 0) { - /* fill the queue from disk */ - CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); - --iUngottenObjs; /* one less */ - } - - /* and now the stream objects (some order as when persisted!) */ + /* then the stream objects (same order as when persisted!) */ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, @@ -1122,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) /* --------------- end type-specific handlers -------------------- */ -/* unget a user pointer that has been dequeued. This functionality is especially important - * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers - * is maintened in memory. - * rgerhards, 2008-01-20 - */ -static rsRetVal -UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) -{ - DEFiRet; - DEFVARS_mutexProtection; - - ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15 - The second time I noticed it the queue was in destruction with NO worker threads - running. The pUsr ptr was totally off and provided no clue what it may be pointing - at (except that it looked like the static data pool). Both times, the abort happend - inside an action queue */ - - dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr)); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex); - iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr); - ++pThis->iUngottenObjs; /* indicate one more */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - - RETiRet; -} - - -/* dequeues a user pointer from the ungotten queue. Pointers from there should always be - * dequeued first. - * - * This function must only be called when the mutex is locked! 
- * - * rgerhards, 2008-01-29 - */ -static rsRetVal -GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) -{ - DEFiRet; - - ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(ppUsr != NULL); - - iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr); - --pThis->iUngottenObjs; /* indicate one less */ - dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr)); - - RETiRet; -} - - /* generic code to add a queue entry * We use some specific code to most efficiently support direct mode * queues. This is justified in spite of the gain and the need to do some @@ -1198,8 +1133,6 @@ finalize_it: /* generic code to remove a queue entry - * rgerhards, 2008-01-29: we must first see if there is any object in the - * ungotten list and, if so, dequeue it first. */ static rsRetVal qqueueDel(qqueue_t *pThis, void *pUsr) @@ -1213,13 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - if(pThis->iUngottenObjs > 0) { - iRet = GetUngottenObj(pThis, (obj_t**) pUsr); - } else { - iRet = pThis->qDeq(pThis, pUsr); - // TODO: ULTRA iRet = pThis->qDel(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); - } + iRet = pThis->qDeq(pThis, pUsr); + ATOMIC_DEC(pThis->iQueueSize); dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", iRet, pThis->iQueueSize); @@ -1528,6 +1456,8 @@ finalize_it: static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2) { + //TODO: looks like we no longer need it! + /* DEFiRet; qqueue_t *pThis = (qqueue_t*) arg1; @@ -1535,14 +1465,9 @@ ConsumerCancelCleanup(void *arg1, void *arg2) ISOBJ_TYPE_assert(pThis, qqueue); - if(pUsr != NULL) { - /* make sure the data element is not lost */ - dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX)); - } - -finalize_it: RETiRet; + */ + return RS_RET_OK; } @@ -2188,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) strm_t *psQIF = NULL; /* Queue Info File */ uchar pszQIFNam[MAXFNAME]; size_t lenQIFNam; - obj_t *pUsr; ASSERT(pThis != NULL); @@ -2235,20 +2159,10 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) */ CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); - objSerializeSCALAR(psQIF, iUngottenObjs, INT); objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(obj.EndSerialize(psQIF)); - /* now we must persist all objects on the ungotten queue - they can not go to - * to the regular files. 
-- rgerhards, 2008-01-29 - */ - while(pThis->iUngottenObjs > 0) { - CHKiRet(GetUngottenObj(pThis, &pUsr)); - CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); - objDestruct(pUsr); - } - /* now persist the stream info */ CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF)); CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF)); @@ -2615,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) if(isProp("iQueueSize")) { pThis->iQueueSize = pProp->val.num; - } else if(isProp("iUngottenObjs")) { - pThis->iUngottenObjs = pProp->val.num; } else if(isProp("tVars.disk.sizeOnDisk")) { pThis->tVars.disk.sizeOnDisk = pProp->val.num; } else if(isProp("tVars.disk.bytesRead")) { -- cgit v1.2.3 From a4dad2009992d436ba23c2d0a4a43b483aac40fc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 11:03:09 +0200 Subject: queue size calculation now based on logical/physical dequeue ... needed to split the old single counter into two. I wouldn't bet that I made some mistakes while doing so, but at least some ad-hoc tests plus the testbench do no longer indicate errors. --- runtime/queue.c | 114 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 66 insertions(+), 48 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 8ef3e7db..9855dac8 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -168,29 +168,37 @@ finalize_it: /* methods */ -/* get the overall queue size. Must only be called +/* get the physical queue size. Must only be called * while mutex is locked! * rgerhards, 2008-01-29 */ static inline int -qqueueGetOverallQueueSize(qqueue_t *pThis) +getPhysicalQueueSize(qqueue_t *pThis) { -#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */ -BEGINfunc -dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n", - pThis->iQueueSize, pThis->iQueueSize); -ENDfunc -#endif return pThis->iQueueSize; } +/* get the logical queue size (that is store size minus logically dequeued elements). + * Must only be called while mutex is locked! + * rgerhards, 2009-05-19 + */ +static inline int +getLogicalQueueSize(qqueue_t *pThis) +{ + return pThis->iQueueSize - pThis->nLogDeq; +} + + + /* This function drains the queue in cases where this needs to be done. The most probable * reason is a HUP which needs to discard data (because the queue is configured to be lossy). * During a shutdown, this is typically not needed, as the OS frees up ressources and does * this much quicker than when we clean up ourselvs. -- rgerhards, 2008-10-21 * This function returns void, as it makes no sense to communicate an error back, even if * it happens. + * This functions works "around" the regular deque mechanism, because it is only used to + * clean up (in cases where message loss is acceptable). */ static inline void queueDrain(qqueue_t *pThis) { @@ -198,14 +206,14 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); +// TODO: ULTRA it may be a good idea to check validitity once again /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); -// TODO: ULTRA - //pThis->qDel(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); } + pThis->qDel(pThis); } } @@ -229,14 +237,14 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) /* if we have not yet reached the high water mark, there is no need to start a * worker. 
-- rgerhards, 2008-01-26 */ - if(qqueueGetOverallQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { + if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } } else { if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { iMaxWorkers = 1; } else { - iMaxWorkers = qqueueGetOverallQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -295,13 +303,13 @@ qqueueTurnOffDAMode(qqueue_t *pThis) */ /* we need to check if the DA queue is empty because the DA worker may simply have - * terminated do to no new messages arriving. That does not, however, mean that the + * terminated due to no new messages arriving. That does not, however, mean that the * DA queue is empty. If there is still data in that queue, we do nothing and leave * that for a later incarnation of this function (it will be called multiple times * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ - if(pThis->pqDA->iQueueSize == 0) { + if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. @@ -313,7 +321,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis) * when it is waiting that the high water mark is reached again. If so, we need to start up * a regular worker. -- rgerhards, 2008-01-26 */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getLogicalQueueSize(pThis) > 0) { qqueueAdviseMaxWorkers(pThis); } } @@ -507,7 +515,7 @@ qqueueChkStrtDA(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); /* if we do not hit the high water mark, we have nothing to do */ - if(qqueueGetOverallQueueSize(pThis) != pThis->iHighWtrMrk) + if(getPhysicalQueueSize(pThis) != pThis->iHighWtrMrk) ABORT_FINALIZE(RS_RET_OK); if(pThis->bRunsDA) { @@ -521,14 +529,14 @@ qqueueChkStrtDA(qqueue_t *pThis) * we need at least one). */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark in DA mode, send notify\n", - qqueueGetOverallQueueSize(pThis)); + getPhysicalQueueSize(pThis)); qqueueAdviseMaxWorkers(pThis); } else { /* this is the case when we are currently not running in DA mode. So it is time * to turn it back on. */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", - qqueueGetOverallQueueSize(pThis)); + getPhysicalQueueSize(pThis)); qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } @@ -1124,7 +1132,8 @@ qqueueAdd(qqueue_t *pThis, void *pUsr) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(pThis->iQueueSize); - dbgoprint((obj_t*) pThis, "entry added, size now %d entries\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } finalize_it: @@ -1132,10 +1141,10 @@ finalize_it: } -/* generic code to remove a queue entry +/* generic code to dequeue a queue entry */ static rsRetVal -qqueueDel(qqueue_t *pThis, void *pUsr) +qqueueDeq(qqueue_t *pThis, void *pUsr) { DEFiRet; @@ -1147,10 +1156,10 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... 
-- rgerhards, 2008-01-03 */ iRet = pThis->qDeq(pThis, pUsr); - ATOMIC_DEC(pThis->iQueueSize); + ATOMIC_INC(pThis->nLogDeq); - dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n", - iRet, pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1188,7 +1197,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we * see if we reactivate the worker. @@ -1259,7 +1268,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && qqueueGetOverallQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { /* switch to enqueue-only mode so that no more actions happen */ if(pThis->bRunsDA == 0) { qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ @@ -1289,7 +1298,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * they will automatically terminate as there no longer is any message left to process. */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -1358,7 +1367,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. 
*/ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", qqueueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis)); RETiRet; } @@ -1397,6 +1406,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->lenSpoolDir = strlen((char*)pThis->pszSpoolDir); pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; + pThis->nLogDeq = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; @@ -1522,11 +1532,16 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); +dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); } + /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ + pThis->iQueueSize -= nElem; + pThis->nLogDeq -= nElem; +dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ RETiRet; @@ -1627,11 +1642,13 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); - CHKiRet(qqueueDel(pThis, &pUsr)); - iQueueSize = qqueueGetOverallQueueSize(pThis); + CHKiRet(qqueueDeq(pThis, &pUsr)); + iQueueSize = getLogicalQueueSize(pThis); /* check if we should discard this element */ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + //MULTI-DEQUEUE / ULTRA-RELIABLE: we need to handle this case, we need to advance the + // DEQ pointer (or so...) TODO!!! Idea: get a second nElem int in pBatch, nDequeued. Use that when deleting! if(localRet == RS_RET_QUEUE_FULL) continue; else if(localRet != RS_RET_OK) @@ -1880,7 +1897,7 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ bStopWrkr = 1; - } else if(qqueueGetOverallQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { + } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { bStopWrkr = 1; } else { bStopWrkr = 0; @@ -1903,9 +1920,9 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * the DA queue */ static int -ChkStooWrkrReg(qqueue_t *pThis) +ChkStopWrkrReg(qqueue_t *pThis) { - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); + return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); } @@ -1929,7 +1946,7 @@ qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! 
Regular queue version @@ -1939,12 +1956,12 @@ IsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; - ret = qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk); + ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk); if(ret) fprintf(stderr, "queue is idle\n"); return ret; #else /* regular code! */ - return(qqueueGetOverallQueueSize(pThis) == 0 || (pThis->bRunsDA && qqueueGetOverallQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -2037,11 +2054,12 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* call type-specific constructor */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ - dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " + dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, - qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), + pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, + pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -2053,7 +2071,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); @@ -2117,7 +2135,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) ASSERT(pThis != NULL); if(pThis->qType != QUEUETYPE_DISK) { - if(qqueueGetOverallQueueSize(pThis) > 0) { + if(getPhysicalQueueSize(pThis) > 0) { /* This error code is OK, but we will probably not implement this any time * The reason is that persistence happens via DA queues. But I would like to * leave the code as is, as we so have a hook in case we need one. @@ -2128,13 +2146,13 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) FINALIZE; /* if the queue is empty, we are happy and done... 
*/ } - dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", qqueueGetOverallQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); /* Construct file name */ lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (qqueueGetOverallQueueSize(pThis) == 0)) { + if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { unlink((char*)pszQIFNam); pThis->bNeedDelQIF = 0; @@ -2342,13 +2360,13 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); /* first check if we need to discard this message (which will cause CHKiRet() to exit) - * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The iQueueSize + * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The queue size * and bRunsDA parameters may not reflect the correct settings here, but they are * "good enough" in the sense that they can be used to drive the decision. Valgrind's * threading tools may point this access to be an error, but this is done * intentional. I do not see this causes problems to us. */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr)); /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE @@ -2386,12 +2404,12 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * It's a side effect, but a good one ;) -- rgerhards, 2008-03-14 */ if(flowCtlType == eFLOWCTL_FULL_DELAY) { - while(pThis->iQueueSize >= pThis->iFullDlyMrk) { + while(getPhysicalQueueSize(pThis) >= pThis->iFullDlyMrk) { dbgoprint((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message - blocking.\n"); pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); /* TODO error check? But what do then? */ } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY) { - if(pThis->iQueueSize >= pThis->iLightDlyMrk) { + if(getPhysicalQueueSize(pThis) >= pThis->iLightDlyMrk) { dbgoprint((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); /* TODO error check? But what do then? */ @@ -2403,7 +2421,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) * is not the case, basic flow control enters the field, which means we wait for * the queue to become ready or drop the new message. 
-- rgerhards, 2008-03-14 */ - while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) + while( (pThis->iMaxQueueSize > 0 && getPhysicalQueueSize(pThis) >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { dbgoprint((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); -- cgit v1.2.3 From a9c4b26d462dd3c9dbd0575a3a1acc6d8df1c3b3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 18:47:26 +0200 Subject: some cleanup --- runtime/queue.c | 86 --------------------------------------------------------- 1 file changed, 86 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 9855dac8..0f87b235 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -72,7 +72,6 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -206,7 +205,6 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); -// TODO: ULTRA it may be a good idea to check validitity once again /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -473,7 +471,6 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); @@ -613,7 +610,6 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) ASSERT(pThis != NULL); *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead]; -//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); pThis->tVars.farray.deqhead++; if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize) pThis->tVars.farray.deqhead = 0; @@ -627,7 +623,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) ASSERT(pThis != NULL); -//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail); pThis->tVars.farray.head++; if (pThis->tVars.farray.head == pThis->iMaxQueueSize) pThis->tVars.farray.head = 0; @@ -638,58 +633,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) /* -------------------- linked list -------------------- */ -/* first some generic functions which are also used for the unget linked list */ - -static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr) -{ - qLinkedList_t *pEntry; - DEFiRet; - - ASSERT(ppRoot != NULL); - ASSERT(ppLast != NULL); - - CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t)))); - - pEntry->pNext = NULL; - pEntry->pUsr = 
pUsr; - - if(*ppRoot == NULL) { - *ppRoot = *ppLast = pEntry; - } else { - (*ppLast)->pNext = pEntry; - *ppLast = pEntry; - } - -finalize_it: - RETiRet; -} - -static inline rsRetVal qqueueDelLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, obj_t **ppUsr) -{ - DEFiRet; - qLinkedList_t *pEntry; - - ASSERT(ppRoot != NULL); - ASSERT(ppLast != NULL); - ASSERT(ppUsr != NULL); - ASSERT(*ppRoot != NULL); - - pEntry = *ppRoot; - *ppUsr = pEntry->pUsr; - - if(*ppRoot == *ppLast) { - *ppRoot = NULL; - *ppLast = NULL; - } else { - *ppRoot = pEntry->pNext; - } - free(pEntry); - - RETiRet; -} - -/* end generic functions which are also used for the unget linked list */ - static rsRetVal qConstructLinkedList(qqueue_t *pThis) { @@ -1455,33 +1398,6 @@ finalize_it: } -/* cancellation cleanup handler for queueWorker () - * Updates admin structure and frees ressources. - * Params: - * arg1 - user pointer (in this case a qqueue_t) - * arg2 - user data pointer (in this case a queue data element, any object [queue's pUsr ptr!]) - * Note that arg2 may be NULL, in which case no dequeued but unprocessed pUsr exists! - * rgerhards, 2008-01-16 - */ -static rsRetVal -ConsumerCancelCleanup(void *arg1, void *arg2) -{ - //TODO: looks like we no longer need it! - /* - DEFiRet; - - qqueue_t *pThis = (qqueue_t*) arg1; - obj_t *pUsr = (obj_t*) arg2; - - ISOBJ_TYPE_assert(pThis, qqueue); - - RETiRet; - */ - return RS_RET_OK; -} - - - /* This function checks if the provided message shall be discarded and does so, if needed. * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to * provide real-time creation of spool files. @@ -1636,7 +1552,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz rsRetVal localRet; DEFiRet; - /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */ DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = 0; @@ -2075,7 +1990,6 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); -- cgit v1.2.3 From 0cf8e88a348dc574244e4f5c2be26f47e8bfff08 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 18:58:33 +0200 Subject: solved the intended-discard-during-dequeue issue --- runtime/queue.c | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 0f87b235..672ba9f5 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -131,7 +131,7 @@ static inline rsRetVal tdlPop(qqueue_t *pQueue) * structure, populates it with the values provided and links the new * element into the correct place inside the list. 
*/ -static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) +static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElemDeq) { toDeleteLst_t *pNew; toDeleteLst_t *pPrev; @@ -142,7 +142,7 @@ static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem) CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t))); pNew->deqID = deqID; - pNew->nElem = nElem; + pNew->nElemDeq = nElemDeq; /* now find right spot */ for( pPrev = pQueue->toDeleteLst @@ -1483,19 +1483,19 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) pTdl = tdlPeek(pThis); if(pTdl == NULL) { - DoDeleteBatchFromQStore(pThis, pBatch->nElem); + DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; pTdl = tdlPeek(pThis); while(pTdl != NULL && deqIDDel == pTdl->deqID) { - DoDeleteBatchFromQStore(pThis, pTdl->nElem); + DoDeleteBatchFromQStore(pThis, pTdl->nElemDeq); tdlPop(pThis); ++deqIDDel; pTdl = tdlPeek(pThis); } } else { /* can not delete, insert into to-delete list */ - CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); + CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); } finalize_it: @@ -1547,6 +1547,7 @@ static inline rsRetVal DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize) { int nDequeued; + int nDiscarded; int iQueueSize; void *pUsr; rsRetVal localRet; @@ -1554,7 +1555,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz DeleteProcessedBatch(pThis, &pWti->batch); - nDequeued = 0; + nDequeued = nDiscarded = 0; do { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); @@ -1562,12 +1563,12 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); /* check if we should discard this element */ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); - //MULTI-DEQUEUE / ULTRA-RELIABLE: we need to handle this case, we need to advance the - // DEQ pointer (or so...) TODO!!! Idea: get a second nElem int in pBatch, nDequeued. Use that when deleting! - if(localRet == RS_RET_QUEUE_FULL) + if(localRet == RS_RET_QUEUE_FULL) { + ++nDiscarded; continue; - else if(localRet != RS_RET_OK) + } else if(localRet != RS_RET_OK) { ABORT_FINALIZE(localRet); + } /* all well, use this element */ pWti->batch.pElem[nDequeued].pUsrp = pUsr; @@ -1578,6 +1579,7 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ pWti->batch.nElem = nDequeued; + pWti->batch.nElemDeq = nDequeued + nDiscarded; pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; -- cgit v1.2.3 From ad7ccabe5ec616a4bf9fda1472d8041eaf1bf815 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 20 May 2009 11:11:02 +0200 Subject: yield() no longer needed on uniproc thanks to new algorithms --- runtime/queue.c | 7 ------- 1 file changed, 7 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 672ba9f5..3118bb19 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2361,13 +2361,6 @@ finalize_it: d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); dbgoprint((obj_t*) pThis, "EnqueueMsg advised worker start\n"); - /* the following pthread_yield is experimental, but brought us performance - * benefit. 
For details, please see http://kb.monitorware.com/post14216.html#p14216 - * rgerhards, 2008-10-09 - * but this is only true for uniprocessors, so we guard it with an optimize flag -- rgerhards, 2008-10-22 - */ - if(pThis->bOptimizeUniProc) - pthread_yield(); } RETiRet; -- cgit v1.2.3 From 9f45b80ea9ea86d516c895d97fd8670df37e319e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 20 May 2009 15:12:49 +0200 Subject: free last processed message in all cases so far, the last processed message was only freed when the next one was processed. This has been changed now. More precisely, a better algorithm has been selected for the queue worker process, which also involves less overhead than the previous one. The fix for "free last processed message" as then more or less a side-effect (easy to do) of the new algorithm. --- runtime/queue.c | 51 +++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 10 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 3118bb19..0019297b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -469,7 +469,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); @@ -1257,7 +1257,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) /* we need to re-aquire the mutex for the next check in this case! */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! 
*/ } - if(pThis->bIsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { + if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n"); @@ -1548,18 +1548,19 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz { int nDequeued; int nDiscarded; + int nDeleted; int iQueueSize; void *pUsr; rsRetVal localRet; DEFiRet; + nDeleted = pWti->batch.nElemDeq; DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = nDiscarded = 0; - do { + while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); - iQueueSize = getLogicalQueueSize(pThis); /* check if we should discard this element */ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); @@ -1574,9 +1575,10 @@ dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); pWti->batch.pElem[nDequeued].pUsrp = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; - } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + } - qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */ + /* it is sufficient to persist only when the bulk of work is done */ + qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted); pWti->batch.nElem = nDequeued; pWti->batch.nElemDeq = nDequeued + nDiscarded; @@ -1618,6 +1620,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } + // TODO: MULTI: check physical queue size! pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1727,6 +1730,27 @@ RateLimiter(qqueue_t *pThis) } +/* This dequeues the next batch and checks if the queue is empty. If it is + * empty, return RS_RET_IDLE. That will trigger termination of the function + * and tell the upper layer caller to initiate idle processing. + * rgerhards, 2009-05-20 + */ +static inline rsRetVal +DequeueForConsumer(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); + + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + + if(pWti->batch.nElem == 0) + ABORT_FINALIZE(RS_RET_IDLE); + +finalize_it: + RETiRet; +} /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. 
@@ -1740,7 +1764,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch)); /* we now need to check if we should deliberately delay processing a bit @@ -1754,6 +1778,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) } finalize_it: +dbgprintf("XXX: regular consumer finished, iret=%d, szlog %d sz phys %d\n", iRet, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1776,7 +1801,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem ; i++) CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); @@ -1855,6 +1880,8 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) RETiRet; } + +/* common function for the idle functions that deletes the last batch if TODO MULTI */ /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1990,7 +2017,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStopWrkrReg)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); @@ -2133,7 +2160,10 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates) DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - assert(nUpdates > 0); + assert(nUpdates >= 0); + + if(nUpdates == 0) + FINALIZE; pThis->iUpdsSincePersist += nUpdates; if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) { @@ -2141,6 +2171,7 @@ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates) pThis->iUpdsSincePersist = 0; } +finalize_it: RETiRet; } -- cgit v1.2.3 From aa9426f683fa6af9280bc63050ee0187ba4c57e1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 May 2009 12:43:43 +0200 Subject: solved design issue with queue termination ... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice. 
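[Reading aid for the patch below, not part of it: the central mechanism is the split between logical dequeue (a batch is handed to a worker), physical delete (the store slot is freed once the batch is fully processed), and an un-dequeue step that rewinds the logical pointer when workers had to be cancelled. The following self-contained sketch assumes a simplified fixed-array queue; none of the names are the real qqueue_t API.]

#include <stdio.h>

#define QSIZE 8

struct toyq {
	int buf[QSIZE];
	int head;     /* physical delete position */
	int deqhead;  /* logical dequeue position */
	int tail;     /* enqueue position */
	int size;     /* physical queue size (iQueueSize in the patch) */
	int nLogDeq;  /* dequeued but not yet deleted (nLogDeq in the patch) */
};

static void add(struct toyq *q, int v) { q->buf[q->tail] = v; q->tail = (q->tail + 1) % QSIZE; q->size++; }
static int  deq(struct toyq *q)        { int v = q->buf[q->deqhead]; q->deqhead = (q->deqhead + 1) % QSIZE; q->nLogDeq++; return v; }
static void del(struct toyq *q)        { q->head = (q->head + 1) % QSIZE; q->size--; q->nLogDeq--; }
static void unDeqAll(struct toyq *q)   { q->deqhead = q->head; q->nLogDeq = 0; }
static int  logSize(struct toyq *q)    { return q->size - q->nLogDeq; }

int main(void)
{
	struct toyq q = { {0}, 0, 0, 0, 0, 0 };
	add(&q, 1); add(&q, 2); add(&q, 3);

	deq(&q); deq(&q);                /* a worker pulls a batch of two */
	printf("log %d, phys %d\n", logSize(&q), q.size);  /* log 1, phys 3 */

	del(&q); del(&q);                /* batch fully processed: physical delete */
	printf("log %d, phys %d\n", logSize(&q), q.size);  /* log 1, phys 1 */

	deq(&q);                         /* next batch dequeued...               */
	unDeqAll(&q);                    /* ...but the worker is cancelled: rewind */
	printf("log %d, phys %d\n", logSize(&q), q.size);  /* log 1, phys 1 */
	return 0;
}

[With this split, cancelled or timed-out workers no longer lose data on shutdown: whatever was dequeued but not yet deleted becomes visible again via the un-dequeue step, and bSaveOnShutdown can then persist it in a separate pass (DoSaveOnShutdown in the patch).]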
--- runtime/queue.c | 253 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 158 insertions(+), 95 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 0019297b..539cf4ec 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -202,9 +202,10 @@ getLogicalQueueSize(qqueue_t *pThis) static inline void queueDrain(qqueue_t *pThis) { void *pUsr; - ASSERT(pThis != NULL); + BEGINfunc + dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -213,6 +214,7 @@ static inline void queueDrain(qqueue_t *pThis) } pThis->qDel(pThis); } + ENDfunc } @@ -617,6 +619,7 @@ static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) RETiRet; } + static rsRetVal qDelFixedArray(qqueue_t *pThis) { DEFiRet; @@ -631,6 +634,26 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis) } +/* reset the logical dequeue pointer to the physical dequeue position. + * This is only needed after we cancelled workers (during queue shutdown). + */ +static rsRetVal +qUnDeqAllFixedArray(qqueue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + dbgoprint((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n", + pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq); + + pThis->tVars.farray.deqhead = pThis->tVars.farray.head; + pThis->nLogDeq = 0; + + RETiRet; +} + + /* -------------------- linked list -------------------- */ @@ -695,7 +718,6 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) qLinkedList_t *pEntry; DEFiRet; -RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); pEntry = pThis->tVars.linklist.pDeqRoot; *ppUsr = pEntry->pUsr; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; @@ -723,6 +745,26 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis) } +/* reset the logical dequeue pointer to the physical dequeue position. + * This is only needed after we cancelled workers (during queue shutdown). + */ +static rsRetVal +qUnDeqAllLinkedList(qqueue_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + dbgoprint((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n", + pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq); + + pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot; + pThis->nLogDeq = 0; + + RETiRet; +} + + /* -------------------- disk -------------------- */ @@ -822,25 +864,16 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); - CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); - /* we now need to take care of the Deq handle. It is not persisted, so we can create * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function! 
*/ - CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq)); - CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); - CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); - CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); + CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq)); - /* TODO: dirty, need stream methods --> */ - pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum; - pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs; - /* <-- dirty, need stream methods :TODO */ + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite)); + CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel)); CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq)); /* OK, we could successfully read the file, so we now can request that it be @@ -1012,6 +1045,16 @@ finalize_it: } +/* This is a dummy function for disks - we do not need to reset anything + * because everything is already persisted... + */ +static rsRetVal +qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis) +{ + return RS_RET_OK; +} + + /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1055,6 +1098,12 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } +static rsRetVal +qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis) +{ + return RS_RET_OK; +} + /* --------------- end type-specific handlers -------------------- */ @@ -1109,22 +1158,29 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) /* This function shuts down all worker threads and waits until they - * have terminated. If they timeout, they are cancelled. Parameters have been set - * before this function is called so that DA queues will be fully persisted to - * disk (if configured to do so). + * have terminated. If they timeout, they are cancelled. * rgerhards, 2008-01-24 * Please note that this function shuts down BOTH the parent AND the child queue * in DA case. This is necessary because their timeouts are tightly coupled. Most * importantly, the timeouts would be applied twice (or logic be extremely * complex) if each would have its own shutdown. The function does not self check * this condition - the caller must make sure it is not called with a parent. + * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown + * is set. This must be handled by the caller. Not doing that cleans up the queue + * shutdown considerably. Also, older engines had a potential hang condition when + * the DA queue was already started and the DA worker configured for infinite + * retries and the action was during retry processing. This was a design issue, + * which is solved as of now. Note that the shutdown now may take a little bit + * longer, because we no longer can persist the queue in parallel to waiting + * on worker timeouts. 
*/ -static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) +static rsRetVal +ShutdownWorkers(qqueue_t *pThis) { - DEFiRet; DEFVARS_mutexProtection; struct timespec tTimeout; rsRetVal iRetLocal; + DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ @@ -1193,53 +1249,18 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) } /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. Now we need to check if we are configured to not loose messages. If so, we need - * to persist the queue to disk (this is only possible if the queue is DA-enabled). We must also - * set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate as soon as its consumer - * is done. This is especially important as we otherwise may interfere with queue order while the - * DA consumer is running. -- rgerhards, 2008-01-27 - * Note: there was a note that we should not wait eternally on the DA worker if we run in - * enqueue-only note. I have reviewed the code and think there is no need for this check. Howerver, - * I'd like to keep this note in here should we happen to run into some related trouble. - * rgerhards, 2008-01-28 + * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate + * as soon as its consumer is done. In particular, it does no longer need try to empty the queue. */ wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ - if(pThis->bRunsDA) + if(pThis->bRunsDA) { qqueueWaitDAModeInitialized(pThis); - - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - /* optimize parameters for shutdown of DA-enabled queues */ - if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { - /* switch to enqueue-only mode so that no more actions happen */ - if(pThis->bRunsDA == 0) { - qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to DA mode */ - } else { - /* TODO: RACE: we may reach this point when the DA worker has been initialized (state 1) - * but is not yet running (state 2). In this case, pThis->pqDA is NULL! rgerhards, 2008-02-27 - */ - qqueueSetEnqOnly(pThis->pqDA, QUEUE_MODE_ENQONLY, MUTEX_ALREADY_LOCKED); /* switch to enqueue-only mode */ - } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - /* make sure we do not timeout before we are done */ - dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); - timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); - /* and run the primary queue's DA worker to drain the queue */ - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); - if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " - "continuing, but results are unpredictable\n", iRetLocal); - } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */ } - /* now the primary queue is either empty, persisted to disk - or set to loose messages. So we - * can now request immediate shutdown of any remaining workers. Note that if bSaveOnShutdown was set, - * the queue is now empty. 
If regular workers are still running, and try to pull the next message, - * they will automatically terminate as there no longer is any message left to process. - */ + /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { timeoutComp(&tTimeout, pThis->toActShutdown); @@ -1278,10 +1299,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) /* Now queue workers should have terminated. If not, we need to cancel them as we have applied * all timeout setting. If any worker in any queue still executes, its consumer is possibly - * long-running and cancelling is the only way to get rid of it. Note that the - * cancellation handler will probably re-queue a user pointer, so the queue's enqueue - * function is still needed (what is no problem as we do not yet destroy the queue - but I - * thought it's a good idea to mention that fact). -- rgerhards, 2008-01-25 + * long-running and cancelling is the only way to get rid of it. */ dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the primary queue\n"); iRetLocal = wtpCancelAll(pThis->pWtpReg); /* returns immediately if all threads already have terminated */ @@ -1290,12 +1308,6 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) "threads, continuing, but results are unpredictable\n", iRetLocal); } - - /* TODO: think: do we really need to do this here? Can't it happen on DA queue destruction? If we - * disable it, we get an assertion... I think this is OK, as we need to have a certain order and - * canceling the DA workers here ensures that order. But in any instant, we may have a look at this - * code after we have reaced the milestone. -- rgerhards, 2008-01-27 - */ /* ... and now the DA queue, if it exists (should always be after the primary one) */ if(pThis->pqDA != NULL) { dbgoprint((obj_t*) pThis, "checking to see if we need to cancel any worker threads of the DA queue\n"); @@ -1310,7 +1322,8 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers * may still be running. 
*/ - dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size %d.\n", getPhysicalQueueSize(pThis)); + dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1367,6 +1380,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->qUnDeqAll = qUnDeqAllFixedArray; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1374,6 +1388,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qUnDeqAll = qUnDeqAllLinkedList; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1381,6 +1396,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->qUnDeqAll = qUnDeqAllDisk; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1389,6 +1405,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->qUnDeqAll = qUnDeqAllDirect; break; } @@ -1817,20 +1834,17 @@ finalize_it: * If we are a child, we have done our duty when the queue is empty. In that case, * we can terminate. * Version for the DA worker thread. NOTE: the pThis->bRunsDA is different from - * the DA queue + * the DA queue. + * If our queue is in destruction, we drain to the DA queue and so we shall not terminate + * until we have done so. */ -static int +static rsRetVal qqueueChkStopWrkrDA(qqueue_t *pThis) { - /* if our queue is in destruction, we drain to the DA queue and so we shall not terminate - * until we have done so. - */ - int bStopWrkr; - - BEGINfunc + DEFiRet; if(pThis->bEnqOnly) { - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1838,19 +1852,16 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { - bStopWrkr = 1; - } else { - bStopWrkr = 0; + iRet = RS_RET_TERMINATE_NOW; } } else { - bStopWrkr = 1; + iRet = RS_RET_TERMINATE_NOW; } } - ENDfunc - return bStopWrkr; + RETiRet; } @@ -1861,10 +1872,20 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * Version for the regular worker thread. NOTE: the pThis->bRunsDA is different from * the DA queue */ -static int +static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) { + DEFiRet; + /* original condition return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); + * TODO: remove when verified! 
-- rgerhards, 2009-05-26 + */ + if(pThis->bEnqOnly || pThis->bRunsDA) + iRet = RS_RET_TERMINATE_NOW; + else if(pThis->pqParent != NULL) + iRet = RS_RET_TERMINATE_WHEN_IDLE; + + RETiRet; } @@ -1881,7 +1902,6 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) } -/* common function for the idle functions that deletes the last batch if TODO MULTI */ /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1890,7 +1910,7 @@ qqueueIsIdleDA(qqueue_t *pThis) { /* remember: iQueueSize is the DA queue size, not the main queue! */ /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); } /* must only be called when the queue mutex is locked, else results * are not stable! Regular queue version @@ -1905,7 +1925,7 @@ IsIdleReg(qqueue_t *pThis) return ret; #else /* regular code! */ - return(getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); #endif } @@ -2176,19 +2196,62 @@ finalize_it: } +/* persist a queue with all data elements to disk - this is used to handle + * bSaveOnShutdown. We utilize the DA worker to do this. This must only + * be called after all workers have been shut down and if bSaveOnShutdown + * is actually set. Note that this function may potentially run long, + * depending on the queue configuration (e.g. store on remote machine). + * rgerhards, 2009-05-26 + */ +static inline rsRetVal +DoSaveOnShutdown(qqueue_t *pThis) +{ + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + + qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ + /* make sure we do not timeout before we are done */ + dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); + timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); + /* and run the primary queue's DA worker to drain the queue */ + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); + dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", + iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying to shut down primary queue in disk save mode, " + "continuing, but results are unpredictable\n", iRetLocal); + } + + RETiRet; +} + + /* destructor for the queue object */ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ - /* shut down all workers (handles *all* of the persistence logic) - * See function head comment of queueShutdownWorkers () on why we don't call it - * We also do not need to shutdown workers when we are in enqueue-only mode or we are a + /* shut down all workers + * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) * with a child! 
-- rgerhards, 2008-01-28 */ if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL) - qqueueShutdownWorkers(pThis); + ShutdownWorkers(pThis); + + /* now all workers are terminated. Messages may exist. Also, some logically dequeued + * messages may never have been processed because their worker was terminated. So + * we need to reset the logical dequeue pointer, persist the queue if configured to do + * so and then destruct everything. -- rgerhards, 2009-05-26 + */ + CHKiRet(pThis->qUnDeqAll(pThis)); + + if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { + CHKiRet(DoSaveOnShutdown(pThis)); + } /* finally destruct our (regular) worker thread pool * Note: currently pWtpReg is never NULL, but if we optimize our logic, this may happen, -- cgit v1.2.3 From d4564f8399f4362c7e79066370049f909cef996c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 27 May 2009 19:43:28 +0200 Subject: interim commit: working on failure cases slightly improved situation, would like to save it before carrying on --- runtime/queue.c | 314 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 211 insertions(+), 103 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 539cf4ec..48d1c6e3 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -66,7 +66,7 @@ DEFobjCurrIf(glbl) /* forward-definitions */ static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); -static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); +static rsRetVal SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); @@ -205,7 +205,7 @@ static inline void queueDrain(qqueue_t *pThis) ASSERT(pThis != NULL); BEGINfunc - dbgoprint((obj_t*) pThis, "queue will lose %d messages, destroying...\n", pThis->iQueueSize); + dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(pThis->iQueueSize-- > 0) { pThis->qDeq(pThis, &pUsr); @@ -366,7 +366,7 @@ qqueueChkIsDA(qqueue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -qqueueStartDA(qqueue_t *pThis) +StartDA(qqueue_t *pThis) { DEFiRet; uchar pszDAQName[128]; @@ -396,7 +396,7 @@ qqueueStartDA(qqueue_t *pThis) CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); CHKiRet(qqueueSettoEnq(pThis->pqDA, pThis->toEnq)); - CHKiRet(qqueueSetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); + CHKiRet(SetEnqOnly(pThis->pqDA, pThis->bDAEnqOnly, MUTEX_ALREADY_LOCKED)); CHKiRet(qqueueSetiDeqtWinFromHr(pThis->pqDA, pThis->iDeqtWinFromHr)); CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); @@ -450,8 +450,8 @@ finalize_it: * If this function fails (should not happen), DA mode is not turned on. * rgerhards, 2008-01-16 */ -static inline rsRetVal -qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +static rsRetVal +InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -464,16 +464,17 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * is intentional. We assume that when we need it once, we may also need it on another * occasion. 
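The destructor hunk above fixes the order of operations at queue destruction: stop the workers, put logically dequeued but unprocessed elements back, and only then persist if that is requested. A rough, self-contained sketch of that order follows; the helper names are placeholders, not the queue object's API.

/* simplified model of the destruction order for a DA-capable queue */
#include <stdbool.h>
#include <stdio.h>

struct queue {
    bool da_capable;        /* queue may spill to disk (bIsDA)     */
    bool save_on_shutdown;  /* persist remaining data on exit      */
    int  phys_size;         /* elements still present in the store */
};

static void shutdown_workers(struct queue *q) { (void) q; /* stop regular and DA queue workers */ }
static void undequeue_all(struct queue *q)    { (void) q; /* reset the logical dequeue pointer */ }
static void persist_via_da(struct queue *q)   { printf("persisting %d elements\n", q->phys_size); }

static void destruct(struct queue *q)
{
    shutdown_workers(q);    /* 1. nothing may touch the queue below this point             */
    undequeue_all(q);       /* 2. dequeued-but-unprocessed data is part of the store again */
    if (q->da_capable && q->phys_size > 0 && q->save_on_shutdown)
        persist_via_da(q);  /* 3. drain what is left through the disk queue                */
    /* 4. ... destruct worker pools and the in-memory store ...                            */
}

int main(void)
{
    struct queue q = { true, true, 42 };
    destruct(&q);
    return 0;
}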
Ressources used are quite minimal when no worker is running. * rgerhards, 2008-01-24 + * NOTE: this is the DA worker *pool*, not the DA queue! */ if(pThis->pWtpDA == NULL) { - lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DA", obj.GetName((obj_t*) pThis)); + lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:DAwpool", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); @@ -493,6 +494,7 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * until the next enqueue request. */ wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ +RUNLOG_VAR("%d", pThis->bRunsDA); finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -536,7 +538,7 @@ qqueueChkStrtDA(qqueue_t *pThis) */ dbgoprint((obj_t*) pThis, "%d entries - passed high water mark for disk-assisted mode, initiating...\n", getPhysicalQueueSize(pThis)); - qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ + InitDA(pThis, QUEUE_MODE_ENQDEQ, MUTEX_ALREADY_LOCKED); /* initiate DA mode */ } finalize_it: @@ -706,7 +708,6 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) if(pThis->tVars.linklist.pDeqRoot == NULL) { pThis->tVars.linklist.pDeqRoot = pEntry; } -RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot); finalize_it: RETiRet; @@ -1157,25 +1158,14 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) } -/* This function shuts down all worker threads and waits until they - * have terminated. If they timeout, they are cancelled. - * rgerhards, 2008-01-24 - * Please note that this function shuts down BOTH the parent AND the child queue - * in DA case. This is necessary because their timeouts are tightly coupled. Most - * importantly, the timeouts would be applied twice (or logic be extremely - * complex) if each would have its own shutdown. The function does not self check - * this condition - the caller must make sure it is not called with a parent. - * rgerhards, 2009-05-26: we do NO logner persist the queue here if bSaveOnShutdown - * is set. This must be handled by the caller. Not doing that cleans up the queue - * shutdown considerably. Also, older engines had a potential hang condition when - * the DA queue was already started and the DA worker configured for infinite - * retries and the action was during retry processing. This was a design issue, - * which is solved as of now. Note that the shutdown now may take a little bit - * longer, because we no longer can persist the queue in parallel to waiting - * on worker timeouts. +/* Try to terminate queue worker threads within the regular shutdown interval. + * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. 
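As the wtpSetpf*() calls above show, the worker pool knows nothing about queues; it is driven purely through callbacks the queue registers, plus an opaque user pointer. The sketch below models that wiring with a plain struct of function pointers; the member names are invented for illustration and do not claim to match the internal wtp layout.

/* simplified model: a worker pool that reaches its owner only through callbacks */
#include <stdio.h>

typedef int retval_t;

struct worker_pool {
    void *owner;                                        /* the queue (pUsr)                */
    retval_t (*chk_stop_wrkr)(void *owner);             /* may a worker terminate?         */
    retval_t (*get_deq_batch_size)(void *owner, int *size);
    retval_t (*do_work)(void *owner, void *wti);        /* the consumer                    */
    retval_t (*obj_processed)(void *owner, void *wti);  /* a finished batch may be deleted */
};

struct queue { int deq_batch_size; };

static retval_t get_batch_size(void *owner, int *size)
{
    *size = ((struct queue *) owner)->deq_batch_size;
    return 0;
}

int main(void)
{
    struct queue q = { 128 };
    struct worker_pool wtp = { .owner = &q, .get_deq_batch_size = get_batch_size };
    int n;

    wtp.get_deq_batch_size(wtp.owner, &n);   /* the pool pulls its configuration from the queue */
    printf("dequeue batch size advertised to workers: %d\n", n);
    return 0;
}

The same pattern is what lets one pool implementation serve both the regular workers and the DA worker: only the registered callbacks differ.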
+ * After this function returns, the workers must either be finished or some force + * to finish them must be applied. + * rgerhards, 2009-05-27 */ static rsRetVal -ShutdownWorkers(qqueue_t *pThis) +tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) { DEFVARS_mutexProtection; struct timespec tTimeout; @@ -1185,16 +1175,6 @@ ShutdownWorkers(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); - - /* we reduce the low water mark in any case. This is not absolutely necessary, but - * it is useful because we enable DA mode at several spots below and so we do not need - * to think about the low water mark each time. - */ - pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ - pThis->iLowWtrMrk = 0; - - /* first try to shutdown the queue within the regular shutdown period */ BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { @@ -1206,6 +1186,11 @@ ShutdownWorkers(qqueue_t *pThis) } END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ + if(pThis->bRunsDA) { + qqueueWaitDAModeInitialized(pThis); + } + /* Now wait for the queue's workers to shut down. Note that we run into the code even if we just found * out there are no active workers - that doesn't matter: the wtp knows about that and so will * return immediately. @@ -1228,74 +1213,109 @@ ShutdownWorkers(qqueue_t *pThis) if(iRetLocal == RS_RET_TIMED_OUT) { dbgoprint((obj_t*) pThis, "regular shutdown timed out on primary queue (this is OK)\n"); } else { - /* OK, the regular queue is now shut down. So we can now wait for the DA queue (if running DA) */ dbgoprint((obj_t*) pThis, "regular queue workers shut down.\n"); - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(pThis->bRunsDA) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", - qqueueGetID(pThis->pqDA)); - /* we use the same absolute timeout as above, so we do not use more than the configured - * timeout interval! - */ - dbgoprint((obj_t*) pThis, "trying shutdown of DA workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue (this is OK)\n"); - } + } + + /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + if(pThis->bRunsDA) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", + qqueueGetID(pThis->pqDA)); + /* we use the same absolute timeout as above, so we do not use more than the configured + * timeout interval! 
+ */ + dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "shutdown timed out on DA queue worker (this is OK)\n"); } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n"); } + } else { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); } - /* when we reach this point, both queues are either empty or the regular queue shutdown timeout - * has expired. We must set the primary queue to SHUTDOWN_IMMEDIATE, as it shall now terminate - * as soon as its consumer is done. In particular, it does no longer need try to empty the queue. - */ - wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); /* set primary queue to shutdown only */ + RETiRet; +} - /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ - if(pThis->bRunsDA) { - qqueueWaitDAModeInitialized(pThis); - wtpSetState(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE); /* also stop DA queue */ + +/* Try to shut down regular and DA queue workers, within the action timeout + * period. Note that the main queue DA worker is still unaffected (and may shuffle + * data to the disk queue while we terminate the other workers). Not finishing + * processing all messages is now OK (but they may be preserved later, depending + * on bSaveOnShutdown setting). + * rgerhards, 2009-05-27 + */ +static rsRetVal +tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) +{ + DEFVARS_mutexProtection; + struct timespec tTimeout; + rsRetVal iRetLocal; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + /* instruct workers to finish ASAP, even if still work exists */ +RUNLOG_STR("setting enqOnly for main queue"); + //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */ + pThis->bEnqOnly = 1; + wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); + if(pThis->pqDA != NULL) { +RUNLOG_STR("setting enqOnly for DA queue"); + //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX); + pThis->pqDA->bEnqOnly = 1; + wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); } /* now give the queue workers a last chance to gracefully shut down (based on action timeout setting) */ + timeoutComp(&tTimeout, pThis->toActShutdown); BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ - if(getPhysicalQueueSize(pThis) > 0) { - timeoutComp(&tTimeout, pThis->toActShutdown); - if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); - } - /* we need to re-aquire the mutex for the next check in this case! */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! 
*/ - } - if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pWtpDA, LOCK_MUTEX) > 0) { - /* and now the same for the DA queue */ - END_MTX_PROTECTED_OPERATIONS(pThis->mut); - dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA workers\n"); - iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); - if(iRetLocal == RS_RET_TIMED_OUT) { - dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " - "triggers cancellation)\n"); - } else if(iRetLocal != RS_RET_OK) { - dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " - "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); - } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + if(wtpGetCurNumWrkr(pThis->pWtpReg, LOCK_MUTEX) > 0) { + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of regular workers\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on primary queue (this is acceptable and " + "triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the primary queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); } - } else { + /* we need to re-aquire the mutex for the next check in this case! */ + BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); + } + + if(pThis->bRunsDA && wtpGetCurNumWrkr(pThis->pqDA->pWtpReg, LOCK_MUTEX) > 0) { + /* and now the same for the DA queue */ END_MTX_PROTECTED_OPERATIONS(pThis->mut); + dbgoprint((obj_t*) pThis, "trying immediate shutdown of DA queue workers\n"); + iRetLocal = wtpShutdownAll(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "immediate shutdown timed out on DA queue (this is acceptable and " + "triggers cancellation)\n"); + } else if(iRetLocal != RS_RET_OK) { + dbgoprint((obj_t*) pThis, "unexpected iRet state %d after trying immediate shutdown of the DA queue " + "in disk save mode. Continuing, but results are unpredictable\n", iRetLocal); + } } + END_MTX_PROTECTED_OPERATIONS(pThis->mut); + + RETiRet; +} + + +/* This function cancels all remenaing regular workers for both the main and the DA + * queue. The main queue's DA worker pool continues to run (if it exists and is active). + * rgerhards, 2009-05-29 + */ +static rsRetVal +cancelWorkers(qqueue_t *pThis) +{ + rsRetVal iRetLocal; + DEFiRet; /* Now queue workers should have terminated. If not, we need to cancel them as we have applied * all timeout setting. If any worker in any queue still executes, its consumer is possibly @@ -1318,13 +1338,60 @@ ShutdownWorkers(qqueue_t *pThis) } } + RETiRet; +} + + +/* This function shuts down all worker threads and waits until they + * have terminated. If they timeout, they are cancelled. + * rgerhards, 2008-01-24 + * Please note that this function shuts down BOTH the parent AND the child queue + * in DA case. This is necessary because their timeouts are tightly coupled. Most + * importantly, the timeouts would be applied twice (or logic be extremely + * complex) if each would have its own shutdown. The function does not self check + * this condition - the caller must make sure it is not called with a parent. 
+ * rgerhards, 2009-05-26: we do NO longer persist the queue here if bSaveOnShutdown + * is set. This must be handled by the caller. Not doing that cleans up the queue + * shutdown considerably. Also, older engines had a potential hang condition when + * the DA queue was already started and the DA worker configured for infinite + * retries and the action was during retry processing. This was a design issue, + * which is solved as of now. Note that the shutdown now may take a little bit + * longer, because we no longer can persist the queue in parallel to waiting + * on worker timeouts. + */ +static rsRetVal +ShutdownWorkers(qqueue_t *pThis) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ + + dbgoprint((obj_t*) pThis, "initiating worker thread shutdown sequence\n"); + + /* we reduce the low water mark in any case. This is not absolutely necessary, but + * it is useful because we enable DA mode at several spots below and so we do not need + * to think about the low water mark each time. + */ + pThis->iHighWtrMrk = 1; /* if we do not do this, the DA queue will not stop! */ + pThis->iLowWtrMrk = 0; + + CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis)); + + if(getPhysicalQueueSize(pThis) > 0) { + CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis)); + } + + CHKiRet(cancelWorkers(pThis)); + /* ... finally ... all worker threads have terminated :-) * Well, more precisely, they *are in termination*. Some cancel cleanup handlers - * may still be running. + * may still be running. Note that the main queue's DA worker may still be running. */ dbgoprint((obj_t*) pThis, "worker threads terminated, remaining queue size log %d, phys %d.\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +finalize_it: RETiRet; } @@ -1498,8 +1565,8 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); - pTdl = tdlPeek(pThis); - if(pTdl == NULL) { + pTdl = tdlPeek(pThis); /* get current head element */ + if(pTdl == NULL) { /* to-delete list empty */ DoDeleteBatchFromQStore(pThis, pBatch->nElemDeq); } else if(pBatch->deqID == pThis->deqIDDel) { deqIDDel = pThis->deqIDDel; @@ -1512,6 +1579,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) } } else { /* can not delete, insert into to-delete list */ + dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElemDeq)); } @@ -1769,6 +1837,27 @@ finalize_it: RETiRet; } + +/* This is called when a batch is processed and the worker does not + * ask for another batch (e.g. because it is to be terminated) + * rgerhards, 2009-05-27 + */ +static rsRetVal +batchProcessedReg(qqueue_t *pThis, wti_t *pWti) +{ + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + ISOBJ_TYPE_assert(pWti, wti); +dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq); + + DeleteProcessedBatch(pThis, &pWti->batch); + qqueueChkPersist(pThis, pWti->batch.nElemDeq); + + RETiRet; +} + + /* This is the queue consumer in the regular (non-DA) case. It is * protected by the queue mutex, but MUST release it as soon as possible. 
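The DeleteBatchFromQStore() logic shown further up enforces that batches leave the queue store strictly in dequeue order: a finished batch whose dequeue ID is not the next one to delete is parked on a to-delete list and drained later. Below is a compact, self-contained model of that idea; the types and helpers are invented for illustration and the real tdl* code differs in detail.

/* simplified model: delete finished batches from the store in dequeue order only */
#include <stdio.h>
#include <stdlib.h>

struct pending { long long deq_id; int n_elem; struct pending *next; };

struct store {
    long long next_del_id;   /* dequeue ID that must be deleted next */
    struct pending *parked;  /* finished batches that have to wait   */
};

static void delete_from_store(int n) { printf("deleting %d elements\n", n); }

static void park(struct store *s, long long id, int n)
{
    struct pending *p = malloc(sizeof *p);   /* sketch: no out-of-memory handling */
    p->deq_id = id; p->n_elem = n; p->next = s->parked; s->parked = p;
}

/* called whenever a worker reports a finished batch */
static void batch_done(struct store *s, long long id, int n)
{
    if (id != s->next_del_id) { park(s, id, n); return; }
    delete_from_store(n);
    s->next_del_id++;
    /* drain parked batches that have become deletable now */
    for (struct pending **pp = &s->parked; *pp; ) {
        if ((*pp)->deq_id == s->next_del_id) {
            struct pending *p = *pp;
            delete_from_store(p->n_elem);
            s->next_del_id++;
            *pp = p->next;
            free(p);
            pp = &s->parked;             /* rescan: an earlier list entry may be next now */
        } else {
            pp = &(*pp)->next;
        }
    }
}

int main(void)
{
    struct store s = { 1, NULL };
    batch_done(&s, 2, 10);   /* finished out of order: parked                  */
    batch_done(&s, 1, 5);    /* head of the sequence: deleted, batch 2 follows */
    return 0;
}

Deleting in dequeue order presumably keeps the stored queue consistent with the read pointers even while several batches are in flight at once.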
* rgerhards, 2008-01-21 @@ -1844,7 +1933,8 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) DEFiRet; if(pThis->bEnqOnly) { - iRet = RS_RET_TERMINATE_NOW; + iRet = RS_RET_TERMINATE_WHEN_IDLE; +RUNLOG; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1852,11 +1942,14 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... */ +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } } else { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; } } @@ -1880,10 +1973,14 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ - if(pThis->bEnqOnly || pThis->bRunsDA) +RUNLOG; + if(pThis->bEnqOnly || pThis->bRunsDA) { +RUNLOG; iRet = RS_RET_TERMINATE_NOW; - else if(pThis->pqParent != NULL) + } else if(pThis->pqParent != NULL) { +RUNLOG; iRet = RS_RET_TERMINATE_WHEN_IDLE; + } RETiRet; } @@ -2039,6 +2136,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); @@ -2056,7 +2154,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ iRetLocal = qqueueHaveQIF(pThis); if(iRetLocal == RS_RET_OK) { dbgoprint((obj_t*) pThis, "on-disk queue present, needs to be reloaded\n"); - qqueueInitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ + InitDA(pThis, QUEUE_MODE_ENQDEQ, LOCK_MUTEX); /* initiate DA mode */ bInitialized = 1; /* we are done */ } else { /* TODO: use logerror? -- rgerhards, 2008-01-16 */ @@ -2212,9 +2310,16 @@ DoSaveOnShutdown(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); - qqueueInitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ +dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + if(pThis->bRunsDA != 2) { + InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ +dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +RUNLOG_VAR("%d", pThis->bRunsDA); +RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); + qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! 
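With this change, the ChkStopWrkr callbacks no longer return a plain flag but distinguish between terminating immediately and terminating once the queue is idle. The fragment below is a minimal model of how a worker loop can act on the two advices; it is a sketch only, the real wti/wtp run loop is considerably more involved.

/* simplified model: a worker honours "terminate now" vs "terminate when idle" */
#include <stdio.h>

enum stop_advice { KEEP_RUNNING, TERMINATE_WHEN_IDLE, TERMINATE_NOW };

static enum stop_advice chk_stop_worker(int enq_only)
{
    /* in enqueue-only mode the worker may finish queued work but must not linger */
    return enq_only ? TERMINATE_WHEN_IDLE : KEEP_RUNNING;
}

static int worker_loop(int enq_only, int queue_size)
{
    int processed = 0;

    for (;;) {
        enum stop_advice advice = chk_stop_worker(enq_only);
        if (advice == TERMINATE_NOW)
            break;                      /* leave immediately, even if work remains */
        if (queue_size == 0) {
            if (advice == TERMINATE_WHEN_IDLE)
                break;                  /* queue drained, allowed to go            */
            break;                      /* sketch shortcut: a real worker would now
                                         * sleep on the "not empty" condition      */
        }
        queue_size--;                   /* consume one batch                       */
        processed++;
    }
    return processed;
}

int main(void)
{
    printf("processed %d batches before terminating\n", worker_loop(1, 3));
    return 0;
}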
*/ + } /* make sure we do not timeout before we are done */ - dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, eternal timeout set\n"); + dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); @@ -2247,7 +2352,7 @@ CODESTARTobjDestruct(qqueue) * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ - CHKiRet(pThis->qUnDeqAll(pThis)); +//!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); @@ -2470,7 +2575,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) +SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -2495,13 +2600,16 @@ qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); +RUNLOG; if(pThis->pWtpDA != NULL) wtpWakeupAllWrkr(pThis->pWtpDA); +RUNLOG; } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... */ } } +RUNLOG; pThis->bEnqOnly = bEnqOnly; -- cgit v1.2.3 From 9517e19b6427c295e206ece9562ce70f4a6d7044 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 09:59:45 +0200 Subject: preserving current changes ... in preparation for some larger changes - I need to apply some serious design changes, as the current system does not play well at all with ultra-reliable queues. Will do that in a totally new version. --- runtime/queue.c | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 48d1c6e3..4405dd39 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -255,7 +255,11 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) /* wait until we have a fully initialized DA queue. Sometimes, we need to - * sync with it, as we expect it for some function. + * sync with it, as we expect it for some function. Note that in extreme + * cases, the DA queue may already have started up AND terminated when we + * call this function. As such,it may validly be that DA is already shut down. + * So we just check if we are in init phase and then wait for full startup. + * If in non-DA mode, we silently return. * rgerhards, 2008-02-27 */ static rsRetVal @@ -264,9 +268,8 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - ASSERT(pThis->bRunsDA); - while(pThis->bRunsDA != 2) { + while(pThis->bRunsDA == 1) { d_pthread_cond_wait(&pThis->condDAReady, pThis->mut); } @@ -289,13 +292,16 @@ qqueueTurnOffDAMode(qqueue_t *pThis) { DEFiRet; +RUNLOG_STR("XXX: TurnOffDAMode\n"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ +RUNLOG; qqueueWaitDAModeInitialized(pThis); +RUNLOG; /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. 
At present, we do not need this, but I'd like to @@ -309,6 +315,7 @@ qqueueTurnOffDAMode(qqueue_t *pThis) * during the lifetime of DA-mode, depending on how often the DA worker receives an * inactivity timeout. -- rgerhards, 2008-01-25 */ +dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pThis->pqDA)); if(getLogicalQueueSize(pThis->pqDA) == 0) { pThis->bRunsDA = 0; /* tell the world we are back in non-DA mode */ /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, @@ -1151,8 +1158,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) iRet = pThis->qDeq(pThis, pUsr); ATOMIC_INC(pThis->nLogDeq); - dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", - getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", +// getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); RETiRet; } @@ -1259,13 +1266,12 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ -RUNLOG_STR("setting enqOnly for main queue"); - //TODO:SetEnqOnly(pThis, 1, LOCK_MUTEX); /* start no new workers */ + /* note that we modify bEnqOnly direclty, because going through the method would + * startup some workers again. So this is OK here. -- rgerhards, 2009-05-28 + */ pThis->bEnqOnly = 1; wtpSetState(pThis->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); if(pThis->pqDA != NULL) { -RUNLOG_STR("setting enqOnly for DA queue"); - //TODO:SetEnqOnly(pThis->pqDA, 1, LOCK_MUTEX); pThis->pqDA->bEnqOnly = 1; wtpSetState(pThis->pqDA->pWtpReg, wtpState_SHUTDOWN_IMMEDIATE); } @@ -1644,7 +1650,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ @@ -1945,7 +1951,7 @@ RUNLOG; RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -RUNLOG; +dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; } } else { @@ -1973,7 +1979,6 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ -RUNLOG; if(pThis->bEnqOnly || pThis->bRunsDA) { RUNLOG; iRet = RS_RET_TERMINATE_NOW; @@ -2314,6 +2319,7 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g if(pThis->bRunsDA != 2) { InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); +//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war) RUNLOG_VAR("%d", pThis->bRunsDA); RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! 
*/ @@ -2322,6 +2328,7 @@ RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ +RUNLOG; iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2339,6 +2346,7 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ +RUNLOG_STR("XXX: queue destruct\n"); /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2353,6 +2361,9 @@ CODESTARTobjDestruct(qqueue) * so and then destruct everything. -- rgerhards, 2009-05-26 */ //!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + CHKiRet(pThis->qUnDeqAll(pThis)); +dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) { CHKiRet(DoSaveOnShutdown(pThis)); -- cgit v1.2.3 From fc3e56941ca6dbf401bee2f9dc0f9e4c5cd87f40 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 11:57:30 +0200 Subject: fixing an issue during DA mode queue shutdown also changed DA queue mode in that the regular workers now run concurrently. --- runtime/queue.c | 108 ++++++++++++++++++++++++-------------------------------- 1 file changed, 46 insertions(+), 62 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 4405dd39..698495ef 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -72,6 +72,7 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -240,14 +241,15 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) if(getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk || pThis->bQueueStarted == 0) { wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ } + } + /* regular workers always run */ + if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } +dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers); + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } RETiRet; @@ -288,7 +290,7 @@ qqueueWaitDAModeInitialized(qqueue_t *pThis) * rgerhards, 2008-01-15 */ static rsRetVal -qqueueTurnOffDAMode(qqueue_t *pThis) +TurnOffDAMode(qqueue_t *pThis) { DEFiRet; @@ -299,9 +301,7 @@ RUNLOG_STR("XXX: 
TurnOffDAMode\n"); /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need * to wait for its startup... -- rgerhards, 2008-01-25 */ -RUNLOG; - qqueueWaitDAModeInitialized(pThis); -RUNLOG; + //TODO: MULTI del, can not happen (but verify first) qqueueWaitDAModeInitialized(pThis); /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to @@ -321,16 +321,13 @@ dbgprintf("XXX: getLogicalQueueSize(pThis->pqDA): %d\n", getLogicalQueueSize(pTh /* we destruct the queue object, which will also shutdown the queue worker. As the queue is empty, * this will be quick. */ - qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ +//XXX: TODO qqueueDestruct(&pThis->pqDA); /* and now we are ready to destruct the DA queue */ dbgoprint((obj_t*) pThis, "disk-assistance has been turned off, disk queue was empty (iRet %d)\n", iRet); - /* now we need to check if the regular queue has some messages. This may be the case - * when it is waiting that the high water mark is reached again. If so, we need to start up - * a regular worker. -- rgerhards, 2008-01-26 - */ - if(getLogicalQueueSize(pThis) > 0) { - qqueueAdviseMaxWorkers(pThis); - } + } else { + /* the queue has data again! */ + dbgprintf("DA queue has data during shutdown, restarting...\n"); + qqueueAdviseMaxWorkers(pThis->pqDA); } RETiRet; @@ -408,6 +405,10 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetiDeqtWinToHr(pThis->pqDA, pThis->iDeqtWinToHr)); CHKiRet(qqueueSetiHighWtrMrk(pThis->pqDA, 0)); CHKiRet(qqueueSetiDiscardMrk(pThis->pqDA, 0)); + + // experimental: XXX + CHKiRet(qqueueSettoWrkShutdown(pThis->pqDA, 0)); + if(pThis->toQShutdown == 0) { CHKiRet(qqueueSettoQShutdown(pThis->pqDA, 0)); /* if the user really wants... */ } else { @@ -423,14 +424,6 @@ StartDA(qqueue_t *pThis) if(iRet != RS_RET_OK && iRet != RS_RET_FILE_NOT_FOUND) FINALIZE; /* something is wrong */ - /* as we are right now starting DA mode because we are so busy, it is - * extremely unlikely that any regular worker is sleeping on empty queue. HOWEVER, - * we want to be on the safe side, and so we awake anyone that is waiting - * on one. So even if the scheduler plays badly with us, things should be - * quite well. 
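The reworked qqueueAdviseMaxWorkers() in this patch keeps regular workers running alongside DA mode and sizes them from the amount of queued work. A tiny sketch of that sizing rule, assuming iMinMsgsPerWrkr means roughly "one worker per this many queued messages" and that the pool itself caps the result at the configured thread count:

/* simplified model of the worker-count advice */
#include <stdio.h>

static int advise_max_workers(int queue_size, int min_msgs_per_wrkr, int configured_workers)
{
    int n;

    if (min_msgs_per_wrkr <= 0)
        n = 1;                           /* disk queues and "unset" fall back to one worker */
    else
        n = queue_size / min_msgs_per_wrkr + 1;
    if (n > configured_workers)
        n = configured_workers;          /* assume the pool enforces this cap as well */
    return n;
}

int main(void)
{
    printf("%d\n", advise_max_workers(250, 100, 4));   /* -> 3          */
    printf("%d\n", advise_max_workers(900, 100, 4));   /* -> 4 (capped) */
    return 0;
}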
-- rgerhards, 2008-01-15 - */ - wtpWakeupWrkr(pThis->pWtpReg); /* awake all workers, but not ourselves ;) */ - pThis->bRunsDA = 2; /* we are now in DA mode, but not fully initialized */ pThis->bChildIsDone = 0;/* set to 1 when child's worker detect queue is finished */ pthread_cond_broadcast(&pThis->condDAReady); /* signal we are now initialized and ready to go ;) */ @@ -481,8 +474,9 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wtp_t*)) qqueueIsIdleDA)); CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) StartDA)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) TurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); @@ -500,8 +494,7 @@ InitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) * that will also start one up. If we forgot that step, everything would be stalled * until the next enqueue request. */ - wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues alsways have just one worker max */ -RUNLOG_VAR("%d", pThis->bRunsDA); + wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* DA queues always have just one worker max */ finalize_it: END_MTX_PROTECTED_OPERATIONS(pThis->mut); @@ -516,7 +509,7 @@ finalize_it: * rgerhards, 2008-01-14 */ static inline rsRetVal -qqueueChkStrtDA(qqueue_t *pThis) +ChkStrtDA(qqueue_t *pThis) { DEFiRet; @@ -1538,7 +1531,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); +//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); @@ -1849,13 +1842,13 @@ finalize_it: * rgerhards, 2009-05-27 */ static rsRetVal -batchProcessedReg(qqueue_t *pThis, wti_t *pWti) +batchProcessed(qqueue_t *pThis, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); -dbgprintf("XXX: batchProcessedReg deletes %d records\n", pWti->batch.nElemDeq); +dbgprintf("XXX: batchProcessed deletes %d records\n", pWti->batch.nElemDeq); DeleteProcessedBatch(pThis, &pWti->batch); qqueueChkPersist(pThis, pWti->batch.nElemDeq); @@ -1940,7 +1933,6 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) if(pThis->bEnqOnly) { iRet = RS_RET_TERMINATE_WHEN_IDLE; -RUNLOG; } else { if(pThis->bRunsDA) { ASSERT(pThis->pqDA != NULL); @@ -1948,15 +1940,17 @@ RUNLOG; && pThis->pqDA->sizeOnDiskMax > 0 && pThis->pqDA->tVars.disk.sizeOnDisk > pThis->pqDA->sizeOnDiskMax) { /* this queue can never grow, so we can give up... 
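The batchProcessed() callback registered above closes the new dequeue cycle: elements are dequeued logically as a batch, handed to the consumer, and removed from the store only after the worker reports the batch as processed, which is also why the logical and physical queue sizes can differ. A rough, self-contained model of that cycle, with all names invented for illustration:

/* simplified model of the batched consume cycle: dequeue -> consume -> delete */
#include <stdio.h>

#define BATCH_MAX 4

struct queue {
    int store[16];   /* pretend store                                  */
    int n_phys;      /* elements not yet deleted (physical size)       */
    int i_deq;       /* index of the next element to dequeue logically */
    int i_del;       /* index of the next element to delete            */
};

static int dequeue_batch(struct queue *q, int *batch)
{
    int n = 0;
    while (q->i_deq < q->i_del + q->n_phys && n < BATCH_MAX)
        batch[n++] = q->store[q->i_deq++];
    return n;                             /* logical size shrinks, physical size does not */
}

static void consume(const int *batch, int n)
{
    for (int i = 0; i < n; i++)
        printf("processing %d\n", batch[i]);
}

static void batch_processed(struct queue *q, int n)
{
    q->i_del  += n;                       /* only now is the data gone from the store   */
    q->n_phys -= n;                       /* a persistence checkpoint would follow here */
}

int main(void)
{
    struct queue q = { {1, 2, 3, 4, 5, 6}, 6, 0, 0 };
    int batch[BATCH_MAX], n;

    while ((n = dequeue_batch(&q, batch)) > 0) {
        consume(batch, n);
        batch_processed(&q, n);
    }
    return 0;
}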
*/ -RUNLOG; iRet = RS_RET_TERMINATE_NOW; } else if(getPhysicalQueueSize(pThis) < pThis->iHighWtrMrk && pThis->bQueueStarted == 1) { -dbgprintf("XXX: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); +dbgprintf("XXX: terminate_NOW DA worker: queue size %d, high water mark %d\n", getPhysicalQueueSize(pThis), pThis->iHighWtrMrk); iRet = RS_RET_TERMINATE_NOW; +RUNLOG_STR("XXX: re-start reg worker"); +qqueueAdviseMaxWorkers(pThis); +RUNLOG_STR("XXX: done re-start reg worker"); } } else { -RUNLOG; - iRet = RS_RET_TERMINATE_NOW; + // experimental iRet = RS_RET_TERMINATE_NOW; + ; } } @@ -1979,11 +1973,11 @@ ChkStopWrkrReg(qqueue_t *pThis) return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); * TODO: remove when verified! -- rgerhards, 2009-05-26 */ - if(pThis->bEnqOnly || pThis->bRunsDA) { -RUNLOG; + // TODO: DEL - we now keep the workers running! if(pThis->bEnqOnly || pThis->bRunsDA) { + if(pThis->bEnqOnly) { +dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis)); iRet = RS_RET_TERMINATE_NOW; } else if(pThis->pqParent != NULL) { -RUNLOG; iRet = RS_RET_TERMINATE_WHEN_IDLE; } @@ -2000,35 +1994,27 @@ GetDeqBatchSize(qqueue_t *pThis, int *pVal) DEFiRet; assert(pVal != NULL); *pVal = pThis->iDeqBatchSize; +if(pThis->pqParent != NULL) + *pVal = 16; RETiRet; } /* must only be called when the queue mutex is locked, else results - * are not stable! DA queue version + * are not stable! DA worker version (pThis *is* the *main* queue, not DA!) */ static int qqueueIsIdleDA(qqueue_t *pThis) { - /* remember: iQueueSize is the DA queue size, not the main queue! */ - /* TODO: I think we need just a single function for DA and non-DA mode - but I leave it for now as is */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); + return(getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk); } /* must only be called when the queue mutex is locked, else results - * are not stable! Regular queue version + * are not stable! Regular worker version. */ static int IsIdleReg(qqueue_t *pThis) { -#if 0 /* enable for performance testing */ - int ret; - ret = getLogicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getLogicalQueueSize(pThis) <= pThis->iLowWtrMrk); - if(ret) fprintf(stderr, "queue is idle\n"); - return ret; -#else - /* regular code! */ - return(getPhysicalQueueSize(pThis) == 0 || (pThis->bRunsDA && getPhysicalQueueSize(pThis) <= pThis->iLowWtrMrk)); -#endif + return(getPhysicalQueueSize(pThis) == 0); } @@ -2084,7 +2070,8 @@ RegOnWrkrStartup(qqueue_t *pThis) /* start up the queue - it must have been constructed and parameters defined * before. 
*/ -rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ +rsRetVal +qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; rsRetVal iRetLocal; @@ -2141,7 +2128,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wtp_t*)) IsIdleReg)); CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); - CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessedReg)); + CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); @@ -2168,7 +2155,7 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } } - if(!bInitialized) { + if(Debug && !bInitialized) { dbgoprint((obj_t*) pThis, "queue starts up without (loading) any DA disk state (this is normal for the DA " "queue itself!)\n"); } @@ -2507,7 +2494,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) - CHKiRet(qqueueChkStrtDA(pThis)); + CHKiRet(ChkStrtDA(pThis)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2611,16 +2598,13 @@ SetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex) dbgoprint((obj_t*) pThis, "switching to enqueue-only mode, terminating all worker threads\n"); if(pThis->pWtpReg != NULL) wtpWakeupAllWrkr(pThis->pWtpReg); -RUNLOG; if(pThis->pWtpDA != NULL) wtpWakeupAllWrkr(pThis->pWtpDA); -RUNLOG; } else { /* switch back to regular mode */ ABORT_FINALIZE(RS_RET_NOT_IMPLEMENTED); /* we don't need this so far... 
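With the enqueue path now calling ChkStrtDA(), disk assistance is started when the in-memory queue crosses its high water mark and is dismantled again once the load drops. The sketch below shows only the basic hysteresis; the actual turn-off condition additionally requires the disk queue to be drained, and the thresholds here are illustrative.

/* simplified model of the high/low water mark hysteresis for disk assistance */
#include <stdbool.h>
#include <stdio.h>

struct queue {
    int  size;
    int  high_wtr;
    int  low_wtr;
    bool runs_da;      /* disk assistance currently active? */
};

static void on_enqueue(struct queue *q)
{
    q->size++;
    if (!q->runs_da && q->size >= q->high_wtr) {
        q->runs_da = true;                /* InitDA(): start DA worker and disk queue */
        printf("size %d: starting disk assistance\n", q->size);
    }
}

static void on_dequeue(struct queue *q)
{
    q->size--;
    if (q->runs_da && q->size <= q->low_wtr) {
        q->runs_da = false;               /* TurnOffDAMode(): back to in-memory only */
        printf("size %d: disk assistance no longer needed\n", q->size);
    }
}

int main(void)
{
    struct queue q = { 0, 5, 1, false };

    for (int i = 0; i < 6; i++) on_enqueue(&q);   /* crosses the high water mark         */
    for (int i = 0; i < 5; i++) on_dequeue(&q);   /* falls back below the low water mark */
    return 0;
}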
*/ } } -RUNLOG; pThis->bEnqOnly = bEnqOnly; -- cgit v1.2.3 From 13d4a23e92996e24d6a833ca75d06428c5387aa4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 14:24:37 +0200 Subject: some more fixes for queue engine The enhanced testbench now runs without failures, again --- runtime/queue.c | 67 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 37 insertions(+), 30 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 698495ef..57385056 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -54,6 +54,7 @@ #include "wtp.h" #include "wti.h" #include "atomic.h" +#include "msg.h" /* TODO: remove one we removed MsgAddRef() call */ #ifdef OS_SOLARIS # include @@ -248,7 +249,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis) } else { iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } -dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers); wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */ } @@ -294,7 +294,6 @@ TurnOffDAMode(qqueue_t *pThis) { DEFiRet; -RUNLOG_STR("XXX: TurnOffDAMode\n"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); @@ -720,6 +719,7 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; + ISOBJ_TYPE_assert(pEntry->pUsr, msg); *ppUsr = pEntry->pUsr; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; @@ -1137,7 +1137,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void *pUsr) +qqueueDeq(qqueue_t *pThis, void **ppUsr) { DEFiRet; @@ -1148,7 +1148,7 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, pUsr); + iRet = pThis->qDeq(pThis, ppUsr); ATOMIC_INC(pThis->nLogDeq); // dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1162,6 +1162,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr) * Both the regular and DA queue (if it exists) is waited for, but on the same timeout. * After this function returns, the workers must either be finished or some force * to finish them must be applied. + * This function also instructs the DA worker pool (if it exists) to terminate. This is done + * in preparation of final queue shutdown. * rgerhards, 2009-05-27 */ static rsRetVal @@ -1175,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ + BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */ if(getPhysicalQueueSize(pThis) > 0) { if(pThis->bRunsDA) { /* We may have waited on the low water mark. As it may have changed, we @@ -1184,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) wtpAdviseMaxWorkers(pThis->pWtpDA, 1); } } - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* at this stage, we need to have the DA worker properly initialized and running (if there is one) */ if(pThis->bRunsDA) { @@ -1217,9 +1219,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } /* OK, the worker for the regular queue is processed, on the the DA queue regular worker. 
*/ - BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */ if(pThis->bRunsDA) { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n", qqueueGetID(pThis->pqDA)); /* we use the same absolute timeout as above, so we do not use more than the configured @@ -1232,8 +1232,16 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis) } else { dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n"); } - } else { - END_MTX_PROTECTED_OPERATIONS(pThis->mut); + /* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting + * the queue, it is restarted at a later stage. We don't care here if a timeout happens. + */ + dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n"); + iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout); + if(iRetLocal == RS_RET_TIMED_OUT) { + dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n"); + } else { + dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n"); + } } RETiRet; @@ -1335,6 +1343,14 @@ cancelWorkers(qqueue_t *pThis) dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker " "threads, continuing, but results are unpredictable\n", iRetLocal); } + + /* finally, we cancel the main queue's DA worker pool, if it still is running. It may be + * restarted later to persist the queue. But we stop it, because otherwise we get into + * big trouble when resetting the logical dequeue pointer. This operation can only be + * done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28 + */ + dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n"); + iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */ } RETiRet; @@ -1600,23 +1616,15 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) ISOBJ_TYPE_assert(pThis, qqueue); assert(pBatch != NULL); -// TODO: ULTRA: lock qaueue mutex if instructed to do so - /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation - * of the message. But in regular mode, we need to do it ourselfs. We differentiate between - * the two cases, because it is actually the easiest way to handle the destruct-Problem in - * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function). - */ - if(!pThis->bRunsDA) { - for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; - objDestruct(pUsr); - } + for(i = 0 ; i < pBatch->nElem ; ++i) { + pUsr = pBatch->pElem[i].pUsrp; + objDestruct(pUsr); } iRet = DeleteBatchFromQStore(pThis, pBatch); - pBatch->nElem = 0; /* reset batch */ + pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */ RETiRet; } @@ -1908,8 +1916,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave)); /* iterate over returned results and enqueue them in DA queue */ - for(i = 0 ; i < pWti->batch.nElem ; i++) - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp)); + for(i = 0 ; i < pWti->batch.nElem ; i++) { + /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs + * the message. So far, we simply assume we always have msg_t, what currently is always the case. 
+ * rgerhards, 2009-05-28 + */ + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + } finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -2306,16 +2319,12 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g if(pThis->bRunsDA != 2) { InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); -//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war) -RUNLOG_VAR("%d", pThis->bRunsDA); -RUNLOG_VAR("%d", pThis->pWtpDA->wtpState); qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */ } /* make sure we do not timeout before we are done */ dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n"); timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL); /* and run the primary queue's DA worker to drain the queue */ -RUNLOG; iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout); dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n", iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); @@ -2333,7 +2342,6 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C CODESTARTobjDestruct(qqueue) pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */ -RUNLOG_STR("XXX: queue destruct\n"); /* shut down all workers * We do not need to shutdown workers when we are in enqueue-only mode or we are a * direct queue - because in both cases we have none... ;) @@ -2347,7 +2355,6 @@ RUNLOG_STR("XXX: queue destruct\n"); * we need to reset the logical dequeue pointer, persist the queue if configured to do * so and then destruct everything. -- rgerhards, 2009-05-26 */ -//!!!! //CHKiRet(pThis->qUnDeqAll(pThis)); dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); CHKiRet(pThis->qUnDeqAll(pThis)); dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); -- cgit v1.2.3 From cbbf1586a204a897a050b6cb294b120e3ff5f23c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 28 May 2009 15:57:39 +0200 Subject: some cleanup & fix make distcheck --- runtime/queue.c | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 57385056..23d60ddc 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -54,7 +54,7 @@ #include "wtp.h" #include "wti.h" #include "atomic.h" -#include "msg.h" /* TODO: remove one we removed MsgAddRef() call */ +#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS # include @@ -297,11 +297,6 @@ TurnOffDAMode(qqueue_t *pThis) ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->bRunsDA); - /* at this point, we need a fully initialized DA queue. So if it isn't, we finally need - * to wait for its startup... -- rgerhards, 2008-01-25 - */ - //TODO: MULTI del, can not happen (but verify first) qqueueWaitDAModeInitialized(pThis); - /* if we need to pull any data that we still need from the (child) disk queue, * now would be the time to do so. At present, we do not need this, but I'd like to * keep that comment if future need arises. 
@@ -865,8 +860,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - /* we now need to take care of the Deq handle. It is not persisted, so we can create - * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function! + /* create a duplicate for the read "pointer". */ CHKiRet(strmDup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); @@ -1712,7 +1706,7 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - // TODO: MULTI: check physical queue size! + // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); @@ -1982,11 +1976,6 @@ static rsRetVal ChkStopWrkrReg(qqueue_t *pThis) { DEFiRet; - /* original condition - return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && getPhysicalQueueSize(pThis) == 0); - * TODO: remove when verified! -- rgerhards, 2009-05-26 - */ - // TODO: DEL - we now keep the workers running! if(pThis->bEnqOnly || pThis->bRunsDA) { if(pThis->bEnqOnly) { dbgprintf("XXX: terminate_NOW queue:Reg worker: enqOnly! queue size %d\n", getPhysicalQueueSize(pThis)); iRet = RS_RET_TERMINATE_NOW; -- cgit v1.2.3 From 236bdb5cfaf56d9192a9911f140ba3da95b390b6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 22 Jun 2009 16:06:29 +0200 Subject: bugfix: huge memory leak in queue engine (made rsyslogd unusable in production). Occured if at least one queue was in direct mode (the default for action queues). --- runtime/queue.c | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index e18ce3d7..9462f865 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1089,6 +1089,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; iRet = pThis->pConsumer(pThis->pUsr, &singleBatch); + objDestruct(pUsr); RETiRet; } -- cgit v1.2.3 From d61bcbbd0e99a8ffadfe1e0c5347dee9e04a0b1d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 22 Jun 2009 18:32:21 +0200 Subject: adapted (and improved) input batching to v5 engine --- runtime/queue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 9462f865..1ae386e7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2635,7 +2635,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* and finally enqueue the message */ CHKiRet(qqueueAdd(pThis, pUsr)); - qqueueChkPersist(pThis, 1); // TODO: optimize, do in outer function! (but we need parts from v5?) finalize_it: RETiRet; @@ -2669,6 +2668,8 @@ dbgprintf("queueMultiEnq: %d\n", i); CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } + qqueueChkPersist(pThis, pMultiSub->nElem); + finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { /* make sure at least one worker is running. 
*/ -- cgit v1.2.3 From 46024834449840dabf399dda196c9dd11cf78ace Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 25 Jun 2009 11:06:42 +0200 Subject: added a few atomic operations mostly to get thread debugger errors clean (plus, of course, it makes things more deterministic) --- runtime/queue.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 1ae386e7..5102b0df 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1649,6 +1649,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDeleted = pWti->batch.nElemDeq; DeleteProcessedBatch(pThis, &pWti->batch); +//int iii = pthread_mutex_trylock(pThis->mut); +//char errStr[1024]; +//rs_strerror_r(iii, errStr, sizeof(errStr)); +//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr); nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { //dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); @@ -2473,15 +2477,6 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); - /* first check if we need to discard this message (which will cause CHKiRet() to exit) - * rgerhards, 2008-10-07: It is OK to do this outside of mutex protection. The queue size - * and bRunsDA parameters may not reflect the correct settings here, but they are - * "good enough" in the sense that they can be used to drive the decision. Valgrind's - * threading tools may point this access to be an error, but this is done - * intentional. I do not see this causes problems to us. - */ - CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr)); - /* Please note that this function is not cancel-safe and consequently * sets the calling thread's cancelibility state to PTHREAD_CANCEL_DISABLE * during its execution. If that is not done, race conditions occur if the @@ -2493,6 +2488,10 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) d_pthread_mutex_lock(pThis->mut); } + /* first check if we need to discard this message (which will cause CHKiRet() to exit) + */ + CHKiRet(qqueueChkDiscardMsg(pThis, getPhysicalQueueSize(pThis), pThis->bRunsDA, pUsr)); + /* then check if we need to add an assistance disk queue */ if(pThis->bIsDA) CHKiRet(ChkStrtDA(pThis)); -- cgit v1.2.3 From 4818b0081d3a265a87f9f646d79f2a2ffbcda819 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 25 Jun 2009 15:21:29 +0200 Subject: bugfix: subtle synchronization issue This may have caused a segfault under strange circumstances (but if we just run long enough with a high enough message volume, even the strangest circumstances will occur...) 
--- runtime/queue.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 5102b0df..7438fbaa 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -212,7 +212,7 @@ static inline void queueDrain(qqueue_t *pThis) BEGINfunc dbgoprint((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - while(pThis->iQueueSize-- > 0) { + while(ATOMIC_DEC_AND_FETCH(pThis->iQueueSize) > 0) { pThis->qDeq(pThis, &pUsr); if(pUsr != NULL) { objDestruct(pUsr); @@ -1547,15 +1547,14 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) ISOBJ_TYPE_assert(pThis, qqueue); -//dbgprintf("pre delete batch from store, new sizes: log %d, phys %d, nElem %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), nElem); /* now send delete request to storage driver */ for(i = 0 ; i < nElem ; ++i) { pThis->qDel(pThis); } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ - pThis->iQueueSize -= nElem; - pThis->nLogDeq -= nElem; + ATOMIC_SUB(pThis->iQueueSize, nElem); + ATOMIC_SUB(pThis->nLogDeq, nElem); dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1649,13 +1648,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDeleted = pWti->batch.nElemDeq; DeleteProcessedBatch(pThis, &pWti->batch); -//int iii = pthread_mutex_trylock(pThis->mut); -//char errStr[1024]; -//rs_strerror_r(iii, errStr, sizeof(errStr)); -//dbgprintf("DequeueConsumableElemnts mutex locked: %d (16 is EBUSY = OK): %s\n", iii, errStr); nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { -//dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); CHKiRet(qqueueDeq(pThis, &pUsr)); /* check if we should discard this element */ -- cgit v1.2.3
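
The final patch above replaces plain read-modify-write updates of the shared queue counters (pThis->iQueueSize--, pThis->iQueueSize -= nElem, pThis->nLogDeq -= nElem) with ATOMIC_DEC_AND_FETCH and ATOMIC_SUB. The following sketch is not rsyslog code: it is a minimal stand-alone illustration of why that change matters, written against GCC-style __sync_* builtins, which appear to be what atomic.h maps the ATOMIC_* macros to when such builtins are available (an assumption here, not something the patches state). The file name, thread function and iteration count (counter_race.c, worker, ITERS) are invented for the example. Two threads decrement a shared counter concurrently; the plain decrement can lose updates because load, subtract and store interleave, while the atomic variant cannot.

/* counter_race.c - illustration only, NOT rsyslog code.
 * Shows the lost-update race that plain counter updates (like the old
 * iQueueSize--) are subject to, and how an atomic decrement avoids it.
 * Build: gcc -O2 -pthread counter_race.c -o counter_race
 */
#include <pthread.h>
#include <stdio.h>

#define ITERS 1000000

/* volatile forces a separate load and store per iteration, mirroring how a
 * plain "ctr--" on a shared field behaves; it does NOT make it thread-safe. */
static volatile int naiveCtr = 2 * ITERS;
static int atomicCtr = 2 * ITERS;

static void *worker(void *arg)
{
	int i;
	(void) arg;
	for(i = 0 ; i < ITERS ; ++i) {
		/* racy: two threads may read the same value, both subtract,
		 * and one update is lost */
		naiveCtr = naiveCtr - 1;
		/* atomic: the update happens as one indivisible step, as with
		 * ATOMIC_DEC_AND_FETCH / ATOMIC_SUB in the patch above */
		__sync_sub_and_fetch(&atomicCtr, 1);
	}
	return NULL;
}

int main(void)
{
	pthread_t t1, t2;
	pthread_create(&t1, NULL, worker, NULL);
	pthread_create(&t2, NULL, worker, NULL);
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	printf("naive counter:  %d (expected 0)\n", naiveCtr);
	printf("atomic counter: %d (expected 0)\n", atomicCtr);
	return 0;
}

Run repeatedly, the naive counter typically ends above zero while the atomic one reliably reaches zero. Lost updates of this kind on counters that are also read outside the queue mutex are presumably what the thread-debugger reports mentioned in the commit message (tools such as helgrind flag them), and switching to the atomic macros removes both the warning and the nondeterminism.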