diff options
Diffstat (limited to 'runtime/msg.c')
-rw-r--r-- | runtime/msg.c | 221 |
1 files changed, 89 insertions, 132 deletions
diff --git a/runtime/msg.c b/runtime/msg.c index d648bb92..ce863299 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -302,120 +302,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName); static struct json_object *jsonDeepCopy(struct json_object *src); -/* The following functions will support advanced output module - * multithreading, once this is implemented. Currently, we - * include them as hooks only. The idea is that we need to guard - * some msg objects data fields against concurrent access if - * we run on multiple threads. Please note that in any case this - * is not necessary for calls from INPUT modules, because they - * construct the message object and do this serially. Only when - * the message is in the processing queue, multiple threads may - * access a single object. Consequently, there are no guard functions - * for "set" methods, as these are called during input. Only "get" - * functions that modify important structures have them. - * rgerhards, 2007-07-20 - * We now support locked and non-locked operations, depending on - * the configuration of rsyslog. To support this, we use function - * pointers. Initially, we start in non-locked mode. There, all - * locking operations call into dummy functions. When locking is - * enabled, the function pointers are changed to functions doing - * actual work. We also introduced another MsgPrepareEnqueue() function - * which initializes the locking structures, if needed. This is - * necessary because internal messages during config file startup - * processing are always created in non-locking mode. So we can - * not initialize locking structures during constructions. We now - * postpone this until when the message is fully constructed and - * enqueued. Then we know the status of locking. This has a nice - * side effect, and that is that during the initial creation of - * the Msg object no locking needs to be done, which results in better - * performance. -- rgerhards, 2008-01-05 - */ -static void (*funcLock)(msg_t *pMsg); -static void (*funcUnlock)(msg_t *pMsg); -static void (*funcDeleteMutex)(msg_t *pMsg); -void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#if 1 /* This is a debug aid */ -#define MsgLock(pMsg) funcLock(pMsg) -#define MsgUnlock(pMsg) funcUnlock(pMsg) -#else -#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; } -#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); } -#endif - -/* the next function is a dummy to be used by the looking functions - * when the class is not yet running in an environment where locking - * is necessary. Please note that the need to lock can (and will) change - * during a single run. Typically, this is depending on the operation mode - * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05 - */ -static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) -{ - /* empty be design */ -} - - -/* The following function prepares a message for enqueue into the queue. This is - * where a message may be accessed by multiple threads. This implementation here - * is the version for multiple concurrent acces. It initializes the locking - * structures. - * TODO: change to an iRet interface! -- rgerhards, 2008-07-14 - */ -static void MsgPrepareEnqueueLockingCase(msg_t *pThis) -{ - BEGINfunc - assert(pThis != NULL); - pthread_mutex_init(&pThis->mut, NULL); - pThis->bDoLock = 1; - ENDfunc -} - - -/* ... and now the locking and unlocking implementations: */ -static void MsgLockLockingCase(msg_t *pThis) +/* the locking and unlocking implementations: */ +static inline void +MsgLock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_lock(&pThis->mut); + pthread_mutex_lock(&pThis->mut); } - -static void MsgUnlockLockingCase(msg_t *pThis) +static inline void +MsgUnlock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_unlock(&pThis->mut); + pthread_mutex_unlock(&pThis->mut); } -/* delete the mutex object on message destruction (locking case) - */ -static void MsgDeleteMutexLockingCase(msg_t *pThis) -{ - assert(pThis != NULL); - pthread_mutex_destroy(&pThis->mut); -} - -/* enable multiple concurrent access on the message object - * This works on a class-wide basis and can bot be undone. - * That is, if it is once enabled, it can not be disabled during - * the same run. When this function is called, no other thread - * must manipulate message objects. Then we would have race conditions, - * but guarding against this is counter-productive because it - * would cost additional time. Plus, it would be a programming error. - * rgerhards, 2008-01-05 - */ -rsRetVal MsgEnableThreadSafety(void) -{ - DEFiRet; - funcLock = MsgLockLockingCase; - funcUnlock = MsgUnlockLockingCase; - funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase; - funcDeleteMutex = MsgDeleteMutexLockingCase; - RETiRet; -} - -/* end locking functions */ - /* rgerhards 2012-04-18: set associated ruleset (by ruleset name) * If ruleset cannot be found, no update is done. @@ -728,8 +628,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; - pM->bDoLock = 0; - pM->bAlreadyFreed = 0; pM->bParseSuccess = 0; pM->iRefCount = 1; pM->iSeverity = -1; @@ -772,6 +670,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) pM->pszTIMESTAMP_Unix[0] = '\0'; pM->pszRcvdAt_Unix[0] = '\0'; pM->pszUUID = NULL; + pthread_mutex_init(&pM->mut, NULL); /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ @@ -874,15 +773,6 @@ CODESTARTobjDestruct(msg) if(currRefCount == 0) { /* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */ - /* The if below is included to try to nail down a well-hidden bug causing - * segfaults. I hope that do to the test code the problem is sooner detected and - * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23. - * TODO: remove when no longer needed. - */ - if(pThis->bAlreadyFreed) - abort(); - pThis->bAlreadyFreed = 1; - /* end debug code */ if(pThis->pszRawMsg != pThis->szRawMsg) free(pThis->pszRawMsg); freeTAG(pThis); @@ -920,7 +810,7 @@ CODESTARTobjDestruct(msg) # ifndef HAVE_ATOMIC_BUILTINS MsgUnlock(pThis); # endif - funcDeleteMutex(pThis); + pthread_mutex_destroy(&pThis->mut); /* now we need to do our own optimization. Testing has shown that at least the glibc * malloc() subsystem returns memory to the OS far too late in our case. So we need * to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem @@ -3759,17 +3649,89 @@ msgGetMsgVarNew(msg_t *pThis, uchar *name) } -/* This is a construction finalizer that must be called after all properties - * have been set. It does some final work on the message object. After this - * is done, the object is considered ready for full processing. - * rgerhards, 2008-07-08 +/* This function can be used as a generic way to set properties. + * We have to handle a lot of legacy, so our return value is not always + * 100% correct (called functions do not always provide one, should + * change over time). + * rgerhards, 2008-01-07 */ -rsRetVal -msgConstructFinalizer(msg_t *pThis) +#define isProp(name) !rsCStrSzStrCmp(pProp->pcsName, (uchar*) name, sizeof(name) - 1) +rsRetVal MsgSetProperty(msg_t *pThis, var_t *pProp) { - MsgPrepareEnqueue(pThis); - return RS_RET_OK; + prop_t *myProp; + prop_t *propRcvFrom = NULL; + prop_t *propRcvFromIP = NULL; + struct json_tokener *tokener; + struct json_object *json; + DEFiRet; + + ISOBJ_TYPE_assert(pThis, msg); + assert(pProp != NULL); + + if(isProp("iProtocolVersion")) { + setProtocolVersion(pThis, pProp->val.num); + } else if(isProp("iSeverity")) { + pThis->iSeverity = pProp->val.num; + } else if(isProp("iFacility")) { + pThis->iFacility = pProp->val.num; + } else if(isProp("msgFlags")) { + pThis->msgFlags = pProp->val.num; + } else if(isProp("offMSG")) { + MsgSetMSGoffs(pThis, pProp->val.num); + } else if(isProp("pszRawMsg")) { + MsgSetRawMsg(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr)); + } else if(isProp("pszUxTradMsg")) { + /*IGNORE*/; /* this *was* a property, but does no longer exist */ + } else if(isProp("pszTAG")) { + MsgSetTAG(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), cstrLen(pProp->val.pStr)); + } else if(isProp("pszInputName")) { + /* we need to create a property */ + CHKiRet(prop.Construct(&myProp)); + CHKiRet(prop.SetString(myProp, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr))); + CHKiRet(prop.ConstructFinalize(myProp)); + MsgSetInputName(pThis, myProp); + prop.Destruct(&myProp); + } else if(isProp("pszRcvFromIP")) { + MsgSetRcvFromIPStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFromIP); + prop.Destruct(&propRcvFromIP); + } else if(isProp("pszRcvFrom")) { + MsgSetRcvFromStr(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr), &propRcvFrom); + prop.Destruct(&propRcvFrom); + } else if(isProp("pszHOSTNAME")) { + MsgSetHOSTNAME(pThis, rsCStrGetSzStrNoNULL(pProp->val.pStr), rsCStrLen(pProp->val.pStr)); + } else if(isProp("pCSStrucData")) { + MsgSetStructuredData(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSAPPNAME")) { + MsgSetAPPNAME(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSPROCID")) { + MsgSetPROCID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("pCSMSGID")) { + MsgSetMSGID(pThis, (char*) rsCStrGetSzStrNoNULL(pProp->val.pStr)); + } else if(isProp("ttGenTime")) { + pThis->ttGenTime = pProp->val.num; + } else if(isProp("tRcvdAt")) { + memcpy(&pThis->tRcvdAt, &pProp->val.vSyslogTime, sizeof(struct syslogTime)); + } else if(isProp("tTIMESTAMP")) { + memcpy(&pThis->tTIMESTAMP, &pProp->val.vSyslogTime, sizeof(struct syslogTime)); + } else if(isProp("pszRuleset")) { + MsgSetRulesetByName(pThis, pProp->val.pStr); + } else if(isProp("pszMSG")) { + dbgprintf("no longer supported property pszMSG silently ignored\n"); + } else if(isProp("json")) { + tokener = json_tokener_new(); + json = json_tokener_parse_ex(tokener, (char*)rsCStrGetSzStrNoNULL(pProp->val.pStr), + cstrLen(pProp->val.pStr)); + json_tokener_free(tokener); + msgAddJSON(pThis, (uchar*)"!", json); + } else { + dbgprintf("unknown supported property '%s' silently ignored\n", + rsCStrGetSzStrNoNULL(pProp->pcsName)); + } + +finalize_it: + RETiRet; } +#undef isProp /* get the severity - this is an entry point that @@ -4079,11 +4041,6 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) /* set our own handlers */ OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); - /* initially, we have no need to lock message objects */ - funcLock = MsgLockingDummy; - funcUnlock = MsgLockingDummy; - funcDeleteMutex = MsgLockingDummy; - funcMsgPrepareEnqueue = MsgLockingDummy; /* some more inits */ # if HAVE_MALLOC_TRIM INIT_ATOMIC_HELPER_MUT(mutTrimCtr); |