summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c34
1 files changed, 32 insertions, 2 deletions
diff --git a/action.c b/action.c
index 03073153..be3c7556 100644
--- a/action.c
+++ b/action.c
@@ -42,10 +42,11 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "wti.h"
#include "datetime.h"
/* forward definitions */
-rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -63,6 +64,7 @@ static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragme
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
static int iActionQueueSize = 1000; /* size of the main message queue above */
+static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
static int iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -143,6 +145,7 @@ actionResetQueueParams(void)
ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
iActionQueueSize = 1000; /* size of the main message queue above */
+ iActionQueueDeqBatchSize = 16; /* default batch size */
iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -255,7 +258,8 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -270,6 +274,7 @@ actionConstructFinalize(action_t *pThis)
qqueueSetpUsr(pThis->pQueue, pThis);
setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize);
setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
@@ -415,6 +420,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+//MULTIQUEUE: think about these two functions below
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
@@ -527,6 +533,29 @@ finalize_it:
#pragma GCC diagnostic warning "-Wempty-body"
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+rsRetVal
+actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp)
+{
+ int i;
+ msg_t *pMsg;
+ DEFiRet;
+
+ assert(paUsrp != NULL);
+
+ for(i = 0 ; i < paUsrp->nElem ; i++) {
+ pMsg = (msg_t*) paUsrp->pUsrp[i];
+dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg);
+ CHKiRet(actionCallDoAction(pAction, pMsg));
+ }
+finalize_it:
+ RETiRet;
+}
+
+
/* call the HUP handler for a given action, if such a handler is defined. The
* action mutex is locked, because the HUP handler most probably needs to modify
* some internal state information.
@@ -831,6 +860,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));