From 40fffde2b6a36ba12388b89d422104c258a667f7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 2 Nov 2012 18:55:53 +0100 Subject: generate disk .qi file once at queue construction ... instead of each time a file write happens. In some situations (very frequent sync), this can probably be a big performane win. --- runtime/queue.c | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index fbf77108..f5bb841f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -744,18 +744,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); @@ -770,7 +764,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ @@ -1985,6 +1979,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar pszQIFNam[MAXFNAME]; int wrk; uchar *qName; size_t lenBuf; @@ -2020,6 +2015,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + /* pre-construct file name for .qi file */ + pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), + "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + pThis->pszQIFNam = ustrdup(pszQIFNam); + DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, + (int) pThis->lenQIFNam); break; case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; @@ -2157,8 +2158,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; ASSERT(pThis != NULL); @@ -2176,13 +2175,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { - unlink((char*)pszQIFNam); + unlink((char*)pThis->pszQIFNam); pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ @@ -2195,7 +2190,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, write the property bag for ourselfs -- cgit v1.2.3 From feddb2cc8fa213725a14e556d0366aef8d3a4339 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 11:40:13 +0100 Subject: queue: change gerenic queue pUsr ptr to be action_t this was always action_t, but the initial design was more generic. We are making it specific now in order to gain better performance (after all, we did not need the generic engine in the past 8 years...) --- runtime/queue.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index f5bb841f..eaf33e00 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -411,7 +411,7 @@ StartDA(qqueue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction)); CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); @@ -996,7 +996,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); @@ -1023,7 +1023,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate); RETiRet; } @@ -1835,7 +1835,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2759,7 +2759,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) -DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, pAction, action_t*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) -- cgit v1.2.3 From c55e0a5a06e69106bc346057dd61dcb98688a4aa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 12:32:50 +0100 Subject: queue: change generic msg ptr (pUsr) to be of msg_t type --- runtime/queue.c | 59 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 29 insertions(+), 30 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index eaf33e00..6e40b0c9 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -74,7 +74,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) /* forward-definitions */ -static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -83,7 +83,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -312,7 +312,7 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - void *pUsr; + msg_t *pUsr; ASSERT(pThis != NULL); BEGINfunc @@ -546,7 +546,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) } -static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in) { DEFiRet; @@ -560,7 +560,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) } -static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) +static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out) { DEFiRet; @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pUsr = pUsr; + pEntry->pMsg = pUsr; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,14 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pUsr; + *ppUsr = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +889,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) { DEFiRet; number_t nWriteCount; @@ -917,7 +917,7 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); @@ -970,7 +970,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +992,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pUsrp = (obj_t*) pUsr; + batchObj.pMsg = pUsr; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1044,7 +1044,7 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; @@ -1066,7 +1066,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) { DEFiRet; @@ -1420,14 +1420,13 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) { DEFiRet; rsRetVal iRetLocal; int iSeverity; ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { iRetLocal = objGetSeverity(pUsr, &iSeverity); @@ -1521,7 +1520,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - void *pUsr; + msg_t *pUsr; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1530,11 +1529,10 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; + pUsr = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*) pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); @@ -1572,7 +1570,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - void *pUsr; + msg_t *pUsr; rsRetVal localRet; DEFiRet; @@ -1593,7 +1591,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pUsr; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -1897,8 +1895,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, + MsgAddRef(pWti->batch.pElem[i].pMsg))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -2002,8 +2000,9 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDeq = qDeqLinkedList; + //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: @@ -2432,7 +2431,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int err; @@ -2614,7 +2613,7 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); @@ -2627,7 +2626,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) { DEFiRet; int iCancelStateSave; -- cgit v1.2.3 From f88d4f76f69f52918726eb91a2ce06a163cbf0c6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 12:51:19 +0100 Subject: queue: remove unnecessary (obj_t*) redirection from msg ptrs --- runtime/queue.c | 82 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 40 insertions(+), 42 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index 6e40b0c9..bdca61eb 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -312,16 +312,16 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - msg_t *pUsr; + msg_t *pMsg; ASSERT(pThis != NULL); BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); - if(pUsr != NULL) { - objDestruct(pUsr); + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); } pThis->qDel(pThis); } @@ -621,7 +621,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +629,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pMsg = pUsr; + pEntry->pMsg = pMsg; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +647,13 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; - ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pMsg; + *ppMsg = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -889,7 +888,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) { DEFiRet; number_t nWriteCount; @@ -897,7 +896,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) ASSERT(pThis != NULL); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); - CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ @@ -907,7 +906,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pUsr) * the in-memory representation. The instance will be re-created upon * dequeue. -- rgerhards, 2008-07-09 */ - objDestruct(pUsr); + msgDestruct(&pMsg); DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); @@ -917,10 +916,10 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); RETiRet; } @@ -970,7 +969,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; @@ -992,7 +991,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pMsg = pUsr; + batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; @@ -1001,7 +1000,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pUsr) for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); } - objDestruct(pUsr); + msgDestruct(&pMsg); RETiRet; } @@ -1044,13 +1043,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, msg_t *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ASSERT(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(pThis->qAdd(pThis, pMsg)); if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); @@ -1066,7 +1065,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; @@ -1077,7 +1076,7 @@ qqueueDeq(qqueue_t *pThis, msg_t **ppUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, ppUsr); + iRet = pThis->qDeq(pThis, ppMsg); ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1420,7 +1419,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg) { DEFiRet; rsRetVal iRetLocal; @@ -1429,12 +1428,12 @@ static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pUsr) ISOBJ_TYPE_assert(pThis, qqueue); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " @@ -1520,7 +1519,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - msg_t *pUsr; + msg_t *pMsg; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1529,16 +1528,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pMsg; + pMsg = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } - objDestruct(pUsr); + msgDestruct(&pMsg); } DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); @@ -1570,7 +1569,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - msg_t *pUsr; + msg_t *pMsg; rsRetVal localRet; DEFiRet; @@ -1579,10 +1578,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { - CHKiRet(qqueueDeq(pThis, &pUsr)); + CHKiRet(qqueueDeq(pThis, &pMsg)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1591,7 +1590,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pMsg = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pMsg; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -2001,7 +2000,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; pThis->qDeq = qDeqLinkedList; - //pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; @@ -2431,7 +2429,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int err; @@ -2440,7 +2438,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2517,7 +2515,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pThis->toEnq == 0 || pThis->bEnqOnly) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); @@ -2529,7 +2527,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); @@ -2537,7 +2535,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) } /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); + CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: @@ -2613,11 +2611,11 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pUsr); + iRet = qAddDirect(pThis, pMsg); RETiRet; } @@ -2626,7 +2624,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int iCancelStateSave; @@ -2638,7 +2636,7 @@ qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pUsr) d_pthread_mutex_lock(pThis->mut); } - CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); qqueueChkPersist(pThis, 1); -- cgit v1.2.3 From c28d92259b27eebca3892b9ad18d467691e5aacc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 13:27:27 +0100 Subject: queue: use specific deserializer for msg object spares lengthy table lookups --- runtime/queue.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index bdca61eb..c42b18ad 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,7 +59,6 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" -#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS # include @@ -919,7 +918,7 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty); RETiRet; } @@ -933,7 +932,7 @@ static rsRetVal qDelDisk(qqueue_t *pThis) int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty)); objDestruct(pDummyObj); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); -- cgit v1.2.3 From 5a643669221363a49fb36cfb2acc64dc29b53a14 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 17:07:22 +0100 Subject: queue: remove time() calls from msg deserialization --- runtime/queue.c | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) (limited to 'runtime/queue.c') diff --git a/runtime/queue.c b/runtime/queue.c index c42b18ad..ed486037 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -918,22 +918,34 @@ finalize_it: static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty); + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, + NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); RETiRet; } +/* the following function is a dummy to be used for qDelDisk, which (currently) must + * provide constructors & others even though it does not really need the object. So + * instead of using the real ones, we use the dummy here. That at least saves some time. + * Of course, in the longer term the whole process should be refactored. + * rgerhards, 2012-11-03 + */ +static rsRetVal +qDelDiskCallbackDummy(void) +{ + return RS_RET_OK; +} static rsRetVal qDelDisk(qqueue_t *pThis) { - obj_t *pDummyObj; /* we need to deserialize it... */ + obj_t *pDummyObj; /* another dummy, nothing is created */ DEFiRet; int64 offsIn; int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL, msgConstruct, msgConstructFinalizer, MsgSetProperty)); - objDestruct(pDummyObj); + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, + NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need -- cgit v1.2.3