diff options
-rw-r--r-- | action.c | 53 | ||||
-rw-r--r-- | runtime/queue.c | 29 | ||||
-rw-r--r-- | runtime/queue.h | 7 | ||||
-rw-r--r-- | tools/syslogd.c | 5 |
4 files changed, 49 insertions, 45 deletions
@@ -115,7 +115,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*); +static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti); static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*); static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*); static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*); @@ -632,7 +632,7 @@ static inline void actionSuspend(action_t *pThis, wti_t *pWti) * of its inability to recover. -- rgerhards, 2010-04-26. */ static inline rsRetVal -actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionDoRetry(action_t *pThis, wti_t *pWti) { int iRetries; int iSleepPeriod; @@ -642,7 +642,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) ASSERT(pThis != NULL); iRetries = 0; - while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { + while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) { DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries); iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet); @@ -666,7 +666,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) ++iRetries; iSleepPeriod = pThis->iResumeInterval; srSleep(iSleepPeriod, 0); - if(*pbShutdownImmediate) { + if(*pWti->pbShutdownImmediate) { ABORT_FINALIZE(RS_RET_FORCE_TERM); } } @@ -705,7 +705,7 @@ finalize_it: * changed to new action state engine -- rgerhards, 2009-05-07 */ static rsRetVal -actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionTryResume(action_t *pThis, wti_t *pWti) { DEFiRet; time_t ttNow = NO_TIME_PROVIDED; @@ -727,7 +727,7 @@ actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) if(getActionState(pWti, pThis) == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ datetime.GetTime(&ttNow); - CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate)); + CHKiRet(actionDoRetry(pThis, pWti)); } if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) { @@ -745,13 +745,13 @@ finalize_it: * rgerhards, 2009-05-07 */ static inline rsRetVal -actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti) +actionPrepare(action_t *pThis, wti_t *pWti) { DEFiRet; assert(pThis != NULL); CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti)); - CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate)); + CHKiRet(actionTryResume(pThis, pWti)); /* if we are now ready, we initialize the transaction and advance * action state accordingly @@ -961,13 +961,13 @@ finalize_it: * rgerhards, 2008-01-28 */ rsRetVal -actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti) +actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti) { DEFiRet; - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, pWti)); if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) - pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate); if(getActionState(pWti, pThis) == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti)); @@ -979,7 +979,7 @@ finalize_it: /* Commit try committing (do not handle retry processing and such) */ static rsRetVal -actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionTryCommit(action_t *pThis, wti_t *pWti) { actWrkrInfo_t *wrkrInfo; actWrkrIParams_t *iparamCurr, *iparamDel; @@ -992,7 +992,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) while(iparamCurr != NULL) { iRet = actionProcessMessage(pThis, iparamCurr->msgFlags, iparamCurr->staticActParams, - pbShutdownImmediate, pWti); + pWti); releaseDoActionParams(pThis, pWti); iparamDel = iparamCurr; iparamCurr = iparamCurr->next; @@ -1001,7 +1001,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) wrkrInfo->iparamLast = NULL; } - CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti)); + CHKiRet(actionPrepare(pThis, pWti)); if(getActionState(pWti, pThis) == ACT_STATE_ITX) { dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr); iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); @@ -1038,7 +1038,7 @@ finalize_it: } static rsRetVal -actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) +actionCommit(action_t *pThis, wti_t *pWti) { DEFiRet; // TODO: #warning do we really need to return something? @@ -1055,7 +1055,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate) any of these partial implementations). rgerhards, 2013-11-04 */ - iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate); + iRet = actionTryCommit(pThis, pWti); RETiRet; } @@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti) i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot); pAction = pWti->actWrkrInfo[i].pAction; if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT) - actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate); + actionCommit(pWti->actWrkrInfo[i].pAction, pWti); } } @@ -1080,7 +1080,7 @@ actionCommitAllDirect(wti_t *pWti) * queue thread if the action queue is set to "direct". */ static rsRetVal -processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate) +processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow) { DEFiRet; @@ -1095,7 +1095,7 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa iRet = actionProcessMessage(pAction, pMsg->msgFlags, pWti->actWrkrInfo[pAction->iActionNbr].staticActParams, - pbShutdownImmediate, pWti); + pWti); releaseDoActionParams(pAction, pWti); finalize_it: RETiRet; @@ -1106,7 +1106,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) +processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti) { action_t *pAction = (action_t*) pVoid; msg_t *pMsg; @@ -1114,23 +1114,19 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed struct syslogTime ttNow; DEFiRet; - if(pbShutdownImmediate == NULL) { - pbShutdownImmediate = pWti->pbShutdownImmediate; - } - /* indicate we have not yet read the date */ ttNow.year = 0; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) { + for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) { if(batchIsValidElem(pBatch, i)) { pMsg = pBatch->pElem[i].pMsg; - iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); // TODO: we must refactor this! flag messages as committed batchSetElemState(pBatch, i, BATCH_STATE_COMM); } } - iRet = actionCommit(pAction, pWti, pbShutdownImmediate); + iRet = actionCommit(pAction, pWti); dbgprintf("DDDD: processBatchMain - end\n"); RETiRet; } @@ -1398,14 +1394,13 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) static inline rsRetVal doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg) { -int pbShutdownImmediate = 0; // TODO: implement dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg); struct syslogTime ttNow; DEFiRet; if(GatherStats) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); ttNow.year = 0; - iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate); + iRet = processMsgMain(pAction, pWti, pMsg, &ttNow); RETiRet; } diff --git a/runtime/queue.c b/runtime/queue.c index 952edb0f..5d9296b1 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -707,9 +707,14 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; - pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; - pThis->MultiEnq = qqueueMultiEnqObjDirect; + /* these entry points shall not be used in direct mode + To catch program errors, make us abort if that happens! + TODO: currently main q in direct mode WILL abort! + rgerhards, 2013-11-05 + */ + pThis->qAdd = NULL; + pThis->qDel = NULL; + pThis->MultiEnq = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -983,7 +988,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; @@ -1289,7 +1294,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1848,7 +1853,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2070,9 +2076,14 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; - pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; - pThis->MultiEnq = qqueueMultiEnqObjDirect; + /* these entry points shall not be used in direct mode + To catch program errors, make us abort if that happens! + TODO: currently main q in direct mode WILL abort! + rgerhards, 2013-11-05 + */ + pThis->qAdd = NULL; + pThis->qDel = NULL; + pThis->MultiEnq = NULL; break; } diff --git a/runtime/queue.h b/runtime/queue.h index 45eac45d..e5eef168 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,11 +103,10 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero - * during normal operations and one if the consumer must urgently shut down. + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -201,7 +200,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *)); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/tools/syslogd.c b/tools/syslogd.c index c27d79b7..7543d338 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -551,17 +551,16 @@ finalize_it: * for the main queue. */ static rsRetVal -msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate) +msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti) { DEFiRet; assert(pBatch != NULL); - pWti->pbShutdownImmediate = pbShutdownImmediate; preprocessBatch(pBatch, pWti->pbShutdownImmediate); ruleset.ProcessBatch(pBatch, pWti); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 int i; - for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pWti->pbShutdownImmediate ; i++) { pBatch->eltState[i] = BATCH_STATE_COMM; } RETiRet; |