summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c53
-rw-r--r--runtime/queue.c29
-rw-r--r--runtime/queue.h7
-rw-r--r--tools/syslogd.c5
4 files changed, 49 insertions, 45 deletions
diff --git a/action.c b/action.c
index 790ea4ce..54004311 100644
--- a/action.c
+++ b/action.c
@@ -115,7 +115,7 @@
#define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */
/* forward definitions */
-static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int*);
+static rsRetVal processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti);
static rsRetVal doSubmitToActionQComplexBatch(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t*);
static rsRetVal doSubmitToActionQBatch(action_t *pAction, wti_t *pWti, msg_t*);
@@ -632,7 +632,7 @@ static inline void actionSuspend(action_t *pThis, wti_t *pWti)
* of its inability to recover. -- rgerhards, 2010-04-26.
*/
static inline rsRetVal
-actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionDoRetry(action_t *pThis, wti_t *pWti)
{
int iRetries;
int iSleepPeriod;
@@ -642,7 +642,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
ASSERT(pThis != NULL);
iRetries = 0;
- while((*pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
+ while((*pWti->pbShutdownImmediate == 0) && getActionState(pWti, pThis) == ACT_STATE_RTRY) {
DBGPRINTF("actionDoRetry: enter loop, iRetries=%d\n", iRetries);
iRet = pThis->pMod->tryResume(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
DBGPRINTF("actionDoRetry: action->tryResume returned %d\n", iRet);
@@ -666,7 +666,7 @@ actionDoRetry(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
++iRetries;
iSleepPeriod = pThis->iResumeInterval;
srSleep(iSleepPeriod, 0);
- if(*pbShutdownImmediate) {
+ if(*pWti->pbShutdownImmediate) {
ABORT_FINALIZE(RS_RET_FORCE_TERM);
}
}
@@ -705,7 +705,7 @@ finalize_it:
* changed to new action state engine -- rgerhards, 2009-05-07
*/
static rsRetVal
-actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionTryResume(action_t *pThis, wti_t *pWti)
{
DEFiRet;
time_t ttNow = NO_TIME_PROVIDED;
@@ -727,7 +727,7 @@ actionTryResume(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
if(getActionState(pWti, pThis) == ACT_STATE_RTRY) {
if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */
datetime.GetTime(&ttNow);
- CHKiRet(actionDoRetry(pThis, pWti, pbShutdownImmediate));
+ CHKiRet(actionDoRetry(pThis, pWti));
}
if(Debug && (getActionState(pWti, pThis) == ACT_STATE_RTRY ||getActionState(pWti, pThis) == ACT_STATE_SUSP)) {
@@ -745,13 +745,13 @@ finalize_it:
* rgerhards, 2009-05-07
*/
static inline rsRetVal
-actionPrepare(action_t *pThis, int *pbShutdownImmediate, wti_t *pWti)
+actionPrepare(action_t *pThis, wti_t *pWti)
{
DEFiRet;
assert(pThis != NULL);
CHKiRet(actionCheckAndCreateWrkrInstance(pThis, pWti));
- CHKiRet(actionTryResume(pThis, pWti, pbShutdownImmediate));
+ CHKiRet(actionTryResume(pThis, pWti));
/* if we are now ready, we initialize the transaction and advance
* action state accordingly
@@ -961,13 +961,13 @@ finalize_it:
* rgerhards, 2008-01-28
*/
rsRetVal
-actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, int *pbShutdownImmediate, wti_t *pWti)
+actionProcessMessage(action_t *pThis, int msgFlags, void *actParams, wti_t *pWti)
{
DEFiRet;
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionPrepare(pThis, pWti));
if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
- pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
+ pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pWti->pbShutdownImmediate);
if(getActionState(pWti, pThis) == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, msgFlags, actParams, pWti));
@@ -979,7 +979,7 @@ finalize_it:
/* Commit try committing (do not handle retry processing and such) */
static rsRetVal
-actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionTryCommit(action_t *pThis, wti_t *pWti)
{
actWrkrInfo_t *wrkrInfo;
actWrkrIParams_t *iparamCurr, *iparamDel;
@@ -992,7 +992,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
while(iparamCurr != NULL) {
iRet = actionProcessMessage(pThis, iparamCurr->msgFlags,
iparamCurr->staticActParams,
- pbShutdownImmediate, pWti);
+ pWti);
releaseDoActionParams(pThis, pWti);
iparamDel = iparamCurr;
iparamCurr = iparamCurr->next;
@@ -1001,7 +1001,7 @@ actionTryCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
wrkrInfo->iparamLast = NULL;
}
- CHKiRet(actionPrepare(pThis, pbShutdownImmediate, pWti));
+ CHKiRet(actionPrepare(pThis, pWti));
if(getActionState(pWti, pThis) == ACT_STATE_ITX) {
dbgprintf("DDDDD: calling endTransaction for action %d\n", pThis->iActionNbr);
iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
@@ -1038,7 +1038,7 @@ finalize_it:
}
static rsRetVal
-actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
+actionCommit(action_t *pThis, wti_t *pWti)
{
DEFiRet;
// TODO: #warning do we really need to return something?
@@ -1055,7 +1055,7 @@ actionCommit(action_t *pThis, wti_t *pWti, int *pbShutdownImmediate)
any of these partial implementations).
rgerhards, 2013-11-04
*/
- iRet = actionTryCommit(pThis, pWti, pbShutdownImmediate);
+ iRet = actionTryCommit(pThis, pWti);
RETiRet;
}
@@ -1071,7 +1071,7 @@ actionCommitAllDirect(wti_t *pWti)
i, getActionStateByNbr(pWti, i), pWti->actWrkrInfo[i].iparamRoot);
pAction = pWti->actWrkrInfo[i].pAction;
if(pAction != NULL && pAction->pQueue->qType == QUEUETYPE_DIRECT)
- actionCommit(pWti->actWrkrInfo[i].pAction, pWti, pWti->pbShutdownImmediate);
+ actionCommit(pWti->actWrkrInfo[i].pAction, pWti);
}
}
@@ -1080,7 +1080,7 @@ actionCommitAllDirect(wti_t *pWti)
* queue thread if the action queue is set to "direct".
*/
static rsRetVal
-processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow, int *pbShutdownImmediate)
+processMsgMain(action_t *pAction, wti_t *pWti, msg_t *pMsg, struct syslogTime *ttNow)
{
DEFiRet;
@@ -1095,7 +1095,7 @@ dbgprintf("DDDD: processMsgMain[act %d], %s\n", pAction->iActionNbr, pMsg->pszRa
iRet = actionProcessMessage(pAction, pMsg->msgFlags,
pWti->actWrkrInfo[pAction->iActionNbr].staticActParams,
- pbShutdownImmediate, pWti);
+ pWti);
releaseDoActionParams(pAction, pWti);
finalize_it:
RETiRet;
@@ -1106,7 +1106,7 @@ finalize_it:
* rgerhards, 2009-04-22
*/
static rsRetVal
-processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
+processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti)
{
action_t *pAction = (action_t*) pVoid;
msg_t *pMsg;
@@ -1114,23 +1114,19 @@ processBatchMain(void *pVoid, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmed
struct syslogTime ttNow;
DEFiRet;
- if(pbShutdownImmediate == NULL) {
- pbShutdownImmediate = pWti->pbShutdownImmediate;
- }
-
/* indicate we have not yet read the date */
ttNow.year = 0;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*pbShutdownImmediate ; ++i) {
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*pWti->pbShutdownImmediate ; ++i) {
if(batchIsValidElem(pBatch, i)) {
pMsg = pBatch->pElem[i].pMsg;
- iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, pbShutdownImmediate);
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
// TODO: we must refactor this! flag messages as committed
batchSetElemState(pBatch, i, BATCH_STATE_COMM);
}
}
- iRet = actionCommit(pAction, pWti, pbShutdownImmediate);
+ iRet = actionCommit(pAction, pWti);
dbgprintf("DDDD: processBatchMain - end\n");
RETiRet;
}
@@ -1398,14 +1394,13 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
static inline rsRetVal
doQueueEnqObjDirectBatch(action_t *pAction, wti_t *pWti, msg_t *pMsg)
{
-int pbShutdownImmediate = 0; // TODO: implement
dbgprintf("DDDD: doQueueEnqObjDirectBatch: %s\n", pMsg->pszRawMsg);
struct syslogTime ttNow;
DEFiRet;
if(GatherStats)
STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed);
ttNow.year = 0;
- iRet = processMsgMain(pAction, pWti, pMsg, &ttNow, &pbShutdownImmediate);
+ iRet = processMsgMain(pAction, pWti, pMsg, &ttNow);
RETiRet;
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 952edb0f..5d9296b1 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -707,9 +707,14 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ /* these entry points shall not be used in direct mode
+ To catch program errors, make us abort if that happens!
+ TODO: currently main q in direct mode WILL abort!
+ rgerhards, 2013-11-05
+ */
+ pThis->qAdd = NULL;
+ pThis->qDel = NULL;
+ pThis->MultiEnq = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -983,7 +988,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
@@ -1289,7 +1294,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*, int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1848,7 +1853,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti, &pThis->bShutdownImmediate));
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2070,9 +2076,14 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
- pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
- pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ /* these entry points shall not be used in direct mode
+ To catch program errors, make us abort if that happens!
+ TODO: currently main q in direct mode WILL abort!
+ rgerhards, 2013-11-05
+ */
+ pThis->qAdd = NULL;
+ pThis->qDel = NULL;
+ pThis->MultiEnq = NULL;
break;
}
diff --git a/runtime/queue.h b/runtime/queue.h
index 45eac45d..e5eef168 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -103,11 +103,10 @@ struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*, wti_t*,int*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
- * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
- * during normal operations and one if the consumer must urgently shut down.
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -201,7 +200,7 @@ rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *, int*));
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *));
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
diff --git a/tools/syslogd.c b/tools/syslogd.c
index c27d79b7..7543d338 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -551,17 +551,16 @@ finalize_it:
* for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
+msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti)
{
DEFiRet;
assert(pBatch != NULL);
- pWti->pbShutdownImmediate = pbShutdownImmediate;
preprocessBatch(pBatch, pWti->pbShutdownImmediate);
ruleset.ProcessBatch(pBatch, pWti);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10
int i;
- for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) {
+ for(i = 0 ; i < pBatch->nElem && !*pWti->pbShutdownImmediate ; i++) {
pBatch->eltState[i] = BATCH_STATE_COMM;
}
RETiRet;