summaryrefslogtreecommitdiffstats
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/batch.h40
-rw-r--r--runtime/conf.c6
-rw-r--r--runtime/module-template.h55
-rw-r--r--runtime/modules.c3
-rw-r--r--runtime/modules.h2
-rw-r--r--runtime/queue.c93
-rw-r--r--runtime/queue.h9
-rw-r--r--runtime/rsconf.c10
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--runtime/ruleset.c378
-rw-r--r--runtime/ruleset.h6
-rw-r--r--runtime/wti.c96
-rw-r--r--runtime/wti.h138
-rw-r--r--runtime/wtp.c6
14 files changed, 463 insertions, 380 deletions
diff --git a/runtime/batch.h b/runtime/batch.h
index 2ec07670..b0aa8574 100644
--- a/runtime/batch.h
+++ b/runtime/batch.h
@@ -46,17 +46,6 @@ typedef unsigned char batch_state_t;
*/
struct batch_obj_s {
msg_t *pMsg;
- /* work variables for action processing; these are reused for each action (or block of
- * actions)
- */
- sbool bPrevWasSuspended;
- /* following are caches to save allocs if not absolutely necessary */
- uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
- /* a cache to save malloc(), if not absolutely necessary */
- void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */
- size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
- /* and the same for the message length (if used) */
- /* end action work variables */
};
/* the batch
@@ -77,11 +66,7 @@ struct batch_s {
int maxElem; /* maximum number of elements that this batch supports */
int nElem; /* actual number of element in this entry */
int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */
- int iDoneUpTo; /* all messages below this index have state other than RDY */
qDeqID deqID; /* ID of dequeue operation that generated this batch */
- int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
- sbool *active; /* which messages are active for processing, NULL=all */
- sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */
batch_obj_t *pElem; /* batch elements */
batch_state_t *eltState;/* state (array!) for individual objects.
NOTE: we have moved this out of batch_obj_t because we
@@ -93,24 +78,6 @@ struct batch_s {
};
-/* some inline functions (we may move this off to an object .. or not) */
-static inline void
-batchSetSingleRuleset(batch_t *pBatch, sbool val) {
- pBatch->bSingleRuleset = val;
-}
-
-/* get the batches ruleset (if we have a single ruleset) */
-static inline ruleset_t*
-batchGetRuleset(batch_t *pBatch) {
- return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL;
-}
-
-/* get the ruleset of a specifc element of the batch (index not verified!) */
-static inline ruleset_t*
-batchElemGetRuleset(batch_t *pBatch, int i) {
- return pBatch->pElem[i].pMsg->pRuleset;
-}
-
/* get number of msgs for this batch */
static inline int
batchNumMsgs(batch_t *pBatch) {
@@ -134,8 +101,7 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
*/
static inline int
batchIsValidElem(batch_t *pBatch, int i) {
- return( (pBatch->eltState[i] != BATCH_STATE_DISC)
- && (pBatch->active == NULL || pBatch->active[i]));
+ return(pBatch->eltState[i] != BATCH_STATE_DISC);
}
@@ -152,7 +118,7 @@ batchFree(batch_t *pBatch) {
/* staticActParams MUST be freed immediately (if required),
* so we do not need to do that!
*/
- free(pBatch->pElem[i].staticActStrings[j]);
+ //TODO: do this in wti! free(pBatch->pElem[i].staticActStrings[j]);
}
}
free(pBatch->pElem);
@@ -167,11 +133,9 @@ batchFree(batch_t *pBatch) {
static inline rsRetVal
batchInit(batch_t *pBatch, int maxElem) {
DEFiRet;
- pBatch->iDoneUpTo = 0;
pBatch->maxElem = maxElem;
CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t)));
- // TODO: replace calloc by inidividual writes?
finalize_it:
RETiRet;
}
diff --git a/runtime/conf.c b/runtime/conf.c
index 2b000c60..83931bca 100644
--- a/runtime/conf.c
+++ b/runtime/conf.c
@@ -518,12 +518,10 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction)
bHadWarning = 1;
iRet = RS_RET_OK;
}
- if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) {
- if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL,
- (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) {
+ if(iRet == RS_RET_OK) {
+ if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL)) == RS_RET_OK) {
/* here check if the module is compatible with select features
* (currently, we have no such features!) */
- pAction->eState = ACT_STATE_RDY; /* action is enabled */
conf->actions.nbrActions++; /* one more active action! */
}
break;
diff --git a/runtime/module-template.h b/runtime/module-template.h
index 8a958f90..f65aaf0f 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\
RETiRet;\
}
+/* createWrkrInstance()
+ */
+#define BEGINcreateWrkrInstance \
+static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\
+ {\
+ DEFiRet; /* store error code here */\
+ wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */
+
+#define CODESTARTcreateWrkrInstance \
+ if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\
+ *ppWrkrData = NULL;\
+ ENDfunc \
+ return RS_RET_OUT_OF_MEMORY;\
+ } \
+ pWrkrData->pData = pData;
+
+#define ENDcreateWrkrInstance \
+ *ppWrkrData = pWrkrData;\
+ RETiRet;\
+}
+
+/* freeWrkrInstance */
+#define BEGINfreeWrkrInstance \
+static rsRetVal freeWrkrInstance(void* pd)\
+{\
+ DEFiRet;\
+ wrkrInstanceData_t *pWrkrData;
+
+#define CODESTARTfreeWrkrInstance \
+ pWrkrData = (wrkrInstanceData_t*) pd;
+
+#define ENDfreeWrkrInstance \
+ if(pWrkrData != NULL)\
+ free(pWrkrData); /* we need to free this in any case */\
+ RETiRet;\
+}
+
+
/* isCompatibleWithFeature()
*/
#define BEGINisCompatibleWithFeature \
@@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF
* introduced in v4.3.3 -- rgerhards, 2009-04-27
*/
#define BEGINbeginTransaction \
-static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
+static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -209,7 +247,7 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
* introduced in v4.3.3 -- rgerhards, 2009-04-27
*/
#define BEGINendTransaction \
-static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\
+static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -223,7 +261,7 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\
/* doAction()
*/
#define BEGINdoAction \
-static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\
+static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@@ -382,12 +420,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\
* rgerhard, 2007-08-02
*/
#define BEGINtryResume \
-static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\
+static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
#define CODESTARTtryResume \
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
#define ENDtryResume \
RETiRet;\
@@ -467,6 +505,13 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
*pEtryPoint = tryResume;\
}
+/* standard queries for output module interface in rsyslog v8+ */
+#define CODEqueryEtryPt_STD_OMOD8_QUERIES \
+ else if(!strcmp((char*) name, "createWrkrInstance")) {\
+ *pEtryPoint = createWrkrInstance;\
+ } else if(!strcmp((char*) name, "freeWrkrInstance")) {\
+ *pEtryPoint = freeWrkrInstance;\
+ }
/* the following definition is queryEtryPt block that must be added
* if an output module supports the transactional interface.
diff --git a/runtime/modules.c b/runtime/modules.c
index 56606306..52096082 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -652,6 +652,9 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance));
+ CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance));
+
/* try load optional interfaces */
localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP);
if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
diff --git a/runtime/modules.h b/runtime/modules.h
index 23df22d6..a0637ac9 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -139,6 +139,8 @@ struct modInfo_s {
rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**);
rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **);
rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr);
+ rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData);
+ rsRetVal (*freeWrkrInstance)(void*pWrkrData);
} om;
struct { /* data for library modules */
char dummy;
diff --git a/runtime/queue.c b/runtime/queue.c
index 8cbce011..ff1b30f4 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -86,7 +86,6 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiS
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
@@ -709,9 +708,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -961,13 +964,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
batch_state_t batchState = BATCH_STATE_RDY;
- sbool active = 1;
- int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -987,46 +988,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
}
-/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
- * otherwise incured. -- rgerhards, ~2010-06-23
+/* this is called if we do not have a pWti. This currently only happens
+ * when we are called from a main queue in direct mode. If so, we need
+ * to obtain a dummy pWti.
*/
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+static rsRetVal
+qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
+ wti_t *pWti;
DEFiRet;
- ASSERT(pThis != NULL);
-
- /* calling the consumer is quite different here than it is from a worker thread */
- /* we need to provide the consumer's return value back to the caller because in direct
- * mode the consumer probably has a lot to convey (which get's lost in the other modes
- * because they are asynchronous. But direct mode is deliberately synchronous.
- * rgerhards, 2008-02-12
- * We use our knowledge about the batch_t structure below, but without that, we
- * pay a too-large performance toll... -- rgerhards, 2009-04-22
- */
- iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL);
-
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
-{
- return RS_RET_OK;
-}
-
-
/* --------------- end type-specific handlers -------------------- */
@@ -1321,7 +1305,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1700,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
/* The rate limiter
*
+ * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when
+ * it actually delays processing. Otherwise inputs are stalled.
+ *
* Here we may wait if a dequeue time window is defined or if we are
* rate-limited. TODO: If we do so, we should also look into the
* way new worker threads are spawned. Obviously, it doesn't make much
@@ -1785,8 +1772,10 @@ RateLimiter(qqueue_t *pThis)
}
if(iDelay > 0) {
+ pthread_mutex_unlock(pThis->mut);
DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
+ pthread_mutex_lock(pThis->mut);
}
RETiRet;
@@ -1880,7 +1869,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2103,9 +2093,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
break;
}
@@ -2169,9 +2163,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iLowWtrMrk = 1;
}
}
+
if( pThis->iMinMsgsPerWrkr < 1
- || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize )
+ || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) {
pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
+ }
+
if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) {
pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
if(pThis->iFullDlyMrk == 0) {
@@ -2777,13 +2774,14 @@ static rsRetVal
qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
+ wti_t *pWti;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- assert(pMultiSub != NULL);
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
+ CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2792,22 +2790,7 @@ finalize_it:
/* ------------------------------ END multi-enqueue functions ------------------------------ */
-/* enqueue a new user data element in direct mode
- * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
- * code later on (like multi submit!) 2010-06-10
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
-{
- DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg);
- RETiRet;
-}
-
-
-/* enqueue a new user data element
+/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
diff --git a/runtime/queue.h b/runtime/queue.h
index 19ea735a..468e19bc 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -103,11 +103,10 @@ struct queue_s {
* the user really wanted...). -- rgerhards, 2008-04-02
*/
/* end dequeue time window */
- rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */
+ rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
* user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
- * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero
- * during normal operations and one if the consumer must urgently shut down.
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
@@ -195,14 +194,12 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *));
int queueCnfParamsSet(struct nvlst *lst);
rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
diff --git a/runtime/rsconf.c b/runtime/rsconf.c
index b1016568..f2feb989 100644
--- a/runtime/rsconf.c
+++ b/runtime/rsconf.c
@@ -679,8 +679,12 @@ activateMainQueue()
mainqCnfObj = glbl.GetmainqCnfObj();
DBGPRINTF("activateMainQueue: mainq cnf obj ptr is %p\n", mainqCnfObj);
/* create message queue */
- CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"),
- (mainqCnfObj == NULL) ? NULL : mainqCnfObj->nvlst)) {
+ iRet = createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"),
+ (mainqCnfObj == NULL) ? NULL : mainqCnfObj->nvlst);
+ if(iRet == RS_RET_OK) {
+ iRet = startMainQueue(pMsgQueue);
+ }
+ if(iRet != RS_RET_OK) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
FINALIZE;
@@ -743,6 +747,7 @@ activate(rsconf_t *cnf)
tellModulesActivateConfig();
startInputModules();
CHKiRet(activateActions());
+ CHKiRet(activateRulesetQueues());
CHKiRet(activateMainQueue());
/* finally let the inputs run... */
runInputModules();
@@ -1220,6 +1225,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone!
ABORT_FINALIZE(RS_RET_NO_ACTIONS);
}
tellLexEndParsing();
+ DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr);
rulesetOptimizeAll(loadConf);
tellCoreConfigLoadDone();
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index 3c1e55d2..dfbfc448 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -49,6 +49,7 @@
#define CONF_PROGNAME_BUFSIZE 16
#define CONF_HOSTNAME_BUFSIZE 32
#define CONF_PROP_BUFSIZE 16 /* should be close to sizeof(ptr) or lighly above it */
+#define CONF_IPARAMS_BUFSIZE 16 /* initial size of iparams array in wti (is automatically extended) */
#define CONF_MIN_SIZE_FOR_COMPRESS 60 /* config param: minimum message size to try compression. The smaller
* the message, the less likely is any compression gain. We check for
* gain before we submit the message. But to do so we still need to
diff --git a/runtime/ruleset.c b/runtime/ruleset.c
index 3c4d2eed..4bf8c057 100644
--- a/runtime/ruleset.c
+++ b/runtime/ruleset.c
@@ -48,6 +48,7 @@
#include "rainerscript.h"
#include "srUtils.h"
#include "modules.h"
+#include "wti.h"
#include "dirty.h" /* for main ruleset queue creation */
/* static data */
@@ -67,8 +68,8 @@ static struct cnfparamblk rspblk =
};
/* forward definitions */
-static rsRetVal processBatch(batch_t *pBatch);
-static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active);
+static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti);
+static rsRetVal scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti);
/* ---------- linked-list key handling functions (ruleset) ---------- */
@@ -160,250 +161,130 @@ finalize_it:
RETiRet;
}
-
-/* This function is similar to processBatch(), but works on a batch that
- * contains rules from multiple rulesets. In this case, we can not push
- * the whole batch through the ruleset. Instead, we examine it and
- * partition it into sub-rulesets which we then push through the system.
- * rgerhards, 2010-06-15
- */
-static inline rsRetVal
-processBatchMultiRuleset(batch_t *pBatch)
+/* driver to iterate over all rulesets */
+DEFFUNC_llExecFunc(doActivateRulesetQueues)
{
- ruleset_t *currRuleset;
- batch_t snglRuleBatch;
- int i;
- int iStart; /* start index of partial batch */
- int iNew; /* index for new (temporary) batch */
- int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */
DEFiRet;
-
- do {
- bHaveUnprocessed = 0;
- /* search for first unprocessed element */
- for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart)
- /* just search, no action */;
- if(iStart == pBatch->nElem)
- break; /* everything processed */
-
- /* prepare temporary batch */
- CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem));
- snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate;
- currRuleset = batchElemGetRuleset(pBatch, iStart);
- iNew = 0;
- for(i = iStart ; i < pBatch->nElem ; ++i) {
- if(batchElemGetRuleset(pBatch, i) == currRuleset) {
- /* for performance reasons, we copy only those members that we actually need */
- snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg;
- snglRuleBatch.eltState[iNew] = pBatch->eltState[i];
- ++iNew;
- /* We indicate the element also as done, so it will not be processed again */
- pBatch->eltState[i] = BATCH_STATE_DISC;
- } else {
- bHaveUnprocessed = 1;
- }
- }
- snglRuleBatch.nElem = iNew; /* was left just right by the for loop */
- batchSetSingleRuleset(&snglRuleBatch, 1);
- /* process temp batch */
- processBatch(&snglRuleBatch);
- batchFree(&snglRuleBatch);
- } while(bHaveUnprocessed == 1);
-
-finalize_it:
+ ruleset_t* pThis = (ruleset_t*) pData;
+ dbgprintf("Activating Ruleset Queue[%p] for Ruleset %s\n",
+ pThis->pQueue, pThis->pszName);
+ if(pThis->pQueue != NULL)
+ startMainQueue(pThis->pQueue);
RETiRet;
}
-
-/* return a new "active" structure for the batch. Free with freeActive(). */
-static inline sbool *newActive(batch_t *pBatch)
+/* activate all ruleset queues */
+rsRetVal
+activateRulesetQueues()
{
- return malloc(sizeof(sbool) * batchNumMsgs(pBatch));
-
-}
-static inline void freeActive(sbool *active) { free(active); }
+ DEFiRet;
+ llExecFunc(&(runConf->rulesets.llRulesets), doActivateRulesetQueues, NULL);
-/* for details, see scriptExec() header comment! */
-/* call action for all messages with filter on */
-static rsRetVal
-execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
-{
- DEFiRet;
-dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active);
- pBatch->active = active;
- stmt->d.act->submitToActQ(stmt->d.act, pBatch);
RETiRet;
}
+
static rsRetVal
-execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- int i;
- struct var result;
DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg);
- msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname,
- &result);
- varDelete(&result);
- }
+ if(stmt->d.act->bDisabled) {
+ DBGPRINTF("action %d died, do NOT execute\n", stmt->d.act->iActionNbr);
+ FINALIZE;
+ }
+
+ DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr);
+ stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg);
+ if(iRet != RS_RET_DISCARDMSG) {
+ /* note: we ignore the error code here, as we do NEVER want to
+ * stop script execution due to action return code
+ */
+ iRet = RS_RET_OK;
}
+finalize_it:
RETiRet;
}
static rsRetVal
-execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execSet(struct cnfstmt *stmt, msg_t *pMsg)
{
- int i;
+ struct var result;
DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- msgDelJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname);
- }
- }
+ cnfexprEval(stmt->d.s_set.expr, &result, pMsg);
+ msgSetJSONFromVar(pMsg, stmt->d.s_set.varname, &result);
+ varDelete(&result);
RETiRet;
}
-/* for details, see scriptExec() header comment! */
-/* "stop" simply discards the filtered items - it's just a (hopefully more intuitive
- * shortcut for users.
- */
static rsRetVal
-execStop(batch_t *pBatch, sbool *active)
+execUnset(struct cnfstmt *stmt, msg_t *pMsg)
{
- int i;
DEFiRet;
- for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) {
- if( pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i])) {
- pBatch->eltState[i] = BATCH_STATE_DISC;
- }
- }
+ msgDelJSON(pMsg, stmt->d.s_unset.varname);
RETiRet;
}
+
static rsRetVal
-execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execCall(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- msg_t *pMsg;
- int i;
DEFiRet;
if(stmt->d.s_call.ruleset == NULL) {
- scriptExec(stmt->d.s_call.stmt, pBatch, active);
+ CHKiRet(scriptExec(stmt->d.s_call.stmt, pMsg, pWti));
} else {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg));
- DBGPRINTF("CALL: forwarding message %d to async ruleset %p\n",
- i, stmt->d.s_call.ruleset->pQueue);
- MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(pMsg, stmt->d.s_call.ruleset);
- /* Note: we intentionally use submitMsg2() here, as we process messages
- * that were already run through the rate-limiter.
- */
- submitMsg2(pMsg);
- }
+ CHKmalloc(pMsg = MsgDup((msg_t*) pMsg));
+ DBGPRINTF("CALL: forwarding message to async ruleset %p\n",
+ stmt->d.s_call.ruleset->pQueue);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
+ MsgSetRuleset(pMsg, stmt->d.s_call.ruleset);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter.
+ */
+ submitMsg2(pMsg);
}
finalize_it:
RETiRet;
}
-/* for details, see scriptExec() header comment! */
-// save current filter, evaluate new one
-// perform then (if any message)
-// if ELSE given:
-// set new filter, inverted
-// perform else (if any messages)
static rsRetVal
-execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+execIf(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *newAct;
- int i;
sbool bRet;
- sbool allInactive = 1;
DEFiRet;
- newAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- FINALIZE;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- if(active == NULL || active[i]) {
- bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg);
- allInactive = 0;
- } else
- bRet = 0;
- newAct[i] = bRet;
- DBGPRINTF("batch: item %d: expr eval: %d\n", i, bRet);
- }
-
- if(allInactive) {
- DBGPRINTF("execIf: all batch elements are inactive, holding execution\n");
- freeActive(newAct);
- FINALIZE;
- }
-
- if(stmt->d.s_if.t_then != NULL) {
- scriptExec(stmt->d.s_if.t_then, pBatch, newAct);
- }
- if(stmt->d.s_if.t_else != NULL) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- FINALIZE;
- if(pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i]))
- newAct[i] = !newAct[i];
- }
- scriptExec(stmt->d.s_if.t_else, pBatch, newAct);
+ bRet = cnfexprEvalBool(stmt->d.s_if.expr, pMsg);
+ DBGPRINTF("if condition result is %d\n", bRet);
+ if(bRet) {
+ if(stmt->d.s_if.t_then != NULL)
+ CHKiRet(scriptExec(stmt->d.s_if.t_then, pMsg, pWti));
+ } else {
+ if(stmt->d.s_if.t_else != NULL)
+ CHKiRet(scriptExec(stmt->d.s_if.t_else, pMsg, pWti));
}
- freeActive(newAct);
finalize_it:
RETiRet;
}
-/* for details, see scriptExec() header comment! */
-static void
-execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+static rsRetVal
+execPRIFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *newAct;
- msg_t *pMsg;
int bRet;
- int i;
- newAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- pMsg = pBatch->pElem[i].pMsg;
- if(active == NULL || active[i]) {
- if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
- ((stmt->d.s_prifilt.pmask[pMsg->iFacility]
- & (1<<pMsg->iSeverity)) == 0) )
- bRet = 0;
- else
- bRet = 1;
- } else
- bRet = 0;
- newAct[i] = bRet;
- DBGPRINTF("batch: item %d PRIFILT %d\n", i, newAct[i]);
- }
-
- if(stmt->d.s_prifilt.t_then != NULL) {
- scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct);
- }
- if(stmt->d.s_prifilt.t_else != NULL) {
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] != BATCH_STATE_DISC
- && (active == NULL || active[i]))
- newAct[i] = !newAct[i];
- }
- scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct);
+ DEFiRet;
+ if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) ||
+ ((stmt->d.s_prifilt.pmask[pMsg->iFacility]
+ & (1<<pMsg->iSeverity)) == 0) )
+ bRet = 0;
+ else
+ bRet = 1;
+
+ DBGPRINTF("PRIFILT condition result is %d\n", bRet);
+ if(bRet) {
+ if(stmt->d.s_prifilt.t_then != NULL)
+ CHKiRet(scriptExec(stmt->d.s_prifilt.t_then, pMsg, pWti));
+ } else {
+ if(stmt->d.s_prifilt.t_else != NULL)
+ CHKiRet(scriptExec(stmt->d.s_prifilt.t_else, pMsg, pWti));
}
- freeActive(newAct);
+finalize_it:
+ RETiRet;
}
@@ -498,79 +379,67 @@ done:
return bRet;
}
-/* for details, see scriptExec() header comment! */
-static void
-execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active)
+static rsRetVal
+execPROPFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti)
{
- sbool *thenAct;
sbool bRet;
- int i;
- thenAct = newActive(pBatch);
- for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) {
- if(*(pBatch->pbShutdownImmediate))
- return;
- if(pBatch->eltState[i] == BATCH_STATE_DISC)
- continue; /* will be ignored in any case */
- if(active == NULL || active[i]) {
- bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg);
- } else
- bRet = 0;
- thenAct[i] = bRet;
- DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]);
- }
+ DEFiRet;
- scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct);
- freeActive(thenAct);
+ bRet = evalPROPFILT(stmt, pMsg);
+ DBGPRINTF("PROPFILT condition result is %d\n", bRet);
+ if(bRet)
+ CHKiRet(scriptExec(stmt->d.s_propfilt.t_then, pMsg, pWti));
+finalize_it:
+ RETiRet;
}
/* The rainerscript execution engine. It is debatable if that would be better
* contained in grammer/rainerscript.c, HOWEVER, that file focusses primarily
* on the parsing and object creation part. So as an actual executor, it is
* better suited here.
- * param active: if NULL, all messages are active (to be processed), if non-null
- * this is an array of the same size as the batch. If 1, the message
- * is to be processed, otherwise not.
- * NOTE: this function must receive batches which contain a single ruleset ONLY!
* rgerhards, 2012-09-04
*/
static rsRetVal
-scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
+scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti)
{
- DEFiRet;
struct cnfstmt *stmt;
+ DEFiRet;
for(stmt = root ; stmt != NULL ; stmt = stmt->next) {
+ if(*pWti->pbShutdownImmediate) {
+ DBGPRINTF("scriptExec: ShutdownImmediate set, "
+ "force terminating\n");
+ ABORT_FINALIZE(RS_RET_FORCE_TERM);
+ }
if(Debug) {
- dbgprintf("scriptExec: batch of %d elements, active %p, active[0]:%d\n",
- batchNumMsgs(pBatch), active, (active == NULL ? 1 : active[0]));
cnfstmtPrintOnly(stmt, 2, 0);
}
switch(stmt->nodetype) {
case S_NOP:
break;
case S_STOP:
- execStop(pBatch, active);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
break;
case S_ACT:
- execAct(stmt, pBatch, active);
+ CHKiRet(execAct(stmt, pMsg, pWti));
break;
case S_SET:
- execSet(stmt, pBatch, active);
+ CHKiRet(execSet(stmt, pMsg));
break;
case S_UNSET:
- execUnset(stmt, pBatch, active);
+ CHKiRet(execUnset(stmt, pMsg));
break;
case S_CALL:
- execCall(stmt, pBatch, active);
+ CHKiRet(execCall(stmt, pMsg, pWti));
break;
case S_IF:
- execIf(stmt, pBatch, active);
+ CHKiRet(execIf(stmt, pMsg, pWti));
break;
case S_PRIFILT:
- execPRIFILT(stmt, pBatch, active);
+ CHKiRet(execPRIFILT(stmt, pMsg, pWti));
break;
case S_PROPFILT:
- execPROPFILT(stmt, pBatch, active);
+ CHKiRet(execPROPFILT(stmt, pMsg, pWti));
break;
default:
dbgprintf("error: unknown stmt type %u during exec\n",
@@ -578,36 +447,43 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active)
break;
}
}
+finalize_it:
RETiRet;
}
/* Process (consume) a batch of messages. Calls the actions configured.
- * If the whole batch uses a singel ruleset, we can process the batch as
- * a whole. Otherwise, we need to process it slower, on a message-by-message
- * basis (what can be optimized to a per-ruleset basis)
- * rgerhards, 2005-10-13
+ * This is called by MAIN queues.
*/
static rsRetVal
-processBatch(batch_t *pBatch)
+processBatch(batch_t *pBatch, wti_t *pWti)
{
- ruleset_t *pThis;
+ int i;
+ msg_t *pMsg;
+ ruleset_t *pRuleset;
DEFiRet;
- assert(pBatch != NULL);
-
- DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem);
- if(pBatch->bSingleRuleset) {
- pThis = batchGetRuleset(pBatch);
- if(pThis == NULL)
- pThis = ourConf->rulesets.pDflt;
- ISOBJ_TYPE_assert(pThis, ruleset);
- CHKiRet(scriptExec(pThis->root, pBatch, NULL));
- } else {
- CHKiRet(processBatchMultiRuleset(pBatch));
+
+ DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem);
+
+ wtiResetExecState(pWti, pBatch);
+
+ /* execution phase */
+ for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) {
+ pMsg = pBatch->pElem[i].pMsg;
+ DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg);
+ pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset;
+ scriptExec(pRuleset->root, pMsg, pWti);
+ // TODO: think if we need a return state of scriptExec - most probably
+ // the answer is "no", as we need to process the batch in any case!
+ // TODO: we must refactor this! flag messages as committed
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
}
-finalize_it:
- DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet);
+ /* commit phase */
+ dbgprintf("END batch execution phase, entering to commit phase\n");
+ actionCommitAllDirect(pWti);
+
+ DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem);
RETiRet;
}
diff --git a/runtime/ruleset.h b/runtime/ruleset.h
index 9905b53c..d3dfd664 100644
--- a/runtime/ruleset.h
+++ b/runtime/ruleset.h
@@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Destruct)(ruleset_t **ppThis);
rsRetVal (*DestructAllActions)(rsconf_t *conf);
rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName);
- rsRetVal (*ProcessBatch)(batch_t*);
+ rsRetVal (*ProcessBatch)(batch_t*, wti_t *);
rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*);
rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*);
rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*);
@@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */
/* AddRule() removed */
/*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam);
void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script);
+ /* v8: changed processBatch interface */
ENDinterface(ruleset)
-#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
+#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */
/* prototypes */
@@ -104,6 +105,7 @@ rulesetHasQueue(ruleset_t *pRuleset)
rsRetVal rulesetGetRuleset(rsconf_t *conf, ruleset_t **ppRuleset, uchar *pszName);
rsRetVal rulesetOptimizeAll(rsconf_t *conf);
rsRetVal rulesetProcessCnf(struct cnfobj *o);
+rsRetVal activateRulesetQueues(void);
/* Set a current rule set to already-known pointer */
static inline void
diff --git a/runtime/wti.c b/runtime/wti.c
index 77197a95..c02d0573 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -44,12 +44,15 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "action.h"
#include "atomic.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
+pthread_key_t thrd_wti_key;
+
/* forward-definitions */
/* methods */
@@ -171,6 +174,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
+ free(pThis->actWrkrInfo);
pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
@@ -197,11 +201,20 @@ wtiConstructFinalize(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
+ wtiGetDbgHdr(pThis), iActionNbr);
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = RSFALSE;
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
+
+ if(pThis->pWtp == NULL) {
+ dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n");
+ FINALIZE;
+ }
+
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
@@ -279,6 +292,7 @@ wtiWorker(wti_t *pThis)
rsRetVal localRet;
rsRetVal terminateRet;
int iCancelStateSave;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -288,15 +302,27 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
-
+dbgprintf("DDDD: wti %p: worker starting\n", pThis);
/* now we have our identity, on to real processing */
- while(1) { /* loop will be broken below - need to do mutex locks */
+
+ /* note: in this loop, the mutex is "never" unlocked. Of course,
+ * this is not true: it actually is unlocked when the actual processing
+ * is done, as part of pWtp->pfDoWork() processing. Note that this
+ * function is required to re-lock it when done. We cannot do the
+ * lock/unlock here ourselfs, as pfDoWork() needs to access queue
+ * structures itself.
+ * The same goes for pfRateLimiter(). While we could unlock/lock when
+ * we call it, in practice the function is often called without any
+ * ratelimiting actually done. Only the rate limiter itself knows
+ * that. As such, it needs to bear the burden of doing the locking
+ * when required. -- rgerhards, 2013-11-20
+ */
+ d_pthread_mutex_lock(pWtp->pmutUsr);
+ while(1) { /* loop will be broken below */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
- d_pthread_mutex_lock(pWtp->pmutUsr);
-
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
@@ -304,39 +330,47 @@ wtiWorker(wti_t *pThis)
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n",
localRet);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
break;
}
/* try to execute and process whatever we have */
- /* Note that this function releases and re-aquires the mutex. The returned
- * information on idle state must be processed before releasing the mutex again.
- */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
break; /* end of loop */
} else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) {
- d_pthread_mutex_unlock(pWtp->pmutUsr);
DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n",
terminateRet, bInactivityTOOccured);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccured);
- d_pthread_mutex_unlock(pWtp->pmutUsr);
continue; /* request next iteration */
}
- d_pthread_mutex_unlock(pWtp->pmutUsr);
-
bInactivityTOOccured = 0; /* reset for next run */
}
+ d_pthread_mutex_unlock(pWtp->pmutUsr);
+
+ DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis);
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData);
+ if(pThis->actWrkrInfo[i].actWrkrData != NULL) {
+ dbgprintf("DDDD: calling freeWrkrData!\n");
+ pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData);
+ free(pThis->actWrkrInfo[i].iparams);
+ pThis->actWrkrInfo[i].actWrkrData = NULL; /* re-init for next activation */
+ pThis->actWrkrInfo[i].iparams = NULL;
+ pThis->actWrkrInfo[i].currIParam = 0;
+ pThis->actWrkrInfo[i].maxIParams = 0;
+ }
+ }
+
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
+dbgprintf("DDDD: wti %p: worker exiting\n", pThis);
RETiRet;
}
@@ -376,6 +410,33 @@ finalize_it:
}
+/* This function returns (and creates if necessary) a dummy wti suitable
+ * for use by the rule engine. It is intended to be used for direct-mode
+ * main queues (folks, don't do that!). Once created, data is stored in
+ * thread-specific storage.
+ * Note: we do NOT do error checking -- if this functions fails, all the
+ * rest will fail as well... (also, it will only fail under OOM, so...).
+ * Memleak: we leak pWti's when run in direct mode. However, this is only
+ * a cosmetic leak, as we need them until all inputs are terminated,
+ * what means essentially until rsyslog itself is terminated. So we
+ * don't care -- it's just not nice in valgrind, but that's it.
+ */
+wti_t *
+wtiGetDummy(void)
+{
+ wti_t *pWti;
+
+ pWti = (wti_t*) pthread_getspecific(thrd_wti_key);
+ if(pWti == NULL) {
+ wtiConstruct(&pWti);
+ wtiConstructFinalize(pWti);
+ if(pthread_setspecific(thrd_wti_key, pWti) != 0) {
+ DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n");
+ }
+ }
+ return pWti;
+}
+
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
@@ -385,6 +446,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(nsdsel_gtls)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
+ pthread_key_delete(thrd_wti_key);
ENDObjClassExit(wti)
@@ -393,8 +455,14 @@ ENDObjClassExit(wti)
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
+ int r;
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ r = pthread_key_create(&thrd_wti_key, NULL);
+ if(r != 0) {
+ dbgprintf("wti.c: pthread_key_create failed\n");
+ iRet = RS_RET_ERR;
+ }
ENDObjClassInit(wti)
/* vi:set ai:
diff --git a/runtime/wti.h b/runtime/wti.h
index b0dc6c96..adc7897c 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -1,6 +1,6 @@
/* Definition of the worker thread instance (wti) class.
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -26,19 +26,67 @@
#include "wtp.h"
#include "obj.h"
#include "batch.h"
+#include "action.h"
+#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */
+#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */
+#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */
+#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */
+#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */
+/* note: 3 bit bit field --> highest value is 7! */
+
+/* The following structure defines immutable parameters which need to
+ * be passed as action parameters. Note that the current implementation
+ * does NOT focus on performance, but on a simple PoC in order to get
+ * things going. TODO: Once it works, revisit this code and think about
+ * an array implementation. We also need to support other passing modes
+ * as well. -- gerhards, 2013-11-04
+ */
+typedef struct actWrkrIParams {
+ int msgFlags;
+ /* following are caches to save allocs if not absolutely necessary */
+ uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
+ /* a cache to save malloc(), if not absolutely necessary */
+ unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+ /* and the same for the message length (if used) */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
+} actWrkrIParams_t;
+
+typedef struct actWrkrInfo {
+ action_t *pAction;
+ void *actWrkrData;
+ uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */
+ int iNbrResRtry; /* number of retries since last suspend */
+ struct {
+ unsigned actState : 3;
+ } flags;
+ actWrkrIParams_t *iparams;
+ int currIParam;
+ int maxIParams; /* current max */
+ void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */
+} actWrkrInfo_t;
+
/* the worker thread instance class */
struct wti_s {
BEGINobjInstance;
pthread_t thrdID; /* thread ID */
int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
sbool bAlwaysRunning; /* should this thread always run? */
+ int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
+ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */
pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
+ struct {
+ uint8_t bPrevWasSuspended;
+ uint8_t bDoAutoCommit; /* do a commit after each message
+ * this is usually set for batches with 0 element, but may
+ * also be added as a user-selectable option (not implemented yet)
+ */
+ } execState; /* state for the execution engine */
};
@@ -53,8 +101,96 @@ rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
rsRetVal wtiSetState(wti_t *pThis, sbool bNew);
rsRetVal wtiWakeupThrd(wti_t *pThis);
sbool wtiGetState(wti_t *pThis);
+wti_t *wtiGetDummy(void);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
+static inline uint8_t
+getActionStateByNbr(wti_t *pWti, int iActNbr)
+{
+ return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState);
+}
+
+static inline uint8_t
+getActionState(wti_t *pWti, action_t *pAction)
+{
+ return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
+}
+
+static inline void
+setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
+}
+
+static inline uint16_t
+getActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow);
+}
+
+static inline void
+setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val;
+}
+
+static inline void
+incActionResumeInRow(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++;
+}
+
+static inline int
+getActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry);
+}
+
+static inline void
+setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val;
+}
+
+static inline void
+incActionNbrResRtry(wti_t *pWti, action_t *pAction)
+{
+ pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
+}
+
+static inline rsRetVal
+wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams)
+{
+ actWrkrInfo_t *wrkrInfo;
+ actWrkrIParams_t *iparams;
+ int newMax;
+ DEFiRet;
+
+ wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
+ if(wrkrInfo->currIParam == wrkrInfo->maxIParams) {
+ /* we need to extend */
+ dbgprintf("DDDD: extending iparams, max %d\n", wrkrInfo->maxIParams);
+ newMax = (wrkrInfo->maxIParams == 0) ? CONF_IPARAMS_BUFSIZE : 2 * wrkrInfo->maxIParams;
+ CHKmalloc(iparams = realloc(wrkrInfo->iparams, sizeof(actWrkrIParams_t) * newMax));
+ wrkrInfo->iparams = iparams;
+ wrkrInfo->maxIParams = newMax;
+ }
+dbgprintf("DDDD: adding param %d for action %d\n", wrkrInfo->currIParam, pAction->iActionNbr);
+ iparams = wrkrInfo->iparams + wrkrInfo->currIParam;
+ memset(iparams, 0, sizeof(actWrkrIParams_t));
+ *piparams = iparams;
+ ++wrkrInfo->currIParam;
+
+finalize_it:
+ RETiRet;
+}
+
+static inline void
+wtiResetExecState(wti_t *pWti, batch_t *pBatch)
+{
+ pWti->execState.bPrevWasSuspended = 0;
+ pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
+}
#endif /* #ifndef WTI_H_INCLUDED */
diff --git a/runtime/wtp.c b/runtime/wtp.c
index 895c1ffe..66942e6b 100644
--- a/runtime/wtp.c
+++ b/runtime/wtp.c
@@ -8,7 +8,7 @@
* (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it
* if you are getting aquainted to the object.
*
- * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro!
pthread_cond_init(&pThis->condThrdTrm, NULL);
pthread_attr_init(&pThis->attrThrd);
/* Set thread scheduling policy to default */
+#warning do we need this any longer? I think it was a cure for an already fixed bug..
#ifdef HAVE_PTHREAD_SETSCHEDPARAM
pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy);
pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param);
@@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis)
ISOBJ_TYPE_assert(pThis, wtp);
- DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n",
+ wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads);
/* alloc and construct workers - this can only be done in finalizer as we previously do
* not know the max number of workers
*/