diff options
-rw-r--r-- | action.c | 18 | ||||
-rw-r--r-- | action.h | 1 | ||||
-rw-r--r-- | runtime/batch.h | 22 | ||||
-rw-r--r-- | runtime/dnscache.c | 2 | ||||
-rw-r--r-- | runtime/msg.c | 25 | ||||
-rw-r--r-- | runtime/msg.h | 4 | ||||
-rw-r--r-- | runtime/obj.c | 71 | ||||
-rw-r--r-- | runtime/obj.h | 4 | ||||
-rw-r--r-- | runtime/queue.c | 155 | ||||
-rw-r--r-- | runtime/queue.h | 16 | ||||
-rw-r--r-- | runtime/ruleset.c | 15 | ||||
-rw-r--r-- | runtime/typedefs.h | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 6 |
13 files changed, 201 insertions, 139 deletions
@@ -438,7 +438,7 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) CHKiRet(qqueueConstruct(&pThis->pQueue, cs.ActionQueType, 1, cs.iActionQueueSize, (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszAName); - qqueueSetpUsr(pThis->pQueue, pThis); + qqueueSetpAction(pThis->pQueue, pThis); if(queueParams == NULL) { /* use legacy params? */ /* ... set some properties ... */ @@ -811,7 +811,7 @@ prepareDoActionParams(action_t *pAction, batch_obj_t *pElem, struct syslogTime * ASSERT(pAction != NULL); ASSERT(pElem != NULL); - pMsg = (msg_t*) pElem->pUsrp; + pMsg = pElem->pMsg; /* here we must loop to process all requested strings */ for(i = 0 ; i < pAction->iNumTpls ; ++i) { switch(pAction->eParamPassing) { @@ -1061,7 +1061,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) * enq side of the queue (see file header comment)! -- rgerhards, 2011-06-15 */ if(batchIsValidElem(pBatch, i)) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + pMsg = pBatch->pElem[i].pMsg; localRet = actionProcessMessage(pAction, pMsg, pBatch->pElem[i].staticActParams, pBatch->pbShutdownImmediate); DBGPRINTF("action %p call returned %d\n", pAction, localRet); @@ -1392,9 +1392,9 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) STATSCOUNTER_INC(pAction->ctrProcessed, pAction->mutCtrProcessed); if(pAction->pQueue->qType == QUEUETYPE_DIRECT) - iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); + iRet = qqueueEnqMsgDirect(pAction->pQueue, MsgAddRef(pMsg)); else - iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg)); + iRet = qqueueEnqMsg(pAction->pQueue, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); finalize_it: RETiRet; @@ -1480,7 +1480,7 @@ doActionCallAction(action_t *pAction, batch_t *pBatch, int idxBtch) msg_t *pMsg; DEFiRet; - pMsg = (msg_t*)(pBatch->pElem[idxBtch].pUsrp); + pMsg = pBatch->pElem[idxBtch].pMsg; pAction->tActNow = -1; /* we do not yet know our current time (clear prev. value) */ /* don't output marks to recently written outputs */ @@ -1572,7 +1572,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) * also faster ;) -- rgerhards, 2008-09-17 */ do { lastAct = pAction->f_time; - if(((msg_t*)(pBatch->pElem[i].pUsrp))->msgFlags & MARK) { + if(pBatch->pElem[i].pMsg->msgFlags & MARK) { if((now - lastAct) < MarkInterval / 2) { pBatch->active[i] = 0; DBGPRINTF("batch item %d: action was recently called, ignoring " @@ -1581,7 +1581,7 @@ doSubmitToActionQNotAllMarkBatch(action_t *pAction, batch_t *pBatch) } } } while(ATOMIC_CAS_time_t(&pAction->f_time, lastAct, - ((msg_t*)(pBatch->pElem[i].pUsrp))->ttGenTime, &pAction->mutCAS) == 0); + pBatch->pElem[i].pMsg->ttGenTime, &pAction->mutCAS) == 0); if(pBatch->active[i]) { DBGPRINTF("Called action(NotAllMark), processing batch[%d] via '%s'\n", i, module.GetStateName(pAction->pMod)); @@ -1684,7 +1684,7 @@ doSubmitToActionQBatch(action_t *pAction, batch_t *pBatch) pAction->bExecWhenPrevSusp, pBatch->pElem[i].bPrevWasSuspended); if( batchIsValidElem(pBatch, i) && (pAction->bExecWhenPrevSusp == 0 || pBatch->pElem[i].bPrevWasSuspended == 1)) { - doSubmitToActionQ(pAction, (msg_t*)(pBatch->pElem[i].pUsrp)); + doSubmitToActionQ(pAction, pBatch->pElem[i].pMsg); } } } @@ -46,7 +46,6 @@ typedef enum { /* the following struct defines the action object data structure */ -typedef struct action_s action_t; struct action_s { time_t f_time; /* used for "max. n messages in m seconds" processing */ time_t tActNow; /* the current time for an action execution. Initially set to -1 and diff --git a/runtime/batch.h b/runtime/batch.h index f743c188..0f19f5bb 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -46,7 +46,7 @@ typedef enum { /* an object inside a batch, including any information (state!) needed for it to "life". */ struct batch_obj_s { - obj_t *pUsrp; /* pointer to user object (most often message) */ + msg_t *pMsg; batch_state_t state; /* associated state */ /* work variables for action processing; these are reused for each action (or block of * actions) @@ -97,13 +97,13 @@ batchSetSingleRuleset(batch_t *pBatch, sbool val) { /* get the batches ruleset (if we have a single ruleset) */ static inline ruleset_t* batchGetRuleset(batch_t *pBatch) { - return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL; + return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; } /* get the ruleset of a specifc element of the batch (index not verified!) */ static inline ruleset_t* batchElemGetRuleset(batch_t *pBatch, int i) { - return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset; + return pBatch->pElem[i].pMsg->pRuleset; } /* get number of msgs for this batch */ @@ -134,22 +134,6 @@ batchIsValidElem(batch_t *pBatch, int i) { } -/* copy one batch element to another. - * This creates a complete duplicate in those cases where - * it is needed. Use duplication only when absolutely necessary! - * Note that all working fields are reset to zeros. If that were - * not done, we would have potential problems with invalid - * or double pointer frees. - * rgerhards, 2010-06-10 - */ -static inline void -batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) { - memset(pDest, 0, sizeof(batch_obj_t)); - pDest->pUsrp = pSrc->pUsrp; - pDest->state = pSrc->state; -} - - /* free members of a batch "object". Note that we can not do the usual * destruction as the object typically is allocated on the stack and so the * object itself cannot be freed! -- rgerhards, 2010-06-15 diff --git a/runtime/dnscache.c b/runtime/dnscache.c index 32d6e425..0b89d0bb 100644 --- a/runtime/dnscache.c +++ b/runtime/dnscache.c @@ -314,7 +314,7 @@ finalize_it: * TODO: implement! */ static inline rsRetVal -validateEntry(dnscache_entry_t *etry, struct sockaddr_storage *addr) +validateEntry(dnscache_entry_t __attribute__((unused)) *etry, struct sockaddr_storage __attribute__((unused)) *addr) { return RS_RET_OK; } diff --git a/runtime/msg.c b/runtime/msg.c index b0b93f98..dca49a6d 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -814,6 +814,19 @@ finalize_it: } +/* Special msg constructor, to be used when an object is deserialized. + * we do only the base init as we know the properties will be set in + * any case by the deserializer. We still do the "inexpensive" inits + * just to be on the safe side. The whole process needs to be + * refactored together with the msg serialization subsystem. + */ +rsRetVal +msgConstructForDeserializer(msg_t **ppThis) +{ + return msgBaseConstruct(ppThis); +} + + /* some free handlers for (slightly) complicated cases... All of them may be called * with an empty element. */ @@ -3665,7 +3678,8 @@ finalize_it: * is done, the object is considered ready for full processing. * rgerhards, 2008-07-08 */ -static rsRetVal msgConstructFinalizer(msg_t *pThis) +rsRetVal +msgConstructFinalizer(msg_t *pThis) { MsgPrepareEnqueue(pThis); return RS_RET_OK; @@ -3676,12 +3690,10 @@ static rsRetVal msgConstructFinalizer(msg_t *pThis) * satisfies the base object class getSeverity semantics. * rgerhards, 2008-01-14 */ -static rsRetVal -MsgGetSeverity(obj_t_ptr pThis, int *piSeverity) +rsRetVal +MsgGetSeverity(msg_t *pMsg, int *piSeverity) { - ISOBJ_TYPE_assert(pThis, msg); - assert(piSeverity != NULL); - *piSeverity = ((msg_t*) pThis)->iSeverity; + *piSeverity = pMsg->iSeverity; return RS_RET_OK; } @@ -3982,7 +3994,6 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty); OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, msgConstructFinalizer); - OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity); /* initially, we have no need to lock message objects */ funcLock = MsgLockingDummy; funcUnlock = MsgLockingDummy; diff --git a/runtime/msg.h b/runtime/msg.h index 172ae0da..950de559 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -147,7 +147,10 @@ struct msg { PROTOTYPEObjClassInit(msg); rsRetVal msgConstruct(msg_t **ppThis); rsRetVal msgConstructWithTime(msg_t **ppThis, struct syslogTime *stTime, time_t ttGenTime); +rsRetVal msgConstructForDeserializer(msg_t **ppThis); +rsRetVal msgConstructFinalizer(msg_t *pThis); rsRetVal msgDestruct(msg_t **ppM); +rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp); msg_t* MsgDup(msg_t* pOld); msg_t *MsgAddRef(msg_t *pM); void setProtocolVersion(msg_t *pM, int iNewVersion); @@ -187,6 +190,7 @@ rsRetVal msgGetCEEVar(msg_t *pThis, cstr_t *propName, var_t **ppVar); es_str_t* msgGetCEEVarNew(msg_t *pMsg, char *name); rsRetVal msgAddJSON(msg_t *pM, uchar *name, struct json_object *json); rsRetVal getCEEPropVal(msg_t *pM, es_str_t *propName, uchar **pRes, rs_size_t *buflen, unsigned short *pbMustBeFreed); +rsRetVal MsgGetSeverity(msg_t *pThis, int *piSeverity); /* TODO: remove these five (so far used in action.c) */ uchar *getMSG(msg_t *pM); diff --git a/runtime/obj.c b/runtime/obj.c index 3ecf9ab2..cf4ef50d 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -767,7 +767,7 @@ finalize_it: * of the trailer. Header must already have been processed. * rgerhards, 2008-01-11 */ -static rsRetVal objDeserializeProperties(obj_t *pObj, objInfo_t *pObjInfo, strm_t *pStrm) +static rsRetVal objDeserializeProperties(obj_t *pObj, rsRetVal (*objSetProperty)(), strm_t *pStrm) { DEFiRet; var_t *pVar = NULL; @@ -781,7 +781,7 @@ static rsRetVal objDeserializeProperties(obj_t *pObj, objInfo_t *pObjInfo, strm_ iRet = objDeserializeProperty(pVar, pStrm); while(iRet == RS_RET_OK) { - CHKiRet(pObjInfo->objMethods[objMethod_SETPROPERTY](pObj, pVar)); + CHKiRet(objSetProperty(pObj, pVar)); /* re-init var object - TODO: method of var! */ rsCStrDestruct(&pVar->pcsName); /* no longer needed */ if(pVar->varType == VARTYPE_STR) { @@ -848,7 +848,7 @@ Deserialize(void *ppObj, uchar *pszTypeExpected, strm_t *pStrm, rsRetVal (*fFixu CHKiRet(pObjInfo->objMethods[objMethod_CONSTRUCT](&pObj)); /* we got the object, now we need to fill the properties */ - CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm)); + CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm)); /* check if we need to call a fixup function that modifies the object * before it is finalized. -- rgerhards, 2008-01-13 @@ -873,6 +873,67 @@ finalize_it: } +/* De-Serialize an object, with known constructur and destructor. Params like Deserialize(). + * rgerhards, 2012-11-03 + */ +rsRetVal +objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objSetProperty)()) +{ + DEFiRet; + rsRetVal iRetLocal; + obj_t *pObj = NULL; + int oVers = 0; /* keep compiler happy, but it is totally useless but takes up some execution time... */ + cstr_t *pstrID = NULL; + + assert(ppObj != NULL); + assert(pszTypeExpected != NULL); + ISOBJ_TYPE_assert(pStrm, strm); + + /* we de-serialize the header. if all goes well, we are happy. However, if + * we experience a problem, we try to recover. We do this by skipping to + * the next object header. This is defined via the line-start cookies. In + * worst case, we exhaust the queue, but then we receive EOF return state, + * from objDeserializeTryRecover(), what will cause us to ultimately give up. + * rgerhards, 2008-07-08 + */ + do { + iRetLocal = objDeserializeHeader((uchar*) "Obj", &pstrID, &oVers, pStrm); + if(iRetLocal != RS_RET_OK) { + dbgprintf("objDeserialize error %d during header processing - trying to recover\n", iRetLocal); + CHKiRet(objDeserializeTryRecover(pStrm)); + } + } while(iRetLocal != RS_RET_OK); + + if(rsCStrSzStrCmp(pstrID, pszTypeExpected, ustrlen(pszTypeExpected))) /* TODO: optimize strlen() - caller shall provide */ + ABORT_FINALIZE(RS_RET_INVALID_OID); + + CHKiRet(objConstruct(&pObj)); + + /* we got the object, now we need to fill the properties */ + CHKiRet(objDeserializeProperties(pObj, objSetProperty, pStrm)); + + /* check if we need to call a fixup function that modifies the object + * before it is finalized. -- rgerhards, 2008-01-13 + */ + if(fFixup != NULL) + CHKiRet(fFixup(pObj, pUsr)); + + /* we have a valid object, let's finalize our work and return */ + CHKiRet(objConstructFinalize(pObj)); + + *((obj_t**) ppObj) = pObj; + +finalize_it: + if(iRet != RS_RET_OK && pObj != NULL) + free(pObj); /* TODO: check if we can call destructor 2008-01-13 rger */ + + if(pstrID != NULL) + rsCStrDestruct(&pstrID); + + RETiRet; +} + + /* De-Serialize an object, but treat it as property bag. * rgerhards, 2008-01-11 */ @@ -909,7 +970,7 @@ objDeserializeObjAsPropBag(obj_t *pObj, strm_t *pStrm) CHKiRet(FindObjInfo(pstrID, &pObjInfo)); /* we got the object, now we need to fill the properties */ - CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm)); + CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm)); finalize_it: if(pstrID != NULL) @@ -961,7 +1022,7 @@ DeserializePropBag(obj_t *pObj, strm_t *pStrm) CHKiRet(FindObjInfo(pstrID, &pObjInfo)); /* we got the object, now we need to fill the properties */ - CHKiRet(objDeserializeProperties(pObj, pObjInfo, pStrm)); + CHKiRet(objDeserializeProperties(pObj, pObjInfo->objMethods[objMethod_SETPROPERTY], pStrm)); finalize_it: if(pstrID != NULL) diff --git a/runtime/obj.h b/runtime/obj.h index 32f7ef09..a93befa3 100644 --- a/runtime/obj.h +++ b/runtime/obj.h @@ -83,10 +83,7 @@ ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \ ((obj_t*) (pThis))->pszName = NULL #endif -#define objDestruct(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DESTRUCT])(&pThis) #define objSerialize(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_SERIALIZE]) -#define objGetSeverity(pThis, piSever) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_GETSEVERITY])(pThis, piSever) -#define objDebugPrint(pThis) (((obj_t*) (pThis))->pObjInfo->objMethods[objMethod_DEBUGPRINT])(pThis) #define OBJSetMethodHandler(methodID, pHdlr) \ CHKiRet(obj.InfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr)) @@ -121,6 +118,7 @@ ENDinterface(obj) rsRetVal objGetObjInterface(obj_if_t *pIf); PROTOTYPEObjClassInit(obj); PROTOTYPEObjClassExit(obj); +rsRetVal objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, strm_t *pStrm, rsRetVal (*fFixup)(obj_t*,void*), void *pUsr, rsRetVal (*objConstruct)(), rsRetVal (*objConstructFinalize)(), rsRetVal (*objSetProperty)()); /* the following definition is only for "friends" */ diff --git a/runtime/queue.c b/runtime/queue.c index fbf77108..ed486037 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,7 +59,6 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" -#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS # include <sched.h> @@ -74,7 +73,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) /* forward-definitions */ -static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -83,7 +82,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -312,16 +311,16 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - void *pUsr; + msg_t *pMsg; ASSERT(pThis != NULL); BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); - if(pUsr != NULL) { - objDestruct(pUsr); + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); } pThis->qDel(pThis); } @@ -411,7 +410,7 @@ StartDA(qqueue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction)); CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); @@ -546,7 +545,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) } -static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in) { DEFiRet; @@ -560,7 +559,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) } -static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) +static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out) { DEFiRet; @@ -621,7 +620,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg) { qLinkedList_t *pEntry; DEFiRet; @@ -629,7 +628,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pUsr = pUsr; + pEntry->pMsg = pMsg; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -647,14 +646,13 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; - ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pUsr; + *ppMsg = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -744,18 +742,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); @@ -770,7 +762,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ @@ -895,7 +887,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) { DEFiRet; number_t nWriteCount; @@ -903,7 +895,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) ASSERT(pThis != NULL); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); - CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ @@ -913,7 +905,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) * the in-memory representation. The instance will be re-created upon * dequeue. -- rgerhards, 2008-07-09 */ - objDestruct(pUsr); + msgDestruct(&pMsg); DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); @@ -923,25 +915,37 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, + NULL, msgConstructForDeserializer, msgConstructFinalizer, MsgSetProperty); RETiRet; } +/* the following function is a dummy to be used for qDelDisk, which (currently) must + * provide constructors & others even though it does not really need the object. So + * instead of using the real ones, we use the dummy here. That at least saves some time. + * Of course, in the longer term the whole process should be refactored. + * rgerhards, 2012-11-03 + */ +static rsRetVal +qDelDiskCallbackDummy(void) +{ + return RS_RET_OK; +} static rsRetVal qDelDisk(qqueue_t *pThis) { - obj_t *pDummyObj; /* we need to deserialize it... */ + obj_t *pDummyObj; /* another dummy, nothing is created */ DEFiRet; int64 offsIn; int64 offsOut; CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); - objDestruct(pDummyObj); + CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, + NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, qDelDiskCallbackDummy)); CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); /* This time it is a bit tricky: we free disk space only upon file deletion. So we need @@ -976,7 +980,7 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; @@ -998,16 +1002,16 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); batchObj.state = BATCH_STATE_RDY; - batchObj.pUsrp = (obj_t*) pUsr; + batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); } - objDestruct(pUsr); + msgDestruct(&pMsg); RETiRet; } @@ -1029,7 +1033,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate); RETiRet; } @@ -1050,13 +1054,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ASSERT(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(pThis->qAdd(pThis, pMsg)); if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); @@ -1072,7 +1076,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; @@ -1083,7 +1087,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, ppUsr); + iRet = pThis->qDeq(pThis, ppMsg); ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1426,22 +1430,21 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg) { DEFiRet; rsRetVal iRetLocal; int iSeverity; ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " @@ -1527,7 +1530,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - void *pUsr; + msg_t *pMsg; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1536,17 +1539,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; + pMsg = pBatch->pElem[i].pMsg; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*) pUsr)); + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } - objDestruct(pUsr); + msgDestruct(&pMsg); } DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); @@ -1578,7 +1580,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - void *pUsr; + msg_t *pMsg; rsRetVal localRet; DEFiRet; @@ -1587,10 +1589,10 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz nDequeued = nDiscarded = 0; while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { - CHKiRet(qqueueDeq(pThis, &pUsr)); + CHKiRet(qqueueDeq(pThis, &pMsg)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1599,7 +1601,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pUsrp = pUsr; + pWti->batch.pElem[nDequeued].pMsg = pMsg; pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; ++nDequeued; } @@ -1841,7 +1843,7 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1903,8 +1905,8 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) * the message. So far, we simply assume we always have msg_t, what currently is always the case. * rgerhards, 2009-05-28 */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); + CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, + MsgAddRef(pWti->batch.pElem[i].pMsg))); pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ } @@ -1985,6 +1987,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar pszQIFNam[MAXFNAME]; int wrk; uchar *qName; size_t lenBuf; @@ -2007,8 +2010,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDeq = qDeqLinkedList; + pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: @@ -2020,6 +2023,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + /* pre-construct file name for .qi file */ + pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), + "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + pThis->pszQIFNam = ustrdup(pszQIFNam); + DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, + (int) pThis->lenQIFNam); break; case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; @@ -2157,8 +2166,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; ASSERT(pThis != NULL); @@ -2176,13 +2183,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { - unlink((char*)pszQIFNam); + unlink((char*)pThis->pszQIFNam); pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ @@ -2195,7 +2198,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, write the property bag for ourselfs @@ -2437,7 +2440,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int err; @@ -2446,7 +2449,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2523,7 +2526,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) if(pThis->toEnq == 0 || pThis->bEnqOnly) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); @@ -2535,7 +2538,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); @@ -2543,7 +2546,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) } /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); + CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: @@ -2619,11 +2622,11 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pUsr); + iRet = qAddDirect(pThis, pMsg); RETiRet; } @@ -2632,7 +2635,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int iCancelStateSave; @@ -2644,7 +2647,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) d_pthread_mutex_lock(pThis->mut); } - CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); qqueueChkPersist(pThis, 1); @@ -2764,7 +2767,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) -DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, pAction, action_t*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) diff --git a/runtime/queue.h b/runtime/queue.h index edb770c6..e6ccdcdb 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -51,7 +51,7 @@ typedef enum { /* list member definition for linked list types of queues: */ typedef struct qLinkedList_S { struct qLinkedList_S *pNext; - void *pUsr; + msg_t *pMsg; } qLinkedList_t; @@ -71,7 +71,7 @@ struct queue_s { int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */ wtp_t *pWtpDA; wtp_t *pWtpReg; - void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ + action_t *pAction; /* for action queues, ptr to action object; for main queues unused */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ sbool bSyncQueueFiles;/* if working with files, sync them after each write? */ @@ -111,8 +111,8 @@ struct queue_s { /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); rsRetVal (*qDestruct)(struct queue_s *pThis); - rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); - rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); + rsRetVal (*qAdd)(struct queue_s *pThis, msg_t *pMsg); + rsRetVal (*qDeq)(struct queue_s *pThis, msg_t **ppMsg); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ /* public entry points (set during construction, permit to set best algorithm for params selected) */ @@ -135,6 +135,8 @@ struct queue_s { size_t lenSpoolDir; uchar *pszFilePrefix; size_t lenFilePrefix; + uchar *pszQIFNam; /* full .qi file name, based on parts above */ + size_t lenQIFNam; int iNumberFiles; /* how many files make up the queue? */ int64 iMaxFileSize; /* max size for a single queue file */ int64 sizeOnDiskMax; /* maximum size on disk allowed */ @@ -184,8 +186,8 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr); -rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); @@ -213,7 +215,7 @@ PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); -PROTOTYPEpropSetMeth(qqueue, pUsr, void*); +PROTOTYPEpropSetMeth(qqueue, pAction, action_t*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index 24d8279c..bc8f5234 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -194,7 +194,7 @@ processBatchMultiRuleset(batch_t *pBatch) for(i = iStart ; i < pBatch->nElem ; ++i) { if(batchElemGetRuleset(pBatch, i) == currRuleset) { /* for performance reasons, we copy only those members that we actually need */ - snglRuleBatch.pElem[iNew].pUsrp = pBatch->pElem[i].pUsrp; + snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg; snglRuleBatch.pElem[iNew].state = pBatch->pElem[i].state; ++iNew; /* We indicate the element also as done, so it will not be processed again */ @@ -244,8 +244,8 @@ execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if( pBatch->pElem[i].state != BATCH_STATE_DISC && (active == NULL || active[i])) { - cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pUsrp); - msgSetJSONFromVar((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_set.varname, + cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg); + msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname, &result); varDelete(&result); } @@ -261,7 +261,7 @@ execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if( pBatch->pElem[i].state != BATCH_STATE_DISC && (active == NULL || active[i])) { - msgUnsetJSON((msg_t*)pBatch->pElem[i].pUsrp, stmt->d.s_unset.varname); + msgUnsetJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname); } } RETiRet; @@ -303,8 +303,7 @@ execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) if(pBatch->pElem[i].state == BATCH_STATE_DISC) continue; /* will be ignored in any case */ if(active == NULL || active[i]) { - bRet = cnfexprEvalBool(stmt->d.s_if.expr, - (msg_t*)(pBatch->pElem[i].pUsrp)); + bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg); } else bRet = 0; newAct[i] = bRet; @@ -337,7 +336,7 @@ execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { if(pBatch->pElem[i].state == BATCH_STATE_DISC) continue; /* will be ignored in any case */ - pMsg = (msg_t*)(pBatch->pElem[i].pUsrp); + pMsg = pBatch->pElem[i].pMsg; if(active == NULL || active[i]) { if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || ((stmt->d.s_prifilt.pmask[pMsg->iFacility] @@ -466,7 +465,7 @@ execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) if(pBatch->pElem[i].state == BATCH_STATE_DISC) continue; /* will be ignored in any case */ if(active == NULL || active[i]) { - bRet = evalPROPFILT(stmt, (msg_t*)(pBatch->pElem[i].pUsrp)); + bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg); } else bRet = 0; thenAct[i] = bRet; diff --git a/runtime/typedefs.h b/runtime/typedefs.h index 1e0cb466..5cc24e4a 100644 --- a/runtime/typedefs.h +++ b/runtime/typedefs.h @@ -93,6 +93,7 @@ typedef struct outchannels_s outchannels_t; typedef struct modConfData_s modConfData_t; typedef struct instanceConf_s instanceConf_t; typedef struct ratelimit_s ratelimit_t; +typedef struct action_s action_t; typedef int rs_size_t; /* we do never need more than 2Gig strings, signed permits to * use -1 as a special flag. */ typedef rsRetVal (*prsf_t)(struct vmstk_s*, int); /* pointer to a RainerScript function */ diff --git a/tools/syslogd.c b/tools/syslogd.c index e347794b..a2ce6469 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -514,10 +514,10 @@ preprocessBatch(batch_t *pBatch) { DEFiRet; bSingleRuleset = 1; - batchRuleset = (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL; + batchRuleset = (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; for(i = 0 ; i < pBatch->nElem && !*(pBatch->pbShutdownImmediate) ; i++) { - pMsg = (msg_t*) pBatch->pElem[i].pUsrp; + pMsg = pBatch->pElem[i].pMsg; if((pMsg->msgFlags & NEEDS_ACLCHK_U) != 0) { DBGPRINTF("msgConsumer: UDP ACL must be checked for message (hostname-based)\n"); if(net.cvthname(pMsg->rcvFrom.pfrominet, fromHost, fromHostFQDN, fromHostIP) != RS_RET_OK) @@ -603,7 +603,7 @@ submitMsg2(msg_t *pMsg) } MsgPrepareEnqueue(pMsg); - qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg); + qqueueEnqMsg(pQueue, pMsg->flowCtlType, pMsg); finalize_it: RETiRet; |