diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 731 |
1 files changed, 477 insertions, 254 deletions
@@ -8,7 +8,7 @@ * the right code in question): For performance reasons, this module * uses different methods of message submission based on the user-selected * configuration. This code is similar, but can not be abstracted because - * of the performanse-affecting differences in it. As such, it is often + * of the performance-affecting differences in it. As such, it is often * necessary to triple-check that everything works well in *all* modes. * The different modes (and calling sequence) are: * @@ -98,20 +98,22 @@ #include <strings.h> #include <time.h> #include <errno.h> +#include <json/json.h> #include "dirty.h" #include "template.h" #include "action.h" #include "modules.h" -#include "sync.h" #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" #include "batch.h" #include "wti.h" +#include "rsconf.h" #include "datetime.h" #include "unicode-helper.h" #include "atomic.h" +#include "ruleset.h" #include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -129,40 +131,46 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) - -static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ -static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ -static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ -static int glbliActionResumeInterval = 30; -int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */ -static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */ - -static int bActionWriteAllMarkMsgs = RSFALSE; /* should all mark messages be unconditionally written? */ -static uchar *pszActionName; /* short name for the action */ -/* action queue and its configuration parameters */ -static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ -static int iActionQueueSize = 1000; /* size of the main message queue above */ -static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */ -static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ -static int iActionQLightDlyMrk = -1; /* light delay mark for disk-assisted queues */ -static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ -static int iActionQDiscardMark = 9800; /* begin to discard messages */ -static int iActionQDiscardSeverity = 8; /* by default, discard nothing to prevent unintentional loss */ -static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ -static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */ -static int64 iActionQueMaxFileSize = 1024*1024; -static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ -static int bActionQSyncQeueFiles = 0; /* sync queue files */ -static int iActionQtoQShutdown = 0; /* queue shutdown */ -static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ -static int iActionQtoEnq = 2000; /* timeout for queue enque */ -static int iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ -static int iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ -static int bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ -static int64 iActionQueMaxDiskSpace = 0; /* max disk space allocated 0 ==> unlimited */ -static int iActionQueueDeqSlowdown = 0; /* dequeue slowdown (simple rate limiting) */ -static int iActionQueueDeqtWinFromHr = 0; /* hour begin of time frame when queue is to be dequeued */ -static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queue is to be dequeued */ +DEFobjCurrIf(ruleset) + + +typedef struct configSettings_s { + int bActExecWhenPrevSusp; /* execute action only when previous one was suspended? */ + int bActionWriteAllMarkMsgs; /* should all mark messages be unconditionally written? */ + int iActExecOnceInterval; /* execute action once every nn seconds */ + int iActExecEveryNthOccur; /* execute action every n-th occurence (0,1=always) */ + time_t iActExecEveryNthOccurTO; /* timeout for n-occurence setting (in seconds, 0=never) */ + int glbliActionResumeInterval; + int glbliActionResumeRetryCount; /* how often should suspended actions be retried? */ + int bActionRepMsgHasMsg; /* last messsage repeated... has msg fragment in it */ + uchar *pszActionName; /* short name for the action */ + /* action queue and its configuration parameters */ + queueType_t ActionQueType; /* type of the main message queue above */ + int iActionQueueSize; /* size of the main message queue above */ + int iActionQueueDeqBatchSize; /* batch size for action queues */ + int iActionQHighWtrMark; /* high water mark for disk-assisted queues */ + int iActionQLowWtrMark; /* low water mark for disk-assisted queues */ + int iActionQDiscardMark; /* begin to discard messages */ + int iActionQDiscardSeverity; /* by default, discard nothing to prevent unintentional loss */ + int iActionQueueNumWorkers; /* number of worker threads for the mm queue above */ + uchar *pszActionQFName; /* prefix for the main message queue file */ + int64 iActionQueMaxFileSize; + int iActionQPersistUpdCnt; /* persist queue info every n updates */ + int bActionQSyncQeueFiles; /* sync queue files */ + int iActionQtoQShutdown; /* queue shutdown */ + int iActionQtoActShutdown; /* action shutdown (in phase 2) */ + int iActionQtoEnq; /* timeout for queue enque */ + int iActionQtoWrkShutdown; /* timeout for worker thread shutdown */ + int iActionQWrkMinMsgs; /* minimum messages per worker needed to start a new one */ + int bActionQSaveOnShutdown; /* save queue on shutdown (when DA enabled)? */ + int64 iActionQueMaxDiskSpace; /* max disk space allocated 0 ==> unlimited */ + int iActionQueueDeqSlowdown; /* dequeue slowdown (simple rate limiting) */ + int iActionQueueDeqtWinFromHr; /* hour begin of time frame when queue is to be dequeued */ + int iActionQueueDeqtWinToHr; /* hour begin of time frame when queue is to be dequeued */ +} configSettings_t; + +configSettings_t cs; /* our current config settings */ +configSettings_t cs_save; /* our saved (scope!) config settings */ /* the counter below counts actions created. It is used to obtain unique IDs for the action. They * should not be relied on for any long-term activity (e.g. disk queue names!), but they are nice @@ -172,6 +180,25 @@ static int iActionQueueDeqtWinToHr = 25; /* hour begin of time frame when queu */ static int iActionNbr = 0; +/* tables for interfacing with the v6 config system */ +static struct cnfparamdescr cnfparamdescr[] = { + { "name", eCmdHdlrGetWord, 0 }, /* legacy: actionname */ + { "type", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: actionname */ + { "action.writeallmarkmessages", eCmdHdlrBinary, 0 }, /* legacy: actionwriteallmarkmessages */ + { "action.execonlyeverynthtime", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtime */ + { "action.execonlyeverynthtimetimeout", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyeverynthtimetimeout */ + { "action.execonlyonceeveryinterval", eCmdHdlrInt, 0 }, /* legacy: actionexeconlyonceeveryinterval */ + { "action.execonlywhenpreviousissuspended", eCmdHdlrInt, 0 }, /* legacy: actionexeconlywhenpreviousissuspended */ + { "action.repeatedmsgcontainsoriginalmsg", eCmdHdlrBinary, 0 }, /* legacy: repeatedmsgcontainsoriginalmsg */ + { "action.resumeretrycount", eCmdHdlrInt, 0 }, /* legacy: actionresumeretrycount */ + { "action.resumeinterval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk pblk = + { CNFPARAMBLK_VERSION, + sizeof(cnfparamdescr)/sizeof(struct cnfparamdescr), + cnfparamdescr + }; + /* ------------------------------ methods ------------------------------ */ /* This function returns the "current" time for this action. Current time @@ -223,33 +250,32 @@ actionResetQueueParams(void) { DEFiRet; - ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ - iActionQueueSize = 1000; /* size of the main message queue above */ - iActionQueueDeqBatchSize = 16; /* default batch size */ - iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ - iActionQLightDlyMrk = -1; - iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ - iActionQDiscardMark = 9800; /* begin to discard messages */ - iActionQDiscardSeverity = 8; /* discard warning and above */ - iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ - iActionQueMaxFileSize = 1024*1024; - iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ - bActionQSyncQeueFiles = 0; - iActionQtoQShutdown = 0; /* queue shutdown */ - iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ - iActionQtoEnq = 2000; /* timeout for queue enque */ - iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ - iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ - bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ - iActionQueMaxDiskSpace = 0; - iActionQueueDeqSlowdown = 0; - iActionQueueDeqtWinFromHr = 0; - iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ - - glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ - - d_free(pszActionQFName); - pszActionQFName = NULL; /* prefix for the main message queue file */ + cs.ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ + cs.iActionQueueSize = 1000; /* size of the main message queue above */ + cs.iActionQueueDeqBatchSize = 16; /* default batch size */ + cs.iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ + cs.iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ + cs.iActionQDiscardMark = 9800; /* begin to discard messages */ + cs.iActionQDiscardSeverity = 8; /* discard warning and above */ + cs.iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */ + cs.iActionQueMaxFileSize = 1024*1024; + cs.iActionQPersistUpdCnt = 0; /* persist queue info every n updates */ + cs.bActionQSyncQeueFiles = 0; + cs.iActionQtoQShutdown = 0; /* queue shutdown */ + cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ + cs.iActionQtoEnq = 50; /* timeout for queue enque */ + cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ + cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ + cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ + cs.iActionQueMaxDiskSpace = 0; + cs.iActionQueueDeqSlowdown = 0; + cs.iActionQueueDeqtWinFromHr = 0; + cs.iActionQueueDeqtWinToHr = 25; /* 25 disables time windowed dequeuing */ + + cs.glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */ + + d_free(cs.pszActionQFName); + cs.pszActionQFName = NULL; /* prefix for the main message queue file */ RETiRet; } @@ -263,6 +289,11 @@ rsRetVal actionDestruct(action_t *pThis) DEFiRet; ASSERT(pThis != NULL); + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } + if(pThis->pQueue != NULL) { qqueueDestruct(&pThis->pQueue); } @@ -279,19 +310,21 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->f_pMsg != NULL) msgDestruct(&pThis->f_pMsg); - SYNC_OBJ_TOOL_EXIT(pThis); + pthread_mutex_destroy(&pThis->mutAction); pthread_mutex_destroy(&pThis->mutActExec); d_free(pThis->pszName); d_free(pThis->ppTpl); +finalize_it: d_free(pThis); - RETiRet; } /* create a new action descriptor object * rgerhards, 2007-08-01 + * Note that it is vital to set proper initial values as the v6 config + * system depends on these! */ rsRetVal actionConstruct(action_t **ppThis) { @@ -301,12 +334,19 @@ rsRetVal actionConstruct(action_t **ppThis) ASSERT(ppThis != NULL); CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); - pThis->iResumeInterval = glbliActionResumeInterval; - pThis->iResumeRetryCount = glbliActionResumeRetryCount; + pThis->iResumeInterval = 30; + pThis->iResumeRetryCount = 0; + pThis->pszName = NULL; + pThis->bWriteAllMarkMsgs = RSFALSE; + pThis->iExecEveryNthOccur = 0; + pThis->iExecEveryNthOccurTO = 0; + pThis->iSecsExecOnceInterval = 0; + pThis->bExecWhenPrevSusp = 0; + pThis->bRepMsgHasMsg = 0; pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); + pthread_mutex_init(&pThis->mutAction, NULL); INIT_ATOMIC_HELPER_MUT(pThis->mutCAS); - SYNC_OBJ_TOOL_INIT(pThis); /* indicate we have a new action */ ++iActionNbr; @@ -320,19 +360,23 @@ finalize_it: /* action construction finalizer */ rsRetVal -actionConstructFinalize(action_t *pThis) +actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) { DEFiRet; uchar pszAName[64]; /* friendly name of our action */ ASSERT(pThis != NULL); + if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) { + /* discard actions will be optimized out */ + FINALIZE; + } /* generate a friendly name for us action stats */ if(pThis->pszName == NULL) { snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr); } else { ustrncpy(pszAName, pThis->pszName, sizeof(pszAName)); - pszAName[63] = '\0'; /* to be on the save side */ + pszAName[sizeof(pszAName)-1] = '\0'; /* to be on the save side */ } /* support statistics gathering */ @@ -385,66 +429,59 @@ actionConstructFinalize(action_t *pThis) pThis->submitToActQ = doSubmitToActionQBatch; } - /* we need to make a safety check: if the queue is NOT in direct mode, a single - * message object may be accessed by multiple threads. As such, we need to enable - * msg object thread safety in this case (this costs a bit performance and thus - * is not enabled by default. -- rgerhards, 2008-02-20 - */ - if(ActionQueType != QUEUETYPE_DIRECT) - MsgEnableThreadSafety(); - /* create queue */ /* action queues always (for now) have just one worker. This may change when * we begin to implement an interface the enable output modules to request * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); - - /* ... set some properties ... */ -# define setQPROP(func, directive, data) \ - CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } -# define setQPROPstr(func, directive, data) \ - CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ - errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \ - } - qqueueSetpUsr(pThis->pQueue, pThis); - setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); - setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize); - setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); - setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); - setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); - setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles); - setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown ); - setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown); - setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown); - setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq); - setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark); - if(iActionQLightDlyMrk > 0) { - setQPROP(qqueueSetiLightDlyMrk, "$ActionQueueLightDelayMark", iActionQLightDlyMrk); + + if(queueParams == NULL) { /* use legacy params? */ + /* ... set some properties ... */ +# define setQPROP(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data)) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } +# define setQPROPstr(func, directive, data) \ + CHKiRet_Hdlr(func(pThis->pQueue, data, (data == NULL)? 0 : strlen((char*) data))) { \ + errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", \ + error %d. Ignored, running with default setting", iRet); \ + } + setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", cs.iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", cs.iActionQueueDeqBatchSize); + setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", cs.iActionQueMaxFileSize); + setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", cs.pszActionQFName); + setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", cs.iActionQPersistUpdCnt); + setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", cs.bActionQSyncQeueFiles); + setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", cs.iActionQtoQShutdown ); + setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", cs.iActionQtoActShutdown); + setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", cs.iActionQtoWrkShutdown); + setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", cs.iActionQtoEnq); + setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", cs.iActionQHighWtrMark); + setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", cs.iActionQLowWtrMark); + setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", cs.iActionQDiscardMark); + setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", cs.iActionQDiscardSeverity); + setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", cs.iActionQWrkMinMsgs); + setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", cs.bActionQSaveOnShutdown); + setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", cs.iActionQueueDeqSlowdown); + setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", cs.iActionQueueDeqtWinFromHr); + setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", cs.iActionQueueDeqtWinToHr); + } else { + /* we have v6-style config params */ + qqueueSetDefaultsActionQueue(pThis->pQueue); + qqueueApplyCnfParam(pThis->pQueue, queueParams); } - setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark); - setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark); - setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity); - setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs); - setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown); - setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown); - setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr); - setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr); # undef setQPROP # undef setQPROPstr - dbgoprint((obj_t*) pThis->pQueue, "save on shutdown %d, max disk space allowed %lld\n", - bActionQSaveOnShutdown, iActionQueMaxDiskSpace); - + qqueueDbgPrint(pThis->pQueue); - CHKiRet(qqueueStart(pThis->pQueue)); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); /* and now reset the queue params (see comment in its function header!) */ @@ -460,7 +497,7 @@ finalize_it: */ rsRetVal actionSetGlobalResumeInterval(int iNewVal) { - glbliActionResumeInterval = iNewVal; + cs.glbliActionResumeInterval = iNewVal; return RS_RET_OK; } @@ -614,7 +651,7 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate) iRetries = 0; while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); - if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) { + if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) { bTreatOKasSusp = 1; pThis->iResumeOKinRow = 0; } else { @@ -735,7 +772,8 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); - dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); + dbgprintf("\n"); + dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); if(pThis->eState == ACT_STATE_SUSP) { @@ -767,6 +805,7 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) { int i; msg_t *pMsg; + struct json_object *json; DEFiRet; ASSERT(pAction != NULL); @@ -787,6 +826,10 @@ static rsRetVal prepareDoActionParams(action_t *pAction, batch_obj_t *pElem) case ACT_MSG_PASSING: pElem->staticActParams[i] = (void*) pMsg; break; + case ACT_JSON_PASSING: + CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json)); + pElem->staticActParams[i] = (void*) json; + break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); assert(0); /* software bug if this happens! */ @@ -819,7 +862,7 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { pElem = &(pBatch->pElem[i]); - if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { switch(pAction->eParamPassing) { case ACT_ARRAY_PASSING: ppMsgs = (uchar***) pElem->staticActParams; @@ -849,6 +892,13 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) ((uchar**)pElem->staticActParams)[j] = NULL; } break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pElem->staticActParams[j]); + pElem->staticActParams[j] = NULL; + } + break; } } } @@ -1010,8 +1060,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) /* NOTE: do NOT extend the filter below! Anything else must be done on the * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 */ - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, pBatch->pbShutdownImmediate); @@ -1058,16 +1107,6 @@ finalize_it: RETiRet; } -/* debug aid */ -static void displayBatchState(batch_t *pBatch) -{ - int i; - for(i = 0 ; i < pBatch->nElem ; ++i) { - dbgprintf("XXXXX: displayBatchState2 %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); - } -} - - /* submit a batch for actual action processing. * The first nElem elements are processed. This function calls itself * recursively if it needs to handle errors. @@ -1142,6 +1181,29 @@ finalize_it: } +/* copy "active" array of batch, as we need to modify it. The caller + * must make sure the new array is freed and the orginal batch + * pointer is restored (thus the caller must save it). If active + * is currently NULL, this is properly handled. + * Note: the batches active pointer is modified, so it must be + * saved BEFORE calling this function! + * rgerhards, 2012-09-12 + */ +static rsRetVal +copyActive(batch_t *pBatch) +{ + sbool *active; + DEFiRet; + + CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); + if(pBatch->active == NULL) + memset(active, 1, batchNumMsgs(pBatch)); + else + memcpy(active, pBatch->active, batchNumMsgs(pBatch)); + pBatch->active = active; +finalize_it: + RETiRet; +} /* The following function prepares a batch for processing, that it is * reinitializes batch states, generates strings and does everything else @@ -1152,7 +1214,7 @@ finalize_it: * rgerhards, 2010-06-14 */ static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch) +prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) { int i; batch_obj_t *pElem; @@ -1161,10 +1223,16 @@ prepareBatch(action_t *pAction, batch_t *pBatch) pBatch->iDoneUpTo = 0; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { pElem = &(pBatch->pElem[i]); - if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { pElem->state = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) - pElem->bFilterOK = RSFALSE; + if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) { + /* make sure we have our copy of "active" array */ + if(!*bMustRestoreActivePtr) { + *activeSave = pBatch->active; + copyActive(pBatch); + } + pBatch->active[i] = RSFALSE; + } } } RETiRet; @@ -1197,6 +1265,8 @@ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int *pbShutdownImmdtSave; + sbool *activeSave; + int bMustRestoreActivePtr = 0; rsRetVal localRet; DEFiRet; @@ -1204,7 +1274,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) pbShutdownImmdtSave = pBatch->pbShutdownImmediate; pBatch->pbShutdownImmediate = pbShutdownImmediate; - CHKiRet(prepareBatch(pAction, pBatch)); + CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except @@ -1227,6 +1297,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) if(iRet == RS_RET_OK) iRet = localRet; + + if(bMustRestoreActivePtr) { + free(pBatch->active); + pBatch->active = activeSave; + } finalize_it: pBatch->pbShutdownImmediate = pbShutdownImmdtSave; @@ -1273,16 +1348,16 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT DEFiRet; if (!strcasecmp((char *) pszType, "fixedarray")) { - ActionQueType = QUEUETYPE_FIXED_ARRAY; + cs.ActionQueType = QUEUETYPE_FIXED_ARRAY; DBGPRINTF("action queue type set to FIXED_ARRAY\n"); } else if (!strcasecmp((char *) pszType, "linkedlist")) { - ActionQueType = QUEUETYPE_LINKEDLIST; + cs.ActionQueType = QUEUETYPE_LINKEDLIST; DBGPRINTF("action queue type set to LINKEDLIST\n"); } else if (!strcasecmp((char *) pszType, "disk")) { - ActionQueType = QUEUETYPE_DISK; + cs.ActionQueType = QUEUETYPE_DISK; DBGPRINTF("action queue type set to DISK\n"); } else if (!strcasecmp((char *) pszType, "direct")) { - ActionQueType = QUEUETYPE_DIRECT; + cs.ActionQueType = QUEUETYPE_DIRECT; DBGPRINTF("action queue type set to DIRECT (no queueing at all)\n"); } else { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "unknown actionqueue parameter: %s", (char *) pszType); @@ -1305,12 +1380,18 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; + if(pAction->eState == ACT_STATE_DIED) { + DBGPRINTF("action %p died, do NOT execute\n", pAction); + FINALIZE; + } + STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg)); +finalize_it: RETiRet; } @@ -1511,6 +1592,43 @@ finalize_it: } +/* helper to activateActions, it activates a specific action. + */ +DEFFUNC_llExecFunc(doActivateActions) +{ + rsRetVal localRet; + action_t *pThis = (action_t*) pData; + BEGINfunc + localRet = qqueueStart(pThis->pQueue); + if(localRet != RS_RET_OK) { + errmsg.LogError(0, localRet, "error starting up action queue"); + if(localRet == RS_RET_FILE_PREFIX_MISSING) { + errmsg.LogError(0, localRet, "file prefix (work directory?) " + "is missing"); + } + actionDisable(pThis); + } + DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod), + pThis, pThis->pQueue); + ENDfunc + return RS_RET_OK; /* we ignore errors, we can not do anything either way */ +} + + +/* This function "activates" the action after privileges have been dropped. Currently, + * this means that the queues are started. + * rgerhards, 2011-05-02 + */ +rsRetVal +activateActions(void) +{ + DEFiRet; + iRet = ruleset.IterateAllActions(ourConf, doActivateActions, NULL); + RETiRet; +} + + + /* This submits the message to the action queue in case where we need to handle * bWriteAllMarkMessage == RSFALSE only. Note that we use a non-blocking CAS loop * for the synchronization. Here, we just modify the filter condition to be false when @@ -1525,22 +1643,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) time_t now = 0; time_t lastAct; int i; - int bModifiedFilter; - sbool FilterSave[1024]; - sbool *pFilterSave; + sbool *activeSave; DEFiRet; - if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { - pFilterSave = FilterSave; - } else { - CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - } + activeSave = pBatch->active; + copyActive(pBatch); - bModifiedFilter = 0; for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(!pBatch->pElem[i].bFilterOK) + if((pBatch->pElem[i].state == BATCH_STATE_DISC) || !pBatch->active[i]) continue; - pFilterSave[i] = pBatch->pElem[i].bFilterOK; if(now == 0) { now = datetime.GetTime(NULL); /* good time call - the only one done */ } @@ -1551,15 +1662,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) lastAct = pAction->f_time; if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { if((now - lastAct) < MarkInterval / 2) { - pBatch->pElem[i].bFilterOK = 0; - bModifiedFilter = 1; - DBGPRINTF("action was recently called, ignoring mark message\n"); + pBatch->active[i] = 0; + DBGPRINTF("batch item %d: action was recently called, ignoring " + "mark message\n", i); break; /* do not update timestamp for non-written mark messages */ } } } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); - if(pBatch->pElem[i].bFilterOK) { + if(pBatch->active[i]) { DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", i, module.GetStateName(pAction->pMod)); } @@ -1567,17 +1678,8 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) iRet = doSubmitToActionQBatch(pAction, pBatch); - if(bModifiedFilter) { - /* in this case, we need to restore previous state */ - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - /* note: clang static code analyzer reports a false positive below */ - pBatch->pElem[i].bFilterOK = pFilterSave[i]; - } - } - -finalize_it: - if(pFilterSave != FilterSave) - free(pFilterSave); + free(pBatch->active); + pBatch->active = activeSave; RETiRet; } @@ -1587,8 +1689,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) { int i; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC) { + if( batchIsValidElem(pBatch, i)) { STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); } } @@ -1602,18 +1703,13 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch) static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) { - sbool FilterSave[1024]; - sbool *pFilterSave; sbool bNeedSubmit; - sbool bModifiedFilter; + sbool *activeSave; int i; DEFiRet; - if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) { - pFilterSave = FilterSave; - } else { - CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool))); - } + activeSave = pBatch->active; + copyActive(pBatch); /* note: for direct mode, we need to adjust the filter property. For non-direct * this is not necessary, because in that case we enqueue only what actually needs @@ -1621,37 +1717,25 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) */ if(pAction->bExecWhenPrevSusp) { bNeedSubmit = 0; - bModifiedFilter = 0; for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pFilterSave[i] = pBatch->pElem[i].bFilterOK; if(!pBatch->pElem[i].bPrevWasSuspended) { - DBGPRINTF("action enq stage: change bFilterOK to 0 due to " + DBGPRINTF("action enq stage: change active to 0 due to " "failover case in elem %d\n", i); - pBatch->pElem[i].bFilterOK = 0; - bModifiedFilter = 1; + pBatch->active[i] = 0; } - if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) { + if(batchIsValidElem(pBatch, i)) { STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); bNeedSubmit = 1; } - DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, i, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); } if(bNeedSubmit) { /* note: stats were already computed above */ iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } else { - DBGPRINTF("no need to submit batch, all bFilterOK==0 or discarded\n"); - } - if(bModifiedFilter) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, - pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - /* note: clang static code analyzer reports a false positive below */ - pBatch->pElem[i].bFilterOK = pFilterSave[i]; - } + DBGPRINTF("no need to submit batch, all invalid\n"); } } else { if(GatherStats) @@ -1659,7 +1743,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch) iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch); } -finalize_it: + free(pBatch->active); + pBatch->active = activeSave; RETiRet; } @@ -1682,11 +1767,10 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) * TODO: optimize this, we may do at least a multi-submit! */ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); } @@ -1711,11 +1795,10 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) DBGPRINTF("Called action %p (complex case), logging to %s\n", pAction, module.GetStateName(pAction->pMod)); for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", - pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state, + DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n", + pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state, pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); - if( pBatch->pElem[i].bFilterOK - && pBatch->pElem[i].state != BATCH_STATE_DISC + if( batchIsValidElem(pBatch, i) && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) { doActionCallAction(pAction, pBatch, i); } @@ -1733,23 +1816,68 @@ doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch) { DEFiRet; - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + d_pthread_mutex_lock(&pAction->mutAction); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction); iRet = helperSubmitToActionQComplexBatch(pAction, pBatch); - UnlockObj(pAction); + d_pthread_mutex_unlock(&pAction->mutAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" + +/* apply all params from param block to action. This supports the v6 config system. + * Defaults must have been set appropriately during action construct! + * rgerhards, 2011-08-01 + */ +static rsRetVal +actionApplyCnfParam(action_t *pAction, struct cnfparamvals *pvals) +{ + int i; + + for(i = 0 ; i < pblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(pblk.descr[i].name, "name")) { + pAction->pszName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "type")) { + continue; /* this is handled seperately during module select! */ + } else if(!strcmp(pblk.descr[i].name, "action.writeallmarkmessages")) { + pAction->bWriteAllMarkMsgs = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtime")) { + pAction->iExecEveryNthOccur = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyeverynthtimetimeout")) { + pAction->iExecEveryNthOccurTO = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlyonceeveryinterval")) { + pAction->iSecsExecOnceInterval = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.execonlywhenpreviousissuspended")) { + pAction->bExecWhenPrevSusp = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.repeatedmsgcontainsoriginalmsg")) { + pAction->bRepMsgHasMsg = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeretrycount")) { + pAction->iResumeRetryCount = pvals[i].val.d.n; + } else if(!strcmp(pblk.descr[i].name, "action.resumeinterval")) { + pAction->iResumeInterval = pvals[i].val.d.n; + } else { + dbgprintf("action: program error, non-handled " + "param '%s'\n", pblk.descr[i].name); + } + } + return RS_RET_OK; +} + + + /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. * rgerhards, 2007-07-27 */ rsRetVal -addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringRequest_t *pOMSR, int bSuspended) +addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, + omodStringRequest_t *pOMSR, struct cnfparamvals *actParams, + struct cnfparamvals *queueParams, int bSuspended) { DEFiRet; int i; @@ -1761,22 +1889,28 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques assert(ppAction != NULL); assert(pMod != NULL); assert(pOMSR != NULL); - DBGPRINTF("Module %s processed this config line.\n", module.GetName(pMod)); + DBGPRINTF("Module %s processes this action.\n", module.GetName(pMod)); CHKiRet(actionConstruct(&pAction)); /* create action object first */ pAction->pMod = pMod; pAction->pModData = pModData; - pAction->pszName = pszActionName; - pszActionName = NULL; /* free again! */ - pAction->bWriteAllMarkMsgs = bActionWriteAllMarkMsgs; - bActionWriteAllMarkMsgs = RSFALSE; /* reset */ - pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp; - pAction->iSecsExecOnceInterval = iActExecOnceInterval; - pAction->iExecEveryNthOccur = iActExecEveryNthOccur; - pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO; - pAction->bRepMsgHasMsg = bActionRepMsgHasMsg; - iActExecEveryNthOccur = 0; /* auto-reset */ - iActExecEveryNthOccurTO = 0; /* auto-reset */ + if(actParams == NULL) { /* use legacy systemn */ + pAction->pszName = cs.pszActionName; + pAction->iResumeInterval = cs.glbliActionResumeInterval; + pAction->iResumeRetryCount = cs.glbliActionResumeRetryCount; + pAction->bWriteAllMarkMsgs = cs.bActionWriteAllMarkMsgs; + pAction->bExecWhenPrevSusp = cs.bActExecWhenPrevSusp; + pAction->iSecsExecOnceInterval = cs.iActExecOnceInterval; + pAction->iExecEveryNthOccur = cs.iActExecEveryNthOccur; + pAction->iExecEveryNthOccurTO = cs.iActExecEveryNthOccurTO; + pAction->bRepMsgHasMsg = cs.bActionRepMsgHasMsg; + cs.iActExecEveryNthOccur = 0; /* auto-reset */ + cs.iActExecEveryNthOccurTO = 0; /* auto-reset */ + cs.bActionWriteAllMarkMsgs = RSFALSE; /* auto-reset */ + cs.pszActionName = NULL; /* free again! */ + } else { + actionApplyCnfParam(pAction, actParams); + } /* check if we can obtain the template pointers - TODO: move to separate function? */ pAction->iNumTpls = OMSRgetEntryCount(pOMSR); @@ -1795,7 +1929,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques /* Ok, we got everything, so it now is time to look up the template * (Hint: templates MUST be defined before they are used!) */ - if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) { + if( !(iTplOpts & OMSR_TPL_AS_MSG) + && (pAction->ppTpl[i] = + tplFind(ourConf, (char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), " Could not find template '%s' - action disabled\n", pTplName); @@ -1817,6 +1953,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->eParamPassing = ACT_ARRAY_PASSING; } else if(iTplOpts & OMSR_TPL_AS_MSG) { pAction->eParamPassing = ACT_MSG_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_JSON) { + pAction->eParamPassing = ACT_JSON_PASSING; } else { pAction->eParamPassing = ACT_STRING_PASSING; } @@ -1827,9 +1965,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->pMod = pMod; pAction->pModData = pModData; /* now check if the module is compatible with select features */ - if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) - pAction->f_ReduceRepeated = bReduceRepeatMsgs; - else { + if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) { + pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; + } else { DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } @@ -1838,7 +1976,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques if(bSuspended) actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ - CHKiRet(actionConstructFinalize(pAction)); + CHKiRet(actionConstructFinalize(pAction, queueParams)); /* TODO: if we exit here, we have a memory leak... */ @@ -1864,11 +2002,92 @@ finalize_it: static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) { - iActExecOnceInterval = 0; + cs.iActExecOnceInterval = 0; + cs.bActExecWhenPrevSusp = 0; return RS_RET_OK; } +/* initialize (current) config variables. + * Used at program start and when a new scope is created. + */ +static inline void +initConfigVariables(void) +{ + cs.bActionWriteAllMarkMsgs = RSFALSE; + cs.glbliActionResumeRetryCount = 0; + cs.bActExecWhenPrevSusp = 0; + cs.iActExecOnceInterval = 0; + cs.iActExecEveryNthOccur = 0; + cs.iActExecEveryNthOccurTO = 0; + cs.glbliActionResumeInterval = 30; + cs.glbliActionResumeRetryCount = 0; + cs.bActionRepMsgHasMsg = 0; + if(cs.pszActionName != NULL) { + free(cs.pszActionName); + cs.pszActionName = NULL; + } + actionResetQueueParams(); +} + + +rsRetVal +actionNewInst(struct nvlst *lst, action_t **ppAction) +{ + struct cnfparamvals *paramvals; + struct cnfparamvals *queueParams; + modInfo_t *pMod; + uchar *cnfModName = NULL; + omodStringRequest_t *pOMSR; + void *pModData; + action_t *pAction; + int typeIdx; + DEFiRet; + + paramvals = nvlstGetParams(lst, &pblk, NULL); + if(paramvals == NULL) { + ABORT_FINALIZE(RS_RET_ERR); + } + dbgprintf("action param blk after actionNewInst:\n"); + cnfparamsPrint(&pblk, paramvals); + typeIdx = cnfparamGetIdx(&pblk, "type"); + if(paramvals[typeIdx].bUsed == 0) { + errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "action type missing"); + ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers + } + cnfModName = (uchar*)es_str2cstr(paramvals[cnfparamGetIdx(&pblk, ("type"))].val.d.estr, NULL); + if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_OUT)) == NULL) { + errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "module name '%s' is unknown", cnfModName); + ABORT_FINALIZE(RS_RET_MOD_UNKNOWN); + } + iRet = pMod->mod.om.newActInst(cnfModName, lst, &pModData, &pOMSR); + // TODO: check if RS_RET_SUSPENDED is still valid in v6! + if(iRet != RS_RET_OK && iRet != RS_RET_SUSPENDED) { + FINALIZE; /* iRet is already set to error state */ + } + + qqueueDoCnfParams(lst, &queueParams); + + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, paramvals, queueParams, + (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + /* now check if the module is compatible with select features */ + if(pMod->isCompatibleWithFeature(sFEATURERepeatedMsgReduction) == RS_RET_OK) + pAction->f_ReduceRepeated = loadConf->globals.bReduceRepeatMsgs; + else { + DBGPRINTF("module is incompatible with RepeatedMsgReduction - turned off\n"); + pAction->f_ReduceRepeated = 0; + } + pAction->eState = ACT_STATE_RDY; /* action is enabled */ + loadConf->actions.nbrActions++; /* one more active action! */ + } + *ppAction = pAction; + +finalize_it: + free(cnfModName); + cnfparamvalsDestruct(paramvals, &pblk); + RETiRet; +} + /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) @@ -1880,38 +2099,42 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); - - CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelightdelaymark", 0, eCmdHdlrInt, NULL, &iActionQLightDlyMrk, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &cs.pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &cs.pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &cs.bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &cs.iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &cs.iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &cs.iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &cs.bActionQSyncQeueFiles, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &cs.iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &cs.iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &cs.iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &cs.iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &cs.iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &cs.iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &cs.bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &cs.iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &cs.iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &cs.iActExecOnceInterval, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &cs.bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlywhenpreviousissuspended", 0, eCmdHdlrBinary, NULL, &cs.bActExecWhenPrevSusp, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionresumeretrycount", 0, eCmdHdlrInt, NULL, &cs.glbliActionResumeRetryCount, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); + initConfigVariables(); /* first-time init of config setings */ + finalize_it: RETiRet; } |