summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c72
-rw-r--r--runtime/ruleset.c6
2 files changed, 24 insertions, 54 deletions
diff --git a/action.c b/action.c
index 8349120c..e23f013f 100644
--- a/action.c
+++ b/action.c
@@ -115,9 +115,9 @@
/* forward definitions */
static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -411,7 +411,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch;
} else {
/* full firehose submission mode */
- pThis->submitToActQ = doSubmitToActionQBatch;
+ pThis->submitToActQ = doSubmitToActionQ;
}
/* create queue */
@@ -1188,24 +1188,24 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* the complex logic has been applied ;)
* rgerhards, 2010-06-08
*/
-static inline rsRetVal
+static rsRetVal
doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
+ struct syslogTime ttNow;
DEFiRet;
- // TODO: bug? Isn't that supposed to be checked in direct mode as well???
- if(getActionState(pWti, pAction) == ACT_STATE_DIED) {
- DBGPRINTF("action %p died, do NOT execute\n", pAction);
- FINALIZE;
- }
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg), pWti);
- else
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
+ ttNow.year = 0;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
+ } else {/* in this case, we do single submits to the queue.
+ * TODO: optimize this, we may do at least a multi-submit!
+ */
iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
+ }
-finalize_it:
RETiRet;
}
@@ -1333,6 +1333,11 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
time_t lastAct;
DEFiRet;
+ /* TODO: think about the whole logic. If messages come in out of order, things
+ * tend to become a bit unreliable. On the other hand, this only happens if we have
+ * very high traffic, in which this use case here is not really affected (as the
+ * MarkInterval is pretty corase).
+ */
/* CAS loop, we write back a bit early, but that's OK... */
/* we use reception time, not dequeue time - this is considered more appropriate and
* also faster ;) -- rgerhards, 2008-09-17 */
@@ -1351,48 +1356,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
if(doProcess) {
DBGPRINTF("Called action(NotAllMark), processing via '%s'\n",
module.GetStateName(pAction->pMod));
- iRet = doSubmitToActionQBatch(pAction, pWti, pMsg);
- }
-
- RETiRet;
-}
-
-
-/* enqueue a batch in direct mode. We have put this into its own function just to avoid
- * cluttering the actual submit function.
- * rgerhards, 2011-06-16
- */
-static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
-{
-dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
- struct syslogTime ttNow;
- DEFiRet;
- if(GatherStats)
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- ttNow.year = 0;
- iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
- RETiRet;
-}
-
-/* This submits the message to the action queue in case we do NOT need to handle repeat
- * message processing. That case permits us to gain lots of freedom during processing
- * and thus speed.
- * rgerhards, 2010-06-08
- */
-static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
-{
- DEFiRet;
-
- DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
-
- if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg);
- } else {/* in this case, we do single submits to the queue.
- * TODO: optimize this, we may do at least a multi-submit!
- */
- doSubmitToActionQ(pAction, pWti, pMsg);
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 4ac0039a..6013baf7 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -169,8 +169,14 @@ execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
// if we actually are permitted to execute this action.
// NOTE: this will primarily be handled by end-of-batch processing
+ if(getActionState(pWti, stmt->d.act) == ACT_STATE_DIED) {
+ DBGPRINTF("action %d died, do NOT execute\n", stmt->d.act->iActionNbr);
+ goto done;
+ }
+
DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr);
stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg);
+done: return;
}
static void