diff options
-rw-r--r-- | action.c | 2 | ||||
-rw-r--r-- | runtime/queue.c | 51 | ||||
-rw-r--r-- | runtime/wti.c | 42 | ||||
-rw-r--r-- | runtime/wti.h | 1 |
4 files changed, 77 insertions, 19 deletions
@@ -1183,7 +1183,7 @@ static rsRetVal setActionQueType(void __attribute__((unused)) *pVal, uchar *pszT /* This submits the message to the action queue in case we do NOT need to handle repeat * message processing. That case permits us to gain lots of freedom during processing - * and thus speed. This is also utilized to submit messages in complex case once + * and thus speed. This is also utilized to submit messages in more complex cases once * the complex logic has been applied ;) * rgerhards, 2010-06-08 */ diff --git a/runtime/queue.c b/runtime/queue.c index 5d9296b1..a04e4f4d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -81,8 +81,8 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal); static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti); -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti); +static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -708,13 +708,12 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; /* these entry points shall not be used in direct mode - To catch program errors, make us abort if that happens! - TODO: currently main q in direct mode WILL abort! - rgerhards, 2013-11-05 + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 */ - pThis->qAdd = NULL; + pThis->qAdd = qAddDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; pThis->qDel = NULL; - pThis->MultiEnq = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -964,7 +963,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; @@ -994,6 +993,22 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) RETiRet; } +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. + */ +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) +{ + wti_t *pWti; + DEFiRet; + + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); + RETiRet; +} + static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) { return RS_RET_OK; @@ -2077,13 +2092,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; /* these entry points shall not be used in direct mode - To catch program errors, make us abort if that happens! - TODO: currently main q in direct mode WILL abort! - rgerhards, 2013-11-05 + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 */ - pThis->qAdd = NULL; + pThis->qAdd = qAddDirect; + pThis->MultiEnq = qqueueMultiEnqObjDirect; pThis->qDel = NULL; - pThis->MultiEnq = NULL; break; } @@ -2656,16 +2670,17 @@ finalize_it: /* now, the same function, but for direct mode */ static rsRetVal -qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub, wti_t *pWti) +qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2684,7 +2699,7 @@ qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg, wti_t *pWti) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg, pWti); + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } diff --git a/runtime/wti.c b/runtime/wti.c index ddffd81a..4642b526 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -51,6 +51,8 @@ DEFobjStaticHelpers DEFobjCurrIf(glbl) +pthread_key_t thrd_wti_key; + /* forward-definitions */ /* methods */ @@ -206,6 +208,11 @@ wtiConstructFinalize(wti_t *pThis) /* must use calloc as we need zero-init */ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + if(pThis->pWtp == NULL) { + dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n"); + FINALIZE; + } + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -391,6 +398,34 @@ finalize_it: RETiRet; } + +/* This function returns (and creates if necessary) a dummy wti suitable + * for use by the rule engine. It is intended to be used for direct-mode + * main queues (folks, don't do that!). Once created, data is stored in + * thread-specific storage. + * Note: we do NOT do error checking -- if this functions fails, all the + * rest will fail as well... (also, it will only fail under OOM, so...). + * Memleak: we leak pWti's when run in direct mode. However, this is only + * a cosmetic leak, as we need them until all inputs are terminated, + * what means essentially until rsyslog itself is terminated. So we + * don't care -- it's just not nice in valgrind, but that's it. + */ +wti_t * +wtiGetDummy(void) +{ + wti_t *pWti; + + pWti = (wti_t*) pthread_getspecific(thrd_wti_key); + if(pWti == NULL) { + wtiConstruct(&pWti); + wtiConstructFinalize(pWti); + if(pthread_setspecific(thrd_wti_key, pWti) != 0) { + DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n"); + } + } + return pWti; +} + /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } @@ -400,6 +435,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ CODESTARTObjClassExit(nsdsel_gtls) /* release objects we no longer need */ objRelease(glbl, CORE_COMPONENT); + pthread_key_delete(thrd_wti_key); ENDObjClassExit(wti) @@ -408,8 +444,14 @@ ENDObjClassExit(wti) * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ + int r; /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + r = pthread_key_create(&thrd_wti_key, NULL); + if(r != 0) { + dbgprintf("wti.c: pthread_key_create failed\n"); + iRet = RS_RET_ERR; + } ENDObjClassInit(wti) /* vi:set ai: diff --git a/runtime/wti.h b/runtime/wti.h index 813237fc..f8a1bf53 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -94,6 +94,7 @@ rsRetVal wtiSetAlwaysRunning(wti_t *pThis); rsRetVal wtiSetState(wti_t *pThis, sbool bNew); rsRetVal wtiWakeupThrd(wti_t *pThis); sbool wtiGetState(wti_t *pThis); +wti_t *wtiGetDummy(void); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); |