summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c137
1 files changed, 12 insertions, 125 deletions
diff --git a/action.c b/action.c
index ae40eadf..58ada67c 100644
--- a/action.c
+++ b/action.c
@@ -12,11 +12,11 @@
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
- * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
* - helperSubmitToActionQComplexBatch
* - doActionCallAction
- * handles duplicate message processing, but in essence calls
+ * handles mark message reduction, but in essence calls
* - actionWriteToAction
* - qqueueEnqObj
* (now queue engine processing)
@@ -307,9 +307,6 @@ rsRetVal actionDestruct(action_t *pThis)
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
- if(pThis->f_pMsg != NULL)
- msgDestruct(&pThis->f_pMsg);
-
pthread_mutex_destroy(&pThis->mutAction);
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
@@ -410,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
* mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
*/
if( pThis->iExecEveryNthOccur > 1
- || pThis->f_ReduceRepeated
|| pThis->iSecsExecOnceInterval
) {
DBGPRINTF("info: firehose mode disabled for action because "
- "iExecEveryNthOccur=%d, "
- "ReduceRepeated=%d, "
- "iSecsExecOnceInterval=%d\n",
- pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
- pThis->iSecsExecOnceInterval
- );
+ "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
pThis->submitToActQ = doSubmitToActionQComplexBatch;
} else if(pThis->bWriteAllMarkMsgs == RSFALSE) {
/* nearly full-speed submission mode, default case */
@@ -782,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis)
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
- dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
if(pThis->eState == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
@@ -1418,14 +1409,10 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, msg_t *pMsg)
{
- msg_t *pMsgSave; /* to save current message pointer, necessary to restore
- it in case it needs to be updated (e.g. repeated msgs) */
DEFiRet;
- pMsgSave = NULL; /* indicate message poiner not saved */
-
/* first, we check if the action should actually be called. The action-specific
* $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
* time. So we need to check if we need to drop the (otherwise perfectly executable)
@@ -1452,43 +1439,6 @@ actionWriteToAction(action_t *pAction)
}
}
- /* then check if this is a regular message or the repeation of
- * a previous message. If so, we need to change the message text
- * to "last message repeated n times" and then go ahead and write
- * it. Please note that we can not modify the message object, because
- * that would update it in other selectors as well. As such, we first
- * need to create a local copy of the message, which we than can update.
- * rgerhards, 2007-07-10
- */
- if(pAction->f_prevcount > 1) {
- msg_t *pMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-
- if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
- /* it failed - nothing we can do against it... */
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- ABORT_FINALIZE(RS_RET_ERR);
- }
-
- if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
- pAction->f_prevcount);
- } else {
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
- pAction->f_prevcount, getMSG(pAction->f_pMsg));
- }
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are also invalidated.
- */
- datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
- memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
- pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
- pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
- }
-
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
@@ -1509,31 +1459,14 @@ actionWriteToAction(action_t *pAction)
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
* rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
+ pAction->f_time = pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
-
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ iRet = doSubmitToActionQ(pAction, pMsg);
finalize_it:
- if(pMsgSave != NULL) {
- /* we had saved the original message pointer. That was
- * done because we needed to create a temporary one
- * (most often for "message repeated n time" handling). If so,
- * we need to restore the original one now, so that procesing
- * can continue as normal. We also need to discard the temporary
- * one, as we do not like memory leaks ;) Please note that the original
- * message object will be discarded by our callers, so this is nothing
- * of our business. rgerhards, 2007-07-10
- */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = pMsgSave; /* restore it */
- }
-
RETiRet;
}
@@ -1556,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
ABORT_FINALIZE(RS_RET_OK);
}
- /* suppress duplicate messages */
- if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
- (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
- !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
- !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
- pAction->f_prevcount++;
- DBGPRINTF("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
- repeatinterval[pAction->f_repeatcount]);
- /* use current message, so we have the new timestamp (means we need to discard previous one) */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* If domark would have logged this by now, flush it now (so we don't hold
- * isolated messages), but back off so we'll flush less often in the future.
- */
- if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
- BACKOFF(pAction);
- }
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
- if(pAction->f_pMsg != NULL) {
- if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
- /* we do not care about iRet above - I think it's right but if we have
- * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
- */
- msgDestruct(&pAction->f_pMsg);
- }
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* call the output driver */
- iRet = actionWriteToAction(pAction);
- }
+ /* call the output driver */
+ iRet = actionWriteToAction(pAction, pMsg);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1998,13 +1896,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) {
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- } else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
pAction->requiresDateCall = actionRequiresDateCall(pAction);
@@ -2105,13 +1997,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features
+ * (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}