diff options
46 files changed, 1655 insertions, 168 deletions
@@ -26,6 +26,7 @@ missing compile rsyslogd rsyslog.service +ylwrap *.orig rg.conf* *.swp @@ -1,4 +1,47 @@ --------------------------------------------------------------------------- +<<<<<<< HEAD +Version 6.5.0 [devel] 2012-0?-?? +- imrelp now supports non-cancel thread termination + (but now requires at least librelp 1.0.1) +- added --enable-debugless configure option for very high demanding envs + This actually at compile time disables a lot of debug code, resulting + in some speedup (but serious loss of debugging capabilities) +- added new 0mq plugins (via czmq lib) + Thanks to David Kelly for contributing these modules +--------------------------------------------------------------------------- +Version 6.3.11 [BETA] 2012-06-?? +- bugfix: expression-based filters with AND/OR could segfault + due to a problem with boolean shortcut operations. From the user's + perspective, the segfault is almost non-deterministic (it occurs when + a shortcut is used). + Thanks to Lars Peterson for providing the initial bug report and his + support in solving it. +- bugfix: "last message repeated n times" message was missing hostname + Thanks to Zdenek Salvet for finding this bug and to Bodik for reporting +--------------------------------------------------------------------------- +Version 6.3.10 [BETA] 2012-06-04 +- bugfix: delayble source could block action queue, even if there was + a disk queue associated with it. The root cause of this problem was + that it makes no sense to delay messages once they arrive in the + action queue - the "input" that is being held in that case is the main + queue worker, what makes no sense. + Thanks to Marcin for alerting us on this problem and providing + instructions to reproduce it. +- bugfix: invalid free in imptcp could lead to abort during startup +- bugfix: if debug message could end up in log file when forking + if rsyslog was set to auto-background (thus fork, the default) and debug + mode to stdout was enabled, debug messages ended up in the first log file + opened. Currently, stdout logging is completely disabled in forking mode + (but writing to the debug log file is still possible). This is a change + in behaviour, which is under review. If it causes problems to you, + please let us know. + Thanks to Tomas Heinrich for the patch. +- bugfix: --enable-smcustbindcdr configure directive did not work + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=330 + Thanks to Ultrabug for the patch. +- bugfix: made rsyslog compile when libestr ist not installed in /usr + Thanks to Miloslav Trmač for providing patches and suggestions +--------------------------------------------------------------------------- Version 6.3.9 [BETA] 2012-05-22 - bugfix: imtcp could cause hang during reception this also applied to other users of core file tcpsrv.c, but imtcp was @@ -171,6 +214,28 @@ Version 6.2.2 [v6-stable], 2012-05-?? full delayable sources somewhat smarter and permits, assuming sufficient timouts, to persist message up to the max queue capacity. Also some nits in debug instrumentation have been fixed. +- bugfix: --enable-smcustbindcdr configure directive did not work + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=330 + Thanks to Ultrabug for the patch. +- add small delay (50ms) after sending shutdown message + There seem to be cases where the shutdown message is otherwise not + processed, not even on an idle system. Thanks to Marcin for + bringing this problem up. +- support for resolving huge groups + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=310 + Thanks to Alec Warner for the patch +- bugfix: potential hang due to mutex deadlock + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=316 + Thanks to Andreas Piesk for reporting&analyzing this bug as well as + providing patches and other help in resolving it. +- bugfix: property PROCID empty instead of proper nilvalue if not present + If it is not present, it must have the nilvalue "-" as of RFC5424 + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=332 + Thanks to John N for reporting this issue. +- bugfix: did not compile under solaris due to $uptime property code + For the time being, $uptime is not supported on Solaris +- bugfix: "last message repeated n times" message was missing hostname + Thanks to Zdenek Salvet for finding this bug and to Bodik for reporting --------------------------------------------------------------------------- Version 6.2.1 [v6-stable], 2012-05-10 - change plugin config interface to be compatible with pre-v6.2 system @@ -442,7 +507,52 @@ expected that interfaces, even new ones, break during the initial syslog plain tcp input plugin (NOT supporting TLS!) [ported from v4] --------------------------------------------------------------------------- -Version 5.9.7 [V5-BETA], 2012-05-?? +======= +>>>>>>> 0620fc5780fabdfb160524197fd4aa6bb22613e9 +Version 5.9.8 [V5-BETA], 2012-05-?? +- bugfix: delayble source could block action queue, even if there was + a disk queue associated with it. The root cause of this problem was + that it makes no sense to delay messages once they arrive in the + action queue - the "input" that is being held in that case is the main + queue worker, what makes no sense. + Thanks to Marcin for alerting us on this problem and providing + instructions to reproduce it. +- bugfix: disk queue was not persisted on shutdown, regression of fix to + http://bugzilla.adiscon.com/show_bug.cgi?id=299 + The new code also handles the case of shutdown of blocking light and + full delayable sources somewhat smarter and permits, assuming sufficient + timouts, to persist message up to the max queue capacity. Also some nits + in debug instrumentation have been fixed. +- add small delay (50ms) after sending shutdown message + There seem to be cases where the shutdown message is otherwise not + processed, not even on an idle system. Thanks to Marcin for + bringing this problem up. +- support for resolving huge groups + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=310 + Thanks to Alec Warner for the patch +- bugfix: potential hang due to mutex deadlock + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=316 + Thanks to Andreas Piesk for reporting&analyzing this bug as well as + providing patches and other help in resolving it. +- bugfix: property PROCID empty instead of proper nilvalue if not present + If it is not present, it must have the nilvalue "-" as of RFC5424 + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=332 + Thanks to John N for reporting this issue. +- bugfix: "last message repeated n times" message was missing hostname + Thanks to Zdenek Salvet for finding this bug and to Bodik for reporting +--------------------------------------------------------------------------- +Version 5.9.7 [V5-BETA], 2012-05-10 +- added capability to specify substrings for field extraction mode +- bugfix: ommysql did not properly init/exit the mysql runtime library + this could lead to segfaults. Triggering condition: multiple action + instances using ommysql. Thanks to Tomas Heinrich for reporting this + problem and providing an initial patch (which my solution is based on, + I need to add more code to clean the mess up). +- bugfix: rsyslog did not terminate when delayable inputs were blocked + due to unvailable sources. Fixes: + http://bugzilla.adiscon.com/show_bug.cgi?id=299 + Thanks to Marcin M for bringing up this problem and Andre Lorbach + for helping to reproduce and fix it. - bugfix/tcpflood: sending small test files did not work correctly --------------------------------------------------------------------------- Version 5.9.6 [V5-BETA], 2012-04-12 @@ -580,14 +690,51 @@ Version 5.9.0 [V5-DEVEL] (rgerhards), 2011-06-08 affected directive was: $ActionExecOnlyWhenPreviousIsSuspended on closes: http://bugzilla.adiscon.com/show_bug.cgi?id=236 --------------------------------------------------------------------------- -Version 5.8.12 [V5-stable] 2012-05-?? +Version 5.8.13 [V5-stable] 2012-06-?? +- bugfix: "last message repeated n times" message was missing hostname + Thanks to Zdenek Salvet for finding this bug and to Bodik for reporting +--------------------------------------------------------------------------- +Version 5.8.12 [V5-stable] 2012-06-06 +- add small delay (50ms) after sending shutdown message + There seem to be cases where the shutdown message is otherwise not + processed, not even on an idle system. Thanks to Marcin for + bringing this problem up. +- support for resolving huge groups + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=310 + Thanks to Alec Warner for the patch +- bugfix: delayble source could block action queue, even if there was + a disk queue associated with it. The root cause of this problem was + that it makes no sense to delay messages once they arrive in the + action queue - the "input" that is being held in that case is the main + queue worker, what makes no sense. + Thanks to Marcin for alerting us on this problem and providing + instructions to reproduce it. - bugfix: disk queue was not persisted on shutdown, regression of fix to http://bugzilla.adiscon.com/show_bug.cgi?id=299 The new code also handles the case of shutdown of blocking light and full delayable sources somewhat smarter and permits, assuming sufficient timouts, to persist message up to the max queue capacity. Also some nits in debug instrumentation have been fixed. +- bugfix/omudpspoof: problems, including abort, happend when run on + multiple threads. Root cause is that libnet is not thread-safe. + omudpspoof now guards libnet calls with their own mutex. +- bugfix: if debug message could end up in log file when forking + if rsyslog was set to auto-background (thus fork, the default) and debug + mode to stdout was enabled, debug messages ended up in the first log file + opened. Currently, stdout logging is completely disabled in forking mode + (but writing to the debug log file is still possible). This is a change + in behaviour, which is under review. If it causes problems to you, + please let us know. + Thanks to Tomas Heinrich for the patch. - bugfix/tcpflood: sending small test files did not work correctly +- bugfix: potential hang due to mutex deadlock + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=316 + Thanks to Andreas Piesk for reporting&analyzing this bug as well as + providing patches and other help in resolving it. +- bugfix: property PROCID empty instead of proper nilvalue if not present + If it is not present, it must have the nilvalue "-" as of RFC5424 + closes: http://bugzilla.adiscon.com/show_bug.cgi?id=332 + Thanks to John N for reporting this issue. --------------------------------------------------------------------------- Version 5.8.11 [V5-stable] 2012-05-03 - bugfix: ommysql did not properly init/exit the mysql runtime library diff --git a/Makefile.am b/Makefile.am index 999404e7..4cefb756 100644 --- a/Makefile.am +++ b/Makefile.am @@ -158,6 +158,14 @@ if ENABLE_OMHIREDIS SUBDIRS += plugins/omhiredis endif +if ENABLE_OMZMQ3 +SUBDIRS += plugins/omzmq3 +endif + +if ENABLE_IMZMQ3 +SUBDIRS += plugins/imzmq3 +endif + if ENABLE_OMUXSOCK SUBDIRS += plugins/omuxsock endif @@ -262,7 +262,7 @@ actionResetQueueParams(void) cs.bActionQSyncQeueFiles = 0; cs.iActionQtoQShutdown = 0; /* queue shutdown */ cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ - cs.iActionQtoEnq = 2000; /* timeout for queue enque */ + cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ @@ -1350,7 +1350,7 @@ doSubmitToActionQ(action_t *pAction, msg_t *pMsg) if(pAction->pQueue->qType == QUEUETYPE_DIRECT) iRet = qqueueEnqObjDirect(pAction->pQueue, (void*) MsgAddRef(pMsg)); else - iRet = qqueueEnqObj(pAction->pQueue, pMsg->flowCtlType, (void*) MsgAddRef(pMsg)); + iRet = qqueueEnqObj(pAction->pQueue, eFLOWCTL_NO_DELAY, (void*) MsgAddRef(pMsg)); RETiRet; } diff --git a/configure.ac b/configure.ac index 0c6fe35c..589b1715 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ(2.61) -AC_INIT([rsyslog],[6.3.9],[rsyslog@lists.adiscon.com]) +AC_INIT([rsyslog],[6.5.0],[rsyslog@lists.adiscon.com]) AM_INIT_AUTOMAKE m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])]) @@ -445,6 +445,22 @@ if test "$enable_rtinst" = "yes"; then fi +# total debugless: highest performance, but no way at all to enable debug +# logging +AC_ARG_ENABLE(debugless, + [AS_HELP_STRING([--enable-debugless],[Enable runtime instrumentation mode @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_debugless="yes" ;; + no) enable_debugless="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-debugless) ;; + esac], + [enable_debugless="no"] +) +if test "$enable_debugless" = "yes"; then + AC_DEFINE(DEBUGLESS, 1, [Defined if debugless mode is enabled.]) +fi + + # valgrind AC_ARG_ENABLE(valgrind, [AS_HELP_STRING([--enable-valgrind],[Enable valgrind support settings @<:@default=no@:>@])], @@ -752,11 +768,15 @@ AC_ARG_ENABLE(rsyslogrt, [enable_rsyslogrt=yes] ) if test "x$enable_rsyslogrt" = "xyes"; then - RSRT_CFLAGS="-I\$(top_srcdir)/runtime -I\$(top_srcdir) -I\$(top_srcdir)/grammar" - RSRT_LIBS="\$(top_builddir)/runtime/librsyslog.la" + RSRT_CFLAGS1="-I\$(top_srcdir)/runtime -I\$(top_srcdir) -I\$(top_srcdir)/grammar" + RSRT_LIBS1="\$(top_builddir)/runtime/librsyslog.la" #??CNF_LIBS="\$(top_builddir)/grammar/libgrammar.la" fi AM_CONDITIONAL(ENABLE_RSYSLOGRT, test x$enable_rsyslogrt = xyes) +RSRT_CFLAGS="\$(RSRT_CFLAGS1) \$(LIBESTR_CFLAGS)" +RSRT_LIBS="\$(RSRT_LIBS1) \$(LIBESTR_LIBS)" +AC_SUBST(RSRT_CFLAGS1) +AC_SUBST(RSRT_LIBS1) AC_SUBST(RSRT_CFLAGS) AC_SUBST(RSRT_LIBS) @@ -900,7 +920,7 @@ AC_ARG_ENABLE(relp, [enable_relp=no] ) if test "x$enable_relp" = "xyes"; then - PKG_CHECK_MODULES(RELP, relp >= 0.1.1) + PKG_CHECK_MODULES(RELP, relp >= 1.0.1) fi AM_CONDITIONAL(ENABLE_RELP, test x$enable_relp = xyes) AC_SUBST(RELP_CFLAGS) @@ -1188,16 +1208,16 @@ AM_CONDITIONAL(ENABLE_CUST1, test x$enable_cust1 = xyes) # A custom strgen that also serves as a sample of how to do # SQL-generating strgen's -AC_ARG_ENABLE(smcustbindcdr, - [AS_HELP_STRING([--enable-smcustbindcdr],[Compiles smcustbindcdr module @<:@default=no@:>@])], +AC_ARG_ENABLE(sm_cust_bindcdr, + [AS_HELP_STRING([--enable-sm_cust_bindcdr],[Compiles sm_cust_bindcdr module @<:@default=no@:>@])], [case "${enableval}" in - yes) enable_smcustbindcdr="yes" ;; - no) enable_smcustbindcdr="no" ;; - *) AC_MSG_ERROR(bad value ${enableval} for --enable-smcustbindcdr) ;; + yes) enable_sm_cust_bindcdr="yes" ;; + no) enable_sm_cust_bindcdr="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-sm_cust_bindcdr) ;; esac], - [enable_smcustbindcdr=no] + [enable_sm_cust_bindcdr=no] ) -AM_CONDITIONAL(ENABLE_SMCUSTBINDCDR, test x$enable_smcustbindcdr = xyes) +AM_CONDITIONAL(ENABLE_SMCUSTBINDCDR, test x$enable_sm_cust_bindcdr = xyes) # settings for mmsnmptrapd message modification module @@ -1253,6 +1273,44 @@ fi AM_CONDITIONAL(ENABLE_OMMONGODB, test x$enable_ommongodb = xyes) # end of mongodb code +# BEGIN ZMQ3 INPUT SUPPORT +AC_ARG_ENABLE(imzmq3, + [AS_HELP_STRING([--enable-imzmq3],[Compiles imzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_imzmq3="yes" ;; + no) enable_imzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-imzmq3) ;; + esac], + [enable_imzmq3=no] +) +if test "x$enable_imzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_IMZMQ3, test x$enable_imzmq3 = xyes) + +# END ZMQ3 INPUT SUPPORT + +# BEGIN ZMQ3 OUTPUT SUPPORT +AC_ARG_ENABLE(omzmq3, + [AS_HELP_STRING([--enable-omzmq3],[Compiles omzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_omzmq3="yes" ;; + no) enable_omzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-omzmq3) ;; + esac], + [enable_omzmq3=no] +) +if test "x$enable_omzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_OMZMQ3, test x$enable_omzmq3 = xyes) + +# END ZMQ3 SUPPORT + # HIREDIS SUPPORT AC_ARG_ENABLE(omhiredis, @@ -1303,6 +1361,7 @@ AC_CONFIG_FILES([Makefile \ plugins/impstats/Makefile \ plugins/imrelp/Makefile \ plugins/imdiag/Makefile \ + plugins/imzmq3/Makefile \ plugins/omtesting/Makefile \ plugins/omgssapi/Makefile \ plugins/ommysql/Makefile \ @@ -1315,6 +1374,7 @@ AC_CONFIG_FILES([Makefile \ plugins/omudpspoof/Makefile \ plugins/ommongodb/Makefile \ plugins/omhiredis/Makefile \ + plugins/omzmq3/Makefile \ plugins/mmnormalize/Makefile \ plugins/mmjsonparse/Makefile \ plugins/mmaudit/Makefile \ @@ -1347,6 +1407,7 @@ echo " imdiag enabled: $enable_imdiag" echo " file input module enabled: $enable_imfile" echo " Solaris input module enabled: $enable_imsolaris" echo " periodic statistics module enabled: $enable_impstats" +echo " imzmq3 input module enabled: $enable_imzmq3" echo echo "---{ output plugins }---" echo " Mail support enabled: $enable_mail" @@ -1357,6 +1418,7 @@ echo " omelasticsearch module will be compiled: $enable_elasticsearch" echo " omruleset module will be compiled: $enable_omruleset" echo " omudpspoof module will be compiled: $enable_omudpspoof" echo " omuxsock module will be compiled: $enable_omuxsock" +echo " omzmq3 module will be compiled: $enable_omzmq3" echo echo "---{ parser modules }---" echo " pmrfc3164sd module will be compiled: $enable_pmrfc3164sd" @@ -1394,6 +1456,7 @@ echo " Extended Testbench enabled: $enable_extended_tests" echo " MySQL Tests enabled: $enable_mysql_tests" echo " Debug mode enabled: $enable_debug" echo " Runtime Instrumentation enabled: $enable_rtinst" +echo " (total) debugless mode enabled: $enable_debugless" echo " Diagnostic tools enabled: $enable_diagtools" echo " Enhanced memory checking enabled: $enable_memcheck" echo " Valgrind support settings enabled: $enable_valgrind" diff --git a/doc/manual.html b/doc/manual.html index c9021e52..7af5c5eb 100644 --- a/doc/manual.html +++ b/doc/manual.html @@ -19,7 +19,7 @@ rsyslog support</a> available directly from the source!</p> <p><b>Please visit the <a href="http://www.rsyslog.com/sponsors">rsyslog sponsor's page</a> to honor the project sponsors or become one yourself!</b> We are very grateful for any help towards the project goals.</p> -<p><b>This documentation is for version 6.3.9 (beta branch) of rsyslog.</b> +<p><b>This documentation is for version 6.3.10 (beta branch) of rsyslog.</b> Visit the <i><a href="http://www.rsyslog.com/status">rsyslog status page</a></i></b> to obtain current version information and project status. </p><p><b>If you like rsyslog, you might diff --git a/doc/v6compatibility.html b/doc/v6compatibility.html index 1f830854..058ab4f1 100644 --- a/doc/v6compatibility.html +++ b/doc/v6compatibility.html @@ -112,6 +112,15 @@ to spot why things went wrong (and if at all). <p>Due to their positive effect on performance and comparatively low overhead, default batch sizes have been increased. Starting with 6.3.4, the action queues have a default batch size of 128 messages. +<h2>Default action queue enqueue timeout</h2> +<p>This timeout previously was 2seconds, and has been reduced to 50ms (starting with 6.5.0). This change +was made as a long timeout will caused delays in the associated main queue, something +that was quite unexpected to users. Now, this can still happen, but the effect is much +less harsh (but still considerable on a busy system). Also, 50ms should be fairly enough +for most output sources, except when they are really broken (like network disconnect). If +they are really broken, even a 2second timeout does not help, so we hopefully get the best +of both worlds with the new timeout. A specific timeout can of course still be configured, +it is just the timeout that changed. <h2>outchannels</h2> <p>Outchannels are a to-be-removed feature of rsyslog, at least as far as the config syntax is concerned. Nevertheless, v6 still supports it, but a new syntax is required diff --git a/grammar/Makefile.am b/grammar/Makefile.am index 5911f443..d231bb46 100644 --- a/grammar/Makefile.am +++ b/grammar/Makefile.am @@ -13,7 +13,7 @@ libgrammar_la_SOURCES = \ grammar.h libgrammar_la_CPPFLAGS = $(RSRT_CFLAGS) -testdriver_SOURCES = testdriver.c libgrammar.la -testdriver_CPPFLAGS = $(RSRT_CFLAGS) -testdriver_LDADD = libgrammar.la -testdriver_LDFLAGS = -lestr +#testdriver_SOURCES = testdriver.c libgrammar.la +#testdriver_CPPFLAGS = $(RSRT_CFLAGS) +#testdriver_LDADD = libgrammar.la +#testdriver_LDFLAGS = -lestr diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index ce0298fb..bcdbdf3b 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -623,14 +623,13 @@ cnfactlstAddSysline(struct cnfactlst* actlst, char *line) struct cnfcfsyslinelst *cflst; if((cflst = malloc(sizeof(struct cnfcfsyslinelst))) != NULL) { - cflst->next = NULL; cflst->line = line; if(actlst->syslines == NULL) { - actlst->syslines = cflst; + cflst->next = NULL; } else { cflst->next = actlst->syslines; - actlst->syslines = cflst; } + actlst->syslines = cflst; } return actlst; } @@ -1135,8 +1134,9 @@ cnfexprEval(struct cnfexpr *expr, struct var *ret, void* usrptr) ret->d.n = 1ll; else ret->d.n = 0ll; + if(r.datatype == 'S') es_deleteStr(r.d.estr); } - FREE_BOTH_RET; + if(l.datatype == 'S') es_deleteStr(l.d.estr); break; case AND: cnfexprEval(expr->l, &l, usrptr); @@ -1147,10 +1147,11 @@ cnfexprEval(struct cnfexpr *expr, struct var *ret, void* usrptr) ret->d.n = 1ll; else ret->d.n = 0ll; + if(r.datatype == 'S') es_deleteStr(r.d.estr); } else { ret->d.n = 0ll; } - FREE_BOTH_RET; + if(l.datatype == 'S') es_deleteStr(l.d.estr); break; case NOT: cnfexprEval(expr->r, &r, usrptr); @@ -1415,6 +1416,9 @@ cnfrulePrint(struct cnfrule *rule) dbgprintf("------ end rule %p\n", rule); } +/* note: the sysline itself was already freed during processing + * and as such MUST NOT be freed again! + */ void cnfcfsyslinelstDestruct(struct cnfcfsyslinelst *cfslst) { @@ -1422,7 +1426,6 @@ cnfcfsyslinelstDestruct(struct cnfcfsyslinelst *cfslst) while(cfslst != NULL) { toDel = cfslst; cfslst = cfslst->next; - free(toDel->line); free(toDel); } } diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 4fec8e70..0abde84a 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -59,6 +59,7 @@ typedef struct configSettings_s { int iFacility; int iSeverity; int bJSON; + int bCEE; } configSettings_t; struct modConfData_s { @@ -89,6 +90,7 @@ initConfigSettings(void) cs.iFacility = DEFAULT_FACILITY; cs.iSeverity = DEFAULT_SEVERITY; cs.bJSON = 0; + cs.bCEE = 0; } @@ -157,7 +159,13 @@ CODESTARTendCnfLoad loadModConf->iStatsInterval = cs.iStatsInterval; loadModConf->iFacility = cs.iFacility; loadModConf->iSeverity = cs.iSeverity; - loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy; + if (cs.bCEE == 1) { + loadModConf->statsFmt = statsFmt_CEE; + } else if (cs.bJSON == 1) { + loadModConf->statsFmt = statsFmt_JSON; + } else { + loadModConf->statsFmt = statsFmt_Legacy; + } ENDendCnfLoad @@ -259,6 +267,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(prop.Construct(&pInputName)); diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index ba323a94..90216b5b 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1362,6 +1362,8 @@ CODESTARTendCnfLoad /* free legacy config vars */ free(cs.pszInputName); free(cs.lstnIP); + cs.pszInputName = NULL; + cs.lstnIP = NULL; ENDendCnfLoad diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index 99fabd18..f6040b21 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -22,7 +22,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" #include <stdlib.h> #include <assert.h> @@ -35,6 +34,7 @@ #include <netdb.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <librelp.h> #include "rsyslog.h" #include "dirty.h" @@ -236,13 +236,39 @@ BEGINfreeCnf CODESTARTfreeCnf ENDfreeCnf +/* This is used to terminate the plugin. Note that the signal handler blocks + * other activity on the thread. As such, it is safe to request the stop. When + * we terminate, relpEngine is called, and it's select() loop interrupted. But + * only *after this function is done*. So we do not have a race! + */ +static void +doSIGTTIN(int __attribute__((unused)) sig) +{ + DBGPRINTF("imrelp: termination requested via SIGTTIN - telling RELP engine\n"); + relpEngineSetStop(pRelpEngine); +} + + /* This function is called to gather input. */ BEGINrunInput + sigset_t sigSet; + struct sigaction sigAct; CODESTARTrunInput - /* TODO: we must be careful to start the listener here. Currently, tcpsrv.c seems to - * do that in ConstructFinalize + /* we want to support non-cancel input termination. To do so, we must signal librelp + * when to stop. As we run on the same thread, we need to register as SIGTTIN handler, + * which will be used to put the terminating condition into librelp. */ + sigfillset(&sigSet); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + memset(&sigAct, 0, sizeof (sigAct)); + sigemptyset(&sigAct.sa_mask); + sigAct.sa_handler = doSIGTTIN; + sigaction(SIGTTIN, &sigAct, NULL); + iRet = relpEngineRun(pRelpEngine); ENDrunInput @@ -284,12 +310,19 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 2d26e652..19028470 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -388,7 +388,7 @@ addListner(instanceConf_t *inst) if(inst->ratelimitInterval > 0) { if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn off rate-limiting */ - dbgprintf("imuxsock: turning off rate limiting because we could not " + DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); inst->ratelimitInterval = 0; } @@ -458,7 +458,7 @@ createLogSocket(lstn_t *pLstn) if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || chmod((char*)pLstn->sockName, 0666) < 0) { errmsg.LogError(errno, NO_ERRCODE, "cannot create '%s'", pLstn->sockName); - dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno); + DBGPRINTF("cannot create %s (%d).\n", pLstn->sockName, errno); if(pLstn->fd != -1) close(pLstn->fd); pLstn->fd = -1; @@ -491,7 +491,7 @@ openLogSocket(lstn_t *pLstn) /* ok, it matches -- just use as is */ pLstn->fd = fd; - dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", + DBGPRINTF("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", pLstn->sockName, pLstn->fd); break; } @@ -563,7 +563,7 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { /* we need to add a new ratelimiter, process not seen before! */ - dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n", + DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); @@ -931,7 +931,7 @@ static rsRetVal readSocket(lstn_t *pLstn) msgh.msg_iovlen = 1; iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT); - dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); + DBGPRINTF("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; ts = NULL; @@ -948,8 +948,6 @@ static rsRetVal readSocket(lstn_t *pLstn) if( pLstn->bUseSysTimeStamp && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { ts = (struct timeval *)CMSG_DATA(cm); - dbgprintf("XXX: got timestamp %ld.%ld\n", - (long) ts->tv_sec, (long) ts->tv_usec); break; } # endif /* HAVE_SO_TIMESTAMP */ @@ -959,7 +957,7 @@ static rsRetVal readSocket(lstn_t *pLstn) } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); + DBGPRINTF("UNIX socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX"); } @@ -1025,7 +1023,7 @@ activateListeners() for (i = startIndexUxLocalSockets ; i < nfd ; i++) { if(openLogSocket(&(listeners[i])) == RS_RET_OK) { ++actSocks; - dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", + DBGPRINTF("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); } } @@ -1263,7 +1261,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(parser, CORE_COMPONENT)); - dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); + DBGPRINTF("imuxsock version %s initializing\n", PACKAGE_VERSION); /* init legacy config vars */ cs.pLogSockName = NULL; diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am new file mode 100644 index 00000000..f9c84e5d --- /dev/null +++ b/plugins/imzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imzmq3.la + +imzmq3_la_SOURCES = imzmq3.c +imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +imzmq3_la_LDFLAGS = -module -avoid-version +imzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c new file mode 100644 index 00000000..78eee887 --- /dev/null +++ b/plugins/imzmq3/imzmq3.c @@ -0,0 +1,653 @@ +/* imzmq3.c + * + * This input plugin enables rsyslog to read messages from a ZeroMQ + * queue. + * + * Copyright 2012 Talksum, Inc. + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ + +#include <assert.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "rsyslog.h" + +#include "cfsysline.h" +#include "config.h" +#include "dirty.h" +#include "errmsg.h" +#include "glbl.h" +#include "module-template.h" +#include "msg.h" +#include "net.h" +#include "parser.h" +#include "prop.h" +#include "ruleset.h" +#include "srUtils.h" +#include "unicode-helper.h" + +#include <czmq.h> + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP + +/* convienent symbols to denote a socket we want to bind + * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +typedef struct _socket_type { + char* name; + int type; +} socket_type; + +/* more overkill, but seems nice to be consistent.*/ +typedef struct _socket_action { + char* name; + int action; +} socket_action; + +typedef struct _poller_data { + ruleset_t* ruleset; + thrdInfo_t* thread; +} poller_data; + +typedef struct _socket_info { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + char** subscriptions; + ruleset_t* ruleset; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + +} socket_info; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations. + */ +static socket_info* s_socketInfo = NULL; +static size_t s_nitems = 0; +static prop_t * s_namep = NULL; +static zloop_t* s_zloop = NULL; +static int s_io_threads = 1; +static zctx_t* s_context = NULL; +static ruleset_t* s_ruleset = NULL; +static socket_type socketTypes[] = { + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"XSUB", ZMQ_XSUB } +}; + +static socket_action socketActions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + + +/* ---------------------------------------------------------------------------- + * Helper functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +static int getSocketType(char* name) { + int type = -1; + uint i; + + /* match name with known socket type */ + for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) { + if( !strcmp(socketTypes[i].name, name) ) { + type = socketTypes[i].type; + break; + } + } + + /* whine if no match was found. */ + if (type == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name); + + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + + /* match name with known socket action */ + for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) { + if(!strcmp(socketActions[i].name, name)) { + action = socketActions[i].action; + break; + } + } + + /* whine if no matching action was found */ + if (action == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name); + + return action; +} + + +static void setDefaults(socket_info* info) { + info->type = ZMQ_SUB; + info->action = ACTION_BIND; + info->description = NULL; + info->sndHWM = 0; + info->rcvHWM = 0; + info->identity = NULL; + info->subscriptions = NULL; + info->ruleset = NULL; + info->sndBuf = -1; + info->rcvBuf = -1; + info->linger = -1; + info->backlog = -1; + info->sndTimeout = -1; + info->rcvTimeout = -1; + info->maxMsgSize = -1; + info->rate = -1; + info->recoveryIVL = -1; + info->multicastHops = -1; + info->reconnectIVL = -1; + info->reconnectIVLMax = -1; + info->ipv4Only = -1; + info->affinity = -1; + +}; + + +/* The config string should look like: + * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" + * + */ +static rsRetVal parseConfig(char* config, socket_info* info) { + int nsubs = 0; + + char* binding; + char* ptr1; + for (binding = strtok_r(config, ",", &ptr1); + binding != NULL; + binding = strtok_r(NULL, ",", &ptr1)) { + + /* Each binding looks like foo=bar */ + char * sep = strchr(binding, '='); + if (sep == NULL) + { + errmsg.LogError(0, NO_ERRCODE, + "Invalid argument format %s, ignoring ...", + binding); + continue; + } + + /* Replace '=' with '\0'. */ + *sep = '\0'; + + char * val = sep + 1; + + if (strcmp(binding, "action") == 0) { + info->action = getSocketAction(val); + } else if (strcmp(binding, "type") == 0) { + info->type = getSocketType(val); + } else if (strcmp(binding, "description") == 0) { + info->description = strdup(val); + } else if (strcmp(binding, "sndHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "rcvHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "subscribe") == 0) { + /* Add the subscription value to the list.*/ + char * substr = NULL; + substr = strdup(val); + info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); + info->subscriptions[nsubs] = substr; + ++nsubs; + } else if (strcmp(binding, "sndBuf") == 0) { + info->sndBuf = atoi(val); + } else if (strcmp(binding, "rcvBuf") == 0) { + info->rcvBuf = atoi(val); + } else if (strcmp(binding, "linger") == 0) { + info->linger = atoi(val); + } else if (strcmp(binding, "backlog") == 0) { + info->backlog = atoi(val); + } else if (strcmp(binding, "sndTimeout") == 0) { + info->sndTimeout = atoi(val); + } else if (strcmp(binding, "rcvTimeout") == 0) { + info->rcvTimeout = atoi(val); + } else if (strcmp(binding, "maxMsgSize") == 0) { + info->maxMsgSize = atoi(val); + } else if (strcmp(binding, "rate") == 0) { + info->rate = atoi(val); + } else if (strcmp(binding, "recoveryIVL") == 0) { + info->recoveryIVL = atoi(val); + } else if (strcmp(binding, "multicastHops") == 0) { + info->multicastHops = atoi(val); + } else if (strcmp(binding, "reconnectIVL") == 0) { + info->reconnectIVL = atoi(val); + } else if (strcmp(binding, "reconnectIVLMax") == 0) { + info->reconnectIVLMax = atoi(val); + } else if (strcmp(binding, "ipv4Only") == 0) { + info->ipv4Only = atoi(val); + } else if (strcmp(binding, "affinity") == 0) { + info->affinity = atoi(val); + } else { + errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); + return RS_RET_INVALID_PARAMS; + } + } + + return RS_RET_OK; +} + +static rsRetVal validateConfig(socket_info* info) { + + if (info->type == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid type"); + return RS_RET_INVALID_PARAMS; + } + if (info->action == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid action"); + return RS_RET_INVALID_PARAMS; + } + if (info->description == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you didn't enter a description"); + return RS_RET_INVALID_PARAMS; + } + if(info->type == ZMQ_SUB && info->subscriptions == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "SUB sockets need at least one subscription"); + return RS_RET_INVALID_PARAMS; + } + if(info->type != ZMQ_SUB && info->subscriptions != NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "only SUB sockets can have subscriptions"); + return RS_RET_INVALID_PARAMS; + } + return RS_RET_OK; +} + +static rsRetVal createContext() { + if (s_context == NULL) { + errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + s_context = zctx_new(); + + if (s_context == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "zctx_new failed: %s", + strerror(errno)); + /* DK: really should do better than invalid params...*/ + return RS_RET_INVALID_PARAMS; + } + + if (s_io_threads > 1) { + errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); + zctx_set_iothreads(s_context, s_io_threads); + } + } + return RS_RET_OK; +} + +static rsRetVal createSocket(socket_info* info, void** sock) { + size_t ii; + int rv; + + *sock = zsocket_new(s_context, info->type); + if (!sock) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zsocket_new failed: %s, for type %d", + strerror(errno),info->type); + /* DK: invalid params seems right here */ + return RS_RET_INVALID_PARAMS; + } + + /* Set options *before* the connect/bind. */ + if (info->identity) zsocket_set_identity(*sock, info->identity); + if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); + if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); + if (info->linger > -1) zsocket_set_linger(*sock, info->linger); + if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog); + if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout); + if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout); + if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize); + if (info->rate > -1) zsocket_set_rate(*sock, info->rate); + if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL); + if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops); + if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL); + if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); + if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); + if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); + + /* since HWM have defaults, we always set them. No return codes to check, either.*/ + zsocket_set_sndhwm(*sock, info->sndHWM); + zsocket_set_rcvhwm(*sock, info->rcvHWM); + + /* Set subscriptions.*/ + for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) + zsocket_set_subscribe(*sock, info->subscriptions[ii]); + + + + /* Do the bind/connect... */ + if (info->action==ACTION_CONNECT) { + rv = zsocket_connect(*sock, info->description); + if (rv < 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_connect using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } else { + rv = zsocket_bind(*sock, info->description); + if (rv <= 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_bind using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } + return RS_RET_OK; +} + +/* ---------------------------------------------------------------------------- + * Module endpoints + */ + +/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note + * that this makes the assumption that after the bind ruleset is called in the config, + * another call will be made to add an endpoint. +*/ +static rsRetVal +set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { + ruleset_t* ruleset_ptr; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: " + "ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + s_ruleset = ruleset_ptr; + DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + +/* add an actual endpoint + */ +static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { + DEFiRet; + + /* increment number of items and store old num items, as it will be handy.*/ + size_t idx = s_nitems++; + + /* allocate a new socket_info array to accomidate this new endpoint*/ + socket_info* tmpSocketInfo; + CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + + /* copy existing socket_info across into new array, if any, and free old storage*/ + if(idx) { + memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); + free(s_socketInfo); + } + + /* set the static to hold the new array */ + s_socketInfo = tmpSocketInfo; + + /* point to the new one */ + socket_info* sockInfo = &s_socketInfo[idx]; + + /* set defaults for the new socket info */ + setDefaults(sockInfo); + + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ + char * copy = NULL; + CHKmalloc(copy = strdup((char *) valp)); + + /* parse the config string */ + CHKiRet(parseConfig(copy, sockInfo)); + + /* validate it */ + CHKiRet(validateConfig(sockInfo)); + + /* bind to the current ruleset (if any)*/ + sockInfo->ruleset = s_ruleset; + +finalize_it: + free(valp); /* in any case, this is no longer needed */ + RETiRet; +} + + +static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { + msg_t* logmsg; + poller_data* pollerData = (poller_data*)pd; + + char* buf = zstr_recv(poller->socket); + if (msgConstruct(&logmsg) == RS_RET_OK) { + MsgSetRawMsg(logmsg, buf, strlen(buf)); + MsgSetInputName(logmsg, s_namep); + MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(logmsg, pollerData->ruleset); + logmsg->msgFlags = NEEDS_PARSING; + submitMsg(logmsg); + } + + if( pollerData->thread->bShallStop == TRUE) { + /* a handler that returns -1 will terminate the + czmq reactor loop + */ + return -1; + } + + return 0; +} + +/* called when runInput is called by rsyslog + */ +static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t i; + int rv; + zmq_pollitem_t* items; + poller_data* pollerData; + + DEFiRet; + + /* create the context*/ + CHKiRet(createContext()); + + /* create the poll items*/ + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + + /* loop through and initialize the poll items and poller_data arrays...*/ + for(i=0; i<s_nitems;++i) { + /* create the socket, update items.*/ + createSocket(&s_socketInfo[i], &items[i].socket); + items[i].events = ZMQ_POLLIN; + + /* now update the poller_data for this item */ + pollerData[i].thread = pThrd; + pollerData[i].ruleset = s_socketInfo[i].ruleset; + } + + s_zloop = zloop_new(); + for(i=0; i<s_nitems; ++i) { + + rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); + if (rv) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + } + } + zloop_start(s_zloop); + zloop_destroy(&s_zloop); + finalize_it: + for(i=0; i< s_nitems; ++i) { + zsocket_destroy(s_context, items[i].socket); + } + + zctx_destroy(&s_context); + + free(items); + RETiRet; +} + +/* ---------------------------------------------------------------------------- + * input module functions + */ + +BEGINrunInput +CODESTARTrunInput + iRet = rcv_loop(pThrd); +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun + /* we need to create the inputName property (only once during our + lifetime) */ + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, + UCHAR_CONSTANT("imzmq3"), + sizeof("imzmq3") - 1)); + CHKiRet(prop.ConstructFinalize(s_namep)); + +/* If there are no endpoints this is pointless ...*/ + if (s_nitems == 0) + ABORT_FINALIZE(RS_RET_NO_RUN); + +finalize_it: +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun + /* do cleanup here */ + if(s_namep != NULL) + prop.Destruct(&s_namep); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, + void __attribute__((unused)) *pVal) { + return RS_RET_OK; +} +static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { + errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); + s_io_threads = val; + return RS_RET_OK; +} + +BEGINmodInit() +CODESTARTmodInit + /* we only support the current interface specification */ + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", + 0, eCmdHdlrGetWord, + set_ruleset, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", + 0, eCmdHdlrGetWord, + add_endpoint, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", + 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", + 1, eCmdHdlrInt, + setGlobalWorkerThreads, NULL, + STD_LOADABLE_MODULE_ID)); +ENDmodInit diff --git a/plugins/ommongodb/README b/plugins/ommongodb/README index 71d56cfa..7581131a 100644 --- a/plugins/ommongodb/README +++ b/plugins/ommongodb/README @@ -2,21 +2,6 @@ plugin to use MongoDB as backend. tested in ubuntu 10.04 and ubuntu 10.10 -BUILDING THIS PLUGIN -Right now, it seems to be necessary to copy the 10gen c-driver directly under -the ./plugins/ommongodb subdirectory. Then, you need to follow their build -instructions on how to build the mongodb c driver: -http://api.mongodb.org/c/current/building.html - -This is clumpsy, and if someone has ideas on how to improve this situation, -please drop us a line. For obvious reasons, ./configure does not detect -a missing mongodb c driver. - -In order to successfully build ommongodb, you NEED to use the v0.2 version of -the mongo c driver. As it looks, the driver breaks API compatibility and the -curret v0.4 driver seems to have a totally different API (at least this is -what I currently (2012-03-08) see. - configuration: in your /etc/rsyslog.conf, together with other modules: diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index 9a469c16..fadb2c7b 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -132,6 +132,7 @@ ENDinitConfVars /* add some variables needed for libnet */ libnet_t *libnet_handle; char errbuf[LIBNET_ERRBUF_SIZE]; +pthread_mutex_t mutLibnet; /* forward definitions */ static rsRetVal doTryResume(instanceData *pData); @@ -194,6 +195,8 @@ ENDdbgPrintInstInfo /* Send a message via UDP + * Note: libnet is not thread-safe, so we need to ensure that only one + * instance ever is calling libnet code. * rgehards, 2007-12-20 */ static inline rsRetVal @@ -207,6 +210,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) struct sockaddr_in *tempaddr,source_ip; libnet_ptag_t ip, ipo; libnet_ptag_t udp; + sbool bNeedUnlock = 0; DEFiRet; if(pData->pSockArray == NULL) { @@ -221,6 +225,8 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) inet_pton(AF_INET, (char*)pszSourcename, &(source_ip.sin_addr)); bSendSuccess = FALSE; + d_pthread_mutex_lock(&mutLibnet); + bNeedUnlock = 1; for (r = pData->f_addr; r; r = r->ai_next) { tempaddr = (struct sockaddr_in *)r->ai_addr; libnet_clear_packet(libnet_handle); @@ -280,6 +286,9 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } finalize_it: + if(bNeedUnlock) { + d_pthread_mutex_unlock(&mutLibnet); + } RETiRet; } @@ -453,6 +462,7 @@ BEGINmodExit CODESTARTmodExit /* destroy the libnet state needed for forged UDP sources */ libnet_destroy(libnet_handle); + pthread_mutex_destroy(&mutLibnet); /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -502,6 +512,7 @@ CODEmodInit_QueryRegCFSLineHdlr errmsg.LogError(0, NO_ERRCODE, "Error initializing libnet, can not continue "); ABORT_FINALIZE(RS_RET_ERR_LIBNET_INIT); } + pthread_mutex_init(&mutLibnet, NULL); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszTplName, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofsourcenametemplate", 0, eCmdHdlrGetWord, NULL, &cs.pszSourceNameTemplate, NULL)); diff --git a/plugins/omzmq3/Makefile.am b/plugins/omzmq3/Makefile.am new file mode 100644 index 00000000..92cd7586 --- /dev/null +++ b/plugins/omzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omzmq3.la + +omzmq3_la_SOURCES = omzmq3.c +omzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +omzmq3_la_LDFLAGS = -module -avoid-version +omzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README new file mode 100644 index 00000000..ccc96c74 --- /dev/null +++ b/plugins/omzmq3/README @@ -0,0 +1,25 @@ +ZeroMQ 3.x Output Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example +below binds a PUB socket to port 7171, and any message fitting the criteria will +be output to the zmq socket. + +Example Rsyslog.conf snippet (NOTE: v6 format): +------------------------------------------------------------------------------- +if $msg then { + action(type="omzmq3", sockType="PUB", action="BIND", + description="tcp://*:7172) +} +------------------------------------------------------------------------------- diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c new file mode 100644 index 00000000..e13011fb --- /dev/null +++ b/plugins/omzmq3/omzmq3.c @@ -0,0 +1,462 @@ +/* omzmq3.c + * Copyright 2012 Talksum, Inc + * Using the czmq interface to zeromq, we output + * to a zmq socket. + + +* +* This program is free software: you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public License +* as published by the Free Software Foundation, either version 3 of +* the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this program. If not, see +* <http://www.gnu.org/licenses/>. +* +* Author: David Kelly +* <davidk@talksum.com> +*/ + + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include <czmq.h> + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omzmq3") + +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* convienent symbols to denote a socket we want to bind + vs one we want to just connect to +*/ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +struct socket_type { + char* name; + int type; +}; + +/* more overkill, but seems nice to be consistent. */ +struct socket_action { + char* name; + int action; +}; + +typedef struct _instanceData { + void* socket; + uchar* description; + int type; + int action; + int sndHWM; + int rcvHWM; + uchar* identity; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* tplName; +} instanceData; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations + */ + +/* only 1 zctx for all the sockets, with an adjustable number of + worker threads which may be useful if we use affinity in particular + sockets +*/ +static zctx_t* s_context = NULL; +static int s_workerThreads = -1; + +static struct socket_type types[] = { + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"XPUB", ZMQ_XPUB } +}; + +static struct socket_action actions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + +static struct cnfparamdescr actpdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 }, + { "globalWorkerThreads", eCmdHdlrInt, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; + +static struct cnfparamblk actpblk = { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr +}; + +/* ---------------------------------------------------------------------------- + * Helper Functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +int getSocketType(char* name) { + int type = -1; + uint i; + for(i=0; i<sizeof(types)/sizeof(struct socket_type); ++i) { + if( !strcmp(types[i].name, name) ) { + type = types[i].type; + break; + } + } + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + for(i=0; i < sizeof(actions)/sizeof(struct socket_action); ++i) { + if(!strcmp(actions[i].name, name)) { + action = actions[i].action; + break; + } + } + return action; +} + +/* closeZMQ will destroy the context and + * associated socket + */ +static void closeZMQ(instanceData* pData) { + errmsg.LogError(0, NO_ERRCODE, "closeZMQ called"); + if(s_context && pData->socket) { + if(pData->socket != NULL) { + zsocket_destroy(s_context, pData->socket); + } + } +} + + +static rsRetVal initZMQ(instanceData* pData) { + DEFiRet; + + /* create the context if necessary. */ + if (NULL == s_context) { + s_context = zctx_new(); + if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); + } + + pData->socket = zsocket_new(s_context, pData->type); + + /* ALWAYS set the HWM as the zmq3 default is 1000 and we default + to 0 (infinity) */ + zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + zsocket_set_sndhwm(pData->socket, pData->sndHWM); + + /* use czmq defaults for these, unless set to non-default values */ + if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); + if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); + if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); + if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger); + if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog); + if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout); + if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout); + if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize); + if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate); + if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL); + if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops); + if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL); + if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); + if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); + if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); + + /* bind or connect to it */ + if (pData->action == ACTION_BIND) { + /* bind asserts, so no need to test return val here + which isn't the greatest api -- oh well */ + zsocket_bind(pData->socket, (char*)pData->description); + } else { + if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { + errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + finalize_it: + RETiRet; +} + +rsRetVal writeZMQ(uchar* msg, instanceData* pData) { + DEFiRet; + + /* initialize if necessary */ + if(NULL == pData->socket) + CHKiRet(initZMQ(pData)); + + /* send it */ + int result = zstr_send(pData->socket, (char*)msg); + + /* whine if things went wrong */ + if (result == -1) { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + ABORT_FINALIZE(RS_RET_ERR); + } + finalize_it: + RETiRet; +} + +static inline void +setInstParamDefaults(instanceData* pData) { + pData->description = (uchar*)"tcp://*:7171"; + pData->socket = NULL; + pData->tplName = NULL; + pData->type = ZMQ_PUB; + pData->action = ACTION_BIND; + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ + pData->identity = NULL; + pData->sndBuf = -1; + pData->rcvBuf = -1; + pData->linger = -1; + pData->backlog = -1; + pData->sndTimeout = -1; + pData->rcvTimeout = -1; + pData->maxMsgSize = -1; + pData->rate = -1; + pData->recoveryIVL = -1; + pData->multicastHops = -1; + pData->reconnectIVL = -1; + pData->reconnectIVLMax = -1; + pData->ipv4Only = -1; + pData->affinity = 1; +} + + +/* ---------------------------------------------------------------------------- + * Output Module Functions + */ + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINfreeInstance +CODESTARTfreeInstance + closeZMQ(pData); + free(pData->description); + free(pData->tplName); +ENDfreeInstance + +BEGINtryResume +CODESTARTtryResume + if(NULL == pData->socket) + iRet = initZMQ(pData); +ENDtryResume + +BEGINdoAction +CODESTARTdoAction +iRet = writeZMQ(ppString[0], pData); +ENDdoAction + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + +CHKiRet(createInstance(&pData)); +setInstParamDefaults(pData); + +CODE_STD_STRING_REQUESTparseSelectorAct(1) +for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + +if(pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +if(pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + +/* tell the engine we only want one template string */ +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omzmq3 supports only v6 config format, use: " + "action(type=\"omzmq3\" serverport=...)"); + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINinitConfVars /* (re)set config variables to defaults */ +CODESTARTinitConfVars +s_workerThreads = -1; +ENDinitConfVars + +BEGINmodExit +CODESTARTmodExit +if(NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); + +INITLegCnfVars +CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + + diff --git a/runtime/cfsysline.c b/runtime/cfsysline.c index af88b3de..d8c33169 100644 --- a/runtime/cfsysline.c +++ b/runtime/cfsysline.c @@ -346,11 +346,12 @@ static int doParseOnOffOption(uchar **pp) */ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *pVal) { - struct group *pgBuf; + struct group *pgBuf = NULL; struct group gBuf; DEFiRet; uchar szName[256]; - char stringBuf[2048]; /* I hope this is large enough... */ + int bufSize = 2048; + char * stringBuf = NULL; assert(pp != NULL); assert(*pp != NULL); @@ -360,7 +361,17 @@ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p ABORT_FINALIZE(RS_RET_NOT_FOUND); } - getgrnam_r((char*)szName, &gBuf, stringBuf, sizeof(stringBuf), &pgBuf); + + CHKmalloc(stringBuf = malloc(bufSize)); + while(pgBuf == NULL) { + errno = 0; + getgrnam_r((char*)szName, &gBuf, stringBuf, bufSize, &pgBuf); + if((pgBuf == NULL) && (errno == ERANGE)) { + /* Increase bufsize and try again.*/ + bufSize *= 2; + CHKmalloc(stringBuf = realloc(stringBuf, bufSize)); + } + } if(pgBuf == NULL) { errmsg.LogError(0, RS_RET_NOT_FOUND, "ID for group '%s' could not be found or error", (char*)szName); @@ -379,6 +390,7 @@ static rsRetVal doGetGID(uchar **pp, rsRetVal (*pSetHdlr)(void*, uid_t), void *p skipWhiteSpace(pp); /* skip over any whitespace */ finalize_it: + free(stringBuf); RETiRet; } diff --git a/runtime/debug.c b/runtime/debug.c index 955076e2..edc4a255 100644 --- a/runtime/debug.c +++ b/runtime/debug.c @@ -68,7 +68,7 @@ static int bPrintAllDebugOnExit = 0; static int bAbortTrace = 1; /* print a trace after SIGABRT or SIGSEGV */ static char *pszAltDbgFileName = NULL; /* if set, debug output is *also* sent to here */ static int altdbg = -1; /* and the handle for alternate debug output */ -static int stddbg; +int stddbg = 1; /* the handle for regular debug output, set to stdout if not forking, -1 otherwise */ /* list of files/objects that should be printed */ typedef struct dbgPrintName_s { @@ -1303,8 +1303,6 @@ dbgGetRuntimeOptions(void) uchar *optname; /* set some defaults */ - stddbg = 1; - if((pszOpts = (uchar*) getenv("RSYSLOG_DEBUG")) != NULL) { /* we have options set, so let's process them */ while(dbgGetRTOptNamVal(&pszOpts, &optname, &optval)) { diff --git a/runtime/debug.h b/runtime/debug.h index 717c0fef..5bd26bd8 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -35,6 +35,7 @@ /* external static data elements (some time to be replaced) */ extern int Debug; /* debug flag - read-only after startup */ extern int debugging_on; /* read-only, except on sig USR1 */ +extern int stddbg; /* the handle for regular debug output, set to stdout if not forking, -1 otherwise */ /* data types */ @@ -105,8 +106,13 @@ void dbgPrintAllDebugInfo(void); void *dbgmalloc(size_t size); /* macros */ -#define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); } -#define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); } +#ifdef DEBUGLESS +# define DBGPRINTF(...) {} +# define DBGOPRINT(...) {} +#else +# define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); } +# define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); } +#endif #ifdef RTINST # define BEGINfunc static dbgFuncDB_t *pdbgFuncDB; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&pdbgFuncDB, __FILE__, __func__, __LINE__); # define ENDfunc dbgExitFunc(pdbgFuncDB, dbgCALLStaCK_POP_POINT, RS_RET_NO_IRET); diff --git a/runtime/modules.c b/runtime/modules.c index 5a87c8be..dac3bd95 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -67,14 +67,6 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(parser) DEFobjCurrIf(strgen) -/* we must ensure that only one thread at one time tries to load or unload - * modules, otherwise we may see race conditions. This first came up with - * imdiag/imtcp, which both use the same stream drivers. Below is the mutex - * for that handling. - * rgerhards, 2009-05-25 - */ -static pthread_mutex_t mutLoadUnload; - static modInfo_t *pLoadedModules = NULL; /* list of currently-loaded modules */ static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */ @@ -819,7 +811,7 @@ modUnlinkAndDestroy(modInfo_t **ppThis) pThis = *ppThis; assert(pThis != NULL); - pthread_mutex_lock(&mutLoadUnload); + pthread_mutex_lock(&mutObjGlobalOp); /* first check if we are permitted to unload */ if(pThis->eType == eMOD_LIB) { @@ -855,7 +847,7 @@ modUnlinkAndDestroy(modInfo_t **ppThis) moduleDestruct(pThis); finalize_it: - pthread_mutex_unlock(&mutLoadUnload); + pthread_mutex_unlock(&mutObjGlobalOp); RETiRet; } @@ -970,7 +962,7 @@ Load(uchar *pModName, sbool bConfLoad) */ # define PATHBUF_OVERHEAD 1 + iModNameLen + 3 + 1 - pthread_mutex_lock(&mutLoadUnload); + pthread_mutex_lock(&mutObjGlobalOp); if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) { iModNameLen -= 3; @@ -1096,7 +1088,7 @@ Load(uchar *pModName, sbool bConfLoad) finalize_it: if(pPathBuf != pathBuf) /* used malloc()ed memory? */ free(pPathBuf); - pthread_mutex_unlock(&mutLoadUnload); + pthread_mutex_unlock(&mutObjGlobalOp); RETiRet; } @@ -1193,16 +1185,6 @@ CODESTARTObjClassExit(module) /* release objects we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(parser, CORE_COMPONENT); - /* We have a problem in our reference counting, which leads to this function - * being called too early. This usually is no problem, but if we destroy - * the mutex object, we get into trouble. So rather than finding the root cause, - * we do not release the mutex right now and have a very, very slight leak. - * We know that otherwise no bad effects happen, so this acceptable for the - * time being. -- rgerhards, 2009-05-25 - * - * TODO: add again: pthread_mutex_destroy(&mutLoadUnload); - */ - free(pModDir); # ifdef DEBUG modUsrPrintAll(); /* debug aid - TODO: integrate with debug.c, at least the settings! */ @@ -1246,7 +1228,6 @@ ENDobjQueryInterface(module) */ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE class also in END MACRO! */ uchar *pModPath; - pthread_mutexattr_t mutAttr; /* use any module load path specified in the environment */ if((pModPath = (uchar*) getenv("RSYSLOG_MODDIR")) != NULL) { @@ -1264,10 +1245,6 @@ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHA SetModDir(glblModPath); } - pthread_mutexattr_init(&mutAttr); - pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE); - pthread_mutex_init(&mutLoadUnload, &mutAttr); - /* request objects we use */ CHKiRet(objUse(errmsg, CORE_COMPONENT)); ENDObjClassInit(module) diff --git a/runtime/msg.c b/runtime/msg.c index 4ad2b89e..ec3dbfa1 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -1004,11 +1004,15 @@ msg_t* MsgDup(msg_t* pOld) } else { tmpCOPYSZ(RawMsg); } - if(pOld->iLenHOSTNAME < CONF_HOSTNAME_BUFSIZE) { - memcpy(pNew->szHOSTNAME, pOld->szHOSTNAME, pOld->iLenHOSTNAME + 1); - pNew->pszHOSTNAME = pNew->szHOSTNAME; + if(pOld->pszHOSTNAME == NULL) { + pNew->pszHOSTNAME = NULL; } else { - tmpCOPYSZ(HOSTNAME); + if(pOld->iLenHOSTNAME < CONF_HOSTNAME_BUFSIZE) { + memcpy(pNew->szHOSTNAME, pOld->szHOSTNAME, pOld->iLenHOSTNAME + 1); + pNew->pszHOSTNAME = pNew->szHOSTNAME; + } else { + tmpCOPYSZ(HOSTNAME); + } } tmpCOPYCSTR(ProgName); @@ -1638,7 +1642,7 @@ char *getPROCID(msg_t *pM, sbool bLockMutex) MsgLock(pM); preparePROCID(pM, MUTEX_ALREADY_LOCKED); if(pM->pCSPROCID == NULL) - pszRet = UCHAR_CONSTANT(""); + pszRet = UCHAR_CONSTANT("-"); else pszRet = rsCStrGetSzStrNoNULL(pM->pCSPROCID); if(bLockMutex == LOCK_MUTEX) @@ -2749,6 +2753,10 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, *pbMustBeFreed = 0; break; case PROP_SYS_UPTIME: +# ifdef OS_SOLARIS + pRes = (uchar*) "UPTIME NOT available under Solaris"; + *pbMustBeFreed = 0; +# else { struct sysinfo s_info; @@ -2764,6 +2772,7 @@ uchar *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe, snprintf((char*) pRes, sizeof(uchar) * 32, "%ld", s_info.uptime); } +# endif break; default: /* there is no point in continuing, we may even otherwise render the diff --git a/runtime/obj.c b/runtime/obj.c index 93fbd281..b2739c58 100644 --- a/runtime/obj.c +++ b/runtime/obj.c @@ -96,7 +96,7 @@ DEFobjCurrIf(module) DEFobjCurrIf(errmsg) DEFobjCurrIf(strm) static objInfo_t *arrObjInfo[OBJ_NUM_IDS]; /* array with object information pointers */ -static pthread_mutex_t mutObjGlobalOp; /* mutex to guard global operations of the object system */ +pthread_mutex_t mutObjGlobalOp; /* mutex to guard global operations of the object system */ /* cookies for serialized lines */ @@ -1318,7 +1318,7 @@ objClassInit(modInfo_t *pModInfo) } /* the mutex must be recursive, because objects may call into other - * object identifieres recursively. + * object identifiers recursively. */ pthread_mutexattr_init(&mutAttr); pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE); diff --git a/runtime/obj.h b/runtime/obj.h index be97f20e..32f7ef09 100644 --- a/runtime/obj.h +++ b/runtime/obj.h @@ -122,4 +122,8 @@ rsRetVal objGetObjInterface(obj_if_t *pIf); PROTOTYPEObjClassInit(obj); PROTOTYPEObjClassExit(obj); + +/* the following definition is only for "friends" */ +extern pthread_mutex_t mutObjGlobalOp; /* mutex to guard global operations of the object system */ + #endif /* #ifndef OBJ_H_INCLUDED */ diff --git a/runtime/parser.c b/runtime/parser.c index a79f2ce8..f0515484 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -180,7 +180,7 @@ AddDfltParser(uchar *pName) CHKiRet(FindParser(&pParser, pName)); CHKiRet(AddParserToList(&pDfltParsLst, pParser)); - dbgprintf("Parser '%s' added to default parser set.\n", pName); + DBGPRINTF("Parser '%s' added to default parser set.\n", pName); finalize_it: RETiRet; @@ -209,7 +209,7 @@ finalize_it: BEGINobjDestruct(parser) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(parser) - dbgprintf("destructing parser '%s'\n", pThis->pName); + DBGPRINTF("destructing parser '%s'\n", pThis->pName); free(pThis->pName); ENDobjDestruct(parser) @@ -521,7 +521,7 @@ ParseMsg(msg_t *pMsg) bIsSanitized = TRUE; } localRet = pParser->pModule->mod.pm.parse(pMsg); - dbgprintf("Parser '%s' returned %d\n", pParser->pName, localRet); + DBGPRINTF("Parser '%s' returned %d\n", pParser->pName, localRet); if(localRet != RS_RET_COULD_NOT_PARSE) break; pParserList = pParserList->pNext; diff --git a/runtime/queue.c b/runtime/queue.c index 2b017747..34935403 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -131,7 +131,7 @@ static void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - dbgprintf("XXXXX: displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); } } @@ -1418,7 +1418,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); -dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ RETiRet; @@ -1454,7 +1455,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ - dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); + DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } @@ -1484,7 +1485,6 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1495,7 +1495,7 @@ dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch objDestruct(pUsr); } - dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -2462,21 +2462,27 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) while( (pThis->iMaxQueueSize > 0 && pThis->iQueueSize >= pThis->iMaxQueueSize) || (pThis->qType == QUEUETYPE_DISK && pThis->sizeOnDiskMax != 0 && pThis->tVars.disk.sizeOnDisk > pThis->sizeOnDiskMax)) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting to drain.\n"); - if(glbl.GetGlobalInputTermState()) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); - ABORT_FINALIZE(RS_RET_FORCE_TERM); - } - timeoutComp(&t, pThis->toEnq); STATSCOUNTER_INC(pThis->ctrFull, pThis->mutCtrFull); -// TODO : handle enqOnly => discard! - if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { - DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + if(pThis->toEnq == 0 || pThis->bEnqOnly) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); objDestruct(pUsr); ABORT_FINALIZE(RS_RET_QUEUE_FULL); - } + } else { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); + if(glbl.GetGlobalInputTermState()) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL, discard due to FORCE_TERM.\n"); + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } + timeoutComp(&t, pThis->toEnq); + if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { + DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); + STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); + objDestruct(pUsr); + ABORT_FINALIZE(RS_RET_QUEUE_FULL); + } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); + } } /* and finally enqueue the message */ @@ -2674,7 +2680,7 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) { pThis->iDeqtWinToHr = pvals[i].val.d.n; } else { - dbgprintf("queue: program error, non-handled " + DBGPRINTF("queue: program error, non-handled " "param '%s'\n", pblk.descr[i].name); } } diff --git a/runtime/rsconf.c b/runtime/rsconf.c index 460e69d6..c1af7d22 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -432,6 +432,7 @@ void cnfDoCfsysline(char *ln) DBGPRINTF("cnf:global:cfsysline: %s\n", ln); /* the legacy system needs the "$" stripped */ conf.cfsysline((uchar*) ln+1); + free(ln); } void cnfDoBSDTag(char *ln) @@ -685,9 +686,10 @@ runInputModules(void) node = module.GetNxtCnfType(runConf, NULL, eMOD_IN); while(node != NULL) { if(node->canRun) { - DBGPRINTF("running module %s with config %p\n", node->pMod->pszName, node); bNeedsCancel = (node->pMod->isCompatibleWithFeature(sFEATURENonCancelInputTermination) == RS_RET_OK) ? 0 : 1; + DBGPRINTF("running module %s with config %p, term mode: %s\n", node->pMod->pszName, node, + bNeedsCancel ? "cancel" : "cooperative/SIGTTIN"); thrdCreate(node->pMod->mod.im.runInput, node->pMod->mod.im.afterRun, bNeedsCancel, (node->pMod->cnfName == NULL) ? node->pMod->pszName : node->pMod->cnfName); } diff --git a/runtime/rule.c b/runtime/rule.c index 18199230..41cc9e4e 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -132,14 +132,14 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) } else if(pRule->eHostnameCmpMode == HN_COMP_MATCH) { if(rsCStrSzStrCmp(pRule->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { /* not equal, so we are already done... */ - dbgprintf("hostname filter '+%s' does not match '%s'\n", + DBGPRINTF("hostname filter '+%s' does not match '%s'\n", rsCStrGetSzStrNoNULL(pRule->pCSHostnameComp), getHOSTNAME(pMsg)); FINALIZE; } } else { /* must be -hostname */ if(!rsCStrSzStrCmp(pRule->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { - /* not equal, so we are already done... */ - dbgprintf("hostname filter '-%s' does not match '%s'\n", + /* not equal, SO WE ARe already done... */ + DBGPRINTF("hostname filter '-%s' does not match '%s'\n", rsCStrGetSzStrNoNULL(pRule->pCSHostnameComp), getHOSTNAME(pMsg)); FINALIZE; } @@ -171,7 +171,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) if(pRule->f_filter_type == FILTER_PRI) { /* skip messages that are incorrect priority */ - dbgprintf("testing filter, f_pmask %d\n", pRule->f_filterData.f_pmask[pMsg->iFacility]); + DBGPRINTF("testing filter, f_pmask %d\n", pRule->f_filterData.f_pmask[pMsg->iFacility]); if ( (pRule->f_filterData.f_pmask[pMsg->iFacility] == TABLE_NOPRI) || \ ((pRule->f_filterData.f_pmask[pMsg->iFacility] & (1<<pMsg->iSeverity)) == 0) ) bRet = 0; @@ -179,7 +179,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) bRet = 1; } else if(pRule->f_filter_type == FILTER_EXPR) { bRet = cnfexprEvalBool(pRule->f_filterData.expr, pMsg); - dbgprintf("result of rainerscript filter evaluation: %d\n", bRet); + DBGPRINTF("result of rainerscript filter evaluation: %d\n", bRet); } else { assert(pRule->f_filter_type == FILTER_PROP); /* assert() just in case... */ pszPropVal = MsgGetProp(pMsg, NULL, pRule->f_filterData.prop.propID, @@ -230,21 +230,21 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) char *cstr; if(pRule->f_filterData.prop.propID == PROP_CEE) { cstr = es_str2cstr(pRule->f_filterData.prop.propName, NULL); - dbgprintf("Filter: check for CEE property '%s' (value '%s') ", + DBGPRINTF("Filter: check for CEE property '%s' (value '%s') ", cstr, pszPropVal); free(cstr); } else { - dbgprintf("Filter: check for property '%s' (value '%s') ", + DBGPRINTF("Filter: check for property '%s' (value '%s') ", propIDToName(pRule->f_filterData.prop.propID), pszPropVal); } if(pRule->f_filterData.prop.isNegated) - dbgprintf("NOT "); + DBGPRINTF("NOT "); if(pRule->f_filterData.prop.operation == FIOP_ISEMPTY) { - dbgprintf("%s : %s\n", + DBGPRINTF("%s : %s\n", getFIOPName(pRule->f_filterData.prop.operation), bRet ? "TRUE" : "FALSE"); } else { - dbgprintf("%s '%s': %s\n", + DBGPRINTF("%s '%s': %s\n", getFIOPName(pRule->f_filterData.prop.operation), rsCStrGetSzStrNoNULL(pRule->f_filterData.prop.pCSCompValue), bRet ? "TRUE" : "FALSE"); diff --git a/runtime/ruleset.c b/runtime/ruleset.c index c384663a..ecded4a3 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -143,9 +143,9 @@ DEFFUNC_llExecFunc(processBatchDoRules) { rsRetVal iRet; ISOBJ_TYPE_assert(pData, rule); - dbgprintf("Processing next rule\n"); + DBGPRINTF("Processing next rule\n"); iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam); -dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); + DBGPRINTF("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); return iRet; } @@ -266,7 +266,7 @@ addRule(ruleset_t *pThis, rule_t **ppRule) rule.Destruct(ppRule); } else { CHKiRet(llAppend(&pThis->llRules, NULL, *ppRule)); - dbgprintf("selector line successfully processed, %d actions\n", iActionCnt); + DBGPRINTF("selector line successfully processed, %d actions\n", iActionCnt); } finalize_it: @@ -337,7 +337,7 @@ SetDefaultRuleset(rsconf_t *conf, uchar *pszName) CHKiRet(rulesetGetRuleset(conf, &pRuleset, pszName)); conf->rulesets.pDflt = pRuleset; - dbgprintf("default rule set changed to %p: '%s'\n", pRuleset, pszName); + DBGPRINTF("default rule set changed to %p: '%s'\n", pRuleset, pszName); finalize_it: RETiRet; @@ -355,7 +355,7 @@ SetCurrRuleset(rsconf_t *conf, uchar *pszName) CHKiRet(rulesetGetRuleset(conf, &pRuleset, pszName)); conf->rulesets.pCurr = pRuleset; - dbgprintf("current rule set changed to %p: '%s'\n", pRuleset, pszName); + DBGPRINTF("current rule set changed to %p: '%s'\n", pRuleset, pszName); finalize_it: RETiRet; @@ -414,7 +414,7 @@ finalize_it: /* destructor for the ruleset object */ BEGINobjDestruct(ruleset) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(ruleset) - dbgprintf("destructing ruleset %p, name %p\n", pThis, pThis->pszName); + DBGPRINTF("destructing ruleset %p, name %p\n", pThis, pThis->pszName); if(pThis->pQueue != NULL) { qqueueDestruct(&pThis->pQueue); } @@ -515,7 +515,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal) if(pNewVal == 0) FINALIZE; /* if it is turned off, we do not need to change anything ;) */ - dbgprintf("adding a ruleset-specific \"main\" queue"); + DBGPRINTF("adding a ruleset-specific \"main\" queue"); rulesetMainQName = (conf->rulesets.pCurr->pszName == NULL)? UCHAR_CONSTANT("ruleset") : conf->rulesets.pCurr->pszName; CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rulesetMainQName)); @@ -560,8 +560,7 @@ doRulesetAddParser(rsconf_t *conf, uchar *pName) CHKiRet(parser.AddParserToList(&conf->rulesets.pCurr->pParserLst, pParser)); - dbgprintf("added parser '%s' to ruleset '%s'\n", pName, conf->rulesets.pCurr->pszName); -RUNLOG_VAR("%p", conf->rulesets.pCurr->pParserLst); + DBGPRINTF("added parser '%s' to ruleset '%s'\n", pName, conf->rulesets.pCurr->pszName); finalize_it: d_free(pName); /* no longer needed */ diff --git a/runtime/statsobj.c b/runtime/statsobj.c index a21614f6..25275616 100644 --- a/runtime/statsobj.c +++ b/runtime/statsobj.c @@ -168,15 +168,18 @@ finalize_it: /* get all the object's countes together as CEE. */ static rsRetVal -getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr) +getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, int cee_cookie) { cstr_t *pcstr; ctr_t *pCtr; DEFiRet; CHKiRet(cstrConstruct(&pcstr)); - rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: {"), 7); + if (cee_cookie == 1) + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: "), 6); + + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("{"), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("name"), 4); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); @@ -273,8 +276,11 @@ getAllStatsLines(rsRetVal(*cb)(void*, cstr_t*), void *usrptr, statsFmtType_t fmt case statsFmt_Legacy: CHKiRet(getStatsLine(o, &cstr)); break; + case statsFmt_CEE: + CHKiRet(getStatsLineCEE(o, &cstr, 1)); + break; case statsFmt_JSON: - CHKiRet(getStatsLineCEE(o, &cstr)); + CHKiRet(getStatsLineCEE(o, &cstr, 0)); break; } CHKiRet(cb(usrptr, cstr)); diff --git a/runtime/statsobj.h b/runtime/statsobj.h index f7de68ee..14b33215 100644 --- a/runtime/statsobj.h +++ b/runtime/statsobj.h @@ -46,7 +46,8 @@ typedef enum statsCtrType_e { /* stats line format types */ typedef enum statsFmtType_e { statsFmt_Legacy, - statsFmt_JSON + statsFmt_JSON, + statsFmt_CEE } statsFmtType_t; diff --git a/runtime/wti.c b/runtime/wti.c index e44086af..382f3668 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -121,7 +121,7 @@ wtiWakeupThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); + DBGPRINTF("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); } RETiRet; @@ -148,13 +148,13 @@ wtiCancelThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + DBGPRINTF("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); srSleep(0, 10000); } if(wtiGetState(pThis)) { - dbgprintf("cooperative worker termination failed, using cancellation...\n"); - dbgoprint((obj_t*) pThis, "canceling worker thread\n"); + DBGPRINTF("cooperative worker termination failed, using cancellation...\n"); + DBGOPRINT((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { @@ -195,7 +195,7 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = FALSE; @@ -257,7 +257,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } - dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); + DBGOPRINT((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } @@ -300,7 +300,7 @@ wtiWorker(wti_t *pThis) if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); - dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", + DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); d_pthread_mutex_unlock(pWtp->pmutUsr); break; @@ -318,7 +318,7 @@ wtiWorker(wti_t *pThis) } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); - dbgoprint((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", + DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } diff --git a/tcps_sess.c b/tcps_sess.c index eb3740e4..e7149cb7 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -323,7 +323,7 @@ PrepareClose(tcps_sess_t *pThis) * of message may occur. As such, we process the message in * this case. */ - dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); + DBGPRINTF("Extra data at end of stream in legacy syslog/tcp message - processing\n"); datetime.getCurrTime(&stTime, &ttGenTime); defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } @@ -382,21 +382,21 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(isdigit(c)) { pThis->iOctetsRemain = pThis->iOctetsRemain * 10 + c - '0'; } else { /* done with the octet count, so this must be the SP terminator */ - dbgprintf("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); + DBGPRINTF("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); if(c != ' ') { errmsg.LogError(0, NO_ERRCODE, "Framing Error in received TCP message: " "delimiter is not SP but has ASCII value %d.\n", c); } if(pThis->iOctetsRemain < 1) { /* TODO: handle the case where the octet count is 0! */ - dbgprintf("Framing Error: invalid octet count\n"); + DBGPRINTF("Framing Error: invalid octet count\n"); errmsg.LogError(0, NO_ERRCODE, "Framing Error in received TCP message: " "invalid octet count %d.\n", pThis->iOctetsRemain); } else if(pThis->iOctetsRemain > iMaxLine) { /* while we can not do anything against it, we can at least log an indication * that something went wrong) -- rgerhards, 2008-03-14 */ - dbgprintf("truncating message with %d octets - max msg size is %d\n", + DBGPRINTF("truncating message with %d octets - max msg size is %d\n", pThis->iOctetsRemain, iMaxLine); errmsg.LogError(0, NO_ERRCODE, "received oversize message: size is %d bytes, " "max msg size is %d, truncating...\n", pThis->iOctetsRemain, iMaxLine); @@ -407,7 +407,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt assert(pThis->inputState == eInMsg); if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ - dbgprintf("error: message received is larger than max msg size, we split it\n"); + DBGPRINTF("error: message received is larger than max msg size, we split it\n"); defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good @@ -596,7 +596,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { if(pPoll != NULL) { - dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n"); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); } DBGPRINTF("New session created with NSD %p.\n", pNewSess); @@ -661,7 +660,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; - dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); + DBGPRINTF("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) @@ -862,21 +861,21 @@ Run(tcpsrv_t *pThis) } if(localRet != RS_RET_OK) { /* fall back to select */ - dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + DBGPRINTF("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } - dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n"); + DBGPRINTF("tcpsrv uses epoll() interface, nsdpoll driver found\n"); /* flag that we are in epoll mode */ pThis->bUsingEPoll = TRUE; /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); + DBGPRINTF("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); - dbgprintf("Added listener %d\n", i); + DBGPRINTF("Added listener %d\n", i); } while(1) { @@ -1064,7 +1063,7 @@ static rsRetVal SetKeepAlive(tcpsrv_t *pThis, int iVal) { DEFiRet; - dbgprintf("tcpsrv: keep-alive set to %d\n", iVal); + DBGPRINTF("tcpsrv: keep-alive set to %d\n", iVal); pThis->bUseKeepAlive = iVal; RETiRet; } diff --git a/tests/Makefile.am b/tests/Makefile.am index 50ce2e0b..aff44eef 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -65,7 +65,6 @@ TESTS += \ failover-no-basic.sh \ rcvr_fail_restore.sh \ linkedlistqueue.sh -endif if HAVE_VALGRIND TESTS += \ @@ -76,7 +75,8 @@ TESTS += \ failover-no-basic-vg.sh \ failover-no-rptd-vg.sh \ tcp-msgreduc-vg.sh -endif +endif # HAVE_VALGRIND +endif # ENABLE_IMDIAG if ENABLE_MYSQL_TESTS diff --git a/tests/imuxsock_ccmiddle_root.sh b/tests/imuxsock_ccmiddle_root.sh index b487611a..7f255bd0 100755 --- a/tests/imuxsock_ccmiddle_root.sh +++ b/tests/imuxsock_ccmiddle_root.sh @@ -2,6 +2,9 @@ # carry out this test echo \[imuxsock_ccmiddle_root.sh\]: test trailing LF handling in imuxsock echo This test must be run as root with no other active syslogd +if [ "$EUID" -ne 0 ]; then + exit 77 # Not root, skip this test +fi source $srcdir/diag.sh init source $srcdir/diag.sh startup imuxsock_ccmiddle_root.conf # send a message with trailing LF diff --git a/tests/imuxsock_logger_root.sh b/tests/imuxsock_logger_root.sh index 377999f7..0902d797 100755 --- a/tests/imuxsock_logger_root.sh +++ b/tests/imuxsock_logger_root.sh @@ -2,6 +2,9 @@ # carry out this test. echo \[imuxsock_logger_root.sh\]: test trailing LF handling in imuxsock echo This test must be run as root with no other active syslogd +if [ "$EUID" -ne 0 ]; then + exit 77 # Not root, skip this test +fi source $srcdir/diag.sh init source $srcdir/diag.sh startup imuxsock_logger_root.conf # send a message with trailing LF diff --git a/tests/imuxsock_traillf_root.sh b/tests/imuxsock_traillf_root.sh index 1b821ee7..0141a626 100755 --- a/tests/imuxsock_traillf_root.sh +++ b/tests/imuxsock_traillf_root.sh @@ -2,6 +2,9 @@ # carry out this test echo \[imuxsock_traillf_root.sh\]: test trailing LF handling in imuxsock echo This test must be run as root with no other active syslogd +if [ "$EUID" -ne 0 ]; then + exit 77 # Not root, skip this test +fi source $srcdir/diag.sh init source $srcdir/diag.sh startup imuxsock_traillf_root.conf # send a message with trailing LF diff --git a/tests/sndrcv_udp.sh b/tests/sndrcv_udp.sh index 274a414a..df37782c 100755 --- a/tests/sndrcv_udp.sh +++ b/tests/sndrcv_udp.sh @@ -7,4 +7,7 @@ # This file is part of the rsyslog project, released under GPLv3 echo =============================================================================== echo \[sndrcv_udp.sh\]: testing sending and receiving via udp +if [ "$EUID" -ne 0 ]; then + exit 77 # Not root, skip this test +fi source $srcdir/sndrcv_drvr.sh sndrcv_udp 50 diff --git a/tools/omfile.c b/tools/omfile.c index 68222883..0b20b1f6 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -647,7 +647,7 @@ doWrite(instanceData *pData, uchar *pszBuf, int lenBuf) ASSERT(pData != NULL); ASSERT(pszBuf != NULL); -dbgprintf("write to stream, pData->pStrm %p, lenBuf %d\n", pData->pStrm, lenBuf); + DBGPRINTF("write to stream, pData->pStrm %p, lenBuf %d\n", pData->pStrm, lenBuf); if(pData->pStrm != NULL){ CHKiRet(strm.Write(pData->pStrm, pszBuf, lenBuf)); FINALIZE; diff --git a/tools/pmrfc3164.c b/tools/pmrfc3164.c index 2657780d..bcded428 100644 --- a/tools/pmrfc3164.c +++ b/tools/pmrfc3164.c @@ -79,7 +79,7 @@ BEGINparse uchar bufParseTAG[CONF_TAG_MAXSIZE]; uchar bufParseHOSTNAME[CONF_HOSTNAME_MAXSIZE]; CODESTARTparse - dbgprintf("Message will now be parsed by the legacy syslog parser (one size fits all... ;)).\n"); + DBGPRINTF("Message will now be parsed by the legacy syslog parser (one size fits all... ;)).\n"); assert(pMsg != NULL); assert(pMsg->pszRawMsg != NULL); lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ @@ -229,7 +229,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(parser, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); - dbgprintf("rfc3164 parser init called\n"); + DBGPRINTF("rfc3164 parser init called\n"); bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 06fd4cd4..82ad79b9 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -448,7 +448,6 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) MsgSetRawMsgWOSize(pMsg, (char*)msg); MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); -dbgprintf("ZZZZ: pLocalHostIPIF used!\n"); MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP()); MsgSetMSGoffs(pMsg, 0); /* check if we have an error code associated and, if so, @@ -849,6 +848,11 @@ die(int sig) errno = 0; logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar*)buf, 0); } + /* we sleep for 50ms to give the queue a chance to pick up the exit message; + * otherwise we have seen cases where the message did not make it to log + * files, even on idle systems. + */ + srSleep(0, 50); /* drain queue (if configured so) and stop main queue worker thread pool */ DBGPRINTF("Terminating main queue...\n"); @@ -1663,7 +1667,7 @@ doGlblProcessInit(void) if( !(Debug == DEBUG_FULL || NoFork) ) { - DBGPRINTF("Checking pidfile.\n"); + DBGPRINTF("Checking pidfile '%s'.\n", PidFile); if (!check_pid(PidFile)) { memset(&sigAct, 0, sizeof (sigAct)); @@ -1671,6 +1675,9 @@ doGlblProcessInit(void) sigAct.sa_handler = doexit; sigaction(SIGTERM, &sigAct, NULL); + /* stop writing debug messages to stdout (if debugging is on) */ + stddbg = -1; + if (fork()) { /* Parent process */ @@ -1733,7 +1740,7 @@ doGlblProcessInit(void) } /* tuck my process id away */ - DBGPRINTF("Writing pidfile %s.\n", PidFile); + DBGPRINTF("Writing pidfile '%s'.\n", PidFile); if (!check_pid(PidFile)) { if (!write_pid(PidFile)) |