diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-02 12:04:33 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-11-02 12:04:33 +0100 |
commit | 08e96b620e67c576b5ad50a205a1ae941c990272 (patch) | |
tree | 85942f979462c98a8a80d8286f4107304759997a /action.c | |
parent | fbfc3984633113ac7e4e018afb3d10cb8a3d3e50 (diff) | |
download | rsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.tar.gz rsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.tar.bz2 rsyslog-08e96b620e67c576b5ad50a205a1ae941c990272.zip |
refactor output side of action queue - main shot
this needs refinement, but basically we now do no longer rely on
batches for the sub-functions.
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 359 |
1 files changed, 69 insertions, 290 deletions
@@ -69,7 +69,7 @@ * beast. * rgerhards, 2011-06-15 * - * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -748,7 +748,8 @@ finalize_it: * depending on its current state. * rgerhards, 2009-05-07 */ -static inline rsRetVal actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +static inline rsRetVal +actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) { DEFiRet; @@ -821,34 +822,31 @@ static rsRetVal actionDbgPrint(action_t *pThis) * rgerhards, 2009-05-07 */ static rsRetVal -prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime *ttNow) +prepareDoActionParams(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { int i; - msg_t *pMsg; struct json_object *json; + actWrkrInfo_t *pWrkrInfo; DEFiRet; - ASSERT(pAction != NULL); - ASSERT(pElem != NULL); - - pMsg = pElem->pMsg; + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { case ACT_STRING_PASSING: - CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pElem->staticActStrings[i]), - &pElem->staticLenStrings[i], ttNow)); - pElem->staticActParams[i] = pElem->staticActStrings[i]; + CHKiRet(tplToString(pAction->ppTpl[i], pMsg, &(pWrkrInfo->staticActStrings[i]), + &pWrkrInfo->staticLenStrings[i], ttNow)); + pWrkrInfo->staticActParams[i] = pWrkrInfo->staticActStrings[i]; break; case ACT_ARRAY_PASSING: - CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pElem->staticActParams[i]), ttNow)); + CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pWrkrInfo->staticActParams[i]), ttNow)); break; case ACT_MSG_PASSING: - pElem->staticActParams[i] = (void*) pMsg; + pWrkrInfo->staticActParams[i] = (void*) pMsg; break; case ACT_JSON_PASSING: CHKiRet(tplToJSON(pAction->ppTpl[i], pMsg, &json, ttNow)); - pElem->staticActParams[i] = (void*) json; + pWrkrInfo->staticActParams[i] = (void*) json; break; default:dbgprintf("software bug/error: unknown pAction->eParamPassing %d in prepareDoActionParams\n", (int) pAction->eParamPassing); @@ -862,69 +860,55 @@ finalize_it: } -/* free a batches ressources, but not string buffers (because they will - * most probably be reused). String buffers are only deleted upon final - * destruction of the batch. - * This function here must be called only when the batch is actually no - * longer used, also not for retrying actions or such. It invalidates - * buffers. - * rgerhards, 2010-12-17 - */ -static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch) +static void +releaseDoActionParams(action_t *pAction, wti_t *pWti) { int jArr; - int i, j; - batch_obj_t *pElem; + int j; + actWrkrInfo_t *pWrkrInfo; uchar ***ppMsgs; - DEFiRet; - - ASSERT(pAction != NULL); if(pAction->eParamPassing == ACT_STRING_PASSING || pAction->eParamPassing == ACT_MSG_PASSING) goto done; /* we need to do nothing with these types! */ - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - switch(pAction->eParamPassing) { - case ACT_ARRAY_PASSING: - ppMsgs = (uchar***) pElem->staticActParams; - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - if(((uchar**)ppMsgs)[j] != NULL) { - jArr = 0; - while(ppMsgs[j][jArr] != NULL) { - d_free(ppMsgs[j][jArr]); - ppMsgs[j][jArr] = NULL; - ++jArr; - } - d_free(((uchar**)ppMsgs)[j]); - ((uchar**)ppMsgs)[j] = NULL; - } + pWrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + switch(pAction->eParamPassing) { + case ACT_ARRAY_PASSING: + ppMsgs = (uchar***) pWrkrInfo->staticActParams; + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + if(((uchar**)ppMsgs)[j] != NULL) { + jArr = 0; + while(ppMsgs[j][jArr] != NULL) { + free(ppMsgs[j][jArr]); + ppMsgs[j][jArr] = NULL; + ++jArr; } - break; - case ACT_JSON_PASSING: - for(j = 0 ; j < pAction->iNumTpls ; ++j) { - json_object_put((struct json_object*) - pElem->staticActParams[j]); - pElem->staticActParams[j] = NULL; - } - break; - case ACT_STRING_PASSING: - case ACT_MSG_PASSING: - /* can never happen, just to keep compiler happy! */ - break; + free(((uchar**)ppMsgs)[j]); + ((uchar**)ppMsgs)[j] = NULL; } } + break; + case ACT_JSON_PASSING: + for(j = 0 ; j < pAction->iNumTpls ; ++j) { + json_object_put((struct json_object*) + pWrkrInfo->staticActParams[j]); + pWrkrInfo->staticActParams[j] = NULL; + } + break; + case ACT_STRING_PASSING: + case ACT_MSG_PASSING: + /* can never happen, just to keep compiler happy! */ + break; } -done: RETiRet; +done: return; } /* call the DoAction output plugin entry point * rgerhards, 2008-01-28 */ -rsRetVal +static rsRetVal actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) { DEFiRet; @@ -938,7 +922,6 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); pThis->bHadAutoCommit = 0; -dbgprintf("DDDDD: calling doAction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { @@ -1002,9 +985,10 @@ finalize_it: * rgerhards, 2008-01-28 */ static rsRetVal -finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) +finishBatch(action_t *pThis, wti_t *pWti) { int i; + int pbShutdownImmediate = 1; DEFiRet; ASSERT(pThis != NULL); @@ -1019,10 +1003,12 @@ for(i = 0 ; i < iActionNbr ; ++i) { FINALIZE; /* nothing to do */ } - CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, &pbShutdownImmediate, pWti)); if(getActionState(pWti, pThis) == ACT_STATE_ITX) { dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); + setActionState(pWti, pThis, ACT_STATE_RDY); +#if 0 switch(iRet) { case RS_RET_OK: actionCommitted(pThis, pWti); @@ -1053,6 +1039,7 @@ dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); */ FINALIZE; } +#endif } iRet = getReturnCode(pThis, pWti); @@ -1061,153 +1048,6 @@ finalize_it: } -/* try to submit a partial batch of elements. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, wti_t *pWti) -{ - int i; - int iElemProcessed; - int iCommittedUpTo; - msg_t *pMsg; - rsRetVal localRet; - DEFiRet; - - assert(pBatch != NULL); - assert(pnElem != NULL); - - i = pBatch->iDoneUpTo; /* all messages below that index are processed */ - iElemProcessed = 0; - iCommittedUpTo = i; - DBGPRINTF("tryDoAction %p, pnElem %d, nElem %d\n", pAction, *pnElem, pBatch->nElem); - while(iElemProcessed <= *pnElem && i < pBatch->nElem) { - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - /* NOTE: do NOT extend the filter below! Anything else must be done on the - * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 - */ - if(batchIsValidElem(pBatch, i)) { - pMsg = pBatch->pElem[i].pMsg; - localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, - pBatch->pbShutdownImmediate, pWti); - DBGPRINTF("action %p call returned %d\n", pAction, localRet); - /* Note: we directly modify the batch object state, because we know that - * wo do not overwrite BATCH_STATE_DISC indicators! - */ - if(localRet == RS_RET_OK) { - /* mark messages as committed */ - while(iCommittedUpTo <= i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - } - } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { - /* mark messages as committed */ - while(iCommittedUpTo < i) { - pBatch->pElem[iCommittedUpTo].bPrevWasSuspended = 0; /* we had success! */ - batchSetElemState(pBatch, iCommittedUpTo, BATCH_STATE_COMM); - ++iCommittedUpTo; - } - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DEFER_COMMIT) { - pBatch->eltState[i] = BATCH_STATE_SUB; - } else if(localRet == RS_RET_DISCARDMSG) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - dbgprintf("tryDoAction: unexpected error code %d[nElem %d, Commited UpTo %d], finalizing\n", - localRet, *pnElem, iCommittedUpTo); - iRet = localRet; - FINALIZE; - } - } - ++i; - ++iElemProcessed; - } - -finalize_it: - if(pBatch->iDoneUpTo != iCommittedUpTo) { - pBatch->iDoneUpTo = iCommittedUpTo; - } - RETiRet; -} - -/* submit a batch for actual action processing. - * The first nElem elements are processed. This function calls itself - * recursively if it needs to handle errors. - * Note: we don't need the number of the first message to be processed as a parameter, - * because this is kept track of inside the batch itself (iDoneUpTo). - * rgerhards, 2009-05-12 - */ -static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem, wti_t *pWti) -{ - int i; - int bDone; - rsRetVal localRet; - int wasDoneTo; - DEFiRet; - - assert(pBatch != NULL); - - DBGPRINTF("submitBatch: enter, nElem %d\n", nElem); - wasDoneTo = pBatch->iDoneUpTo; - bDone = 0; - do { - localRet = tryDoAction(pAction, pBatch, &nElem, pWti); - if(localRet == RS_RET_FORCE_TERM) { - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - /* try commit transaction, once done, we can simply do so as if - * that return state was returned from tryDoAction(). - */ - localRet = finishBatch(pAction, pBatch, pWti); - } - - if( localRet == RS_RET_OK - || localRet == RS_RET_PREVIOUS_COMMITTED - || localRet == RS_RET_DEFER_COMMIT) { - bDone = 1; - } else if(localRet == RS_RET_SUSPENDED) { - DBGPRINTF("action ret RS_RET_SUSPENDED - retry full batch\n"); - /* do nothing, this will retry the full batch */ - } else if(localRet == RS_RET_ACTION_FAILED) { - /* in this case, everything not yet committed is BAD */ - for(i = pBatch->iDoneUpTo ; i < wasDoneTo + nElem ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && pBatch->eltState[i] != BATCH_STATE_COMM ) { - pBatch->eltState[i] = BATCH_STATE_BAD; - pBatch->pElem[i].bPrevWasSuspended = 1; - STATSCOUNTER_INC(pAction->ctrFail, pAction->mutCtrFail); - } - } - bDone = 1; - } else { - if(nElem == 1) { - batchSetElemState(pBatch, pBatch->iDoneUpTo, BATCH_STATE_BAD); - bDone = 1; - } else { - /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - DBGPRINTF("submitBatch recursing trying to find and exclude the culprit " - "for iRet %d\n", localRet); - submitBatch(pAction, pBatch, nElem / 2, pWti); - submitBatch(pAction, pBatch, nElem - (nElem / 2), pWti); - bDone = 1; - } - } - } while(!bDone && !*(pBatch->pbShutdownImmediate)); /* do .. while()! */ - - if(*(pBatch->pbShutdownImmediate)) - ABORT_FINALIZE(RS_RET_FORCE_TERM); - -finalize_it: - RETiRet; -} - - /* copy "active" array of batch, as we need to modify it. The caller * must make sure the new array is freed and the orginal batch * pointer is restored (thus the caller must save it). If active @@ -1232,60 +1072,6 @@ finalize_it: RETiRet; } -/* The following function prepares a batch for processing, that it is - * reinitializes batch states, generates strings and does everything else - * that needs to be done in order to make the batch ready for submission to - * the actual output module. Note that we look at the precomputed - * filter OK condition and process only those messages, that actually matched - * the filter. - * rgerhards, 2010-06-14 - */ -static inline rsRetVal -prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr) -{ - int i; - batch_obj_t *pElem; - struct syslogTime ttNow; - DEFiRet; - - /* indicate we have not yet read the date */ - ttNow.year = 0; - - pBatch->iDoneUpTo = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - pElem = &(pBatch->pElem[i]); - if(batchIsValidElem(pBatch, i)) { - pBatch->eltState[i] = BATCH_STATE_RDY; - if(prepareDoActionParams(pAction, pElem, &ttNow) != RS_RET_OK) { - /* make sure we have our copy of "active" array */ - if(!*bMustRestoreActivePtr) { - *activeSave = pBatch->active; - copyActive(pBatch); - } - pBatch->active[i] = RSFALSE; - } - } - } - RETiRet; -} - - -/* receive a batch and process it. This includes retry handling. - * rgerhards, 2009-05-12 - */ -static inline rsRetVal -processAction(action_t *pAction, batch_t *pBatch, wti_t* pWti) -{ - DEFiRet; - - assert(pBatch != NULL); - CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pWti)); - iRet = finishBatch(pAction, pBatch, pWti); - -finalize_it: - RETiRet; -} - /* receive an array of to-process user pointers and submit them * for processing. @@ -1294,41 +1080,34 @@ finalize_it: static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) { - int *pbShutdownImmdtSave; - sbool *activeSave; - int bMustRestoreActivePtr = 0; - rsRetVal localRet; action_t *pAction = (action_t*) pVoid; + msg_t *pMsg; + int i; + struct syslogTime ttNow; DEFiRet; - assert(pBatch != NULL); - - if(pbShutdownImmediate != NULL) { - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; + if(pbShutdownImmediate == NULL) { + pbShutdownImmediate = pBatch->pbShutdownImmediate; } - CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); - - iRet = processAction(pAction, pBatch, pWti); - /* even if processAction failed, we need to release the batch (else we - * have a memory leak). So we do this first, and then check if we need to - * return an error code. If so, the code from processAction has priority. - * rgerhards, 2010-12-17 - */ - localRet = releaseBatch(pAction, pBatch); + /* indicate we have not yet read the date */ + ttNow.year = 0; - if(iRet == RS_RET_OK) - iRet = localRet; - - if(bMustRestoreActivePtr) { - free(pBatch->active); - pBatch->active = activeSave; + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { + if(batchIsValidElem(pBatch, i)) { + pMsg = pBatch->pElem[i].pMsg; +dbgprintf("DDDD: processBatchMain[act %d], elt %d: %s\n", pAction->iActionNbr, i, pMsg->pszRawMsg); + // TODO: check error return states! + iRet = prepareDoActionParams(pAction, pWti, pMsg, &ttNow); + iRet = actionProcessMessage(pAction, pMsg, + pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, + pbShutdownImmediate, pWti); + releaseDoActionParams(pAction, pWti); + } } -finalize_it: - if(pbShutdownImmediate != NULL) - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + iRet = finishBatch(pAction, pWti); +dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } |