diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 239 | ||||
-rw-r--r-- | runtime/queue.h | 14 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | runtime/wti.c | 17 | ||||
-rw-r--r-- | runtime/wti.h | 16 | ||||
-rw-r--r-- | runtime/wtp.c | 4 | ||||
-rw-r--r-- | runtime/wtp.h | 4 |
7 files changed, 164 insertions, 131 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 4e017e84..f3d3fe71 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -8,7 +8,11 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static + * function names - this makes it really hard to read and does not provide much + * benefit, at least I (now) think so... + * + * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -63,12 +67,13 @@ DEFobjCurrIf(glbl) /* forward-definitions */ rsRetVal qqueueChkPersist(qqueue_t *pThis); static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex); -static rsRetVal qqueueRateLimiter(qqueue_t *pThis); +static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); +static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static int qqueueIsIdleDA(qqueue_t *pThis); -static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); -static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2); -static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); +static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave); +static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2); +static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -369,9 +374,11 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex) CHKiRet(wtpConstruct (&pThis->pWtpDA)); CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf)); CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA)); + // MULTIQUEUE: TODO: this should be DA-specific! + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup)); CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA)); CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); @@ -607,28 +614,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) { DEFiRet; - iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } - - pEntry->pNext = NULL; - pEntry->pUsr = pUsr; - - if(pThis->tVars.linklist.pRoot == NULL) { - pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry; - } else { - pThis->tVars.linklist.pLast->pNext = pEntry; - pThis->tVars.linklist.pLast = pEntry; - } - -finalize_it: -#endif RETiRet; } @@ -636,24 +622,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr); -#if 0 - qLinkedList_t *pEntry; - - ASSERT(pThis != NULL); - ASSERT(pThis->tVars.linklist.pRoot != NULL); - - pEntry = pThis->tVars.linklist.pRoot; - *ppUsr = pEntry->pUsr; - - if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) { - pThis->tVars.linklist.pRoot = NULL; - pThis->tVars.linklist.pLast = NULL; - } else { - pThis->tVars.linklist.pRoot = pEntry->pNext; - } - free(pEntry); - -#endif RETiRet; } @@ -760,7 +728,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) while(iUngottenObjs > 0) { /* fill the queue from disk */ CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL)); - qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); + UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED); --iUngottenObjs; /* one less */ } @@ -931,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) { + aUsrp_t aUsrp; + obj_t *pMsgp; DEFiRet; ASSERT(pThis != NULL); @@ -940,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) * mode the consumer probably has a lot to convey (which get's lost in the other modes * because they are asynchronous. But direct mode is deliberately synchronous. * rgerhards, 2008-02-12 + * We use our knowledge about the aUsrp_t structure below, but without that, we + * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pUsr); + pMsgp = (obj_t*) pUsr; + aUsrp.nElem = 1; /* there always is only one in direct mode */ + aUsrp.pUsrp = &pMsgp; + iRet = pThis->pConsumer(pThis->pUsr, &aUsrp); RETiRet; } @@ -961,7 +936,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_ * rgerhards, 2008-01-20 */ static rsRetVal -qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) +UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) { DEFiRet; DEFVARS_mutexProtection; @@ -991,7 +966,7 @@ qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex) * rgerhards, 2008-01-29 */ static rsRetVal -qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) +GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr) { DEFiRet; @@ -1047,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr) * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ if(pThis->iUngottenObjs > 0) { - iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr); + iRet = GetUngottenObj(pThis, (obj_t**) pUsr); } else { iRet = pThis->qDel(pThis, pUsr); ATOMIC_DEC(pThis->iQueueSize); @@ -1275,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis) * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)) { DEFiRet; qqueue_t *pThis; @@ -1305,6 +1280,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */ + pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */ pThis->pszFilePrefix = NULL; pThis->qType = qType; @@ -1354,7 +1330,7 @@ finalize_it: * rgerhards, 2008-01-16 */ static rsRetVal -qqueueConsumerCancelCleanup(void *arg1, void *arg2) +ConsumerCancelCleanup(void *arg1, void *arg2) { DEFiRet; @@ -1366,7 +1342,7 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2) if(pUsr != NULL) { /* make sure the data element is not lost */ dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n"); - CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX)); + CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX)); } finalize_it: @@ -1415,38 +1391,68 @@ finalize_it: } +/* dequeue as many user points as are available, until we hit the configured + * upper limit of pointers. + * This must only be called when the queue mutex is LOOKED, otherwise serious + * malfunction will happen. + */ +static inline rsRetVal +DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *iRemainingQueueSize) +{ + int nDequeued; + int iQueueSize; + void *pUsr; + rsRetVal localRet; + DEFiRet; + + nDequeued = 0; + do { +dbgprintf("DequeueConsumableElements, index %d\n", nDequeued); + CHKiRet(qqueueDel(pThis, &pUsr)); + qqueueChkPersist(pThis); /* is is questionable if we should really need to call this every time... */ + iQueueSize = qqueueGetOverallQueueSize(pThis); + + /* check if we should discard this element */ + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr); + if(localRet == RS_RET_QUEUE_FULL) + continue; + else if(localRet != RS_RET_OK) + ABORT_FINALIZE(localRet); + + /* all well, use this element */ + pWti->paUsrp->pUsrp[nDequeued++] = pUsr; + } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize); + + //bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ + pWti->paUsrp->nElem = nDequeued; + *iRemainingQueueSize = iQueueSize; + +finalize_it: + RETiRet; +} + + /* dequeue the queued object for the queue consumers. * rgerhards, 2008-10-21 + * I made a radical change - we now dequeue multiple elements, and store these objects in + * an array of user pointers. We expect that this increases performance. + * rgerhards, 2009-04-22 */ static rsRetVal -qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; - void *pUsr; - int iQueueSize; - int bRunsDA; /* cache for early mutex release */ + int iQueueSize = 0; /* keep the compiler happy... */ - /* dequeue element (still protected from mutex) */ - iRet = qqueueDel(pThis, &pUsr); - qqueueChkPersist(pThis); - iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */ - bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */ - - /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY - * if we could successfully obtain a user pointer. Otherwise, we would bring the - * cancel cleanup handler into big troubles (and we did ;)). Note that we can - * NOT set the variable further below, as this may lead to an object leak. We - * may get cancelled before we reach that part of the code, so the only - * solution is to do it here. -- rgerhards, 2008-02-27 - */ - if(iRet == RS_RET_OK) { - pWti->pUsrp = pUsr; - } + /* dequeue element batch (still protected from mutex) */ + iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize); /* awake some flow-controlled sources if we can do this right now */ /* TODO: this could be done better from a performance point of view -- do it only if * we have someone waiting for the condition (or only when we hit the watermark right * on the nail [exact value]) -- rgerhards, 2008-03-14 + * now that we dequeue batches of pointers, this is much less an issue... + * rgerhards, 2009-04-22 */ if(iQueueSize < pThis->iFullDlyMrk) { pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk); @@ -1456,37 +1462,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock - * as of the pthreads recommendation on predictable scheduling behaviour. I don't see - * any problems caused by this, but I add this comment in case some will be seen - * in the next time. - */ pthread_cond_signal(&pThis->notFull); d_pthread_mutex_unlock(pThis->mut); pthread_setcancelstate(iCancelStateSave, NULL); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ - /* do actual processing (the lengthy part, runs in parallel) - * If we had a problem while dequeing, we do not call the consumer, - * but we otherwise ignore it. This is in the hopes that it will be - * self-healing. However, this is really not a good thing. - * rgerhards, 2008-01-03 - */ - if(iRet != RS_RET_OK) - FINALIZE; - - /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue. - * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to - * provide real-time creation of spool files. - * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong. - */ - CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr)); - -finalize_it: if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) { dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things " "may happen\n", iRet); } + RETiRet; } @@ -1529,7 +1514,7 @@ finalize_it: * but you get the idea from the code above. */ static rsRetVal -qqueueRateLimiter(qqueue_t *pThis) +RateLimiter(qqueue_t *pThis) { DEFiRet; int iDelay; @@ -1592,19 +1577,20 @@ qqueueRateLimiter(qqueue_t *pThis) * rgerhards, 2008-01-21 */ static rsRetVal -qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 */ +//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that... if(pThis->iDeqSlowdown) { dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n", pThis->iDeqSlowdown); @@ -1616,7 +1602,7 @@ finalize_it: } -/* This is a special consumer to feed the disk-queue in disk-assited mode. +/* This is a special consumer to feed the disk-queue in disk-assisted mode. * When active, our own queue more or less acts as a memory buffer to the disk. * So this consumer just needs to drain the memory queue and submit entries * to the disk queue. The disk queue will then call the actual consumer from @@ -1626,15 +1612,18 @@ finalize_it: * rgerhards, 2008-01-14 */ static rsRetVal -qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) +ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave) { + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); ISOBJ_TYPE_assert(pWti, wti); - CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave)); - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp)); + CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave)); + /* iterate over returned results and enqueue them in DA queue */ + for(i = 0 ; i < pWti->paUsrp->nElem ; i++) + CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i])); finalize_it: dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); @@ -1692,12 +1681,24 @@ qqueueChkStopWrkrDA(qqueue_t *pThis) * the DA queue */ static int -qqueueChkStopWrkrReg(qqueue_t *pThis) +ChkStooWrkrReg(qqueue_t *pThis) { return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0); } +/* return the configured "deq max at once" interval + * rgerhards, 2009-04-22 + */ +static rsRetVal +GetDeqBatchSize(qqueue_t *pThis, int *pVal) +{ + DEFiRet; + assert(pVal != NULL); + *pVal = pThis->iDeqBatchSize; + RETiRet; +} + /* must only be called when the queue mutex is locked, else results * are not stable! DA queue version */ @@ -1712,7 +1713,7 @@ qqueueIsIdleDA(qqueue_t *pThis) * are not stable! Regular queue version */ static int -qqueueIsIdleReg(qqueue_t *pThis) +IsIdleReg(qqueue_t *pThis) { #if 0 /* enable for performance testing */ int ret; @@ -1740,7 +1741,7 @@ qqueueIsIdleReg(qqueue_t *pThis) * I am telling this, because I, too, always get confused by those... */ static rsRetVal -qqueueRegOnWrkrShutdown(qqueue_t *pThis) +RegOnWrkrShutdown(qqueue_t *pThis) { DEFiRet; @@ -1761,7 +1762,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis) * hook to indicate in the parent queue (if we are a child) that we are not done yet. */ static rsRetVal -qqueueRegOnWrkrStartup(qqueue_t *pThis) +RegOnWrkrStartup(qqueue_t *pThis) { DEFiRet; @@ -1815,10 +1816,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */ dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, " - "full delay %d, light delay %d starting\n", + "full delay %d, light delay %d, deq batch size %d starting\n", pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, - pThis->iFullDlyMrk, pThis->iLightDlyMrk); + pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize); if(pThis->qType == QUEUETYPE_DIRECT) FINALIZE; /* with direct queues, we are already finished... */ @@ -1829,13 +1830,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis)); CHKiRet(wtpConstruct (&pThis->pWtpReg)); CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf)); - CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter)); - CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg)); - CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg)); - CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg)); - CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup)); - CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup)); - CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown)); + CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter)); + CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg)); + CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize)); + CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg)); + CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg)); + CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup)); + CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup)); + CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); @@ -1945,7 +1947,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) * to the regular files. -- rgerhards, 2008-01-29 */ while(pThis->iUngottenObjs > 0) { - CHKiRet(qqueueGetUngottenObj(pThis, &pUsr)); + CHKiRet(GetUngottenObj(pThis, &pUsr)); CHKiRet((objSerialize(pUsr))(pUsr, psQIF)); objDestruct(pUsr); } @@ -2296,6 +2298,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) DEFpropSetMeth(qqueue, pUsr, void*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) +DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..8a60254b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,6 +54,7 @@ typedef struct qWrkThrd_s { pthread_mutex_t mut; } qWrkThrd_t; /* type for queue worker threads */ + /* the queue object */ typedef struct queue_s { BEGINobjInstance; @@ -84,6 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ + int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -97,12 +99,11 @@ typedef struct queue_s { * applied to detect user configuration errors (and tell me how should we detect what * the user really wanted...). -- rgerhards, 2008-04-02 */ - /* ane dequeue time window */ - rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + /* end dequeue time window */ + rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the - * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer - * to message) - * rgerhards, 2008-01-28 + * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -183,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); @@ -201,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pUsr, void*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); #define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index 026fbbed..74ea5270 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -60,6 +60,7 @@ /* define some base data types */ typedef unsigned char uchar;/* get rid of the unhandy "unsigned char" */ +typedef struct aUsrp_s aUsrp_t; typedef struct thrdInfo thrdInfo_t; typedef struct obj_s obj_t; typedef struct filed selector_t;/* TODO: this so far resides in syslogd.c, think about modularization */ diff --git a/runtime/wti.c b/runtime/wti.c index 544bffa7..df1ea0ed 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -201,6 +201,9 @@ CODESTARTobjDestruct(wti) pthread_cond_destroy(&pThis->condExitDone); pthread_mutex_destroy(&pThis->mut); + if(pThis->paUsrp != NULL) + free(pThis->paUsrp); + if(pThis->pszDbgHdr != NULL) free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -209,6 +212,7 @@ ENDobjDestruct(wti) /* Standard-Constructor for the wti object */ BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */ + pThis->bOptimizeUniProc = glbl.GetOptimizeUniProc(); pthread_cond_init(&pThis->condExitDone, NULL); pthread_mutex_init(&pThis->mut, NULL); @@ -222,15 +226,21 @@ rsRetVal wtiConstructFinalize(wti_t *pThis) { DEFiRet; + int iDeqBatchSize; ISOBJ_TYPE_assert(pThis, wti); dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor */ - pThis->pUsrp = NULL; pThis->tCurrCmd = eWRKTHRD_STOPPED; + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ + CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); + CHKmalloc(pThis->paUsrp = calloc(1, sizeof(aUsrp_t))); + CHKmalloc(pThis->paUsrp->pUsrp = calloc((size_t)iDeqBatchSize, sizeof(void*))); + +finalize_it: RETiRet; } @@ -314,7 +324,8 @@ wtiWorkerCancelCleanup(void *arg) DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis)); /* call user supplied handler (that one e.g. requeues the element) */ - pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp); +// MULTIQUEUE: need to change here! + pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->paUsrp->pUsrp[0]); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); d_pthread_mutex_lock(&pWtp->mut); @@ -366,7 +377,7 @@ wtiWorker(wti_t *pThis) ISOBJ_TYPE_assert(pWtp, wtp); dbgSetThrdName(pThis->pszDbgHdr); - pThis->pUsrp = NULL; + pThis->paUsrp->nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)? pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX); diff --git a/runtime/wti.h b/runtime/wti.h index 6b60b833..85c98fe6 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008, 2009 by Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -28,13 +28,25 @@ #include "wtp.h" #include "obj.h" +/* the user pointer array object + * This object is used to dequeue multiple user pointers which are than handed over + * to processing. The size of elements is fixed after queue creation, but may be + * modified by config variables (better said: queue properties). + * rgerhards, 2009-04-22 + */ +struct aUsrp_s { + int nElem; /* actual number of element in this entry */ + obj_t **pUsrp; /* actual elements (array!) */ +}; + + /* the worker thread instance class */ typedef struct wti_s { BEGINobjInstance; int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ pthread_t thrdID; /* thread ID */ qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsrp; /* pointer to an object meaningful for current user pointer (e.g. queue pUsr data elemt) */ + aUsrp_t *paUsrp; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ pthread_cond_t condExitDone; /* signaled when the thread exit is done (once per thread existance) */ pthread_mutex_t mut; diff --git a/runtime/wtp.c b/runtime/wtp.c index 04eb974f..8bb55cf7 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -88,6 +88,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); /* set all function pointers to "not implemented" dummy so that we can safely call them */ pThis->pfChkStopWrkr = NotImplementedDummy; + pThis->pfGetDeqBatchSize = NotImplementedDummy; pThis->pfIsIdle = NotImplementedDummy; pThis->pfDoWork = NotImplementedDummy; pThis->pfOnIdle = NotImplementedDummy; @@ -117,7 +118,7 @@ wtpConstructFinalize(wtp_t *pThis) */ if((pThis->pWrkr = malloc(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - + for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) { CHKiRet(wtiConstruct(&pThis->pWrkr[i])); pWti = pThis->pWrkr[i]; @@ -584,6 +585,7 @@ DEFpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t) DEFpropSetMethPTR(wtp, pcondBusy, pthread_cond_t) DEFpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)) +DEFpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)) DEFpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)) DEFpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)) DEFpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)) diff --git a/runtime/wtp.h b/runtime/wtp.h index b9cb07c5..88bd9197 100644 --- a/runtime/wtp.h +++ b/runtime/wtp.h @@ -67,10 +67,11 @@ typedef struct wtp_s { int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* user objects */ - void *pUsr; /* pointer to user object */ + void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */ pthread_mutex_t *pmutUsr; pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */ rsRetVal (*pfChkStopWrkr)(void *pUsr, int); + rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */ rsRetVal (*pfRateLimiter)(void *pUsr); rsRetVal (*pfIsIdle)(void *pUsr, int); rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int); @@ -104,6 +105,7 @@ int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex); PROTOTYPEObjClassInit(wtp); PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*)); +PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*)); PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int)); PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int)); PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int)); |