summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-11-02 12:04:33 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-11-02 12:04:33 +0100
commit08e96b620e67c576b5ad50a205a1ae941c990272 (patch)
tree85942f979462c98a8a80d8286f4107304759997a /action.c
parentfbfc3984633113ac7e4e018afb3d10cb8a3d3e50 (diff)
downloadrsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.tar.gz
rsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.tar.bz2
rsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.zip
refactor output side of action queue - main shot
this needs refinement, but basically we now do no longer rely on batches for the sub-functions.
Diffstat (limited to 'action.c')
-rw-r--r--action.c359
1 files changed, 69 insertions, 290 deletions
diff --git a/action.c b/action.c
index 69479bad..8145574e 100644
--- a/action.c
+++ b/action.c
@@ -69,7 +69,7 @@
* beast.
* rgerhards, 2011-06-15
*
- * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -748,7 +748,8 @@ finalize_it:
* depending on its current state.
* rgerhards, 2009-05-07
*/
-static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+static inline rsRetVal
+actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
{
DEFiRet;
@@ -821,34 +822,31 @@ static rsRetVal actionDbgPrint(action_t *pThis)
* rgerhards, 2009-05-07
*/
static rsRetVal
-prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow)
+prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow)
{
int i;
- msg_t *pMsg;
struct json_object *json;
+ actWrkrInfo_t *pWrkrInfo;
DEFiRet;
- ASSERT(pAction != NULL);
- ASSERT(pElem != NULL);
-
- pMsg = pElem->pMsg;
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
/* here we must loop to process all requested strings */
for(i = 0 ; i < pAction->iNumTpls ; ++i) {
switch(pAction->eParamPassing) {
case ACT_STRING_PASSING:
- CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]),
- &pElem->staticLenStrings[i], ttNow));
- pElem->staticActParams[i] = pElem->staticActStrings[i];
+ CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]),
+ &pWrkrInfo->staticLenStrings[i], ttNow));
+ pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i];
break;
case ACT_ARRAY_PASSING:
- CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow));
+ CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow));
break;
case ACT_MSG_PASSING:
- pElem->staticActParams[i] = (void*) pMsg;
+ pWrkrInfo->staticActParams[i] = (void*) pMsg;
break;
case ACT_JSON_PASSING:
CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow));
- pElem->staticActParams[i] = (void*) json;
+ pWrkrInfo->staticActParams[i] = (void*) json;
break;
default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n",
(int) pAction->eParamPassing);
@@ -862,69 +860,55 @@ finalize_it:
}
-/* free a batches ressources, but not string buffers (because they will
- * most probably be reused). String buffers are only deleted upon final
- * destruction of the batch.
- * This function here must be called only when the batch is actually no
- * longer used, also not for retrying actions or such. It invalidates
- * buffers.
- * rgerhards, 2010-12-17
- */
-static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
+static void
+releaseDoActionParams(action_t *pAction, wti_t *pWti)
{
int jArr;
- int i, j;
- batch_obj_t *pElem;
+ int j;
+ actWrkrInfo_t *pWrkrInfo;
uchar ***ppMsgs;
- DEFiRet;
-
- ASSERT(pAction != NULL);
if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING)
goto done; /* we need to do nothing with these types! */
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- switch(pAction->eParamPassing) {
- case ACT_ARRAY_PASSING:
- ppMsgs = (uchar***) pElem->staticActParams;
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- if(((uchar**)ppMsgs)[j] != NULL) {
- jArr = 0;
- while(ppMsgs[j][jArr] != NULL) {
- d_free(ppMsgs[j][jArr]);
- ppMsgs[j][jArr] = NULL;
- ++jArr;
- }
- d_free(((uchar**)ppMsgs)[j]);
- ((uchar**)ppMsgs)[j] = NULL;
- }
+ pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ switch(pAction->eParamPassing) {
+ case ACT_ARRAY_PASSING:
+ ppMsgs = (uchar***) pWrkrInfo->staticActParams;
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ if(((uchar**)ppMsgs)[j] != NULL) {
+ jArr = 0;
+ while(ppMsgs[j][jArr] != NULL) {
+ free(ppMsgs[j][jArr]);
+ ppMsgs[j][jArr] = NULL;
+ ++jArr;
}
- break;
- case ACT_JSON_PASSING:
- for(j = 0 ; j < pAction->iNumTpls ; ++j) {
- json_object_put((struct json_object*)
- pElem->staticActParams[j]);
- pElem->staticActParams[j] = NULL;
- }
- break;
- case ACT_STRING_PASSING:
- case ACT_MSG_PASSING:
- /* can never happen, just to keep compiler happy! */
- break;
+ free(((uchar**)ppMsgs)[j]);
+ ((uchar**)ppMsgs)[j] = NULL;
}
}
+ break;
+ case ACT_JSON_PASSING:
+ for(j = 0 ; j < pAction->iNumTpls ; ++j) {
+ json_object_put((struct json_object*)
+ pWrkrInfo->staticActParams[j]);
+ pWrkrInfo->staticActParams[j] = NULL;
+ }
+ break;
+ case ACT_STRING_PASSING:
+ case ACT_MSG_PASSING:
+ /* can never happen, just to keep compiler happy! */
+ break;
}
-done: RETiRet;
+done: return;
}
/* call the DoAction output plugin entry point
* rgerhards, 2008-01-28
*/
-rsRetVal
+static rsRetVal
actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
{
DEFiRet;
@@ -938,7 +922,6 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
pThis->bHadAutoCommit = 0;
-dbgprintf("DDDDD: calling doAction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
@@ -1002,9 +985,10 @@ finalize_it:
* rgerhards, 2008-01-28
*/
static rsRetVal
-finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
+finishBatch(action_t *pThis, wti_t *pWti)
{
int i;
+ int pbShutdownImmediate = 1;
DEFiRet;
ASSERT(pThis != NULL);
@@ -1019,10 +1003,12 @@ for(i = 0 ; i < iActionNbr ; ++i) {
FINALIZE; /* nothing to do */
}
- CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti));
+ CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti));
if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
+ setActionState(pWti, pThis, ACT_STATE_RDY);
+#if 0
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis, pWti);
@@ -1053,6 +1039,7 @@ dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
*/
FINALIZE;
}
+#endif
}
iRet = getReturnCode(pThis, pWti);
@@ -1061,153 +1048,6 @@ finalize_it:
}
-/* try to submit a partial batch of elements.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti)
-{
- int i;
- int iElemProcessed;
- int iCommittedUpTo;
- msg_t *pMsg;
- rsRetVal localRet;
- DEFiRet;
-
- assert(pBatch != NULL);
- assert(pnElem != NULL);
-
- i = pBatch->iDoneUpTo; /* all messages below that index are processed */
- iElemProcessed = 0;
- iCommittedUpTo = i;
- DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem);
- while(iElemProcessed <= *pnElem && i < pBatch->nElem) {
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- /* NOTE: do NOT extend the filter below! Anything else must be done on the
- * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
- */
- if(batchIsValidElem(pBatch, i)) {
- pMsg = pBatch->pElem[i].pMsg;
- localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
- pBatch->pbShutdownImmediate, pWti);
- DBGPRINTF("action %p call returned %d\n", pAction, localRet);
- /* Note: we directly modify the batch object state, because we know that
- * wo do not overwrite BATCH_STATE_DISC indicators!
- */
- if(localRet == RS_RET_OK) {
- /* mark messages as committed */
- while(iCommittedUpTo <= i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- }
- } else if(localRet == RS_RET_PREVIOUS_COMMITTED) {
- /* mark messages as committed */
- while(iCommittedUpTo < i) {
- pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */
- batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM);
- ++iCommittedUpTo;
- }
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DEFER_COMMIT) {
- pBatch->eltState[i] = BATCH_STATE_SUB;
- } else if(localRet == RS_RET_DISCARDMSG) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n",
- localRet, *pnElem, iCommittedUpTo);
- iRet = localRet;
- FINALIZE;
- }
- }
- ++i;
- ++iElemProcessed;
- }
-
-finalize_it:
- if(pBatch->iDoneUpTo != iCommittedUpTo) {
- pBatch->iDoneUpTo = iCommittedUpTo;
- }
- RETiRet;
-}
-
-/* submit a batch for actual action processing.
- * The first nElem elements are processed. This function calls itself
- * recursively if it needs to handle errors.
- * Note: we don't need the number of the first message to be processed as a parameter,
- * because this is kept track of inside the batch itself (iDoneUpTo).
- * rgerhards, 2009-05-12
- */
-static rsRetVal
-submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti)
-{
- int i;
- int bDone;
- rsRetVal localRet;
- int wasDoneTo;
- DEFiRet;
-
- assert(pBatch != NULL);
-
- DBGPRINTF("submitBatch: enter, nElem %d\n", nElem);
- wasDoneTo = pBatch->iDoneUpTo;
- bDone = 0;
- do {
- localRet = tryDoAction(pAction, pBatch, &nElem, pWti);
- if(localRet == RS_RET_FORCE_TERM) {
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
- }
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- /* try commit transaction, once done, we can simply do so as if
- * that return state was returned from tryDoAction().
- */
- localRet = finishBatch(pAction, pBatch, pWti);
- }
-
- if( localRet == RS_RET_OK
- || localRet == RS_RET_PREVIOUS_COMMITTED
- || localRet == RS_RET_DEFER_COMMIT) {
- bDone = 1;
- } else if(localRet == RS_RET_SUSPENDED) {
- DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n");
- /* do nothing, this will retry the full batch */
- } else if(localRet == RS_RET_ACTION_FAILED) {
- /* in this case, everything not yet committed is BAD */
- for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && pBatch->eltState[i] != BATCH_STATE_COMM ) {
- pBatch->eltState[i] = BATCH_STATE_BAD;
- pBatch->pElem[i].bPrevWasSuspended = 1;
- STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail);
- }
- }
- bDone = 1;
- } else {
- if(nElem == 1) {
- batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD);
- bDone = 1;
- } else {
- /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */
- DBGPRINTF("submitBatch recursing trying to find and exclude the culprit "
- "for iRet %d\n", localRet);
- submitBatch(pAction, pBatch, nElem / 2, pWti);
- submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti);
- bDone = 1;
- }
- }
- } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */
-
- if(*(pBatch->pbShutdownImmediate))
- ABORT_FINALIZE(RS_RET_FORCE_TERM);
-
-finalize_it:
- RETiRet;
-}
-
-
/* copy "active" array of batch, as we need to modify it. The caller
* must make sure the new array is freed and the orginal batch
* pointer is restored (thus the caller must save it). If active
@@ -1232,60 +1072,6 @@ finalize_it:
RETiRet;
}
-/* The following function prepares a batch for processing, that it is
- * reinitializes batch states, generates strings and does everything else
- * that needs to be done in order to make the batch ready for submission to
- * the actual output module. Note that we look at the precomputed
- * filter OK condition and process only those messages, that actually matched
- * the filter.
- * rgerhards, 2010-06-14
- */
-static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
-{
- int i;
- batch_obj_t *pElem;
- struct syslogTime ttNow;
- DEFiRet;
-
- /* indicate we have not yet read the date */
- ttNow.year = 0;
-
- pBatch->iDoneUpTo = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pElem = &(pBatch->pElem[i]);
- if(batchIsValidElem(pBatch, i)) {
- pBatch->eltState[i] = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) {
- /* make sure we have our copy of "active" array */
- if(!*bMustRestoreActivePtr) {
- *activeSave = pBatch->active;
- copyActive(pBatch);
- }
- pBatch->active[i] = RSFALSE;
- }
- }
- }
- RETiRet;
-}
-
-
-/* receive a batch and process it. This includes retry handling.
- * rgerhards, 2009-05-12
- */
-static inline rsRetVal
-processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti)
-{
- DEFiRet;
-
- assert(pBatch != NULL);
- CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti));
- iRet = finishBatch(pAction, pBatch, pWti);
-
-finalize_it:
- RETiRet;
-}
-
/* receive an array of to-process user pointers and submit them
* for processing.
@@ -1294,41 +1080,34 @@ finalize_it:
static rsRetVal
processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
- int *pbShutdownImmdtSave;
- sbool *activeSave;
- int bMustRestoreActivePtr = 0;
- rsRetVal localRet;
action_t *pAction = (action_t*) pVoid;
+ msg_t *pMsg;
+ int i;
+ struct syslogTime ttNow;
DEFiRet;
- assert(pBatch != NULL);
-
- if(pbShutdownImmediate != NULL) {
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ if(pbShutdownImmediate == NULL) {
+ pbShutdownImmediate = pBatch->pbShutdownImmediate;
}
- CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
-
- iRet = processAction(pAction, pBatch, pWti);
- /* even if processAction failed, we need to release the batch (else we
- * have a memory leak). So we do this first, and then check if we need to
- * return an error code. If so, the code from processAction has priority.
- * rgerhards, 2010-12-17
- */
- localRet = releaseBatch(pAction, pBatch);
+ /* indicate we have not yet read the date */
+ ttNow.year = 0;
- if(iRet == RS_RET_OK)
- iRet = localRet;
-
- if(bMustRestoreActivePtr) {
- free(pBatch->active);
- pBatch->active = activeSave;
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
+ if(batchIsValidElem(pBatch, i)) {
+ pMsg = pBatch->pElem[i].pMsg;
+dbgprintf("DDDD: processBatchMain[act %d], elt %d: %s\n", pAction->iActionNbr, i, pMsg->pszRawMsg);
+ // TODO: check error return states!
+ iRet = prepareDoActionParams(pAction, pWti, pMsg, &ttNow);
+ iRet = actionProcessMessage(pAction, pMsg,
+ pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
+ pbShutdownImmediate, pWti);
+ releaseDoActionParams(pAction, pWti);
+ }
}
-finalize_it:
- if(pbShutdownImmediate != NULL)
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ iRet = finishBatch(pAction, pWti);
+dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}