summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog15
-rw-r--r--action.c8
-rw-r--r--runtime/msg.c146
-rw-r--r--runtime/msg.h12
-rw-r--r--tools/syslogd.c10
5 files changed, 24 insertions, 167 deletions
diff --git a/ChangeLog b/ChangeLog
index ad642571..afdc645f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -274,6 +274,21 @@ Version 6.6.1 [v6-stable] 2012-10-??
- bugfix: hostname set in rsyslog.conf was not picked up until HUP
which could also mean "never" or "not for a very long time".
Thanks to oxpa for providing analysis and a patch
+- bugfix: some message properties could be garbled due to race condition
+ This happened only on very high volume systems, if the same message was
+ being processed by two different actions. This was a regression caused
+ by the new config processor, which did no longer properly enable msg
+ locking in multithreaded cases. The bugfix is actually a refactoring of
+ the msg locking code - we no longer do unlocked operations, as the use
+ case for it has mostly gone away. It is potentially possible only at
+ very low-end systems, and there the small additional overhead of doing
+ the locking does not really hurt. Instead, the removal of that
+ capability can actually slightly improve performance in common cases,
+ as the code path is smaller and requires slightly less memory writes.
+ That probably outperforms the extra locking overhead (which in the
+ low-end case always happens in user space, without need for kernel
+ support as we can always directly aquire the lock - there is no
+ contention at all).
---------------------------------------------------------------------------
Version 6.6.0 [v6-stable] 2012-10-22
This starts a new stable branch, based on the 6.5.x series, plus:
diff --git a/action.c b/action.c
index ca260f92..dbd4e702 100644
--- a/action.c
+++ b/action.c
@@ -429,14 +429,6 @@ actionConstructFinalize(action_t *pThis, struct cnfparamvals *queueParams)
pThis->submitToActQ = doSubmitToActionQBatch;
}
- /* we need to make a safety check: if the queue is NOT in direct mode, a single
- * message object may be accessed by multiple threads. As such, we need to enable
- * msg object thread safety in this case (this costs a bit performance and thus
- * is not enabled by default. -- rgerhards, 2008-02-20
- */
- if(cs.ActionQueType != QUEUETYPE_DIRECT)
- MsgEnableThreadSafety();
-
/* create queue */
/* action queues always (for now) have just one worker. This may change when
* we begin to implement an interface the enable output modules to request
diff --git a/runtime/msg.c b/runtime/msg.c
index b627cd59..09f6d649 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -300,120 +300,20 @@ static uchar * jsonPathGetLeaf(uchar *name, int lenName);
static struct json_object *jsonDeepCopy(struct json_object *src);
-/* The following functions will support advanced output module
- * multithreading, once this is implemented. Currently, we
- * include them as hooks only. The idea is that we need to guard
- * some msg objects data fields against concurrent access if
- * we run on multiple threads. Please note that in any case this
- * is not necessary for calls from INPUT modules, because they
- * construct the message object and do this serially. Only when
- * the message is in the processing queue, multiple threads may
- * access a single object. Consequently, there are no guard functions
- * for "set" methods, as these are called during input. Only "get"
- * functions that modify important structures have them.
- * rgerhards, 2007-07-20
- * We now support locked and non-locked operations, depending on
- * the configuration of rsyslog. To support this, we use function
- * pointers. Initially, we start in non-locked mode. There, all
- * locking operations call into dummy functions. When locking is
- * enabled, the function pointers are changed to functions doing
- * actual work. We also introduced another MsgPrepareEnqueue() function
- * which initializes the locking structures, if needed. This is
- * necessary because internal messages during config file startup
- * processing are always created in non-locking mode. So we can
- * not initialize locking structures during constructions. We now
- * postpone this until when the message is fully constructed and
- * enqueued. Then we know the status of locking. This has a nice
- * side effect, and that is that during the initial creation of
- * the Msg object no locking needs to be done, which results in better
- * performance. -- rgerhards, 2008-01-05
- */
-static void (*funcLock)(msg_t *pMsg);
-static void (*funcUnlock)(msg_t *pMsg);
-static void (*funcDeleteMutex)(msg_t *pMsg);
-void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#if 1 /* This is a debug aid */
-#define MsgLock(pMsg) funcLock(pMsg)
-#define MsgUnlock(pMsg) funcUnlock(pMsg)
-#else
-#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; }
-#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); }
-#endif
-
-/* the next function is a dummy to be used by the looking functions
- * when the class is not yet running in an environment where locking
- * is necessary. Please note that the need to lock can (and will) change
- * during a single run. Typically, this is depending on the operation mode
- * of the message queues (which is operator-configurable). -- rgerhards, 2008-01-05
- */
-static void MsgLockingDummy(msg_t __attribute__((unused)) *pMsg)
-{
- /* empty be design */
-}
-
-
-/* The following function prepares a message for enqueue into the queue. This is
- * where a message may be accessed by multiple threads. This implementation here
- * is the version for multiple concurrent acces. It initializes the locking
- * structures.
- * TODO: change to an iRet interface! -- rgerhards, 2008-07-14
- */
-static void MsgPrepareEnqueueLockingCase(msg_t *pThis)
-{
- BEGINfunc
- assert(pThis != NULL);
- pthread_mutex_init(&pThis->mut, NULL);
- pThis->bDoLock = 1;
- ENDfunc
-}
-
-
-/* ... and now the locking and unlocking implementations: */
-static void MsgLockLockingCase(msg_t *pThis)
+/* the locking and unlocking implementations: */
+static inline void
+MsgLock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgLock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_lock(&pThis->mut);
+ pthread_mutex_lock(&pThis->mut);
}
-
-static void MsgUnlockLockingCase(msg_t *pThis)
+static inline void
+MsgUnlock(msg_t *pThis)
{
/* DEV debug only! dbgprintf("MsgUnlock(0x%lx)\n", (unsigned long) pThis); */
- assert(pThis != NULL);
- if(pThis->bDoLock == 1) /* TODO: this is a testing hack, we should find a way with better performance! -- rgerhards, 2009-01-27 */
- pthread_mutex_unlock(&pThis->mut);
+ pthread_mutex_unlock(&pThis->mut);
}
-/* delete the mutex object on message destruction (locking case)
- */
-static void MsgDeleteMutexLockingCase(msg_t *pThis)
-{
- assert(pThis != NULL);
- pthread_mutex_destroy(&pThis->mut);
-}
-
-/* enable multiple concurrent access on the message object
- * This works on a class-wide basis and can bot be undone.
- * That is, if it is once enabled, it can not be disabled during
- * the same run. When this function is called, no other thread
- * must manipulate message objects. Then we would have race conditions,
- * but guarding against this is counter-productive because it
- * would cost additional time. Plus, it would be a programming error.
- * rgerhards, 2008-01-05
- */
-rsRetVal MsgEnableThreadSafety(void)
-{
- DEFiRet;
- funcLock = MsgLockLockingCase;
- funcUnlock = MsgUnlockLockingCase;
- funcMsgPrepareEnqueue = MsgPrepareEnqueueLockingCase;
- funcDeleteMutex = MsgDeleteMutexLockingCase;
- RETiRet;
-}
-
-/* end locking functions */
-
static inline int getProtocolVersion(msg_t *pM)
{
@@ -716,8 +616,6 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
/* initialize members in ORDER they appear in structure (think "cache line"!) */
pM->flowCtlType = 0;
- pM->bDoLock = 0;
- pM->bAlreadyFreed = 0;
pM->bParseSuccess = 0;
pM->iRefCount = 1;
pM->iSeverity = -1;
@@ -760,6 +658,7 @@ static inline rsRetVal msgBaseConstruct(msg_t **ppThis)
pM->pszTIMESTAMP_Unix[0] = '\0';
pM->pszRcvdAt_Unix[0] = '\0';
pM->pszUUID = NULL;
+ pthread_mutex_init(&pM->mut, NULL);
/* DEV debugging only! dbgprintf("msgConstruct\t0x%x, ref 1\n", (int)pM);*/
@@ -849,15 +748,6 @@ CODESTARTobjDestruct(msg)
if(currRefCount == 0)
{
/* DEV Debugging Only! dbgprintf("msgDestruct\t0x%lx, RefCount now 0, doing DESTROY\n", (unsigned long)pThis); */
- /* The if below is included to try to nail down a well-hidden bug causing
- * segfaults. I hope that do to the test code the problem is sooner detected and
- * thus we get better data for debugging and resolving it. -- rgerhards, 2011-02-23.
- * TODO: remove when no longer needed.
- */
- if(pThis->bAlreadyFreed)
- abort();
- pThis->bAlreadyFreed = 1;
- /* end debug code */
if(pThis->pszRawMsg != pThis->szRawMsg)
free(pThis->pszRawMsg);
freeTAG(pThis);
@@ -895,7 +785,7 @@ CODESTARTobjDestruct(msg)
# ifndef HAVE_ATOMIC_BUILTINS
MsgUnlock(pThis);
# endif
- funcDeleteMutex(pThis);
+ pthread_mutex_destroy(&pThis->mut);
/* now we need to do our own optimization. Testing has shown that at least the glibc
* malloc() subsystem returns memory to the OS far too late in our case. So we need
* to help it a bit, by calling malloc_trim(), which will tell the alloc subsystem
@@ -3656,18 +3546,6 @@ finalize_it:
#undef isProp
-/* This is a construction finalizer that must be called after all properties
- * have been set. It does some final work on the message object. After this
- * is done, the object is considered ready for full processing.
- * rgerhards, 2008-07-08
- */
-static rsRetVal msgConstructFinalizer(msg_t *pThis)
-{
- MsgPrepareEnqueue(pThis);
- return RS_RET_OK;
-}
-
-
/* get the severity - this is an entry point that
* satisfies the base object class getSeverity semantics.
* rgerhards, 2008-01-14
@@ -3977,13 +3855,7 @@ BEGINObjClassInit(msg, 1, OBJ_IS_CORE_MODULE)
/* set our own handlers */
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
OBJSetMethodHandler(objMethod_SETPROPERTY, MsgSetProperty);
- OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, msgConstructFinalizer);
OBJSetMethodHandler(objMethod_GETSEVERITY, MsgGetSeverity);
- /* initially, we have no need to lock message objects */
- funcLock = MsgLockingDummy;
- funcUnlock = MsgLockingDummy;
- funcDeleteMutex = MsgLockingDummy;
- funcMsgPrepareEnqueue = MsgLockingDummy;
/* some more inits */
# if HAVE_MALLOC_TRIM
INIT_ATOMIC_HELPER_MUT(mutTrimCtr);
diff --git a/runtime/msg.h b/runtime/msg.h
index 396e861f..ab479001 100644
--- a/runtime/msg.h
+++ b/runtime/msg.h
@@ -64,7 +64,6 @@ struct msg {
once data has entered the queue, this property is no longer needed. */
pthread_mutex_t mut;
int iRefCount; /* reference counter (0 = unused) */
- sbool bDoLock; /* use the mutex? */
sbool bAlreadyFreed; /* aid to help detect a well-hidden bad bug -- TODO: remove when no longer needed */
sbool bParseSuccess; /* set to reflect state of last executed higher level parser */
short iSeverity; /* the severity 0..7 */
@@ -177,7 +176,6 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
char *textpri(char *pRes, size_t pResLen, int pri);
rsRetVal msgGetMsgVar(msg_t *pThis, cstr_t *pstrPropName, var_t **ppVar);
es_str_t* msgGetMsgVarNew(msg_t *pThis, uchar *name);
-rsRetVal MsgEnableThreadSafety(void);
uchar *getRcvFrom(msg_t *pM);
void getTAG(msg_t *pM, uchar **ppBuf, int *piLen);
char *getTimeReported(msg_t *pM, enum tplFormatTypes eFmt);
@@ -213,16 +211,6 @@ msgUnsetJSON(msg_t *pMsg, uchar *varname) {
}
-/* The MsgPrepareEnqueue() function is a macro for performance reasons.
- * It needs one global variable to work. This is acceptable, as it gains
- * us quite some performance and is fully abstracted using this header file.
- * The important thing is that no other module is permitted to actually
- * access that global variable! -- rgerhards, 2008-01-05
- */
-extern void (*funcMsgPrepareEnqueue)(msg_t *pMsg);
-#define MsgPrepareEnqueue(pMsg) funcMsgPrepareEnqueue(pMsg)
-
-
/* ------------------------------ some inline functions ------------------------------ */
/* set raw message size. This is needed in some cases where a trunctation is necessary
diff --git a/tools/syslogd.c b/tools/syslogd.c
index ae821f60..62c18e72 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -638,7 +638,6 @@ submitMsg(msg_t *pMsg)
FINALIZE;
}
- MsgPrepareEnqueue(pMsg);
qqueueEnqObj(pQueue, pMsg->flowCtlType, (void*) pMsg);
finalize_it:
@@ -672,10 +671,6 @@ multiSubmitMsg(multi_submit_t *pMultiSub)
FINALIZE;
}
- for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- MsgPrepareEnqueue(pMultiSub->ppMsgs[i]);
- }
-
iRet = pQueue->MultiEnq(pQueue, pMultiSub);
pMultiSub->nElem = 0;
@@ -1124,11 +1119,6 @@ rsRetVal createMainQueue(qqueue_t **ppQueue, uchar *pszQueueName, struct cnfpara
uchar qfrenamebuf[1024];
DEFiRet;
- /* switch the message object to threaded operation, if necessary */
- if(queueParams != NULL || ourConf->globals.mainQ.MainMsgQueType == QUEUETYPE_DIRECT || ourConf->globals.mainQ.iMainMsgQueueNumWorkers > 1) {
- MsgEnableThreadSafety();
- }
-
/* create message queue */
CHKiRet_Hdlr(qqueueConstruct(ppQueue, ourConf->globals.mainQ.MainMsgQueType, ourConf->globals.mainQ.iMainMsgQueueNumWorkers, ourConf->globals.mainQ.iMainMsgQueueSize, msgConsumer)) {
/* no queue is fatal, we need to give up in that case... */