From de2fd07836accab563ecbf0405749b6bef9e76b4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Oct 2012 08:45:04 +0200 Subject: cosmetic: get rid of compiler warning on currently unused debug code --- runtime/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 0cd33701..fbf77108 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -127,7 +127,7 @@ static struct cnfparamblk pblk = }; /* debug aid */ -static void displayBatchState(batch_t *pBatch) +static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { -- cgit v1.2.3 From 40fffde2b6a36ba12388b89d422104c258a667f7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 2 Nov 2012 18:55:53 +0100 Subject: generate disk .qi file once at queue construction ... instead of each time a file write happens. In some situations (very frequent sync), this can probably be a big performane win. --- runtime/queue.c | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index fbf77108..f5bb841f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -744,18 +744,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); @@ -770,7 +764,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ @@ -1985,6 +1979,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar pszQIFNam[MAXFNAME]; int wrk; uchar *qName; size_t lenBuf; @@ -2020,6 +2015,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + /* pre-construct file name for .qi file */ + pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), + "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + pThis->pszQIFNam = ustrdup(pszQIFNam); + DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, + (int) pThis->lenQIFNam); break; case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; @@ -2157,8 +2158,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; ASSERT(pThis != NULL); @@ -2176,13 +2175,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { - unlink((char*)pszQIFNam); + unlink((char*)pThis->pszQIFNam); pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ @@ -2195,7 +2190,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, write the property bag for ourselfs -- cgit v1.2.3 From feddb2cc8fa213725a14e556d0366aef8d3a4339 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 11:40:13 +0100 Subject: queue: change gerenic queue pUsr ptr to be action_t this was always action_t, but the initial design was more generic. We are making it specific now in order to gain better performance (after all, we did not need the generic engine in the past 8 years...) --- runtime/queue.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index f5bb841f..eaf33e00 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -411,7 +411,7 @@ StartDA(qqueue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction)); CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); @@ -996,7 +996,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -1023,7 +1023,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate); RETiRet; } @@ -1835,7 +1835,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2759,7 +2759,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) -DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, pAction, action_t*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) -- cgit v1.2.3 From c55e0a5a06e69106bc346057dd61dcb98688a4aa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 12:32:50 +0100 Subject: queue: change generic msg ptr (pUsr) to be of msg_t type --- runtime/queue.c | 59 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 29 insertions(+), 30 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index eaf33e00..6e40b0c9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -74,7 +74,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) /* forward-definitions */ -static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -83,7 +83,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -312,7 +312,7 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - void *pUsr; + msg_t *pUsr; ASSERT(pThis != NULL); BEGINfunc @@ -546,7 +546,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) } -static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in) { DEFiRet; @@ -560,7 +560,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) } -static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) +static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out) { DEFiRet; @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pUsr = pUsr; + pEntry->pMsg = pUsr; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,14 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pUsr; + *ppUsr = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +889,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) { DEFiRet; number_t nWriteCount; @@ -917,7 +917,7 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); @@ -970,7 +970,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +992,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pUsrp = (obj_t*) pUsr; + batchObj.pMsg = pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1044,7 +1044,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; @@ -1066,7 +1066,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; @@ -1420,14 +1420,13 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -1521,7 +1520,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - void *pUsr; + msg_t *pUsr; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1530,11 +1529,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; + pUsr = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*) pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1572,7 +1570,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - void *pUsr; + msg_t *pUsr; rsRetVal localRet; DEFiRet; @@ -1593,7 +1591,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -1897,8 +1895,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, + MsgAddRef(pWti->batch.pElem[i].pMsg))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -2002,8 +2000,9 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDeq = qDeqLinkedList; + //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: @@ -2432,7 +2431,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int err; @@ -2614,7 +2613,7 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -2627,7 +2626,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int iCancelStateSave; -- cgit v1.2.3 From f88d4f76f69f52918726eb91a2ce06a163cbf0c6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 12:51:19 +0100 Subject: queue: remove unnecessary (obj_t*) redirection from msg ptrs --- runtime/queue.c | 82 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 40 insertions(+), 42 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 6e40b0c9..bdca61eb 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -312,16 +312,16 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - msg_t *pUsr; + msg_t *pMsg; ASSERT(pThis != NULL); BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); - if(pUsr != NULL) { - objDestruct(pUsr); + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); } pThis->qDel(pThis); } @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pMsg = pUsr; + pEntry->pMsg = pMsg; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,13 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; - ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pMsg; + *ppMsg = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) { DEFiRet; number_t nWriteCount; @@ -897,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) ASSERT(pThis != NULL); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); - CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ @@ -907,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) * the in-memory representation. The instance will be re-created upon * dequeue. -- rgerhards, 2008-07-09 */ - objDestruct(pUsr); + msgDestruct(&pMsg); DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); @@ -917,10 +916,10 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); RETiRet; } @@ -970,7 +969,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +991,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pMsg = pUsr; + batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1001,7 +1000,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); } - objDestruct(pUsr); + msgDestruct(&pMsg); RETiRet; } @@ -1044,13 +1043,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, msg_t *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ASSERT(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(pThis->qAdd(pThis, pMsg)); if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); @@ -1066,7 +1065,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; @@ -1077,7 +1076,7 @@ qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, ppUsr); + iRet = pThis->qDeq(pThis, ppMsg); ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1420,7 +1419,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg) { DEFiRet; rsRetVal iRetLocal; @@ -1429,12 +1428,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " @@ -1520,7 +1519,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - msg_t *pUsr; + msg_t *pMsg; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1529,16 +1528,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pMsg; + pMsg = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } - objDestruct(pUsr); + msgDestruct(&pMsg); } DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); @@ -1570,7 +1569,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - msg_t *pUsr; + msg_t *pMsg; rsRetVal localRet; DEFiRet; @@ -1579,10 +1578,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { - CHKiRet(qqueueDeq(pThis, &pUsr)); + CHKiRet(qqueueDeq(pThis, &pMsg)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1591,7 +1590,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pMsg = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pMsg; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -2001,7 +2000,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; pThis->qDeq = qDeqLinkedList; - //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; @@ -2431,7 +2429,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int err; @@ -2440,7 +2438,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2517,7 +2515,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pThis->toEnq == 0 || pThis->bEnqOnly) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); @@ -2529,7 +2527,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); @@ -2537,7 +2535,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) } /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); + CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: @@ -2613,11 +2611,11 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pUsr); + iRet = qAddDirect(pThis, pMsg); RETiRet; } @@ -2626,7 +2624,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int iCancelStateSave; @@ -2638,7 +2636,7 @@ qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) d_pthread_mutex_lock(pThis->mut); } - CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); qqueueChkPersist(pThis, 1); -- cgit v1.2.3 From c28d92259b27eebca3892b9ad18d467691e5aacc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 13:27:27 +0100 Subject: queue: use specific deserializer for msg object spares lengthy table lookups --- runtime/queue.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index bdca61eb..c42b18ad 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,7 +59,6 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" -#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS # include @@ -919,7 +918,7 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty); RETiRet; } @@ -933,7 +932,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty)); objDestruct(pDummyObj); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); -- cgit v1.2.3 From 5a643669221363a49fb36cfb2acc64dc29b53a14 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 17:07:22 +0100 Subject: queue: remove time() calls from msg deserialization --- runtime/queue.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c42b18ad..ed486037 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -918,22 +918,34 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty); + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, + NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); RETiRet; } +/* the following function is a dummy to be used for qDelDisk, which (currently) must + * provide constructors & others even though it does not really need the object. So + * instead of using the real ones, we use the dummy here. That at least saves some time. + * Of course, in the longer term the whole process should be refactored. + * rgerhards, 2012-11-03 + */ +static rsRetVal +qDelDiskCallbackDummy(void) +{ + return RS_RET_OK; +} static rsRetVal qDelDisk(qqueue_t *pThis) { - obj_t *pDummyObj; /* we need to deserialize it... */ + obj_t *pDummyObj; /* another dummy, nothing is created */ DEFiRet; int64 offsIn; int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty)); - objDestruct(pDummyObj); + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, + NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need -- cgit v1.2.3 From 9ab150318063524b41176d587e0c0f7e4edf6472 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 17:18:11 +0100 Subject: queue: handle unknown queue type in debug output note: can not happen, but... --- runtime/queue.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index ed486037..306b88ed 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -241,6 +241,9 @@ getQueueTypeName(queueType_t t) case QUEUETYPE_DIRECT: r = "Direct"; break; + default: + r = "invalid/unknown queue mode"; + break; } return r; } -- cgit v1.2.3 From 704bb9fc2165688ec23f9c7f1fe2776d2c2efa21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 5 Nov 2012 17:04:30 +0100 Subject: queue: mini-improvement in deserializer (stage work) --- runtime/queue.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 306b88ed..9113ccb5 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -921,7 +921,7 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL, NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); RETiRet; } @@ -947,7 +947,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel, NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); -- cgit v1.2.3 From 94f6326237404545877c3d3df0edef44e28bcda9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 6 Nov 2012 17:48:35 +0100 Subject: queue: reduce CPU load for deserializing message properties Linear runtime due to message order. Was quadratic before. However, not a big overall improvement. --- runtime/queue.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 9113ccb5..802a9340 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -922,7 +922,7 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL, - NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); + NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgDeserialize); RETiRet; } @@ -940,7 +940,7 @@ qDelDiskCallbackDummy(void) } static rsRetVal qDelDisk(qqueue_t *pThis) { - obj_t *pDummyObj; /* another dummy, nothing is created */ + msg_t *pDummyObj; /* another dummy, nothing is created */ DEFiRet; int64 offsIn; @@ -948,7 +948,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel, - NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); + NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, objDeserializeDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need -- cgit v1.2.3 From c563914d6f96efc1c4da02a7f49409297b20f656 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Nov 2012 16:46:41 +0100 Subject: queue: file delete stream does no longer do real io This stream is primarily used for state tracking, and has been modified to do just that. This results in considerable less io being done and the respective speedup. --- runtime/queue.c | 96 ++++++++++++++++++++------------------------------------- 1 file changed, 34 insertions(+), 62 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 802a9340..1ee34335 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -777,9 +777,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - /* create a duplicate for the read "pointer". - */ - + /* create a duplicate for the read "pointer". */ CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); @@ -798,7 +796,7 @@ finalize_it: strm.Destruct(&psQIF); if(iRet != RS_RET_OK) { - DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", + DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n", iRet); } @@ -927,50 +925,6 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) } -/* the following function is a dummy to be used for qDelDisk, which (currently) must - * provide constructors & others even though it does not really need the object. So - * instead of using the real ones, we use the dummy here. That at least saves some time. - * Of course, in the longer term the whole process should be refactored. - * rgerhards, 2012-11-03 - */ -static rsRetVal -qDelDiskCallbackDummy(void) -{ - return RS_RET_OK; -} -static rsRetVal qDelDisk(qqueue_t *pThis) -{ - msg_t *pDummyObj; /* another dummy, nothing is created */ - DEFiRet; - - int64 offsIn; - int64 offsOut; - - CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel, - NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, objDeserializeDummy)); - CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); - - /* This time it is a bit tricky: we free disk space only upon file deletion. So we need - * to keep track of what we have read until we get an out-offset that is lower than the - * in-offset (which indicates file change). Then, we can subtract the whole thing from - * the on-disk size. -- rgerhards, 2008-01-30 - */ - if(offsIn < offsOut) { - pThis->tVars.disk.bytesRead += offsOut - offsIn; - } else { - pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead; - pThis->tVars.disk.bytesRead = offsOut; - DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); - /* awake possibly waiting enq process */ - pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ - } - -finalize_it: - RETiRet; -} - - /* -------------------- direct (no queueing) -------------------- */ static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis) { @@ -1188,11 +1142,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; -RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout"); DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; @@ -1466,13 +1420,32 @@ static inline rsRetVal DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) { int i; + off64_t bytesDel; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); /* now send delete request to storage driver */ - for(i = 0 ; i < nElem ; ++i) { - pThis->qDel(pThis); + if(pThis->qType == QUEUETYPE_DISK) { + strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut, + pThis->tVars.disk.deqOffs, &bytesDel); + /* We need to correct the on-disk file size. This time it is a bit tricky: + * we free disk space only upon file deletion. So we need to keep track of what we + * have read until we get an out-offset that is lower than the in-offset (which + * indicates file change). Then, we can subtract the whole thing from the on-disk + * size. -- rgerhards, 2008-01-30 + */ + if(bytesDel != 0) { + pThis->tVars.disk.sizeOnDisk -= bytesDel; + DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk " + "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk); + /* awake possibly waiting enq process */ + pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ + } + } else { /* memory queue */ + for(i = 0 ; i < nElem ; ++i) { + pThis->qDel(pThis); + } } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ @@ -1591,6 +1564,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = nDiscarded = 0; + if(pThis->qType == QUEUETYPE_DISK) { + pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { CHKiRet(qqueueDeq(pThis, &pMsg)); @@ -1609,6 +1585,11 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz ++nDequeued; } + if(pThis->qType == QUEUETYPE_DISK) { + strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs); + pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } + /* it is sufficient to persist only when the bulk of work is done */ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted); @@ -1616,7 +1597,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz pWti->batch.nElemDeq = nDequeued + nDiscarded; pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; - finalize_it: RETiRet; } @@ -1652,7 +1632,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ @@ -1904,10 +1883,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs - * the message. So far, we simply assume we always have msg_t, what currently is always the case. - * rgerhards, 2009-05-28 - */ CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2022,7 +1997,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qDestruct = qDestructDisk; pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; - pThis->qDel = qDelDisk; + pThis->qDel = NULL; /* delete for disk handled via special code! */ pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ @@ -2158,7 +2133,7 @@ finalize_it: } -/* persist the queue to disk. If we have something to persist, we first +/* persist the queue to disk (write the .qi file). If we have something to persist, we first * save the information on the queue properties itself and then we call * the queue-type specific drivers. * Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint, @@ -2213,7 +2188,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); - objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(obj.EndSerialize(psQIF)); /* now persist the stream info */ @@ -2793,8 +2767,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) pThis->iQueueSize = pProp->val.num; } else if(isProp("tVars.disk.sizeOnDisk")) { pThis->tVars.disk.sizeOnDisk = pProp->val.num; - } else if(isProp("tVars.disk.bytesRead")) { - pThis->tVars.disk.bytesRead = pProp->val.num; } else if(isProp("qType")) { if(pThis->qType != pProp->val.num) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); -- cgit v1.2.3 From 9273b4bb4dcb6683cf6825fedd3cb5cd0f59805a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 15 Jan 2013 15:01:16 +0100 Subject: optimize memory layout for much better cache hits Moave element status out of batch_obj_t because we get a *much* better cache hit ratio this way. Note that this is really a HUGE saving, even if it doesn't look so (both profiler data as well as practical tests indicate that!). --- runtime/queue.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 99fb5fbd..6df1c95e 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -130,7 +130,7 @@ static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]); } } @@ -941,6 +941,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; + batch_state_t batchState = BATCH_STATE_RDY; sbool active = 1; int i; DEFiRet; @@ -958,10 +959,10 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) */ memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); - batchObj.state = BATCH_STATE_RDY; batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; + singleBatch.eltState = &batchState; singleBatch.active = &active; iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ @@ -1546,8 +1547,8 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) for(i = 0 ; i < pBatch->nElem ; ++i) { pMsg = pBatch->pElem[i].pMsg; - if( pBatch->pElem[i].state == BATCH_STATE_RDY - || pBatch->pElem[i].state == BATCH_STATE_SUB) { + if( pBatch->eltState[i] == BATCH_STATE_RDY + || pBatch->eltState[i] == BATCH_STATE_SUB) { localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { @@ -1611,7 +1612,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz /* all well, use this element */ pWti->batch.pElem[nDequeued].pMsg = pMsg; - pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY; ++nDequeued; } @@ -1915,7 +1916,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg))); - pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ + pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } /* but now cancellation is no longer permitted */ -- cgit v1.2.3 From d9cde56eb8532bd660d6feb2249562afac0c40f6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 8 Apr 2013 17:55:52 +0200 Subject: add output module interface to facilitate cooperative shutdown ... in more complex cases (where receiving SIGTTIN is not sufficient). See also: http://blog.gerhards.net/2013/04/rsyslog-output-plugin-wrangling.html --- runtime/queue.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 8d8d8e0a..a464c2d7 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1332,8 +1332,6 @@ finalize_it: RETiRet; } - - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance -- cgit v1.2.3 From 86e34c6985da29c62f13ab83e44548f1fd21849d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 9 Apr 2013 10:54:04 +0200 Subject: make imrelp properly terminate on system shutdown it didn't do so if it was inside a retry loop --- runtime/queue.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index a464c2d7..5c736d2c 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1032,7 +1032,8 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); +dbgprintf("DDDD: qqueueEnqObjDirectBatch\n"); + iRet = pThis->pConsumer(pThis->pUsr, pBatch, NULL); RETiRet; } @@ -1191,6 +1192,7 @@ RUNLOG_STR("trying to shutdown workers within Action Timeout"); /* instruct workers to finish ASAP, even if still work exists */ DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; +dbgprintf("DDDD: setting shutdownImmediate mode, ptr %p!\n", &pThis->bShutdownImmediate); pThis->bShutdownImmediate = 1; /* now DA queue */ if(pThis->bIsDA) { @@ -1872,6 +1874,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); +dbgprintf("DDDD: calling consumer with shutdownImmeditate ptr %p\n", &pThis->bShutdownImmediate); CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit -- cgit v1.2.3 From d0cefac7a766d0f02ca76fcaeb6cfbace695b925 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 24 Apr 2013 11:09:37 +0200 Subject: cleanup --- runtime/queue.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 74090a4d..600b5688 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1150,7 +1150,6 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout"); DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; -dbgprintf("DDDD: setting shutdownImmediate mode, ptr %p!\n", &pThis->bShutdownImmediate); pThis->bShutdownImmediate = 1; /* now DA queue */ if(pThis->bIsDA) { @@ -1856,7 +1855,6 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); -dbgprintf("DDDD: calling consumer with shutdownImmeditate ptr %p\n", &pThis->bShutdownImmediate); CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit -- cgit v1.2.3 From 0817fc95a810df497c52200c772fac743ca84dde Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 May 2013 09:40:20 +0200 Subject: bugfix: segfault on startup if a disk queue was configure without file name Now this triggers an error message and the queue is changed to linkedList type. --- runtime/queue.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 600b5688..85b1e45b 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2760,6 +2760,12 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) "param '%s'\n", pblk.descr[i].name); } } + if(pThis->qType == QUEUETYPE_DISK && pThis->pszFilePrefix == NULL) { + errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but " + "no queue file name given; queue type changed to 'linkedList'", + obj.GetName((obj_t*) pThis)); + pThis->qType = QUEUETYPE_LINKEDLIST; + } cnfparamvalsDestruct(pvals, &pblk); return RS_RET_OK; } -- cgit v1.2.3 From fd63b32409c258b7740cf2282ab5469b8c6f7c4e Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Tue, 9 Jul 2013 15:30:05 +0200 Subject: Changed and extended debug output in queue/action classes for further analysis --- runtime/queue.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 85b1e45b..bc9788ef 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1022,7 +1022,7 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); - DBGOPRINT((obj_t*) pThis, "entry added, size now log %d, phys %d entries\n", + DBGOPRINT((obj_t*) pThis, "enqueueMsg: entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -1466,7 +1466,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) */ if(bytesDel != 0) { pThis->tVars.disk.sizeOnDisk -= bytesDel; - DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk " + DBGOPRINT((obj_t*) pThis, "enqueueMsg: a %lld octet file has been deleted, now %lld octets disk " "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk); /* awake possibly waiting enq process */ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ @@ -1480,7 +1480,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); - DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n", + DBGPRINTF("enqueueMsg: delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1550,13 +1550,13 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { - DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); + DBGPRINTF("enqueueMsg: error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } msgDestruct(&pMsg); } - DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("enqueueMsg: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -2531,7 +2531,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); if(pThis->toEnq == 0 || pThis->bEnqOnly) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); -- cgit v1.2.3 From bcb090ab8ac53615a5995532ea9b245dc759af42 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 13 Jul 2013 16:04:12 +0200 Subject: nit: use correct function names in new debug instrumentation --- runtime/queue.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index bc9788ef..3ae74287 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1022,7 +1022,7 @@ qqueueAdd(qqueue_t *pThis, msg_t *pMsg) if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); - DBGOPRINT((obj_t*) pThis, "enqueueMsg: entry added, size now log %d, phys %d entries\n", + DBGOPRINT((obj_t*) pThis, "qqueueAdd: entry added, size now log %d, phys %d entries\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); } @@ -1466,7 +1466,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) */ if(bytesDel != 0) { pThis->tVars.disk.sizeOnDisk -= bytesDel; - DBGOPRINT((obj_t*) pThis, "enqueueMsg: a %lld octet file has been deleted, now %lld octets disk " + DBGOPRINT((obj_t*) pThis, "doDeleteBatch: a %lld octet file has been deleted, now %lld octets disk " "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk); /* awake possibly waiting enq process */ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ @@ -1480,7 +1480,7 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); - DBGPRINTF("enqueueMsg: delete batch from store, new sizes: log %d, phys %d\n", + DBGPRINTF("doDeleteBatch: delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ @@ -1550,13 +1550,13 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { - DBGPRINTF("enqueueMsg: error %d re-enqueuing unprocessed data element - discarded\n", localRet); + DBGPRINTF("DeleteProcessedBatch: error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } msgDestruct(&pMsg); } - DBGPRINTF("enqueueMsg: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("DeleteProcessedBatch: we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -2491,7 +2491,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) * In any case, this was the old code (if we do the TODO): * pthread_cond_wait(&pThis->belowFullDlyWtrMrk, pThis->mut); */ - DBGOPRINT((obj_t*) pThis, "enqueueMsg: FullDelay mark reached for full delayable message " + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: FullDelay mark reached for full delayable message " "- blocking, queue size is %d.\n", pThis->iQueueSize); timeoutComp(&t, 1000); err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); @@ -2508,7 +2508,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) } } else if(flowCtlType == eFLOWCTL_LIGHT_DELAY && !glbl.GetGlobalInputTermState()) { if(pThis->iQueueSize >= pThis->iLightDlyMrk) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: LightDelay mark reached for light " + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: LightDelay mark reached for light " "delayable message - blocking a bit.\n"); timeoutComp(&t, 1000); /* 1000 millisconds = 1 second TODO: make configurable */ err = pthread_cond_timedwait(&pThis->belowLightDlyWtrMrk, pThis->mut, &t); @@ -2531,24 +2531,24 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); if(pThis->toEnq == 0 || pThis->bEnqOnly) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - waiting %dms to drain.\n", pThis->toEnq); if(glbl.GetGlobalInputTermState()) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL, discard due to FORCE_TERM.\n"); ABORT_FINALIZE(RS_RET_FORCE_TERM); } timeoutComp(&t, pThis->toEnq); if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } - dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); + dbgoprint((obj_t*) pThis, "doEnqSingleObject: wait solved queue full condition, enqueing\n"); } } -- cgit v1.2.3 From 495ae8752084b896f29d92e33028f44269e1d1b1 Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Mon, 15 Jul 2013 08:17:08 +0200 Subject: bugfix: 100% CPU utilization when DA queue became full --- runtime/queue.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 3ae74287..d5ad37a1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1913,8 +1913,16 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, - MsgAddRef(pWti->batch.pElem[i].pMsg))); + iRet = qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, MsgAddRef(pWti->batch.pElem[i].pMsg)); + if(iRet != RS_RET_OK) { + if(iRet == RS_RET_ERR_QUEUE_EMERGENCY) { + /* Queue emergency error occured */ + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg caught RS_RET_ERR_QUEUE_EMERGENCY, aborting loop.\n"); + FINALIZE; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg item (%d) returned with error state: '%d'\n", i, iRet); + } + } pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -1922,10 +1930,14 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(iCancelStateSave, NULL); finalize_it: + if(iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY) { + iRet = RS_RET_OK; + } + /* now we are done, but potentially need to re-aquire the mutex */ if(bNeedReLock) d_pthread_mutex_lock(pThis->mut); - DBGOPRINT((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet); + RETiRet; } -- cgit v1.2.3 From d302fb7571dde5a929c5e80c2bdf4b2133b4f249 Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Tue, 16 Jul 2013 09:13:35 +0200 Subject: Fixed return state handling in ConsumerDA The queue full loop fix added a problem to the queue when rsyslog was shutdown. This problem has been corrected now. Conflicts: runtime/queue.c --- runtime/queue.c | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index d5ad37a1..5652efe2 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1930,8 +1930,25 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(iCancelStateSave, NULL); finalize_it: - if(iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY) { - iRet = RS_RET_OK; + /* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only. + * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we resett the return state to RS_RET_OK. + * Otherwise the Caller functions would run into an infinite Loop trying to enqueue the + * same messages over and over again. + * + * However we do NOT overwrite positive return states like + * RS_RET_TERMINATE_NOW, + * RS_RET_NO_RUN, + * RS_RET_IDLE, + * RS_RET_TERMINATE_WHEN_IDLE + * These return states are important for Queue handling of the upper laying functions. + */ + if( iRet != RS_RET_OK && + iRet != RS_RET_ERR_QUEUE_EMERGENCY && + iRet < 0) { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg Resetting iRet from %d back to RS_RET_OK\n", iRet); + iRet = RS_RET_OK; + } else { + DBGOPRINT((obj_t*) pThis, "ConsumerDA:qqueueEnqMsg returns with iRet %d\n", iRet); } /* now we are done, but potentially need to re-aquire the mutex */ -- cgit v1.2.3 From 1c6449022c423c13a8828028cb68afe30e3c2f46 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 16 Jul 2013 10:55:39 +0200 Subject: add note on a potential future troublespot --- runtime/queue.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 5652efe2..5ccdd814 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1931,7 +1931,7 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) finalize_it: /* Check the last return state of qqueueEnqMsg. If an error was returned, we acknowledge it only. - * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we resett the return state to RS_RET_OK. + * Unless the error code is RS_RET_ERR_QUEUE_EMERGENCY, we reset the return state to RS_RET_OK. * Otherwise the Caller functions would run into an infinite Loop trying to enqueue the * same messages over and over again. * @@ -1941,6 +1941,13 @@ finalize_it: * RS_RET_IDLE, * RS_RET_TERMINATE_WHEN_IDLE * These return states are important for Queue handling of the upper laying functions. + * RGer: Note that checking for iRet < 0 is a bit bold. In theory, positive iRet + * values are "OK" states, and things that the caller shall deal with. However, + * this has not been done so consistently. Andre convinced me that the current + * code is an elegant solution. However, if problems with queue workers and/or + * shutdown come up, this code here should be looked at suspiciously. In those + * cases it may work out to check all status codes explicitely, just to avoid + * a pitfall due to unexpected states being passed on to the caller. */ if( iRet != RS_RET_OK && iRet != RS_RET_ERR_QUEUE_EMERGENCY && -- cgit v1.2.3 From cd2b917feff1f292085cef15574bb5974a11d4a7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 17 Jul 2013 13:33:13 +0200 Subject: cosmetic bugfix: file name buffer was not freed on disk queue destruction This was an extremely small one-time per run memleak, so nothing of concern. However, it bugs under valgrind and similar memory debuggers. --- runtime/queue.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 5ccdd814..08ef475f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -877,7 +877,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) DEFiRet; ASSERT(pThis != NULL); - + + free(pThis->pszQIFNam); if(pThis->tVars.disk.pWrite != NULL) strm.Destruct(&pThis->tVars.disk.pWrite); if(pThis->tVars.disk.pReadDeq != NULL) -- cgit v1.2.3 From 62090c416cd0b7f4b67a68a81b5c37e2d27cf84e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 17 Jul 2013 13:34:23 +0200 Subject: regression fix: %d used for long long types in debug output regression from recent DA queue bug fix set of patches; not present in any released code. --- runtime/queue.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 08ef475f..abe2be06 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2568,7 +2568,9 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); if(pThis->toEnq == 0 || pThis->bEnqOnly) { - DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d MaxQueueSize=%d sizeOnDisk=%d sizeOnDiskMax=%d\n", pThis->iQueueSize, pThis->iMaxQueueSize, pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); + DBGOPRINT((obj_t*) pThis, "doEnqSingleObject: queue FULL - configured for immediate discarding QueueSize=%d " + "MaxQueueSize=%d sizeOnDisk=%lld sizeOnDiskMax=%lld\n", pThis->iQueueSize, pThis->iMaxQueueSize, + pThis->tVars.disk.sizeOnDisk, pThis->sizeOnDiskMax); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); -- cgit v1.2.3 From 743680f78f3e98d9c693d2394d13b6b8ac80a637 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Jul 2013 16:14:07 +0200 Subject: debug: improve queue startup debug output --- runtime/queue.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index abe2be06..935a8106 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2126,12 +2126,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iFullDlyMrk = wrk; } - DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting\n", - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, + DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " + "full delay %d, light delay %d, deq batch size %d starting, high wtrrmrk %d, low wtrmrk %d\n", + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, - pThis->iDeqBatchSize); + pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk); pThis->bQueueStarted = 1; if(pThis->qType == QUEUETYPE_DIRECT) -- cgit v1.2.3