summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--action.c2
-rw-r--r--runtime/queue.c51
-rw-r--r--runtime/wti.c42
-rw-r--r--runtime/wti.h1
4 files changed, 77 insertions, 19 deletions
diff --git a/action.c b/action.c
index 46451b95..deb59258 100644
--- a/action.c
+++ b/action.c
@@ -1183,7 +1183,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT
/* This submits the message to the action queue in case we do NOT need to handle repeat
* message processing. That case permits us to gain lots of freedom during processing
- * and thus speed. This is also utilized to submit messages in complex case once
+ * and thus speed. This is also utilized to submit messages in more complex cases once
* the complex logic has been applied ;)
* rgerhards, 2010-06-08
*/
diff --git a/runtime/queue.c b/runtime/queue.c
index 5d9296b1..a04e4f4d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti);
static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti);
static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
-static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti);
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti);
+static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub);
+static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
@@ -708,13 +708,12 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* these entry points shall not be used in direct mode
- To catch program errors, make us abort if that happens!
- TODO: currently main q in direct mode WILL abort!
- rgerhards, 2013-11-05
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
*/
- pThis->qAdd = NULL;
+ pThis->qAdd = qAddDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
- pThis->MultiEnq = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -964,7 +963,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
+static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
@@ -994,6 +993,22 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
RETiRet;
}
+/* this is called if we do not have a pWti. This currently only happens
+ * when we are called from a main queue in direct mode. If so, we need
+ * to obtain a dummy pWti.
+ */
+static rsRetVal
+qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+{
+ wti_t *pWti;
+ DEFiRet;
+
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
+ RETiRet;
+}
+
static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
{
return RS_RET_OK;
@@ -2077,13 +2092,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
/* these entry points shall not be used in direct mode
- To catch program errors, make us abort if that happens!
- TODO: currently main q in direct mode WILL abort!
- rgerhards, 2013-11-05
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
*/
- pThis->qAdd = NULL;
+ pThis->qAdd = qAddDirect;
+ pThis->MultiEnq = qqueueMultiEnqObjDirect;
pThis->qDel = NULL;
- pThis->MultiEnq = NULL;
break;
}
@@ -2656,16 +2670,17 @@ finalize_it:
/* now, the same function, but for direct mode */
static rsRetVal
-qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti)
+qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
+ wti_t *pWti;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- assert(pMultiSub != NULL);
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
+ CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2684,7 +2699,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg, pWti);
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}
diff --git a/runtime/wti.c b/runtime/wti.c
index ddffd81a..4642b526 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -51,6 +51,8 @@
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
+pthread_key_t thrd_wti_key;
+
/* forward-definitions */
/* methods */
@@ -206,6 +208,11 @@ wtiConstructFinalize(wti_t *pThis)
/* must use calloc as we need zero-init */
CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
+ if(pThis->pWtp == NULL) {
+ dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n");
+ FINALIZE;
+ }
+
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
@@ -391,6 +398,34 @@ finalize_it:
RETiRet;
}
+
+/* This function returns (and creates if necessary) a dummy wti suitable
+ * for use by the rule engine. It is intended to be used for direct-mode
+ * main queues (folks, don't do that!). Once created, data is stored in
+ * thread-specific storage.
+ * Note: we do NOT do error checking -- if this functions fails, all the
+ * rest will fail as well... (also, it will only fail under OOM, so...).
+ * Memleak: we leak pWti's when run in direct mode. However, this is only
+ * a cosmetic leak, as we need them until all inputs are terminated,
+ * what means essentially until rsyslog itself is terminated. So we
+ * don't care -- it's just not nice in valgrind, but that's it.
+ */
+wti_t *
+wtiGetDummy(void)
+{
+ wti_t *pWti;
+
+ pWti = (wti_t*) pthread_getspecific(thrd_wti_key);
+ if(pWti == NULL) {
+ wtiConstruct(&pWti);
+ wtiConstructFinalize(pWti);
+ if(pthread_setspecific(thrd_wti_key, pWti) != 0) {
+ DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n");
+ }
+ }
+ return pWti;
+}
+
/* dummy */
rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; }
@@ -400,6 +435,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(nsdsel_gtls)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
+ pthread_key_delete(thrd_wti_key);
ENDObjClassExit(wti)
@@ -408,8 +444,14 @@ ENDObjClassExit(wti)
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
+ int r;
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ r = pthread_key_create(&thrd_wti_key, NULL);
+ if(r != 0) {
+ dbgprintf("wti.c: pthread_key_create failed\n");
+ iRet = RS_RET_ERR;
+ }
ENDObjClassInit(wti)
/* vi:set ai:
diff --git a/runtime/wti.h b/runtime/wti.h
index 813237fc..f8a1bf53 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -94,6 +94,7 @@ rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
rsRetVal wtiSetState(wti_t *pThis, sbool bNew);
rsRetVal wtiWakeupThrd(wti_t *pThis);
sbool wtiGetState(wti_t *pThis);
+wti_t *wtiGetDummy(void);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);