From e4b3f6d287d74b34d27b4e296c33cb3f1294a58c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 16:39:58 +0200 Subject: now batches are handed down to the actual consumer ... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer. --- action.c | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 03073153..8395642c 100644 --- a/action.c +++ b/action.c @@ -42,10 +42,11 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "wti.h" #include "datetime.h" /* forward definitions */ -rsRetVal actionCallDoAction(action_t *pAction, msg_t *pMsg); +rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -255,7 +256,8 @@ actionConstructFinalize(action_t *pThis) * to be run on multiple threads. So far, this is forbidden by the interface * spec. -- rgerhards, 2008-01-30 */ - CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, (rsRetVal (*)(void*,void*))actionCallDoAction)); + CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, + (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -415,6 +417,7 @@ rsRetVal actionDbgPrint(action_t *pThis) } +//MULTIQUEUE: think about these two functions below /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ @@ -527,6 +530,29 @@ finalize_it: #pragma GCC diagnostic warning "-Wempty-body" +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + CHKiRet(actionCallDoAction(pAction, pMsg)); + } +finalize_it: + RETiRet; +} + + /* call the HUP handler for a given action, if such a handler is defined. The * action mutex is locked, because the HUP handler most probably needs to modify * some internal state information. -- cgit v1.2.3 From 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 12:50:07 +0200 Subject: added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives --- action.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'action.c') diff --git a/action.c b/action.c index 8395642c..be3c7556 100644 --- a/action.c +++ b/action.c @@ -64,6 +64,7 @@ static int bActionRepMsgHasMsg = 0; /* last messsage repeated... has msg fragme /* main message queue and its configuration parameters */ static queueType_t ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ static int iActionQueueSize = 1000; /* size of the main message queue above */ +static int iActionQueueDeqBatchSize = 16; /* batch size for action queues */ static int iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ static int iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ static int iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -144,6 +145,7 @@ actionResetQueueParams(void) ActionQueType = QUEUETYPE_DIRECT; /* type of the main message queue above */ iActionQueueSize = 1000; /* size of the main message queue above */ + iActionQueueDeqBatchSize = 16; /* default batch size */ iActionQHighWtrMark = 800; /* high water mark for disk-assisted queues */ iActionQLowWtrMark = 200; /* low water mark for disk-assisted queues */ iActionQDiscardMark = 9800; /* begin to discard messages */ @@ -272,6 +274,7 @@ actionConstructFinalize(action_t *pThis) qqueueSetpUsr(pThis->pQueue, pThis); setQPROP(qqueueSetsizeOnDiskMax, "$ActionQueueMaxDiskSpace", iActionQueMaxDiskSpace); + setQPROP(qqueueSetiDeqBatchSize, "$ActionQueueDequeueBatchSize", iActionQueueDeqBatchSize); setQPROP(qqueueSetMaxFileSize, "$ActionQueueFileSize", iActionQueMaxFileSize); setQPROPstr(qqueueSetFilePrefix, "$ActionQueueFileName", pszActionQFName); setQPROP(qqueueSetiPersistUpdCnt, "$ActionQueueCheckpointInterval", iActionQPersistUpdCnt); @@ -857,6 +860,7 @@ actionAddCfSysLineHdrl(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); -- cgit v1.2.3 From 10bab38993ae6853d7e23c6f6bd44eb0ed69e001 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 27 Apr 2009 15:40:54 +0200 Subject: begin implementation of new transactional output module interface code is not complete, error cases are not handled. --- action.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index be3c7556..476e072a 100644 --- a/action.c +++ b/action.c @@ -466,8 +466,7 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - do { - /* on first invocation, this if should never be true. We just put it at the top + do { /* on first invocation, this if should never be true. We just put it at the top * of the loop so that processing (and code) is simplified. This code is actually * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 */ @@ -490,6 +489,10 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) if(bCallAction) { /* call configured action */ + /* MULTIQUEUE: TODO: and this now gets us in trouble. If it was suspended, we can + * assume (and must so) that the action did not succeed. So we now need to redo all + * those messages from the batch that are not yet processed. + */ iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); if(iRet == RS_RET_SUSPENDED) { dbgprintf("Action requested to be suspended, done that.\n"); @@ -546,11 +549,15 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) assert(paUsrp != NULL); + if(pAction->pMod->mod.om.beginTransaction != NULL) + CHKiRet(pAction->pMod->mod.om.beginTransaction(pAction->pModData)); for(i = 0 ; i < paUsrp->nElem ; i++) { pMsg = (msg_t*) paUsrp->pUsrp[i]; dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); CHKiRet(actionCallDoAction(pAction, pMsg)); } + if(pAction->pMod->mod.om.endTransaction != NULL) + CHKiRet(pAction->pMod->mod.om.endTransaction(pAction->pModData)); finalize_it: RETiRet; } -- cgit v1.2.3 From 2debe89434872b68f36cc4282d8a02039847c605 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 29 Apr 2009 17:07:01 +0200 Subject: minor cleanup --- action.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'action.c') diff --git a/action.c b/action.c index 476e072a..928b30dc 100644 --- a/action.c +++ b/action.c @@ -500,7 +500,8 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) } } - } while(iRet == RS_RET_SUSPENDED && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ + } while( iRet == RS_RET_SUSPENDED + && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ if(iRet == RS_RET_DISABLE_ACTION) { dbgprintf("Action requested to be disabled, done that.\n"); -- cgit v1.2.3 From 68877497a131d5b7c5b1588b771a623fc0ad41c1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 May 2009 10:44:46 +0200 Subject: first shot at action state machine implemention (untested) I am commiting it so that the code is visible, but will no begin with the test environment. --- action.c | 462 +++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 335 insertions(+), 127 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 928b30dc..9ad05a9f 100644 --- a/action.c +++ b/action.c @@ -45,6 +45,8 @@ #include "wti.h" #include "datetime.h" +#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ + /* forward definitions */ rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*); @@ -310,87 +312,223 @@ finalize_it: } -/* set an action back to active state -- rgerhards, 2007-08-02 + +/* set the global resume interval + */ +rsRetVal actionSetGlobalResumeInterval(int iNewVal) +{ + glbliActionResumeInterval = iNewVal; + return RS_RET_OK; +} + + +/* returns the action state name in human-readable form + * returned string must not be modified. + * rgerhards, 2009-05-07 */ -static rsRetVal actionResume(action_t *pThis) +static uchar *getActStateName(action_t *pThis) +{ + switch(pThis->eState) { + case ACT_STATE_RDY: + return (uchar*) "rdy"; + case ACT_STATE_ITX: + return (uchar*) "itx"; + case ACT_STATE_RTRY: + return (uchar*) "rtry"; + case ACT_STATE_SUSP: + return (uchar*) "susp"; + case ACT_STATE_DIED: + return (uchar*) "died"; + case ACT_STATE_COMM: + return (uchar*) "comm"; + default: + return (uchar*) "ERROR/UNKNWON"; + } +} + + +/* returns a suitable return code based on action state + * rgerhards, 2009-05-07 + */ +static rsRetVal getReturnCode(action_t *pThis) { DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 0; + switch(pThis->eState) { + case ACT_STATE_RDY: + iRet = RS_RET_OK; + break; + case ACT_STATE_ITX: + if(pThis->bHadAutoCommit) { + pThis->bHadAutoCommit = 0; /* auto-reset */ + iRet = RS_RET_PREVIOUS_COMMITTED; + } else { + iRet = RS_RET_DEFER_COMMIT; + } + break; + case ACT_STATE_RTRY: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_SUSP: + iRet = RS_RET_SUSPENDED; + break; + case ACT_STATE_DIED: + iRet = RS_RET_DISABLE_ACTION; + break; + default: + DBGPRINTF("Invalid action engine state %d, program error\n", + (int) pThis->eState); + iRet = RS_RET_ERR; + break; + } RETiRet; } -/* set the global resume interval +/* Handles the transient commit state. So far, this is + * mostly a dummy... + * rgerhards, 2007-08-02 */ -rsRetVal actionSetGlobalResumeInterval(int iNewVal) +static void actionCommitted(action_t *pThis) { - glbliActionResumeInterval = iNewVal; - return RS_RET_OK; + pThis->eState = ACT_STATE_RDY; + DBGPRINTF("Action has committed.\n"); +} + + +/* Disable action, this means it will never again be usable + * until rsyslog is reloaded. Use only as a last resort, but + * depends on output module. + * rgerhards, 2007-08-02 + */ +static void actionDisable(action_t *pThis) +{ + pThis->eState = ACT_STATE_DIED; + DBGPRINTF("Action requested to be disabled, done that.\n"); +} + + +/* Suspend action, this involves changing the acton state as well + * as setting the next retry time. + * if we have more than 10 retries, we prolong the + * retry interval. If something is really stalled, it will + * get re-tried only very, very seldom - but that saves + * CPU time. TODO: maybe a config option for that? + * rgerhards, 2007-08-02 + */ +static inline void actionSuspend(action_t *pThis, time_t ttNow) +{ + if(ttNow == NO_TIME_PROVIDED) + time(&ttNow); + pThis->eState = ACT_STATE_SUSP; + pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry); } -/* suspend an action -- rgerhards, 2007-08-02 +/* actually do retry processing. Note that the function receives a timestamp so + * that we do not need to call the (expensive) time() API. + * Note that we do the full retry processing here, doing the configured number of + * iterations. + * rgerhards, 2009-05-07 */ -static rsRetVal actionSuspend(action_t *pThis, time_t tNow) +static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) { + int iRetries; + int iSleepPeriod; DEFiRet; ASSERT(pThis != NULL); - pThis->bSuspended = 1; - pThis->ttResumeRtry = tNow + pThis->iResumeInterval; - pThis->iNbrResRtry = 0; /* tell that we did not yet retry to resume */ + + iRetries = 0; + while(pThis->eState == ACT_STATE_RTRY) { + iRet = pThis->pMod->tryResume(pThis->pModData); + if(iRet == RS_RET_SUSPENDED) { + /* max retries reached? */ + if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { + actionSuspend(pThis, ttNow); + } else { + ++pThis->iNbrResRtry; + ++iRetries; + iSleepPeriod = pThis->iResumeInterval; + ttNow += iSleepPeriod; /* not truly exact, but sufficiently... */ + srSleep(iSleepPeriod, 0); + } + } else if(iRet == RS_RET_DISABLE_ACTION) { + actionDisable(pThis); + } + } + + if(pThis->eState == ACT_STATE_RDY) { + pThis->iNbrResRtry = 0; + } RETiRet; } /* try to resume an action -- rgerhards, 2007-08-02 - * returns RS_RET_OK if resumption worked, RS_RET_SUSPEND if the - * action is still suspended. + * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal actionTryResume(action_t *pThis) { DEFiRet; - time_t ttNow; + time_t ttNow = NO_TIME_PROVIDED; ASSERT(pThis != NULL); - /* for resume handling, we must always obtain a fresh timestamp. We used - * to use the action timestamp, but in this case we will never reach a - * point where a resumption is actually tried, because the action timestamp - * is always in the past. So we can not avoid doing a fresh time() call - * here. -- rgerhards, 2009-03-18 - */ - time(&ttNow); /* cache "now" */ - - /* first check if it is time for a re-try */ - if(ttNow > pThis->ttResumeRtry) { - iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { - /* set new tryResume time */ - ++pThis->iNbrResRtry; - /* if we have more than 10 retries, we prolong the - * retry interval. If something is really stalled, it will - * get re-tried only very, very seldom - but that saves - * CPU time. TODO: maybe a config option for that? - * rgerhards, 2007-08-02 - */ - pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); + if(pThis->eState == ACT_STATE_SUSP) { + /* if we are suspended, we need to check if the timeout expired. + * for this handling, we must always obtain a fresh timestamp. We used + * to use the action timestamp, but in this case we will never reach a + * point where a resumption is actually tried, because the action timestamp + * is always in the past. So we can not avoid doing a fresh time() call + * here. -- rgerhards, 2009-03-18 + */ + time(&ttNow); /* cache "now" */ + if(ttNow > pThis->ttResumeRtry) { + pThis->eState = ACT_STATE_RTRY; /* back to retries */ } - } else { - /* it's too early, we are still suspended --> indicate this */ - iRet = RS_RET_SUSPENDED; } - if(iRet == RS_RET_OK) - actionResume(pThis); + if(pThis->eState == ACT_STATE_RTRY) { + if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ + time(&ttNow); + CHKiRet(actionDoRetry(pThis, ttNow)); + } + + DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + +finalize_it: + RETiRet; +} + + +/* prepare an action for performing work. This involves trying to recover it, + * depending on its current state. + * rgerhards, 2009-05-07 + */ +static rsRetVal actionPrepare(action_t *pThis) +{ + DEFiRet; + + assert(pThis != NULL); + if(pThis->eState == ACT_STATE_RTRY) { + CHKiRet(actionTryResume(pThis)); + } - dbgprintf("actionTryResume: iRet: %d, next retry (if applicable): %u [now %u]\n", - iRet, (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + /* if we are now ready, we initialize the transaction and advance + * action state accordingly + */ + if(pThis->eState == ACT_STATE_RDY) { + CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData)); + pThis->eState = ACT_STATE_ITX; + } +finalize_it: RETiRet; } @@ -407,12 +545,11 @@ rsRetVal actionDbgPrint(action_t *pThis) dbgprintf("\n\tInstance data: 0x%lx\n", (unsigned long) pThis->pModData); dbgprintf("\tRepeatedMsgReduction: %d\n", pThis->f_ReduceRepeated); dbgprintf("\tResume Interval: %d\n", pThis->iResumeInterval); - dbgprintf("\tSuspended: %d", pThis->bSuspended); - if(pThis->bSuspended) { - dbgprintf(" next retry: %u, number retries: %d", (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); + if(pThis->eState == ACT_STATE_SUSP) { + dbgprintf("\tresume next retry: %u, number retries: %d", + (unsigned) pThis->ttResumeRtry, pThis->iNbrResRtry); } - dbgprintf("\n"); - dbgprintf("\tDisabled: %d\n", !pThis->bEnabled); + dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); dbgprintf("\n"); @@ -420,25 +557,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } -//MULTIQUEUE: think about these two functions below -/* call the DoAction output plugin entry point - * rgerhards, 2008-01-28 +/* prepare the calling parameters for doAction() + * rgerhards, 2009-05-07 */ -#pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal -actionCallDoAction(action_t *pAction, msg_t *pMsg) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***pppMsgs) { - DEFiRet; - int iRetries; + uchar **ppMsgs = *pppMsgs; int i; - int iArr; - int iSleepPeriod; - int bCallAction; - int iCancelStateSave; - uchar **ppMsgs; /* array of message pointers for doAction */ + DEFiRet; ASSERT(pAction != NULL); - /* create the array for doAction() message pointers */ if((ppMsgs = calloc(pAction->iNumTpls, sizeof(uchar *))) == NULL) { ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -456,87 +584,154 @@ actionCallDoAction(action_t *pAction, msg_t *pMsg) default:assert(0); /* software bug if this happens! */ } } - iRetries = 0; - /* We now must guard the output module against execution by multiple threads. The - * plugin interface specifies that output modules must not be thread-safe (except - * if they notify us they are - functionality not yet implemented...). - * rgerhards, 2008-01-30 - */ - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(&pAction->mutActExec); - pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - pthread_setcancelstate(iCancelStateSave, NULL); - do { /* on first invocation, this if should never be true. We just put it at the top - * of the loop so that processing (and code) is simplified. This code is actually - * triggered on the 2nd+ invocation. -- rgerhards, 2008-01-30 - */ - if(iRet == RS_RET_SUSPENDED) { - /* ok, this calls for our retry logic... */ - ++iRetries; - iSleepPeriod = pAction->iResumeInterval; - srSleep(iSleepPeriod, 0); - } - /* first check if we are suspended and, if so, retry */ - if(actionIsSuspended(pAction)) { - iRet = actionTryResume(pAction); - if(iRet == RS_RET_OK) - bCallAction = 1; - else - bCallAction = 0; - } else { - bCallAction = 1; - } - if(bCallAction) { - /* call configured action */ - /* MULTIQUEUE: TODO: and this now gets us in trouble. If it was suspended, we can - * assume (and must so) that the action did not succeed. So we now need to redo all - * those messages from the batch that are not yet processed. - */ - iRet = pAction->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pAction->pModData); - if(iRet == RS_RET_SUSPENDED) { - dbgprintf("Action requested to be suspended, done that.\n"); - actionSuspend(pAction, getActNow(pAction)); - } - } +finalize_it: + *pppMsgs = ppMsgs; + RETiRet; +} - } while( iRet == RS_RET_SUSPENDED - && (pAction->iResumeRetryCount == -1 || iRetries < pAction->iResumeRetryCount)); /* do...while! */ - if(iRet == RS_RET_DISABLE_ACTION) { - dbgprintf("Action requested to be disabled, done that.\n"); - pAction->bEnabled = 0; /* that's it... */ - } - - pthread_cleanup_pop(1); /* unlock mutex */ +/* cleanup doAction calling parameters + * rgerhards, 2009-05-07 + */ +static rsRetVal cleanupDoActionParams(action_t *pAction, uchar ***pppMsgs) +{ + uchar **ppMsgs = *pppMsgs; + int i; + int iArr; + DEFiRet; -finalize_it: - /* cleanup */ + ASSERT(pAction != NULL); for(i = 0 ; i < pAction->iNumTpls ; ++i) { if(ppMsgs[i] != NULL) { switch(pAction->eParamPassing) { case ACT_ARRAY_PASSING: iArr = 0; while(((char **)ppMsgs[i])[iArr] != NULL) - d_free(((char **)ppMsgs[i])[iArr++]); - d_free(ppMsgs[i]); + free(((char **)ppMsgs[i])[iArr++]); + free(ppMsgs[i]); break; case ACT_STRING_PASSING: - d_free(ppMsgs[i]); + free(ppMsgs[i]); break; default: assert(0); } } } - d_free(ppMsgs); - msgDestruct(&pMsg); /* we are now finished with the message */ + free(ppMsgs); + *pppMsgs = NULL; RETiRet; } -#pragma GCC diagnostic warning "-Wempty-body" +/* call the DoAction output plugin entry point + * Performance note: we build the action parameters here in this function. That + * means we do it while we hold the action look, potentially reducing concurrency + * (especially if the action queue is run in DIRECT mode). As an alternative, we + * may generate all params for the batch as whole before aquiring the action. However, + * that requires more memory, for large batches potentially a lot of memory. So for the + * time being, I am doing it here - the performance hit should be very minor and may even + * not be a hit because we may gain CPU cache locality gains with the "fewer memory" + * approach (I'd say that is rater likely). + * rgerhards, 2008-01-28 + */ +rsRetVal +actionCallDoAction(action_t *pThis, msg_t *pMsg) +{ + uchar **ppMsgs; /* array of message pointers for doAction */ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + + CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); + + pThis->bHadAutoCommit = 0; + iRet = pThis->pMod->mod.om.doAction(ppMsgs, pMsg->msgFlags, pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + break; + case RS_RET_DEFER_COMMIT: + /* we are done, action state remains the same */ + break; + case RS_RET_PREVIOUS_COMMITTED: + /* action state remains the same, but we had a commit. */ + pThis->bHadAutoCommit = 1; + break; + case RS_RET_SUSPENDED: + actionSuspend(pThis, NO_TIME_PROVIDED); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + iRet = getReturnCode(pThis); + +finalize_it: + cleanupDoActionParams(pThis, &ppMsgs); /* iRet ignored! */ + + RETiRet; +} + + +/* finish processing a batch. Most importantly, that means we commit if we + * need to do so. + * rgerhards, 2008-01-28 + */ +static rsRetVal +finishBatch(action_t *pThis) +{ + DEFiRet; + + ASSERT(pThis != NULL); + + if(pThis->eState == ACT_STATE_RDY) + FINALIZE; /* nothing to do */ + + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) { + iRet = pThis->pMod->mod.om.endTransaction(pThis->pModData); + switch(iRet) { + case RS_RET_OK: + actionCommitted(pThis); + break; + case RS_RET_SUSPENDED: + actionSuspend(pThis, NO_TIME_PROVIDED); + break; + case RS_RET_DISABLE_ACTION: + actionDisable(pThis); + break; + case RS_RET_DEFER_COMMIT: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_DEFER_COMMIT " + "- ignored\n"); + actionCommitted(pThis); + break; + case RS_RET_PREVIOUS_COMMITTED: + DBGPRINTF("output plugin error: endTransaction() returns RS_RET_PREVIOUS_COMMITTED " + "- ignored\n"); + actionCommitted(pThis); + break; + default:/* permanent failure of this message - no sense in retrying. This is + * not yet handled (but easy TODO) + */ + FINALIZE; + } + } + iRet = getReturnCode(pThis); + +finalize_it: + RETiRet; +} + + +#pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 @@ -545,23 +740,36 @@ rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) { int i; + int iCancelStateSave; msg_t *pMsg; DEFiRet; assert(paUsrp != NULL); - if(pAction->pMod->mod.om.beginTransaction != NULL) - CHKiRet(pAction->pMod->mod.om.beginTransaction(pAction->pModData)); + /* We now must guard the output module against execution by multiple threads. The + * plugin interface specifies that output modules must not be thread-safe (except + * if they notify us they are - functionality not yet implemented...). + * rgerhards, 2008-01-30 + */ + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(&pAction->mutActExec); + pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); + pthread_setcancelstate(iCancelStateSave, NULL); + for(i = 0 ; i < paUsrp->nElem ; i++) { pMsg = (msg_t*) paUsrp->pUsrp[i]; dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); CHKiRet(actionCallDoAction(pAction, pMsg)); + msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ } - if(pAction->pMod->mod.om.endTransaction != NULL) - CHKiRet(pAction->pMod->mod.om.endTransaction(pAction->pModData)); + iRet = finishBatch(pAction); + + pthread_cleanup_pop(1); /* unlock mutex */ + finalize_it: RETiRet; } +#pragma GCC diagnostic warning "-Wempty-body" /* call the HUP handler for a given action, if such a handler is defined. The @@ -799,8 +1007,8 @@ actionCallAction(action_t *pAction, msg_t *pMsg) * should check from time to time if affairs have improved. * rgerhards, 2007-07-24 */ - if(pAction->bEnabled == 0) { - ABORT_FINALIZE(RS_RET_OK); + if(pAction->eState == ACT_STATE_DIED) { + ABORT_FINALIZE(RS_RET_DISABLE_ACTION); } pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ @@ -983,7 +1191,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques dbgprintf("module is incompatible with RepeatedMsgReduction - turned off\n"); pAction->f_ReduceRepeated = 0; } - pAction->bEnabled = 1; /* action is enabled */ + pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ -- cgit v1.2.3 From 9e1bb31a4dc20d79515a19c85f2a5fec6a3d0c21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 7 May 2009 13:37:25 +0200 Subject: fixed some bugs & added testing helpers The action state machine now works correctly and has been verified a few hand-picked cases. I am missing automatted tests, though, this is not easy to achive... Anyhow, I've improved omtesting, so that it can be used in such automatted tests. --- action.c | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 21 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 9ad05a9f..829ef1eb 100644 --- a/action.c +++ b/action.c @@ -387,14 +387,31 @@ static rsRetVal getReturnCode(action_t *pThis) } +/* set the action to a new state + * rgerhards, 2007-08-02 + */ +static inline void actionSetState(action_t *pThis, action_state_t newState) +{ + pThis->eState = newState; + DBGPRINTF("Action %p transitioned to state: %s\n", pThis, getActStateName(pThis)); +} + /* Handles the transient commit state. So far, this is * mostly a dummy... * rgerhards, 2007-08-02 */ static void actionCommitted(action_t *pThis) { - pThis->eState = ACT_STATE_RDY; - DBGPRINTF("Action has committed.\n"); + actionSetState(pThis, ACT_STATE_RDY); +} + + +/* set action to "rtry" state. + * rgerhards, 2007-08-02 + */ +static void actionRetry(action_t *pThis) +{ + actionSetState(pThis, ACT_STATE_RTRY); } @@ -405,8 +422,7 @@ static void actionCommitted(action_t *pThis) */ static void actionDisable(action_t *pThis) { - pThis->eState = ACT_STATE_DIED; - DBGPRINTF("Action requested to be disabled, done that.\n"); + actionSetState(pThis, ACT_STATE_DIED); } @@ -422,9 +438,9 @@ static inline void actionSuspend(action_t *pThis, time_t ttNow) { if(ttNow == NO_TIME_PROVIDED) time(&ttNow); - pThis->eState = ACT_STATE_SUSP; pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); - DBGPRINTF("Action requested to be suspended, done that, retry=%d\n", (int) pThis->ttResumeRtry); + actionSetState(pThis, ACT_STATE_SUSP); + DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); } @@ -442,10 +458,15 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); +RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); - if(iRet == RS_RET_SUSPENDED) { + if(iRet == RS_RET_OK) { + actionSetState(pThis, ACT_STATE_RDY); +RUNLOG_STR("tryResume succeeded"); + } else if(iRet == RS_RET_SUSPENDED) { +RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -479,6 +500,7 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); +RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -489,7 +511,7 @@ static rsRetVal actionTryResume(action_t *pThis) */ time(&ttNow); /* cache "now" */ if(ttNow > pThis->ttResumeRtry) { - pThis->eState = ACT_STATE_RTRY; /* back to retries */ + actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } } @@ -515,6 +537,7 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; +RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); if(pThis->eState == ACT_STATE_RTRY) { CHKiRet(actionTryResume(pThis)); @@ -525,7 +548,7 @@ static rsRetVal actionPrepare(action_t *pThis) */ if(pThis->eState == ACT_STATE_RDY) { CHKiRet(pThis->pMod->mod.om.beginTransaction(pThis->pModData)); - pThis->eState = ACT_STATE_ITX; + actionSetState(pThis, ACT_STATE_ITX); } finalize_it: @@ -646,6 +669,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); + DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); pThis->bHadAutoCommit = 0; @@ -662,7 +686,7 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg) pThis->bHadAutoCommit = 1; break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -681,6 +705,29 @@ finalize_it: } +/* process a message + * this readies the action and then calls doAction() + * rgerhards, 2008-01-28 + */ +rsRetVal +actionProcessMessage(action_t *pThis, msg_t *pMsg) +{ + DEFiRet; + + ASSERT(pThis != NULL); + ISOBJ_TYPE_assert(pMsg, msg); + +RUNLOG_STR("inside actionProcessMsg()"); + CHKiRet(actionPrepare(pThis)); + if(pThis->eState == ACT_STATE_ITX) + CHKiRet(actionCallDoAction(pThis, pMsg)); + + iRet = getReturnCode(pThis); +finalize_it: + RETiRet; +} + + /* finish processing a batch. Most importantly, that means we commit if we * need to do so. * rgerhards, 2008-01-28 @@ -703,7 +750,7 @@ finishBatch(action_t *pThis) actionCommitted(pThis); break; case RS_RET_SUSPENDED: - actionSuspend(pThis, NO_TIME_PROVIDED); + actionRetry(pThis); break; case RS_RET_DISABLE_ACTION: actionDisable(pThis); @@ -731,6 +778,33 @@ finalize_it: } +/* receive an array of to-process user pointers and submit them + * for processing. + * rgerhards, 2009-04-22 + */ +rsRetVal +actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(paUsrp != NULL); + + for(i = 0 ; i < paUsrp->nElem ; i++) { + pMsg = (msg_t*) paUsrp->pUsrp[i]; +dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + localRet = actionProcessMessage(pAction, pMsg); + dbgprintf("action call returned %d\n", localRet); + msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ + CHKiRet(localRet); + } + iRet = finishBatch(pAction); + +finalize_it: + RETiRet; +} #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. @@ -739,9 +813,7 @@ finalize_it: rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) { - int i; int iCancelStateSave; - msg_t *pMsg; DEFiRet; assert(paUsrp != NULL); @@ -756,17 +828,10 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - for(i = 0 ; i < paUsrp->nElem ; i++) { - pMsg = (msg_t*) paUsrp->pUsrp[i]; -dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); - CHKiRet(actionCallDoAction(pAction, pMsg)); - msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - } - iRet = finishBatch(pAction); + iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp); pthread_cleanup_pop(1); /* unlock mutex */ -finalize_it: RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" -- cgit v1.2.3 From bb79e96dc300fa5a2182e7c047afb3b15c5dc870 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 12 May 2009 15:27:40 +0200 Subject: moving to a cleaner implementation of batches ... now that we know what we need from a theoretical POV. --- action.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 509ad749..b12eda6e 100644 --- a/action.c +++ b/action.c @@ -42,13 +42,14 @@ #include "cfsysline.h" #include "srUtils.h" #include "errmsg.h" +#include "batch.h" #include "wti.h" #include "datetime.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t*); +static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -261,7 +262,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*,aUsrp_t*))actionCallDoActionMULTIQUEUE)); + (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -782,19 +783,19 @@ finalize_it: * for processing. * rgerhards, 2009-04-22 */ -rsRetVal -actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, aUsrp_t *paUsrp) +static rsRetVal +actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch) { int i; msg_t *pMsg; rsRetVal localRet; DEFiRet; - assert(paUsrp != NULL); + assert(pBatch != NULL); - for(i = 0 ; i < paUsrp->nElem ; i++) { - pMsg = (msg_t*) paUsrp->pUsrp[i]; -dbgprintf("actionCall..MULTIQUEUE: i: %d, pMsg: %p\n", i, pMsg); + for(i = 0 ; i < pBatch->nElem ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; +dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg); localRet = actionProcessMessage(pAction, pMsg); dbgprintf("action call returned %d\n", localRet); msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ @@ -810,13 +811,13 @@ finalize_it: * for processing. * rgerhards, 2009-04-22 */ -rsRetVal -actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) +static rsRetVal +actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) { int iCancelStateSave; DEFiRet; - assert(paUsrp != NULL); + assert(pBatch != NULL); /* We now must guard the output module against execution by multiple threads. The * plugin interface specifies that output modules must not be thread-safe (except @@ -828,7 +829,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, aUsrp_t *paUsrp) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, paUsrp); + iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ -- cgit v1.2.3 From e2b229868955a6f6a6380273314d0d90ddad1273 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 12 May 2009 17:57:04 +0200 Subject: action batch processing implemented ... passed initial tests, but of course more are needed --- action.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 24 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index b12eda6e..0253e1b6 100644 --- a/action.c +++ b/action.c @@ -49,7 +49,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -262,7 +262,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))actionCallDoActionMULTIQUEUE)); + (rsRetVal (*)(void*, batch_t*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -372,10 +372,8 @@ static rsRetVal getReturnCode(action_t *pThis) iRet = RS_RET_SUSPENDED; break; case ACT_STATE_SUSP: - iRet = RS_RET_SUSPENDED; - break; case ACT_STATE_DIED: - iRet = RS_RET_DISABLE_ACTION; + iRet = RS_RET_ACTION_FAILED; break; default: DBGPRINTF("Invalid action engine state %d, program error\n", @@ -459,15 +457,12 @@ static rsRetVal actionDoRetry(action_t *pThis, time_t ttNow) ASSERT(pThis != NULL); -RUNLOG_STR("actionDoRetry():"); iRetries = 0; while(pThis->eState == ACT_STATE_RTRY) { iRet = pThis->pMod->tryResume(pThis->pModData); if(iRet == RS_RET_OK) { actionSetState(pThis, ACT_STATE_RDY); -RUNLOG_STR("tryResume succeeded"); } else if(iRet == RS_RET_SUSPENDED) { -RUNLOG_STR("still suspended");; /* max retries reached? */ if((pThis->iResumeRetryCount != -1 && iRetries >= pThis->iResumeRetryCount)) { actionSuspend(pThis, ttNow); @@ -501,7 +496,6 @@ static rsRetVal actionTryResume(action_t *pThis) ASSERT(pThis != NULL); -RUNLOG_STR("actionTryResume()"); if(pThis->eState == ACT_STATE_SUSP) { /* if we are suspended, we need to check if the timeout expired. * for this handling, we must always obtain a fresh timestamp. We used @@ -522,8 +516,10 @@ RUNLOG_STR("actionTryResume()"); CHKiRet(actionDoRetry(pThis, ttNow)); } - DBGPRINTF("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", - getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + if(Debug && (pThis->eState == ACT_STATE_RTRY ||pThis->eState == ACT_STATE_SUSP)) { + dbgprintf("actionTryResume: action state: %s, next retry (if applicable): %u [now %u]\n", + getActStateName(pThis), (unsigned) pThis->ttResumeRtry, (unsigned) ttNow); + } finalize_it: RETiRet; @@ -538,11 +534,8 @@ static rsRetVal actionPrepare(action_t *pThis) { DEFiRet; -RUNLOG_STR("actionPrepare()"); assert(pThis != NULL); - if(pThis->eState == ACT_STATE_RTRY) { - CHKiRet(actionTryResume(pThis)); - } + CHKiRet(actionTryResume(pThis)); /* if we are now ready, we initialize the transaction and advance * action state accordingly @@ -779,40 +772,155 @@ finalize_it: } -/* receive an array of to-process user pointers and submit them - * for processing. - * rgerhards, 2009-04-22 +/* try to submit a partial batch of elements. + * rgerhards, 2009-05-12 */ static rsRetVal -actionCallDoActionMULTIQUEUEprocessing(action_t *pAction, batch_t *pBatch) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) { int i; + int iElemProcessed; + int iCommittedUpTo; msg_t *pMsg; rsRetVal localRet; DEFiRet; assert(pBatch != NULL); + assert(pnElem != NULL); - for(i = 0 ; i < pBatch->nElem ; i++) { + i = pBatch->iDoneUpTo; /* all messages below that index are processed */ + iElemProcessed = 0; + iCommittedUpTo = i; + while(iElemProcessed <= *pnElem && i < pBatch->nElem) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; -dbgprintf("actionCall..MULTIQUEUE: i: %d/%d, pMsg: %p\n", i, pBatch->nElem, pMsg); + dbgprintf("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p\n", i, pBatch->nElem, *pnElem, pMsg);//remove later! localRet = actionProcessMessage(pAction, pMsg); dbgprintf("action call returned %d\n", localRet); + if(localRet == RS_RET_OK) { + /* mark messages as committed */ + while(iCommittedUpTo < i) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + /* mark messages as committed */ + while(iCommittedUpTo < i - 1) { + pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; + } + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { + pBatch->pElem[i].state = BATCH_STATE_SUB; + } else { + iRet = localRet; + FINALIZE; + } + ++i; + ++iElemProcessed; + } + +finalize_it: + if(pBatch->iDoneUpTo != iCommittedUpTo) { + *pnElem += iCommittedUpTo - pBatch->iDoneUpTo; + pBatch->iDoneUpTo = iCommittedUpTo; + } + RETiRet; +} + + +/* submit a batch for actual action processing. + * The first nElem elements are processed. This function calls itself + * recursively if it needs to handle errors. + * rgerhards, 2009-05-12 + */ +static rsRetVal +submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +{ + int i; + int bDone; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + bDone = 0; + do { + localRet = tryDoAction(pAction, pBatch, &nElem); + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + /* try commit transaction, once done, we can simply do so as if + * that return state was returned from tryDoAction(). + */ + localRet = finishBatch(pAction); + } + + if( localRet == RS_RET_OK + || localRet == RS_RET_PREVIOUS_COMMITTED + || localRet == RS_RET_DEFER_COMMIT) { + bDone = 1; + } else if(localRet == RS_RET_SUSPENDED) { + ; /* do nothing, this will retry the full batch */ + } else if(localRet == RS_RET_ACTION_FAILED) { + /* in this case, the whole batch can not be processed */ + for(i = 0 ; i < nElem ; ++i) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + } + bDone = 1; + } else { + if(nElem == 1) { + pBatch->pElem[++pBatch->iDoneUpTo].state = BATCH_STATE_BAD; + bDone = 1; + } else { + /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ + submitBatch(pAction, pBatch, nElem / 2); + submitBatch(pAction, pBatch, nElem - (nElem / 2)); + bDone = 1; + } + } + } while(!bDone); /* do .. while()! */ + + RETiRet; +} + + +/* receive a batch and process it. This includes retry handling. + * rgerhards, 2009-05-12 + */ +static rsRetVal +processAction(action_t *pAction, batch_t *pBatch) +{ + int i; + msg_t *pMsg; + rsRetVal localRet; + DEFiRet; + + assert(pBatch != NULL); + + pBatch->iDoneUpTo = 0; + /* TODO: think about action batches, must be handled at upper layer! + * MULTIQUEUE + */ + localRet = submitBatch(pAction, pBatch, pBatch->nElem); + CHKiRet(localRet); + + /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ + for(i = 0 ; i < pBatch->nElem ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ - CHKiRet(localRet); } iRet = finishBatch(pAction); finalize_it: RETiRet; } + + #pragma GCC diagnostic ignored "-Wempty-body" /* receive an array of to-process user pointers and submit them * for processing. * rgerhards, 2009-04-22 */ static rsRetVal -actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch) { int iCancelStateSave; DEFiRet; @@ -829,7 +937,7 @@ actionCallDoActionMULTIQUEUE(action_t *pAction, batch_t *pBatch) pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); pthread_setcancelstate(iCancelStateSave, NULL); - iRet = actionCallDoActionMULTIQUEUEprocessing(pAction, pBatch); + iRet = processAction(pAction, pBatch); pthread_cleanup_pop(1); /* unlock mutex */ -- cgit v1.2.3 From 4a8c02870a55e19c1bebfae5cb70d1ec5aa7c203 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 13 May 2009 16:00:15 +0200 Subject: moved user object destruction to queue itself So far, the consumer was responsible for destroying objects. However, this does not work well with ultra-reliable queues. This is the first move to support them. --- action.c | 1 - 1 file changed, 1 deletion(-) (limited to 'action.c') diff --git a/action.c b/action.c index 0253e1b6..424cb00b 100644 --- a/action.c +++ b/action.c @@ -905,7 +905,6 @@ processAction(action_t *pAction, batch_t *pBatch) /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - msgDestruct(&pMsg); /* TODO: change: we are now finished with the message */ } iRet = finishBatch(pAction); -- cgit v1.2.3 From 5defa14fed704cabe2074f4bdbb6d389d6dee7cf Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 22 Jun 2009 15:32:01 +0200 Subject: some post-merge cleanup --- action.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 8c41b638..5d591a5d 100644 --- a/action.c +++ b/action.c @@ -608,9 +608,8 @@ rsRetVal actionDbgPrint(action_t *pThis) /* prepare the calling parameters for doAction() * rgerhards, 2009-05-07 */ -static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***pppMsgs) +static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) { - uchar **ppMsgs = *pppMsgs; int i; DEFiRet; @@ -630,7 +629,6 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg, uchar ***p } finalize_it: - *pppMsgs = ppMsgs; RETiRet; } @@ -683,14 +681,13 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg) { - uchar **ppMsgs; /* array of message pointers for doAction */ DEFiRet; ASSERT(pThis != NULL); ISOBJ_TYPE_assert(pMsg, msg); DBGPRINTF("entering actionCalldoAction(), state: %s\n", getActStateName(pThis)); - CHKiRet(prepareDoActionParams(pThis, pMsg, &ppMsgs)); + CHKiRet(prepareDoActionParams(pThis, pMsg)); pThis->bHadAutoCommit = 0; iRet = pThis->pMod->mod.om.doAction(pThis->ppMsgs, pMsg->msgFlags, pThis->pModData); -- cgit v1.2.3 From 11172b62b0f0312fef6a4a0abca982a2a6301649 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 24 Jun 2009 15:05:13 +0200 Subject: quick and dirty fix for one race condition It is intentionally quick & dirty, as I would like to do some better patch, if possible. For that, I probably need the commented-out code, thus no delete. --- action.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 09dd7df4..d43028b8 100644 --- a/action.c +++ b/action.c @@ -1240,7 +1240,7 @@ actionCallAction(action_t *pAction, msg_t *pMsg) /* We need to lock the mutex only for repeated line processing. * rgerhards, 2009-06-19 */ - if(pAction->f_ReduceRepeated == 1) { + //if(pAction->f_ReduceRepeated == 1) { pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); LockObj(pAction); pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); @@ -1250,9 +1250,9 @@ actionCallAction(action_t *pAction, msg_t *pMsg) UnlockObj(pAction); pthread_cleanup_pop(0); /* remove mutex cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); - } else { - iRet = doActionCallAction(pAction, pMsg); - } + //} else { + //iRet = doActionCallAction(pAction, pMsg); + //} RETiRet; } -- cgit v1.2.3