diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 230 |
1 files changed, 178 insertions, 52 deletions
@@ -54,8 +54,11 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ +static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ +static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */ /* main message queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ @@ -89,6 +92,44 @@ static int iActionNbr = 0; /* ------------------------------ methods ------------------------------ */ +/* This function returns the "current" time for this action. Current time + * is not necessarily real-time. In order to enhance performance, current + * system time is obtained the first time an action needs to know the time + * and then kept cached inside the action structure. Later requests will + * always return that very same time. Wile not totally accurate, it is far + * accurate in most cases and considered "acurate enough" for all cases. + * When changing the threading model, please keep in mind that this + * logic needs to be changed should we once allow more than one parallel + * call into the same action (object). As this is currently not supported, + * we simply cache the time inside the action object itself, after it + * is under mutex protection. + * Side-note: the value -1 is used as tActNow, because it also is the + * error return value of time(). So we would do a retry with the next + * invocation if time() failed. Then, of course, we would probably already + * be in trouble, but for the sake of performance we accept this very, + * very slight risk. + * This logic has been added as part of an overall performance improvment + * effort inspired by David Lang. -- rgerhards, 2008-09-16 + * Note: this function does not use the usual iRet call conventions + * because that would provide little to no benefit but complicate things + * a lot. So we simply return the system time. + */ +static inline time_t +getActNow(action_t *pThis) +{ + assert(pThis != NULL); + if(pThis->tActNow == -1) { + pThis->tActNow = time(NULL); /* good time call - the only one done */ + if(pThis->tLastExec > pThis->tActNow) { + /* if we are traveling back in time, reset tLastExec */ + pThis->tLastExec = (time_t) 0; + } + } + + return pThis->tActNow; +} + + /* resets action queue parameters to their default values. This happens * after each action has been created in order to prevent any wild defaults * to be used. It is somewhat against the original spirit of the config file @@ -139,7 +180,7 @@ rsRetVal actionDestruct(action_t *pThis) ASSERT(pThis != NULL); if(pThis->pQueue != NULL) { - queueDestruct(&pThis->pQueue); + qqueueDestruct(&pThis->pQueue); } if(pThis->pMod != NULL) @@ -174,6 +215,7 @@ rsRetVal actionConstruct(action_t **ppThis) pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; + pThis->tLastOccur = time(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); SYNC_OBJ_TOOL_INIT(pThis); @@ -213,7 +255,7 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -226,24 +268,24 @@ actionConstructFinalize(action_t *pThis) errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ } - queueSetpUsr(pThis->pQueue, pThis); - setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); - setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); - setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); - setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); - setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); - setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); - setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); - setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); - setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); - setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); - setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); - setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); - setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); - setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); - setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); - setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); + qqueueSetpUsr(pThis->pQueue, pThis); + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr @@ -252,7 +294,7 @@ actionConstructFinalize(action_t *pThis) bActionQSaveOnShutdown, iActionQueMaxDiskSpace); - CHKiRet(queueStart(pThis->pQueue)); + CHKiRet(qqueueStart(pThis->pQueue)); dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ @@ -287,30 +329,37 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal) /* suspend an action -- rgerhards, 2007-08-02 */ -rsRetVal actionSuspend(action_t *pThis) +static rsRetVal actionSuspend(action_t *pThis, time_t tNow) { DEFiRet; ASSERT(pThis != NULL); pThis->bSuspended = 1; - pThis->ttResumeRtry = time(NULL) + pThis->iResumeInterval; + pThis->ttResumeRtry = tNow + pThis->iResumeInterval; pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ RETiRet; } + /* try to resume an action -- rgerhards, 2007-08-02 * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the * action is still suspended. */ -rsRetVal actionTryResume(action_t *pThis) +static rsRetVal actionTryResume(action_t *pThis) { DEFiRet; time_t ttNow; ASSERT(pThis != NULL); - ttNow = time(NULL); /* do the system call just once */ + /* for resume handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + time(&ttNow); /* cache "now" */ /* first check if it is time for a re-try */ if(ttNow > pThis->ttResumeRtry) { @@ -429,7 +478,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); if(iRet == RS_RET_SUSPENDED) { dbgprintf("Action requested to be suspended, done that.\n"); - actionSuspend(pAction); + actionSuspend(pAction, getActNow(pAction)); } } @@ -456,6 +505,39 @@ finalize_it: } #pragma GCC diagnostic warning "-Wempty-body" + +/* call the HUP handler for a given action, if such a handler is defined. The + * action mutex is locked, because the HUP handler most probably needs to modify + * some internal state information. + * rgerhards, 2008-10-22 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +rsRetVal +actionCallHUPHdlr(action_t *pAction) +{ + DEFiRet; + int iCancelStateSave; + + ASSERT(pAction != NULL); + DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); + + if(pAction->pMod->doHUP == NULL) { + FINALIZE; /* no HUP handler, so we are done ;) */ + } + + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); + pthread_cleanup_pop(1); /* unlock mutex */ + +finalize_it: + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + /* set the action message queue mode * TODO: probably move this into queue object, merge with MainMsgQueue! * rgerhards, 2008-01-28 @@ -506,11 +588,37 @@ actionWriteToAction(action_t *pAction) { msg_t *pMsgSave; /* to save current message pointer, necessary to restore it in case it needs to be updated (e.g. repeated msgs) */ - time_t now; DEFiRet; pMsgSave = NULL; /* indicate message poiner not saved */ - /* first check if this is a regular message or the repeation of + + /* first, we check if the action should actually be called. The action-specific + * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth + * time. So we need to check if we need to drop the (otherwise perfectly executable) + * action for this reason. Note that in case we need to drop it, we return RS_RET_OK + * as the action was properly "passed to execution" from the upper layer's point + * of view. -- rgerhards, 2008-08-07. + */ + if(pAction->iExecEveryNthOccur > 1) { + /* we need to care about multiple occurences */ + if( pAction->iExecEveryNthOccurTO > 0 + && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) { + dbgprintf("n-th occurence handling timed out (%d sec), restarting from 0\n", + (int) (getActNow(pAction) - pAction->tLastOccur)); + pAction->iNbrNoExec = 0; + pAction->tLastOccur = getActNow(pAction); + } + if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) { + ++pAction->iNbrNoExec; + dbgprintf("action %p passed %d times to execution - less than neded - discarding\n", + pAction, pAction->iNbrNoExec); + FINALIZE; + } else { + pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */ + } + } + + /* then check if this is a regular message or the repeation of * a previous message. If so, we need to change the message text * to "last message repeated n times" and then go ahead and write * it. Please note that we can not modify the message object, because @@ -520,9 +628,7 @@ actionWriteToAction(action_t *pAction) */ if(pAction->f_prevcount > 1) { msg_t *pMsg; - uchar szRepMsg[64]; - snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", - pAction->f_prevcount); + uchar szRepMsg[1024]; if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) { /* it failed - nothing we can do against it... */ @@ -530,12 +636,20 @@ actionWriteToAction(action_t *pAction) ABORT_FINALIZE(RS_RET_ERR); } + if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */ + snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times", + pAction->f_prevcount); + } else { + snprintf((char*)szRepMsg, sizeof(szRepMsg), "message repeated %d times: [%.800s]", + pAction->f_prevcount, getMSG(pAction->f_pMsg)); + } + /* We now need to update the other message properties. * ... RAWMSG is a problem ... Please note that digital * signatures inside the message are also invalidated. */ - datetime.getCurrTime(&(pMsg->tRcvdAt)); - datetime.getCurrTime(&(pMsg->tTIMESTAMP)); + datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime)); + memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime)); MsgSetMSG(pMsg, (char*)szRepMsg); MsgSetRawMsg(pMsg, (char*)szRepMsg); @@ -545,29 +659,29 @@ actionWriteToAction(action_t *pAction) dbgprintf("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - time(&now); /* we need this for message repeation processing AND $ActionExecOnlyOnceEveryInterval */ - if(pAction->tLastExec > now) { - /* if we are traveling back in time, reset tLastExec */ - pAction->tLastExec = (time_t) 0; - } /* now check if we need to drop the message because otherwise the action would be too * frequently called. -- rgerhards, 2008-04-08 + * Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from + * a purely logical point of view. However, if safes us to check the system time in + * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 */ - if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > now) { + if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval > 0 && + pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { /* in this case we need to discard the message - its not yet time to exec the action */ dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", - (int) pAction->iSecsExecOnceInterval, (int) now, + (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ FINALIZE; } - pAction->tLastExec = now; /* we need this OnceInterval */ - pAction->f_time = now; /* we need this for message repeation processing */ + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ - iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); + iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg)); if(iRet == RS_RET_OK) pAction->f_prevcount = 0; /* message processed, so we start a new cycle */ @@ -593,6 +707,10 @@ finalize_it: /* call the configured action. Does all necessary housekeeping. * rgerhards, 2007-08-01 + * FYI: currently, this function is only called from the queue + * consumer. So we (conceptually) run detached from the input + * threads (which also means we may run much later than when the + * message was generated). */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal @@ -627,13 +745,14 @@ actionCallAction(action_t *pAction, msg_t *pMsg) ABORT_FINALIZE(RS_RET_OK); } - /* don't output marks to recently written files */ - if ((pMsg->msgFlags & MARK) && (time(NULL) - pAction->f_time) < MarkInterval / 2) { + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + + /* don't output marks to recently written outputs */ + if((pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { ABORT_FINALIZE(RS_RET_OK); } - /* suppress duplicate messages - */ + /* suppress duplicate messages */ if ((pAction->f_ReduceRepeated == 1) && pAction->f_pMsg != NULL && (pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) && !strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) && @@ -642,7 +761,7 @@ actionCallAction(action_t *pAction, msg_t *pMsg) !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) { pAction->f_prevcount++; dbgprintf("msg repeated %d times, %ld sec of %d.\n", - pAction->f_prevcount, (long) time(NULL) - pAction->f_time, + pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time, repeatinterval[pAction->f_repeatcount]); /* use current message, so we have the new timestamp (means we need to discard previous one) */ msgDestruct(&pAction->f_pMsg); @@ -650,12 +769,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* If domark would have logged this by now, flush it now (so we don't hold * isolated messages), but back off so we'll flush less often in the future. */ - if(time(NULL) > REPEATTIME(pAction)) { + if(getActNow(pAction) > REPEATTIME(pAction)) { iRet = actionWriteToAction(pAction); BACKOFF(pAction); } - } else { - /* new message, save it */ + } else {/* new message, save it */ /* first check if we have a previous message stored * if so, emit and then discard it first */ @@ -710,6 +828,9 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); finalize_it: RETiRet; @@ -741,6 +862,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->pModData = pModData; pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp; pAction->iSecsExecOnceInterval = iActExecOnceInterval; + pAction->iExecEveryNthOccur = iActExecEveryNthOccur; + pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO; + pAction->bRepMsgHasMsg = bActionRepMsgHasMsg; + iActExecEveryNthOccur = 0; /* auto-reset */ + iActExecEveryNthOccurTO = 0; /* auto-reset */ /* check if we can obtain the template pointers - TODO: move to separate function? */ pAction->iNumTpls = OMSRgetEntryCount(pOMSR); @@ -794,7 +920,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->bEnabled = 1; /* action is enabled */ if(bSuspended) - actionSuspend(pAction); + actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ CHKiRet(actionConstructFinalize(pAction)); |