summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c230
1 files changed, 178 insertions, 52 deletions
diff --git a/action.c b/action.c
index 5f9e35db..08755c13 100644
--- a/action.c
+++ b/action.c
@@ -54,8 +54,11 @@ DEFobjCurrIf(datetime)
DEFobjCurrIf(module)
DEFobjCurrIf(errmsg)
+static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */
+static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */
static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
+static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
@@ -89,6 +92,44 @@ static int iActionNbr = 0;
/* ------------------------------ methods ------------------------------ */
+/* This function returns the "current" time for this action. Current time
+ * is not necessarily real-time. In order to enhance performance, current
+ * system time is obtained the first time an action needs to know the time
+ * and then kept cached inside the action structure. Later requests will
+ * always return that very same time. Wile not totally accurate, it is far
+ * accurate in most cases and considered "acurate enough" for all cases.
+ * When changing the threading model, please keep in mind that this
+ * logic needs to be changed should we once allow more than one parallel
+ * call into the same action (object). As this is currently not supported,
+ * we simply cache the time inside the action object itself, after it
+ * is under mutex protection.
+ * Side-note: the value -1 is used as tActNow, because it also is the
+ * error return value of time(). So we would do a retry with the next
+ * invocation if time() failed. Then, of course, we would probably already
+ * be in trouble, but for the sake of performance we accept this very,
+ * very slight risk.
+ * This logic has been added as part of an overall performance improvment
+ * effort inspired by David Lang. -- rgerhards, 2008-09-16
+ * Note: this function does not use the usual iRet call conventions
+ * because that would provide little to no benefit but complicate things
+ * a lot. So we simply return the system time.
+ */
+static inline time_t
+getActNow(action_t *pThis)
+{
+ assert(pThis != NULL);
+ if(pThis->tActNow == -1) {
+ pThis->tActNow = time(NULL); /* good time call - the only one done */
+ if(pThis->tLastExec > pThis->tActNow) {
+ /* if we are traveling back in time, reset tLastExec */
+ pThis->tLastExec = (time_t) 0;
+ }
+ }
+
+ return pThis->tActNow;
+}
+
+
/* resets action queue parameters to their default values. This happens
* after each action has been created in order to prevent any wild defaults
* to be used. It is somewhat against the original spirit of the config file
@@ -139,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis)
ASSERT(pThis != NULL);
if(pThis->pQueue != NULL) {
- queueDestruct(&pThis->pQueue);
+ qqueueDestruct(&pThis->pQueue);
}
if(pThis->pMod != NULL)
@@ -174,6 +215,7 @@ rsRetVal actionConstruct(action_t **ppThis)
pThis->iResumeInterval = glbliActionResumeInterval;
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
+ pThis->tLastOccur = time(NULL); /* done once per action on startup only */
pthread_mutex_init(&pThis->mutActExec, NULL);
SYNC_OBJ_TOOL_INIT(pThis);
@@ -213,7 +255,7 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -226,24 +268,24 @@ actionConstructFinalize(action_t *pThis)
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- queueSetpUsr(pThis->pQueue, pThis);
- setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
- setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
- setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
- setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
- setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
- setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
- setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
- setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
- setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
- setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
- setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
- setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
- setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
- setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
+ qqueueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -252,7 +294,7 @@ actionConstructFinalize(action_t *pThis)
bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
- CHKiRet(queueStart(pThis->pQueue));
+ CHKiRet(qqueueStart(pThis->pQueue));
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
/* and now reset the queue params (see comment in its function header!) */
@@ -287,30 +329,37 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal)
/* suspend an action -- rgerhards, 2007-08-02
*/
-rsRetVal actionSuspend(action_t *pThis)
+static rsRetVal actionSuspend(action_t *pThis, time_t tNow)
{
DEFiRet;
ASSERT(pThis != NULL);
pThis->bSuspended = 1;
- pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval;
+ pThis->ttResumeRtry = tNow + pThis->iResumeInterval;
pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
RETiRet;
}
+
/* try to resume an action -- rgerhards, 2007-08-02
* returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the
* action is still suspended.
*/
-rsRetVal actionTryResume(action_t *pThis)
+static rsRetVal actionTryResume(action_t *pThis)
{
DEFiRet;
time_t ttNow;
ASSERT(pThis != NULL);
- ttNow = time(NULL); /* do the system call just once */
+ /* for resume handling, we must always obtain a fresh timestamp. We used
+ * to use the action timestamp, but in this case we will never reach a
+ * point where a resumption is actually tried, because the action timestamp
+ * is always in the past. So we can not avoid doing a fresh time() call
+ * here. -- rgerhards, 2009-03-18
+ */
+ time(&ttNow); /* cache "now" */
/* first check if it is time for a re-try */
if(ttNow > pThis->ttResumeRtry) {
@@ -429,7 +478,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
if(iRet == RS_RET_SUSPENDED) {
dbgprintf("Action requested to be suspended, done that.\n");
- actionSuspend(pAction);
+ actionSuspend(pAction, getActNow(pAction));
}
}
@@ -456,6 +505,39 @@ finalize_it:
}
#pragma GCC diagnostic warning "-Wempty-body"
+
+/* call the HUP handler for a given action, if such a handler is defined. The
+ * action mutex is locked, because the HUP handler most probably needs to modify
+ * some internal state information.
+ * rgerhards, 2008-10-22
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+rsRetVal
+actionCallHUPHdlr(action_t *pAction)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ASSERT(pAction != NULL);
+ DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
+
+ if(pAction->pMod->doHUP == NULL) {
+ FINALIZE; /* no HUP handler, so we are done ;) */
+ }
+
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ CHKiRet(pAction->pMod->doHUP(pAction->pModData));
+ pthread_cleanup_pop(1); /* unlock mutex */
+
+finalize_it:
+ RETiRet;
+}
+#pragma GCC diagnostic warning "-Wempty-body"
+
+
/* set the action message queue mode
* TODO: probably move this into queue object, merge with MainMsgQueue!
* rgerhards, 2008-01-28
@@ -506,11 +588,37 @@ actionWriteToAction(action_t *pAction)
{
msg_t *pMsgSave; /* to save current message pointer, necessary to restore
it in case it needs to be updated (e.g. repeated msgs) */
- time_t now;
DEFiRet;
pMsgSave = NULL; /* indicate message poiner not saved */
- /* first check if this is a regular message or the repeation of
+
+ /* first, we check if the action should actually be called. The action-specific
+ * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth
+ * time. So we need to check if we need to drop the (otherwise perfectly executable)
+ * action for this reason. Note that in case we need to drop it, we return RS_RET_OK
+ * as the action was properly "passed to execution" from the upper layer's point
+ * of view. -- rgerhards, 2008-08-07.
+ */
+ if(pAction->iExecEveryNthOccur > 1) {
+ /* we need to care about multiple occurences */
+ if( pAction->iExecEveryNthOccurTO > 0
+ && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) {
+ dbgprintf("n-th occurence handling timed out (%d sec), restarting from 0\n",
+ (int) (getActNow(pAction) - pAction->tLastOccur));
+ pAction->iNbrNoExec = 0;
+ pAction->tLastOccur = getActNow(pAction);
+ }
+ if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) {
+ ++pAction->iNbrNoExec;
+ dbgprintf("action %p passed %d times to execution - less than neded - discarding\n",
+ pAction, pAction->iNbrNoExec);
+ FINALIZE;
+ } else {
+ pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */
+ }
+ }
+
+ /* then check if this is a regular message or the repeation of
* a previous message. If so, we need to change the message text
* to "last message repeated n times" and then go ahead and write
* it. Please note that we can not modify the message object, because
@@ -520,9 +628,7 @@ actionWriteToAction(action_t *pAction)
*/
if(pAction->f_prevcount > 1) {
msg_t *pMsg;
- uchar szRepMsg[64];
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
- pAction->f_prevcount);
+ uchar szRepMsg[1024];
if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
/* it failed - nothing we can do against it... */
@@ -530,12 +636,20 @@ actionWriteToAction(action_t *pAction)
ABORT_FINALIZE(RS_RET_ERR);
}
+ if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
+ snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
+ pAction->f_prevcount);
+ } else {
+ snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]",
+ pAction->f_prevcount, getMSG(pAction->f_pMsg));
+ }
+
/* We now need to update the other message properties.
* ... RAWMSG is a problem ... Please note that digital
* signatures inside the message are also invalidated.
*/
- datetime.getCurrTime(&(pMsg->tRcvdAt));
- datetime.getCurrTime(&(pMsg->tTIMESTAMP));
+ datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
+ memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
MsgSetMSG(pMsg, (char*)szRepMsg);
MsgSetRawMsg(pMsg, (char*)szRepMsg);
@@ -545,29 +659,29 @@ actionWriteToAction(action_t *pAction)
dbgprintf("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
- time(&now); /* we need this for message repeation processing AND $ActionExecOnlyOnceEveryInterval */
- if(pAction->tLastExec > now) {
- /* if we are traveling back in time, reset tLastExec */
- pAction->tLastExec = (time_t) 0;
- }
/* now check if we need to drop the message because otherwise the action would be too
* frequently called. -- rgerhards, 2008-04-08
+ * Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from
+ * a purely logical point of view. However, if safes us to check the system time in
+ * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16
*/
- if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > now) {
+ if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval > 0 &&
+ pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) {
/* in this case we need to discard the message - its not yet time to exec the action */
dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
- (int) pAction->iSecsExecOnceInterval, (int) now,
+ (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction),
(int) (pAction->iSecsExecOnceInterval + pAction->tLastExec));
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
FINALIZE;
}
- pAction->tLastExec = now; /* we need this OnceInterval */
- pAction->f_time = now; /* we need this for message repeation processing */
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -593,6 +707,10 @@ finalize_it:
/* call the configured action. Does all necessary housekeeping.
* rgerhards, 2007-08-01
+ * FYI: currently, this function is only called from the queue
+ * consumer. So we (conceptually) run detached from the input
+ * threads (which also means we may run much later than when the
+ * message was generated).
*/
#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
@@ -627,13 +745,14 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
ABORT_FINALIZE(RS_RET_OK);
}
- /* don't output marks to recently written files */
- if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) {
+ pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
+
+ /* don't output marks to recently written outputs */
+ if((pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) {
ABORT_FINALIZE(RS_RET_OK);
}
- /* suppress duplicate messages
- */
+ /* suppress duplicate messages */
if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL &&
(pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
!strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
@@ -642,7 +761,7 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
!strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) {
pAction->f_prevcount++;
dbgprintf("msg repeated %d times, %ld sec of %d.\n",
- pAction->f_prevcount, (long) time(NULL) - pAction->f_time,
+ pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
repeatinterval[pAction->f_repeatcount]);
/* use current message, so we have the new timestamp (means we need to discard previous one) */
msgDestruct(&pAction->f_pMsg);
@@ -650,12 +769,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
/* If domark would have logged this by now, flush it now (so we don't hold
* isolated messages), but back off so we'll flush less often in the future.
*/
- if(time(NULL) > REPEATTIME(pAction)) {
+ if(getActNow(pAction) > REPEATTIME(pAction)) {
iRet = actionWriteToAction(pAction);
BACKOFF(pAction);
}
- } else {
- /* new message, save it */
+ } else {/* new message, save it */
/* first check if we have a previous message stored
* if so, emit and then discard it first
*/
@@ -710,6 +828,9 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL));
finalize_it:
RETiRet;
@@ -741,6 +862,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
pAction->pModData = pModData;
pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp;
pAction->iSecsExecOnceInterval = iActExecOnceInterval;
+ pAction->iExecEveryNthOccur = iActExecEveryNthOccur;
+ pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO;
+ pAction->bRepMsgHasMsg = bActionRepMsgHasMsg;
+ iActExecEveryNthOccur = 0; /* auto-reset */
+ iActExecEveryNthOccurTO = 0; /* auto-reset */
/* check if we can obtain the template pointers - TODO: move to separate function? */
pAction->iNumTpls = OMSRgetEntryCount(pOMSR);
@@ -794,7 +920,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
pAction->bEnabled = 1; /* action is enabled */
if(bSuspended)
- actionSuspend(pAction);
+ actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */
CHKiRet(actionConstructFinalize(pAction));