summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
Diffstat (limited to 'action.c')
-rw-r--r--action.c185
1 files changed, 84 insertions, 101 deletions
diff --git a/action.c b/action.c
index 673d8e13..ca260f92 100644
--- a/action.c
+++ b/action.c
@@ -289,6 +289,11 @@ rsRetVal actionDestruct(action_t *pThis)
DEFiRet;
ASSERT(pThis != NULL);
+ if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) {
+ /* discard actions will be optimized out */
+ FINALIZE;
+ }
+
if(pThis->pQueue != NULL) {
qqueueDestruct(&pThis->pQueue);
}
@@ -310,8 +315,8 @@ rsRetVal actionDestruct(action_t *pThis)
d_free(pThis->pszName);
d_free(pThis->ppTpl);
+finalize_it:
d_free(pThis);
-
RETiRet;
}
@@ -362,6 +367,10 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
ASSERT(pThis != NULL);
+ if(!strcmp((char*)modGetName(pThis->pMod), "builtin:omdiscard")) {
+ /* discard actions will be optimized out */
+ FINALIZE;
+ }
/* generate a friendly name for us action stats */
if(pThis->pszName == NULL) {
snprintf((char*) pszAName, sizeof(pszAName)/sizeof(uchar), "action %d", iActionNbr);
@@ -650,7 +659,7 @@ actionDoRetry(action_t *pThis, time_t ttNow, int *pbShutdownImmediate)
iRetries = 0;
while((*pbShutdownImmediate == 0) && pThis->eState == ACT_STATE_RTRY) {
iRet = pThis->pMod->tryResume(pThis->pModData);
- if((pThis->iResumeOKinRow > 999) && (pThis->iResumeOKinRow % 1000 == 0)) {
+ if((pThis->iResumeOKinRow > 9) && (pThis->iResumeOKinRow % 10 == 0)) {
bTreatOKasSusp = 1;
pThis->iResumeOKinRow = 0;
} else {
@@ -861,7 +870,7 @@ static rsRetVal releaseBatch(action_t *pAction, batch_t *pBatch)
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
- if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
switch(pAction->eParamPassing) {
case ACT_ARRAY_PASSING:
ppMsgs = (uchar***) pElem->staticActParams;
@@ -1059,8 +1068,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem)
/* NOTE: do NOT extend the filter below! Anything else must be done on the
* enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15
*/
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
pMsg = (msg_t*) pBatch->pElem[i].pUsrp;
localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams,
pBatch->pbShutdownImmediate);
@@ -1181,6 +1189,29 @@ finalize_it:
}
+/* copy "active" array of batch, as we need to modify it. The caller
+ * must make sure the new array is freed and the orginal batch
+ * pointer is restored (thus the caller must save it). If active
+ * is currently NULL, this is properly handled.
+ * Note: the batches active pointer is modified, so it must be
+ * saved BEFORE calling this function!
+ * rgerhards, 2012-09-12
+ */
+static rsRetVal
+copyActive(batch_t *pBatch)
+{
+ sbool *active;
+ DEFiRet;
+
+ CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
+ if(pBatch->active == NULL)
+ memset(active, 1, batchNumMsgs(pBatch));
+ else
+ memcpy(active, pBatch->active, batchNumMsgs(pBatch));
+ pBatch->active = active;
+finalize_it:
+ RETiRet;
+}
/* The following function prepares a batch for processing, that it is
* reinitializes batch states, generates strings and does everything else
@@ -1191,7 +1222,7 @@ finalize_it:
* rgerhards, 2010-06-14
*/
static inline rsRetVal
-prepareBatch(action_t *pAction, batch_t *pBatch)
+prepareBatch(action_t *pAction, batch_t *pBatch, sbool **activeSave, int *bMustRestoreActivePtr)
{
int i;
batch_obj_t *pElem;
@@ -1200,10 +1231,16 @@ prepareBatch(action_t *pAction, batch_t *pBatch)
pBatch->iDoneUpTo = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
pElem = &(pBatch->pElem[i]);
- if(pElem->bFilterOK && pElem->state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
pElem->state = BATCH_STATE_RDY;
- if(prepareDoActionParams(pAction, pElem) != RS_RET_OK)
- pElem->bFilterOK = RSFALSE;
+ if(prepareDoActionParams(pAction, pElem) != RS_RET_OK) {
+ /* make sure we have our copy of "active" array */
+ if(!*bMustRestoreActivePtr) {
+ *activeSave = pBatch->active;
+ copyActive(pBatch);
+ }
+ pBatch->active[i] = RSFALSE;
+ }
}
}
RETiRet;
@@ -1236,6 +1273,8 @@ static rsRetVal
processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
{
int *pbShutdownImmdtSave;
+ sbool *activeSave;
+ int bMustRestoreActivePtr = 0;
rsRetVal localRet;
DEFiRet;
@@ -1243,7 +1282,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
pBatch->pbShutdownImmediate = pbShutdownImmediate;
- CHKiRet(prepareBatch(pAction, pBatch));
+ CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
/* We now must guard the output module against execution by multiple threads. The
* plugin interface specifies that output modules must not be thread-safe (except
@@ -1266,6 +1305,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
if(iRet == RS_RET_OK)
iRet = localRet;
+
+ if(bMustRestoreActivePtr) {
+ free(pBatch->active);
+ pBatch->active = activeSave;
+ }
finalize_it:
pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
@@ -1572,7 +1616,8 @@ DEFFUNC_llExecFunc(doActivateActions)
}
actionDisable(pThis);
}
- DBGPRINTF("Action %p: queue %p started\n", pThis, pThis->pQueue);
+ DBGPRINTF("Action %s[%p]: queue %p started\n", modGetName(pThis->pMod),
+ pThis, pThis->pQueue);
ENDfunc
return RS_RET_OK; /* we ignore errors, we can not do anything either way */
}
@@ -1606,22 +1651,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
time_t now = 0;
time_t lastAct;
int i;
- int bModifiedFilter;
- sbool FilterSave[1024];
- sbool *pFilterSave;
+ sbool *activeSave;
DEFiRet;
- if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
- pFilterSave = FilterSave;
- } else {
- CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- }
+ activeSave = pBatch->active;
+ copyActive(pBatch);
- bModifiedFilter = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(!pBatch->pElem[i].bFilterOK)
+ if((pBatch->pElem[i].state == BATCH_STATE_DISC) || !pBatch->active[i])
continue;
- pFilterSave[i] = pBatch->pElem[i].bFilterOK;
if(now == 0) {
now = datetime.GetTime(NULL); /* good time call - the only one done */
}
@@ -1632,15 +1670,15 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
lastAct = pAction->f_time;
if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) {
if((now - lastAct) < MarkInterval / 2) {
- pBatch->pElem[i].bFilterOK = 0;
- bModifiedFilter = 1;
- DBGPRINTF("action was recently called, ignoring mark message\n");
+ pBatch->active[i] = 0;
+ DBGPRINTF("batch item %d: action was recently called, ignoring "
+ "mark message\n", i);
break; /* do not update timestamp for non-written mark messages */
}
}
} while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->pElem[i].bFilterOK) {
+ if(pBatch->active[i]) {
DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
i, module.GetStateName(pAction->pMod));
}
@@ -1648,17 +1686,8 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch)
iRet = doSubmitToActionQBatch(pAction, pBatch);
- if(bModifiedFilter) {
- /* in this case, we need to restore previous state */
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- /* note: clang static code analyzer reports a false positive below */
- pBatch->pElem[i].bFilterOK = pFilterSave[i];
- }
- }
-
-finalize_it:
- if(pFilterSave != FilterSave)
- free(pFilterSave);
+ free(pBatch->active);
+ pBatch->active = activeSave;
RETiRet;
}
@@ -1668,8 +1697,7 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
{
int i;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ if( batchIsValidElem(pBatch, i)) {
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
}
}
@@ -1683,18 +1711,13 @@ countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
{
- sbool FilterSave[1024];
- sbool *pFilterSave;
sbool bNeedSubmit;
- sbool bModifiedFilter;
+ sbool *activeSave;
int i;
DEFiRet;
- if(batchNumMsgs(pBatch) <= (int) (sizeof(FilterSave)/sizeof(sbool))) {
- pFilterSave = FilterSave;
- } else {
- CHKmalloc(pFilterSave = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- }
+ activeSave = pBatch->active;
+ copyActive(pBatch);
/* note: for direct mode, we need to adjust the filter property. For non-direct
* this is not necessary, because in that case we enqueue only what actually needs
@@ -1702,37 +1725,25 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
*/
if(pAction->bExecWhenPrevSusp) {
bNeedSubmit = 0;
- bModifiedFilter = 0;
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- pFilterSave[i] = pBatch->pElem[i].bFilterOK;
if(!pBatch->pElem[i].bPrevWasSuspended) {
- DBGPRINTF("action enq stage: change bFilterOK to 0 due to "
+ DBGPRINTF("action enq stage: change active to 0 due to "
"failover case in elem %d\n", i);
- pBatch->pElem[i].bFilterOK = 0;
- bModifiedFilter = 1;
+ pBatch->active[i] = 0;
}
- if(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC) {
+ if(batchIsValidElem(pBatch, i)) {
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
bNeedSubmit = 1;
}
- DBGPRINTF("action %p[%d]: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, i, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p[%d]: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, i, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
}
if(bNeedSubmit) {
/* note: stats were already computed above */
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
} else {
- DBGPRINTF("no need to submit batch, all bFilterOK==0 or discarded\n");
- }
- if(bModifiedFilter) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
- pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- /* note: clang static code analyzer reports a false positive below */
- pBatch->pElem[i].bFilterOK = pFilterSave[i];
- }
+ DBGPRINTF("no need to submit batch, all invalid\n");
}
} else {
if(GatherStats)
@@ -1740,7 +1751,8 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch)
iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch);
}
-finalize_it:
+ free(pBatch->active);
+ pBatch->active = activeSave;
RETiRet;
}
@@ -1763,11 +1775,10 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch)
* TODO: optimize this, we may do at least a multi-submit!
*/
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( batchIsValidElem(pBatch, i)
&& (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) {
doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp));
}
@@ -1792,11 +1803,10 @@ helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch)
DBGPRINTF("Called action %p (complex case), logging to %s\n",
pAction, module.GetStateName(pAction->pMod));
for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: filterOK:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
- pAction, pBatch->pElem[i].bFilterOK, pBatch->pElem[i].state,
+ DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d prevWasSusp:%d\n",
+ pAction, batchIsValidElem(pBatch, i), pBatch->pElem[i].state,
pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended);
- if( pBatch->pElem[i].bFilterOK
- && pBatch->pElem[i].state != BATCH_STATE_DISC
+ if( batchIsValidElem(pBatch, i)
&& ((pAction->bExecWhenPrevSusp == 0) || pBatch->pElem[i].bPrevWasSuspended) ) {
doActionCallAction(pAction, pBatch, i);
}
@@ -2086,33 +2096,6 @@ finalize_it:
RETiRet;
}
-
-/* Process a rsyslog v6 action config object (the now-primary config method).
- * rgerhards, 2011-07-19
- */
-rsRetVal
-actionProcessCnf(struct cnfobj __attribute__((unused)) *o)
-{
- DEFiRet;
-#if 0 /* we need to check if we actually need this functionality -- later! */
-// This is for STAND-ALONE actions at the conf file TOP level
- struct cnfparamvals *paramvals;
-
- paramvals = nvlstGetParams(o->nvlst, &pblk, NULL);
- if(paramvals == NULL) {
- iRet = RS_RET_ERR;
- goto finalize_it;
- }
- DBGPRINTF("action param blk after actionProcessCnf:\n");
- cnfparamsPrint(&pblk, paramvals);
-
- /* now find module to activate */
-finalize_it:
-#endif
- RETiRet;
-}
-
-
/* TODO: we are not yet a real object, the ClassInit here just looks like it is..
*/
rsRetVal actionClassInit(void)