summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c435
1 files changed, 159 insertions, 276 deletions
diff --git a/action.c b/action.c
index efc116b5..259fb666 100644
--- a/action.c
+++ b/action.c
@@ -12,11 +12,11 @@
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
- * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
* - helperSubmitToActionQComplexBatch
* - doActionCallAction
- * handles duplicate message processing, but in essence calls
+ * handles mark message reduction, but in essence calls
* - actionWriteToAction
* - qqueueEnqObj
* (now queue engine processing)
@@ -188,7 +188,7 @@ static struct cnfparamdescr cnfparamdescr[] = {
{ "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */
{ "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */
{ "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */
- { "action.execonlywhenpreviousissuspended", eCmdHdlrInt, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */
+ { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */
{ "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */
{ "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */
{ "action.resumeinterval", eCmdHdlrInt, 0 }
@@ -289,6 +289,11 @@ rsRetVal actionDestruct(action_t *pThis)
DEFiRet;
ASSERT(pThis != NULL);
+ if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) {
+ /* discard actions will be optimized out */
+ FINALIZE;
+ }
+
if(pThis->pQueue != NULL) {
qqueueDestruct(&pThis->pQueue);
}
@@ -302,16 +307,13 @@ rsRetVal actionDestruct(action_t *pThis)
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
- if(pThis->f_pMsg != NULL)
- msgDestruct(&pThis->f_pMsg);
-
pthread_mutex_destroy(&pThis->mutAction);
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
d_free(pThis->ppTpl);
+finalize_it:
d_free(pThis);
-
RETiRet;
}
@@ -362,6 +364,10 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
ASSERT(pThis != NULL);
+ if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) {
+ /* discard actions will be optimized out */
+ FINALIZE;
+ }
/* generate a friendly name for us action stats */
if(pThis->pszName == NULL) {
snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
@@ -401,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
* mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
*/
if( pThis->iExecEveryNthOccur > 1
- || pThis->f_ReduceRepeated
|| pThis->iSecsExecOnceInterval
) {
DBGPRINTF("info: firehose mode disabled for action because "
- "iExecEveryNthOccur=%d, "
- "ReduceRepeated=%d, "
- "iSecsExecOnceInterval=%d\n",
- pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
- pThis->iSecsExecOnceInterval
- );
+ "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
pThis->submitToActQ = doSubmitToActionQComplexBatch;
} else if(pThis->bWriteAllMarkMsgs == RSFALSE) {
/* nearly full-speed submission mode, default case */
@@ -429,7 +430,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
(rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszAName);
- qqueueSetpUsr(pThis->pQueue, pThis);
+ qqueueSetpAction(pThis->pQueue, pThis);
if(queueParams == NULL) { /* use legacy params? */
/* ... set some properties ... */
@@ -604,13 +605,17 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis, time_t ttNow)
+static inline void actionSuspend(action_t *pThis)
{
- if(ttNow == NO_TIME_PROVIDED)
- datetime.GetTime(&ttNow);
+ time_t ttNow;
+
+ /* note: we can NOT use a cached timestamp, as time may have evolved
+ * since caching, and this would break logic (and it actually did so!)
+ */
+ datetime.GetTime(&ttNow);
pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
actionSetState(pThis, ACT_STATE_SUSP);
- DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry);
+ DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -630,7 +635,7 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate)
+actionDoRetry(action_t *pThis, int *pbShutdownImmediate)
{
int iRetries;
int iSleepPeriod;
@@ -641,24 +646,28 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate)
iRetries = 0;
while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pThis->pModData);
- if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
+ DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
+ if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
bTreatOKasSusp = 1;
pThis->iResumeOKinRow = 0;
} else {
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
+ DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
actionSetState(pThis, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
+ DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
+ pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
- actionSuspend(pThis, ttNow);
+ actionSuspend(pThis);
} else {
++pThis->iNbrResRtry;
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
- ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
srSleep(iSleepPeriod, 0);
if(*pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
@@ -705,7 +714,7 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate)
if(pThis->eState == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, ttNow, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pbShutdownImmediate));
}
if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
@@ -765,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis)
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
- dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
if(pThis->eState == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
@@ -792,7 +800,8 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
+static rsRetVal
+prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
{
int i;
msg_t *pMsg;
@@ -802,23 +811,23 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
ASSERT(pAction != NULL);
ASSERT(pElem != NULL);
- pMsg = (msg_t*) pElem->pUsrp;
+ pMsg = pElem->pMsg;
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i]));
+ &pElem->staticLenStrings[i], ttNow));
pElem->staticActParams[i] = pElem->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
pElem->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
- CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json));
+ CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
pElem->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
@@ -851,9 +860,12 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
ASSERT(pAction != NULL);
+ if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
+ goto done; /* we need to do nothing with these types! */
+
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
- if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
switch(pAction->eParamPassing) {
case ACT_ARRAY_PASSING:
ppMsgs = (uchar***) pElem->staticActParams;
@@ -870,19 +882,6 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
}
}
break;
- case ACT_STRING_PASSING:
- case ACT_MSG_PASSING:
- /* nothing to do in that case */
- /* TODO ... and yet we do something ;) This is considered not
- * really needed, but I was not bold enough to remove that while
- * fixing the stable. It should be removed in a devel version
- * soon (I really don't see a reason why we would need it).
- * rgerhards, 2010-12-16
- */
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- ((uchar**)pElem->staticActParams)[j] = NULL;
- }
- break;
case ACT_JSON_PASSING:
for(j = 0 ; j < pAction->iNumTpls ; ++j) {
json_object_put((struct json_object*)
@@ -890,11 +889,15 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
pElem->staticActParams[j] = NULL;
}
break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* can never happen, just to keep compiler happy! */
+ break;
}
}
}
- RETiRet;
+done: RETiRet;
}
@@ -958,6 +961,8 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
ISOBJ_TYPE_assert(pMsg, msg);
CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
+ if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
+ pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
@@ -1051,9 +1056,8 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
/* NOTE: do NOT extend the filter below! Anything else must be done on the
* enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
*/
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
- pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ if(batchIsValidElem(pBatch, i)) {
+ pMsg = pBatch->pElem[i].pMsg;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
pBatch->pbShutdownImmediate);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
@@ -1076,11 +1080,11 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
++iCommittedUpTo;
//pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
}
- pBatch->pElem[i].state = BATCH_STATE_SUB;
+ pBatch->eltState[i] = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->pElem[i].state = BATCH_STATE_SUB;
+ pBatch->eltState[i] = BATCH_STATE_SUB;
} else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->pElem[i].state = BATCH_STATE_DISC;
+ pBatch->eltState[i] = BATCH_STATE_DISC;
} else {
dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
localRet, *pnElem, iCommittedUpTo);
@@ -1117,6 +1121,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
assert(pBatch != NULL);
+ DBGPRINTF("submitBatch: enter, nElem %d\n", nElem);
wasDoneTo = pBatch->iDoneUpTo;
bDone = 0;
do {
@@ -1138,13 +1143,14 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
|| localRet == RS_RET_DEFER_COMMIT) {
bDone = 1;
} else if(localRet == RS_RET_SUSPENDED) {
- ; /* do nothing, this will retry the full batch */
+ DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n");
+ /* do nothing, this will retry the full batch */
} else if(localRet == RS_RET_ACTION_FAILED) {
/* in this case, everything not yet committed is BAD */
for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->pElem[i].state != BATCH_STATE_DISC
- && pBatch->pElem[i].state != BATCH_STATE_COMM ) {
- pBatch->pElem[i].state = BATCH_STATE_BAD;
+ if( pBatch->eltState[i] != BATCH_STATE_DISC
+ && pBatch->eltState[i] != BATCH_STATE_COMM ) {
+ pBatch->eltState[i] = BATCH_STATE_BAD;
pBatch->pElem[i].bPrevWasSuspended = 1;
STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
}
@@ -1173,6 +1179,29 @@ finalize_it:
}
+/* copy "active" array of batch, as we need to modify it. The caller
+ * must make sure the new array is freed and the orginal batch
+ * pointer is restored (thus the caller must save it). If active
+ * is currently NULL, this is properly handled.
+ * Note: the batches active pointer is modified, so it must be
+ * saved BEFORE calling this function!
+ * rgerhards, 2012-09-12
+ */
+static rsRetVal
+copyActive(batch_t *pBatch)
+{
+ sbool *active;
+ DEFiRet;
+
+ CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
+ if(pBatch->active == NULL)
+ memset(active, 1, batchNumMsgs(pBatch));
+ else
+ memcpy(active, pBatch->active, batchNumMsgs(pBatch));
+ pBatch->active = active;
+finalize_it:
+ RETiRet;
+}
/* The following function prepares a batch for processing, that it is
* reinitializes batch states, generates strings and does everything else
@@ -1183,19 +1212,29 @@ finalize_it:
* rgerhards, 2010-06-14
*/
static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch)
+prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
{
int i;
batch_obj_t *pElem;
+ struct syslogTime ttNow;
DEFiRet;
+ /* indicate we have not yet read the date */
+ ttNow.year = 0;
+
pBatch->iDoneUpTo = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
- if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
- pElem->state = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem) != RS_RET_OK)
- pElem->bFilterOK = RSFALSE;
+ if(batchIsValidElem(pBatch, i)) {
+ pBatch->eltState[i] = BATCH_STATE_RDY;
+ if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
+ /* make sure we have our copy of "active" array */
+ if(!*bMustRestoreActivePtr) {
+ *activeSave = pBatch->active;
+ copyActive(pBatch);
+ }
+ pBatch->active[i] = RSFALSE;
+ }
}
}
RETiRet;
@@ -1228,14 +1267,18 @@ static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
int *pbShutdownImmdtSave;
+ sbool *activeSave;
+ int bMustRestoreActivePtr = 0;
rsRetVal localRet;
DEFiRet;
assert(pBatch != NULL);
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
- CHKiRet(prepareBatch(pAction, pBatch));
+ if(pbShutdownImmediate != NULL) {
+ pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
+ pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ }
+ CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
@@ -1258,9 +1301,15 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
if(iRet == RS_RET_OK)
iRet = localRet;
+
+ if(bMustRestoreActivePtr) {
+ free(pBatch->active);
+ pBatch->active = activeSave;
+ }
finalize_it:
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ if(pbShutdownImmediate != NULL)
+ pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -1343,9 +1392,9 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
if(pAction->pQueue->qType == QUEUETYPE_DIRECT)
- iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg));
+ iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg));
else
- iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg));
+ iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
finalize_it:
RETiRet;
@@ -1360,14 +1409,10 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, msg_t *pMsg)
{
- msg_t *pMsgSave; /* to save current message pointer, necessary to restore
- it in case it needs to be updated (e.g. repeated msgs) */
DEFiRet;
- pMsgSave = NULL; /* indicate message poiner not saved */
-
/* first, we check if the action should actually be called. The action-specific
* $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
* time. So we need to check if we need to drop the (otherwise perfectly executable)
@@ -1394,43 +1439,6 @@ actionWriteToAction(action_t *pAction)
}
}
- /* then check if this is a regular message or the repeation of
- * a previous message. If so, we need to change the message text
- * to "last message repeated n times" and then go ahead and write
- * it. Please note that we can not modify the message object, because
- * that would update it in other selectors as well. As such, we first
- * need to create a local copy of the message, which we than can update.
- * rgerhards, 2007-07-10
- */
- if(pAction->f_prevcount > 1) {
- msg_t *pMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-
- if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
- /* it failed - nothing we can do against it... */
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- ABORT_FINALIZE(RS_RET_ERR);
- }
-
- if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
- pAction->f_prevcount);
- } else {
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
- pAction->f_prevcount, getMSG(pAction->f_pMsg));
- }
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are also invalidated.
- */
- datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
- memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
- pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
- pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
- }
-
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
@@ -1451,31 +1459,14 @@ actionWriteToAction(action_t *pAction)
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
* rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
+ pAction->f_time = pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
-
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ iRet = doSubmitToActionQ(pAction, pMsg);
finalize_it:
- if(pMsgSave != NULL) {
- /* we had saved the original message pointer. That was
- * done because we needed to create a temporary one
- * (most often for "message repeated n time" handling). If so,
- * we need to restore the original one now, so that procesing
- * can continue as normal. We also need to discard the temporary
- * one, as we do not like memory leaks ;) Please note that the original
- * message object will be discarded by our callers, so this is nothing
- * of our business. rgerhards, 2007-07-10
- */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = pMsgSave; /* restore it */
- }
-
RETiRet;
}
@@ -1489,7 +1480,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
msg_t *pMsg;
DEFiRet;
- pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp);
+ pMsg = pBatch->pElem[idxBtch].pMsg;
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1498,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
ABORT_FINALIZE(RS_RET_OK);
}
- /* suppress duplicate messages */
- if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
- (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
- !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
- !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
- pAction->f_prevcount++;
- DBGPRINTF("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
- repeatinterval[pAction->f_repeatcount]);
- /* use current message, so we have the new timestamp (means we need to discard previous one) */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* If domark would have logged this by now, flush it now (so we don't hold
- * isolated messages), but back off so we'll flush less often in the future.
- */
- if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
- BACKOFF(pAction);
- }
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
- if(pAction->f_pMsg != NULL) {
- if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
- /* we do not care about iRet above - I think it's right but if we have
- * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
- */
- msgDestruct(&pAction->f_pMsg);
- }
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* call the output driver */
- iRet = actionWriteToAction(pAction);
- }
+ /* call the output driver */
+ iRet = actionWriteToAction(pAction, pMsg);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1564,7 +1520,8 @@ DEFFUNC_llExecFunc(doActivateActions)
}
actionDisable(pThis);
}
- DBGPRINTF("Action %p: queue %p started\n", pThis, pThis->pQueue);
+ DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
+ pThis, pThis->pQueue);
ENDfunc
return RS_RET_OK; /* we ignore errors, we can not do anything either way */
}
@@ -1598,22 +1555,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
time_t now = 0;
time_t lastAct;
int i;
- int bModifiedFilter;
- sbool FilterSave[1024];
- sbool *pFilterSave;
+ sbool *activeSave;
DEFiRet;
- if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
- pFilterSave = FilterSave;
- } else {
- CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- }
+ activeSave = pBatch->active;
+ copyActive(pBatch);
- bModifiedFilter = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(!pBatch->pElem[i].bFilterOK)
+ if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
continue;
- pFilterSave[i] = pBatch->pElem[i].bFilterOK;
if(now == 0) {
now = datetime.GetTime(NULL); /* good time call - the only one done */
}
@@ -1622,17 +1572,17 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
* also faster ;) -- rgerhards, 2008-09-17 */
do {
lastAct = pAction->f_time;
- if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
+ if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
if((now - lastAct) < MarkInterval / 2) {
- pBatch->pElem[i].bFilterOK = 0;
- bModifiedFilter = 1;
- DBGPRINTF("action was recently called, ignoring mark message\n");
+ pBatch->active[i] = 0;
+ DBGPRINTF("batch item %d: action was recently called, ignoring "
+ "mark message\n", i);
break; /* do not update timestamp for non-written mark messages */
}
}
} while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->pElem[i].bFilterOK) {
+ pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
+ if(pBatch->active[i]) {
DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
i, module.GetStateName(pAction->pMod));
}
@@ -1640,17 +1590,8 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
iRet = doSubmitToActionQBatch(pAction, pBatch);
- if(bModifiedFilter) {
- /* in this case, we need to restore previous state */
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- /* note: clang static code analyzer reports a false positive below */
- pBatch->pElem[i].bFilterOK = pFilterSave[i];
- }
- }
-
-finalize_it:
- if(pFilterSave != FilterSave)
- free(pFilterSave);
+ free(pBatch->active);
+ pBatch->active = activeSave;
RETiRet;
}
@@ -1660,8 +1601,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
{
int i;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ if( batchIsValidElem(pBatch, i)) {
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
}
}
@@ -1675,18 +1615,13 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
{
- sbool FilterSave[1024];
- sbool *pFilterSave;
sbool bNeedSubmit;
- sbool bModifiedFilter;
+ sbool *activeSave;
int i;
DEFiRet;
- if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
- pFilterSave = FilterSave;
- } else {
- CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- }
+ activeSave = pBatch->active;
+ copyActive(pBatch);
/* note: for direct mode, we need to adjust the filter property. For non-direct
* this is not necessary, because in that case we enqueue only what actually needs
@@ -1694,37 +1629,25 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
*/
if(pAction->bExecWhenPrevSusp) {
bNeedSubmit = 0;
- bModifiedFilter = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pFilterSave[i] = pBatch->pElem[i].bFilterOK;
if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
+ DBGPRINTF("action enq stage: change active to 0 due to "
"failover case in elem %d\n", i);
- pBatch->pElem[i].bFilterOK = 0;
- bModifiedFilter = 1;
+ pBatch->active[i] = 0;
}
- if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
bNeedSubmit = 1;
}
- DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
/* note: stats were already computed above */
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
- DBGPRINTF("no need to submit batch, all bFilterOK==0 or discarded\n");
- }
- if(bModifiedFilter) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- /* note: clang static code analyzer reports a false positive below */
- pBatch->pElem[i].bFilterOK = pFilterSave[i];
- }
+ DBGPRINTF("no need to submit batch, all invalid\n");
}
} else {
if(GatherStats)
@@ -1732,7 +1655,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
-finalize_it:
+ free(pBatch->active);
+ pBatch->active = activeSave;
RETiRet;
}
@@ -1755,13 +1679,12 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
+ doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg);
}
}
}
@@ -1784,11 +1707,10 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
DBGPRINTF("Called action %p (complex case), logging to %s\n",
pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( batchIsValidElem(pBatch, i)
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
doActionCallAction(pAction, pBatch, i);
}
@@ -1858,7 +1780,6 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
}
-
/* add an Action to the current selector
* The pOMSR is freed, as it is not needed after this function.
* Note: this function pulls global data that specifies action config state.
@@ -1923,7 +1844,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
&& (pAction->ppTpl[i] =
tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) {
snprintf(errMsg, sizeof(errMsg) / sizeof(char),
- " Could not find template '%s' - action disabled\n",
+ " Could not find template '%s' - action disabled",
pTplName);
errno = 0;
errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg);
@@ -1954,17 +1875,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) {
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- } else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
if(bSuspended)
- actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */
+ actionSuspend(pAction);
CHKiRet(actionConstructFinalize(pAction, queueParams));
@@ -2060,13 +1975,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features
+ * (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
@@ -2078,33 +1988,6 @@ finalize_it:
RETiRet;
}
-
-/* Process a rsyslog v6 action config object (the now-primary config method).
- * rgerhards, 2011-07-19
- */
-rsRetVal
-actionProcessCnf(struct cnfobj __attribute__((unused)) *o)
-{
- DEFiRet;
-#if 0 /* we need to check if we actually need this functionality -- later! */
-// This is for STAND-ALONE actions at the conf file TOP level
- struct cnfparamvals *paramvals;
-
- paramvals = nvlstGetParams(o->nvlst, &pblk, NULL);
- if(paramvals == NULL) {
- iRet = RS_RET_ERR;
- goto finalize_it;
- }
- DBGPRINTF("action param blk after actionProcessCnf:\n");
- cnfparamsPrint(&pblk, paramvals);
-
- /* now find module to activate */
-finalize_it:
-#endif
- RETiRet;
-}
-
-
/* TODO: we are not yet a real object, the ClassInit here just looks like it is..
*/
rsRetVal actionClassInit(void)