diff options
author | Andre Lorbach <alorbach@adiscon.com> | 2013-06-18 13:51:00 +0200 |
---|---|---|
committer | Andre Lorbach <alorbach@adiscon.com> | 2013-06-18 13:51:00 +0200 |
commit | 95d69e9b1f846458d95d4ce74da2ae151a49fb39 (patch) | |
tree | 7d167a47e839614384fbf4894663cb5c6f1b1db3 /action.c | |
parent | c4fc57f4c551602e36091040b77bc5e22e299122 (diff) | |
parent | 520f0325e7c10b6a47f721ac353a6036d554cbc3 (diff) | |
download | rsyslog-95d69e9b1f846458d95d4ce74da2ae151a49fb39.tar.gz rsyslog-95d69e9b1f846458d95d4ce74da2ae151a49fb39.tar.bz2 rsyslog-95d69e9b1f846458d95d4ce74da2ae151a49fb39.zip |
Merge remote branch 'upstream/master'
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 2040 |
1 files changed, 2040 insertions, 0 deletions
diff --git a/action.c b/action.c new file mode 100644 index 00000000..85766941 --- /dev/null +++ b/action.c @@ -0,0 +1,2040 @@ +/* action.c + * + * Implementation of the action object. + * + * File begun on 2007-08-06 by RGerhards (extracted from syslogd.c) + * + * Some notes on processing (this hopefully makes it easier to find + * the right code in question): For performance reasons, this module + * uses different methods of message submission based on the user-selected + * configuration. This code is similar, but can not be abstracted because + * of the performance-affecting differences in it. As such, it is often + * necessary to triple-check that everything works well in *all* modes. + * The different modes (and calling sequence) are: + * + * if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval + * - doSubmitToActionQComplexBatch + * - helperSubmitToActionQComplexBatch + * - doActionCallAction + * handles mark message reduction, but in essence calls + * - actionWriteToAction + * - qqueueEnqObj + * (now queue engine processing) + * if(pThis->bWriteAllMarkMsgs == RSFALSE) - this is the DEFAULT + * - doSubmitToActionQNotAllMarkBatch + * - doSubmitToActionQBatch (and from here like in the else case below!) + * else + * - doSubmitToActionQBatch + * - doSubmitToActionQ + * - qqueueEnqObj + * (now queue engine processing) + * + * Note that bWriteAllMakrMsgs on or off creates almost the same processing. + * The difference ist that if WriteAllMarkMsgs is not set, we need to + * preprocess the batch and drop mark messages which are not yet due for + * writing. + * + * After dequeue, processing is as follows: + * - processBatchMain + * - processAction + * - submitBatch + * - tryDoAction + * - ... + * + * MORE ON PROCESSING, QUEUES and FILTERING + * All filtering needs to be done BEFORE messages are enqueued to an + * action. In previous code, part of the filtering was done at the + * "remote end" of the action queue, which lead to problems in + * non-direct mode (because then things run asynchronously). In order + * to solve this problem once and for all, I have changed the code so + * that all filtering is done before enq, and processing on the + * dequeue side of action processing now always executes whatever is + * enqueued. This is the only way to handle things consistently and + * (as much as possible) in a queue-type agnostic way. However, it is + * a rather radical change, which I unfortunately needed to make from + * stable version 5.8.1 to 5.8.2. If new problems pop up, you now know + * what may be their cause. In any case, the way it is done now is the + * only correct one. + * A problem is that, under fortunate conditions, we use the current + * batch for the output system as well. This is very good from a performance + * point of view, but makes the distinction between enq and deq side of + * the queue a bit hard. The current idea is that the filter condition + * alone is checked at the deq side of the queue (seems to be unavoidable + * to do it that way), but all other complex conditons (like failover + * handling) go into the computation of the filter condition. For + * non-direct queues, we still enqueue only what is acutally necessary. + * Note that in this case the rest of the code must ensure that the filter + * is set to "true". While this is not perfect and not as simple as + * we would like to see it, it looks like the best way to tackle that + * beast. + * rgerhards, 2011-06-15 + * + * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <assert.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <strings.h> +#include <time.h> +#include <errno.h> +#include <json/json.h> + +#include "dirty.h" +#include "template.h" +#include "action.h" +#include "modules.h" +#include "cfsysline.h" +#include "srUtils.h" +#include "errmsg.h" +#include "batch.h" +#include "wti.h" +#include "rsconf.h" +#include "datetime.h" +#include "unicode-helper.h" +#include "atomic.h" +#include "ruleset.h" +#include "statsobj.h" + +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ + +/* forward definitions */ +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch); +static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch); + +/* object static data (once for all instances) */ +/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ +DEFobjCurrIf(obj) +DEFobjCurrIf(datetime) +DEFobjCurrIf(module) +DEFobjCurrIf(errmsg) +DEFobjCurrIf(statsobj) +DEFobjCurrIf(ruleset) + + +typedef struct configSettings_s { + int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */ + int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */ + int iActExecOnceInterval; /* execute action once every nn seconds */ + int iActExecEveryNthOccur; /* execute action every n-th occurence (0,1=always) */ + time_t iActExecEveryNthOccurTO; /* timeout for n-occurence setting (in seconds, 0=never) */ + int glbliActionResumeInterval; + int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */ + int bActionRepMsgHasMsg; /* last messsage repeated... has msg fragment in it */ + uchar *pszActionName; /* short name for the action */ + /* action queue and its configuration parameters */ + queueType_t ActionQueType; /* type of the main message queue above */ + int iActionQueueSize; /* size of the main message queue above */ + int iActionQueueDeqBatchSize; /* batch size for action queues */ + int iActionQHighWtrMark; /* high water mark for disk-assisted queues */ + int iActionQLowWtrMark; /* low water mark for disk-assisted queues */ + int iActionQDiscardMark; /* begin to discard messages */ + int iActionQDiscardSeverity; /* by default, discard nothing to prevent unintentional loss */ + int iActionQueueNumWorkers; /* number of worker threads for the mm queue above */ + uchar *pszActionQFName; /* prefix for the main message queue file */ + int64 iActionQueMaxFileSize; + int iActionQPersistUpdCnt; /* persist queue info every n updates */ + int bActionQSyncQeueFiles; /* sync queue files */ + int iActionQtoQShutdown; /* queue shutdown */ + int iActionQtoActShutdown; /* action shutdown (in phase 2) */ + int iActionQtoEnq; /* timeout for queue enque */ + int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */ + int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */ + int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */ + int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */ + int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */ + int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */ + int iActionQueueDeqtWinToHr; /* hour begin of time frame when queue is to be dequeued */ +} configSettings_t; + +configSettings_t cs; /* our current config settings */ +configSettings_t cs_save; /* our saved (scope!) config settings */ + +/* the counter below counts actions created. It is used to obtain unique IDs for the action. They + * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice + * to have during one instance of an rsyslogd run. For example, I use them to name actions when there + * is no better name available. Note that I do NOT recover previous numbers on HUP - we simply keep + * counting. -- rgerhards, 2008-01-29 + */ +static int iActionNbr = 0; + +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfparamdescr[] = { + { "name", eCmdHdlrGetWord, 0 }, /* legacy: actionname */ + { "type", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: actionname */ + { "action.writeallmarkmessages", eCmdHdlrBinary, 0 }, /* legacy: actionwriteallmarkmessages */ + { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */ + { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */ + { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */ + { "action.execonlywhenpreviousissuspended", eCmdHdlrBinary, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ + { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ + { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ + { "action.resumeinterval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfparamdescr)/sizeof(struct cnfparamdescr), + cnfparamdescr + }; + +/* ------------------------------ methods ------------------------------ */ + +/* This function returns the "current" time for this action. Current time + * is not necessarily real-time. In order to enhance performance, current + * system time is obtained the first time an action needs to know the time + * and then kept cached inside the action structure. Later requests will + * always return that very same time. Wile not totally accurate, it is far + * accurate in most cases and considered "acurate enough" for all cases. + * When changing the threading model, please keep in mind that this + * logic needs to be changed should we once allow more than one parallel + * call into the same action (object). As this is currently not supported, + * we simply cache the time inside the action object itself, after it + * is under mutex protection. + * Side-note: the value -1 is used as tActNow, because it also is the + * error return value of time(). So we would do a retry with the next + * invocation if time() failed. Then, of course, we would probably already + * be in trouble, but for the sake of performance we accept this very, + * very slight risk. + * This logic has been added as part of an overall performance improvment + * effort inspired by David Lang. -- rgerhards, 2008-09-16 + * Note: this function does not use the usual iRet call conventions + * because that would provide little to no benefit but complicate things + * a lot. So we simply return the system time. + */ +static inline time_t +getActNow(action_t *pThis) +{ + assert(pThis != NULL); + if(pThis->tActNow == -1) { + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ + if(pThis->tLastExec > pThis->tActNow) { + /* if we are traveling back in time, reset tLastExec */ + pThis->tLastExec = (time_t) 0; + } + } + + return pThis->tActNow; +} + + +/* resets action queue parameters to their default values. This happens + * after each action has been created in order to prevent any wild defaults + * to be used. It is somewhat against the original spirit of the config file + * reader, but I think it is a good thing to do. + * rgerhards, 2008-01-29 + */ +static rsRetVal +actionResetQueueParams(void) +{ + DEFiRet; + + cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ + cs.iActionQueueSize = 1000; /* size of the main message queue above */ + cs.iActionQueueDeqBatchSize = 16; /* default batch size */ + cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQDiscardMark = 980; /* begin to discard messages */ + cs.iActionQDiscardSeverity = 8; /* discard warning and above */ + cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ + cs.iActionQueMaxFileSize = 1024*1024; + cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ + cs.bActionQSyncQeueFiles = 0; + cs.iActionQtoQShutdown = 0; /* queue shutdown */ + cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ + cs.iActionQtoEnq = 50; /* timeout for queue enque */ + cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ + cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + cs.iActionQueMaxDiskSpace = 0; + cs.iActionQueueDeqSlowdown = 0; + cs.iActionQueueDeqtWinFromHr = 0; + cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ + + cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ + + d_free(cs.pszActionQFName); + cs.pszActionQFName = NULL; /* prefix for the main message queue file */ + + RETiRet; +} + + +/* destructs an action descriptor object + * rgerhards, 2007-08-01 + */ +rsRetVal actionDestruct(action_t *pThis) +{ + DEFiRet; + ASSERT(pThis != NULL); + + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + + if(pThis->pQueue != NULL) { + qqueueDestruct(&pThis->pQueue); + } + + /* destroy stats object, if we have one (may not always be + * be the case, e.g. if turned off) + */ + if(pThis->statsobj != NULL) + statsobj.Destruct(&pThis->statsobj); + + if(pThis->pMod != NULL) + pThis->pMod->freeInstance(pThis->pModData); + + pthread_mutex_destroy(&pThis->mutAction); + pthread_mutex_destroy(&pThis->mutActExec); + d_free(pThis->pszName); + d_free(pThis->ppTpl); + +finalize_it: + d_free(pThis); + RETiRet; +} + + +/* create a new action descriptor object + * rgerhards, 2007-08-01 + * Note that it is vital to set proper initial values as the v6 config + * system depends on these! + */ +rsRetVal actionConstruct(action_t **ppThis) +{ + DEFiRet; + action_t *pThis; + + ASSERT(ppThis != NULL); + + CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); + pThis->iResumeInterval = 30; + pThis->iResumeRetryCount = 0; + pThis->pszName = NULL; + pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->iExecEveryNthOccur = 0; + pThis->iExecEveryNthOccurTO = 0; + pThis->iSecsExecOnceInterval = 0; + pThis->bExecWhenPrevSusp = 0; + pThis->bRepMsgHasMsg = 0; + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ + pthread_mutex_init(&pThis->mutActExec, NULL); + pthread_mutex_init(&pThis->mutAction, NULL); + INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); + + /* indicate we have a new action */ + ++iActionNbr; + +finalize_it: + *ppThis = pThis; + RETiRet; +} + + +/* action construction finalizer + */ +rsRetVal +actionConstructFinalize(action_t *pThis, struct nvlst *lst) +{ + DEFiRet; + uchar pszAName[64]; /* friendly name of our action */ + + ASSERT(pThis != NULL); + + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + /* generate a friendly name for us action stats */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); + } else { + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */ + } + + /* support statistics gathering */ + CHKiRet(statsobj.Construct(&pThis->statsobj)); + CHKiRet(statsobj.SetName(pThis->statsobj, pszAName)); + + STATSCOUNTER_INIT(pThis->ctrProcessed, pThis->mutCtrProcessed); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("processed"), + ctrType_IntCtr, &pThis->ctrProcessed)); + + STATSCOUNTER_INIT(pThis->ctrFail, pThis->mutCtrFail); + CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("failed"), + ctrType_IntCtr, &pThis->ctrFail)); + + CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); + + /* create our queue */ + + /* generate a friendly name for the queue */ + if(pThis->pszName == NULL) { + snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d queue", + iActionNbr); + } else { + ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); + pszAName[63] = '\0'; /* to be on the save side */ + } + /* now check if we can run the action in "firehose mode" during stage one of + * its processing (that is before messages are enqueued into the action q). + * This is only possible if some features, which require strict sequence, are + * not used. Thankfully, that is usually the case. The benefit of firehose + * mode is much faster processing (and simpler code) -- rgerhards, 2010-06-08 + */ + if( pThis->iExecEveryNthOccur > 1 + || pThis->iSecsExecOnceInterval + ) { + DBGPRINTF("info: firehose mode disabled for action because " + "iExecEveryNthOccur=%d, iSecsExecOnceInterval=%d\n", + pThis->iExecEveryNthOccur, pThis->iSecsExecOnceInterval); + pThis->submitToActQ = doSubmitToActionQComplexBatch; + } else if(pThis->bWriteAllMarkMsgs == RSFALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMarkBatch; + } else { + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQBatch; + } + + /* create queue */ + /* action queues always (for now) have just one worker. This may change when + * we begin to implement an interface the enable output modules to request + * to be run on multiple threads. So far, this is forbidden by the interface + * spec. -- rgerhards, 2008-01-30 + */ + CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); + obj.SetName((obj_t*) pThis->pQueue, pszAName); + qqueueSetpAction(pThis->pQueue, pThis); + + if(lst == NULL) { /* use legacy params? */ + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + } else { + /* we have v6-style config params */ + qqueueSetDefaultsActionQueue(pThis->pQueue); + qqueueApplyCnfParam(pThis->pQueue, lst); + } + +# undef setQPROP +# undef setQPROPstr + + qqueueDbgPrint(pThis->pQueue); + + DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + /* and now reset the queue params (see comment in its function header!) */ + actionResetQueueParams(); + +finalize_it: + RETiRet; +} + + + +/* set the global resume interval + */ +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + cs.glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 + */ +static uchar *getActStateName(action_t *pThis) +{ + switch(pThis->eState) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DIED: + return (uchar*) "died"; + case ACT_STATE_COMM: + return (uchar*) "comm"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 + */ +static rsRetVal getReturnCode(action_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + switch(pThis->eState) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pThis->bHadAutoCommit) { + pThis->bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + case ACT_STATE_DIED: + iRet = RS_RET_ACTION_FAILED; + break; + default: + DBGPRINTF("Invalid action engine state %d, program error\n", + (int) pThis->eState); + iRet = RS_RET_ERR; + break; + } + + RETiRet; +} + + +/* set the action to a new state + * rgerhards, 2007-08-02 + */ +static inline void actionSetState(action_t *pThis, action_state_t newState) +{ + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); +} + +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 + */ +static void actionCommitted(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); + pThis->iResumeOKinRow++; +} + + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static void actionDisable(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_DIED); +} + + +/* Suspend action, this involves changing the acton state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ +static inline void actionSuspend(action_t *pThis) +{ + time_t ttNow; + + /* note: we can NOT use a cached timestamp, as time may have evolved + * since caching, and this would break logic (and it actually did so!) + */ + datetime.GetTime(&ttNow); + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry); +} + + +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. -- rgerhards, 2009-05-07 + * We need to guard against module which always return RS_RET_OK from their tryResume() + * entry point. This is invalid, but has harsh consequences: it will cause the rsyslog + * engine to go into a tight loop. That obviously is not acceptable. As such, we track the + * count of iterations that a tryResume returning RS_RET_OK is immediately followed by + * an unsuccessful call to doAction(). If that happens more than 1,000 times, we assume + * the return acutally is a RS_RET_SUSPENDED. In order to go through the various + * resumption stages, we do this for every 1000 requests. This magic number 1000 may + * not be the most appropriate, but it should be thought of a "if nothing else helps" + * kind of facility: in the first place, the module should return a proper indication + * of its inability to recover. -- rgerhards, 2010-04-26. + */ +static inline rsRetVal +actionDoRetry(action_t *pThis, int *pbShutdownImmediate) +{ + int iRetries; + int iSleepPeriod; + int bTreatOKasSusp; + DEFiRet; + + ASSERT(pThis != NULL); + + iRetries = 0; + while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { + DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); + iRet = pThis->pMod->tryResume(pThis->pModData); + DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); + if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { + bTreatOKasSusp = 1; + pThis->iResumeOKinRow = 0; + } else { + bTreatOKasSusp = 0; + } + if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) { + DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet); + actionSetState(pThis, ACT_STATE_RDY); + } else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) { + /* max retries reached? */ + DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n", + pThis->iResumeRetryCount, iRetries); + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis); + } else { + ++pThis->iNbrResRtry; + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + srSleep(iSleepPeriod, 0); + if(*pbShutdownImmediate) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(pThis->eState == ACT_STATE_RDY) { + pThis->iNbrResRtry = 0; + } + +finalize_it: + RETiRet; +} + + +/* try to resume an action -- rgerhards, 2007-08-02 + * changed to new action state engine -- rgerhards, 2009-05-07 + */ +static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate) +{ + DEFiRet; + time_t ttNow = NO_TIME_PROVIDED; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + datetime.GetTime(&ttNow); /* cache "now" */ + if(ttNow >= pThis->ttResumeRtry) { + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ + } + } + + if(pThis->eState == ACT_STATE_RTRY) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + datetime.GetTime(&ttNow); + CHKiRet(actionDoRetry(pThis, pbShutdownImmediate)); + } + + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n", + pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate) +{ + DEFiRet; + + assert(pThis != NULL); + CHKiRet(actionTryResume(pThis, pbShutdownImmediate)); + + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(pThis->eState == ACT_STATE_RDY) { + iRet = pThis->pMod->mod.om.beginTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionSetState(pThis, ACT_STATE_ITX); + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:FINALIZE; + } + } + +finalize_it: + RETiRet; +} + + +/* debug-print the contents of an action object + * rgerhards, 2007-08-02 + */ +rsRetVal actionDbgPrint(action_t *pThis) +{ + DEFiRet; + char *sz; + + dbgprintf("%s: ", module.GetStateName(pThis->pMod)); + pThis->pMod->dbgPrintInstInfo(pThis->pModData); + dbgprintf("\n"); + dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); + dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); + if(pThis->eState == ACT_STATE_SUSP) { + dbgprintf("\tresume next retry: %u, number retries: %d", + (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); + } + dbgprintf("\tState: %s\n", getActStateName(pThis)); + dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); + if(pThis->submitToActQ == doSubmitToActionQComplexBatch) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMarkBatch) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQBatch) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); + dbgprintf("\n"); + + RETiRet; +} + + +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 + */ +static rsRetVal +prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +{ + int i; + msg_t *pMsg; + struct json_object *json; + DEFiRet; + + ASSERT(pAction != NULL); + ASSERT(pElem != NULL); + + pMsg = pElem->pMsg; + /* here we must loop to process all requested strings */ + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + switch(pAction->eParamPassing) { + case ACT_STRING_PASSING: + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), + &pElem->staticLenStrings[i], ttNow)); + pElem->staticActParams[i] = pElem->staticActStrings[i]; + break; + case ACT_ARRAY_PASSING: + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + break; + case ACT_MSG_PASSING: + pElem->staticActParams[i] = (void*) pMsg; + break; + case ACT_JSON_PASSING: + CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); + pElem->staticActParams[i] = (void*) json; + break; + default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", + (int) pAction->eParamPassing); + assert(0); /* software bug if this happens! */ + break; + } + } + +finalize_it: + RETiRet; +} + + +/* free a batches ressources, but not string buffers (because they will + * most probably be reused). String buffers are only deleted upon final + * destruction of the batch. + * This function here must be called only when the batch is actually no + * longer used, also not for retrying actions or such. It invalidates + * buffers. + * rgerhards, 2010-12-17 + */ +static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +{ + int jArr; + int i, j; + batch_obj_t *pElem; + uchar ***ppMsgs; + DEFiRet; + + ASSERT(pAction != NULL); + + if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) + goto done; /* we need to do nothing with these types! */ + + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(batchIsValidElem(pBatch, i)) { + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pElem->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + d_free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; + } + d_free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; + } + } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pElem->staticActParams[j]); + pElem->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; + } + } + } + +done: RETiRet; +} + + +/* call the DoAction output plugin entry point + * rgerhards, 2008-01-28 + */ +rsRetVal +actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); + + pThis->bHadAutoCommit = 0; + iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_DEFER_COMMIT: + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pThis->bHadAutoCommit = 1; + pThis->iResumeOKinRow = 0; /* we had a successful call! */ + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +static inline rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + +/* finish processing a batch. Most importantly, that means we commit if we + * need to do so. + * rgerhards, 2008-01-28 + */ +static rsRetVal +finishBatch(action_t *pThis, batch_t *pBatch) +{ + int i; + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_RDY) { + /* we just need to flag the batch as commited */ + FINALIZE; /* nothing to do */ + } + + CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); + if(pThis->eState == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + /* flag messages as committed */ + for(i = 0 ; i < pBatch->nElem ; ++i) { + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */ + } + break; + case RS_RET_SUSPENDED: + actionRetry(pThis); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +{ + int i; + int iElemProcessed; + int iCommittedUpTo; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + assert(pnElem != NULL); + + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; + DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + /* NOTE: do NOT extend the filter below! Anything else must be done on the + * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 + */ + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; + localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, + pBatch->pbShutdownImmediate); + DBGPRINTF("action %p call returned %d\n", pAction, localRet); + /* Note: we directly modify the batch object state, because we know that + * wo do not overwrite BATCH_STATE_DISC indicators! + */ + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo <= i) { + pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ + batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); + ++iCommittedUpTo; + //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->eltState[i] = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DEFER_COMMIT) { + pBatch->eltState[i] = BATCH_STATE_SUB; + } else if(localRet == RS_RET_DISCARDMSG) { + pBatch->eltState[i] = BATCH_STATE_DISC; + } else { + dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", + localRet, *pnElem, iCommittedUpTo); + iRet = localRet; + FINALIZE; + } + } + ++i; + ++iElemProcessed; + } + +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * Note: we don't need the number of the first message to be processed as a parameter, + * because this is kept track of inside the batch itself (iDoneUpTo). + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + int wasDoneTo; + DEFiRet; + + assert(pBatch != NULL); + + DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); + wasDoneTo = pBatch->iDoneUpTo; + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if(localRet == RS_RET_FORCE_TERM) { + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction, pBatch); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); + /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, everything not yet committed is BAD */ + for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { + if( pBatch->eltState[i] != BATCH_STATE_DISC + && pBatch->eltState[i] != BATCH_STATE_COMM ) { + pBatch->eltState[i] = BATCH_STATE_BAD; + pBatch->pElem[i].bPrevWasSuspended = 1; + STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); + } + } + bDone = 1; + } else { + if(nElem == 1) { + batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " + "for iRet %d\n", localRet); + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; + } + } + } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ + + if(*(pBatch->pbShutdownImmediate)) + ABORT_FINALIZE(RS_RET_FORCE_TERM); + +finalize_it: + RETiRet; +} + + +/* copy "active" array of batch, as we need to modify it. The caller + * must make sure the new array is freed and the orginal batch + * pointer is restored (thus the caller must save it). If active + * is currently NULL, this is properly handled. + * Note: the batches active pointer is modified, so it must be + * saved BEFORE calling this function! + * rgerhards, 2012-09-12 + */ +static rsRetVal +copyActive(batch_t *pBatch) +{ + sbool *active; + DEFiRet; + + CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + if(pBatch->active == NULL) + memset(active, 1, batchNumMsgs(pBatch)); + else + memcpy(active, pBatch->active, batchNumMsgs(pBatch)); + pBatch->active = active; +finalize_it: + RETiRet; +} + +/* The following function prepares a batch for processing, that it is + * reinitializes batch states, generates strings and does everything else + * that needs to be done in order to make the batch ready for submission to + * the actual output module. Note that we look at the precomputed + * filter OK condition and process only those messages, that actually matched + * the filter. + * rgerhards, 2010-06-14 + */ +static inline rsRetVal +prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) +{ + int i; + batch_obj_t *pElem; + struct syslogTime ttNow; + DEFiRet; + + /* indicate we have not yet read the date */ + ttNow.year = 0; + + pBatch->iDoneUpTo = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + pElem = &(pBatch->pElem[i]); + if(batchIsValidElem(pBatch, i)) { + pBatch->eltState[i] = BATCH_STATE_RDY; + if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { + /* make sure we have our copy of "active" array */ + if(!*bMustRestoreActivePtr) { + *activeSave = pBatch->active; + copyActive(pBatch); + } + pBatch->active[i] = RSFALSE; + } + } + } + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static inline rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + assert(pBatch != NULL); + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem)); + iRet = finishBatch(pAction, pBatch); + +finalize_it: + RETiRet; +} + + +#pragma GCC diagnostic ignored "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +static rsRetVal +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) +{ + int *pbShutdownImmdtSave; + sbool *activeSave; + int bMustRestoreActivePtr = 0; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + if(pbShutdownImmediate != NULL) { + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; + } + CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); + + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + + iRet = processAction(pAction, pBatch); + + pthread_cleanup_pop(1); /* unlock mutex */ + + /* even if processAction failed, we need to release the batch (else we + * have a memory leak). So we do this first, and then check if we need to + * return an error code. If so, the code from processAction has priority. + * rgerhards, 2010-12-17 + */ + localRet = releaseBatch(pAction, pBatch); + + if(iRet == RS_RET_OK) + iRet = localRet; + + if(bMustRestoreActivePtr) { + free(pBatch->active); + pBatch->active = activeSave; + } + +finalize_it: + if(pbShutdownImmediate != NULL) + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + +/* call the HUP handler for a given action, if such a handler is defined. The + * action mutex is locked, because the HUP handler most probably needs to modify + * some internal state information. + * rgerhards, 2008-10-22 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +rsRetVal +actionCallHUPHdlr(action_t *pAction) +{ + DEFiRet; + + ASSERT(pAction != NULL); + DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP); + + if(pAction->pMod->doHUP == NULL) { + FINALIZE; /* no HUP handler, so we are done ;) */ + } + + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + CHKiRet(pAction->pMod->doHUP(pAction->pModData)); + pthread_cleanup_pop(1); /* unlock mutex */ + +finalize_it: + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + +/* set the action message queue mode + * TODO: probably move this into queue object, merge with MainMsgQueue! + * rgerhards, 2008-01-28 + */ +static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszType) +{ + DEFiRet; + + if (!strcasecmp((char *) pszType, "fixedarray")) { + cs.ActionQueType = QUEUETYPE_FIXED_ARRAY; + DBGPRINTF("action queue type set to FIXED_ARRAY\n"); + } else if (!strcasecmp((char *) pszType, "linkedlist")) { + cs.ActionQueType = QUEUETYPE_LINKEDLIST; + DBGPRINTF("action queue type set to LINKEDLIST\n"); + } else if (!strcasecmp((char *) pszType, "disk")) { + cs.ActionQueType = QUEUETYPE_DISK; + DBGPRINTF("action queue type set to DISK\n"); + } else if (!strcasecmp((char *) pszType, "direct")) { + cs.ActionQueType = QUEUETYPE_DIRECT; + DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n"); + } else { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *) pszType); + iRet = RS_RET_INVALID_PARAMS; + } + d_free(pszType); /* no longer needed */ + + RETiRet; +} + + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. This is also utilized to submit messages in complex case once + * the complex logic has been applied ;) + * rgerhards, 2010-06-08 + */ +static inline rsRetVal +doSubmitToActionQ(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + if(pAction->eState == ACT_STATE_DIED) { + DBGPRINTF("action %p died, do NOT execute\n", pAction); + FINALIZE; + } + + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); + else + iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); + +finalize_it: + RETiRet; +} + + +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. + * Important: this function MUST not be called with messages that are to + * be discarded due to their "prevWasSuspended" state. It will not check for + * this and submit all messages to the queue for execution. So these must + * be filtered out before calling us (what is done currently!). + */ +rsRetVal +actionWriteToAction(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + + /* first, we check if the action should actually be called. The action-specific + * $ActionExecOnlyEveryNthTime permits us to execute an action only every Nth + * time. So we need to check if we need to drop the (otherwise perfectly executable) + * action for this reason. Note that in case we need to drop it, we return RS_RET_OK + * as the action was properly "passed to execution" from the upper layer's point + * of view. -- rgerhards, 2008-08-07. + */ + if(pAction->iExecEveryNthOccur > 1) { + /* we need to care about multiple occurences */ + if( pAction->iExecEveryNthOccurTO > 0 + && (getActNow(pAction) - pAction->tLastOccur) > pAction->iExecEveryNthOccurTO) { + DBGPRINTF("n-th occurence handling timed out (%d sec), restarting from 0\n", + (int) (getActNow(pAction) - pAction->tLastOccur)); + pAction->iNbrNoExec = 0; + pAction->tLastOccur = getActNow(pAction); + } + if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) { + ++pAction->iNbrNoExec; + DBGPRINTF("action %p passed %d times to execution - less than neded - discarding\n", + pAction, pAction->iNbrNoExec); + FINALIZE; + } else { + pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */ + } + } + + DBGPRINTF("Called action(complex case), logging to %s\n", module.GetStateName(pAction->pMod)); + + /* now check if we need to drop the message because otherwise the action would be too + * frequently called. -- rgerhards, 2008-04-08 + * Note that the check for "pAction->iSecsExecOnceInterval > 0" is not necessary from + * a purely logical point of view. However, if safes us to check the system time in + * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 + */ + if(pAction->iSecsExecOnceInterval > 0 && + pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { + /* in this case we need to discard the message - its not yet time to exec the action */ + DBGPRINTF("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", + (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), + (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); + FINALIZE; + } + + /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) + * rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + pAction->f_time = pMsg->ttGenTime; + + /* When we reach this point, we have a valid, non-disabled action. + * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 + */ + iRet = doSubmitToActionQ(pAction, pMsg); + +finalize_it: + RETiRet; +} + + +/* helper to actonCallAction, mostly needed because of this damn + * pthread_cleanup_push() POSIX macro... + */ +static inline rsRetVal +doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) +{ + msg_t *pMsg; + DEFiRet; + + pMsg = pBatch->pElem[idxBtch].pMsg; + pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ + + /* don't output marks to recently written outputs */ + if(pAction->bWriteAllMarkMsgs == RSFALSE + && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { + ABORT_FINALIZE(RS_RET_OK); + } + + /* call the output driver */ + iRet = actionWriteToAction(pAction, pMsg); + +finalize_it: + /* we need to update the batch to handle failover processing correctly */ + if(iRet == RS_RET_OK) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 0; + } else if(iRet == RS_RET_ACTION_FAILED) { + pBatch->pElem[idxBtch].bPrevWasSuspended = 1; + } + + RETiRet; +} + + +/* helper to activateActions, it activates a specific action. + */ +DEFFUNC_llExecFunc(doActivateActions) +{ + rsRetVal localRet; + action_t *pThis = (action_t*) pData; + BEGINfunc + localRet = qqueueStart(pThis->pQueue); + if(localRet != RS_RET_OK) { + errmsg.LogError(0, localRet, "error starting up action queue"); + if(localRet == RS_RET_FILE_PREFIX_MISSING) { + errmsg.LogError(0, localRet, "file prefix (work directory?) " + "is missing"); + } + actionDisable(pThis); + } + DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), + pThis, pThis->pQueue); + ENDfunc + return RS_RET_OK; /* we ignore errors, we can not do anything either way */ +} + + +/* This function "activates" the action after privileges have been dropped. Currently, + * this means that the queues are started. + * rgerhards, 2011-05-02 + */ +rsRetVal +activateActions(void) +{ + DEFiRet; + iRet = ruleset.IterateAllActions(ourConf, doActivateActions, NULL); + RETiRet; +} + + + +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == RSFALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. Here, we just modify the filter condition to be false when + * a mark message must not be written. However, in this case we must save the previous + * filter as we may need it in the next action (potential future optimization: check if this is + * the last action TODO). + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) +{ + time_t now = 0; + time_t lastAct; + int i; + sbool *activeSave; + DEFiRet; + + activeSave = pBatch->active; + copyActive(pBatch); + + for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { + if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i]) + continue; + if(now == 0) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + } + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if(pBatch->pElem[i].pMsg->msgFlags & MARK) { + if((now - lastAct) < MarkInterval / 2) { + pBatch->active[i] = 0; + DBGPRINTF("batch item %d: action was recently called, ignoring " + "mark message\n", i); + break; /* do not update timestamp for non-written mark messages */ + } + } + } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, + pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); + if(pBatch->active[i]) { + DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", + i, module.GetStateName(pAction->pMod)); + } + } + + iRet = doSubmitToActionQBatch(pAction, pBatch); + + free(pBatch->active); + pBatch->active = activeSave; + + RETiRet; +} + +static inline void +countStatsBatchEnq(action_t *pAction, batch_t *pBatch) +{ + int i; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if( batchIsValidElem(pBatch, i)) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + } + } +} + + +/* enqueue a batch in direct mode. We have put this into its own function just to avoid + * cluttering the actual submit function. + * rgerhards, 2011-06-16 + */ +static inline rsRetVal +doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) +{ + sbool bNeedSubmit; + sbool *activeSave; + int i; + DEFiRet; + + activeSave = pBatch->active; + copyActive(pBatch); + + /* note: for direct mode, we need to adjust the filter property. For non-direct + * this is not necessary, because in that case we enqueue only what actually needs + * to be processed. + */ + if(pAction->bExecWhenPrevSusp) { + bNeedSubmit = 0; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(!pBatch->pElem[i].bPrevWasSuspended) { + DBGPRINTF("action enq stage: change active to 0 due to " + "failover case in elem %d\n", i); + pBatch->active[i] = 0; + } + if(batchIsValidElem(pBatch, i)) { + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); + bNeedSubmit = 1; + } + DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i], + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + } + if(bNeedSubmit) { + /* note: stats were already computed above */ + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + } else { + DBGPRINTF("no need to submit batch, all invalid\n"); + } + } else { + if(GatherStats) + countStatsBatchEnq(pAction, pBatch); + iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); + } + + free(pBatch->active); + pBatch->active = activeSave; + RETiRet; +} + +/* This submits the message to the action queue in case we do NOT need to handle repeat + * message processing. That case permits us to gain lots of freedom during processing + * and thus speed. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod)); + + if(pAction->pQueue->qType == QUEUETYPE_DIRECT) { + iRet = doQueueEnqObjDirectBatch(pAction, pBatch); + } else {/* in this case, we do single submits to the queue. + * TODO: optimize this, we may do at least a multi-submit! + */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + if( batchIsValidElem(pBatch, i) + && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); + } + } + } + + RETiRet; +} + + + +/* Helper to submit a batch of actions to the engine. Note that we have rather + * complicated processing here, so we need to do this one message after another. + * rgerhards, 2010-06-23 + */ +static inline rsRetVal +helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + int i; + DEFiRet; + + DBGPRINTF("Called action %p (complex case), logging to %s\n", + pAction, module.GetStateName(pAction->pMod)); + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i], + pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); + if( batchIsValidElem(pBatch, i) + && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { + doActionCallAction(pAction, pBatch, i); + } + } + + RETiRet; +} + +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 + */ +#pragma GCC diagnostic ignored "-Wempty-body" +static rsRetVal +doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) +{ + DEFiRet; + + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); + iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); + d_pthread_mutex_unlock(&pAction->mutAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ + + RETiRet; +} +#pragma GCC diagnostic warning "-Wempty-body" + + +/* apply all params from param block to action. This supports the v6 config system. + * Defaults must have been set appropriately during action construct! + * rgerhards, 2011-08-01 + */ +static rsRetVal +actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) +{ + int i; + + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(pblk.descr[i].name, "name")) { + pAction->pszName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "type")) { + continue; /* this is handled seperately during module select! */ + } else if(!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) { + pAction->bWriteAllMarkMsgs = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) { + pAction->iExecEveryNthOccur = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) { + pAction->iExecEveryNthOccurTO = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) { + pAction->iSecsExecOnceInterval = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) { + pAction->bExecWhenPrevSusp = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) { + pAction->bRepMsgHasMsg = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { + pAction->iResumeRetryCount = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { + pAction->iResumeInterval = pvals[i].val.d.n; + } else { + dbgprintf("action: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + return RS_RET_OK; +} + + +/* add an Action to the current selector + * The pOMSR is freed, as it is not needed after this function. + * Note: this function pulls global data that specifies action config state. + * rgerhards, 2007-07-27 + */ +rsRetVal +addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, + omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, + struct nvlst *lst, int bSuspended) +{ + DEFiRet; + int i; + int iTplOpts; + uchar *pTplName; + action_t *pAction; + char errMsg[512]; + + assert(ppAction != NULL); + assert(pMod != NULL); + assert(pOMSR != NULL); + DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); + + CHKiRet(actionConstruct(&pAction)); /* create action object first */ + pAction->pMod = pMod; + pAction->pModData = pModData; + if(actParams == NULL) { /* use legacy systemn */ + pAction->pszName = cs.pszActionName; + pAction->iResumeInterval = cs.glbliActionResumeInterval; + pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount; + pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs; + pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp; + pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval; + pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur; + pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO; + pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; + cs.iActExecEveryNthOccur = 0; /* auto-reset */ + cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.pszActionName = NULL; /* free again! */ + } else { + actionApplyCnfParam(pAction, actParams); + } + + /* check if we can obtain the template pointers - TODO: move to separate function? */ + pAction->iNumTpls = OMSRgetEntryCount(pOMSR); + assert(pAction->iNumTpls >= 0); /* only debug check because this "can not happen" */ + /* please note: iNumTpls may validly be zero. This is the case if the module + * does not request any templates. This sounds unlikely, but an actual example is + * the discard action, which does not require a string. -- rgerhards, 2007-07-30 + */ + if(pAction->iNumTpls > 0) { + /* we first need to create the template pointer array */ + CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *))); + } + + for(i = 0 ; i < pAction->iNumTpls ; ++i) { + CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) + */ + if( !(iTplOpts & OMSR_TPL_AS_MSG) + && (pAction->ppTpl[i] = + tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { + snprintf(errMsg, sizeof(errMsg) / sizeof(char), + " Could not find template '%s' - action disabled", + pTplName); + errno = 0; + errmsg.LogError(0, RS_RET_NOT_FOUND, "%s", errMsg); + ABORT_FINALIZE(RS_RET_NOT_FOUND); + } + /* check required template options */ + if( (iTplOpts & OMSR_RQD_TPL_OPT_SQL) + && (pAction->ppTpl[i]->optFormatEscape == 0)) { + errno = 0; + errmsg.LogError(0, RS_RET_RQD_TPLOPT_MISSING, "Action disabled. To use this action, you have to specify " + "the SQL or stdSQL option in your template!\n"); + ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING); + } + + /* set parameter-passing mode */ + if(iTplOpts & OMSR_TPL_AS_ARRAY) { + pAction->eParamPassing = ACT_ARRAY_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->eParamPassing = ACT_MSG_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_JSON) { + pAction->eParamPassing = ACT_JSON_PASSING; + } else { + pAction->eParamPassing = ACT_STRING_PASSING; + } + + DBGPRINTF("template: '%s' assigned\n", pTplName); + } + + pAction->pMod = pMod; + pAction->pModData = pModData; + /* check if the module is compatible with select features (currently no such features exist) */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ + + if(bSuspended) + actionSuspend(pAction); + + CHKiRet(actionConstructFinalize(pAction, lst)); + + /* TODO: if we exit here, we have a memory leak... */ + + *ppAction = pAction; /* finally store the action pointer */ + +finalize_it: + if(iRet == RS_RET_OK) + iRet = OMSRdestruct(pOMSR); + else { + /* do not overwrite error state! */ + OMSRdestruct(pOMSR); + if(pAction != NULL) + actionDestruct(pAction); + } + + RETiRet; +} + + +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + cs.iActExecOnceInterval = 0; + cs.bActExecWhenPrevSusp = 0; + return RS_RET_OK; +} + + +/* initialize (current) config variables. + * Used at program start and when a new scope is created. + */ +static inline void +initConfigVariables(void) +{ + cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.glbliActionResumeRetryCount = 0; + cs.bActExecWhenPrevSusp = 0; + cs.iActExecOnceInterval = 0; + cs.iActExecEveryNthOccur = 0; + cs.iActExecEveryNthOccurTO = 0; + cs.glbliActionResumeInterval = 30; + cs.glbliActionResumeRetryCount = 0; + cs.bActionRepMsgHasMsg = 0; + if(cs.pszActionName != NULL) { + free(cs.pszActionName); + cs.pszActionName = NULL; + } + actionResetQueueParams(); +} + + +rsRetVal +actionNewInst(struct nvlst *lst, action_t **ppAction) +{ + struct cnfparamvals *paramvals; + modInfo_t *pMod; + uchar *cnfModName = NULL; + omodStringRequest_t *pOMSR; + void *pModData; + action_t *pAction; + int typeIdx; + DEFiRet; + + paramvals = nvlstGetParams(lst, &pblk, NULL); + if(paramvals == NULL) { + ABORT_FINALIZE(RS_RET_ERR); + } + dbgprintf("action param blk after actionNewInst:\n"); + cnfparamsPrint(&pblk, paramvals); + typeIdx = cnfparamGetIdx(&pblk, "type"); + if(paramvals[typeIdx].bUsed == 0) { + errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing"); + ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers + } + cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); + if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { + errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); + ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); + } + iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); + // TODO: check if RS_RET_SUSPENDED is still valid in v6! + if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { + FINALIZE; /* iRet is already set to error state */ + } + + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, lst, + (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + /* check if the module is compatible with select features + * (currently no such features exist) */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ + loadConf->actions.nbrActions++; /* one more active action! */ + } + *ppAction = pAction; + +finalize_it: + free(cnfModName); + cnfparamvalsDestruct(paramvals, &pblk); + RETiRet; +} + +/* TODO: we are not yet a real object, the ClassInit here just looks like it is.. + */ +rsRetVal actionClassInit(void) +{ + DEFiRet; + /* request objects we use */ + CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */ + CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(module, CORE_COMPONENT)); + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &cs.bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &cs.bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &cs.bActExecWhenPrevSusp, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + + initConfigVariables(); /* first-time init of config setings */ + +finalize_it: + RETiRet; +} + +/* vi:set ai: + */ |