summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-04-22 16:39:58 +0200
commite4b3f6d287d74b34d27b4e296c33cb3f1294a58c (patch)
treedd75cbbc5f857ccdb6d835fb2bf850ed1527b6ff /runtime
parent7667845bd72b6f92eabc975318a4f288a77f2630 (diff)
downloadrsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.gz
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.tar.bz2
rsyslog-e4b3f6d287d74b34d27b4e296c33cb3f1294a58c.zip
now batches are handed down to the actual consumer
... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer.
Diffstat (limited to 'runtime')
-rw-r--r--runtime/queue.c30
-rw-r--r--runtime/queue.h9
2 files changed, 21 insertions, 18 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index c48eb724..ea8567ea 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -899,6 +899,8 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
{
+ aUsrp_t aUsrp;
+ obj_t *pMsgp;
DEFiRet;
ASSERT(pThis != NULL);
@@ -908,8 +910,13 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr)
* mode the consumer probably has a lot to convey (which get's lost in the other modes
* because they are asynchronous. But direct mode is deliberately synchronous.
* rgerhards, 2008-02-12
+ * We use our knowledge about the aUsrp_t structure below, but without that, we
+ * pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pUsr, pUsr);
+ pMsgp = (obj_t*) pUsr;
+ aUsrp.nElem = 1; /* there always is only one in direct mode */
+ aUsrp.pUsrp = &pMsgp;
+ iRet = pThis->pConsumer(pThis->pUsr, &aUsrp);
RETiRet;
}
@@ -959,7 +966,7 @@ UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
* rgerhards, 2008-01-29
*/
static rsRetVal
-qqueueGetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
+GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
{
DEFiRet;
@@ -1015,7 +1022,7 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
* losing the whole process because it loops... -- rgerhards, 2008-01-03
*/
if(pThis->iUngottenObjs > 0) {
- iRet = qqueueGetUngottenObj(pThis, (obj_t**) pUsr);
+ iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
} else {
iRet = pThis->qDel(pThis, pUsr);
ATOMIC_DEC(pThis->iQueueSize);
@@ -1243,7 +1250,7 @@ static rsRetVal qqueueShutdownWorkers(qqueue_t *pThis)
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1578,14 +1585,12 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE: here we need to iterate through array! - or better pass it as whole? ... probably
- int i;
- for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
- CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp->pUsrp[i]));
+ CHKiRet(pThis->pConsumer(pThis->pUsr, pWti->paUsrp));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
*/
+//TODO: MULTIQUEUE: the following setting is no longer correct - need to think about how to do that...
if(pThis->iDeqSlowdown) {
dbgoprint((obj_t*) pThis, "sleeping %d microseconds as requested by config params\n",
pThis->iDeqSlowdown);
@@ -1597,7 +1602,7 @@ finalize_it:
}
-/* This is a special consumer to feed the disk-queue in disk-assited mode.
+/* This is a special consumer to feed the disk-queue in disk-assisted mode.
* When active, our own queue more or less acts as a memory buffer to the disk.
* So this consumer just needs to drain the memory queue and submit entries
* to the disk queue. The disk queue will then call the actual consumer from
@@ -1609,18 +1614,17 @@ finalize_it:
static rsRetVal
ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
{
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
ISOBJ_TYPE_assert(pWti, wti);
CHKiRet(DequeueConsumable(pThis, pWti, iCancelStateSave));
-// MULTIQUEUE:
- int i;
+ /* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->paUsrp->nElem ; i++)
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->paUsrp->pUsrp[i]));
-
finalize_it:
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
RETiRet;
@@ -1944,7 +1948,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
* to the regular files. -- rgerhards, 2008-01-29
*/
while(pThis->iUngottenObjs > 0) {
- CHKiRet(qqueueGetUngottenObj(pThis, &pUsr));
+ CHKiRet(GetUngottenObj(pThis, &pUsr));
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
objDestruct(pUsr);
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 7ecb9294..4fb57d07 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -100,11 +100,10 @@ typedef struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -185,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*));
PROTOTYPEObjClassInit(qqueue);
PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);