summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c695
1 files changed, 454 insertions, 241 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 4e017e84..8ef3e7db 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -8,7 +8,11 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008 Rainer Gerhards and Adiscon GmbH.
+ * NOTE: as of 2009-04-22, I have begin to remove the qqueue* prefix from static
+ * function names - this makes it really hard to read and does not provide much
+ * benefit, at least I (now) think so...
+ *
+ * Copyright 2008, 2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -61,23 +65,110 @@ DEFobjStaticHelpers
DEFobjCurrIf(glbl)
/* forward-definitions */
-rsRetVal qqueueChkPersist(qqueue_t *pThis);
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates);
static rsRetVal qqueueSetEnqOnly(qqueue_t *pThis, int bEnqOnly, int bLockMutex);
-static rsRetVal qqueueRateLimiter(qqueue_t *pThis);
+static rsRetVal RateLimiter(qqueue_t *pThis);
static int qqueueChkStopWrkrDA(qqueue_t *pThis);
+static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static int qqueueIsIdleDA(qqueue_t *pThis);
-static rsRetVal qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
-static rsRetVal qqueueConsumerCancelCleanup(void *arg1, void *arg2);
-static rsRetVal qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
+static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
+static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
#define QUEUE_NO_CHECKPOINT 0
+/***********************************************************************
+ * we need a private data structure, the "to-delete" list. As C does
+ * not provide any partly private data structures, we implement this
+ * structure right here inside the module.
+ * Note that this list must always be kept sorted based on a unique
+ * dequeue ID (which is monotonically increasing).
+ * rgerhards, 2009-05-18
+ ***********************************************************************/
+
+/* generate next uniqueue dequeue ID. Note that uniqueness is only required
+ * on a per-queue basis and while this instance runs. So a stricly monotonically
+ * increasing counter is sufficient (if enough bits are used).
+ */
+static inline qDeqID getNextDeqID(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->deqIDAdd++;
+}
+
+
+/* return the top element of the to-delete list or NULL, if the
+ * list is empty.
+ */
+static inline toDeleteLst_t *tdlPeek(qqueue_t *pQueue)
+{
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ return pQueue->toDeleteLst;
+}
+
+
+/* remove the top element of the to-delete list. Nothing but the
+ * element itself is destroyed. Must not be called when the list
+ * is empty.
+ */
+static inline rsRetVal tdlPop(qqueue_t *pQueue)
+{
+ toDeleteLst_t *pRemove;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ pRemove = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pQueue->toDeleteLst->pNext;
+ free(pRemove);
+
+ RETiRet;
+}
+
+
+/* Add a new to-delete list entry. The function allocates the data
+ * structure, populates it with the values provided and links the new
+ * element into the correct place inside the list.
+ */
+static inline rsRetVal tdlAdd(qqueue_t *pQueue, qDeqID deqID, int nElem)
+{
+ toDeleteLst_t *pNew;
+ toDeleteLst_t *pPrev;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pQueue, qqueue);
+ assert(pQueue->toDeleteLst != NULL);
+
+ CHKmalloc(pNew = malloc(sizeof(toDeleteLst_t)));
+ pNew->deqID = deqID;
+ pNew->nElem = nElem;
+
+ /* now find right spot */
+ for( pPrev = pQueue->toDeleteLst
+ ; pPrev != NULL && deqID > pPrev->deqID
+ ; pPrev = pPrev->pNext) {
+ /*JUST SEARCH*/;
+ }
+
+ if(pPrev == NULL) {
+ pNew->pNext = pQueue->toDeleteLst;
+ pQueue->toDeleteLst = pNew;
+ } else {
+ pNew->pNext = pPrev->pNext;
+ pPrev->pNext = pNew;
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
/* methods */
-/* get the overall queue size, which includes ungotten objects. Must only be called
+/* get the overall queue size. Must only be called
* while mutex is locked!
* rgerhards, 2008-01-29
*/
@@ -86,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis)
{
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
BEGINfunc
-dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
- pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
+dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n",
+ pThis->iQueueSize, pThis->iQueueSize);
ENDfunc
#endif
- return pThis->iQueueSize + pThis->iUngottenObjs;
+ return pThis->iQueueSize;
}
@@ -109,7 +200,9 @@ static inline void queueDrain(qqueue_t *pThis)
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
while(pThis->iQueueSize-- > 0) {
- pThis->qDel(pThis, &pUsr);
+ pThis->qDeq(pThis, &pUsr);
+// TODO: ULTRA
+ //pThis->qDel(pThis, &pUsr);
if(pUsr != NULL) {
objDestruct(pUsr);
}
@@ -369,9 +462,10 @@ qqueueInitDA(qqueue_t *pThis, int bEnqOnly, int bLockMutex)
CHKiRet(wtpConstruct (&pThis->pWtpDA));
CHKiRet(wtpSetDbgHdr (pThis->pWtpDA, pszBuf, lenBuf));
CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrDA));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
CHKiRet(wtpSetpfIsIdle (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleDA));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerDA));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) qqueueConsumerCancelCleanup));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerDA));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void*pWti)) ConsumerCancelCleanup));
CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueStartDA));
CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpDA, (rsRetVal (*)(void *pUsr)) qqueueTurnOffDAMode));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
@@ -466,6 +560,7 @@ static rsRetVal qConstructFixedArray(qqueue_t *pThis)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}
+ pThis->tVars.farray.deqhead = 0;
pThis->tVars.farray.head = 0;
pThis->tVars.farray.tail = 0;
@@ -483,9 +578,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis)
ASSERT(pThis != NULL);
queueDrain(pThis); /* discard any remaining queue entries */
-
- if(pThis->tVars.farray.pBuf != NULL)
- free(pThis->tVars.farray.pBuf);
+ free(pThis->tVars.farray.pBuf);
RETiRet;
}
@@ -504,13 +597,29 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in)
RETiRet;
}
-static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
+
+static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.deqhead];
+
+//MULTIdbgprintf("ULTRA qDeqFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
+ pThis->tVars.farray.deqhead++;
+ if (pThis->tVars.farray.deqhead == pThis->iMaxQueueSize)
+ pThis->tVars.farray.deqhead = 0;
+
+ RETiRet;
+}
+
+static rsRetVal qDelFixedArray(qqueue_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- *out = (void*) pThis->tVars.farray.pBuf[pThis->tVars.farray.head];
+//MULTIdbgprintf("ULTRA qDelFA, deqhead=%d head=%d, tail=%d\n", pThis->tVars.farray.deqhead, pThis->tVars.farray.head, pThis->tVars.farray.tail);
pThis->tVars.farray.head++;
if (pThis->tVars.farray.head == pThis->iMaxQueueSize)
pThis->tVars.farray.head = 0;
@@ -525,15 +634,13 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis, void **out)
static inline rsRetVal qqueueAddLinkedList(qLinkedList_t **ppRoot, qLinkedList_t **ppLast, void* pUsr)
{
- DEFiRet;
qLinkedList_t *pEntry;
+ DEFiRet;
ASSERT(ppRoot != NULL);
ASSERT(ppLast != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pUsr = pUsr;
@@ -582,8 +689,9 @@ static rsRetVal qConstructLinkedList(qqueue_t *pThis)
ASSERT(pThis != NULL);
- pThis->tVars.linklist.pRoot = 0;
- pThis->tVars.linklist.pLast = 0;
+ pThis->tVars.linklist.pDeqRoot = NULL;
+ pThis->tVars.linklist.pDelRoot = NULL;
+ pThis->tVars.linklist.pLast = NULL;
qqueueChkIsDA(pThis);
@@ -606,54 +714,60 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr)
{
- DEFiRet;
-
- iRet = qqueueAddLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, pUsr);
-#if 0
qLinkedList_t *pEntry;
+ DEFiRet;
- ASSERT(pThis != NULL);
- if((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc((pEntry = (qLinkedList_t*) malloc(sizeof(qLinkedList_t))));
pEntry->pNext = NULL;
pEntry->pUsr = pUsr;
- if(pThis->tVars.linklist.pRoot == NULL) {
- pThis->tVars.linklist.pRoot = pThis->tVars.linklist.pLast = pEntry;
+ if(pThis->tVars.linklist.pDelRoot == NULL) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry;
} else {
pThis->tVars.linklist.pLast->pNext = pEntry;
pThis->tVars.linklist.pLast = pEntry;
}
+ if(pThis->tVars.linklist.pDeqRoot == NULL) {
+ pThis->tVars.linklist.pDeqRoot = pEntry;
+ }
+RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
+
finalize_it:
-#endif
RETiRet;
}
-static rsRetVal qDelLinkedList(qqueue_t *pThis, obj_t **ppUsr)
+
+static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
{
- DEFiRet;
- iRet = qqueueDelLinkedList(&pThis->tVars.linklist.pRoot, &pThis->tVars.linklist.pLast, ppUsr);
-#if 0
qLinkedList_t *pEntry;
+ DEFiRet;
- ASSERT(pThis != NULL);
- ASSERT(pThis->tVars.linklist.pRoot != NULL);
-
- pEntry = pThis->tVars.linklist.pRoot;
+RUNLOG_VAR("%p", pThis->tVars.linklist.pDeqRoot);
+ pEntry = pThis->tVars.linklist.pDeqRoot;
*ppUsr = pEntry->pUsr;
+ pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
+
+ RETiRet;
+}
+
- if(pThis->tVars.linklist.pRoot == pThis->tVars.linklist.pLast) {
- pThis->tVars.linklist.pRoot = NULL;
- pThis->tVars.linklist.pLast = NULL;
+static rsRetVal qDelLinkedList(qqueue_t *pThis)
+{
+ qLinkedList_t *pEntry;
+ DEFiRet;
+
+ pEntry = pThis->tVars.linklist.pDelRoot;
+
+ if(pThis->tVars.linklist.pDelRoot == pThis->tVars.linklist.pLast) {
+ pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = NULL;
} else {
- pThis->tVars.linklist.pRoot = pEntry->pNext;
+ pThis->tVars.linklist.pDelRoot = pEntry->pNext;
}
+
free(pEntry);
-#endif
RETiRet;
}
@@ -722,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
struct stat stat_buf;
- int iUngottenObjs;
- obj_t *pUsr;
ISOBJ_TYPE_assert(pThis, qqueue);
@@ -753,25 +865,32 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
/* first, we try to read the property bag for ourselfs */
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
- /* then the ungotten object queue */
- iUngottenObjs = pThis->iUngottenObjs;
- pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
-
- while(iUngottenObjs > 0) {
- /* fill the queue from disk */
- CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
- qqueueUngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
- --iUngottenObjs; /* one less */
- }
-
- /* and now the stream objects (some order as when persisted!) */
+ /* then the stream objects (same order as when persisted!) */
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- CHKiRet(obj.Deserialize(&pThis->tVars.disk.pRead, (uchar*) "strm", psQIF,
+ CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pWrite));
- CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pRead));
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDel));
+
+ /* we now need to take care of the Deq handle. It is not persisted, so we can create
+ * a virgin copy based on pReadDel. // TODO duplicat code, same as blow - single function!
+ */
+
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
+
+ /* TODO: dirty, need stream methods --> */
+ pThis->tVars.disk.pReadDeq->iCurrFNum = pThis->tVars.disk.pReadDel->iCurrFNum;
+ pThis->tVars.disk.pReadDeq->iCurrOffs = pThis->tVars.disk.pReadDel->iCurrOffs;
+ /* <-- dirty, need stream methods :TODO */
+ CHKiRet(strmSeekCurrOffs(pThis->tVars.disk.pReadDeq));
/* OK, we could successfully read the file, so we now can request that it be
* deleted when we are done with the persisted information.
@@ -822,17 +941,25 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strmSetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
CHKiRet(strmConstructFinalize(pThis->tVars.disk.pWrite));
- CHKiRet(strmConstruct(&pThis->tVars.disk.pRead));
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
- CHKiRet(strmSetDir(pThis->tVars.disk.pRead, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
- CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pRead, 10000000));
- CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pRead, STREAMMODE_READ));
- CHKiRet(strmSetsType(pThis->tVars.disk.pRead, STREAMTYPE_FILE_CIRCULAR));
- CHKiRet(strmConstructFinalize(pThis->tVars.disk.pRead));
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDeq));
+ CHKiRet(strmConstruct(&pThis->tVars.disk.pReadDel));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
+ CHKiRet(strmSetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strmSetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
+ CHKiRet(strmSettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
+ CHKiRet(strmSetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
+ CHKiRet(strmConstructFinalize(pThis->tVars.disk.pReadDel));
- CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
- CHKiRet(strmSetFName(pThis->tVars.disk.pRead, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pReadDeq, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(strmSetFName(pThis->tVars.disk.pReadDel, pThis->pszFilePrefix, pThis->lenFilePrefix));
}
/* now we set (and overwrite in case of a persisted restart) some parameters which
@@ -841,7 +968,8 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
* ability to read existing queue files. -- rgerhards, 2008-01-12
*/
CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pWrite, pThis->iMaxFileSize));
- CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pRead, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDeq, pThis->iMaxFileSize));
+ CHKiRet(strmSetiMaxFileSize(pThis->tVars.disk.pReadDel, pThis->iMaxFileSize));
finalize_it:
RETiRet;
@@ -855,7 +983,8 @@ static rsRetVal qDestructDisk(qqueue_t *pThis)
ASSERT(pThis != NULL);
strmDestruct(&pThis->tVars.disk.pWrite);
- strmDestruct(&pThis->tVars.disk.pRead);
+ strmDestruct(&pThis->tVars.disk.pReadDeq);
+ strmDestruct(&pThis->tVars.disk.pReadDel);
RETiRet;
}
@@ -887,16 +1016,30 @@ finalize_it:
RETiRet;
}
-static rsRetVal qDelDisk(qqueue_t *pThis, void **ppUsr)
+
+static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr)
+{
+ DEFiRet;
+
+ CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL));
+
+finalize_it:
+ RETiRet;
+}
+
+
+static rsRetVal qDelDisk(qqueue_t *pThis)
{
+ obj_t *pDummyObj; /* we need to deserialize it... */
DEFiRet;
int64 offsIn;
int64 offsOut;
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsIn));
- CHKiRet(obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pRead, NULL, NULL));
- CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pRead, &offsOut));
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
+ CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL));
+ objDestruct(pDummyObj);
+ CHKiRet(strmGetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
/* This time it is a bit tricky: we free disk space only upon file deletion. So we need
* to keep track of what we have read until we get an out-offset that is lower than the
@@ -917,6 +1060,7 @@ finalize_it:
RETiRet;
}
+
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -931,6 +1075,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ batch_t singleBatch;
+ batch_obj_t batchObj;
DEFiRet;
ASSERT(pThis != NULL);
@@ -940,13 +1086,20 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the batch_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ batchObj.state = BATCH_STATE_RDY;
+ batchObj.pUsrp = (obj_t*) pUsr;
+ singleBatch.nElem = 1; /* there always is only one in direct mode */
+ singleBatch.pElem = &batchObj;
+ iRet = pThis->pConsumer(pThis->pUsr, &singleBatch);
RETiRet;
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute__((unused)) void **out)
+
+static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
}
@@ -955,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis, __attribute_
/* --------------- end type-specific handlers -------------------- */
-/* unget a user pointer that has been dequeued. This functionality is especially important
- * for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
- * is maintened in memory.
- * rgerhards, 2008-01-20
- */
-static rsRetVal
-qqueueUngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
-{
- DEFiRet;
- DEFVARS_mutexProtection;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
- The second time I noticed it the queue was in destruction with NO worker threads
- running. The pUsr ptr was totally off and provided no clue what it may be pointing
- at (except that it looked like the static data pool). Both times, the abort happend
- inside an action queue */
-
- dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
- BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
- iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
- ++pThis->iUngottenObjs; /* indicate one more */
- END_MTX_PROTECTED_OPERATIONS(pThis->mut);
-
- RETiRet;
-}
-
-
-/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
- * dequeued first.
- *
- * This function must only be called when the mutex is locked!
- *
- * rgerhards, 2008-01-29
- */
-static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
-{
- DEFiRet;
-
- ISOBJ_TYPE_assert(pThis, qqueue);
- ASSERT(ppUsr != NULL);
-
- iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
- --pThis->iUngottenObjs; /* indicate one less */
- dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
-
- RETiRet;
-}
-
-
/* generic code to add a queue entry
* We use some specific code to most efficiently support direct mode
* queues. This is justified in spite of the gain and the need to do some
@@ -1031,8 +1133,6 @@ finalize_it:
/* generic code to remove a queue entry
- * rgerhards, 2008-01-29: we must first see if there is any object in the
- * ungotten list and, if so, dequeue it first.
*/
static rsRetVal
qqueueDel(qqueue_t *pThis, void *pUsr)
@@ -1046,12 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* If we decrement, however, we may lose a message. But that is better than
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
- if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
- } else {
- iRet = pThis->qDel(pThis, pUsr);
- ATOMIC_DEC(pThis->iQueueSize);
- }
+ iRet = pThis->qDeq(pThis, pUsr);
+ ATOMIC_DEC(pThis->iQueueSize);
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
iRet, pThis->iQueueSize);
@@ -1275,7 +1371,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1305,6 +1401,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
pThis->iDeqtWinToHr = 25; /* disable time-windowed dequeuing by default */
+ pThis->iDeqBatchSize = 8; /* conservative default, should still provide good performance */
pThis->pszFilePrefix = NULL;
pThis->qType = qType;
@@ -1315,18 +1412,21 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->qConstruct = qConstructFixedArray;
pThis->qDestruct = qDestructFixedArray;
pThis->qAdd = qAddFixedArray;
+ pThis->qDeq = qDeqFixedArray;
pThis->qDel = qDelFixedArray;
break;
case QUEUETYPE_LINKEDLIST:
pThis->qConstruct = qConstructLinkedList;
pThis->qDestruct = qDestructLinkedList;
pThis->qAdd = qAddLinkedList;
- pThis->qDel = (rsRetVal (*)(qqueue_t*,void**)) qDelLinkedList;
+ pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
+ pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
break;
case QUEUETYPE_DISK:
pThis->qConstruct = qConstructDisk;
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
+ pThis->qDeq = qDeqDisk;
pThis->qDel = qDelDisk;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
@@ -1354,8 +1454,10 @@ finalize_it:
* rgerhards, 2008-01-16
*/
static rsRetVal
-qqueueConsumerCancelCleanup(void *arg1, void *arg2)
+ConsumerCancelCleanup(void *arg1, void *arg2)
{
+ //TODO: looks like we no longer need it!
+ /*
DEFiRet;
qqueue_t *pThis = (qqueue_t*) arg1;
@@ -1363,14 +1465,9 @@ qqueueConsumerCancelCleanup(void *arg1, void *arg2)
ISOBJ_TYPE_assert(pThis, qqueue);
- if(pUsr != NULL) {
- /* make sure the data element is not lost */
- dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
- CHKiRet(qqueueUngetObj(pThis, pUsr, LOCK_MUTEX));
- }
-
-finalize_it:
RETiRet;
+ */
+ return RS_RET_OK;
}
@@ -1415,38 +1512,169 @@ finalize_it:
}
-/* dequeue the queued object for the queue consumers.
- * rgerhards, 2008-10-21
+/* Finally remove n elements from the queue store.
*/
-static rsRetVal
-qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+static inline rsRetVal
+DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
+{
+ int i;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+
+ /* now send delete request to storage driver */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
+
+ ++pThis->deqIDDel; /* one more batch dequeued */
+
+ RETiRet;
+}
+
+
+/* remove messages from the physical queue store that are fully processed. This is
+ * controlled via the to-delete list. We can only delete those elements, that are
+ * at the current physical tail of the queue. If the batch is from another position,
+ * we schedule it for deletion, but actual deletion will happen at a later call
+ * of this function here. We always delete as much as possible, which includes
+ * picking up things from the to-delete list.
+ */
+static inline rsRetVal
+DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch)
{
+ toDeleteLst_t *pTdl;
+ qDeqID deqIDDel;
DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+
+ pTdl = tdlPeek(pThis);
+ if(pTdl == NULL) {
+ DoDeleteBatchFromQStore(pThis, pBatch->nElem);
+ } else if(pBatch->deqID == pThis->deqIDDel) {
+ deqIDDel = pThis->deqIDDel;
+ pTdl = tdlPeek(pThis);
+ while(pTdl != NULL && deqIDDel == pTdl->deqID) {
+ DoDeleteBatchFromQStore(pThis, pTdl->nElem);
+ tdlPop(pThis);
+ ++deqIDDel;
+ pTdl = tdlPeek(pThis);
+ }
+ } else {
+ /* can not delete, insert into to-delete list */
+ CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem));
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Delete a batch of processed user objects from the queue, which includes
+ * destructing the objects themself.
+ * rgerhards, 2009-05-13
+ */
+static inline rsRetVal
+DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
+{
+ int i;
void *pUsr;
- int iQueueSize;
- int bRunsDA; /* cache for early mutex release */
-
- /* dequeue element (still protected from mutex) */
- iRet = qqueueDel(pThis, &pUsr);
- qqueueChkPersist(pThis);
- iQueueSize = qqueueGetOverallQueueSize(pThis); /* cache this for after mutex release */
- bRunsDA = pThis->bRunsDA; /* cache this for after mutex release */
-
- /* We now need to save the user pointer for the cancel cleanup handler, BUT ONLY
- * if we could successfully obtain a user pointer. Otherwise, we would bring the
- * cancel cleanup handler into big troubles (and we did ;)). Note that we can
- * NOT set the variable further below, as this may lead to an object leak. We
- * may get cancelled before we reach that part of the code, so the only
- * solution is to do it here. -- rgerhards, 2008-02-27
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(pBatch != NULL);
+// TODO: ULTRA: lock qaueue mutex if instructed to do so
+
+ /* if the queue runs in DA mode, the DA worker already deleted the in-memory representation
+ * of the message. But in regular mode, we need to do it ourselfs. We differentiate between
+ * the two cases, because it is actually the easiest way to handle the destruct-Problem in
+ * a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
*/
- if(iRet == RS_RET_OK) {
- pWti->pUsrp = pUsr;
+ if(!pThis->bRunsDA) {
+ for(i = 0 ; i < pBatch->nElem ; ++i) {
+ pUsr = pBatch->pElem[i].pUsrp;
+ objDestruct(pUsr);
+ }
}
+ iRet = DeleteBatchFromQStore(pThis, pBatch);
+
+ pBatch->nElem = 0; /* reset batch */
+
+ RETiRet;
+}
+
+
+/* dequeue as many user pointers as are available, until we hit the configured
+ * upper limit of pointers.
+ * This must only be called when the queue mutex is LOOKED, otherwise serious
+ * malfunction will happen.
+ */
+static inline rsRetVal
+DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSize)
+{
+ int nDequeued;
+ int iQueueSize;
+ void *pUsr;
+ rsRetVal localRet;
+ DEFiRet;
+
+ /* this is the place to destruct the old messages and pull them off the queue - MULTI-DEQUEUE */
+ DeleteProcessedBatch(pThis, &pWti->batch);
+
+ nDequeued = 0;
+ do {
+dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
+ CHKiRet(qqueueDel(pThis, &pUsr));
+ iQueueSize = qqueueGetOverallQueueSize(pThis);
+
+ /* check if we should discard this element */
+ localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pThis->bRunsDA, pUsr);
+ if(localRet == RS_RET_QUEUE_FULL)
+ continue;
+ else if(localRet != RS_RET_OK)
+ ABORT_FINALIZE(localRet);
+
+ /* all well, use this element */
+ pWti->batch.pElem[nDequeued].pUsrp = pUsr;
+ pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY;
+ ++nDequeued;
+ } while(iQueueSize > 0 && nDequeued < pThis->iDeqBatchSize);
+
+ qqueueChkPersist(pThis, nDequeued); /* it is sufficient to persist only when the bulk of work is done */
+
+ pWti->batch.nElem = nDequeued;
+ pWti->batch.deqID = getNextDeqID(pThis);
+ *piRemainingQueueSize = iQueueSize;
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* dequeue the queued object for the queue consumers.
+ * rgerhards, 2008-10-21
+ * I made a radical change - we now dequeue multiple elements, and store these objects in
+ * an array of user pointers. We expect that this increases performance.
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+DequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+{
+ DEFiRet;
+ int iQueueSize = 0; /* keep the compiler happy... */
+
+ /* dequeue element batch (still protected from mutex) */
+ iRet = DequeueConsumableElements(pThis, pWti, &iQueueSize);
+
/* awake some flow-controlled sources if we can do this right now */
/* TODO: this could be done better from a performance point of view -- do it only if
* we have someone waiting for the condition (or only when we hit the watermark right
* on the nail [exact value]) -- rgerhards, 2008-03-14
+ * now that we dequeue batches of pointers, this is much less an issue...
+ * rgerhards, 2009-04-22
*/
if(iQueueSize < pThis->iFullDlyMrk) {
pthread_cond_broadcast(&pThis->belowFullDlyWtrMrk);
@@ -1456,37 +1684,16 @@ qqueueDequeueConsumable(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- /* rgerhards, 2008-09-30: I reversed the order of cond_signal und mutex_unlock
- * as of the pthreads recommendation on predictable scheduling behaviour. I don't see
- * any problems caused by this, but I add this comment in case some will be seen
- * in the next time.
- */
pthread_cond_signal(&pThis->notFull);
d_pthread_mutex_unlock(pThis->mut);
pthread_setcancelstate(iCancelStateSave, NULL);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
- /* do actual processing (the lengthy part, runs in parallel)
- * If we had a problem while dequeing, we do not call the consumer,
- * but we otherwise ignore it. This is in the hopes that it will be
- * self-healing. However, this is really not a good thing.
- * rgerhards, 2008-01-03
- */
- if(iRet != RS_RET_OK)
- FINALIZE;
-
- /* we are running in normal, non-disk-assisted mode do a quick check if we need to drain the queue.
- * In DA mode, we do not discard any messages as we assume the disk subsystem is fast enough to
- * provide real-time creation of spool files.
- * Note: It is OK to use the cached iQueueSize here, because it does not hurt if it is slightly wrong.
- */
- CHKiRet(qqueueChkDiscardMsg(pThis, iQueueSize, bRunsDA, pUsr));
-
-finalize_it:
if(iRet != RS_RET_OK && iRet != RS_RET_DISCARDMSG) {
dbgoprint((obj_t*) pThis, "error %d dequeueing element - ignoring, but strange things "
"may happen\n", iRet);
}
+
RETiRet;
}
@@ -1529,7 +1736,7 @@ finalize_it:
* but you get the idea from the code above.
*/
static rsRetVal
-qqueueRateLimiter(qqueue_t *pThis)
+RateLimiter(qqueue_t *pThis)
{
DEFiRet;
int iDelay;
@@ -1592,19 +1799,20 @@ qqueueRateLimiter(qqueue_t *pThis)
* rgerhards, 2008-01-21
*/
static rsRetVal
-qqueueConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1616,7 +1824,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1626,15 +1834,18 @@ finalize_it:
* rgerhards, 2008-01-14
*/
static rsRetVal
-qqueueConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
+ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
- CHKiRet(qqueueDequeueConsumable(pThis, pWti, iCancelStateSave));
- CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->pUsrp));
+ CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
+ /* iterate over returned results and enqueue them in DA queue */
+ for(i = 0 ; i < pWti->batch.nElem ; i++)
+ CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
@@ -1692,12 +1903,24 @@ qqueueChkStopWrkrDA(qqueue_t *pThis)
* the DA queue
*/
static int
-qqueueChkStopWrkrReg(qqueue_t *pThis)
+ChkStooWrkrReg(qqueue_t *pThis)
{
return pThis->bEnqOnly || pThis->bRunsDA || (pThis->pqParent != NULL && qqueueGetOverallQueueSize(pThis) == 0);
}
+/* return the configured "deq max at once" interval
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+GetDeqBatchSize(qqueue_t *pThis, int *pVal)
+{
+ DEFiRet;
+ assert(pVal != NULL);
+ *pVal = pThis->iDeqBatchSize;
+ RETiRet;
+}
+
/* must only be called when the queue mutex is locked, else results
* are not stable! DA queue version
*/
@@ -1712,7 +1935,7 @@ qqueueIsIdleDA(qqueue_t *pThis)
* are not stable! Regular queue version
*/
static int
-qqueueIsIdleReg(qqueue_t *pThis)
+IsIdleReg(qqueue_t *pThis)
{
#if 0 /* enable for performance testing */
int ret;
@@ -1740,7 +1963,7 @@ qqueueIsIdleReg(qqueue_t *pThis)
* I am telling this, because I, too, always get confused by those...
*/
static rsRetVal
-qqueueRegOnWrkrShutdown(qqueue_t *pThis)
+RegOnWrkrShutdown(qqueue_t *pThis)
{
DEFiRet;
@@ -1761,7 +1984,7 @@ qqueueRegOnWrkrShutdown(qqueue_t *pThis)
* hook to indicate in the parent queue (if we are a child) that we are not done yet.
*/
static rsRetVal
-qqueueRegOnWrkrStartup(qqueue_t *pThis)
+RegOnWrkrStartup(qqueue_t *pThis)
{
DEFiRet;
@@ -1815,10 +2038,10 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(pThis->qConstruct(pThis)); /* this also sets bIsDA */
dbgoprint((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, qsize %d, child %d, "
- "full delay %d, light delay %d starting\n",
+ "full delay %d, light delay %d, deq batch size %d starting\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize,
qqueueGetOverallQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1,
- pThis->iFullDlyMrk, pThis->iLightDlyMrk);
+ pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize);
if(pThis->qType == QUEUETYPE_DIRECT)
FINALIZE; /* with direct queues, we are already finished... */
@@ -1829,13 +2052,14 @@ rsRetVal qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
lenBuf = snprintf((char*)pszBuf, sizeof(pszBuf), "%s:Reg", obj.GetName((obj_t*) pThis));
CHKiRet(wtpConstruct (&pThis->pWtpReg));
CHKiRet(wtpSetDbgHdr (pThis->pWtpReg, pszBuf, lenBuf));
- CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRateLimiter));
- CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueChkStopWrkrReg));
- CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) qqueueIsIdleReg));
- CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) qqueueConsumerReg));
- CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))qqueueConsumerCancelCleanup));
- CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrStartup));
- CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) qqueueRegOnWrkrShutdown));
+ CHKiRet(wtpSetpfRateLimiter (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RateLimiter));
+ CHKiRet(wtpSetpfChkStopWrkr (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) ChkStooWrkrReg));
+ CHKiRet(wtpSetpfGetDeqBatchSize (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int*)) GetDeqBatchSize));
+ CHKiRet(wtpSetpfIsIdle (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, int)) IsIdleReg));
+ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti, int)) ConsumerReg));
+ CHKiRet(wtpSetpfOnWorkerCancel (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void*pWti))ConsumerCancelCleanup));
+ CHKiRet(wtpSetpfOnWorkerStartup (pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrStartup));
+ CHKiRet(wtpSetpfOnWorkerShutdown(pThis->pWtpReg, (rsRetVal (*)(void *pUsr)) RegOnWrkrShutdown));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
@@ -1889,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
strm_t *psQIF = NULL; /* Queue Info File */
uchar pszQIFNam[MAXFNAME];
size_t lenQIFNam;
- obj_t *pUsr;
ASSERT(pThis != NULL);
@@ -1917,7 +2140,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
pThis->bNeedDelQIF = 0;
}
/* indicate spool file needs to be deleted */
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 1));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
FINALIZE; /* nothing left to do, so be happy */
}
@@ -1936,29 +2159,19 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
*/
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
- objSerializeSCALAR(psQIF, iUngottenObjs, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
- /* now we must persist all objects on the ungotten queue - they can not go to
- * to the regular files. -- rgerhards, 2008-01-29
- */
- while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
- CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
- objDestruct(pUsr);
- }
-
/* now persist the stream info */
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
- CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
+ CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF));
/* tell the input file object that it must not delete the file on close if the queue
* is non-empty - but only if we are not during a simple checkpoint
*/
if(bIsCheckpoint != QUEUE_CHECKPOINT) {
- CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pRead, 0));
+ CHKiRet(strmSetbDeleteOnClose(pThis->tVars.disk.pReadDel, 0));
}
/* we have persisted the queue object. So whenever it comes to an empty queue,
@@ -1975,17 +2188,21 @@ finalize_it:
/* check if we need to persist the current queue info. If an
- * error occurs, thus should be ignored by caller (but we still
+ * error occurs, this should be ignored by caller (but we still
* abide to our regular call interface)...
* rgerhards, 2008-01-13
+ * nUpdates is the number of updates since the last call to this function.
+ * It may be > 1 due to batches. -- rgerhards, 2009-05-12
*/
-rsRetVal qqueueChkPersist(qqueue_t *pThis)
+static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
+ assert(nUpdates > 0);
- if(pThis->iPersistUpdCnt && ++pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
+ pThis->iUpdsSincePersist += nUpdates;
+ if(pThis->iPersistUpdCnt && pThis->iUpdsSincePersist >= pThis->iPersistUpdCnt) {
qqueuePersist(pThis, QUEUE_CHECKPOINT);
pThis->iUpdsSincePersist = 0;
}
@@ -2061,11 +2278,8 @@ CODESTARTobjDestruct(qqueue)
/* type-specific destructor */
iRet = pThis->qDestruct(pThis);
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
-
- if(pThis->pszSpoolDir != NULL)
- free(pThis->pszSpoolDir);
+ free(pThis->pszFilePrefix);
+ free(pThis->pszSpoolDir);
ENDobjDestruct(qqueue)
@@ -2079,8 +2293,8 @@ qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix)
{
DEFiRet;
- if(pThis->pszFilePrefix != NULL)
- free(pThis->pszFilePrefix);
+ free(pThis->pszFilePrefix);
+ pThis->pszFilePrefix = NULL;
if(pszPrefix == NULL) /* just unset the prefix! */
ABORT_FINALIZE(RS_RET_OK);
@@ -2203,7 +2417,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr)
/* and finally enqueue the message */
CHKiRet(qqueueAdd(pThis, pUsr));
- qqueueChkPersist(pThis);
+ qqueueChkPersist(pThis, 1);
finalize_it:
if(pThis->qType != QUEUETYPE_DIRECT) {
@@ -2296,6 +2510,7 @@ DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int)
DEFpropSetMeth(qqueue, bSaveOnShutdown, int)
DEFpropSetMeth(qqueue, pUsr, void*)
DEFpropSetMeth(qqueue, iDeqSlowdown, int)
+DEFpropSetMeth(qqueue, iDeqBatchSize, int)
DEFpropSetMeth(qqueue, sizeOnDiskMax, int64)
@@ -2314,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
if(isProp("iQueueSize")) {
pThis->iQueueSize = pProp->val.num;
- } else if(isProp("iUngottenObjs")) {
- pThis->iUngottenObjs = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
} else if(isProp("tVars.disk.bytesRead")) {