summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c790
-rw-r--r--action.h19
-rw-r--r--runtime/batch.h38
-rw-r--r--runtime/conf.c1
-rw-r--r--runtime/queue.c6
-rw-r--r--runtime/ruleset.c364
-rw-r--r--runtime/wti.c1
-rw-r--r--runtime/wti.h113
-rw-r--r--tools/syslogd.c17
9 files changed, 469 insertions, 880 deletions
diff --git a/action.c b/action.c
index 61918884..790ea4ce 100644
--- a/action.c
+++ b/action.c
@@ -14,7 +14,6 @@
*
* if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
- * - helperSubmitToActionQComplexBatch
* - doActionCallAction
* handles mark message reduction, but in essence calls
* - actionWriteToAction
@@ -36,9 +35,6 @@
*
* After dequeue, processing is as follows:
* - processBatchMain
- * - processAction
- * - submitBatch
- * - tryDoAction
* - ...
*
* MORE ON PROCESSING, QUEUES and FILTERING
@@ -69,7 +65,7 @@
* beast.
* rgerhards, 2011-06-15
*
- * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -120,9 +116,9 @@
/* forward definitions */
static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -496,9 +492,9 @@ rsRetVal actionSetGlobalResumeInterval(int iNewVal)
* returned string must not be modified.
* rgerhards, 2009-05-07
*/
-static uchar *getActStateName(action_t *pThis)
+static uchar *getActStateName(action_t *pThis, wti_t *pWti)
{
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
return (uchar*) "rdy";
case ACT_STATE_ITX:
@@ -520,12 +516,12 @@ static uchar *getActStateName(action_t *pThis)
/* returns a suitable return code based on action state
* rgerhards, 2009-05-07
*/
-static rsRetVal getReturnCode(action_t *pThis)
+static rsRetVal getReturnCode(action_t *pThis, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- switch(pThis->eState) {
+ switch(getActionState(pWti, pThis)) {
case ACT_STATE_RDY:
iRet = RS_RET_OK;
break;
@@ -545,8 +541,8 @@ static rsRetVal getReturnCode(action_t *pThis)
iRet = RS_RET_ACTION_FAILED;
break;
default:
- DBGPRINTF("Invalid action engine state %d, program error\n",
- (int) pThis->eState);
+ DBGPRINTF("Invalid action engine state %u, program error\n",
+ getActionState(pWti, pThis));
iRet = RS_RET_ERR;
break;
}
@@ -558,29 +554,31 @@ static rsRetVal getReturnCode(action_t *pThis)
/* set the action to a new state
* rgerhards, 2007-08-02
*/
-static inline void actionSetState(action_t *pThis, action_state_t newState)
+static inline void
+actionSetState(action_t *pThis, wti_t *pWti, uint8_t newState)
{
- pThis->eState = newState;
- DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis));
+ setActionState(pWti, pThis, newState);
+ DBGPRINTF("Action %d transitioned to state: %s\n",
+ pThis->iActionNbr, getActStateName(pThis, pWti));
}
/* Handles the transient commit state. So far, this is
* mostly a dummy...
* rgerhards, 2007-08-02
*/
-static void actionCommitted(action_t *pThis)
+static void actionCommitted(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
}
/* set action to "rtry" state.
* rgerhards, 2007-08-02
*/
-static void actionRetry(action_t *pThis)
+static void actionRetry(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_RTRY);
- pThis->iResumeOKinRow++;
+ actionSetState(pThis, pWti, ACT_STATE_RTRY);
+ incActionResumeInRow(pWti, pThis);
}
@@ -589,9 +587,9 @@ static void actionRetry(action_t *pThis)
* depends on output module.
* rgerhards, 2007-08-02
*/
-static void actionDisable(action_t *pThis)
+static void actionDisable(action_t *pThis, wti_t *pWti)
{
- actionSetState(pThis, ACT_STATE_DIED);
+ actionSetState(pThis, pWti, ACT_STATE_DIED);
}
@@ -603,7 +601,7 @@ static void actionDisable(action_t *pThis)
* CPU time. TODO: maybe a config option for that?
* rgerhards, 2007-08-02
*/
-static inline void actionSuspend(action_t *pThis)
+static inline void actionSuspend(action_t *pThis, wti_t *pWti)
{
time_t ttNow;
@@ -611,8 +609,9 @@ static inline void actionSuspend(action_t *pThis)
* since caching, and this would break logic (and it actually did so!)
*/
datetime.GetTime(&ttNow);
- pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1);
- actionSetState(pThis, ACT_STATE_SUSP);
+ pThis->ttResumeRtry = ttNow + pThis->iResumeInterval *
+ (getActionNbrResRtry(pWti, pThis) / 10 + 1);
+ actionSetState(pThis, pWti, ACT_STATE_SUSP);
DBGPRINTF("action suspended, earliest retry=%d\n", (int) pThis->ttResumeRtry);
}
@@ -633,7 +632,7 @@ static inline void actionSuspend(action_t *pThis)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
int iRetries;
int iSleepPeriod;
@@ -643,27 +642,27 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
+ while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
- if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
+ if((getActionResumeInRow(pWti, pThis) > 9) && (getActionResumeInRow(pWti, pThis) % 10 == 0)) {
bTreatOKasSusp = 1;
- pThis->iResumeOKinRow = 0;
+ setActionResumeInRow(pWti, pThis, 0);
} else {
bTreatOKasSusp = 0;
}
if((iRet == RS_RET_OK) && (!bTreatOKasSusp)) {
DBGPRINTF("actionDoRetry: had success RDY again (iRet=%d)\n", iRet);
- actionSetState(pThis, ACT_STATE_RDY);
+ actionSetState(pThis, pWti, ACT_STATE_RDY);
} else if(iRet == RS_RET_SUSPENDED || bTreatOKasSusp) {
/* max retries reached? */
DBGPRINTF("actionDoRetry: check for max retries, iResumeRetryCount %d, iRetries %d\n",
pThis->iResumeRetryCount, iRetries);
if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) {
- actionSuspend(pThis);
+ actionSuspend(pThis, pWti);
} else {
- ++pThis->iNbrResRtry;
+ incActionNbrResRtry(pWti, pThis);
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
@@ -672,12 +671,12 @@ actionDoRetry(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
}
}
} else if(iRet == RS_RET_DISABLE_ACTION) {
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
}
}
- if(pThis->eState == ACT_STATE_RDY) {
- pThis->iNbrResRtry = 0;
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+ setActionNbrResRtry(pWti, pThis, 0);
}
finalize_it:
@@ -696,6 +695,7 @@ dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis
CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
pThis->pModData));
pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
+ setActionState(pWti, pThis, ACT_STATE_RDY); /* action is enabled */
}
finalize_it:
RETiRet;
@@ -704,14 +704,13 @@ finalize_it:
/* try to resume an action -- rgerhards, 2007-08-02
* changed to new action state engine -- rgerhards, 2009-05-07
*/
-static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+static rsRetVal
+actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_SUSP) {
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
/* if we are suspended, we need to check if the timeout expired.
* for this handling, we must always obtain a fresh timestamp. We used
* to use the action timestamp, but in this case we will never reach a
@@ -721,19 +720,19 @@ static rsRetVal actionTryResume(action_t *pThis, int *pbShutdownImmediate, wti_t
*/
datetime.GetTime(&ttNow); /* cache "now" */
if(ttNow >= pThis->ttResumeRtry) {
- actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */
+ actionSetState(pThis, pWti, ACT_STATE_RTRY); /* back to retries */
}
}
- if(pThis->eState == ACT_STATE_RTRY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate));
}
- if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) {
+ if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
DBGPRINTF("actionTryResume: action %p state: %s, next retry (if applicable): %u [now %u]\n",
- pThis, getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
+ pThis, getActStateName(pThis, pWti), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow);
}
finalize_it:
@@ -745,28 +744,30 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+static inline rsRetVal
+actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
- CHKiRet(actionTryResume(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
*/
- if(pThis->eState == ACT_STATE_RDY) {
+ if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
+dbgprintf("DDDDD: calling beginTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.beginTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionSetState(pThis, ACT_STATE_ITX);
+ actionSetState(pThis, pWti, ACT_STATE_ITX);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:FINALIZE;
}
@@ -777,10 +778,11 @@ finalize_it:
}
+#if 0 // TODO: remove?
/* debug-print the contents of an action object
* rgerhards, 2007-08-02
*/
-rsRetVal actionDbgPrint(action_t *pThis)
+static rsRetVal actionDbgPrint(action_t *pThis)
{
DEFiRet;
char *sz;
@@ -790,11 +792,12 @@ rsRetVal actionDbgPrint(action_t *pThis)
dbgprintf("\n");
dbgprintf("\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData);
dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval);
- if(pThis->eState == ACT_STATE_SUSP) {
+#if 0 // do we need this ???
+ if(getActionState(pWti, pThis) == ACT_STATE_SUSP) {
dbgprintf("\tresume next retry: %u, number retries: %d",
(unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry);
}
- dbgprintf("\tState: %s\n", getActStateName(pThis));
+#endif
dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp);
if(pThis->submitToActQ == doSubmitToActionQComplexBatch) {
sz = "slow, but feature-rich";
@@ -810,40 +813,41 @@ rsRetVal actionDbgPrint(action_t *pThis)
RETiRet;
}
+#endif
/* prepare the calling parameters for doAction()
* rgerhards, 2009-05-07
*/
static rsRetVal
-prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
+prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow)
{
int i;
- msg_t *pMsg;
struct json_object *json;
+ actWrkrIParams_t *iparams;
+ actWrkrInfo_t *pWrkrInfo;
DEFiRet;
- ASSERT(pAction != NULL);
- ASSERT(pElem != NULL);
-
- pMsg = pElem->pMsg;
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i], ttNow));
- pElem->staticActParams[i] = pElem->staticActStrings[i];
+ CHKiRet(wtiNewIParam(pWti, pAction, &iparams));
+ iparams->msgFlags = pMsg->msgFlags;
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(iparams->staticActStrings[i]),
+ &iparams->staticLenStrings[i], ttNow));
+ iparams->staticActParams[i] = iparams->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
- pElem->staticActParams[i] = (void*) pMsg;
+ pWrkrInfo->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
- pElem->staticActParams[i] = (void*) json;
+ pWrkrInfo->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
(int) pAction->eParamPassing);
@@ -857,110 +861,95 @@ finalize_it:
}
-/* free a batches ressources, but not string buffers (because they will
- * most probably be reused). String buffers are only deleted upon final
- * destruction of the batch.
- * This function here must be called only when the batch is actually no
- * longer used, also not for retrying actions or such. It invalidates
- * buffers.
- * rgerhards, 2010-12-17
- */
-static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
+static void
+releaseDoActionParams(action_t *pAction, wti_t *pWti)
{
int jArr;
- int i, j;
- batch_obj_t *pElem;
+ int j;
+ actWrkrInfo_t *pWrkrInfo;
uchar ***ppMsgs;
- DEFiRet;
-
- ASSERT(pAction != NULL);
if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
goto done; /* we need to do nothing with these types! */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- ppMsgs = (uchar***) pElem->staticActParams;
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- if(((uchar**)ppMsgs)[j] != NULL) {
- jArr = 0;
- while(ppMsgs[j][jArr] != NULL) {
- d_free(ppMsgs[j][jArr]);
- ppMsgs[j][jArr] = NULL;
- ++jArr;
- }
- d_free(((uchar**)ppMsgs)[j]);
- ((uchar**)ppMsgs)[j] = NULL;
- }
- }
- break;
- case ACT_JSON_PASSING:
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- json_object_put((struct json_object*)
- pElem->staticActParams[j]);
- pElem->staticActParams[j] = NULL;
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pWrkrInfo->staticActParams;
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ if(((uchar**)ppMsgs)[j] != NULL) {
+ jArr = 0;
+ while(ppMsgs[j][jArr] != NULL) {
+ free(ppMsgs[j][jArr]);
+ ppMsgs[j][jArr] = NULL;
+ ++jArr;
}
- break;
- case ACT_STRING_PASSING:
- case ACT_MSG_PASSING:
- /* can never happen, just to keep compiler happy! */
- break;
+ free(((uchar**)ppMsgs)[j]);
+ ((uchar**)ppMsgs)[j] = NULL;
}
}
+ break;
+ case ACT_JSON_PASSING:
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ json_object_put((struct json_object*)
+ pWrkrInfo->staticActParams[j]);
+ pWrkrInfo->staticActParams[j] = NULL;
+ }
+ break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* can never happen, just to keep compiler happy! */
+ break;
}
-done: RETiRet;
+done: return;
}
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
-rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
+static rsRetVal
+actionCallDoAction(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti)
{
DEFiRet;
ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
- getActStateName(pThis), pThis->iActionNbr);
+ getActStateName(pThis, pWti), pThis->iActionNbr);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
- iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
+ iRet = pThis->pMod->mod.om.doAction(actParams, msgFlags,
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ actionCommitted(pThis, pWti);
+ setActionResumeInRow(pWti, pThis, 0);
break;
case RS_RET_DEFER_COMMIT:
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ setActionResumeInRow(pWti, pThis, 0);
/* we are done, action state remains the same */
break;
case RS_RET_PREVIOUS_COMMITTED:
/* action state remains the same, but we had a commit. */
pThis->bHadAutoCommit = 1;
- pThis->iResumeOKinRow = 0; /* we had a successful call! */
+ setActionResumeInRow(pWti, pThis, 0);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
*/
FINALIZE;
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
@@ -971,70 +960,70 @@ finalize_it:
* this readies the action and then calls doAction()
* rgerhards, 2008-01-28
*/
-static rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
+rsRetVal
+actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
- ASSERT(pThis != NULL);
- ISOBJ_TYPE_assert(pMsg, msg);
-
CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
- if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX)
+ CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti));
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
-/* finish processing a batch. Most importantly, that means we commit if we
- * need to do so.
- * rgerhards, 2008-01-28
- */
+/* Commit try committing (do not handle retry processing and such) */
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
+actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
- int i;
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparamCurr, *iparamDel;
DEFiRet;
- ASSERT(pThis != NULL);
-
- if(pThis->eState == ACT_STATE_RDY) {
- /* we just need to flag the batch as commited */
- FINALIZE; /* nothing to do */
+ wrkrInfo = &(pWti->actWrkrInfo[pThis->iActionNbr]);
+ dbgprintf("DDDD: actionCommit: action %d, root %p\n", pThis->iActionNbr, wrkrInfo->iparamRoot);
+ if(wrkrInfo->iparamRoot != NULL) {
+ iparamCurr = wrkrInfo->iparamRoot;
+ while(iparamCurr != NULL) {
+ iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
+ iparamCurr->staticActParams,
+ pbShutdownImmediate, pWti);
+ releaseDoActionParams(pThis, pWti);
+ iparamDel = iparamCurr;
+ iparamCurr = iparamCurr->next;
+ free(iparamDel); // TODO: memleak strings!
+ }
+ wrkrInfo->iparamLast = NULL;
}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
- if(pThis->eState == ACT_STATE_ITX) {
+ CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
+ if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
+dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
- actionCommitted(pThis);
- /* flag messages as committed */
- for(i = 0 ; i < pBatch->nElem ; ++i) {
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
- pBatch->pElem[i].bPrevWasSuspended = 0; /* we had success! */
- }
+ actionCommitted(pThis, pWti);
break;
case RS_RET_SUSPENDED:
- actionRetry(pThis);
+ actionRetry(pThis, pWti);
break;
case RS_RET_DISABLE_ACTION:
- actionDisable(pThis);
+ actionDisable(pThis, pWti);
break;
case RS_RET_DEFER_COMMIT:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
case RS_RET_PREVIOUS_COMMITTED:
DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED "
"- ignored\n");
- actionCommitted(pThis);
+ actionCommitted(pThis, pWti);
break;
default:/* permanent failure of this message - no sense in retrying. This is
* not yet handled (but easy TODO)
@@ -1042,241 +1031,76 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
FINALIZE;
}
}
- iRet = getReturnCode(pThis);
+ iRet = getReturnCode(pThis, pWti);
finalize_it:
RETiRet;
}
-
-/* try to submit a partial batch of elements.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti)
-{
- int i;
- int iElemProcessed;
- int iCommittedUpTo;
- msg_t *pMsg;
- rsRetVal localRet;
- DEFiRet;
-
- assert(pBatch != NULL);
- assert(pnElem != NULL);
-
- i = pBatch->iDoneUpTo; /* all messages below that index are processed */
- iElemProcessed = 0;
- iCommittedUpTo = i;
- DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
- while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- /* NOTE: do NOT extend the filter below! Anything else must be done on the
- * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
- */
- if(batchIsValidElem(pBatch, i)) {
- pMsg = pBatch->pElem[i].pMsg;
- localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate, pWti);
- DBGPRINTF("action %p call returned %d\n", pAction, localRet);
- /* Note: we directly modify the batch object state, because we know that
- * wo do not overwrite BATCH_STATE_DISC indicators!
- */
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo <= i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- //pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM;
- }
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
- localRet, *pnElem, iCommittedUpTo);
- iRet = localRet;
- FINALIZE;
- }
- }
- ++i;
- ++iElemProcessed;
- }
-
-finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
- pBatch->iDoneUpTo = iCommittedUpTo;
- }
- RETiRet;
-}
-
-/* submit a batch for actual action processing.
- * The first nElem elements are processed. This function calls itself
- * recursively if it needs to handle errors.
- * Note: we don't need the number of the first message to be processed as a parameter,
- * because this is kept track of inside the batch itself (iDoneUpTo).
- * rgerhards, 2009-05-12
- */
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
+actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
{
- int i;
- int bDone;
- rsRetVal localRet;
- int wasDoneTo;
DEFiRet;
-
- assert(pBatch != NULL);
-
- DBGPRINTF("submitBatch: enter, nElem %d\n", nElem);
- wasDoneTo = pBatch->iDoneUpTo;
- bDone = 0;
- do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pWti);
- if(localRet == RS_RET_FORCE_TERM) {
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- }
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- /* try commit transaction, once done, we can simply do so as if
- * that return state was returned from tryDoAction().
- */
- localRet = finishBatch(pAction, pBatch, pWti);
- }
-
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- bDone = 1;
- } else if(localRet == RS_RET_SUSPENDED) {
- DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n");
- /* do nothing, this will retry the full batch */
- } else if(localRet == RS_RET_ACTION_FAILED) {
- /* in this case, everything not yet committed is BAD */
- for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && pBatch->eltState[i] != BATCH_STATE_COMM ) {
- pBatch->eltState[i] = BATCH_STATE_BAD;
- pBatch->pElem[i].bPrevWasSuspended = 1;
- STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
- }
- }
- bDone = 1;
- } else {
- if(nElem == 1) {
- batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
- bDone = 1;
- } else {
- /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
- "for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2, pWti);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti);
- bDone = 1;
- }
- }
- } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
-
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
-
-finalize_it:
- RETiRet;
-}
-
-
-/* copy "active" array of batch, as we need to modify it. The caller
- * must make sure the new array is freed and the orginal batch
- * pointer is restored (thus the caller must save it). If active
- * is currently NULL, this is properly handled.
- * Note: the batches active pointer is modified, so it must be
- * saved BEFORE calling this function!
- * rgerhards, 2012-09-12
- */
-static rsRetVal
-copyActive(batch_t *pBatch)
-{
- sbool *active;
- DEFiRet;
-
- CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- if(pBatch->active == NULL)
- memset(active, 1, batchNumMsgs(pBatch));
- else
- memcpy(active, pBatch->active, batchNumMsgs(pBatch));
- pBatch->active = active;
-finalize_it:
+// TODO: #warning do we really need to return something?
+ /* even more TODO:
+ This is the place where retry processing needs to go in. If the action
+ permanently fails, we should - as a new feature - add the capability to
+ write an error file. This is already done be omelasticsearch, and IMHO
+ pretty useful.
+ For the time being, I do NOT implement all of this (not even retry!)
+ as I want to get the rest of the engine to SISD (non-SIMD ;)) so that
+ I know any potential suprises and complications that arise out of this.
+ When this is done, I can come back here and complete this work. Obviously,
+ many features do not work in the mean time (but it is not planned to release
+ any of these partial implementations).
+ rgerhards, 2013-11-04
+ */
+ iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate);
RETiRet;
}
-/* The following function prepares a batch for processing, that it is
- * reinitializes batch states, generates strings and does everything else
- * that needs to be done in order to make the batch ready for submission to
- * the actual output module. Note that we look at the precomputed
- * filter OK condition and process only those messages, that actually matched
- * the filter.
- * rgerhards, 2010-06-14
- */
-static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
+/* Commit all active transactions in *DIRECT mode* */
+void
+actionCommitAllDirect(wti_t *pWti)
{
int i;
- batch_obj_t *pElem;
- struct syslogTime ttNow;
- DEFiRet;
-
- /* indicate we have not yet read the date */
- ttNow.year = 0;
+ action_t *pAction;
- pBatch->iDoneUpTo = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- pBatch->eltState[i] = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
- /* make sure we have our copy of "active" array */
- if(!*bMustRestoreActivePtr) {
- *activeSave = pBatch->active;
- copyActive(pBatch);
- }
- pBatch->active[i] = RSFALSE;
- }
- }
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ dbgprintf("DDDD: actionCommitAll: action %d, state %u, root %p\n",
+ i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
+ pAction = pWti->actWrkrInfo[i].pAction;
+ if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate);
}
- RETiRet;
}
-
-/* receive a batch and process it. This includes retry handling.
- * rgerhards, 2009-05-12
+/* process a single message. This is both called if we run from the
+ * cosumer side of an action queue as well as directly from the main
+ * queue thread if the action queue is set to "direct".
*/
-static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
+static rsRetVal
+processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate)
{
DEFiRet;
- assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
- iRet = finishBatch(pAction, pBatch, pWti);
+dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg);
+ // TODO: check error return states!
+ iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
+ if(pAction->eParamPassing == ACT_STRING_PASSING) {
+ pWti->actWrkrInfo[pAction->iActionNbr].pAction = pAction;
+ dbgprintf("DDDD: action %d is string passing - executing in commit phase\n", pAction->iActionNbr);
+ FINALIZE;
+ }
+ iRet = actionProcessMessage(pAction, pMsg->msgFlags,
+ pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
+ pbShutdownImmediate, pWti);
+ releaseDoActionParams(pAction, pWti);
finalize_it:
RETiRet;
}
-
/* receive an array of to-process user pointers and submit them
* for processing.
* rgerhards, 2009-04-22
@@ -1284,41 +1108,30 @@ finalize_it:
static rsRetVal
processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
- int *pbShutdownImmdtSave;
- sbool *activeSave;
- int bMustRestoreActivePtr = 0;
- rsRetVal localRet;
action_t *pAction = (action_t*) pVoid;
+ msg_t *pMsg;
+ int i;
+ struct syslogTime ttNow;
DEFiRet;
- assert(pBatch != NULL);
-
- if(pbShutdownImmediate != NULL) {
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ if(pbShutdownImmediate == NULL) {
+ pbShutdownImmediate = pWti->pbShutdownImmediate;
}
- CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
- iRet = processAction(pAction, pBatch, pWti);
-
- /* even if processAction failed, we need to release the batch (else we
- * have a memory leak). So we do this first, and then check if we need to
- * return an error code. If so, the code from processAction has priority.
- * rgerhards, 2010-12-17
- */
- localRet = releaseBatch(pAction, pBatch);
+ /* indicate we have not yet read the date */
+ ttNow.year = 0;
- if(iRet == RS_RET_OK)
- iRet = localRet;
-
- if(bMustRestoreActivePtr) {
- free(pBatch->active);
- pBatch->active = activeSave;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) {
+ if(batchIsValidElem(pBatch, i)) {
+ pMsg = pBatch->pElem[i].pMsg;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate);
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ }
}
-finalize_it:
- if(pbShutdownImmediate != NULL)
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ iRet = actionCommit(pAction, pWti, pbShutdownImmediate);
+dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
@@ -1381,11 +1194,12 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* rgerhards, 2010-06-08
*/
static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
+doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
- if(pAction->eState == ACT_STATE_DIED) {
+ // TODO: bug? Isn't that supposed to be checked in direct mode as well???
+ if(getActionState(pWti, pAction) == ACT_STATE_DIED) {
DBGPRINTF("action %p died, do NOT execute\n", pAction);
FINALIZE;
}
@@ -1464,7 +1278,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg, pWti);
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
finalize_it:
RETiRet;
@@ -1475,12 +1289,10 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
+doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- msg_t *pMsg;
DEFiRet;
- pMsg = pBatch->pElem[idxBtch].pMsg;
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1493,13 +1305,6 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
- /* we need to update the batch to handle failover processing correctly */
- if(iRet == RS_RET_OK) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 0;
- } else if(iRet == RS_RET_ACTION_FAILED) {
- pBatch->pElem[idxBtch].bPrevWasSuspended = 1;
- }
-
RETiRet;
}
@@ -1518,7 +1323,8 @@ DEFFUNC_llExecFunc(doActivateActions)
errmsg.LogError(0, localRet, "file prefix (work directory?) "
"is missing");
}
- actionDisable(pThis);
+#warning think how to handle this:
+ //actionDisable(pThis);
}
DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
pThis, pThis->pQueue);
@@ -1550,113 +1356,56 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
time_t now = 0;
+ int doProcess = 1;
time_t lastAct;
- int i;
- sbool *activeSave;
DEFiRet;
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
- continue;
- if(now == 0) {
- now = datetime.GetTime(NULL); /* good time call - the only one done */
- }
- /* CAS loop, we write back a bit early, but that's OK... */
- /* we use reception time, not dequeue time - this is considered more appropriate and
- * also faster ;) -- rgerhards, 2008-09-17 */
- do {
- lastAct = pAction->f_time;
- if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
- if((now - lastAct) < MarkInterval / 2) {
- pBatch->active[i] = 0;
- DBGPRINTF("batch item %d: action was recently called, ignoring "
- "mark message\n", i);
- break; /* do not update timestamp for non-written mark messages */
- }
+ if(now == 0) { // TODO: do in caller!
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ }
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if(pMsg->msgFlags & MARK) {
+ if((now - lastAct) < MarkInterval / 2) {
+ doProcess = 0;
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ break; /* do not update timestamp for non-written mark messages */
}
- } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->active[i]) {
- DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
- i, module.GetStateName(pAction->pMod));
}
- }
-
- iRet = doSubmitToActionQBatch(pAction, pBatch, pWti);
+ } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
+ pMsg->ttGenTime, &pAction->mutCAS) == 0);
- free(pBatch->active);
- pBatch->active = activeSave;
+ if(doProcess) {
+ DBGPRINTF("Called action(NotAllMark), processing via '%s'\n",
+ module.GetStateName(pAction->pMod));
+ iRet = doSubmitToActionQBatch(pAction, pWti, pMsg);
+ }
RETiRet;
}
-static inline void
-countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
-{
- int i;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- }
- }
-}
-
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
* rgerhards, 2011-06-16
*/
static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- sbool bNeedSubmit;
- sbool *activeSave;
- int i;
+int pbShutdownImmediate = 0; // TODO: implement
+dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
+ struct syslogTime ttNow;
DEFiRet;
-
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- /* note: for direct mode, we need to adjust the filter property. For non-direct
- * this is not necessary, because in that case we enqueue only what actually needs
- * to be processed.
- */
- if(pAction->bExecWhenPrevSusp) {
- bNeedSubmit = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change active to 0 due to "
- "failover case in elem %d\n", i);
- pBatch->active[i] = 0;
- }
- if(batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- bNeedSubmit = 1;
- }
- DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- }
- if(bNeedSubmit) {
- /* note: stats were already computed above */
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
- } else {
- DBGPRINTF("no need to submit batch, all invalid\n");
- }
- } else {
- if(GatherStats)
- countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
- }
-
- free(pBatch->active);
- pBatch->active = activeSave;
+ if(GatherStats)
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ ttNow.year = 0;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate);
RETiRet;
}
@@ -1666,72 +1415,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- int i;
DEFiRet;
- DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
+ iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti);
- }
- }
+ doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
}
-
-/* Helper to submit a batch of actions to the engine. Note that we have rather
- * complicated processing here, so we need to do this one message after another.
- * rgerhards, 2010-06-23
- */
-static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
-{
- int i;
- DEFiRet;
-
- DBGPRINTF("Called action %p (complex case), logging to %s\n",
- pAction, module.GetStateName(pAction->pMod));
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( batchIsValidElem(pBatch, i)
- && ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
- doActionCallAction(pAction, pBatch, i, pWti);
- }
- }
-
- RETiRet;
-}
-
/* Call configured action, most complex case with all features supported (and thus slow).
* rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
-dbgprintf("DDDD: locked mutAction\n");
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti);
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
+ doActionCallAction(pAction, pWti, pMsg);
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */
@@ -1876,11 +1591,11 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData,
pAction->pMod = pMod;
pAction->pModData = pModData;
- /* check if the module is compatible with select features (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
- if(bSuspended)
- actionSuspend(pAction);
+// TODO #warning we need to look at the following
+ // Probably the core init needs to be done during createWrkrInstance()
+ //if(bSuspended)
+ // actionSuspend(pAction, pWti);
CHKiRet(actionConstructFinalize(pAction, lst));
@@ -1975,7 +1690,6 @@ actionNewInst(struct nvlst *lst, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* check if the module is compatible with select features
* (currently no such features exist) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
loadConf->actions.nbrActions++; /* one more active action! */
}
*ppAction = pAction;
diff --git a/action.h b/action.h
index 5c88b052..666542ac 100644
--- a/action.h
+++ b/action.h
@@ -35,15 +35,6 @@
extern int glbliActionResumeRetryCount;
-typedef enum {
- ACT_STATE_DIED = 0, /* action permanently failed and now disabled - MUST BE ZERO! */
- ACT_STATE_RDY = 1, /* action ready, waiting for new transaction */
- ACT_STATE_ITX = 2, /* transaction active, waiting for new data or commit */
- ACT_STATE_COMM = 3, /* transaction finished (a transient state) */
- ACT_STATE_RTRY = 4, /* failure occured, trying to restablish ready state */
- ACT_STATE_SUSP = 5 /* suspended due to failure (return fail until timeout expired) */
-} action_state_t;
-
/* the following struct defines the action object data structure
*/
struct action_s {
@@ -55,13 +46,10 @@ struct action_s {
sbool bExecWhenPrevSusp;/* execute only when previous action is suspended? */
sbool bWriteAllMarkMsgs;/* should all mark msgs be written (not matter how recent the action was executed)? */
int iSecsExecOnceInterval; /* if non-zero, minimum seconds to wait until action is executed again */
- action_state_t eState; /* current state of action */
sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
time_t ttResumeRtry; /* when is it time to retry the resume? */
- int iResumeOKinRow; /* number of times in a row that resume said OK with an immediate failure following */
int iResumeInterval;/* resume interval for this action */
int iResumeRetryCount;/* how often shall we retry a suspended action? (-1 --> eternal) */
- int iNbrResRtry; /* number of retries since last suspend */
int iNbrNoExec; /* number of matches that did not yet yield to an exec */
int iExecEveryNthOccur;/* execute this action only every n-th occurence (with n=0,1 -> always) */
int iExecEveryNthOccurTO;/* timeout for n-th occurence feature */
@@ -69,7 +57,7 @@ struct action_s {
struct modInfo_s *pMod;/* pointer to output module handling this selector */
void *pModData; /* pointer to module data - content is module-specific */
sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */
- rsRetVal (*submitToActQ)(action_t *, batch_t *, wti_t*);/* function submit message to action queue */
+ rsRetVal (*submitToActQ)(action_t *, wti_t*, msg_t*);/* function submit message to action queue */
rsRetVal (*qConstruct)(struct queue_s *pThis);
enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING = 2,
ACT_JSON_PASSING = 3}
@@ -79,7 +67,7 @@ struct action_s {
* in this order. */
qqueue_t *pQueue; /* action queue */
pthread_mutex_t mutAction; /* primary action mutex */
- uchar *pszName; /* action name (for documentation) */
+ uchar *pszName; /* action name */
DEF_ATOMIC_HELPER_MUT(mutCAS);
/* for statistics subsystem */
statsobj_t *statsobj;
@@ -93,7 +81,7 @@ struct action_s {
rsRetVal actionConstruct(action_t **ppThis);
rsRetVal actionConstructFinalize(action_t *pThis, struct nvlst *lst);
rsRetVal actionDestruct(action_t *pThis);
-rsRetVal actionDbgPrint(action_t *pThis);
+//rsRetVal actionDbgPrint(action_t *pThis);
rsRetVal actionSetGlobalResumeInterval(int iNewVal);
rsRetVal actionDoAction(action_t *pAction);
rsRetVal actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t*);
@@ -103,6 +91,7 @@ rsRetVal addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStr
rsRetVal activateActions(void);
rsRetVal actionNewInst(struct nvlst *lst, action_t **ppAction);
rsRetVal actionProcessCnf(struct cnfobj *o);
+void actionCommitAllDirect(wti_t *pWti);
/* external data */
extern int iActionNbr;
diff --git a/runtime/batch.h b/runtime/batch.h
index 2ec07670..5c855521 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,17 +46,6 @@ typedef unsigned char batch_state_t;
*/
struct batch_obj_s {
msg_t *pMsg;
- /* work variables for action processing; these are reused for each action (or block of
- * actions)
- */
- sbool bPrevWasSuspended;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
};
/* the batch
@@ -79,9 +68,6 @@ struct batch_s {
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
- int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
- sbool *active; /* which messages are active for processing, NULL=all */
- sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
batch_state_t *eltState;/* state (array!) for individual objects.
NOTE: we have moved this out of batch_obj_t because we
@@ -93,24 +79,6 @@ struct batch_s {
};
-/* some inline functions (we may move this off to an object .. or not) */
-static inline void
-batchSetSingleRuleset(batch_t *pBatch, sbool val) {
- pBatch->bSingleRuleset = val;
-}
-
-/* get the batches ruleset (if we have a single ruleset) */
-static inline ruleset_t*
-batchGetRuleset(batch_t *pBatch) {
- return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
-}
-
-/* get the ruleset of a specifc element of the batch (index not verified!) */
-static inline ruleset_t*
-batchElemGetRuleset(batch_t *pBatch, int i) {
- return pBatch->pElem[i].pMsg->pRuleset;
-}
-
/* get number of msgs for this batch */
static inline int
batchNumMsgs(batch_t *pBatch) {
@@ -134,8 +102,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
*/
static inline int
batchIsValidElem(batch_t *pBatch, int i) {
- return( (pBatch->eltState[i] != BATCH_STATE_DISC)
- && (pBatch->active == NULL || pBatch->active[i]));
+ return(pBatch->eltState[i] != BATCH_STATE_DISC);
}
@@ -152,7 +119,7 @@ batchFree(batch_t *pBatch) {
/* staticActParams MUST be freed immediately (if required),
* so we do not need to do that!
*/
- free(pBatch->pElem[i].staticActStrings[j]);
+ //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]);
}
}
free(pBatch->pElem);
@@ -171,7 +138,6 @@ batchInit(batch_t *pBatch, int maxElem) {
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
- // TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
}
diff --git a/runtime/conf.c b/runtime/conf.c
index 2b000c60..1544e364 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -523,7 +523,6 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
(iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
/* here check if the module is compatible with select features
* (currently, we have no such features!) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/queue.c b/runtime/queue.c
index 968c016e..18f62ffa 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -964,7 +964,6 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
batch_t singleBatch;
batch_obj_t batchObj;
batch_state_t batchState = BATCH_STATE_RDY;
- sbool active = 1;
int i;
DEFiRet;
@@ -985,12 +984,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- singleBatch.active = &active;
iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
msgDestruct(&pMsg);
RETiRet;
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index e39a1889..4ac0039a 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -48,6 +48,7 @@
#include "rainerscript.h"
#include "srUtils.h"
#include "modules.h"
+#include "wti.h"
#include "dirty.h" /* for main ruleset queue creation */
/* static data */
@@ -68,7 +69,7 @@ static struct cnfparamblk rspblk =
/* forward definitions */
static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti);
-static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti);
+static void scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti);
/* ---------- linked-list key handling functions (ruleset) ---------- */
@@ -161,249 +162,87 @@ finalize_it:
}
-/* This function is similar to processBatch(), but works on a batch that
- * contains rules from multiple rulesets. In this case, we can not push
- * the whole batch through the ruleset. Instead, we examine it and
- * partition it into sub-rulesets which we then push through the system.
- * rgerhards, 2010-06-15
- */
-static inline rsRetVal
-processBatchMultiRuleset(batch_t *pBatch, wti_t *pWti)
-{
- ruleset_t *currRuleset;
- batch_t snglRuleBatch;
- int i;
- int iStart; /* start index of partial batch */
- int iNew; /* index for new (temporary) batch */
- int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */
- DEFiRet;
-
- do {
- bHaveUnprocessed = 0;
- /* search for first unprocessed element */
- for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart)
- /* just search, no action */;
- if(iStart == pBatch->nElem)
- break; /* everything processed */
-
- /* prepare temporary batch */
- CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
- snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
- currRuleset = batchElemGetRuleset(pBatch, iStart);
- iNew = 0;
- for(i = iStart ; i < pBatch->nElem ; ++i) {
- if(batchElemGetRuleset(pBatch, i) == currRuleset) {
- /* for performance reasons, we copy only those members that we actually need */
- snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg;
- snglRuleBatch.eltState[iNew] = pBatch->eltState[i];
- ++iNew;
- /* We indicate the element also as done, so it will not be processed again */
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- bHaveUnprocessed = 1;
- }
- }
- snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
- batchSetSingleRuleset(&snglRuleBatch, 1);
- /* process temp batch */
- processBatch(&snglRuleBatch, pWti);
- batchFree(&snglRuleBatch);
- } while(bHaveUnprocessed == 1);
-
-finalize_it:
- RETiRet;
-}
-
-/* return a new "active" structure for the batch. Free with freeActive(). */
-static inline sbool *newActive(batch_t *pBatch)
+static void
+execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- return malloc(sizeof(sbool) * batchNumMsgs(pBatch));
-
-}
-static inline void freeActive(sbool *active) { free(active); }
+// TODO: check here if bPrevWasSuspsended was required and, if so
+// if we actually are permitted to execute this action.
+// NOTE: this will primarily be handled by end-of-batch processing
-
-/* for details, see scriptExec() header comment! */
-/* call action for all messages with filter on */
-static rsRetVal
-execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
-{
- DEFiRet;
-dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active);
- pBatch->active = active;
- stmt->d.act->submitToActQ(stmt->d.act, pBatch, pWti);
- RETiRet;
+ DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr);
+ stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg);
}
-static rsRetVal
-execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+static void
+execSet(struct cnfstmt *stmt, msg_t *pMsg)
{
- int i;
struct var result;
- DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg);
- msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname,
- &result);
- varDelete(&result);
- }
- }
- RETiRet;
+ cnfexprEval(stmt->d.s_set.expr, &result, pMsg);
+ msgSetJSONFromVar(pMsg, stmt->d.s_set.varname, &result);
+ varDelete(&result);
}
-static rsRetVal
-execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+static void
+execUnset(struct cnfstmt *stmt, msg_t *pMsg)
{
- int i;
- DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- msgDelJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname);
- }
- }
- RETiRet;
+ msgDelJSON(pMsg, stmt->d.s_unset.varname);
}
-/* for details, see scriptExec() header comment! */
-/* "stop" simply discards the filtered items - it's just a (hopefully more intuitive
- * shortcut for users.
- */
static rsRetVal
-execStop(batch_t *pBatch, sbool *active)
+execCall(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- int i;
- DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- }
- }
- RETiRet;
-}
-static rsRetVal
-execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
-{
- msg_t *pMsg;
- int i;
DEFiRet;
if(stmt->d.s_call.ruleset == NULL) {
- scriptExec(stmt->d.s_call.stmt, pBatch, active, pWti);
+ scriptExec(stmt->d.s_call.stmt, pMsg, pWti);
} else {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg));
- DBGPRINTF("CALL: forwarding message %d to async ruleset %p\n",
- i, stmt->d.s_call.ruleset->pQueue);
- MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(pMsg, stmt->d.s_call.ruleset);
- /* Note: we intentionally use submitMsg2() here, as we process messages
- * that were already run through the rate-limiter.
- */
- submitMsg2(pMsg);
- }
+ CHKmalloc(pMsg = MsgDup((msg_t*) pMsg));
+ DBGPRINTF("CALL: forwarding message to async ruleset %p\n",
+ stmt->d.s_call.ruleset->pQueue);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, stmt->d.s_call.ruleset);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter.
+ */
+ submitMsg2(pMsg);
}
finalize_it:
RETiRet;
}
-/* for details, see scriptExec() header comment! */
-// save current filter, evaluate new one
-// perform then (if any message)
-// if ELSE given:
-// set new filter, inverted
-// perform else (if any messages)
-static rsRetVal
-execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
+static void
+execIf(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *newAct;
- int i;
sbool bRet;
- sbool allInactive = 1;
- DEFiRet;
- newAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- FINALIZE;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- if(active == NULL || active[i]) {
- bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg);
- allInactive = 0;
- } else
- bRet = 0;
- newAct[i] = bRet;
- DBGPRINTF("batch: item %d: expr eval: %d\n", i, bRet);
- }
-
- if(allInactive) {
- DBGPRINTF("execIf: all batch elements are inactive, holding execution\n");
- freeActive(newAct);
- FINALIZE;
- }
-
- if(stmt->d.s_if.t_then != NULL) {
- scriptExec(stmt->d.s_if.t_then, pBatch, newAct, pWti);
- }
- if(stmt->d.s_if.t_else != NULL) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- FINALIZE;
- if(pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i]))
- newAct[i] = !newAct[i];
- }
- scriptExec(stmt->d.s_if.t_else, pBatch, newAct, pWti);
+ bRet = cnfexprEvalBool(stmt->d.s_if.expr, pMsg);
+ DBGPRINTF("if condition result is %d\n", bRet);
+ if(bRet) {
+ if(stmt->d.s_if.t_then != NULL)
+ scriptExec(stmt->d.s_if.t_then, pMsg, pWti);
+ } else {
+ if(stmt->d.s_if.t_else != NULL)
+ scriptExec(stmt->d.s_if.t_else, pMsg, pWti);
}
- freeActive(newAct);
-finalize_it:
- RETiRet;
}
-/* for details, see scriptExec() header comment! */
static void
-execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
+execPRIFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *newAct;
- msg_t *pMsg;
int bRet;
- int i;
- newAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- pMsg = pBatch->pElem[i].pMsg;
- if(active == NULL || active[i]) {
- if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
- ((stmt->d.s_prifilt.pmask[pMsg->iFacility]
- & (1<<pMsg->iSeverity)) == 0) )
- bRet = 0;
- else
- bRet = 1;
- } else
- bRet = 0;
- newAct[i] = bRet;
- DBGPRINTF("batch: item %d PRIFILT %d\n", i, newAct[i]);
- }
-
- if(stmt->d.s_prifilt.t_then != NULL) {
- scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct, pWti);
- }
- if(stmt->d.s_prifilt.t_else != NULL) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i]))
- newAct[i] = !newAct[i];
- }
- scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct, pWti);
+ if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
+ ((stmt->d.s_prifilt.pmask[pMsg->iFacility]
+ & (1<<pMsg->iSeverity)) == 0) )
+ bRet = 0;
+ else
+ bRet = 1;
+
+ DBGPRINTF("PRIFILT condition result is %d\n", bRet);
+ if(bRet) {
+ if(stmt->d.s_prifilt.t_then != NULL)
+ scriptExec(stmt->d.s_prifilt.t_then, pMsg, pWti);
+ } else {
+ if(stmt->d.s_prifilt.t_else != NULL)
+ scriptExec(stmt->d.s_prifilt.t_else, pMsg, pWti);
}
- freeActive(newAct);
}
@@ -498,79 +337,63 @@ done:
return bRet;
}
-/* for details, see scriptExec() header comment! */
static void
-execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active, wti_t *pWti)
+execPROPFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *thenAct;
sbool bRet;
- int i;
- thenAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- if(active == NULL || active[i]) {
- bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg);
- } else
- bRet = 0;
- thenAct[i] = bRet;
- DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]);
- }
- scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct, pWti);
- freeActive(thenAct);
+ bRet = evalPROPFILT(stmt, pMsg);
+ DBGPRINTF("PROPFILT condition result is %d\n", bRet);
+ if(bRet)
+ scriptExec(stmt->d.s_propfilt.t_then, pMsg, pWti);
}
/* The rainerscript execution engine. It is debatable if that would be better
* contained in grammer/rainerscript.c, HOWEVER, that file focusses primarily
* on the parsing and object creation part. So as an actual executor, it is
* better suited here.
- * param active: if NULL, all messages are active (to be processed), if non-null
- * this is an array of the same size as the batch. If 1, the message
- * is to be processed, otherwise not.
- * NOTE: this function must receive batches which contain a single ruleset ONLY!
* rgerhards, 2012-09-04
*/
-static rsRetVal
-scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
+static void
+scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti)
{
- DEFiRet;
struct cnfstmt *stmt;
for(stmt = root ; stmt != NULL ; stmt = stmt->next) {
+ if(*pWti->pbShutdownImmediate) {
+ DBGPRINTF("scriptExec: ShutdownImmediate set, "
+ "force terminating\n");
+ goto done;
+ }
if(Debug) {
- dbgprintf("scriptExec: batch of %d elements, active %p, active[0]:%d\n",
- batchNumMsgs(pBatch), active, (active == NULL ? 1 : active[0]));
cnfstmtPrintOnly(stmt, 2, 0);
}
switch(stmt->nodetype) {
case S_NOP:
break;
case S_STOP:
- execStop(pBatch, active);
+ goto done;
break;
case S_ACT:
- execAct(stmt, pBatch, active, pWti);
+ execAct(stmt, pMsg, pWti);
break;
case S_SET:
- execSet(stmt, pBatch, active);
+ execSet(stmt, pMsg);
break;
case S_UNSET:
- execUnset(stmt, pBatch, active);
+ execUnset(stmt, pMsg);
break;
case S_CALL:
- execCall(stmt, pBatch, active, pWti);
+ execCall(stmt, pMsg, pWti);
break;
case S_IF:
- execIf(stmt, pBatch, active, pWti);
+ execIf(stmt, pMsg, pWti);
break;
case S_PRIFILT:
- execPRIFILT(stmt, pBatch, active, pWti);
+ execPRIFILT(stmt, pMsg, pWti);
break;
case S_PROPFILT:
- execPROPFILT(stmt, pBatch, active, pWti);
+ execPROPFILT(stmt, pMsg, pWti);
break;
default:
dbgprintf("error: unknown stmt type %u during exec\n",
@@ -578,36 +401,39 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active, wti_t *pWti)
break;
}
}
- RETiRet;
+done: return;
}
/* Process (consume) a batch of messages. Calls the actions configured.
- * If the whole batch uses a singel ruleset, we can process the batch as
- * a whole. Otherwise, we need to process it slower, on a message-by-message
- * basis (what can be optimized to a per-ruleset basis)
- * rgerhards, 2005-10-13
*/
static rsRetVal
processBatch(batch_t *pBatch, wti_t *pWti)
{
- ruleset_t *pThis;
+ int i;
+ msg_t *pMsg;
+ ruleset_t *pRuleset;
DEFiRet;
- assert(pBatch != NULL);
-
- DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem);
- if(pBatch->bSingleRuleset) {
- pThis = batchGetRuleset(pBatch);
- if(pThis == NULL)
- pThis = ourConf->rulesets.pDflt;
- ISOBJ_TYPE_assert(pThis, ruleset);
- CHKiRet(scriptExec(pThis->root, pBatch, NULL, pWti));
- } else {
- CHKiRet(processBatchMultiRuleset(pBatch, pWti));
+
+ DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
+
+ /* execution phase */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) {
+ pMsg = pBatch->pElem[i].pMsg;
+ DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg);
+ pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset;
+ scriptExec(pRuleset->root, pMsg, pWti);
+ // TODO: think if we need a return state of scriptExec - most probably
+ // the answer is "no", as we need to process the batch in any case!
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
}
-finalize_it:
- DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
+ /* commit phase */
+ dbgprintf("END batch execution phase, entering to commit phase\n");
+ actionCommitAllDirect(pWti);
+
+ DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem);
RETiRet;
}
diff --git a/runtime/wti.c b/runtime/wti.c
index df77bc19..ddffd81a 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -391,7 +391,6 @@ finalize_it:
RETiRet;
}
-
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
diff --git a/runtime/wti.h b/runtime/wti.h
index bb4f56bc..813237fc 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -26,11 +26,46 @@
#include "wtp.h"
#include "obj.h"
#include "batch.h"
+#include "action.h"
+#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */
+#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */
+#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */
+#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */
+#define ACT_STATE_DIED 7 /* action permanently failed and now disabled */
+/* note: 3 bit bit field --> highest value is 7! */
+
+/* The following structure defines immutable parameters which need to
+ * be passed as action parameters. Note that the current implementation
+ * does NOT focus on performance, but on a simple PoC in order to get
+ * things going. TODO: Once it works, revisit this code and think about
+ * an array implementation. We also need to support other passing modes
+ * as well. -- gerhards, 2013-11-04
+ */
+typedef struct actWrkrIParams {
+ struct actWrkrIParams *next;
+ int msgFlags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+} actWrkrIParams_t;
+
typedef struct actWrkrInfo {
action_t *pAction;
void *actWrkrData;
+ uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */
+ int iNbrResRtry; /* number of retries since last suspend */
+ struct {
+ unsigned actState : 3;
+ } flags;
+ actWrkrIParams_t *iparamRoot;
+ actWrkrIParams_t *iparamLast;
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */
} actWrkrInfo_t;
/* the worker thread instance class */
@@ -39,6 +74,7 @@ struct wti_s {
pthread_t thrdID; /* thread ID */
int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
sbool bAlwaysRunning; /* should this thread always run? */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
@@ -62,4 +98,79 @@ PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+static inline uint8_t
+getActionStateByNbr(wti_t *pWti, int iActNbr)
+{
+ return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState);
+}
+
+static inline uint8_t
+getActionState(wti_t *pWti, action_t *pAction)
+{
+ return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
+}
+
+static inline void
+setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
+}
+
+static inline uint16_t
+getActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow);
+}
+
+static inline void
+setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val;
+}
+
+static inline void
+incActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++;
+}
+
+static inline int
+getActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry);
+}
+
+static inline void
+setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val;
+}
+
+static inline void
+incActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
+}
+
+static inline rsRetVal
+wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams)
+{
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparams;
+ DEFiRet;
+
+ wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+dbgprintf("DDDD: adding param for action %d\n", pAction->iActionNbr);
+ CHKmalloc(iparams = calloc(1, sizeof(actWrkrIParams_t)));
+ if(wrkrInfo->iparamLast == NULL) {
+ wrkrInfo->iparamLast = wrkrInfo->iparamRoot = iparams;
+ } else {
+ wrkrInfo->iparamLast->next = iparams;
+ wrkrInfo->iparamLast = iparams;
+ }
+ *piparams = iparams;
+
+finalize_it:
+ RETiRet;
+}
#endif /* #ifndef WTI_H_INCLUDED */
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 7597b05d..c27d79b7 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -497,24 +497,19 @@ finalize_it:
* rgerhards, 2010-06-09
*/
static inline rsRetVal
-preprocessBatch(batch_t *pBatch) {
+preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) {
prop_t *ip;
prop_t *fqdn;
prop_t *localName;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
- int bSingleRuleset;
- ruleset_t *batchRuleset; /* the ruleset used for all message inside the batch, if there is a single one */
int bIsPermitted;
msg_t *pMsg;
int i;
rsRetVal localRet;
DEFiRet;
- bSingleRuleset = 1;
- batchRuleset = (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
-
- for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) {
+ for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
pMsg = pBatch->pElem[i].pMsg;
if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) {
DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n");
@@ -539,12 +534,8 @@ preprocessBatch(batch_t *pBatch) {
pBatch->eltState[i] = BATCH_STATE_DISC;
}
}
- if(pMsg->pRuleset != batchRuleset)
- bSingleRuleset = 0;
}
- batchSetSingleRuleset(pBatch, bSingleRuleset);
-
finalize_it:
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
@@ -564,8 +555,8 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWt
{
DEFiRet;
assert(pBatch != NULL);
- pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
- preprocessBatch(pBatch);
+ pWti->pbShutdownImmediate = pbShutdownImmediate;
+ preprocessBatch(pBatch, pWti->pbShutdownImmediate);
ruleset.ProcessBatch(pBatch, pWti);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10