From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- action.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index d22d6d6d..dddb0e01 100644 --- a/action.c +++ b/action.c @@ -50,7 +50,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -291,7 +291,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))processBatchMain)); + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -917,7 +917,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) * rgerhards, 2009-05-12 */ static rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int i; msg_t *pMsg; @@ -934,7 +934,7 @@ processAction(action_t *pAction, batch_t *pBatch) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); @@ -950,7 +950,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { DEFiRet; @@ -964,7 +964,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch); + iRet = processAction(pAction, pBatch, pbShutdownImmediate); pthread_cleanup_pop(1); /* unlock mutex */ -- cgit v1.2.3 From c5408da3d8f17691fb91282d031757ed041fec55 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 14 Oct 2009 11:01:21 +0200 Subject: new queue engine - initial commit (probably not 100% working!) simplified and thus speeded up the queue engine, also fixed some potential race conditions (in very unusual shutdown conditions) along the way. The threading model has seriously changes, so there may be some regressions. NOTE: the code passed basic tests, but there is still more work and testing to be done. This commit should be treated with care. --- action.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'action.c') diff --git a/action.c b/action.c index dddb0e01..5bd175e5 100644 --- a/action.c +++ b/action.c @@ -934,7 +934,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); -- cgit v1.2.3 From 90e8475260cf8ac54519b3d964d879489af879f6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Oct 2009 09:41:45 +0200 Subject: bugfix: message processing states were not set correctly in all cases however, this had no negative effect, as the message processing state was not evaluated when a batch was deleted, and that was the only case where the state could be wrong. --- action.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 5bd175e5..58658ac1 100644 --- a/action.c +++ b/action.c @@ -821,12 +821,12 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) DBGPRINTF("action call returned %d\n", localRet); if(localRet == RS_RET_OK) { /* mark messages as committed */ - while(iCommittedUpTo < i) { + while(iCommittedUpTo <= i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { /* mark messages as committed */ - while(iCommittedUpTo < i - 1) { + while(iCommittedUpTo < i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } pBatch->pElem[i].state = BATCH_STATE_SUB; @@ -838,6 +838,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iRet = localRet; FINALIZE; } +dbgprintf("XXX: submitBatch set element %d state to %d\n", i, pBatch->pElem[i].state); } ++i; ++iElemProcessed; @@ -871,6 +872,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { +dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); localRet = tryDoAction(pAction, pBatch, &nElem); if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED -- cgit v1.2.3 From 33e216daf7f89542cc6c91f1e97da6fdb71eecf8 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 22 Oct 2009 14:57:34 +0200 Subject: Begun to work on partial batch deletes... ... but this brings a lot of problems with it. The issue is that we still have a sequential store and we do not know how we could delete the one entry right in the middle of processing. I keep this branch if we intend to move on with it - but for now I look into a different solution... --- action.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 58658ac1..3439f123 100644 --- a/action.c +++ b/action.c @@ -936,8 +936,8 @@ processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - for(i = 0 ; i < pBatch->nElem ; i++) { + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + //for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); -- cgit v1.2.3 From 6cf7fc7ec2f1b2f69fb69d8b18df8240beed3380 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 13:53:45 +0100 Subject: action processing optimized for queue shutdown --- action.c | 38 +++++++++++++++----------------------- 1 file changed, 15 insertions(+), 23 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 3439f123..d49c9d2c 100644 --- a/action.c +++ b/action.c @@ -798,7 +798,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) +tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem, int *pbShutdownImmediate) { int i; int iElemProcessed; @@ -814,8 +814,9 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iElemProcessed = 0; iCommittedUpTo = i; while(iElemProcessed <= *pnElem && i < pBatch->nElem) { + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - DBGPRINTF("submitBatch: i:%d, batch size %d, to process %d, pMsg: %p, state %d\n", i, pBatch->nElem, *pnElem, pMsg, pBatch->pElem[i].state);//remove later! if(pBatch->pElem[i].state != BATCH_STATE_DISC) { localRet = actionProcessMessage(pAction, pMsg); DBGPRINTF("action call returned %d\n", localRet); @@ -838,7 +839,6 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iRet = localRet; FINALIZE; } -dbgprintf("XXX: submitBatch set element %d state to %d\n", i, pBatch->pElem[i].state); } ++i; ++iElemProcessed; @@ -861,7 +861,7 @@ finalize_it: * rgerhards, 2009-05-12 */ static rsRetVal -submitBatch(action_t *pAction, batch_t *pBatch, int nElem) +submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmediate) { int i; int bDone; @@ -873,7 +873,9 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); - localRet = tryDoAction(pAction, pBatch, &nElem); + localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); + if(localRet == RS_RET_FORCE_TERM) + FINALIZE; if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED || localRet == RS_RET_DEFER_COMMIT) { @@ -904,13 +906,17 @@ dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); bDone = 1; } else { /* retry with half as much. Depth is log_2 batchsize, so recursion is not too deep */ - submitBatch(pAction, pBatch, nElem / 2); - submitBatch(pAction, pBatch, nElem - (nElem / 2)); + submitBatch(pAction, pBatch, nElem / 2, pbShutdownImmediate); + submitBatch(pAction, pBatch, nElem - (nElem / 2), pbShutdownImmediate); bDone = 1; } } - } while(!bDone); /* do .. while()! */ + } while(!bDone && !*pbShutdownImmediate); /* do .. while()! */ + + if(*pbShutdownImmediate) + ABORT_FINALIZE(RS_RET_FORCE_TERM); +finalize_it: RETiRet; } @@ -921,25 +927,11 @@ dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); static rsRetVal processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { - int i; - msg_t *pMsg; - rsRetVal localRet; DEFiRet; assert(pBatch != NULL); - pBatch->iDoneUpTo = 0; - /* TODO: think about action batches, must be handled at upper layer! - * MULTIQUEUE - */ - localRet = submitBatch(pAction, pBatch, pBatch->nElem); - CHKiRet(localRet); - - /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { - //for(i = 0 ; i < pBatch->nElem ; i++) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; - } + CHKiRet(submitBatch(pAction, pBatch, pBatch->nElem, pbShutdownImmediate)); iRet = finishBatch(pAction); finalize_it: -- cgit v1.2.3 From f3134f89211ea6a65e72bca1dd2f91bf0a0ae894 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 15:16:10 +0100 Subject: some more cleanup - action config line handlers are now defined in action.c As an artifact of early development, they were registered in syslogd.c --- action.c | 69 ++++++++++++++++++++++++++-------------------------------------- 1 file changed, 28 insertions(+), 41 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index d49c9d2c..3e9ac544 100644 --- a/action.c +++ b/action.c @@ -1255,47 +1255,6 @@ actionCallAction(action_t *pAction, msg_t *pMsg) #pragma GCC diagnostic warning "-Wempty-body" -/* add our cfsysline handlers - * rgerhards, 2008-01-28 - */ -rsRetVal -actionAddCfSysLineHdrl(void) -{ - DEFiRet; - - CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); - -finalize_it: - RETiRet; -} - - /* add an Action to the current selector * The pOMSR is freed, as it is not needed after this function. * Note: this function pulls global data that specifies action config state. @@ -1423,6 +1382,34 @@ rsRetVal actionClassInit(void) CHKiRet(objUse(module, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionname", 0, eCmdHdlrGetWord, NULL, &pszActionName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuefilename", 0, eCmdHdlrGetWord, NULL, &pszActionQFName, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesize", 0, eCmdHdlrInt, NULL, &iActionQueueSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionwriteallmarkmessages", 0, eCmdHdlrBinary, NULL, &bActionWriteAllMarkMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuebatchsize", 0, eCmdHdlrInt, NULL, &iActionQueueDeqBatchSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxdiskspace", 0, eCmdHdlrSize, NULL, &iActionQueMaxDiskSpace, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuehighwatermark", 0, eCmdHdlrInt, NULL, &iActionQHighWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuelowwatermark", 0, eCmdHdlrInt, NULL, &iActionQLowWtrMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardmark", 0, eCmdHdlrInt, NULL, &iActionQDiscardMark, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuediscardseverity", 0, eCmdHdlrInt, NULL, &iActionQDiscardSeverity, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuecheckpointinterval", 0, eCmdHdlrInt, NULL, &iActionQPersistUpdCnt, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesyncqueuefiles", 0, eCmdHdlrBinary, NULL, &bActionQSyncQeueFiles, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetype", 0, eCmdHdlrGetWord, setActionQueType, NULL, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreads", 0, eCmdHdlrInt, NULL, &iActionQueueNumWorkers, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoQShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutactioncompletion", 0, eCmdHdlrInt, NULL, &iActionQtoActShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuetimeoutenqueue", 0, eCmdHdlrInt, NULL, &iActionQtoEnq, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkertimeoutthreadshutdown", 0, eCmdHdlrInt, NULL, &iActionQtoWrkShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueueworkerthreadminimummessages", 0, eCmdHdlrInt, NULL, &iActionQWrkMinMsgs, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuemaxfilesize", 0, eCmdHdlrSize, NULL, &iActionQueMaxFileSize, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuesaveonshutdown", 0, eCmdHdlrBinary, NULL, &bActionQSaveOnShutdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeueslowdown", 0, eCmdHdlrInt, NULL, &iActionQueueDeqSlowdown, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimebegin", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinFromHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + finalize_it: RETiRet; } -- cgit v1.2.3 From 05d693123b175855174023874ce6f497f3642e61 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 2 Nov 2009 11:39:38 +0100 Subject: added omruleset output module, which provides great flexibility in action processing. THIS IS A VERY IMPORTANT ADDITION, see its doc for why. --- action.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 3e9ac544..e58d2e85 100644 --- a/action.c +++ b/action.c @@ -210,7 +210,7 @@ rsRetVal actionDestruct(action_t *pThis) if(pThis->ppMsgs[i] != NULL) { switch(pThis->eParamPassing) { case ACT_ARRAY_PASSING: -#if 0 /* later! */ +#if 0 /* later, as an optimization. So far, we do the cleanup after each message */ iArr = 0; while(((char **)pThis->ppMsgs[i])[iArr] != NULL) { d_free(((char **)pThis->ppMsgs[i])[iArr++]); @@ -223,6 +223,9 @@ rsRetVal actionDestruct(action_t *pThis) case ACT_STRING_PASSING: d_free(pThis->ppMsgs[i]); break; + case ACT_MSG_PASSING: + /* No cleanup needed in this case */ + break; default: assert(0); } @@ -623,6 +626,12 @@ static rsRetVal prepareDoActionParams(action_t *pAction, msg_t *pMsg) case ACT_ARRAY_PASSING: CHKiRet(tplToArray(pAction->ppTpl[i], pMsg, (uchar***) &(pAction->ppMsgs[i]))); break; + case ACT_MSG_PASSING: + /* we abuse the uchar* ptr, it now actually is a void*, but we can not + * change that other than by chaning the interface, what we don't like... + */ + pAction->ppMsgs[i] = (uchar*) pMsg; + break; default:assert(0); /* software bug if this happens! */ } } @@ -654,6 +663,7 @@ static rsRetVal cleanupDoActionParams(action_t *pAction) d_free(pAction->ppMsgs[i]); pAction->ppMsgs[i] = NULL; break; + case ACT_MSG_PASSING: case ACT_STRING_PASSING: break; default: @@ -1306,9 +1316,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques for(i = 0 ; i < pAction->iNumTpls ; ++i) { CHKiRet(OMSRgetEntry(pOMSR, i, &pTplName, &iTplOpts)); - /* Ok, we got everything, so it now is time to look up the - * template (Hint: templates MUST be defined before they are - * used!) + /* Ok, we got everything, so it now is time to look up the template + * (Hint: templates MUST be defined before they are used!) */ if((pAction->ppTpl[i] = tplFind((char*)pTplName, strlen((char*)pTplName))) == NULL) { snprintf(errMsg, sizeof(errMsg) / sizeof(char), @@ -1330,6 +1339,8 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques /* set parameter-passing mode */ if(iTplOpts & OMSR_TPL_AS_ARRAY) { pAction->eParamPassing = ACT_ARRAY_PASSING; + } else if(iTplOpts & OMSR_TPL_AS_MSG) { + pAction->eParamPassing = ACT_MSG_PASSING; } else { pAction->eParamPassing = ACT_STRING_PASSING; } -- cgit v1.2.3 From 1b7f5c54684db29c096e09238648a45dce78ebee Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 4 Nov 2009 10:40:27 +0100 Subject: moved rfc3164/5424 code to new parser modules another milestone commit: the program works, the new interface is used, some more cleanup is needed and the per-ruleset config options are still missing. But we are getting closer... --- action.c | 1 - 1 file changed, 1 deletion(-) (limited to 'action.c') diff --git a/action.c b/action.c index e58d2e85..06d1095c 100644 --- a/action.c +++ b/action.c @@ -882,7 +882,6 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem, int *pbShutdownImmedi bDone = 0; do { -dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); localRet = tryDoAction(pAction, pBatch, &nElem, pbShutdownImmediate); if(localRet == RS_RET_FORCE_TERM) FINALIZE; -- cgit v1.2.3 From 2dafb35077f013a22f0ce9d59306d8f59d709a51 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 12 Nov 2009 12:03:27 +0100 Subject: some module cleanup --- action.c | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'action.c') diff --git a/action.c b/action.c index 06d1095c..22615c2c 100644 --- a/action.c +++ b/action.c @@ -59,6 +59,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(module) DEFobjCurrIf(errmsg) +static int iActExecOnceInterval = 0; /* execute action once every nn seconds */ static int iActExecEveryNthOccur = 0; /* execute action every n-th occurence (0,1=always) */ static time_t iActExecEveryNthOccurTO = 0; /* timeout for n-occurence setting (in seconds, 0=never) */ static int glbliActionResumeInterval = 30; @@ -1381,6 +1382,17 @@ finalize_it: } +/* Reset config variables to default values. + * rgerhards, 2009-11-12 + */ +static rsRetVal +resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) +{ + iActExecOnceInterval = 0; + return RS_RET_OK; +} + + /* TODO: we are not yet a real object, the ClassInit here just looks like it is.. */ rsRetVal actionClassInit(void) @@ -1418,7 +1430,9 @@ rsRetVal actionClassInit(void) CHKiRet(regCfSysLineHdlr((uchar *)"actionqueuedequeuetimeend", 0, eCmdHdlrInt, NULL, &iActionQueueDeqtWinToHr, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtime", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccur, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyeverynthtimetimeout", 0, eCmdHdlrInt, NULL, &iActExecEveryNthOccurTO, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"actionexeconlyonceeveryinterval", 0, eCmdHdlrInt, NULL, &iActExecOnceInterval, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"repeatedmsgcontainsoriginalmsg", 0, eCmdHdlrBinary, NULL, &bActionRepMsgHasMsg, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); finalize_it: RETiRet; -- cgit v1.2.3 From 0a5b731f2f18168795bcdefd9b6fa562dce6a0d5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 12 Nov 2009 13:14:39 +0100 Subject: bugfix: $ActionExecOnlyOnceEveryInterval did not work. This was a regression from the time() optimizations done in v4. Bug tracker: http://bugzilla.adiscon.com/show_bug.cgi?id=143 Thanks to Klaus Tachtler for reporting this bug. --- action.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'action.c') diff --git a/action.c b/action.c index 51620fce..6d0b5da4 100644 --- a/action.c +++ b/action.c @@ -686,7 +686,7 @@ actionWriteToAction(action_t *pAction) * a purely logical point of view. However, if safes us to check the system time in * (those common) cases where ExecOnceInterval is not used. -- rgerhards, 2008-09-16 */ - if(pAction->f_time != 0 && pAction->iSecsExecOnceInterval > 0 && + if(pAction->iSecsExecOnceInterval > 0 && pAction->iSecsExecOnceInterval + pAction->tLastExec > getActNow(pAction)) { /* in this case we need to discard the message - its not yet time to exec the action */ dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", @@ -697,6 +697,7 @@ actionWriteToAction(action_t *pAction) } /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ pAction->f_time = pAction->f_pMsg->ttGenTime; /* When we reach this point, we have a valid, non-disabled action. -- cgit v1.2.3 From 8b246de2a587454f9260ff91192d27a2e168ea2d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 12 Nov 2009 17:12:10 +0100 Subject: some light performance enhancement ...by replacing time() call with much faster (at least under linux) gettimeofday() calls. --- action.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'action.c') diff --git a/action.c b/action.c index 5958214f..07f14c24 100644 --- a/action.c +++ b/action.c @@ -129,7 +129,7 @@ getActNow(action_t *pThis) { assert(pThis != NULL); if(pThis->tActNow == -1) { - pThis->tActNow = time(NULL); /* good time call - the only one done */ + pThis->tActNow = datetime.GetTime(NULL); /* good time call - the only one done */ if(pThis->tLastExec > pThis->tActNow) { /* if we are traveling back in time, reset tLastExec */ pThis->tLastExec = (time_t) 0; @@ -254,7 +254,7 @@ rsRetVal actionConstruct(action_t **ppThis) CHKmalloc(pThis = (action_t*) calloc(1, sizeof(action_t))); pThis->iResumeInterval = glbliActionResumeInterval; pThis->iResumeRetryCount = glbliActionResumeRetryCount; - pThis->tLastOccur = time(NULL); /* done once per action on startup only */ + pThis->tLastOccur = datetime.GetTime(NULL); /* done once per action on startup only */ pthread_mutex_init(&pThis->mutActExec, NULL); SYNC_OBJ_TOOL_INIT(pThis); @@ -470,7 +470,7 @@ static void actionDisable(action_t *pThis) static inline void actionSuspend(action_t *pThis, time_t ttNow) { if(ttNow == NO_TIME_PROVIDED) - time(&ttNow); + datetime.GetTime(&ttNow); pThis->ttResumeRtry = ttNow + pThis->iResumeInterval * (pThis->iNbrResRtry / 10 + 1); actionSetState(pThis, ACT_STATE_SUSP); DBGPRINTF("earliest retry=%d\n", (int) pThis->ttResumeRtry); @@ -538,7 +538,7 @@ static rsRetVal actionTryResume(action_t *pThis) * is always in the past. So we can not avoid doing a fresh time() call * here. -- rgerhards, 2009-03-18 */ - time(&ttNow); /* cache "now" */ + datetime.GetTime(&ttNow); /* cache "now" */ if(ttNow > pThis->ttResumeRtry) { actionSetState(pThis, ACT_STATE_RTRY); /* back to retries */ } @@ -546,7 +546,7 @@ static rsRetVal actionTryResume(action_t *pThis) if(pThis->eState == ACT_STATE_RTRY) { if(ttNow == NO_TIME_PROVIDED) /* use cached result if we have it */ - time(&ttNow); + datetime.GetTime(&ttNow); CHKiRet(actionDoRetry(pThis, ttNow)); } @@ -1361,7 +1361,7 @@ addAction(action_t **ppAction, modInfo_t *pMod, void *pModData, omodStringReques pAction->eState = ACT_STATE_RDY; /* action is enabled */ if(bSuspended) - actionSuspend(pAction, time(NULL)); /* "good" time call, only during init and unavoidable */ + actionSuspend(pAction, datetime.GetTime(NULL)); /* "good" time call, only during init and unavoidable */ CHKiRet(actionConstructFinalize(pAction)); -- cgit v1.2.3