summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c900
1 files changed, 709 insertions, 191 deletions
diff --git a/action.c b/action.c
index 5c5bdbe9..352f7f9c 100644
--- a/action.c
+++ b/action.c
@@ -4,7 +4,7 @@
*
* File begun on 2007-08-06 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -42,10 +42,14 @@
#include "cfsysline.h"
#include "srUtils.h"
#include "errmsg.h"
+#include "batch.h"
+#include "wti.h"
#include "datetime.h"
+#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
+
/* forward definitions */
-rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg);
+static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -58,10 +62,13 @@ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,
static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */
static int glbliActionResumeInterval = 30;
int glbliActionResumeRetryCount = 0; /* how often should suspended actions be retried? */
+static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragment in it */
+static uchar *pszActionName; /* short name for the action */
/* main message queue and its configuration parameters */
static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
static int iActionQueueSize = 1000; /* size of the main message queue above */
+static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */
static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
static int iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -70,6 +77,7 @@ static int iActionQueueNumWorkers = 1; /* number of worker threads for the mm
static uchar *pszActionQFName = NULL; /* prefix for the main message queue file */
static int64 iActionQueMaxFileSize = 1024*1024;
static int iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+static int bActionQSyncQeueFiles = 0; /* sync queue files */
static int iActionQtoQShutdown = 0; /* queue shutdown */
static int iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
static int iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -142,6 +150,7 @@ actionResetQueueParams(void)
ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */
iActionQueueSize = 1000; /* size of the main message queue above */
+ iActionQueueDeqBatchSize = 16; /* default batch size */
iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */
iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */
iActionQDiscardMark = 9800; /* begin to discard messages */
@@ -149,6 +158,7 @@ actionResetQueueParams(void)
iActionQueueNumWorkers = 1; /* number of worker threads for the mm queue above */
iActionQueMaxFileSize = 1024*1024;
iActionQPersistUpdCnt = 0; /* persist queue info every n updates */
+ bActionQSyncQeueFiles = 0;
iActionQtoQShutdown = 0; /* queue shutdown */
iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */
iActionQtoEnq = 2000; /* timeout for queue enque */
@@ -162,8 +172,7 @@ actionResetQueueParams(void)
glbliActionResumeRetryCount = 0; /* I guess it is smart to reset this one, too */
- if(pszActionQFName != NULL)
- d_free(pszActionQFName);
+ d_free(pszActionQFName);
pszActionQFName = NULL; /* prefix for the main message queue file */
RETiRet;
@@ -175,11 +184,12 @@ actionResetQueueParams(void)
*/
rsRetVal actionDestruct(action_t *pThis)
{
+ int i;
DEFiRet;
ASSERT(pThis != NULL);
if(pThis->pQueue != NULL) {
- queueDestruct(&pThis->pQueue);
+ qqueueDestruct(&pThis->pQueue);
}
if(pThis->pMod != NULL)
@@ -190,8 +200,35 @@ rsRetVal actionDestruct(action_t *pThis)
SYNC_OBJ_TOOL_EXIT(pThis);
pthread_mutex_destroy(&pThis->mutActExec);
- if(pThis->ppTpl != NULL)
- d_free(pThis->ppTpl);
+ d_free(pThis->pszName);
+ d_free(pThis->ppTpl);
+
+ /* message ptr cleanup */
+ for(i = 0 ; i < pThis->iNumTpls ; ++i) {
+ if(pThis->ppMsgs[i] != NULL) {
+ switch(pThis->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+#if 0 /* later! */
+ iArr = 0;
+ while(((char **)pThis->ppMsgs[i])[iArr] != NULL) {
+ d_free(((char **)pThis->ppMsgs[i])[iArr++]);
+ ((char **)pThis->ppMsgs[i])[iArr++] = NULL;
+ }
+ d_free(pThis->ppMsgs[i]);
+ pThis->ppMsgs[i] = NULL;
+#endif
+ break;
+ case ACT_STRING_PASSING:
+ d_free(pThis->ppMsgs[i]);
+ break;
+ default:
+ assert(0);
+ }
+ }
+ }
+ d_free(pThis->ppMsgs);
+ d_free(pThis->lenMsgs);
+
d_free(pThis);
RETiRet;
@@ -208,10 +245,7 @@ rsRetVal actionConstruct(action_t **ppThis)
ASSERT(ppThis != NULL);
- if((pThis = (action_t*) calloc(1, sizeof(action_t))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
-
+ CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t)));
pThis->iResumeInterval = glbliActionResumeInterval;
pThis->iResumeRetryCount = glbliActionResumeRetryCount;
pThis->tLastOccur = time(NULL); /* done once per action on startup only */
@@ -254,7 +288,8 @@ actionConstructFinalize(action_t *pThis)
* to be run on multiple threads. So far, this is forbidden by the interface
* spec. -- rgerhards, 2008-01-30
*/
- CHKiRet(queueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction));
+ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize,
+ (rsRetVal (*)(void*, batch_t*))processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszQName);
/* ... set some properties ... */
@@ -267,24 +302,26 @@ actionConstructFinalize(action_t *pThis)
errmsg.LogError(0, NO_ERRCODE, "Invalid " #directive ", error %d. Ignored, running with default setting", iRet); \
}
- queueSetpUsr(pThis->pQueue, pThis);
- setQPROP(queueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
- setQPROP(queueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
- setQPROPstr(queueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
- setQPROP(queueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
- setQPROP(queueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
- setQPROP(queueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
- setQPROP(queueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
- setQPROP(queueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
- setQPROP(queueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
- setQPROP(queueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
- setQPROP(queueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
- setQPROP(queueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
- setQPROP(queueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
- setQPROP(queueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
- setQPROP(queueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
- setQPROP(queueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
- setQPROP(queueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
+ qqueueSetpUsr(pThis->pQueue, pThis);
+ setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace);
+ setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize);
+ setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize);
+ setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName);
+ setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt);
+ setQPROP(qqueueSetbSyncQueueFiles, "$ActionQueueSyncQueueFiles", bActionQSyncQeueFiles);
+ setQPROP(qqueueSettoQShutdown, "$ActionQueueTimeoutShutdown", iActionQtoQShutdown );
+ setQPROP(qqueueSettoActShutdown, "$ActionQueueTimeoutActionCompletion", iActionQtoActShutdown);
+ setQPROP(qqueueSettoWrkShutdown, "$ActionQueueWorkerTimeoutThreadShutdown", iActionQtoWrkShutdown);
+ setQPROP(qqueueSettoEnq, "$ActionQueueTimeoutEnqueue", iActionQtoEnq);
+ setQPROP(qqueueSetiHighWtrMrk, "$ActionQueueHighWaterMark", iActionQHighWtrMark);
+ setQPROP(qqueueSetiLowWtrMrk, "$ActionQueueLowWaterMark", iActionQLowWtrMark);
+ setQPROP(qqueueSetiDiscardMrk, "$ActionQueueDiscardMark", iActionQDiscardMark);
+ setQPROP(qqueueSetiDiscardSeverity, "$ActionQueueDiscardSeverity", iActionQDiscardSeverity);
+ setQPROP(qqueueSetiMinMsgsPerWrkr, "$ActionQueueWorkerThreadMinimumMessages", iActionQWrkMinMsgs);
+ setQPROP(qqueueSetbSaveOnShutdown, "$ActionQueueSaveOnShutdown", bActionQSaveOnShutdown);
+ setQPROP(qqueueSetiDeqSlowdown, "$ActionQueueDequeueSlowdown", iActionQueueDeqSlowdown);
+ setQPROP(qqueueSetiDeqtWinFromHr, "$ActionQueueDequeueTimeBegin", iActionQueueDeqtWinFromHr);
+ setQPROP(qqueueSetiDeqtWinToHr, "$ActionQueueDequeueTimeEnd", iActionQueueDeqtWinToHr);
# undef setQPROP
# undef setQPROPstr
@@ -293,7 +330,7 @@ actionConstructFinalize(action_t *pThis)
bActionQSaveOnShutdown, iActionQueMaxDiskSpace);
- CHKiRet(queueStart(pThis->pQueue));
+ CHKiRet(qqueueStart(pThis->pQueue));
dbgprintf("Action %p: queue %p created\n", pThis, pThis->pQueue);
/* and now reset the queue params (see comment in its function header!) */
@@ -304,81 +341,239 @@ finalize_it:
}
-/* set an action back to active state -- rgerhards, 2007-08-02
+
+/* set the global resume interval
+ */
+rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+{
+ glbliActionResumeInterval = iNewVal;
+ return RS_RET_OK;
+}
+
+
+/* returns the action state name in human-readable form
+ * returned string must not be modified.
+ * rgerhards, 2009-05-07
+ */
+static uchar *getActStateName(action_t *pThis)
+{
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ return (uchar*) "rdy";
+ case ACT_STATE_ITX:
+ return (uchar*) "itx";
+ case ACT_STATE_RTRY:
+ return (uchar*) "rtry";
+ case ACT_STATE_SUSP:
+ return (uchar*) "susp";
+ case ACT_STATE_DIED:
+ return (uchar*) "died";
+ case ACT_STATE_COMM:
+ return (uchar*) "comm";
+ default:
+ return (uchar*) "ERROR/UNKNWON";
+ }
+}
+
+
+/* returns a suitable return code based on action state
+ * rgerhards, 2009-05-07
*/
-static rsRetVal actionResume(action_t *pThis)
+static rsRetVal getReturnCode(action_t *pThis)
{
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 0;
+ switch(pThis->eState) {
+ case ACT_STATE_RDY:
+ iRet = RS_RET_OK;
+ break;
+ case ACT_STATE_ITX:
+ if(pThis->bHadAutoCommit) {
+ pThis->bHadAutoCommit = 0; /* auto-reset */
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ } else {
+ iRet = RS_RET_DEFER_COMMIT;
+ }
+ break;
+ case ACT_STATE_RTRY:
+ iRet = RS_RET_SUSPENDED;
+ break;
+ case ACT_STATE_SUSP:
+ case ACT_STATE_DIED:
+ iRet = RS_RET_ACTION_FAILED;
+ break;
+ default:
+ DBGPRINTF("Invalid action engine state %d, program error\n",
+ (int) pThis->eState);
+ iRet = RS_RET_ERR;
+ break;
+ }
RETiRet;
}
-/* set the global resume interval
+/* set the action to a new state
+ * rgerhards, 2007-08-02
*/
-rsRetVal actionSetGlobalResumeInterval(int iNewVal)
+static inline void actionSetState(action_t *pThis, action_state_t newState)
{
- glbliActionResumeInterval = iNewVal;
- return RS_RET_OK;
+ pThis->eState = newState;
+ DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+}
+
+/* Handles the transient commit state. So far, this is
+ * mostly a dummy...
+ * rgerhards, 2007-08-02
+ */
+static void actionCommitted(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RDY);
+}
+
+
+/* set action to "rtry" state.
+ * rgerhards, 2007-08-02
+ */
+static void actionRetry(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_RTRY);
+}
+
+
+/* Disable action, this means it will never again be usable
+ * until rsyslog is reloaded. Use only as a last resort, but
+ * depends on output module.
+ * rgerhards, 2007-08-02
+ */
+static void actionDisable(action_t *pThis)
+{
+ actionSetState(pThis, ACT_STATE_DIED);
+}
+
+
+/* Suspend action, this involves changing the acton state as well
+ * as setting the next retry time.
+ * if we have more than 10 retries, we prolong the
+ * retry interval. If something is really stalled, it will
+ * get re-tried only very, very seldom - but that saves
+ * CPU time. TODO: maybe a config option for that?
+ * rgerhards, 2007-08-02
+ */
+static inline void actionSuspend(action_t *pThis, time_t ttNow)
+{
+ if(ttNow == NO_TIME_PROVIDED)
+ time(&ttNow);
+ pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ actionSetState(pThis, ACT_STATE_SUSP);
+ DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
-/* suspend an action -- rgerhards, 2007-08-02
+/* actually do retry processing. Note that the function receives a timestamp so
+ * that we do not need to call the (expensive) time() API.
+ * Note that we do the full retry processing here, doing the configured number of
+ * iterations.
+ * rgerhards, 2009-05-07
*/
-static rsRetVal actionSuspend(action_t *pThis, time_t tNow)
+static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow)
{
+ int iRetries;
+ int iSleepPeriod;
DEFiRet;
ASSERT(pThis != NULL);
- pThis->bSuspended = 1;
- pThis->ttResumeRtry = tNow + pThis->iResumeInterval;
- pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */
+
+ iRetries = 0;
+ while(pThis->eState == ACT_STATE_RTRY) {
+ iRet = pThis->pMod->tryResume(pThis->pModData);
+ if(iRet == RS_RET_OK) {
+ actionSetState(pThis, ACT_STATE_RDY);
+ } else if(iRet == RS_RET_SUSPENDED) {
+ /* max retries reached? */
+ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
+ actionSuspend(pThis, ttNow);
+ } else {
+ ++pThis->iNbrResRtry;
+ ++iRetries;
+ iSleepPeriod = pThis->iResumeInterval;
+ ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */
+ srSleep(iSleepPeriod, 0);
+ }
+ } else if(iRet == RS_RET_DISABLE_ACTION) {
+ actionDisable(pThis);
+ }
+ }
+
+ if(pThis->eState == ACT_STATE_RDY) {
+ pThis->iNbrResRtry = 0;
+ }
RETiRet;
}
/* try to resume an action -- rgerhards, 2007-08-02
- * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the
- * action is still suspended.
+ * changed to new action state engine -- rgerhards, 2009-05-07
*/
static rsRetVal actionTryResume(action_t *pThis)
{
DEFiRet;
- time_t ttNow;
+ time_t ttNow = NO_TIME_PROVIDED;
ASSERT(pThis != NULL);
- ttNow = getActNow(pThis); /* cache "now" */
-
- /* first check if it is time for a re-try */
- if(ttNow > pThis->ttResumeRtry) {
- iRet = pThis->pMod->tryResume(pThis->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- /* set new tryResume time */
- ++pThis->iNbrResRtry;
- /* if we have more than 10 retries, we prolong the
- * retry interval. If something is really stalled, it will
- * get re-tried only very, very seldom - but that saves
- * CPU time. TODO: maybe a config option for that?
- * rgerhards, 2007-08-02
- */
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
+ if(pThis->eState == ACT_STATE_SUSP) {
+ /* if we are suspended, we need to check if the timeout expired.
+ * for this handling, we must always obtain a fresh timestamp. We used
+ * to use the action timestamp, but in this case we will never reach a
+ * point where a resumption is actually tried, because the action timestamp
+ * is always in the past. So we can not avoid doing a fresh time() call
+ * here. -- rgerhards, 2009-03-18
+ */
+ time(&ttNow); /* cache "now" */
+ if(ttNow > pThis->ttResumeRtry) {
+ actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
}
- } else {
- /* it's too early, we are still suspended --> indicate this */
- iRet = RS_RET_SUSPENDED;
}
- if(iRet == RS_RET_OK)
- actionResume(pThis);
+ if(pThis->eState == ACT_STATE_RTRY) {
+ if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
+ time(&ttNow);
+ CHKiRet(actionDoRetry(pThis, ttNow));
+ }
- dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n",
- iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n",
+ getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ }
+finalize_it:
+ RETiRet;
+}
+
+
+/* prepare an action for performing work. This involves trying to recover it,
+ * depending on its current state.
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal actionPrepare(action_t *pThis)
+{
+ DEFiRet;
+
+ assert(pThis != NULL);
+ CHKiRet(actionTryResume(pThis));
+
+ /* if we are now ready, we initialize the transaction and advance
+ * action state accordingly
+ */
+ if(pThis->eState == ACT_STATE_RDY) {
+ CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData));
+ actionSetState(pThis, ACT_STATE_ITX);
+ }
+
+finalize_it:
RETiRet;
}
@@ -395,12 +590,11 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- dbgprintf("\tSuspended: %d", pThis->bSuspended);
- if(pThis->bSuspended) {
- dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
+ if(pThis->eState == ACT_STATE_SUSP) {
+ dbgprintf("\tresume next retry: %u, number retries: %d",
+ (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\n");
- dbgprintf("\tDisabled: %d\n", !pThis->bEnabled);
+ dbgprintf("\tState: %s\n", getActStateName(pThis));
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
dbgprintf("\n");
@@ -408,33 +602,350 @@ rsRetVal actionDbgPrint(action_t *pThis)
}
+/* prepare the calling parameters for doAction()
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg)
+{
+ int i;
+ DEFiRet;
+
+ ASSERT(pAction != NULL);
+
+ /* here we must loop to process all requested strings */
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ switch(pAction->eParamPassing) {
+ case ACT_STRING_PASSING:
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pAction->ppMsgs[i]), &(pAction->lenMsgs[i])));
+ break;
+ case ACT_ARRAY_PASSING:
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i])));
+ break;
+ default:assert(0); /* software bug if this happens! */
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* cleanup doAction calling parameters
+ * rgerhards, 2009-05-07
+ */
+static rsRetVal cleanupDoActionParams(action_t *pAction)
+{
+ int i;
+ int iArr;
+ DEFiRet;
+
+ ASSERT(pAction != NULL);
+ for(i = 0 ; i < pAction->iNumTpls ; ++i) {
+ if(pAction->ppMsgs[i] != NULL) {
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ iArr = 0;
+ while(((char **)pAction->ppMsgs[i])[iArr] != NULL) {
+ d_free(((char **)pAction->ppMsgs[i])[iArr++]);
+ ((char **)pAction->ppMsgs[i])[iArr++] = NULL;
+ }
+ d_free(pAction->ppMsgs[i]);
+ pAction->ppMsgs[i] = NULL;
+ break;
+ case ACT_STRING_PASSING:
+ break;
+ default:
+ assert(0);
+ }
+ }
+ }
+
+ RETiRet;
+}
+
+
/* call the DoAction output plugin entry point
+ * Performance note: we build the action parameters here in this function. That
+ * means we do it while we hold the action look, potentially reducing concurrency
+ * (especially if the action queue is run in DIRECT mode). As an alternative, we
+ * may generate all params for the batch as whole before aquiring the action. However,
+ * that requires more memory, for large batches potentially a lot of memory. So for the
+ * time being, I am doing it here - the performance hit should be very minor and may even
+ * not be a hit because we may gain CPU cache locality gains with the "fewer memory"
+ * approach (I'd say that is rater likely).
* rgerhards, 2008-01-28
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
rsRetVal
-actionCallDoAction(action_t *pAction, msg_t *pMsg)
+actionCallDoAction(action_t *pThis, msg_t *pMsg)
{
DEFiRet;
- int iRetries;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+ DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis));
+ CHKiRet(prepareDoActionParams(pThis, pMsg));
+
+ pThis->bHadAutoCommit = 0;
+ iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ /* we are done, action state remains the same */
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ /* action state remains the same, but we had a commit. */
+ pThis->bHadAutoCommit = 1;
+ break;
+ case RS_RET_SUSPENDED:
+ actionRetry(pThis);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ cleanupDoActionParams(pThis); /* iRet ignored! */
+
+ RETiRet;
+}
+
+
+/* process a message
+ * this readies the action and then calls doAction()
+ * rgerhards, 2008-01-28
+ */
+rsRetVal
+actionProcessMessage(action_t *pThis, msg_t *pMsg)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+ ISOBJ_TYPE_assert(pMsg, msg);
+
+RUNLOG_STR("inside actionProcessMsg()");
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, pMsg));
+
+ iRet = getReturnCode(pThis);
+finalize_it:
+ RETiRet;
+}
+
+
+/* finish processing a batch. Most importantly, that means we commit if we
+ * need to do so.
+ * rgerhards, 2008-01-28
+ */
+static rsRetVal
+finishBatch(action_t *pThis)
+{
+ DEFiRet;
+
+ ASSERT(pThis != NULL);
+
+ if(pThis->eState == ACT_STATE_RDY)
+ FINALIZE; /* nothing to do */
+
+ CHKiRet(actionPrepare(pThis));
+ if(pThis->eState == ACT_STATE_ITX) {
+ iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData);
+ switch(iRet) {
+ case RS_RET_OK:
+ actionCommitted(pThis);
+ break;
+ case RS_RET_SUSPENDED:
+ actionRetry(pThis);
+ break;
+ case RS_RET_DISABLE_ACTION:
+ actionDisable(pThis);
+ break;
+ case RS_RET_DEFER_COMMIT:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ case RS_RET_PREVIOUS_COMMITTED:
+ DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
+ "- ignored\n");
+ actionCommitted(pThis);
+ break;
+ default:/* permanent failure of this message - no sense in retrying. This is
+ * not yet handled (but easy TODO)
+ */
+ FINALIZE;
+ }
+ }
+ iRet = getReturnCode(pThis);
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* try to submit a partial batch of elements.
+ * rgerhards, 2009-05-12
+ */
+static rsRetVal
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
+{
int i;
- int iSleepPeriod;
- int bCallAction;
- int iCancelStateSave;
- uchar **ppMsgs; /* array of message pointers for doAction */
+ int iElemProcessed;
+ int iCommittedUpTo;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
- ASSERT(pAction != NULL);
+ assert(pBatch != NULL);
+ assert(pnElem != NULL);
+
+ i = pBatch->iDoneUpTo; /* all messages below that index are processed */
+ iElemProcessed = 0;
+ iCommittedUpTo = i;
+ while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
+ dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later!
+ localRet = actionProcessMessage(pAction, pMsg);
+ dbgprintf("action call returned %d\n", localRet);
+ if(localRet == RS_RET_OK) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ /* mark messages as committed */
+ while(iCommittedUpTo < i - 1) {
+ pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
+ }
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
+ pBatch->pElem[i].state = BATCH_STATE_SUB;
+ } else {
+ iRet = localRet;
+ FINALIZE;
+ }
+ ++i;
+ ++iElemProcessed;
+ }
- /* create the array for doAction() message pointers */
- if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+finalize_it:
+ if(pBatch->iDoneUpTo != iCommittedUpTo) {
+ *pnElem += iCommittedUpTo - pBatch->iDoneUpTo;
+ pBatch->iDoneUpTo = iCommittedUpTo;
}
+ RETiRet;
+}
- /* here we must loop to process all requested strings */
- for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(ppMsgs[i])));
+
+/* submit a batch for actual action processing.
+ * The first nElem elements are processed. This function calls itself
+ * recursively if it needs to handle errors.
+ * rgerhards, 2009-05-12
+ */
+static rsRetVal
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
+{
+ int i;
+ int bDone;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(pBatch != NULL);
+
+ bDone = 0;
+ do {
+ localRet = tryDoAction(pAction, pBatch, &nElem);
+ if( localRet == RS_RET_OK
+ || localRet == RS_RET_PREVIOUS_COMMITTED
+ || localRet == RS_RET_DEFER_COMMIT) {
+ /* try commit transaction, once done, we can simply do so as if
+ * that return state was returned from tryDoAction().
+ */
+ localRet = finishBatch(pAction);
+ }
+
+ if( localRet == RS_RET_OK
+ || localRet == RS_RET_PREVIOUS_COMMITTED
+ || localRet == RS_RET_DEFER_COMMIT) {
+ bDone = 1;
+ } else if(localRet == RS_RET_SUSPENDED) {
+ ; /* do nothing, this will retry the full batch */
+ } else if(localRet == RS_RET_ACTION_FAILED) {
+ /* in this case, the whole batch can not be processed */
+ for(i = 0 ; i < nElem ; ++i) {
+ pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
+ }
+ bDone = 1;
+ } else {
+ if(nElem == 1) {
+ pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD;
+ bDone = 1;
+ } else {
+ /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
+ submitBatch(pAction, pBatch, nElem / 2);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2));
+ bDone = 1;
+ }
+ }
+ } while(!bDone); /* do .. while()! */
+
+ RETiRet;
+}
+
+
+/* receive a batch and process it. This includes retry handling.
+ * rgerhards, 2009-05-12
+ */
+static rsRetVal
+processAction(action_t *pAction, batch_t *pBatch)
+{
+ int i;
+ msg_t *pMsg;
+ rsRetVal localRet;
+ DEFiRet;
+
+ assert(pBatch != NULL);
+
+ pBatch->iDoneUpTo = 0;
+ /* TODO: think about action batches, must be handled at upper layer!
+ * MULTIQUEUE
+ */
+ localRet = submitBatch(pAction, pBatch, pBatch->nElem);
+ CHKiRet(localRet);
+
+ /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */
+ for(i = 0 ; i < pBatch->nElem ; i++) {
+ pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
}
- iRetries = 0;
+ iRet = finishBatch(pAction);
+
+finalize_it:
+ RETiRet;
+}
+
+
+#pragma GCC diagnostic ignored "-Wempty-body"
+/* receive an array of to-process user pointers and submit them
+ * for processing.
+ * rgerhards, 2009-04-22
+ */
+static rsRetVal
+processBatchMain(action_t *pAction, batch_t *pBatch)
+{
+ int iCancelStateSave;
+ DEFiRet;
+
+ assert(pBatch != NULL);
+
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
* if they notify us they are - functionality not yet implemented...).
@@ -444,60 +955,48 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
pthread_setcancelstate(iCancelStateSave, NULL);
- do {
- /* on first invocation, this if should never be true. We just put it at the top
- * of the loop so that processing (and code) is simplified. This code is actually
- * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30
- */
- if(iRet == RS_RET_SUSPENDED) {
- /* ok, this calls for our retry logic... */
- ++iRetries;
- iSleepPeriod = pAction->iResumeInterval;
- srSleep(iSleepPeriod, 0);
- }
- /* first check if we are suspended and, if so, retry */
- if(actionIsSuspended(pAction)) {
- iRet = actionTryResume(pAction);
- if(iRet == RS_RET_OK)
- bCallAction = 1;
- else
- bCallAction = 0;
- } else {
- bCallAction = 1;
- }
- if(bCallAction) {
- /* call configured action */
- iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData);
- if(iRet == RS_RET_SUSPENDED) {
- dbgprintf("Action requested to be suspended, done that.\n");
- actionSuspend(pAction, getActNow(pAction));
- }
- }
+ iRet = processAction(pAction, pBatch);
- } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */
+ pthread_cleanup_pop(1); /* unlock mutex */
+
+ RETiRet;
+}
+#pragma GCC diagnostic warning "-Wempty-body"
+
+
+/* call the HUP handler for a given action, if such a handler is defined. The
+ * action mutex is locked, because the HUP handler most probably needs to modify
+ * some internal state information.
+ * rgerhards, 2008-10-22
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+rsRetVal
+actionCallHUPHdlr(action_t *pAction)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ASSERT(pAction != NULL);
+ DBGPRINTF("Action %p checks HUP hdlr: %p\n", pAction, pAction->pMod->doHUP);
- if(iRet == RS_RET_DISABLE_ACTION) {
- dbgprintf("Action requested to be disabled, done that.\n");
- pAction->bEnabled = 0; /* that's it... */
+ if(pAction->pMod->doHUP == NULL) {
+ FINALIZE; /* no HUP handler, so we are done ;) */
}
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ d_pthread_mutex_lock(&pAction->mutActExec);
+ pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ CHKiRet(pAction->pMod->doHUP(pAction->pModData));
pthread_cleanup_pop(1); /* unlock mutex */
finalize_it:
- /* cleanup */
- for(i = 0 ; i < pAction->iNumTpls ; ++i) {
- if(ppMsgs[i] != NULL) {
- d_free(ppMsgs[i]);
- }
- }
- d_free(ppMsgs);
- msgDestruct(&pMsg); /* we are now finished with the message */
-
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
+
/* set the action message queue mode
* TODO: probably move this into queue object, merge with MainMsgQueue!
* rgerhards, 2008-01-28
@@ -571,7 +1070,7 @@ actionWriteToAction(action_t *pAction)
if(pAction->iNbrNoExec < pAction->iExecEveryNthOccur - 1) {
++pAction->iNbrNoExec;
dbgprintf("action %p passed %d times to execution - less than neded - discarding\n",
- pAction, pAction->iNbrNoExec);
+ pAction, pAction->iNbrNoExec);
FINALIZE;
} else {
pAction->iNbrNoExec = 0; /* we execute the action now, so the number of no execs is down to */
@@ -588,9 +1087,8 @@ actionWriteToAction(action_t *pAction)
*/
if(pAction->f_prevcount > 1) {
msg_t *pMsg;
- uchar szRepMsg[64];
- snprintf((char*)szRepMsg, sizeof(szRepMsg), "last message repeated %d times",
- pAction->f_prevcount);
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
if((pMsg = MsgDup(pAction->f_pMsg)) == NULL) {
/* it failed - nothing we can do against it... */
@@ -598,15 +1096,20 @@ actionWriteToAction(action_t *pAction)
ABORT_FINALIZE(RS_RET_ERR);
}
- /* We now need to update the other message properties.
- * ... RAWMSG is a problem ... Please note that digital
+ if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
+ pAction->f_prevcount);
+ } else {
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
+ pAction->f_prevcount, getMSG(pAction->f_pMsg));
+ }
+
+ /* We now need to update the other message properties. Please note that digital
* signatures inside the message are also invalidated.
*/
- datetime.getCurrTime(&(pMsg->tRcvdAt));
+ datetime.getCurrTime(&(pMsg->tRcvdAt), &(pMsg->ttGenTime));
memcpy(&pMsg->tTIMESTAMP, &pMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgSetMSG(pMsg, (char*)szRepMsg);
- MsgSetRawMsg(pMsg, (char*)szRepMsg);
-
+ MsgReplaceMSG(pMsg, szRepMsg, lenRepMsg);
pMsgSave = pAction->f_pMsg; /* save message pointer for later restoration */
pAction->f_pMsg = pMsg; /* use the new msg (pointer will be restored below) */
}
@@ -625,18 +1128,17 @@ actionWriteToAction(action_t *pAction)
dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n",
(int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction),
(int) (pAction->iSecsExecOnceInterval + pAction->tLastExec));
+ pAction->tLastExec = getActNow(pAction); /* re-init time flags */
FINALIZE;
}
- pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */
- /* Note: tLastExec could be set in the if block above, but f_time causes us a hard time
- * so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16
- */
+ /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */
+ pAction->f_time = pAction->f_pMsg->ttGenTime;
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = queueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
+ iRet = qqueueEnqObj(pAction->pQueue, pAction->f_pMsg->flowCtlType, (void*) MsgAddRef(pAction->f_pMsg));
if(iRet == RS_RET_OK)
pAction->f_prevcount = 0; /* message processed, so we start a new cycle */
@@ -660,45 +1162,13 @@ finalize_it:
}
-/* call the configured action. Does all necessary housekeeping.
- * rgerhards, 2007-08-01
- * FYI: currently, this function is only called from the queue
- * consumer. So we (conceptually) run detached from the input
- * threads (which also means we may run much later than when the
- * message was generated).
+/* helper to actonCallAction, mostly needed because of this damn
+ * pthread_cleanup_push() POSIX macro...
*/
-#pragma GCC diagnostic ignored "-Wempty-body"
-rsRetVal
-actionCallAction(action_t *pAction, msg_t *pMsg)
+static rsRetVal
+doActionCallAction(action_t *pAction, msg_t *pMsg)
{
DEFiRet;
- int iCancelStateSave;
-
- ISOBJ_TYPE_assert(pMsg, msg);
- ASSERT(pAction != NULL);
-
- /* Make sure nodbody else modifies/uses this action object. Right now, this
- * is important because of "message repeated n times" processing and potentially
- * multiple worker threads. -- rgerhards, 2007-12-11
- */
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- LockObj(pAction);
- pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
- pthread_setcancelstate(iCancelStateSave, NULL);
-
- /* first, we need to check if this is a disabled
- * entry. If so, we must not further process it.
- * rgerhards 2005-09-26
- * In the future, disabled modules may be re-probed from time
- * to time. They are in a perfectly legal state, except that the
- * doAction method indicated that it wanted to be disabled - but
- * we do not consider this is a solution for eternity... So we
- * should check from time to time if affairs have improved.
- * rgerhards, 2007-07-24
- */
- if(pAction->bEnabled == 0) {
- ABORT_FINALIZE(RS_RET_OK);
- }
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
@@ -712,8 +1182,8 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
(pMsg->msgFlags & MARK) == 0 && getMSGLen(pMsg) == getMSGLen(pAction->f_pMsg) &&
!strcmp(getMSG(pMsg), getMSG(pAction->f_pMsg)) &&
!strcmp(getHOSTNAME(pMsg), getHOSTNAME(pAction->f_pMsg)) &&
- !strcmp(getPROCID(pMsg), getPROCID(pAction->f_pMsg)) &&
- !strcmp(getAPPNAME(pMsg), getAPPNAME(pAction->f_pMsg))) {
+ !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(pAction->f_pMsg, LOCK_MUTEX)) &&
+ !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(pAction->f_pMsg, LOCK_MUTEX))) {
pAction->f_prevcount++;
dbgprintf("msg repeated %d times, %ld sec of %d.\n",
pAction->f_prevcount, (long) getActNow(pAction) - pAction->f_time,
@@ -746,10 +1216,44 @@ actionCallAction(action_t *pAction, msg_t *pMsg)
}
finalize_it:
- pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
- UnlockObj(pAction);
- pthread_cleanup_pop(0); /* remove mutex cleanup handler */
- pthread_setcancelstate(iCancelStateSave, NULL);
+ RETiRet;
+}
+
+
+/* call the configured action. Does all necessary housekeeping.
+ * rgerhards, 2007-08-01
+ * FYI: currently, this function is only called from the queue
+ * consumer. So we (conceptually) run detached from the input
+ * threads (which also means we may run much later than when the
+ * message was generated).
+ */
+#pragma GCC diagnostic ignored "-Wempty-body"
+rsRetVal
+actionCallAction(action_t *pAction, msg_t *pMsg)
+{
+ DEFiRet;
+ int iCancelStateSave;
+
+ ISOBJ_TYPE_assert(pMsg, msg);
+ ASSERT(pAction != NULL);
+
+ /* We need to lock the mutex only for repeated line processing.
+ * rgerhards, 2009-06-19
+ */
+ //if(pAction->f_ReduceRepeated == 1) {
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ LockObj(pAction);
+ pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut);
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ iRet = doActionCallAction(pAction, pMsg);
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+ UnlockObj(pAction);
+ pthread_cleanup_pop(0); /* remove mutex cleanup handler */
+ pthread_setcancelstate(iCancelStateSave, NULL);
+ //} else {
+ //iRet = doActionCallAction(pAction, pMsg);
+ //}
+
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
@@ -763,14 +1267,17 @@ actionAddCfSysLineHdrl(void)
{
DEFiRet;
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL));
@@ -785,6 +1292,7 @@ actionAddCfSysLineHdrl(void)
CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL));
CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL));
+ CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL));
finalize_it:
RETiRet;
@@ -814,10 +1322,13 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
CHKiRet(actionConstruct(&pAction)); /* create action object first */
pAction->pMod = pMod;
pAction->pModData = pModData;
+ pAction->pszName = pszActionName;
+ pszActionName = NULL; /* free again! */
pAction->bExecWhenPrevSusp = bActExecWhenPrevSusp;
pAction->iSecsExecOnceInterval = iActExecOnceInterval;
pAction->iExecEveryNthOccur = iActExecEveryNthOccur;
pAction->iExecEveryNthOccurTO = iActExecEveryNthOccurTO;
+ pAction->bRepMsgHasMsg = bActionRepMsgHasMsg;
iActExecEveryNthOccur = 0; /* auto-reset */
iActExecEveryNthOccurTO = 0; /* auto-reset */
@@ -830,9 +1341,9 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
*/
if(pAction->iNumTpls > 0) {
/* we first need to create the template pointer array */
- if((pAction->ppTpl = calloc(pAction->iNumTpls, sizeof(struct template *))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pAction->ppTpl = (struct template **)calloc(pAction->iNumTpls, sizeof(struct template *)));
+ CHKmalloc(pAction->ppMsgs = (uchar**) calloc(pAction->iNumTpls, sizeof(uchar *)));
+ CHKmalloc(pAction->lenMsgs = (size_t*) calloc(pAction->iNumTpls, sizeof(size_t)));
}
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
@@ -858,6 +1369,13 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
ABORT_FINALIZE(RS_RET_RQD_TPLOPT_MISSING);
}
+ /* set parameter-passing mode */
+ if(iTplOpts & OMSR_TPL_AS_ARRAY) {
+ pAction->eParamPassing = ACT_ARRAY_PASSING;
+ } else {
+ pAction->eParamPassing = ACT_STRING_PASSING;
+ }
+
dbgprintf("template: '%s' assigned\n", pTplName);
}
@@ -870,7 +1388,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques
dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n");
pAction->f_ReduceRepeated = 0;
}
- pAction->bEnabled = 1; /* action is enabled */
+ pAction->eState = ACT_STATE_RDY; /* action is enabled */
if(bSuspended)
actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */