diff options
-rw-r--r-- | ChangeLog | 9 | ||||
-rw-r--r-- | action.c | 31 | ||||
-rw-r--r-- | configure.ac | 50 | ||||
-rw-r--r-- | doc/queues.html | 7 | ||||
-rw-r--r-- | runtime/atomic.h | 22 | ||||
-rw-r--r-- | runtime/debug.c | 22 | ||||
-rw-r--r-- | runtime/debug.h | 3 | ||||
-rw-r--r-- | runtime/msg.c | 4 | ||||
-rw-r--r-- | runtime/queue.c | 12 | ||||
-rw-r--r-- | runtime/srutils.c | 4 | ||||
-rw-r--r-- | runtime/wtp.c | 2 | ||||
-rw-r--r-- | threads.c | 2 | ||||
-rw-r--r-- | tools/Makefile.am | 3 | ||||
-rw-r--r-- | tools/msggen.c | 38 | ||||
-rw-r--r-- | tools/syslogd.c | 15 |
15 files changed, 188 insertions, 36 deletions
@@ -11,6 +11,15 @@ Version 3.21.5 [DEVEL] (rgerhards), 2008-09-?? each input module (else it is blank). - added system property "$myhostname", which contains the name of the local host as it knows itself. +- doc bugfix: queue doc had wrong parameter name for setting controlling + worker thread shutdown period +- re-enabled gcc builtin atomic operations and added a proper + ./configure check +- bugfix: potential race condition when adding messages to queue + There was a wrong order of mutex lock operations. It is hard to + believe that really caused problems, but in theory it could and with + threading we often see that theory becomes practice if something is only + used long enough on a fast enough machine with enough CPUs ;) --------------------------------------------------------------------------- Version 3.21.4 [DEVEL] (rgerhards), 2008-09-04 - removed compile time fixed message size limit (was 2K), limit can now @@ -625,14 +625,43 @@ actionWriteToAction(action_t *pAction) dbgprintf("action not yet ready again to be executed, onceInterval %d, tCurr %d, tNext %d\n", (int) pAction->iSecsExecOnceInterval, (int) getActNow(pAction), (int) (pAction->iSecsExecOnceInterval + pAction->tLastExec)); + /* TODO: the time call below may use reception time, not dequeue time - under consideration. -- rgerhards, 2008-09-17 */ + pAction->tLastExec = getActNow(pAction); /* re-init time flags */ FINALIZE; } - pAction->f_time = pAction->tLastExec = getActNow(pAction); /* re-init time flags */ + + + /* TODO: move this to msg object or some other object. This is just for quick testing! + * ALSO, THIS DOES NOT YET WORK PROPERLY! + * The reason is that we do not know the DST status, which is major pain. I need to + * think about obtaining this information (or the actual Unix timestamp) when I + * create the reception timestamp, but that also means I need to preserve that information + * while in the on-disk queue. Also need to think about a few other implications. + * rgerhards, 2008-09-17 + */ + { + struct tm tTm; + tTm.tm_sec = pAction->f_pMsg->tRcvdAt.second; + tTm.tm_min = pAction->f_pMsg->tRcvdAt.minute; + tTm.tm_hour = pAction->f_pMsg->tRcvdAt.hour; + tTm.tm_mday = pAction->f_pMsg->tRcvdAt.day; + tTm.tm_mon = pAction->f_pMsg->tRcvdAt.month - 1; + tTm.tm_year = pAction->f_pMsg->tRcvdAt.year - 1900; + /********************************************************************************/ + tTm.tm_isdst = 1; /* TODO THIS IS JUST VALID FOR THE NEXT FEW DAYS ;) TODO */ + /********************************************************************************/ + pAction->f_time = mktime(&tTm); +dbgprintf("XXXX create our own timestamp: %ld, system time is %ld\n", pAction->f_time, time(NULL)); + } + + //pAction->f_time = getActNow(pAction); /* re-init time flags */ /* Note: tLastExec could be set in the if block above, but f_time causes us a hard time * so far, I do not see a solution to getting rid of it. -- rgerhards, 2008-09-16 */ + + /* When we reach this point, we have a valid, non-disabled action. * So let's enqueue our message for execution. -- rgerhards, 2007-07-24 */ diff --git a/configure.ac b/configure.ac index 8e9f7807..fea7c063 100644 --- a/configure.ac +++ b/configure.ac @@ -107,6 +107,56 @@ AC_TRY_COMPILE([ AC_MSG_RESULT(no; defined as 64) ) +# check for availability of atomic operations +# rgerhards, 2008-09-18, added based on +# http://svn.apache.org/repos/asf/apr/apr/trunk/configure.in + +AC_CACHE_CHECK([whether the compiler provides atomic builtins], [ap_cv_atomic_builtins], +[AC_TRY_RUN([ +int main() +{ + unsigned long val = 1010, tmp, *mem = &val; + + if (__sync_fetch_and_add(&val, 1010) != 1010 || val != 2020) + return 1; + + tmp = val; + + if (__sync_fetch_and_sub(mem, 1010) != tmp || val != 1010) + return 1; + + if (__sync_sub_and_fetch(&val, 1010) != 0 || val != 0) + return 1; + + tmp = 3030; + + if (__sync_val_compare_and_swap(mem, 0, tmp) != 0 || val != tmp) + return 1; + + if (__sync_lock_test_and_set(&val, 4040) != 3030) + return 1; + + mem = &tmp; + + if (__sync_val_compare_and_swap(&mem, &tmp, &val) != &tmp) + return 1; + + __sync_synchronize(); + + if (mem != &val) + return 1; + + return 0; +}], [ap_cv_atomic_builtins=yes], [ap_cv_atomic_builtins=no], [ap_cv_atomic_builtins=no])]) + +if test "$ap_cv_atomic_builtins" = "yes"; then + AC_DEFINE(HAVE_ATOMIC_BUILTINS, 1, [Define if compiler provides atomic builtins]) +fi + + + + + # Large file support AC_ARG_ENABLE(largefile, [AS_HELP_STRING([--enable-largefile],[Enable large file support @<:@default=yes@:>@])], diff --git a/doc/queues.html b/doc/queues.html index a2074d36..7461121b 100644 --- a/doc/queues.html +++ b/doc/queues.html @@ -219,11 +219,12 @@ parall. Thus, the upper limit ca be set via "<i>$<object>QueueWorkerThread If it, for example, is set to four, no more than four workers will ever be started, no matter how many elements are enqueued. </p> <p>Worker threads that have been started are kept running until an inactivity -timeout happens. The timeout can be set via "<i>$<object>QueueWorkerTimeoutShutdown</i>" +timeout happens. The timeout can be set via "<i>$<object>QueueWorkerTimeoutThreadShutdown</i>" and is specified in milliseconds. If you do not like to keep the workers running, simply set it to 0, which means immediate timeout and thus immediate shutdown. But consider that creating threads involves some overhead, and this is -why we keep them running.</p> +why we keep them running. If you would like to never shutdown any worker +threads, specify -1 for this parameter.</p> <h2>Discarding Messages</h2> <p>If the queue reaches the so called "discard watermark" (a number of queued elements), less important messages can automatically be discarded. This is in an @@ -357,4 +358,4 @@ parameters, because not all are applicable. For example, in current output module design, actions do not support multi-threading. Consequently, the number of worker threads is fixed to one for action queues and can not be changed.</p> -</body></html>
\ No newline at end of file +</body></html> diff --git a/runtime/atomic.h b/runtime/atomic.h index 430ae7f0..d15f78ee 100644 --- a/runtime/atomic.h +++ b/runtime/atomic.h @@ -1,6 +1,6 @@ /* This header supplies atomic operations. So far, we rely on GCC's - * atomic builtins. I have no idea if we can check them via autotools, - * but I am making the necessary provisioning to live without them if + * atomic builtins. During configure, we check if atomic operatons are + * available. If they are not, I am making the necessary provisioning to live without them if * they are not available. Please note that you should only use the macros * here if you think you can actually live WITHOUT an explicit atomic operation, * because in the non-presence of them, we simply do it without atomicitiy. @@ -36,16 +36,20 @@ #ifndef INCLUDED_ATOMIC_H #define INCLUDED_ATOMIC_H -/* set the following to 1 if we have atomic operations (and #undef it otherwise) */ -/* #define DO_HAVE_ATOMICS 1 */ /* for this release, we disable atomic calls because there seem to be some * portability problems and we can not fix that without destabilizing the build. * They simply came in too late. -- rgerhards, 2008-04-02 */ -/* make sure they are not used! -#define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&data, 1)) -#define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&data, 1) -*/ -#define ATOMIC_INC(data) (++(data)) +#ifdef HAVE_ATOMIC_BUILTINS +# define ATOMIC_INC(data) ((void) __sync_fetch_and_add(&(data), 1)) +# define ATOMIC_DEC_AND_FETCH(data) __sync_sub_and_fetch(&(data), 1) +# define ATOMIC_FETCH_32BIT(data) ((unsigned) __sync_fetch_and_and(&(data), 0xffffffff)) +# define ATOMIC_STORE_1_TO_32BIT(data) __sync_lock_test_and_set(&(data), 1) +#else +# warning "atomic builtins not available, using nul operations" +# define ATOMIC_INC(data) (++(data)) +# define ATOMIC_FETCH_32BIT(data) (data) +# define ATOMIC_STORE_1_TO_32BIT(data) (data) = 1 +#endif #endif /* #ifndef INCLUDED_ATOMIC_H */ diff --git a/runtime/debug.c b/runtime/debug.c index 1450d029..8d7b0084 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -1,3 +1,4 @@ +#include <sys/syscall.h> /* debug.c * * This file proides debug and run time error analysis support. Some of the @@ -480,7 +481,23 @@ static inline void dbgMutexUnlockLog(pthread_mutex_t *pmut, dbgFuncDB_t *pFuncDB pthread_mutex_lock(&mutMutLog); pLog = dbgMutLogFindSpecific(pmut, MUTOP_LOCK, NULL); +#if 0 /* toggle for testing */ assert(pLog != NULL); +#else +/* the change below seems not to work - the problem seems to be a real race... I keep this code in just in case + * I need to re-use it. It should be removed once we are finished analyzing this problem. -- rgerhards, 2008-09-17 + */ +if(pLog == NULL) { + /* this may happen due to some races. We do not try to avoid + * this, as it would complicate the "real" code. This is not justified + * just to keep the debug info system up. -- rgerhards, 2008-09-17 + */ + pthread_mutex_unlock(&mutMutLog); + dbgprintf("%s:%d:%s: mutex %p UNlocked [but we did not yet know this mutex!]\n", + pFuncDB->file, unlockLn, pFuncDB->func, (void*)pmut); + return; /* if we don't know it yet, we can not clean up... */ +} +#endif /* we found the last lock entry. We now need to see from which FuncDB we need to * remove it. This is recorded inside the mutex log entry. @@ -879,7 +896,7 @@ dbgprintf(char *fmt, ...) } /* do not cache the thread name, as the caller might have changed it - * TODO: optimized, invalidate cache when new name is set + * TODO: optimize, invalidate cache when new name is set */ dbgGetThrdName(pszThrdName, sizeof(pszThrdName), ptLastThrdID, 0); @@ -889,7 +906,8 @@ dbgprintf(char *fmt, ...) if(stddbg != NULL) fprintf(stddbg, "%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec); if(altdbg != NULL) fprintf(altdbg, "%4.4ld.%9.9ld:", (long) (t.tv_sec % 10000), t.tv_nsec); } - if(stddbg != NULL) fprintf(stddbg, "%s: ", pszThrdName); + if(stddbg != NULL) fprintf(stddbg, "{%ld}%s: ", (long) syscall(SYS_gettid), pszThrdName); + /*if(stddbg != NULL) fprintf(stddbg, "%s: ", pszThrdName);*/ if(altdbg != NULL) fprintf(altdbg, "%s: ", pszThrdName); } bWasNL = (*(fmt + strlen(fmt) - 1) == '\n') ? 1 : 0; diff --git a/runtime/debug.h b/runtime/debug.h index 214b7c05..d9d576b5 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -130,7 +130,8 @@ void dbgPrintAllDebugInfo(void); /* debug aides */ -#ifdef RTINST +//#ifdef RTINST +#if 0 // temporarily removed for helgrind #define d_pthread_mutex_lock(x) dbgMutexLock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_mutex_unlock(x) dbgMutexUnlock(x, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) #define d_pthread_cond_wait(cond, mut) dbgCondWait(cond, mut, pdbgFuncDB, __LINE__, dbgCALLStaCK_POP_POINT ) diff --git a/runtime/msg.c b/runtime/msg.c index f4eb9414..164c3517 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -140,8 +140,8 @@ void (*funcMsgPrepareEnqueue)(msg_t *pMsg); #define MsgLock(pMsg) funcLock(pMsg) #define MsgUnlock(pMsg) funcUnlock(pMsg) #else -#define MsgLock(pMsg) {dbgprintf("line %d\n - ", __LINE__); funcLock(pMsg);; } -#define MsgUnlock(pMsg) {dbgprintf("line %d - ", __LINE__); funcUnlock(pMsg); } +#define MsgLock(pMsg) {dbgprintf("MsgLock line %d\n - ", __LINE__); funcLock(pMsg);; } +#define MsgUnlock(pMsg) {dbgprintf("MsgUnlock line %d - ", __LINE__); funcUnlock(pMsg); } #endif /* the next function is a dummy to be used by the looking functions diff --git a/runtime/queue.c b/runtime/queue.c index 7e7d4152..c0a37019 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -2171,17 +2171,17 @@ queueEnqObj(queue_t *pThis, flowControl_t flowCtlType, void *pUsr) finalize_it: if(pThis->qType != QUEUETYPE_DIRECT) { - d_pthread_mutex_unlock(pThis->mut); + /* make sure at least one worker is running. */ + if(pThis->qType != QUEUETYPE_DIRECT) { + queueAdviseMaxWorkers(pThis); + } + /* and release the mutex */ i = pthread_cond_signal(&pThis->notEmpty); + d_pthread_mutex_unlock(pThis->mut); dbgoprint((obj_t*) pThis, "EnqueueMsg signaled condition (%d)\n", i); pthread_setcancelstate(iCancelStateSave, NULL); } - /* make sure at least one worker is running. */ - if(pThis->qType != QUEUETYPE_DIRECT) { - queueAdviseMaxWorkers(pThis); - } - RETiRet; } diff --git a/runtime/srutils.c b/runtime/srutils.c index 97cc3252..1280e40d 100644 --- a/runtime/srutils.c +++ b/runtime/srutils.c @@ -371,6 +371,7 @@ int getNumberDigits(long lNum) rsRetVal timeoutComp(struct timespec *pt, long iTimeout) { + BEGINfunc assert(pt != NULL); /* compute timeout */ clock_gettime(CLOCK_REALTIME, pt); @@ -379,6 +380,7 @@ timeoutComp(struct timespec *pt, long iTimeout) pt->tv_nsec -= 1000000000; } pt->tv_sec += iTimeout / 1000; + ENDfunc return RS_RET_OK; /* so far, this is static... */ } @@ -393,6 +395,7 @@ timeoutVal(struct timespec *pt) { struct timespec t; long iTimeout; + BEGINfunc assert(pt != NULL); /* compute timeout */ @@ -403,6 +406,7 @@ timeoutVal(struct timespec *pt) if(iTimeout < 0) iTimeout = 0; + ENDfunc return iTimeout; } diff --git a/runtime/wtp.c b/runtime/wtp.c index 8b041ea2..ee9fc765 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -453,7 +453,7 @@ wtpStartWrkr(wtp_t *pThis, int bLockMutex) ISOBJ_TYPE_assert(pThis, wtp); - wtpProcessThrdChanges(pThis); + wtpProcessThrdChanges(pThis); // TODO: Performance: this causes a lot of FUTEX calls BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex); @@ -127,7 +127,7 @@ static void* thrdStarter(void *arg) assert(pThis != NULL); assert(pThis->pUsrThrdMain != NULL); - /* block all signalsi */ + /* block all signals */ sigset_t sigSet; sigfillset(&sigSet); pthread_sigmask(SIG_BLOCK, &sigSet, NULL); diff --git a/tools/Makefile.am b/tools/Makefile.am index a265af9c..b66b8e0c 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -26,8 +26,9 @@ rsyslogd_LDADD = $(zlib_libs) $(pthreads_libs) $(rsrt_libs) rsyslogd_LDFLAGS = -export-dynamic if ENABLE_DIAGTOOLS -sbin_PROGRAMS += rsyslog_diag_hostname +sbin_PROGRAMS += rsyslog_diag_hostname msggen rsyslog_diag_hostname_SOURCES = gethostn.c +msggen_SOURCES = msggen.c endif EXTRA_DIST = $(man_MANS) diff --git a/tools/msggen.c b/tools/msggen.c new file mode 100644 index 00000000..7990a3c8 --- /dev/null +++ b/tools/msggen.c @@ -0,0 +1,38 @@ +/* msggen - a small diagnostic utility that does very quick + * syslog() calls. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of rsyslog. + * + * Rsyslog is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Rsyslog is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Rsyslog. If not, see <http://www.gnu.org/licenses/>. + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + */ + +#include <stdio.h> +#include <syslog.h> + +int main(int argc, char *argv[]) +{ + int i; + + openlog("msggen", 0 , LOG_LOCAL0); + + for(i = 0 ; i < 10 ; ++i) + syslog(LOG_NOTICE, "This is message number %d", i); + + closelog(); + return 0; +} diff --git a/tools/syslogd.c b/tools/syslogd.c index b6e1d826..e95d02c2 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -1928,7 +1928,7 @@ die(int sig) /* close the inputs */ dbgprintf("Terminating input threads...\n"); - thrdTerminateAll(); /* TODO: inputs only, please */ + thrdTerminateAll(); /* and THEN send the termination log message (see long comment above) */ if (sig) { @@ -2168,8 +2168,8 @@ static void dbgPrintInitInfo(void) cCCEscapeChar); dbgprintf("Main queue size %d messages.\n", iMainMsgQueueSize); - dbgprintf("Main queue worker threads: %d, Perists every %d updates.\n", - iMainMsgQueueNumWorkers, iMainMsgQPersistUpdCnt); + dbgprintf("Main queue worker threads: %d, wThread shutdown: %d, Perists every %d updates.\n", + iMainMsgQueueNumWorkers, iMainMsgQtoWrkShutdown, iMainMsgQPersistUpdCnt); dbgprintf("Main queue timeouts: shutdown: %d, action completion shutdown: %d, enq: %d\n", iMainMsgQtoQShutdown, iMainMsgQtoActShutdown, iMainMsgQtoEnq); dbgprintf("Main queue watermarks: high: %d, low: %d, discard: %d, discard-severity: %d\n", @@ -2179,11 +2179,9 @@ static void dbgPrintInitInfo(void) /* TODO: add iActionRetryCount = 0; iActionRetryInterval = 30000; - static int iMainMsgQtoWrkShutdown = 60000; static int iMainMsgQtoWrkMinMsgs = 100; static int iMainMsgQbSaveOnShutdown = 1; iMainMsgQueMaxDiskSpace = 0; - setQPROP(queueSettoWrkShutdown, "$MainMsgQueueTimeoutWorkerThreadShutdown", 5000); setQPROP(queueSetiMinMsgsPerWrkr, "$MainMsgQueueWorkerThreadMinimumMessages", 100); setQPROP(queueSetbSaveOnShutdown, "$MainMsgQueueSaveOnShutdown", 1); */ @@ -2832,9 +2830,8 @@ static rsRetVal mainThread() CHKiRet(init()); - if(Debug) { + if(Debug && debugging_on) { dbgprintf("Debugging enabled, SIGUSR1 to turn off debugging.\n"); - debugging_on = 1; } /* Send a signal to the parent so it can terminate. */ @@ -3082,9 +3079,9 @@ doGlblProcessInit(void) fputs(" Already running.\n", stderr); exit(1); /* "good" exit, done if syslogd is already running */ } - } - else + } else { debugging_on = 1; + } /* tuck my process id away */ dbgprintf("Writing pidfile %s.\n", PidFile); |