summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c470
1 files changed, 325 insertions, 145 deletions
diff --git a/action.c b/action.c
index 305b6a5c..32a07dcb 100644
--- a/action.c
+++ b/action.c
@@ -46,11 +46,15 @@
#include "wti.h"
#include "datetime.h"
#include "unicode-helper.h"
+#include "atomic.h"
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -187,7 +191,6 @@ actionResetQueueParams(void)
*/
rsRetVal actionDestruct(action_t *pThis)
{
- int i;
DEFiRet;
ASSERT(pThis != NULL);
@@ -206,35 +209,6 @@ rsRetVal actionDestruct(action_t *pThis)
d_free(pThis->pszName);
d_free(pThis->ppTpl);
- /* message ptr cleanup */
- for(i = 0 ; i < pThis->iNumTpls ; ++i) {
- if(pThis->ppMsgs[i] != NULL) {
- switch(pThis->eParamPassing) {
- case ACT_ARRAY_PASSING:
-#if 0 /* later, as an optimization. So far, we do the cleanup after each message */
- iArr = 0;
- while(((char **)pThis->ppMsgs[i])[iArr] != NULL) {
- d_free(((char **)pThis->ppMsgs[i])[iArr++]);
- ((char **)pThis->ppMsgs[i])[iArr++] = NULL;
- }
- d_free(pThis->ppMsgs[i]);
- pThis->ppMsgs[i] = NULL;
-#endif
- break;
- case ACT_STRING_PASSING:
- d_free(pThis->ppMsgs[i]);
- break;
- case ACT_MSG_PASSING:
- /* No cleanup needed in this case */
- break;
- default:
- assert(0);
- }
- }
- }
- d_free(pThis->ppMsgs);
- d_free(pThis->lenMsgs);
-
d_free(pThis);
RETiRet;
@@ -256,6 +230,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */
pthread_mutex_init(&pThis->mutActExec, NULL);
+ INIT_ATOMIC_HELPER_MUT(pThis->mutCAS);
SYNC_OBJ_TOOL_INIT(pThis);
/* indicate we have a new action */
@@ -280,6 +255,32 @@ actionConstructFinalize(action_t *pThis)
/* find a name for our queue */
snprintf((char*) pszQName, sizeof(pszQName)/sizeof(uchar), "action %d queue", iActionNbr);
+ /* now check if we can run the action in "firehose mode" during stage one of
+ * its processing (that is before messages are enqueued into the action q).
+ * This is only possible if some features, which require strict sequence, are
+ * not used. Thankfully, that is usually the case. The benefit of firehose
+ * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
+ */
+ if( pThis->iExecEveryNthOccur > 1
+ || pThis->f_ReduceRepeated
+ || pThis->iSecsExecOnceInterval
+ ) {
+ DBGPRINTF("info: firehose mode disabled for action because "
+ "iExecEveryNthOccur=%d, "
+ "ReduceRepeated=%d, "
+ "iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
+ pThis->iSecsExecOnceInterval
+ );
+ pThis->submitToActQ = doSubmitToActionQComplexBatch;
+ } else if(pThis->bWriteAllMarkMsgs == FALSE) {
+ /* nearly full-speed submission mode, default case */
+ pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch;
+ } else {
+ /* full firehose submission mode */
+ pThis->submitToActQ = doSubmitToActionQBatch;
+ }
+
/* we need to make a safety check: if the queue is NOT in direct mode, a single
* message object may be accessed by multiple threads. As such, we need to enable
* msg object thread safety in this case (this costs a bit performance and thus
@@ -503,7 +504,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
ASSERT(pThis != NULL);
iRetries = 0;
- while(pThis->eState == ACT_STATE_RTRY) {
+ while((*pThis->pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
bTreatOKasSusp = 1;
@@ -522,6 +523,9 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
iSleepPeriod = pThis->iResumeInterval;
ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
srSleep(iSleepPeriod, 0);
+ if(*pThis->pbShutdownImmediate) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
actionDisable(pThis);
@@ -532,6 +536,7 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
pThis->iNbrResRtry = 0;
}
+finalize_it:
RETiRet;
}
@@ -580,7 +585,7 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static rsRetVal actionPrepare(action_t *pThis)
+static inline rsRetVal actionPrepare(action_t *pThis)
{
DEFiRet;
@@ -617,6 +622,7 @@ finalize_it:
rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
+ char *sz;
dbgprintf("%s: ", module.GetStateName(pThis->pMod));
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
@@ -629,6 +635,16 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
+ if(pThis->submitToActQ == doSubmitToActionQComplexBatch) {
+ sz = "slow, but feature-rich";
+ } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) {
+ sz = "fast, but supports partial mark messages";
+ } else if(pThis->submitToActQ == doSubmitToActionQBatch) {
+ sz = "firehose (fastest)";
+ } else {
+ sz = "unknown (need to update debug display?)";
+ }
+ dbgprintf("\tsubmission mode: %s\n", sz);
dbgprintf("\n");
RETiRet;
@@ -638,7 +654,7 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar **ppMsgs, size_t *lenMsgs)
{
int i;
DEFiRet;
@@ -649,16 +665,17 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i])));
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i]), &lenMsgs[i]));
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(ppMsgs[i])));
break;
case ACT_MSG_PASSING:
/* we abuse the uchar* ptr, it now actually is a void*, but we can not
* change that other than by chaning the interface, what we don't like...
*/
- pAction->ppMsgs[i] = (uchar*) pMsg;
+ ppMsgs[i] = (void*) pMsg;
+ lenMsgs[i] = 0; /* init for *next* action */
break;
default:assert(0); /* software bug if this happens! */
}
@@ -672,31 +689,23 @@ finalize_it:
/* cleanup doAction calling parameters
* rgerhards, 2009-05-07
*/
-static rsRetVal cleanupDoActionParams(action_t *pAction)
+static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***ppMsgs)
{
- int i;
int iArr;
+ int i;
DEFiRet;
ASSERT(pAction != NULL);
+
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(pAction->ppMsgs[i] != NULL) {
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- iArr = 0;
- while(((char **)pAction->ppMsgs[i])[iArr] != NULL) {
- d_free(((char **)pAction->ppMsgs[i])[iArr++]);
- ((char **)pAction->ppMsgs[i])[iArr++] = NULL;
- }
- d_free(pAction->ppMsgs[i]);
- pAction->ppMsgs[i] = NULL;
- break;
- case ACT_MSG_PASSING:
- case ACT_STRING_PASSING:
- break;
- default:
- assert(0);
+ if(((uchar**)ppMsgs)[i] != NULL) {
+ iArr = 0;
+ while((((uchar***)ppMsgs)[i][iArr]) != NULL) {
+ d_free(((uchar ***)ppMsgs)[i][iArr++]);
+ ((uchar ***)ppMsgs)[i][iArr++] = NULL;
}
+ d_free(((uchar**)ppMsgs)[i]);
+ ((uchar**)ppMsgs)[i] = NULL;
}
}
@@ -705,29 +714,24 @@ static rsRetVal cleanupDoActionParams(action_t *pAction)
/* call the DoAction output plugin entry point
- * Performance note: we build the action parameters here in this function. That
- * means we do it while we hold the action look, potentially reducing concurrency
- * (especially if the action queue is run in DIRECT mode). As an alternative, we
- * may generate all params for the batch as whole before aquiring the action. However,
- * that requires more memory, for large batches potentially a lot of memory. So for the
- * time being, I am doing it here - the performance hit should be very minor and may even
- * not be a hit because we may gain CPU cache locality gains with the "fewer memory"
- * approach (I'd say that is rater likely).
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg)
+actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
{
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
- CHKiRet(prepareDoActionParams(pThis, pMsg));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+//d_pthread_mutex_lock(&pThis->mutActExec);
+//pthread_cleanup_push(mutexCancelCleanup, &pThis->mutActExec);
+ iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
+//pthread_cleanup_pop(1); /* unlock mutex */
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -756,7 +760,26 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg)
iRet = getReturnCode(pThis);
finalize_it:
- cleanupDoActionParams(pThis); /* iRet ignored! */
+
+ /* we need to cleanup the batches string buffers if they have been used
+ * in a non-standard way. -- rgerhards, 2010-06-15
+ * Note that we may do this at the batch level, this would provide a bit
+ * more concurrency (TODO).
+ */
+ switch(pThis->eParamPassing) {
+ case ACT_STRING_PASSING:
+ /* nothing to do in that case */
+ break;
+ case ACT_ARRAY_PASSING:
+ cleanupDoActionParams(pThis, actParams); /* iRet ignored! */
+ break;
+ case ACT_MSG_PASSING:
+ /* nothing to do in that case */
+ for(i = 0 ; i < pThis->iNumTpls ; ++i) {
+ ((uchar**)actParams)[i] = NULL;
+ }
+ break;
+ }
RETiRet;
}
@@ -766,18 +789,17 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg)
+static inline rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams)
{
DEFiRet;
ASSERT(pThis != NULL);
ISOBJ_TYPE_assert(pMsg, msg);
-RUNLOG_STR("inside actionProcessMsg()");
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg));
+ CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
iRet = getReturnCode(pThis);
finalize_it:
@@ -797,8 +819,10 @@ finishBatch(action_t *pThis, batch_t *pBatch)
ASSERT(pThis != NULL);
- if(pThis->eState == ACT_STATE_RDY)
+ if(pThis->eState == ACT_STATE_RDY) {
+ /* we just need to flag the batch as commited */
FINALIZE; /* nothing to do */
+ }
CHKiRet(actionPrepare(pThis));
if(pThis->eState == ACT_STATE_ITX) {
@@ -808,7 +832,8 @@ finishBatch(action_t *pThis, batch_t *pBatch)
actionCommitted(pThis);
/* flag messages as committed */
for(i = 0 ; i < pBatch->nElem ; ++i) {
- pBatch->pElem[i].state = BATCH_STATE_COMM;
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */
}
break;
case RS_RET_SUSPENDED:
@@ -843,8 +868,8 @@ finalize_it:
/* try to submit a partial batch of elements.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate)
+static inline rsRetVal
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
{
int i;
int iElemProcessed;
@@ -860,12 +885,17 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
iElemProcessed = 0;
iCommittedUpTo = i;
while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
- if(pBatch->pElem[i].state != BATCH_STATE_DISC) {
- localRet = actionProcessMessage(pAction, pMsg);
+ if( pBatch->pElem[i].bFilterOK
+ && pBatch->pElem[i].state != BATCH_STATE_DISC
+ && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams);
DBGPRINTF("action call returned %d\n", localRet);
+ /* Note: we directly modify the batch object state, because we know that
+ * wo do not overwrite BATCH_STATE_DISC indicators!
+ */
if(localRet == RS_RET_OK) {
/* mark messages as committed */
while(iCommittedUpTo <= i) {
@@ -882,7 +912,8 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
} else if(localRet == RS_RET_DISCARDMSG) {
pBatch->pElem[i].state = BATCH_STATE_DISC;
} else {
- dbgprintf("tryDoAction: unexpected error code %d, finalizing\n", localRet);
+ dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
+ localRet, *pnElem, iCommittedUpTo);
iRet = localRet;
FINALIZE;
}
@@ -892,10 +923,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImme
}
finalize_it:
- if(pBatch->nElem == 1 && pBatch->pElem[0].state == BATCH_STATE_DISC) {
- iRet = RS_RET_DISCARDMSG;
- } else if(pBatch->iDoneUpTo != iCommittedUpTo) {
- *pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
+ if(pBatch->iDoneUpTo != iCommittedUpTo) {
pBatch->iDoneUpTo = iCommittedUpTo;
}
RETiRet;
@@ -905,10 +933,12 @@ finalize_it:
/* submit a batch for actual action processing.
* The first nElem elements are processed. This function calls itself
* recursively if it needs to handle errors.
+ * Note: we don't need the number of the first message to be processed as a parameter,
+ * because this is kept track of inside the batch itself (iDoneUpTo).
* rgerhards, 2009-05-12
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate)
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
{
int i;
int bDone;
@@ -919,47 +949,49 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate);
- if(localRet == RS_RET_FORCE_TERM)
- FINALIZE;
+ localRet = tryDoAction(pAction, pBatch, &nElem);
+ if(localRet == RS_RET_FORCE_TERM) {
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
/* try commit transaction, once done, we can simply do so as if
* that return state was returned from tryDoAction().
*/
- localRet = finishBatch(pAction, pBatch); // TODO: careful, do we need the elem counter?
+ localRet = finishBatch(pAction, pBatch);
}
if( localRet == RS_RET_OK
|| localRet == RS_RET_PREVIOUS_COMMITTED
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
- } else if(localRet == RS_RET_DISCARDMSG) {
- iRet = RS_RET_DISCARDMSG; /* TODO: verify this sequence -- rgerhards, 2009-07-30 */
- bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
; /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
- /* in this case, the whole batch can not be processed */
- for(i = 0 ; i < nElem ; ++i) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+ /* in this case, everything not yet committed is BAD */
+ for(i = pBatch->iDoneUpTo ; i < nElem ; ++i) {
+ if( pBatch->pElem[i].state != BATCH_STATE_DISC
+ && pBatch->pElem[i].state != BATCH_STATE_COMM ) {
+ pBatch->pElem[i].state = BATCH_STATE_BAD;
+ pBatch->pElem[i].bPrevWasSuspended = 1;
+ }
}
bDone = 1;
} else {
if(nElem == 1) {
- pBatch->pElem[pBatch->iDoneUpTo++].state = BATCH_STATE_BAD;
+ batchSetElemState(pBatch, i, BATCH_STATE_BAD);
bDone = 1;
} else {
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate);
+ submitBatch(pAction, pBatch, nElem / 2);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2));
bDone = 1;
}
}
- } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */
+ } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
- if(*pbShutdownImmediate)
+ if(*(pBatch->pbShutdownImmediate))
ABORT_FINALIZE(RS_RET_FORCE_TERM);
finalize_it:
@@ -967,17 +999,45 @@ finalize_it:
}
+
+/* The following function prepares a batch for processing, that it is
+ * reinitializes batch states, generates strings and does everything else
+ * that needs to be done in order to make the batch ready for submission to
+ * the actual output module. Note that we look at the precomputed
+ * filter OK condition and process only those messages, that actually matched
+ * the filter.
+ * rgerhards, 2010-06-14
+ */
+static inline rsRetVal
+prepareBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ batch_obj_t *pElem;
+ DEFiRet;
+
+ pBatch->iDoneUpTo = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ pElem = &(pBatch->pElem[i]);
+ if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ pElem->state = BATCH_STATE_RDY;
+ prepareDoActionParams(pAction, (msg_t*) pElem->pUsrp,
+ (uchar**) &(pElem->staticActParams), pElem->staticLenParams);
+ }
+ }
+ RETiRet;
+}
+
+
/* receive a batch and process it. This includes retry handling.
* rgerhards, 2009-05-12
*/
-static rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+static inline rsRetVal
+processAction(action_t *pAction, batch_t *pBatch)
{
DEFiRet;
assert(pBatch != NULL);
- pBatch->iDoneUpTo = 0;
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate));
+ CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
iRet = finishBatch(pAction, pBatch);
finalize_it:
@@ -993,10 +1053,16 @@ finalize_it:
static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
+ int *pbShutdownImmdtSave;
DEFiRet;
assert(pBatch != NULL);
+ pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
+ pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ pAction->pbShutdownImmediate = pBatch->pbShutdownImmediate;
+ CHKiRet(prepareBatch(pAction, pBatch));
+
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
* if they notify us they are - functionality not yet implemented...).
@@ -1005,10 +1071,12 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- iRet = processAction(pAction, pBatch, pbShutdownImmediate);
+ iRet = processAction(pAction, pBatch);
pthread_cleanup_pop(1); /* unlock mutex */
+finalize_it:
+ pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -1073,20 +1141,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
}
-/* rgerhards 2004-11-09: fprintlog() is the actual driver for
- * the output channel. It receives the channel description (f) as
- * well as the message and outputs them according to the channel
- * semantics. The message is typically already contained in the
- * channel save buffer (f->f_prevline). This is not only the case
- * when a message was already repeated but also when a new message
- * arrived.
- * rgerhards 2007-08-01: interface changed to use action_t
- * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE
- * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do
- * not do this here. Failing to do so results in all kinds of
- * "interesting" problems!
- * RGERHARDS, 2008-01-29:
- * This is now the action caller and has been renamed.
+/* This function builds up a batch of messages to be (later)
+ * submitted to the action queue.
*/
rsRetVal
actionWriteToAction(action_t *pAction)
@@ -1212,7 +1268,7 @@ finalize_it:
/* helper to actonCallAction, mostly needed because of this damn
* pthread_cleanup_push() POSIX macro...
*/
-static rsRetVal
+static inline rsRetVal
doActionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
@@ -1267,41 +1323,167 @@ finalize_it:
RETiRet;
}
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static inline rsRetVal
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ else
+ iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg));
+
+ RETiRet;
+}
+
+
+
+/* This submits the message to the action queue in case where we need to handle
+ * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop
+ * for the synchronization. Here, we just modify the filter condition to be false when
+ * a mark message must not be written. However, in this case we must save the previous
+ * filter as we may need it in the next action (potential future optimization: check if this is
+ * the last action TODO).
+ * rgerhards, 2010-06-08
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
-actionCallAction(action_t *pAction, msg_t *pMsg)
+static rsRetVal
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
{
+ time_t now = 0;
+ time_t lastAct;
+ int i;
+ int bProcessMarkMsgs;
+ int bModifiedFilter;
+ sbool FilterSave[128];
+ sbool *pFilterSave;
DEFiRet;
- ISOBJ_TYPE_assert(pMsg, msg);
- ASSERT(pAction != NULL);
+ if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
+ pFilterSave = FilterSave;
+ } else {
+ CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
+ }
- /* We need to lock the mutex only for repeated line processing.
- * rgerhards, 2009-06-19
- */
- //if(pAction->f_ReduceRepeated == 1) {
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- iRet = doActionCallAction(pAction, pMsg);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- //} else {
- //iRet = doActionCallAction(pAction, pMsg);
- //}
+ bModifiedFilter = 0;
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ pFilterSave[i] = pBatch->pElem[i].bFilterOK;
+ if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
+ /* check if we need to write or not */
+ if(now == 0) {
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if((now - lastAct) < MarkInterval / 2) {
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ bProcessMarkMsgs = 0;
+ } else {
+ bProcessMarkMsgs = 1;
+ }
+ } while(ATOMIC_CAS(&pAction->f_time, lastAct,
+ ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
+ }
+ if(bProcessMarkMsgs) {
+ pBatch->pElem[i].bFilterOK = 0;
+ bModifiedFilter = 1;
+ }
+ }
+ }
+
+ DBGPRINTF("Called action(NotAllMark), logging to %s\n", module.GetStateName(pAction->pMod));
+
+ iRet = doSubmitToActionQBatch(pAction, pBatch);
+
+ if(bModifiedFilter) {
+ /* in this case, we need to restore previous state */
+ for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
+ pBatch->pElem[i].bFilterOK = pFilterSave[i];
+ }
+ }
+
+finalize_it:
+ if(pFilterSave != FilterSave)
+ free(pFilterSave);
RETiRet;
}
-#pragma GCC diagnostic warning "-Wempty-body"
+/* This submits the message to the action queue in case we do NOT need to handle repeat
+ * message processing. That case permits us to gain lots of freedom during processing
+ * and thus speed.
+ * rgerhards, 2010-06-08
+ */
+static rsRetVal
+doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ DEFiRet;
+
+ DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
+ else { /* in this case, we do single submits to the queue.
+ * TODO: optimize this, we may do at least a multi-submit!
+ */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+ }
+
+ RETiRet;
+}
+
+
+
+/* Helper to submit a batch of actions to the engine. Note that we have rather
+ * complicated processing here, so we need to do this one message after another.
+ * rgerhards, 2010-06-23
+ */
+static inline rsRetVal
+helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ DEFiRet;
+
+ DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(pBatch->pElem[i].bFilterOK) {
+ doActionCallAction(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ }
+ }
+
+ RETiRet;
+}
+
+/* Call configured action, most complex case with all features supported (and thus slow).
+ * rgerhards, 2010-06-08
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+static rsRetVal
+doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+{
+ DEFiRet;
+
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ iRet = helperSubmitToActionQComplexBatch(pAction, pBatch);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
+
+ RETiRet;
+}
+#pragma GCC diagnostic warning "-Wempty-body"
+
/* add an Action to the current selector
* The pOMSR is freed, as it is not needed after this function.
* Note: this function pulls global data that specifies action config state.
@@ -1347,8 +1529,6 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
if(pAction->iNumTpls > 0) {
/* we first need to create the template pointer array */
CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
- CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *)));
- CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t)));
}
for(i = 0 ; i < pAction->iNumTpls ; ++i) {