summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c174
1 files changed, 44 insertions, 130 deletions
diff --git a/action.c b/action.c
index ca260f92..58ada67c 100644
--- a/action.c
+++ b/action.c
@@ -12,11 +12,11 @@
* necessary to triple-check that everything works well in *all* modes.
* The different modes (and calling sequence) are:
*
- * if set iExecEveryNthOccur > 1 || f_ReduceRepeated || iSecsExecOnceInterval
+ * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
* - helperSubmitToActionQComplexBatch
* - doActionCallAction
- * handles duplicate message processing, but in essence calls
+ * handles mark message reduction, but in essence calls
* - actionWriteToAction
* - qqueueEnqObj
* (now queue engine processing)
@@ -307,9 +307,6 @@ rsRetVal actionDestruct(action_t *pThis)
if(pThis->pMod != NULL)
pThis->pMod->freeInstance(pThis->pModData);
- if(pThis->f_pMsg != NULL)
- msgDestruct(&pThis->f_pMsg);
-
pthread_mutex_destroy(&pThis->mutAction);
pthread_mutex_destroy(&pThis->mutActExec);
d_free(pThis->pszName);
@@ -410,16 +407,11 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
* mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08
*/
if( pThis->iExecEveryNthOccur > 1
- || pThis->f_ReduceRepeated
|| pThis->iSecsExecOnceInterval
) {
DBGPRINTF("info: firehose mode disabled for action because "
- "iExecEveryNthOccur=%d, "
- "ReduceRepeated=%d, "
- "iSecsExecOnceInterval=%d\n",
- pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated,
- pThis->iSecsExecOnceInterval
- );
+ "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n",
+ pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval);
pThis->submitToActQ = doSubmitToActionQComplexBatch;
} else if(pThis->bWriteAllMarkMsgs == RSFALSE) {
/* nearly full-speed submission mode, default case */
@@ -782,7 +774,6 @@ rsRetVal actionDbgPrint(action_t *pThis)
pThis->pMod->dbgPrintInstInfo(pThis->pModData);
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
- dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
if(pThis->eState == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
@@ -809,7 +800,8 @@ rsRetVal actionDbgPrint(action_t *pThis)
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
-static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
+static rsRetVal
+prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
{
int i;
msg_t *pMsg;
@@ -825,17 +817,17 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem)
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i]));
+ &pElem->staticLenStrings[i], ttNow));
pElem->staticActParams[i] = pElem->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i])));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
pElem->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
- CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json));
+ CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
pElem->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
@@ -1226,14 +1218,19 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR
{
int i;
batch_obj_t *pElem;
+ struct syslogTime ttNow;
DEFiRet;
+ if(pAction->requiresDateCall) {
+ datetime.getCurrTime(&ttNow, NULL);
+ }
+
pBatch->iDoneUpTo = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
if(batchIsValidElem(pBatch, i)) {
pElem->state = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) {
+ if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
/* make sure we have our copy of "active" array */
if(!*bMustRestoreActivePtr) {
*activeSave = pBatch->active;
@@ -1412,14 +1409,10 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction)
+actionWriteToAction(action_t *pAction, msg_t *pMsg)
{
- msg_t *pMsgSave; /* to save current message pointer, necessary to restore
- it in case it needs to be updated (e.g. repeated msgs) */
DEFiRet;
- pMsgSave = NULL; /* indicate message poiner not saved */
-
/* first, we check if the action should actually be called. The action-specific
* $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
* time. So we need to check if we need to drop the (otherwise perfectly executable)
@@ -1446,43 +1439,6 @@ actionWriteToAction(action_t *pAction)
}
}
- /* then check if this is a regular message or the repeation of
- * a previous message. If so, we need to change the message text
- * to "last message repeated n times" and then go ahead and write
- * it. Please note that we can not modify the message object, because
- * that would update it in other selectors as well. As such, we first
- * need to create a local copy of the message, which we than can update.
- * rgerhards, 2007-07-10
- */
- if(pAction->f_prevcount > 1) {
- msg_t *pMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-
- if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
- /* it failed - nothing we can do against it... */
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- ABORT_FINALIZE(RS_RET_ERR);
- }
-
- if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
- pAction->f_prevcount);
- } else {
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
- pAction->f_prevcount, getMSG(pAction->f_pMsg));
- }
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are also invalidated.
- */
- datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
- memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
- pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
- pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
- }
-
DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod));
/* now check if we need to drop the message because otherwise the action would be too
@@ -1503,31 +1459,14 @@ actionWriteToAction(action_t *pAction)
/* we use reception time, not dequeue time - this is considered more appropriate and also faster ;)
* rgerhards, 2008-09-17 */
pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- pAction->f_time = pAction->f_pMsg->ttGenTime;
+ pAction->f_time = pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pAction->f_pMsg);
-
- if(iRet == RS_RET_OK)
- pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
+ iRet = doSubmitToActionQ(pAction, pMsg);
finalize_it:
- if(pMsgSave != NULL) {
- /* we had saved the original message pointer. That was
- * done because we needed to create a temporary one
- * (most often for "message repeated n time" handling). If so,
- * we need to restore the original one now, so that procesing
- * can continue as normal. We also need to discard the temporary
- * one, as we do not like memory leaks ;) Please note that the original
- * message object will be discarded by our callers, so this is nothing
- * of our business. rgerhards, 2007-07-10
- */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = pMsgSave; /* restore it */
- }
-
RETiRet;
}
@@ -1550,43 +1489,8 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
ABORT_FINALIZE(RS_RET_OK);
}
- /* suppress duplicate messages */
- if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
- (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
- !ustrcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
- !strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
- !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
- pAction->f_prevcount++;
- DBGPRINTF("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
- repeatinterval[pAction->f_repeatcount]);
- /* use current message, so we have the new timestamp (means we need to discard previous one) */
- msgDestruct(&pAction->f_pMsg);
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* If domark would have logged this by now, flush it now (so we don't hold
- * isolated messages), but back off so we'll flush less often in the future.
- */
- if(getActNow(pAction) > REPEATTIME(pAction)) {
- iRet = actionWriteToAction(pAction);
- BACKOFF(pAction);
- }
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
- if(pAction->f_pMsg != NULL) {
- if(pAction->f_prevcount > 0)
- actionWriteToAction(pAction);
- /* we do not care about iRet above - I think it's right but if we have
- * some troubles, you know where to look at ;) -- rgerhards, 2007-08-01
- */
- msgDestruct(&pAction->f_pMsg);
- }
- pAction->f_pMsg = MsgAddRef(pMsg);
- /* call the output driver */
- iRet = actionWriteToAction(pAction);
- }
+ /* call the output driver */
+ iRet = actionWriteToAction(pAction, pMsg);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1875,6 +1779,26 @@ actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals)
return RS_RET_OK;
}
+/* check if the templates used in this action require a date call
+ * ($NOW family of properties).
+ */
+static inline int
+actionRequiresDateCall(action_t *pAction)
+{
+ int i;
+ int r = 0;
+
+ if(pAction->eParamPassing == ACT_MSG_PASSING)
+ /* in msg passing mode, we have NO templates! */
+ goto done;
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ if(tplRequiresDateCall(pAction->ppTpl[i])) {
+ r = 1;
+ break;
+ }
+ }
+done: return r;
+}
/* add an Action to the current selector
@@ -1972,14 +1896,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) {
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- } else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
+ pAction->requiresDateCall = actionRequiresDateCall(pAction);
if(bSuspended)
actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */
@@ -2078,13 +1997,8 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams,
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
- /* now check if the module is compatible with select features */
- if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK)
- pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs;
- else {
- DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n");
- pAction->f_ReduceRepeated = 0;
- }
+ /* check if the module is compatible with select features
+ * (currently no such features exist) */
pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}