summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c65
1 files changed, 34 insertions, 31 deletions
diff --git a/action.c b/action.c
index 5f49b44c..9ea3aa65 100644
--- a/action.c
+++ b/action.c
@@ -119,10 +119,10 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch);
+static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -428,7 +428,7 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst)
* spec. -- rgerhards, 2008-01-30
*/
CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize,
- (rsRetVal (*)(void*, batch_t*, int*))processBatchMain));
+ processBatchMain));
obj.SetName((obj_t*) pThis->pQueue, pszAName);
qqueueSetpAction(pThis->pQueue, pThis);
@@ -905,7 +905,7 @@ done: RETiRet;
* rgerhards, 2008-01-28
*/
rsRetVal
-actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
+actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
{
DEFiRet;
@@ -916,6 +916,8 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams)
getActStateName(pThis), pThis->iActionNbr);
pThis->bHadAutoCommit = 0;
+dbgprintf("DDDD: calling action id %d for wti %p\n", pThis->iActionNbr, pWti);
+#warning TODO: add action instance check here
iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pThis->pModData);
switch(iRet) {
case RS_RET_OK:
@@ -954,7 +956,7 @@ finalize_it:
* rgerhards, 2008-01-28
*/
static inline rsRetVal
-actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate)
+actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
@@ -965,7 +967,7 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(pThis->eState == ACT_STATE_ITX)
- CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
+ CHKiRet(actionCallDoAction(pThis, pMsg, actParams, pWti));
iRet = getReturnCode(pThis);
finalize_it:
@@ -1035,7 +1037,7 @@ finalize_it:
* rgerhards, 2009-05-12
*/
static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
+tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti)
{
int i;
int iElemProcessed;
@@ -1060,7 +1062,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
if(batchIsValidElem(pBatch, i)) {
pMsg = pBatch->pElem[i].pMsg;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate);
+ pBatch->pbShutdownImmediate, pWti);
DBGPRINTF("action %p call returned %d\n", pAction, localRet);
/* Note: we directly modify the batch object state, because we know that
* wo do not overwrite BATCH_STATE_DISC indicators!
@@ -1112,7 +1114,7 @@ finalize_it:
* rgerhards, 2009-05-12
*/
static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
+submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
{
int i;
int bDone;
@@ -1126,7 +1128,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
wasDoneTo = pBatch->iDoneUpTo;
bDone = 0;
do {
- localRet = tryDoAction(pAction, pBatch, &nElem);
+ localRet = tryDoAction(pAction, pBatch, &nElem, pWti);
if(localRet == RS_RET_FORCE_TERM) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
@@ -1165,8 +1167,8 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem)
/* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
"for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2);
- submitBatch(pAction, pBatch, nElem - (nElem / 2));
+ submitBatch(pAction, pBatch, nElem / 2, pWti);
+ submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti);
bDone = 1;
}
}
@@ -1246,12 +1248,12 @@ prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustR
* rgerhards, 2009-05-12
*/
static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch)
+processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
{
DEFiRet;
assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem));
+ CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
iRet = finishBatch(pAction, pBatch);
finalize_it:
@@ -1265,12 +1267,13 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
-processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
+processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
int *pbShutdownImmdtSave;
sbool *activeSave;
int bMustRestoreActivePtr = 0;
rsRetVal localRet;
+ action_t *pAction = (action_t*) pVoid;
DEFiRet;
assert(pBatch != NULL);
@@ -1289,7 +1292,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
d_pthread_mutex_lock(&pAction->mutActExec);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec);
- iRet = processAction(pAction, pBatch);
+ iRet = processAction(pAction, pBatch, pWti);
pthread_cleanup_pop(1); /* unlock mutex */
@@ -1382,7 +1385,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* rgerhards, 2010-06-08
*/
static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg)
+doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
@@ -1410,7 +1413,7 @@ finalize_it:
* be filtered out before calling us (what is done currently!).
*/
rsRetVal
-actionWriteToAction(action_t *pAction, msg_t *pMsg)
+actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
@@ -1465,7 +1468,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg);
+ iRet = doSubmitToActionQ(pAction, pMsg, pWti);
finalize_it:
RETiRet;
@@ -1476,7 +1479,7 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
+doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
{
msg_t *pMsg;
DEFiRet;
@@ -1491,7 +1494,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch)
}
/* call the output driver */
- iRet = actionWriteToAction(pAction, pMsg);
+ iRet = actionWriteToAction(pAction, pMsg, pWti);
finalize_it:
/* we need to update the batch to handle failover processing correctly */
@@ -1551,7 +1554,7 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
time_t now = 0;
time_t lastAct;
@@ -1589,7 +1592,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
}
}
- iRet = doSubmitToActionQBatch(pAction, pBatch);
+ iRet = doSubmitToActionQBatch(pAction, pBatch, pWti);
free(pBatch->active);
pBatch->active = activeSave;
@@ -1667,7 +1670,7 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1685,7 +1688,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg);
+ doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti);
}
}
}
@@ -1700,7 +1703,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
* rgerhards, 2010-06-23
*/
static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
int i;
DEFiRet;
@@ -1713,7 +1716,7 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
if( batchIsValidElem(pBatch, i)
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
- doActionCallAction(pAction, pBatch, i);
+ doActionCallAction(pAction, pBatch, i, pWti);
}
}
@@ -1725,13 +1728,13 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
+doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch);
+ iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti);
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */