summaryrefslogtreecommitdiffstats
path: root/action.c
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-11-04 09:12:56 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-11-04 09:12:56 +0100
commitb278457d495379dd0eca1d9da8b51fcf7fad4559 (patch)
tree3261827916c421a5312edfeadf1812d3cc7feb9f /action.c
parent32b37ecd82b508e707cf1aa0b27acb4ac96a295c (diff)
downloadrsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.tar.gz
rsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.tar.bz2
rsyslog-b278457d495379dd0eca1d9da8b51fcf7fad4559.zip
refactor: move batch "unrolling" up one layer
Diffstat (limited to 'action.c')
-rw-r--r--action.c206
1 files changed, 72 insertions, 134 deletions
diff --git a/action.c b/action.c
index 5d2fab68..07f6c239 100644
--- a/action.c
+++ b/action.c
@@ -14,7 +14,6 @@
*
* if set iExecEveryNthOccur > 1 || iSecsExecOnceInterval
* - doSubmitToActionQComplexBatch
- * - helperSubmitToActionQComplexBatch
* - doActionCallAction
* handles mark message reduction, but in essence calls
* - actionWriteToAction
@@ -117,9 +116,9 @@
/* forward definitions */
static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
-static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
-static rsRetVal doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti);
+static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
+static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
/* object static data (once for all instances) */
/* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */
@@ -977,26 +976,14 @@ finalize_it:
}
-/* finish processing a batch. Most importantly, that means we commit if we
- * need to do so.
- * rgerhards, 2008-01-28
- */
+/* Commit action after processing. */
static rsRetVal
-finishBatch(action_t *pThis, wti_t *pWti)
+actionCommit(action_t *pThis, wti_t *pWti)
{
- int i;
int pbShutdownImmediate = 1;
DEFiRet;
- ASSERT(pThis != NULL);
-// Testing for new engine:
-dbgprintf("DDDD: iActionNbr %d\n", iActionNbr);
-for(i = 0 ; i < iActionNbr ; ++i) {
- dbgprintf("DDDD: finishBatch, act %d state %u\n", i, getActionStateByNbr(pWti, i));
-}
-
if(getActionState(pWti, pThis) == ACT_STATE_RDY) {
- /* we just need to flag the batch as commited */
FINALIZE; /* nothing to do */
}
@@ -1037,30 +1024,38 @@ finalize_it:
}
-/* copy "active" array of batch, as we need to modify it. The caller
- * must make sure the new array is freed and the orginal batch
- * pointer is restored (thus the caller must save it). If active
- * is currently NULL, this is properly handled.
- * Note: the batches active pointer is modified, so it must be
- * saved BEFORE calling this function!
- * rgerhards, 2012-09-12
+/* process a single message. This is both called if we run from the
+ * cosumer side of an action queue as well as directly from the main
+ * queue thread if the action queue is set to "direct".
*/
static rsRetVal
-copyActive(batch_t *pBatch)
+processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate)
{
- sbool *active;
DEFiRet;
- CHKmalloc(active = malloc(batchNumMsgs(pBatch) * sizeof(sbool)));
- if(pBatch->active == NULL)
- memset(active, 1, batchNumMsgs(pBatch));
- else
- memcpy(active, pBatch->active, batchNumMsgs(pBatch));
- pBatch->active = active;
-finalize_it:
+dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRawMsg);
+ // TODO: check error return states!
+ iRet = prepareDoActionParams(pAction, pWti, pMsg, ttNow);
+ iRet = actionProcessMessage(pAction, pMsg,
+ pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
+ pbShutdownImmediate, pWti);
+ releaseDoActionParams(pAction, pWti);
RETiRet;
}
+/* Commit all active transactions */
+rsRetVal
+actionCommitAll(wti_t *pWti)
+{
+ int i;
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ DBGPRINTF("DDDD: actionCommitall action %d state %u\n",
+ i, getActionStateByNbr(pWti, i));
+ if(getActionStateByNbr(pWti, i) != ACT_STATE_RDY) {
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
+ }
+ }
+}
/* receive an array of to-process user pointers and submit them
* for processing.
@@ -1085,19 +1080,13 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed
for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) {
if(batchIsValidElem(pBatch, i)) {
pMsg = pBatch->pElem[i].pMsg;
-dbgprintf("DDDD: processBatchMain[act %d], elt %d: %s\n", pAction->iActionNbr, i, pMsg->pszRawMsg);
- // TODO: check error return states!
- iRet = prepareDoActionParams(pAction, pWti, pMsg, &ttNow);
- iRet = actionProcessMessage(pAction, pMsg,
- pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
- pbShutdownImmediate, pWti);
- releaseDoActionParams(pAction, pWti);
- // TODO: we must refactor this! flag messages as committed
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate);
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
}
}
- iRet = finishBatch(pAction, pWti);
+ iRet = actionCommit(pAction, pWti);
dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
@@ -1161,7 +1150,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
* rgerhards, 2010-06-08
*/
static inline rsRetVal
-doSubmitToActionQ(action_t *pAction, msg_t *pMsg, wti_t *pWti)
+doSubmitToActionQ(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
@@ -1245,7 +1234,7 @@ actionWriteToAction(action_t *pAction, msg_t *pMsg, wti_t *pWti)
/* When we reach this point, we have a valid, non-disabled action.
* So let's enqueue our message for execution. -- rgerhards, 2007-07-24
*/
- iRet = doSubmitToActionQ(pAction, pMsg, pWti);
+ iRet = doSubmitToActionQ(pAction, pWti, pMsg);
finalize_it:
RETiRet;
@@ -1256,12 +1245,10 @@ finalize_it:
* pthread_cleanup_push() POSIX macro...
*/
static inline rsRetVal
-doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch, wti_t *pWti)
+doActionCallAction(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- msg_t *pMsg;
DEFiRet;
- pMsg = pBatch->pElem[idxBtch].pMsg;
pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */
/* don't output marks to recently written outputs */
@@ -1325,75 +1312,55 @@ activateActions(void)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
time_t now = 0;
+ int doProcess = 1;
time_t lastAct;
- int i;
- sbool *activeSave;
DEFiRet;
- activeSave = pBatch->active;
- copyActive(pBatch);
-
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if((pBatch->eltState[i] == BATCH_STATE_DISC) || !pBatch->active[i])
- continue;
- if(now == 0) {
- now = datetime.GetTime(NULL); /* good time call - the only one done */
- }
- /* CAS loop, we write back a bit early, but that's OK... */
- /* we use reception time, not dequeue time - this is considered more appropriate and
- * also faster ;) -- rgerhards, 2008-09-17 */
- do {
- lastAct = pAction->f_time;
- if(pBatch->pElem[i].pMsg->msgFlags & MARK) {
- if((now - lastAct) < MarkInterval / 2) {
- pBatch->active[i] = 0;
- DBGPRINTF("batch item %d: action was recently called, ignoring "
- "mark message\n", i);
- break; /* do not update timestamp for non-written mark messages */
- }
+ if(now == 0) { // TODO: do in caller!
+ now = datetime.GetTime(NULL); /* good time call - the only one done */
+ }
+ /* CAS loop, we write back a bit early, but that's OK... */
+ /* we use reception time, not dequeue time - this is considered more appropriate and
+ * also faster ;) -- rgerhards, 2008-09-17 */
+ do {
+ lastAct = pAction->f_time;
+ if(pMsg->msgFlags & MARK) {
+ if((now - lastAct) < MarkInterval / 2) {
+ doProcess = 0;
+ DBGPRINTF("action was recently called, ignoring mark message\n");
+ break; /* do not update timestamp for non-written mark messages */
}
- } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
- pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0);
- if(pBatch->active[i]) {
- DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n",
- i, module.GetStateName(pAction->pMod));
}
- }
-
- iRet = doSubmitToActionQBatch(pAction, pBatch, pWti);
+ } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct,
+ pMsg->ttGenTime, &pAction->mutCAS) == 0);
- free(pBatch->active);
- pBatch->active = activeSave;
+ if(doProcess) {
+ DBGPRINTF("Called action(NotAllMark), processing via '%s'\n",
+ module.GetStateName(pAction->pMod));
+ iRet = doSubmitToActionQBatch(pAction, pWti, pMsg);
+ }
RETiRet;
}
-static inline void
-countStatsBatchEnq(action_t *pAction, batch_t *pBatch)
-{
- int i;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( batchIsValidElem(pBatch, i)) {
- STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
- }
- }
-}
-
/* enqueue a batch in direct mode. We have put this into its own function just to avoid
* cluttering the actual submit function.
* rgerhards, 2011-06-16
*/
static inline rsRetVal
-doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
+int pbShutdownImmediate = 0; // TODO: implement
+ struct syslogTime ttNow;
DEFiRet;
if(GatherStats)
- countStatsBatchEnq(pAction, pBatch);
- iRet = qqueueEnqObjDirectBatch(pAction->pQueue, pBatch, pWti);
+ STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
+ ttNow.year = 0;
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate);
RETiRet;
}
@@ -1403,67 +1370,38 @@ doQueueEnqObjDirectBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
* rgerhards, 2010-06-08
*/
static rsRetVal
-doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
- int i;
DEFiRet;
- DBGPRINTF("Called action(Batch), logging to %s\n", module.GetStateName(pAction->pMod));
+ DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod));
if(pAction->pQueue->qType == QUEUETYPE_DIRECT) {
- iRet = doQueueEnqObjDirectBatch(pAction, pBatch, pWti);
+ iRet = doQueueEnqObjDirectBatch(pAction, pWti, pMsg);
} else {/* in this case, we do single submits to the queue.
* TODO: optimize this, we may do at least a multi-submit!
*/
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if(batchIsValidElem(pBatch, i)) {
- doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg, pWti);
- }
- }
+ doSubmitToActionQ(pAction, pWti, pMsg);
}
RETiRet;
}
-
-/* Helper to submit a batch of actions to the engine. Note that we have rather
- * complicated processing here, so we need to do this one message after another.
- * rgerhards, 2010-06-23
- */
-static inline rsRetVal
-helperSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
-{
- int i;
- DEFiRet;
-
- DBGPRINTF("Called action %p (complex case), logging to %s\n",
- pAction, module.GetStateName(pAction->pMod));
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- DBGPRINTF("action %p: valid:%d state:%d execWhenPrev:%d\n",
- pAction, batchIsValidElem(pBatch, i), pBatch->eltState[i],
- pAction->bExecWhenPrevSusp);
- if(batchIsValidElem(pBatch, i)) {
- doActionCallAction(pAction, pBatch, i, pWti);
- }
- }
-
- RETiRet;
-}
-
/* Call configured action, most complex case with all features supported (and thus slow).
* rgerhards, 2010-06-08
*/
#pragma GCC diagnostic ignored "-Wempty-body"
static rsRetVal
-doSubmitToActionQComplexBatch(action_t *pAction, batch_t *pBatch, wti_t *pWti)
+doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
DEFiRet;
d_pthread_mutex_lock(&pAction->mutAction);
-dbgprintf("DDDD: locked mutAction\n");
pthread_cleanup_push(mutexCancelCleanup, &pAction->mutAction);
- iRet = helperSubmitToActionQComplexBatch(pAction, pBatch, pWti);
+ DBGPRINTF("Called action %p (complex case), logging to %s\n",
+ pAction, module.GetStateName(pAction->pMod));
+ doActionCallAction(pAction, pWti, pMsg);
d_pthread_mutex_unlock(&pAction->mutAction);
pthread_cleanup_pop(0); /* remove mutex cleanup handler */