summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c737
-rw-r--r--action.h19
-rw-r--r--runtime/batch.h14
-rw-r--r--runtime/conf.c1
-rw-r--r--runtime/queue.c4
-rw-r--r--runtime/ruleset.c33
-rw-r--r--runtime/wti.h74
7 files changed, 314 insertions, 568 deletions
diff --git a/action.c b/action.c
index 61918884..71ffd788 100644
--- a/action.c
+++ b/action.c
@@ -14,7 +14,6 @@
*
* if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
- * - helperSubmitToActionQComplexBatch
* - doActionCallAction
* handles mark message reduction, but in essence calls
* - actionWriteToAction
@@ -36,9 +35,6 @@
*
* After dequeue, processing is as follows:
* - processBatchMain
- * - processAction
- * - submitBatch
- * - tryDoAction
* - ...
*
* MORE ON PROCESSING, QUEUES and FILTERING
@@ -69,7 +65,7 @@
* beast.
* rgerhards, 2011-06-15
*
- * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -120,9 +116,9 @@
/* forward definitions */
static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -496,9 +492,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal)
* returned string must not be modified.
* rgerhards, 2009-05-07
*/
-static uchar *getActStateName(action_t *pThis)
+static uchar *getActStateName(action_t *pThis, wti_t *pWti)
{
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
return (uchar*) "rdy";
case ACT_STATE_ITX:
@@ -520,12 +516,12 @@ static uchar *getActStateName(action_t *pThis)
/* returns a suitable return code based on action state
* rgerhards, 2009-05-07
*/
-static rsRetVal getReturnCode(action_t *pThis)
+static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
iRet = RS_RET_OK;
break;
@@ -545,8 +541,8 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_ACTION_FAILED;
break;
default:
- DBGPRINTF("Invalid action engine state %d, program error\n",
- (int) pThis->eState);
+ DBGPRINTF("Invalid action engine state %u, program error\n",
+ getActionState(pWti, pThis));
iRet = RS_RET_ERR;
break;
}
@@ -558,29 +554,31 @@ static rsRetVal getReturnCode(action_t *pThis)
/* set the action to a new state
* rgerhards, 2007-08-02
*/
-static inline void actionSetState(action_t *pThis, action_state_t newState)
+static inline void
+actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState)
{
- pThis->eState = newState;
- DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+ setActionState(pWti, pThis, newState);
+ DBGPRINTF("Action %d transitioned to state: %s\n",
+ pThis->iActionNbr, getActStateName(pThis, pWti));
}
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
-static void actionCommitted(action_t *pThis)
+static void actionCommitted(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
}
/* set action to "rtry" state.
* rgerhards, 2007-08-02
*/
-static void actionRetry(action_t *pThis)
+static void actionRetry(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RTRY);
- pThis->iResumeOKinRow++;
+ actionSetState(pThis, pWti, ACT_STATE_RTRY);
+ incActionResumeInRow(pWti, pThis);
}
@@ -589,9 +587,9 @@ static void actionRetry(action_t *pThis)
* depends on output module.
* rgerhards, 2007-08-02
*/
-static void actionDisable(action_t *pThis)
+static void actionDisable(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_DIED);
+ actionSetState(pThis, pWti, ACT_STATE_DIED);
}
@@ -603,7 +601,7 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis)
+static inline void actionSuspend(action_t *pThis, wti_t *pWti)
{
time_t ttNow;
@@ -611,8 +609,9 @@ static inline void actionSuspend(action_t *pThis)
* since caching, and this would break logic (and it actually did so!)
*/
datetime.GetTime(&ttNow);
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
- actionSetState(pThis, ACT_STATE_SUSP);
+ pThis->ttResumeRtry = ttNow + pThis->iResumeInterval *
+ (getActionNbrResRtry(pWti, pThis) / 10 + 1);
+ actionSetState(pThis, pWti, ACT_STATE_SUSP);
DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -633,7 +632,7 @@ static inline void actionSuspend(action_t *pThis)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
int iRetries;
int iSleepPeriod;
@@ -643,27 +642,27 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
- if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
+ if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) {
bTreatOKasSusp = 1;
- pThis->iResumeOKinRow = 0;
+ setActionResumeInRow(pWti, pThis, 0);
} else {
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
- actionSuspend(pThis);
+ actionSuspend(pThis, pWti);
} else {
- ++pThis->iNbrResRtry;
+ incActionNbrResRtry(pWti, pThis);
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
@@ -672,12 +671,12 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
}
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
}
}
- if(pThis->eState == ACT_STATE_RDY) {
- pThis->iNbrResRtry = 0;
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+ setActionNbrResRtry(pWti, pThis, 0);
}
finalize_it:
@@ -696,6 +695,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis
CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
pThis->pModData));
pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */
}
finalize_it:
RETiRet;
@@ -704,14 +704,13 @@ finalize_it:
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+static rsRetVal
+actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_SUSP) {
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
* to use the action timestamp, but in this case we will never reach a
@@ -721,19 +720,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t
*/
datetime.GetTime(&ttNow); /* cache "now" */
if(ttNow >= pThis->ttResumeRtry) {
- actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
+ actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
}
}
- if(pThis->eState == ACT_STATE_RTRY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate));
}
- if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
- pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -745,28 +744,30 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+static inline rsRetVal
+actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
- CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
- if(pThis->eState == ACT_STATE_RDY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionSetState(pThis, ACT_STATE_ITX);
+ actionSetState(pThis, pWti, ACT_STATE_ITX);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:FINALIZE;
}
@@ -780,7 +781,7 @@ finalize_it:
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
-rsRetVal actionDbgPrint(action_t *pThis)
+static rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
char *sz;
@@ -790,11 +791,12 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- if(pThis->eState == ACT_STATE_SUSP) {
+#if 0 // do we need this ???
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
(unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\tState: %s\n", getActStateName(pThis));
+#endif
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
if(pThis->submitToActQ == doSubmitToActionQComplexBatch) {
sz = "slow, but feature-rich";
@@ -816,34 +818,31 @@ rsRetVal actionDbgPrint(action_t *pThis)
* rgerhards, 2009-05-07
*/
static rsRetVal
-prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
+prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow)
{
int i;
- msg_t *pMsg;
struct json_object *json;
+ actWrkrInfo_t *pWrkrInfo;
DEFiRet;
- ASSERT(pAction != NULL);
- ASSERT(pElem != NULL);
-
- pMsg = pElem->pMsg;
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i], ttNow));
- pElem->staticActParams[i] = pElem->staticActStrings[i];
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]),
+ &pWrkrInfo->staticLenStrings[i], ttNow));
+ pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
- pElem->staticActParams[i] = (void*) pMsg;
+ pWrkrInfo->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
- pElem->staticActParams[i] = (void*) json;
+ pWrkrInfo->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
(int) pAction->eParamPassing);
@@ -857,69 +856,55 @@ finalize_it:
}
-/* free a batches ressources, but not string buffers (because they will
- * most probably be reused). String buffers are only deleted upon final
- * destruction of the batch.
- * This function here must be called only when the batch is actually no
- * longer used, also not for retrying actions or such. It invalidates
- * buffers.
- * rgerhards, 2010-12-17
- */
-static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
+static void
+releaseDoActionParams(action_t *pAction, wti_t *pWti)
{
int jArr;
- int i, j;
- batch_obj_t *pElem;
+ int j;
+ actWrkrInfo_t *pWrkrInfo;
uchar ***ppMsgs;
- DEFiRet;
-
- ASSERT(pAction != NULL);
if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
goto done; /* we need to do nothing with these types! */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- ppMsgs = (uchar***) pElem->staticActParams;
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- if(((uchar**)ppMsgs)[j] != NULL) {
- jArr = 0;
- while(ppMsgs[j][jArr] != NULL) {
- d_free(ppMsgs[j][jArr]);
- ppMsgs[j][jArr] = NULL;
- ++jArr;
- }
- d_free(((uchar**)ppMsgs)[j]);
- ((uchar**)ppMsgs)[j] = NULL;
- }
- }
- break;
- case ACT_JSON_PASSING:
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- json_object_put((struct json_object*)
- pElem->staticActParams[j]);
- pElem->staticActParams[j] = NULL;
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pWrkrInfo->staticActParams;
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ if(((uchar**)ppMsgs)[j] != NULL) {
+ jArr = 0;
+ while(ppMsgs[j][jArr] != NULL) {
+ free(ppMsgs[j][jArr]);
+ ppMsgs[j][jArr] = NULL;
+ ++jArr;
}
- break;
- case ACT_STRING_PASSING:
- case ACT_MSG_PASSING:
- /* can never happen, just to keep compiler happy! */
- break;
+ free(((uchar**)ppMsgs)[j]);
+ ((uchar**)ppMsgs)[j] = NULL;
}
}
+ break;
+ case ACT_JSON_PASSING:
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ json_object_put((struct json_object*)
+ pWrkrInfo->staticActParams[j]);
+ pWrkrInfo->staticActParams[j] = NULL;
+ }
+ break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* can never happen, just to keep compiler happy! */
+ break;
}
-done: RETiRet;
+done: return;
}
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
-rsRetVal
+static rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
{
DEFiRet;
@@ -928,7 +913,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
- getActStateName(pThis), pThis->iActionNbr);
+ getActStateName(pThis, pWti), pThis->iActionNbr);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
@@ -937,30 +922,30 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ actionCommitted(pThis, pWti);
+ setActionResumeInRow(pWti, pThis, 0);
break;
case RS_RET_DEFER_COMMIT:
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ setActionResumeInRow(pWti, pThis, 0);
/* we are done, action state remains the same */
break;
case RS_RET_PREVIOUS_COMMITTED:
/* action state remains the same, but we had a commit. */
pThis->bHadAutoCommit = 1;
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ setActionResumeInRow(pWti, pThis, 0);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
*/
FINALIZE;
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
@@ -982,59 +967,49 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
- if(pThis->eState == ACT_STATE_ITX)
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
-/* finish processing a batch. Most importantly, that means we commit if we
- * need to do so.
- * rgerhards, 2008-01-28
- */
+/* Commit action after processing. */
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
+actionCommit(action_t *pThis, wti_t *pWti)
{
- int i;
+ int pbShutdownImmediate = 1;
DEFiRet;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_RDY) {
- /* we just need to flag the batch as commited */
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
FINALIZE; /* nothing to do */
}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
- if(pThis->eState == ACT_STATE_ITX) {
+ CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti));
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
+dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
- /* flag messages as committed */
- for(i = 0 ; i < pBatch->nElem ; ++i) {
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
- pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */
- }
+ actionCommitted(pThis, pWti);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
case RS_RET_DEFER_COMMIT:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
case RS_RET_PREVIOUS_COMMITTED:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
@@ -1042,241 +1017,46 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
FINALIZE;
}
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
-/* try to submit a partial batch of elements.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti)
-{
- int i;
- int iElemProcessed;
- int iCommittedUpTo;
- msg_t *pMsg;
- rsRetVal localRet;
- DEFiRet;
-
- assert(pBatch != NULL);
- assert(pnElem != NULL);
-
- i = pBatch->iDoneUpTo; /* all messages below that index are processed */
- iElemProcessed = 0;
- iCommittedUpTo = i;
- DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
- while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- /* NOTE: do NOT extend the filter below! Anything else must be done on the
- * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
- */
- if(batchIsValidElem(pBatch, i)) {
- pMsg = pBatch->pElem[i].pMsg;
- localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate, pWti);
- DBGPRINTF("action %p call returned %d\n", pAction, localRet);
- /* Note: we directly modify the batch object state, because we know that
- * wo do not overwrite BATCH_STATE_DISC indicators!
- */
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo <= i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
- localRet, *pnElem, iCommittedUpTo);
- iRet = localRet;
- FINALIZE;
- }
- }
- ++i;
- ++iElemProcessed;
- }
-
-finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
- pBatch->iDoneUpTo = iCommittedUpTo;
- }
- RETiRet;
-}
-
-/* submit a batch for actual action processing.
- * The first nElem elements are processed. This function calls itself
- * recursively if it needs to handle errors.
- * Note: we don't need the number of the first message to be processed as a parameter,
- * because this is kept track of inside the batch itself (iDoneUpTo).
- * rgerhards, 2009-05-12
- */
-static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
-{
- int i;
- int bDone;
- rsRetVal localRet;
- int wasDoneTo;
- DEFiRet;
-
- assert(pBatch != NULL);
-
- DBGPRINTF("submitBatch: enter, nElem %d\n", nElem);
- wasDoneTo = pBatch->iDoneUpTo;
- bDone = 0;
- do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pWti);
- if(localRet == RS_RET_FORCE_TERM) {
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- }
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- /* try commit transaction, once done, we can simply do so as if
- * that return state was returned from tryDoAction().
- */
- localRet = finishBatch(pAction, pBatch, pWti);
- }
-
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- bDone = 1;
- } else if(localRet == RS_RET_SUSPENDED) {
- DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n");
- /* do nothing, this will retry the full batch */
- } else if(localRet == RS_RET_ACTION_FAILED) {
- /* in this case, everything not yet committed is BAD */
- for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && pBatch->eltState[i] != BATCH_STATE_COMM ) {
- pBatch->eltState[i] = BATCH_STATE_BAD;
- pBatch->pElem[i].bPrevWasSuspended = 1;
- STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
- }
- }
- bDone = 1;
- } else {
- if(nElem == 1) {
- batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
- bDone = 1;
- } else {
- /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
- "for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2, pWti);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti);
- bDone = 1;
- }
- }
- } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
-
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
-
-finalize_it:
- RETiRet;
-}
-
-
-/* copy "active" array of batch, as we need to modify it. The caller
- * must make sure the new array is freed and the orginal batch
- * pointer is restored (thus the caller must save it). If active
- * is currently NULL, this is properly handled.
- * Note: the batches active pointer is modified, so it must be
- * saved BEFORE calling this function!
- * rgerhards, 2012-09-12
+/* process a single message. This is both called if we run from the
+ * cosumer side of an action queue as well as directly from the main
+ * queue thread if the action queue is set to "direct".
*/
static rsRetVal
-copyActive(batch_t *pBatch)
+processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate)
{
- sbool *active;
DEFiRet;
- CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- if(pBatch->active == NULL)
- memset(active, 1, batchNumMsgs(pBatch));
- else
- memcpy(active, pBatch->active, batchNumMsgs(pBatch));
- pBatch->active = active;
-finalize_it:
+dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg);
+ // TODO: check error return states!
+ iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
+ iRet = actionProcessMessage(pAction, pMsg,
+ pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
+ pbShutdownImmediate, pWti);
+ releaseDoActionParams(pAction, pWti);
RETiRet;
}
-/* The following function prepares a batch for processing, that it is
- * reinitializes batch states, generates strings and does everything else
- * that needs to be done in order to make the batch ready for submission to
- * the actual output module. Note that we look at the precomputed
- * filter OK condition and process only those messages, that actually matched
- * the filter.
- * rgerhards, 2010-06-14
- */
-static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
+/* Commit all active transactions */
+rsRetVal
+actionCommitAll(wti_t *pWti)
{
int i;
- batch_obj_t *pElem;
- struct syslogTime ttNow;
- DEFiRet;
-
- /* indicate we have not yet read the date */
- ttNow.year = 0;
-
- pBatch->iDoneUpTo = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- pBatch->eltState[i] = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
- /* make sure we have our copy of "active" array */
- if(!*bMustRestoreActivePtr) {
- *activeSave = pBatch->active;
- copyActive(pBatch);
- }
- pBatch->active[i] = RSFALSE;
- }
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ DBGPRINTF("DDDD: actionCommitall action %d state %u\n",
+ i, getActionStateByNbr(pWti, i));
+ if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) {
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
}
}
- RETiRet;
-}
-
-
-/* receive a batch and process it. This includes retry handling.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
-{
- DEFiRet;
-
- assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
- iRet = finishBatch(pAction, pBatch, pWti);
-
-finalize_it:
- RETiRet;
}
-
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
@@ -1284,41 +1064,30 @@ finalize_it:
static rsRetVal
processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
- int *pbShutdownImmdtSave;
- sbool *activeSave;
- int bMustRestoreActivePtr = 0;
- rsRetVal localRet;
action_t *pAction = (action_t*) pVoid;
+ msg_t *pMsg;
+ int i;
+ struct syslogTime ttNow;
DEFiRet;
- assert(pBatch != NULL);
-
- if(pbShutdownImmediate != NULL) {
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ if(pbShutdownImmediate == NULL) {
+ pbShutdownImmediate = pBatch->pbShutdownImmediate;
}
- CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
-
- iRet = processAction(pAction, pBatch, pWti);
- /* even if processAction failed, we need to release the batch (else we
- * have a memory leak). So we do this first, and then check if we need to
- * return an error code. If so, the code from processAction has priority.
- * rgerhards, 2010-12-17
- */
- localRet = releaseBatch(pAction, pBatch);
+ /* indicate we have not yet read the date */
+ ttNow.year = 0;
- if(iRet == RS_RET_OK)
- iRet = localRet;
-
- if(bMustRestoreActivePtr) {
- free(pBatch->active);
- pBatch->active = activeSave;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) {
+ if(batchIsValidElem(pBatch, i)) {
+ pMsg = pBatch->pElem[i].pMsg;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate);
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ }
}
-finalize_it:
- if(pbShutdownImmediate != NULL)
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ iRet = actionCommit(pAction, pWti);
+dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
@@ -1381,11 +1150,12 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* rgerhards, 2010-06-08
*/
static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
+doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
- if(pAction->eState == ACT_STATE_DIED) {
+ // TODO: bug? Isn't that supposed to be checked in direct mode as well???
+ if(getActionState(pWti, pAction) == ACT_STATE_DIED) {
DBGPRINTF("action %p died, do NOT execute\n", pAction);
FINALIZE;
}
@@ -1464,7 +1234,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg, pWti);
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
finalize_it:
RETiRet;
@@ -1475,12 +1245,10 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
+doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- msg_t *pMsg;
DEFiRet;
- pMsg = pBatch->pElem[idxBtch].pMsg;
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1493,13 +1261,6 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
- /* we need to update the batch to handle failover processing correctly */
- if(iRet == RS_RET_OK) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 0;
- } else if(iRet == RS_RET_ACTION_FAILED) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 1;
- }
-
RETiRet;
}
@@ -1518,7 +1279,8 @@ DEFFUNC_llExecFunc(doActivateActions)
errmsg.LogError(0, localRet, "file prefix (work directory?) "
"is missing");
}
- actionDisable(pThis);
+#warning think how to handle this:
+ //actionDisable(pThis);
}
DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
pThis, pThis->pQueue);
@@ -1550,113 +1312,55 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
time_t now = 0;
+ int doProcess = 1;
time_t lastAct;
- int i;
- sbool *activeSave;
DEFiRet;
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
- continue;
- if(now == 0) {
- now = datetime.GetTime(NULL); /* good time call - the only one done */
- }
- /* CAS loop, we write back a bit early, but that's OK... */
- /* we use reception time, not dequeue time - this is considered more appropriate and
- * also faster ;) -- rgerhards, 2008-09-17 */
- do {
- lastAct = pAction->f_time;
- if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
- if((now - lastAct) < MarkInterval / 2) {
- pBatch->active[i] = 0;
- DBGPRINTF("batch item %d: action was recently called, ignoring "
- "mark message\n", i);
- break; /* do not update timestamp for non-written mark messages */
- }
+ if(now == 0) { // TODO: do in caller!
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ }
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if(pMsg->msgFlags & MARK) {
+ if((now - lastAct) < MarkInterval / 2) {
+ doProcess = 0;
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ break; /* do not update timestamp for non-written mark messages */
}
- } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->active[i]) {
- DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
- i, module.GetStateName(pAction->pMod));
}
- }
-
- iRet = doSubmitToActionQBatch(pAction, pBatch, pWti);
+ } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
+ pMsg->ttGenTime, &pAction->mutCAS) == 0);
- free(pBatch->active);
- pBatch->active = activeSave;
+ if(doProcess) {
+ DBGPRINTF("Called action(NotAllMark), processing via '%s'\n",
+ module.GetStateName(pAction->pMod));
+ iRet = doSubmitToActionQBatch(pAction, pWti, pMsg);
+ }
RETiRet;
}
-static inline void
-countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
-{
- int i;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- }
- }
-}
-
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
* rgerhards, 2011-06-16
*/
static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- sbool bNeedSubmit;
- sbool *activeSave;
- int i;
+int pbShutdownImmediate = 0; // TODO: implement
+ struct syslogTime ttNow;
DEFiRet;
-
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- /* note: for direct mode, we need to adjust the filter property. For non-direct
- * this is not necessary, because in that case we enqueue only what actually needs
- * to be processed.
- */
- if(pAction->bExecWhenPrevSusp) {
- bNeedSubmit = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change active to 0 due to "
- "failover case in elem %d\n", i);
- pBatch->active[i] = 0;
- }
- if(batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- bNeedSubmit = 1;
- }
- DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- }
- if(bNeedSubmit) {
- /* note: stats were already computed above */
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
- } else {
- DBGPRINTF("no need to submit batch, all invalid\n");
- }
- } else {
- if(GatherStats)
- countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
- }
-
- free(pBatch->active);
- pBatch->active = activeSave;
+ if(GatherStats)
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ ttNow.year = 0;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate);
RETiRet;
}
@@ -1666,72 +1370,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- int i;
DEFiRet;
- DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
+ iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti);
- }
- }
+ doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
}
-
-/* Helper to submit a batch of actions to the engine. Note that we have rather
- * complicated processing here, so we need to do this one message after another.
- * rgerhards, 2010-06-23
- */
-static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
-{
- int i;
- DEFiRet;
-
- DBGPRINTF("Called action %p (complex case), logging to %s\n",
- pAction, module.GetStateName(pAction->pMod));
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
- doActionCallAction(pAction, pBatch, i, pWti);
- }
- }
-
- RETiRet;
-}
-
/* Call configured action, most complex case with all features supported (and thus slow).
* rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
-dbgprintf("DDDD: locked mutAction\n");
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti);
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
+ doActionCallAction(pAction, pWti, pMsg);
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
@@ -1876,11 +1546,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* check if the module is compatible with select features (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
- if(bSuspended)
- actionSuspend(pAction);
+#warning we need to look at the following
+ // Probably the core init needs to be done during createWrkrInstance()
+ //if(bSuspended)
+ // actionSuspend(pAction, pWti);
CHKiRet(actionConstructFinalize(pAction, lst));
@@ -1975,7 +1645,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
*ppAction = pAction;
diff --git a/action.h b/action.h
index 5c88b052..27bb74a0 100644
--- a/action.h
+++ b/action.h
@@ -35,15 +35,6 @@
extern int glbliActionResumeRetryCount;
-typedef enum {
- ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */
- ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */
- ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */
- ACT_STATE_COMM = 3, /* transaction finished (a transient state) */
- ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */
- ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */
-} action_state_t;
-
/* the following struct defines the action object data structure
*/
struct action_s {
@@ -55,13 +46,10 @@ struct action_s {
sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */
sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */
int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
- action_state_t eState; /* current state of action */
sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
time_t ttResumeRtry; /* when is it time to retry the resume? */
- int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */
int iResumeInterval;/* resume interval for this action */
int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */
- int iNbrResRtry; /* number of retries since last suspend */
int iNbrNoExec; /* number of matches that did not yet yield to an exec */
int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */
int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */
@@ -69,7 +57,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */
+ rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */
rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2,
ACT_JSON_PASSING = 3}
@@ -79,7 +67,7 @@ struct action_s {
* in this order. */
qqueue_t *pQueue; /* action queue */
pthread_mutex_t mutAction; /* primary action mutex */
- uchar *pszName; /* action name (for documentation) */
+ uchar *pszName; /* action name */
DEF_ATOMIC_HELPER_MUT(mutCAS);
/* for statistics subsystem */
statsobj_t *statsobj;
@@ -93,7 +81,7 @@ struct action_s {
rsRetVal actionConstruct(action_t **ppThis);
rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst);
rsRetVal actionDestruct(action_t *pThis);
-rsRetVal actionDbgPrint(action_t *pThis);
+//rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*);
@@ -103,6 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
+rsRetVal actionCommitAll(wti_t *pWti);
/* external data */
extern int iActionNbr;
diff --git a/runtime/batch.h b/runtime/batch.h
index 2ec07670..fac0158e 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,17 +46,6 @@ typedef unsigned char batch_state_t;
*/
struct batch_obj_s {
msg_t *pMsg;
- /* work variables for action processing; these are reused for each action (or block of
- * actions)
- */
- sbool bPrevWasSuspended;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
};
/* the batch
@@ -152,7 +141,7 @@ batchFree(batch_t *pBatch) {
/* staticActParams MUST be freed immediately (if required),
* so we do not need to do that!
*/
- free(pBatch->pElem[i].staticActStrings[j]);
+ //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]);
}
}
free(pBatch->pElem);
@@ -171,7 +160,6 @@ batchInit(batch_t *pBatch, int maxElem) {
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
- // TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
}
diff --git a/runtime/conf.c b/runtime/conf.c
index 2b000c60..1544e364 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* here check if the module is compatible with select features
* (currently, we have no such features!) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/queue.c b/runtime/queue.c
index 968c016e..93661e41 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -987,10 +987,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.eltState = &batchState;
singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
msgDestruct(&pMsg);
RETiRet;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index db253d28..2ad21170 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -228,10 +228,40 @@ static inline void freeActive(sbool *active) { free(active); }
static rsRetVal
execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
{
+ int i;
DEFiRet;
dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active);
pBatch->active = active;
- stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti);
+// TODO: check here if bPrevWasSuspsended was required and, if so
+// if we actually are permitted to execute this action.
+ //if(pAction->bExecWhenPrevSusp) {
+
+
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ DBGPRINTF("action %d: valid:%d state:%d execWhenPrev:%d\n",
+ stmt->d.act->iActionNbr, batchIsValidElem(pBatch, i), pBatch->eltState[i],
+ stmt->d.act->bExecWhenPrevSusp);
+ if(batchIsValidElem(pBatch, i)) {
+ stmt->d.act->submitToActQ(stmt->d.act, pWti, pBatch->pElem[i].pMsg);
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ }
+ }
+
+
+#warning implement action return code checking
+// we should store the return code and make it available
+// to users via a special function (or maybe variable)
+// internally, we can use this for bPrevWasSuspended checking
+// to implement this system, we need to keep a kind of
+// "execution state" when running the rule engine. This most
+// probably is best done inside the wti object.
+// I think in v7 there was a bug, so that bPrevWasSuspended did
+// not properly make it onto the next batch (because it was
+// stored within the batch state) -- but even if so, the
+// exposure window was minimal, as the action would probably
+// fail the next time again. [TODO: check if batch object survived
+// end of batch, in which case it was probably correctly handled]
RETiRet;
}
@@ -582,6 +612,7 @@ processBatch(batch_t *pBatch, wti_t *pWti)
CHKiRet(processBatchMultiRuleset(pBatch, pWti));
}
+ actionCommitAll(pWti);
finalize_it:
DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
RETiRet;
diff --git a/runtime/wti.h b/runtime/wti.h
index bb4f56bc..79b62102 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -26,11 +26,32 @@
#include "wtp.h"
#include "obj.h"
#include "batch.h"
+#include "action.h"
+#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */
+#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */
+#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */
+#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */
+#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */
+/* note: 3 bit bit field --> highest value is 7! */
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
+ uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */
+ int iNbrResRtry; /* number of retries since last suspend */
+ struct {
+ unsigned actState : 3;
+ } flags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
+ size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ /* end action work variables */
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -62,4 +83,57 @@ PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+static inline uint8_t
+getActionStateByNbr(wti_t *pWti, int iActNbr)
+{
+ return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState);
+}
+
+static inline uint8_t
+getActionState(wti_t *pWti, action_t *pAction)
+{
+ return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
+}
+
+static inline void
+setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
+}
+
+static inline uint16_t
+getActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow);
+}
+
+static inline void
+setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val;
+}
+
+static inline void
+incActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++;
+}
+
+static inline int
+getActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry);
+}
+
+static inline void
+setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val;
+}
+
+static inline void
+incActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
+}
#endif /* #ifndef WTI_H_INCLUDED */