diff options
-rw-r--r-- | ChangeLog | 15 | ||||
-rw-r--r-- | action.c | 8 | ||||
-rw-r--r-- | runtime/msg.c | 146 | ||||
-rw-r--r-- | runtime/msg.h | 12 | ||||
-rwxr-xr-x | tests/diag.sh | 4 | ||||
-rw-r--r-- | tools/syslogd.c | 10 |
6 files changed, 26 insertions, 169 deletions
@@ -34,6 +34,21 @@ Version 6.6.1 [v6-stable] 2012-10-?? - bugfix: hostname set in rsyslog.conf was not picked up until HUP which could also mean "never" or "not for a very long time". Thanks to oxpa for providing analysis and a patch +- bugfix: some message properties could be garbled due to race condition + This happened only on very high volume systems, if the same message was + being processed by two different actions. This was a regression caused + by the new config processor, which no longer properly enabled msg + locking in multithreaded cases. The bugfix is actually a refactoring of + the msg locking code - we no longer do unlocked operations, as the use + case for it has mostly gone away. It is potentially possible only at + very low-end systems, and there the small additional overhead of doing + the locking does not really hurt. Instead, the removal of that + capability can actually slightly improve performance in common cases, + as the code path is smaller and requires slightly fewer memory writes. + That probably outperforms the extra locking overhead (which in the + low-end case always happens in user space, without need for kernel + support as we can always directly acquire the lock - there is no + contention at all). --------------------------------------------------------------------------- Version 6.6.0 [v6-stable] 2012-10-22 This starts a new stable branch, based on the 6.5.x series, plus: @@ -420,14 +420,6 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams) pThis->submitToActQ = doSubmitToActionQBatch; } - /* we need to make a safety check: if the queue is NOT in direct mode, a single - * message object may be accessed by multiple threads. As such, we need to enable - * msg object thread safety in this case (this costs a bit performance and thus - * is not enabled by default. 
-- rgerhards, 2008-02-20 - */ - if(cs.ActionQueType != QUEUETYPE_DIRECT) - MsgEnableThreadSafety(); - /* create queue */ /* action queues always (for now) have just one worker. This may change when * we begin to implement an interface the enable output modules to request diff --git a/runtime/msg.c b/runtime/msg.c index 187f0c22..45ebf5f9 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -293,120 +293,20 @@ static pthread_mutex_t mutTrimCtr; /* mutex to handle malloc trim */ static int getAPPNAMELen(msg_t *pM, sbool bLockMutex); -/* The following functions will support advanced output module - * multithreading, once this is implemented. Currently, we - * include them as hooks only. The idea is that we need to guard - * some msg objects data fields against concurrent access if - * we run on multiple threads. Please note that in any case this - * is not necessary for calls from INPUT modules, because they - * construct the message object and do this serially. Only when - * the message is in the processing queue, multiple threads may - * access a single object. Consequently, there are no guard functions - * for "set" methods, as these are called during input. Only "get" - * functions that modify important structures have them. - * rgerhards, 2007-07-20 - * We now support locked and non-locked operations, depending on - * the configuration of rsyslog. To support this, we use function - * pointers. Initially, we start in non-locked mode. There, all - * locking operations call into dummy functions. When locking is - * enabled, the function pointers are changed to functions doing - * actual work. We also introduced another MsgPrepareEnqueue() function - * which initializes the locking structures, if needed. This is - * necessary because internal messages during config file startup - * processing are always created in non-locking mode. So we can - * not initialize locking structures during constructions. 
We now - * postpone this until when the message is fully constructed and - * enqueued. Then we know the status of locking. This has a nice - * side effect, and that is that during the initial creation of - * the Msg object no locking needs to be done, which results in better - * performance. -- rgerhards, 2008-01-05 - */ -static void (*funcLock)(msg_t *pMsg); -static void (*funcUnlock)(msg_t *pMsg); -static void (*funcDeleteMutex)(msg_t *pMsg); -void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#if 1 /* This is a debug aid */ -#define MsgLock(pMsg) funcLock(pMsg) -#define MsgUnlock(pMsg) funcUnlock(pMsg) -#else -#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; } -#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); } -#endif - -/* the next function is a dummy to be used by the looking functions - * when the class is not yet running in an environment where locking - * is necessary. Please note that the need to lock can (and will) change - * during a single run. Typically, this is depending on the operation mode - * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05 - */ -static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg) -{ - /* empty be design */ -} - - -/* The following function prepares a message for enqueue into the queue. This is - * where a message may be accessed by multiple threads. This implementation here - * is the version for multiple concurrent acces. It initializes the locking - * structures. - * TODO: change to an iRet interface! -- rgerhards, 2008-07-14 - */ -static void MsgPrepareEnqueueLockingCase(msg_t *pThis) -{ - BEGINfunc - assert(pThis != NULL); - pthread_mutex_init(&pThis->mut, NULL); - pThis->bDoLock = 1; - ENDfunc -} - - -/* ... 
and now the locking and unlocking implementations: */ -static void MsgLockLockingCase(msg_t *pThis) +/* the locking and unlocking implementations: */ +static inline void +MsgLock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_lock(&pThis->mut); + pthread_mutex_lock(&pThis->mut); } - -static void MsgUnlockLockingCase(msg_t *pThis) +static inline void +MsgUnlock(msg_t *pThis) { /* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */ - assert(pThis != NULL); - if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */ - pthread_mutex_unlock(&pThis->mut); + pthread_mutex_unlock(&pThis->mut); } -/* delete the mutex object on message destruction (locking case) - */ -static void MsgDeleteMutexLockingCase(msg_t *pThis) -{ - assert(pThis != NULL); - pthread_mutex_destroy(&pThis->mut); -} - -/* enable multiple concurrent access on the message object - * This works on a class-wide basis and can bot be undone. - * That is, if it is once enabled, it can not be disabled during - * the same run. When this function is called, no other thread - * must manipulate message objects. Then we would have race conditions, - * but guarding against this is counter-productive because it - * would cost additional time. Plus, it would be a programming error. 
- * rgerhards, 2008-01-05 - */ -rsRetVal MsgEnableThreadSafety(void) -{ - DEFiRet; - funcLock = MsgLockLockingCase; - funcUnlock = MsgUnlockLockingCase; - funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase; - funcDeleteMutex = MsgDeleteMutexLockingCase; - RETiRet; -} - -/* end locking functions */ - static inline int getProtocolVersion(msg_t *pM) { @@ -707,8 +607,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) /* initialize members in ORDER they appear in structure (think "cache line"!) */ pM->flowCtlType = 0; - pM->bDoLock = 0; - pM->bAlreadyFreed = 0; pM->bParseSuccess = 0; pM->iRefCount = 1; pM->iSeverity = -1; @@ -751,6 +649,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis) pM->pszTIMESTAMP_Unix[0] = '\0'; pM->pszRcvdAt_Unix[0] = '\0'; pM->pszUUID = NULL; + pthread_mutex_init(&pM->mut, NULL); /* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/ @@ -840,15 +739,6 @@ CODESTARTobjDestruct(msg) if(currRefCount == 0) { /* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */ - /* The if below is included to try to nail down a well-hidden bug causing - * segfaults. I hope that do to the test code the problem is sooner detected and - * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23. - * TODO: remove when no longer needed. - */ - if(pThis->bAlreadyFreed) - abort(); - pThis->bAlreadyFreed = 1; - /* end debug code */ if(pThis->pszRawMsg != pThis->szRawMsg) free(pThis->pszRawMsg); freeTAG(pThis); @@ -886,7 +776,7 @@ CODESTARTobjDestruct(msg) # ifndef HAVE_ATOMIC_BUILTINS MsgUnlock(pThis); # endif - funcDeleteMutex(pThis); + pthread_mutex_destroy(&pThis->mut); /* now we need to do our own optimization. Testing has shown that at least the glibc * malloc() subsystem returns memory to the OS far too late in our case. 
So we need * to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem @@ -3587,18 +3477,6 @@ finalize_it: #undef isProp -/* This is a construction finalizer that must be called after all properties - * have been set. It does some final work on the message object. After this - * is done, the object is considered ready for full processing. - * rgerhards, 2008-07-08 - */ -static rsRetVal msgConstructFinalizer(msg_t *pThis) -{ - MsgPrepareEnqueue(pThis); - return RS_RET_OK; -} - - /* get the severity - this is an entry point that * satisfies the base object class getSeverity semantics. * rgerhards, 2008-01-14 @@ -3629,13 +3507,7 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE) /* set our own handlers */ OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize); OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty); - OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, msgConstructFinalizer); OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity); - /* initially, we have no need to lock message objects */ - funcLock = MsgLockingDummy; - funcUnlock = MsgLockingDummy; - funcDeleteMutex = MsgLockingDummy; - funcMsgPrepareEnqueue = MsgLockingDummy; /* some more inits */ # if HAVE_MALLOC_TRIM INIT_ATOMIC_HELPER_MUT(mutTrimCtr); diff --git a/runtime/msg.h b/runtime/msg.h index f6b54a77..c0b50709 100644 --- a/runtime/msg.h +++ b/runtime/msg.h @@ -63,7 +63,6 @@ struct msg { once data has entered the queue, this property is no longer needed. */ pthread_mutex_t mut; int iRefCount; /* reference counter (0 = unused) */ - sbool bDoLock; /* use the mutex? 
*/ sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */ sbool bParseSuccess; /* set to reflect state of last executed higher level parser */ short iSeverity; /* the severity 0..7 */ @@ -176,7 +175,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, char *textpri(char *pRes, size_t pResLen, int pri); rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar); es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name); -rsRetVal MsgEnableThreadSafety(void); uchar *getRcvFrom(msg_t *pM); void getTAG(msg_t *pM, uchar **ppBuf, int *piLen); char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt); @@ -201,16 +199,6 @@ rsRetVal propNameToID(cstr_t *pCSPropName, propid_t *pPropID); uchar *propIDToName(propid_t propID); -/* The MsgPrepareEnqueue() function is a macro for performance reasons. - * It needs one global variable to work. This is acceptable, as it gains - * us quite some performance and is fully abstracted using this header file. - * The important thing is that no other module is permitted to actually - * access that global variable! -- rgerhards, 2008-01-05 - */ -extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg); -#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg) - - /* ------------------------------ some inline functions ------------------------------ */ /* set raw message size. 
This is needed in some cases where a trunctation is necessary diff --git a/tests/diag.sh b/tests/diag.sh index 02b24c5b..bd38b29d 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -10,8 +10,8 @@ #valgrind="valgrind --tool=helgrind --log-fd=1" #valgrind="valgrind --tool=exp-ptrcheck --log-fd=1" #set -o xtrace -export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction nostdout" -export RSYSLOG_DEBUGLOG="log" +#export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction nostdout" +#export RSYSLOG_DEBUGLOG="log" case $1 in 'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason cp $srcdir/testsuites/diag-common.conf diag-common.conf diff --git a/tools/syslogd.c b/tools/syslogd.c index 5cc1b319..a89c7e57 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -638,7 +638,6 @@ submitMsg(msg_t *pMsg) FINALIZE; } - MsgPrepareEnqueue(pMsg); qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg); finalize_it: @@ -672,10 +671,6 @@ multiSubmitMsg(multi_submit_t *pMultiSub) FINALIZE; } - for(i = 0 ; i < pMultiSub->nElem ; ++i) { - MsgPrepareEnqueue(pMultiSub->ppMsgs[i]); - } - iRet = pQueue->MultiEnq(pQueue, pMultiSub); pMultiSub->nElem = 0; @@ -1119,11 +1114,6 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName) uchar qfrenamebuf[1024]; DEFiRet; - /* switch the message object to threaded operation, if necessary */ - if(ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) { - MsgEnableThreadSafety(); - } - /* create message queue */ CHKiRet_Hdlr(qqueueConstruct(ppQueue, ourConf->globals.mainQ.MainMsgQueType, ourConf->globals.mainQ.iMainMsgQueueNumWorkers, ourConf->globals.mainQ.iMainMsgQueueSize, msgConsumer)) { /* no queue is fatal, we need to give up in that case... */ |