From 11bd517465360278b270ee7c18607b4d1d97e44e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 8 Jun 2010 15:20:33 +0200 Subject: added support for high-performance action queue submission if not all mark messages should be logged this was previously not properly handeld. This is also the first occurence of a (real) CAS loop inside rsyslog. Note that the performance is now very well in the default configuration, and mark message directives are still correctly being handled. So this code looks close to final, but needs to have some bug cleanup as the testsuite shows. --- action.c | 101 ++++++++++++++++++++++++++++++++++--------------------- action.h | 6 ++-- runtime/atomic.h | 2 +- runtime/rule.c | 2 +- 4 files changed, 68 insertions(+), 43 deletions(-) diff --git a/action.c b/action.c index b055ebf4..b8751c63 100644 --- a/action.c +++ b/action.c @@ -46,11 +46,15 @@ #include "wti.h" #include "datetime.h" #include "unicode-helper.h" +#include "atomic.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg); +static rsRetVal doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -298,9 +302,13 @@ actionConstructFinalize(action_t *pThis) pThis->iExecEveryNthOccur, pThis->f_ReduceRepeated, pThis->iSecsExecOnceInterval ); - pThis->bSubmitFirehoseMode = 0; + pThis->submitToActQ = actionCallAction; + } else if(pThis->bWriteAllMarkMsgs == FALSE) { + /* nearly full-speed submission mode, default case */ + pThis->submitToActQ = doSubmitToActionQNotAllMark; } else { - pThis->bSubmitFirehoseMode = 1; + /* full firehose submission mode */ + pThis->submitToActQ = doSubmitToActionQ; } /* we need to make a safety check: if the queue is NOT in direct mode, a single @@ -644,6 +652,7 @@ finalize_it: rsRetVal actionDbgPrint(action_t *pThis) { DEFiRet; + char *sz; dbgprintf("%s: ", module.GetStateName(pThis->pMod)); pThis->pMod->dbgPrintInstInfo(pThis->pModData); @@ -656,7 +665,16 @@ rsRetVal actionDbgPrint(action_t *pThis) } dbgprintf("\tState: %s\n", getActStateName(pThis)); dbgprintf("\tExec only when previous is suspended: %d\n", pThis->bExecWhenPrevSusp); - dbgprintf("\tFirehose mode (stage 1): %d\n", pThis->bSubmitFirehoseMode); + if(pThis->submitToActQ == actionCallAction) { + sz = "slow, but feature-rich"; + } else if(pThis->submitToActQ == doSubmitToActionQNotAllMark) { + sz = "fast, but supports partial mark messages"; + } else if(pThis->submitToActQ == doSubmitToActionQ) { + sz = "firehose (fastest)"; + } else { + sz = "unknown (need to update debug display?)"; + } + dbgprintf("\tsubmission mode: %s\n", sz); dbgprintf("\n"); RETiRet; @@ -1317,33 +1335,51 @@ finalize_it: } +/* This submits the message to the action queue in case where we need to handle + * bWriteAllMarkMessage == FALSE only. Note that we use a non-blocking CAS loop + * for the synchronization. + * rgerhards, 2010-06-08 + */ +static rsRetVal +doSubmitToActionQNotAllMark(action_t *pAction, msg_t *pMsg) +{ + DEFiRet; + time_t now; + time_t lastAct; + + if(pMsg->msgFlags & MARK) { + now = datetime.GetTime(NULL); /* good time call - the only one done */ + /* CAS loop, we write back a bit early, but that's OK... */ + /* we use reception time, not dequeue time - this is considered more appropriate and + * also faster ;) -- rgerhards, 2008-09-17 */ + do { + lastAct = pAction->f_time; + if((now - lastAct) < MarkInterval / 2) { + DBGPRINTF("file was recently written, ignoring mark message\n"); + ABORT_FINALIZE(RS_RET_OK); + } + } while(ATOMIC_CAS(&pAction->f_time, lastAct, pMsg->ttGenTime, ADDME) == 0); + } + + DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); + iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + +finalize_it: + RETiRet; +} + /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing * and thus speed. * rgerhards, 2010-06-08 */ -static inline rsRetVal +static rsRetVal doSubmitToActionQ(action_t *pAction, msg_t *pMsg) { DEFiRet; -#if 0 // TODO: we need to care about this -- after PoC 2010-06-08 - /* don't output marks to recently written outputs */ - if(pAction->bWriteAllMarkMsgs == FALSE - && (pMsg->msgFlags & MARK) && (getActNow(pAction) - pAction->f_time) < MarkInterval / 2) { - ABORT_FINALIZE(RS_RET_OK); - } -#endif - DBGPRINTF("Called action, logging to %s\n", module.GetStateName(pAction->pMod)); - -#if 0 // we would need this for bWriteAllMarkMsgs - /* we use reception time, not dequeue time - this is considered more appropriate and also faster ;) -- rgerhards, 2008-09-17 */ - pAction->tLastExec = getActNow(pAction); /* re-init time flags */ - pAction->f_time = pAction->f_pMsg->ttGenTime; - -#endif iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); RETiRet; @@ -1351,15 +1387,11 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) -/* call the configured action. Does all necessary housekeeping. - * rgerhards, 2007-08-01 - * FYI: currently, this function is only called from the queue - * consumer. So we (conceptually) run detached from the input - * threads (which also means we may run much later than when the - * message was generated). +/* Call configured action, most complex case with all features supported (and thus slow). + * rgerhards, 2010-06-08 */ #pragma GCC diagnostic ignored "-Wempty-body" -rsRetVal +static rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg) { DEFiRet; @@ -1367,18 +1399,11 @@ actionCallAction(action_t *pAction, msg_t *pMsg) ISOBJ_TYPE_assert(pMsg, msg); ASSERT(pAction != NULL); - /* We need to lock the mutex only for repeated line processing. - * rgerhards, 2009-06-19 - */ - if(pAction->bSubmitFirehoseMode == 1) { - iRet = doSubmitToActionQ(pAction, pMsg); - } else { - LockObj(pAction); - pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); - iRet = doActionCallAction(pAction, pMsg); - UnlockObj(pAction); - pthread_cleanup_pop(0); /* remove mutex cleanup handler */ - } + LockObj(pAction); + pthread_cleanup_push(mutexCancelCleanup, pAction->Sync_mut); + iRet = doActionCallAction(pAction, pMsg); + UnlockObj(pAction); + pthread_cleanup_pop(0); /* remove mutex cleanup handler */ RETiRet; } diff --git a/action.h b/action.h index 43e6ae7d..bf9ceafa 100644 --- a/action.h +++ b/action.h @@ -47,6 +47,7 @@ typedef enum { /* the following struct defines the action object data structure */ +typedef struct action_s action_t; struct action_s { time_t f_time; /* used for "message repeated n times" - be careful, old, old code */ time_t tActNow; /* the current time for an action execution. Initially set to -1 and @@ -69,10 +70,11 @@ struct action_s { struct modInfo_s *pMod;/* pointer to output module handling this selector */ void *pModData; /* pointer to module data - content is module-specific */ sbool bRepMsgHasMsg; /* "message repeated..." has msg fragment in it (0-no, 1-yes) */ - sbool bSubmitFirehoseMode;/* fast submission to action q in phase 1 possible? */ short f_ReduceRepeated;/* reduce repeated lines 0 - no, 1 - yes */ int f_prevcount; /* repetition cnt of prevline */ int f_repeatcount; /* number of "repeated" msgs */ + rsRetVal (*submitToActQ)(action_t *, msg_t *); /* function submit message to action queue */ + rsRetVal (*qConstruct)(struct queue_s *pThis); enum { ACT_STRING_PASSING = 0, ACT_ARRAY_PASSING = 1, ACT_MSG_PASSING } eParamPassing; /* mode of parameter passing to action */ int iNumTpls; /* number of array entries for template element below */ @@ -90,7 +92,6 @@ struct action_s { void *ppMsgs; /* pointer to action-calling parameters (kept in structure to save alloc() time!) */ size_t *lenMsgs; /* length of message in ppMsgs */ }; -typedef struct action_s action_t; /* function prototypes @@ -101,7 +102,6 @@ rsRetVal actionDestruct(action_t *pThis); rsRetVal actionDbgPrint(action_t *pThis); rsRetVal actionSetGlobalResumeInterval(int iNewVal); rsRetVal actionDoAction(action_t *pAction); -rsRetVal actionCallAction(action_t *pAction, msg_t *pMsg); rsRetVal actionWriteToAction(action_t *pAction); rsRetVal actionCallHUPHdlr(action_t *pAction); rsRetVal actionClassInit(void); diff --git a/runtime/atomic.h b/runtime/atomic.h index e5fafe04..da0852fa 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -50,7 +50,7 @@ # define ATOMIC_STORE_0_TO_INT(data, phlpmut) __sync_fetch_and_and(data, 0) # define ATOMIC_STORE_1_TO_INT(data, phlpmut) __sync_fetch_and_or(data, 1) # define ATOMIC_STORE_INT_TO_INT(data, val) __sync_fetch_and_or(&(data), (val)) -# define ATOMIC_CAS(data, oldVal, newVal) __sync_bool_compare_and_swap(&(data), (oldVal), (newVal)); +# define ATOMIC_CAS(data, oldVal, newVal, phlpmut) __sync_bool_compare_and_swap(data, (oldVal), (newVal)) # define ATOMIC_CAS_VAL(data, oldVal, newVal, phlpmut) __sync_val_compare_and_swap(data, (oldVal), (newVal)); /* functions below are not needed if we have atomics */ diff --git a/runtime/rule.c b/runtime/rule.c index 65ad071e..7a26a03a 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -110,7 +110,7 @@ DEFFUNC_llExecFunc(processMsgDoActions) ABORT_FINALIZE(RS_RET_OK); } - iRetMod = actionCallAction(pAction, pDoActData->pMsg); + iRetMod = pAction->submitToActQ(pAction, pDoActData->pMsg); if(iRetMod == RS_RET_DISCARDMSG) { ABORT_FINALIZE(RS_RET_DISCARDMSG); } else if(iRetMod == RS_RET_SUSPENDED) { -- cgit v1.2.3 From 395660f462c62029f76b99f73bd9a424a8cf73a2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jun 2010 14:34:35 +0200 Subject: somewhat improved direct mode queue performance ... but only for batch enqueues. This will not help much with the current code, but will play well with upcoming changes. --- runtime/queue.c | 55 ++++++++++++++++++++++++++++++++++++++----------------- runtime/queue.h | 4 +++- tools/syslogd.c | 2 +- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/runtime/queue.c b/runtime/queue.c index b6c30278..d437d590 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -79,6 +79,8 @@ static int qqueueChkStopWrkrDA(qqueue_t *pThis); static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -1203,6 +1205,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddFixedArray; pThis->qDeq = qDeqFixedArray; pThis->qDel = qDelFixedArray; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_LINKEDLIST: pThis->qConstruct = qConstructLinkedList; @@ -1210,6 +1213,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddLinkedList; pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: pThis->qConstruct = qConstructDisk; @@ -1217,6 +1221,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; pThis->qDel = qDelDisk; + pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ break; @@ -1225,6 +1230,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->qDestruct = qDestructDirect; pThis->qAdd = qAddDirect; pThis->qDel = qDelDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; break; } @@ -1709,7 +1715,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ -dbgprintf("DA consumer pushes msg '%s'\n", ((msg_t*)(pWti->batch.pElem[i].pUsrp))->pszRawMsg); CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ @@ -2149,7 +2154,6 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); -//dbgCallStackPrintAll(); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2209,6 +2213,7 @@ finalize_it: RETiRet; } +/* ------------------------------ multi-enqueue functions ------------------------------ */ /* enqueue multiple user data elements at once. The aim is to provide a faster interface * for object submission. Uses the multi_submit_t helper object. * Please note that this function is not cancel-safe and consequently @@ -2216,9 +2221,12 @@ finalize_it: * during its execution. If that is not done, race conditions occur if the * thread is canceled (most important use case is input module termination). * rgerhards, 2009-06-16 + * Note: there now exists multiple different functions implementing specially + * optimized algorithms for different config cases. -- rgerhards, 2010-06-09 */ -rsRetVal -qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) +/* now the function for all modes but direct */ +static rsRetVal +qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int iCancelStateSave; int i; @@ -2227,29 +2235,42 @@ qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub) ISOBJ_TYPE_assert(pThis, qqueue); assert(pMultiSub != NULL); - if(pThis->qType != QUEUETYPE_DIRECT) { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - d_pthread_mutex_lock(pThis->mut); - } - + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); + d_pthread_mutex_lock(pThis->mut); for(i = 0 ; i < pMultiSub->nElem ; ++i) { CHKiRet(doEnqSingleObj(pThis, pMultiSub->ppMsgs[i]->flowCtlType, (void*)pMultiSub->ppMsgs[i])); } - qqueueChkPersist(pThis, pMultiSub->nElem); finalize_it: - if(pThis->qType != QUEUETYPE_DIRECT) { - /* make sure at least one worker is running. */ - qqueueAdviseMaxWorkers(pThis); - /* and release the mutex */ - d_pthread_mutex_unlock(pThis->mut); - pthread_setcancelstate(iCancelStateSave, NULL); - DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + /* make sure at least one worker is running. */ + qqueueAdviseMaxWorkers(pThis); + /* and release the mutex */ + d_pthread_mutex_unlock(pThis->mut); + pthread_setcancelstate(iCancelStateSave, NULL); + DBGOPRINT((obj_t*) pThis, "MultiEnqObj advised worker start\n"); + + RETiRet; +} + +/* now, the same function, but for direct mode */ +static rsRetVal +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) +{ + int i; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, qqueue); + assert(pMultiSub != NULL); + + for(i = 0 ; i < pMultiSub->nElem ; ++i) { + CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); } +finalize_it: RETiRet; } +/* ------------------------------ END multi-enqueue functions ------------------------------ */ /* enqueue a new user data element diff --git a/runtime/queue.h b/runtime/queue.h index 8ede6922..33b21c9a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -114,6 +114,9 @@ struct queue_s { rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ + /* public entry points (set during construction, permit to set best algorithm for params selected) */ + rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub); + /* end public entry points */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ @@ -174,7 +177,6 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); diff --git a/tools/syslogd.c b/tools/syslogd.c index dfbd184b..94e8346d 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -735,7 +735,7 @@ multiSubmitMsg(multi_submit_t *pMultiSub) pRuleset = MsgGetRuleset(pMultiSub->ppMsgs[0]); pQueue = (pRuleset == NULL) ? pMsgQueue : ruleset.GetRulesetQueue(pRuleset); - iRet = qqueueMultiEnqObj(pQueue, pMultiSub); + iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; finalize_it: -- cgit v1.2.3 From 8fbcea483710faae468ecf0ba706adc7e60ed41d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jun 2010 15:37:00 +0200 Subject: main msg q consumer now preprocesses messages before doing rule processing things like ACL check and message parsing. This leads to a greater level of concurrent processing. Beware, though, that this commit duplicates some messages. May be a regression from this or an earlier commit. I will soon sort out. --- action.c | 16 ++----------- doc/msgflow.txt | 6 ++--- runtime/rule.c | 1 - tools/syslogd.c | 72 +++++++++++++++++++++++++++++++++++++-------------------- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/action.c b/action.c index b8751c63..cae86c29 100644 --- a/action.c +++ b/action.c @@ -1140,20 +1140,8 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT } -/* rgerhards 2004-11-09: fprintlog() is the actual driver for - * the output channel. It receives the channel description (f) as - * well as the message and outputs them according to the channel - * semantics. The message is typically already contained in the - * channel save buffer (f->f_prevline). This is not only the case - * when a message was already repeated but also when a new message - * arrived. - * rgerhards 2007-08-01: interface changed to use action_t - * rgerhards, 2007-12-11: please note: THIS METHOD MUST ONLY BE - * CALLED AFTER THE CALLER HAS LOCKED THE pAction OBJECT! We do - * not do this here. Failing to do so results in all kinds of - * "interesting" problems! - * RGERHARDS, 2008-01-29: - * This is now the action caller and has been renamed. +/* This function builds up a batch of messages to be (later) + * submitted to the action queue. */ rsRetVal actionWriteToAction(action_t *pAction) diff --git a/doc/msgflow.txt b/doc/msgflow.txt index c1c440ef..b53ba7e7 100644 --- a/doc/msgflow.txt +++ b/doc/msgflow.txt @@ -18,11 +18,11 @@ syslogd.c/msgConsumeOne parser.ParseMsg ruleset.ProcessMsg (loops through ruleset) ruleset.c/processMsgDoRules (for each rule in ruleset) -rule.c/ProcessMsg -rule.c/shouldProcessThisMessage +rule.c/processMsg +1:rule.c/shouldProcessThisMessage (evaluates filters, optimize via ALL-Filter) if to be processed, loop through associated actions -> -rule.c/processMsgsDoAction +2:rule.c/processMsgsDoAction action.c/actionCallAction (LOCKs action object!) action.c/doActionCallAction (does duplicate message reduction) action.c/actionWriteToAction diff --git a/runtime/rule.c b/runtime/rule.c index 7a26a03a..3b98d7d1 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -166,7 +166,6 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, int *bProcessMsg) } } -RUNLOG_VAR("%p", pRule->pCSProgNameComp); if(pRule->pCSProgNameComp != NULL) { int bInv = 0, bEqv = 0, offset = 0; if(*(rsCStrGetSzStrNoNULL(pRule->pCSProgNameComp)) == '-') { diff --git a/tools/syslogd.c b/tools/syslogd.c index 94e8346d..2c36e6c2 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -627,39 +627,66 @@ chkMsgAgainstACL() { * (by definition!) considered committed. * rgerhards, 2009-11-16 */ +///static inline rsRetVal +///msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { + ///DEFiRet; + //////RETiRet; +///} + +/* preprocess a batch of messages, that is ready them for actual processing. This is done + * as a first stage and totally in parallel to any other worker active in the system. So + * it helps us keep up the overall concurrency level. + * rgerhards, 2010-06-09 + */ static inline rsRetVal -msgConsumeOne(msg_t *pMsg, prop_t **propFromHost, prop_t **propFromHostIP) { +preprocessBatch(batch_t *pBatch, int *pbShutdownImmediate) { uchar fromHost[NI_MAXHOST]; uchar fromHostIP[NI_MAXHOST]; uchar fromHostFQDN[NI_MAXHOST]; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; int bIsPermitted; + msg_t *pMsg; + int i; + rsRetVal localRet; DEFiRet; - if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { - dbgprintf("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); - CHKiRet(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP)); - bIsPermitted = net.isAllowedSender2((uchar*)"UDP", - (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); - if(!bIsPermitted) { - DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", - fromHostFQDN); - ABORT_FINALIZE(RS_RET_ERR); - /* save some of the info we obtained */ - MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), propFromHost); - CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), propFromHostIP)); - pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { + DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); + if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK) + continue; + bIsPermitted = net.isAllowedSender2((uchar*)"UDP", + (struct sockaddr *)pMsg->rcvFrom.pfrominet, (char*)fromHostFQDN, 1); + if(!bIsPermitted) { + DBGPRINTF("Message from '%s' discarded, not a permitted sender host\n", + fromHostFQDN); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } else { + /* save some of the info we obtained */ + MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); + CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); + pMsg->msgFlags &= ~NEEDS_ACLCHK_U; + } + } + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + pBatch->pElem[i].state = BATCH_STATE_DISC; + } } } - if((pMsg->msgFlags & NEEDS_PARSING) != 0) - CHKiRet(parser.ParseMsg(pMsg)); - ruleset.ProcessMsg(pMsg); finalize_it: + if(propFromHost != NULL) + prop.Destruct(&propFromHost); + if(propFromHostIP != NULL) + prop.Destruct(&propFromHostIP); RETiRet; } - /* The consumer of dequeued messages. This function is called by the * queue engine on dequeueing of a message. It runs on a SEPARATE * THREAD. It receives an array of pointers, which it must iterate @@ -670,22 +697,17 @@ static rsRetVal msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate) { int i; - prop_t *propFromHost = NULL; - prop_t *propFromHostIP = NULL; DEFiRet; assert(pBatch != NULL); + preprocessBatch(pBatch, pbShutdownImmediate); for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { DBGPRINTF("msgConsumer processes msg %d/%d\n", i, pBatch->nElem); - msgConsumeOne((msg_t*) pBatch->pElem[i].pUsrp, &propFromHost, &propFromHostIP); + ruleset.ProcessMsg((msg_t*) pBatch->pElem[i].pUsrp); pBatch->pElem[i].state = BATCH_STATE_COMM; } - if(propFromHost != NULL) - prop.Destruct(&propFromHost); - if(propFromHostIP != NULL) - prop.Destruct(&propFromHostIP); RETiRet; } -- cgit v1.2.3