diff options
-rw-r--r-- | ChangeLog | 11 | ||||
-rw-r--r-- | Makefile.am | 8 | ||||
-rw-r--r-- | action.c | 2 | ||||
-rw-r--r-- | configure.ac | 63 | ||||
-rw-r--r-- | doc/v6compatibility.html | 9 | ||||
-rw-r--r-- | grammar/rainerscript.c | 162 | ||||
-rw-r--r-- | grammar/rainerscript.h | 6 | ||||
-rw-r--r-- | plugins/impstats/impstats.c | 11 | ||||
-rw-r--r-- | plugins/imrelp/imrelp.c | 39 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 18 | ||||
-rw-r--r-- | plugins/imzmq3/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/imzmq3/README | 24 | ||||
-rw-r--r-- | plugins/imzmq3/imzmq3.c | 653 | ||||
-rw-r--r-- | plugins/omzmq3/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omzmq3/README | 25 | ||||
-rw-r--r-- | plugins/omzmq3/omzmq3.c | 462 | ||||
-rw-r--r-- | runtime/debug.h | 9 | ||||
-rw-r--r-- | runtime/parser.c | 6 | ||||
-rw-r--r-- | runtime/queue.c | 12 | ||||
-rw-r--r-- | runtime/rsconf.c | 3 | ||||
-rw-r--r-- | runtime/rule.c | 23 | ||||
-rw-r--r-- | runtime/ruleset.c | 17 | ||||
-rw-r--r-- | runtime/statsobj.c | 12 | ||||
-rw-r--r-- | runtime/statsobj.h | 3 | ||||
-rw-r--r-- | runtime/wti.c | 16 | ||||
-rw-r--r-- | tcps_sess.c | 10 | ||||
-rw-r--r-- | tcpsrv.c | 13 | ||||
-rw-r--r-- | tools/omfile.c | 4 | ||||
-rw-r--r-- | tools/pmrfc3164.c | 4 | ||||
-rw-r--r-- | tools/syslogd.c | 3 |
30 files changed, 1563 insertions, 81 deletions
@@ -1,4 +1,13 @@ --------------------------------------------------------------------------- +Version 6.5.0 [devel] 2012-0?-?? +- imrelp now supports non-cancel thread termination + (but now requires at least librelp 1.0.1) +- added --enable-debugless configure option for very high demanding envs + This actually at compile time disables a lot of debug code, resulting + in some speedup (but serious loss of debugging capabilities) +- added new 0mq plugins (via czmq lib) + Thanks to David Kelly for contributing these modules +--------------------------------------------------------------------------- Version 6.3.11 [BETA] 2012-06-?? - bugfix: expression-based filters with AND/OR could segfault due to a problem with boolean shortcut operations. From the user's @@ -528,6 +537,8 @@ Version 5.9.8 [V5-BETA], 2012-05-?? If it is not present, it must have the nilvalue "-" as of RFC5424 closes: http://bugzilla.adiscon.com/show_bug.cgi?id=332 Thanks to John N for reporting this issue. +- bugfix: "last message repeated n times" message was missing hostname + Thanks to Zdenek Salvet for finding this bug and to Bodik for reporting --------------------------------------------------------------------------- Version 5.9.7 [V5-BETA], 2012-05-10 - added capability to specify substrings for field extraction mode diff --git a/Makefile.am b/Makefile.am index befc1d44..eeb50311 100644 --- a/Makefile.am +++ b/Makefile.am @@ -160,6 +160,14 @@ if ENABLE_OMHIREDIS SUBDIRS += plugins/omhiredis endif +if ENABLE_OMZMQ3 +SUBDIRS += plugins/omzmq3 +endif + +if ENABLE_IMZMQ3 +SUBDIRS += plugins/imzmq3 +endif + if ENABLE_OMUXSOCK SUBDIRS += plugins/omuxsock endif @@ -262,7 +262,7 @@ actionResetQueueParams(void) cs.bActionQSyncQeueFiles = 0; cs.iActionQtoQShutdown = 0; /* queue shutdown */ cs.iActionQtoActShutdown = 1000; /* action shutdown (in phase 2) */ - cs.iActionQtoEnq = 2000; /* timeout for queue enque */ + cs.iActionQtoEnq = 50; /* timeout for queue enque */ cs.iActionQtoWrkShutdown = 60000; /* timeout for worker thread shutdown */ cs.iActionQWrkMinMsgs = 100; /* minimum messages per worker needed to start a new one */ cs.bActionQSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ diff --git a/configure.ac b/configure.ac index 498b59d8..589b1715 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ(2.61) -AC_INIT([rsyslog],[6.3.10],[rsyslog@lists.adiscon.com]) +AC_INIT([rsyslog],[6.5.0],[rsyslog@lists.adiscon.com]) AM_INIT_AUTOMAKE m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])]) @@ -445,6 +445,22 @@ if test "$enable_rtinst" = "yes"; then fi +# total debugless: highest performance, but no way at all to enable debug +# logging +AC_ARG_ENABLE(debugless, + [AS_HELP_STRING([--enable-debugless],[Enable runtime instrumentation mode @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_debugless="yes" ;; + no) enable_debugless="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-debugless) ;; + esac], + [enable_debugless="no"] +) +if test "$enable_debugless" = "yes"; then + AC_DEFINE(DEBUGLESS, 1, [Defined if debugless mode is enabled.]) +fi + + # valgrind AC_ARG_ENABLE(valgrind, [AS_HELP_STRING([--enable-valgrind],[Enable valgrind support settings @<:@default=no@:>@])], @@ -904,7 +920,7 @@ AC_ARG_ENABLE(relp, [enable_relp=no] ) if test "x$enable_relp" = "xyes"; then - PKG_CHECK_MODULES(RELP, relp >= 0.1.1) + PKG_CHECK_MODULES(RELP, relp >= 1.0.1) fi AM_CONDITIONAL(ENABLE_RELP, test x$enable_relp = xyes) AC_SUBST(RELP_CFLAGS) @@ -1257,6 +1273,44 @@ fi AM_CONDITIONAL(ENABLE_OMMONGODB, test x$enable_ommongodb = xyes) # end of mongodb code +# BEGIN ZMQ3 INPUT SUPPORT +AC_ARG_ENABLE(imzmq3, + [AS_HELP_STRING([--enable-imzmq3],[Compiles imzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_imzmq3="yes" ;; + no) enable_imzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-imzmq3) ;; + esac], + [enable_imzmq3=no] +) +if test "x$enable_imzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_IMZMQ3, test x$enable_imzmq3 = xyes) + +# END ZMQ3 INPUT SUPPORT + +# BEGIN ZMQ3 OUTPUT SUPPORT +AC_ARG_ENABLE(omzmq3, + [AS_HELP_STRING([--enable-omzmq3],[Compiles omzmq3 output module @<:@default=no@:>@])], + [case "${enableval}" in + yes) enable_omzmq3="yes" ;; + no) enable_omzmq3="no" ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-omzmq3) ;; + esac], + [enable_omzmq3=no] +) +if test "x$enable_omzmq3" = "xyes"; then + PKG_CHECK_MODULES(CZMQ, libczmq >= 1.1.0) + AC_SUBST(CZMQ_CFLAGS) + AC_SUBST(CZMQ_LIBS) +fi +AM_CONDITIONAL(ENABLE_OMZMQ3, test x$enable_omzmq3 = xyes) + +# END ZMQ3 SUPPORT + # HIREDIS SUPPORT AC_ARG_ENABLE(omhiredis, @@ -1307,6 +1361,7 @@ AC_CONFIG_FILES([Makefile \ plugins/impstats/Makefile \ plugins/imrelp/Makefile \ plugins/imdiag/Makefile \ + plugins/imzmq3/Makefile \ plugins/omtesting/Makefile \ plugins/omgssapi/Makefile \ plugins/ommysql/Makefile \ @@ -1319,6 +1374,7 @@ AC_CONFIG_FILES([Makefile \ plugins/omudpspoof/Makefile \ plugins/ommongodb/Makefile \ plugins/omhiredis/Makefile \ + plugins/omzmq3/Makefile \ plugins/mmnormalize/Makefile \ plugins/mmjsonparse/Makefile \ plugins/mmaudit/Makefile \ @@ -1351,6 +1407,7 @@ echo " imdiag enabled: $enable_imdiag" echo " file input module enabled: $enable_imfile" echo " Solaris input module enabled: $enable_imsolaris" echo " periodic statistics module enabled: $enable_impstats" +echo " imzmq3 input module enabled: $enable_imzmq3" echo echo "---{ output plugins }---" echo " Mail support enabled: $enable_mail" @@ -1361,6 +1418,7 @@ echo " omelasticsearch module will be compiled: $enable_elasticsearch" echo " omruleset module will be compiled: $enable_omruleset" echo " omudpspoof module will be compiled: $enable_omudpspoof" echo " omuxsock module will be compiled: $enable_omuxsock" +echo " omzmq3 module will be compiled: $enable_omzmq3" echo echo "---{ parser modules }---" echo " pmrfc3164sd module will be compiled: $enable_pmrfc3164sd" @@ -1398,6 +1456,7 @@ echo " Extended Testbench enabled: $enable_extended_tests" echo " MySQL Tests enabled: $enable_mysql_tests" echo " Debug mode enabled: $enable_debug" echo " Runtime Instrumentation enabled: $enable_rtinst" +echo " (total) debugless mode enabled: $enable_debugless" echo " Diagnostic tools enabled: $enable_diagtools" echo " Enhanced memory checking enabled: $enable_memcheck" echo " Valgrind support settings enabled: $enable_valgrind" diff --git a/doc/v6compatibility.html b/doc/v6compatibility.html index 1f830854..058ab4f1 100644 --- a/doc/v6compatibility.html +++ b/doc/v6compatibility.html @@ -112,6 +112,15 @@ to spot why things went wrong (and if at all). <p>Due to their positive effect on performance and comparatively low overhead, default batch sizes have been increased. Starting with 6.3.4, the action queues have a default batch size of 128 messages. +<h2>Default action queue enqueue timeout</h2> +<p>This timeout previously was 2seconds, and has been reduced to 50ms (starting with 6.5.0). This change +was made as a long timeout will caused delays in the associated main queue, something +that was quite unexpected to users. Now, this can still happen, but the effect is much +less harsh (but still considerable on a busy system). Also, 50ms should be fairly enough +for most output sources, except when they are really broken (like network disconnect). If +they are really broken, even a 2second timeout does not help, so we hopefully get the best +of both worlds with the new timeout. A specific timeout can of course still be configured, +it is just the timeout that changed. <h2>outchannels</h2> <p>Outchannels are a to-be-removed feature of rsyslog, at least as far as the config syntax is concerned. Nevertheless, v6 still supports it, but a new syntax is required diff --git a/grammar/rainerscript.c b/grammar/rainerscript.c index a5cc10c2..be8272b4 100644 --- a/grammar/rainerscript.c +++ b/grammar/rainerscript.c @@ -41,6 +41,11 @@ #include "grammar.h" #include "queue.h" #include "srUtils.h" +#include "regexp.h" +#include "obj.h" + +DEFobjCurrIf(obj) +DEFobjCurrIf(regexp) void readConfFile(FILE *fp, es_str_t **str) @@ -749,8 +754,7 @@ var2Number(struct var *r, int *bSuccess) return n; } -/* ensure that retval is a string; if string is no number, - * emit error message and set number to 0. +/* ensure that retval is a string */ static inline es_str_t * var2String(struct var *r, int *bMustFree) @@ -774,6 +778,7 @@ doFuncCall(struct cnffunc *func, struct var *ret, void* usrptr) int bMustFree; es_str_t *estr; char *str; + int retval; struct var r[CNFFUNC_MAX_ARGS]; dbgprintf("rainerscript: executing function id %d\n", func->fID); @@ -816,6 +821,7 @@ doFuncCall(struct cnffunc *func, struct var *ret, void* usrptr) es_tolower(estr); ret->datatype = 'S'; ret->d.estr = estr; + if(r[0].datatype == 'S') es_deleteStr(r[0].d.estr); break; case CNFFUNC_CSTR: cnfexprEval(func->expr[0], &r[0], usrptr); @@ -824,6 +830,7 @@ doFuncCall(struct cnffunc *func, struct var *ret, void* usrptr) estr = es_strdup(estr); ret->datatype = 'S'; ret->d.estr = estr; + if(r[0].datatype == 'S') es_deleteStr(r[0].d.estr); break; case CNFFUNC_CNUM: if(func->expr[0]->nodetype == 'N') { @@ -838,6 +845,24 @@ doFuncCall(struct cnffunc *func, struct var *ret, void* usrptr) } ret->datatype = 'N'; break; + case CNFFUNC_RE_MATCH: + cnfexprEval(func->expr[0], &r[0], usrptr); + estr = var2String(&r[0], &bMustFree); + str = es_str2cstr(estr, NULL); + retval = regexp.regexec(func->funcdata, str, 0, NULL, 0); + if(retval == 0) + ret->d.n = 1; + else { + ret->d.n = 0; + if(retval != REG_NOMATCH) { + DBGPRINTF("re_match: regexec returned error %d\n", retval); + } + } + ret->datatype = 'N'; + if(bMustFree) es_deleteStr(estr); + free(str); + if(r[0].datatype == 'S') es_deleteStr(r[0].d.estr); + break; default: if(Debug) { fname = es_str2cstr(func->fname, NULL); @@ -891,7 +916,7 @@ cnfexprEval(struct cnfexpr *expr, struct var *ret, void* usrptr) int bMustFree, bMustFree2; long long n_r, n_l; - //dbgprintf("eval expr %p, type '%c'(%u)\n", expr, expr->nodetype, expr->nodetype); + dbgprintf("eval expr %p, type '%c'(%u)\n", expr, expr->nodetype, expr->nodetype); switch(expr->nodetype) { /* note: comparison operations are extremely similar. The code can be copyied, only * places flagged with "CMP" need to be changed. @@ -1199,6 +1224,77 @@ cnfexprEval(struct cnfexpr *expr, struct var *ret, void* usrptr) } } +//--------------------------------------------------------- + +static inline void +cnffuncDestruct(struct cnffunc *func) +{ + unsigned short i; + + for(i = 0 ; i < func->nParams ; ++i) { + cnfexprDestruct(func->expr[i]); + } + /* some functions require special destruction */ + switch(func->fID) { + case CNFFUNC_RE_MATCH: + regexp.regfree(func->funcdata); + free(func->funcdata); + break; + default:break; + } +} + +/* Destruct an expression and all sub-expressions contained in it. + */ +void +cnfexprDestruct(struct cnfexpr *expr) +{ + + dbgprintf("cnfexprDestruct expr %p, type '%c'(%u)\n", expr, expr->nodetype, expr->nodetype); + switch(expr->nodetype) { + case CMP_NE: + case CMP_EQ: + case CMP_LE: + case CMP_GE: + case CMP_LT: + case CMP_GT: + case CMP_STARTSWITH: + case CMP_STARTSWITHI: + case CMP_CONTAINS: + case CMP_CONTAINSI: + case OR: + case AND: + case '+': + case '-': + case '*': + case '/': + case '%': /* binary */ + cnfexprDestruct(expr->l); + cnfexprDestruct(expr->r); + break; + case NOT: + case 'M': /* unary */ + cnfexprDestruct(expr->r); + break; + case 'N': + break; + case 'S': + es_deleteStr(((struct cnfstringval*)expr)->estr); + break; + case 'V': + free(((struct cnfvar*)expr)->name); + break; + case 'F': + cnffuncDestruct((struct cnffunc*)expr); + break; + default:break; + } + free(expr); +} + +//---- END + + /* Evaluate an expression as a bool. This is added because expressions are * mostly used inside filters, and so this function is quite common and * important. @@ -1488,11 +1584,53 @@ funcName2ID(es_str_t *fname, unsigned short nParams) return CNFFUNC_INVALID; } return CNFFUNC_CNUM; + } else if(!es_strbufcmp(fname, (unsigned char*)"re_match", sizeof("re_match") - 1)) { + if(nParams != 2) { + parser_errmsg("number of parameters for re_match() must be two " + "but is %d.", nParams); + return CNFFUNC_INVALID; + } + return CNFFUNC_RE_MATCH; } else { return CNFFUNC_INVALID; } } + +static inline rsRetVal +initFunc_re_match(struct cnffunc *func) +{ + rsRetVal localRet; + char *regex = NULL; + regex_t *re; + DEFiRet; + + func->funcdata = NULL; + if(func->expr[1]->nodetype != 'S') { + parser_errmsg("param 2 of re_match() must be a constant string"); + FINALIZE; + } + + CHKmalloc(re = malloc(sizeof(regex_t))); + func->funcdata = re; + + regex = es_str2cstr(((struct cnfstringval*) func->expr[1])->estr, NULL); + + if((localRet = objUse(regexp, LM_REGEXP_FILENAME)) == RS_RET_OK) { + if(regexp.regcomp(re, (char*) regex, REG_EXTENDED) != 0) { + parser_errmsg("cannot compile regex '%s'", regex); + ABORT_FINALIZE(RS_RET_ERR); + } + } else { /* regexp object could not be loaded */ + parser_errmsg("could not load regex support - regex ignored"); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: + free(regex); + RETiRet; +} + struct cnffunc * cnffuncNew(es_str_t *fname, struct cnffparamlst* paramlst) { @@ -1519,6 +1657,14 @@ cnffuncNew(es_str_t *fname, struct cnffparamlst* paramlst) param = param->next; free(toDel); } + /* some functions require special initialization */ + switch(func->fID) { + case CNFFUNC_RE_MATCH: + /* need to compile the regexp in param 2, so this MUST be a constant */ + initFunc_re_match(func); + break; + default:break; + } } return func; } @@ -1616,3 +1762,13 @@ cstrPrint(char *text, es_str_t *estr) dbgprintf("%s%s", text, str); free(str); } + +/* init must be called once before any parsing of the script files start */ +rsRetVal +initRainerscript(void) +{ + DEFiRet; + CHKiRet(objGetObjInterface(&obj)); +finalize_it: + RETiRet; +} diff --git a/grammar/rainerscript.h b/grammar/rainerscript.h index e11ae62f..5ff71bee 100644 --- a/grammar/rainerscript.h +++ b/grammar/rainerscript.h @@ -163,7 +163,8 @@ enum cnffuncid { CNFFUNC_GETENV, CNFFUNC_TOLOWER, CNFFUNC_CSTR, - CNFFUNC_CNUM + CNFFUNC_CNUM, + CNFFUNC_RE_MATCH }; struct cnffunc { @@ -171,6 +172,7 @@ struct cnffunc { es_str_t *fname; unsigned short nParams; enum cnffuncid fID; /* function ID for built-ins, 0 means use name */ + void *funcdata; /* global data for function-specific use (e.g. compiled regex) */ struct cnfexpr *expr[]; }; @@ -233,6 +235,7 @@ struct cnfexpr* cnfexprNew(unsigned nodetype, struct cnfexpr *l, struct cnfexpr void cnfexprPrint(struct cnfexpr *expr, int indent); void cnfexprEval(struct cnfexpr *expr, struct var *ret, void *pusr); int cnfexprEvalBool(struct cnfexpr *expr, void *usrptr); +void cnfexprDestruct(struct cnfexpr *expr); struct cnfnumval* cnfnumvalNew(long long val); struct cnfstringval* cnfstringvalNew(es_str_t *estr); struct cnfrule * cnfruleNew(enum cnfFiltType filttype, struct cnfactlst *actlst); @@ -249,6 +252,7 @@ void cnfparamsPrint(struct cnfparamblk *params, struct cnfparamvals *vals); void varDelete(struct var *v); void cnfparamvalsDestruct(struct cnfparamvals *paramvals, struct cnfparamblk *blk); void cnfcfsyslinelstDestruct(struct cnfcfsyslinelst *cfslst); +rsRetVal initRainerscript(void); /* debug helper */ void cstrPrint(char *text, es_str_t *estr); diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 4fec8e70..0abde84a 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -59,6 +59,7 @@ typedef struct configSettings_s { int iFacility; int iSeverity; int bJSON; + int bCEE; } configSettings_t; struct modConfData_s { @@ -89,6 +90,7 @@ initConfigSettings(void) cs.iFacility = DEFAULT_FACILITY; cs.iSeverity = DEFAULT_SEVERITY; cs.bJSON = 0; + cs.bCEE = 0; } @@ -157,7 +159,13 @@ CODESTARTendCnfLoad loadModConf->iStatsInterval = cs.iStatsInterval; loadModConf->iFacility = cs.iFacility; loadModConf->iSeverity = cs.iSeverity; - loadModConf->statsFmt = cs.bJSON ? statsFmt_JSON : statsFmt_Legacy; + if (cs.bCEE == 1) { + loadModConf->statsFmt = statsFmt_CEE; + } else if (cs.bJSON == 1) { + loadModConf->statsFmt = statsFmt_JSON; + } else { + loadModConf->statsFmt = statsFmt_Legacy; + } ENDendCnfLoad @@ -259,6 +267,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatfacility", 0, eCmdHdlrInt, NULL, &cs.iFacility, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatseverity", 0, eCmdHdlrInt, NULL, &cs.iSeverity, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatjson", 0, eCmdHdlrBinary, NULL, &cs.bJSON, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"pstatcee", 0, eCmdHdlrBinary, NULL, &cs.bCEE, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(prop.Construct(&pInputName)); diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index 99fabd18..f6040b21 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -22,7 +22,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #include "config.h" #include <stdlib.h> #include <assert.h> @@ -35,6 +34,7 @@ #include <netdb.h> #include <sys/types.h> #include <sys/socket.h> +#include <signal.h> #include <librelp.h> #include "rsyslog.h" #include "dirty.h" @@ -236,13 +236,39 @@ BEGINfreeCnf CODESTARTfreeCnf ENDfreeCnf +/* This is used to terminate the plugin. Note that the signal handler blocks + * other activity on the thread. As such, it is safe to request the stop. When + * we terminate, relpEngine is called, and it's select() loop interrupted. But + * only *after this function is done*. So we do not have a race! + */ +static void +doSIGTTIN(int __attribute__((unused)) sig) +{ + DBGPRINTF("imrelp: termination requested via SIGTTIN - telling RELP engine\n"); + relpEngineSetStop(pRelpEngine); +} + + /* This function is called to gather input. */ BEGINrunInput + sigset_t sigSet; + struct sigaction sigAct; CODESTARTrunInput - /* TODO: we must be careful to start the listener here. Currently, tcpsrv.c seems to - * do that in ConstructFinalize + /* we want to support non-cancel input termination. To do so, we must signal librelp + * when to stop. As we run on the same thread, we need to register as SIGTTIN handler, + * which will be used to put the terminating condition into librelp. */ + sigfillset(&sigSet); + pthread_sigmask(SIG_BLOCK, &sigSet, NULL); + sigemptyset(&sigSet); + sigaddset(&sigSet, SIGTTIN); + pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL); + memset(&sigAct, 0, sizeof (sigAct)); + sigemptyset(&sigAct.sa_mask); + sigAct.sa_handler = doSIGTTIN; + sigaction(SIGTTIN, &sigAct, NULL); + iRet = relpEngineRun(pRelpEngine); ENDrunInput @@ -284,12 +310,19 @@ resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unus } +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 2d26e652..19028470 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -388,7 +388,7 @@ addListner(instanceConf_t *inst) if(inst->ratelimitInterval > 0) { if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { /* in this case, we simply turn off rate-limiting */ - dbgprintf("imuxsock: turning off rate limiting because we could not " + DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); inst->ratelimitInterval = 0; } @@ -458,7 +458,7 @@ createLogSocket(lstn_t *pLstn) if(pLstn->fd < 0 || bind(pLstn->fd, (struct sockaddr *) &sunx, SUN_LEN(&sunx)) < 0 || chmod((char*)pLstn->sockName, 0666) < 0) { errmsg.LogError(errno, NO_ERRCODE, "cannot create '%s'", pLstn->sockName); - dbgprintf("cannot create %s (%d).\n", pLstn->sockName, errno); + DBGPRINTF("cannot create %s (%d).\n", pLstn->sockName, errno); if(pLstn->fd != -1) close(pLstn->fd); pLstn->fd = -1; @@ -491,7 +491,7 @@ openLogSocket(lstn_t *pLstn) /* ok, it matches -- just use as is */ pLstn->fd = fd; - dbgprintf("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", + DBGPRINTF("imuxsock: Acquired UNIX socket '%s' (fd %d) from systemd.\n", pLstn->sockName, pLstn->fd); break; } @@ -563,7 +563,7 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { /* we need to add a new ratelimiter, process not seen before! */ - dbgprintf("imuxsock: no ratelimiter for pid %lu, creating one\n", + DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); @@ -931,7 +931,7 @@ static rsRetVal readSocket(lstn_t *pLstn) msgh.msg_iovlen = 1; iRcvd = recvmsg(pLstn->fd, &msgh, MSG_DONTWAIT); - dbgprintf("Message from UNIX socket: #%d\n", pLstn->fd); + DBGPRINTF("Message from UNIX socket: #%d\n", pLstn->fd); if(iRcvd > 0) { cred = NULL; ts = NULL; @@ -948,8 +948,6 @@ static rsRetVal readSocket(lstn_t *pLstn) if( pLstn->bUseSysTimeStamp && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SO_TIMESTAMP) { ts = (struct timeval *)CMSG_DATA(cm); - dbgprintf("XXX: got timestamp %ld.%ld\n", - (long) ts->tv_sec, (long) ts->tv_usec); break; } # endif /* HAVE_SO_TIMESTAMP */ @@ -959,7 +957,7 @@ static rsRetVal readSocket(lstn_t *pLstn) } else if(iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); - dbgprintf("UNIX socket error: %d = %s.\n", errno, errStr); + DBGPRINTF("UNIX socket error: %d = %s.\n", errno, errStr); errmsg.LogError(errno, NO_ERRCODE, "imuxsock: recvfrom UNIX"); } @@ -1025,7 +1023,7 @@ activateListeners() for (i = startIndexUxLocalSockets ; i < nfd ; i++) { if(openLogSocket(&(listeners[i])) == RS_RET_OK) { ++actSocks; - dbgprintf("imuxsock: Opened UNIX socket '%s' (fd %d).\n", + DBGPRINTF("imuxsock: Opened UNIX socket '%s' (fd %d).\n", listeners[i].sockName, listeners[i].fd); } } @@ -1263,7 +1261,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(parser, CORE_COMPONENT)); - dbgprintf("imuxsock version %s initializing\n", PACKAGE_VERSION); + DBGPRINTF("imuxsock version %s initializing\n", PACKAGE_VERSION); /* init legacy config vars */ cs.pLogSockName = NULL; diff --git a/plugins/imzmq3/Makefile.am b/plugins/imzmq3/Makefile.am new file mode 100644 index 00000000..f9c84e5d --- /dev/null +++ b/plugins/imzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = imzmq3.la + +imzmq3_la_SOURCES = imzmq3.c +imzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +imzmq3_la_LDFLAGS = -module -avoid-version +imzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/imzmq3/README b/plugins/imzmq3/README new file mode 100644 index 00000000..88653b83 --- /dev/null +++ b/plugins/imzmq3/README @@ -0,0 +1,24 @@ +ZeroMQ 3.x Input Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Imzmq3 allows you to push data into rsyslog from a zeromq socket. The example +below binds a SUB socket to port 7172, and then any messages with the topic +"foo" will be pushed into rsyslog. + +Example Rsyslog.conf snippet: +------------------------------------------------------------------------------- + +$InputZmq3ServerRun action=BIND,type=SUB,description=tcp://*:7172,subscribe=foo + +------------------------------------------------------------------------------- diff --git a/plugins/imzmq3/imzmq3.c b/plugins/imzmq3/imzmq3.c new file mode 100644 index 00000000..78eee887 --- /dev/null +++ b/plugins/imzmq3/imzmq3.c @@ -0,0 +1,653 @@ +/* imzmq3.c + * + * This input plugin enables rsyslog to read messages from a ZeroMQ + * queue. + * + * Copyright 2012 Talksum, Inc. + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: David Kelly + * <davidk@talksum.com> + */ + +#include <assert.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "rsyslog.h" + +#include "cfsysline.h" +#include "config.h" +#include "dirty.h" +#include "errmsg.h" +#include "glbl.h" +#include "module-template.h" +#include "msg.h" +#include "net.h" +#include "parser.h" +#include "prop.h" +#include "ruleset.h" +#include "srUtils.h" +#include "unicode-helper.h" + +#include <czmq.h> + +MODULE_TYPE_INPUT +MODULE_TYPE_NOKEEP + +/* convienent symbols to denote a socket we want to bind + * vs one we want to just connect to + */ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + +/* Module static data */ +DEF_IMOD_STATIC_DATA +DEFobjCurrIf(errmsg) +DEFobjCurrIf(glbl) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +typedef struct _socket_type { + char* name; + int type; +} socket_type; + +/* more overkill, but seems nice to be consistent.*/ +typedef struct _socket_action { + char* name; + int action; +} socket_action; + +typedef struct _poller_data { + ruleset_t* ruleset; + thrdInfo_t* thread; +} poller_data; + +typedef struct _socket_info { + int type; + int action; + char* description; + int sndHWM; /* if you want more than 2^32 messages, */ + int rcvHWM; /* then pass in 0 (the default). */ + char* identity; + char** subscriptions; + ruleset_t* ruleset; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + +} socket_info; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations. + */ +static socket_info* s_socketInfo = NULL; +static size_t s_nitems = 0; +static prop_t * s_namep = NULL; +static zloop_t* s_zloop = NULL; +static int s_io_threads = 1; +static zctx_t* s_context = NULL; +static ruleset_t* s_ruleset = NULL; +static socket_type socketTypes[] = { + {"SUB", ZMQ_SUB }, + {"PULL", ZMQ_PULL }, + {"XSUB", ZMQ_XSUB } +}; + +static socket_action socketActions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + + +/* ---------------------------------------------------------------------------- + * Helper functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +static int getSocketType(char* name) { + int type = -1; + uint i; + + /* match name with known socket type */ + for(i=0; i<sizeof(socketTypes)/sizeof(socket_type); ++i) { + if( !strcmp(socketTypes[i].name, name) ) { + type = socketTypes[i].type; + break; + } + } + + /* whine if no match was found. */ + if (type == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown type %s",name); + + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + + /* match name with known socket action */ + for(i=0; i < sizeof(socketActions)/sizeof(socket_action); ++i) { + if(!strcmp(socketActions[i].name, name)) { + action = socketActions[i].action; + break; + } + } + + /* whine if no matching action was found */ + if (action == -1) + errmsg.LogError(0, NO_ERRCODE, "unknown action %s",name); + + return action; +} + + +static void setDefaults(socket_info* info) { + info->type = ZMQ_SUB; + info->action = ACTION_BIND; + info->description = NULL; + info->sndHWM = 0; + info->rcvHWM = 0; + info->identity = NULL; + info->subscriptions = NULL; + info->ruleset = NULL; + info->sndBuf = -1; + info->rcvBuf = -1; + info->linger = -1; + info->backlog = -1; + info->sndTimeout = -1; + info->rcvTimeout = -1; + info->maxMsgSize = -1; + info->rate = -1; + info->recoveryIVL = -1; + info->multicastHops = -1; + info->reconnectIVL = -1; + info->reconnectIVLMax = -1; + info->ipv4Only = -1; + info->affinity = -1; + +}; + + +/* The config string should look like: + * "action=AAA,type=TTT,description=DDD,sndHWM=SSS,rcvHWM=RRR,subscribe='xxx',subscribe='yyy'" + * + */ +static rsRetVal parseConfig(char* config, socket_info* info) { + int nsubs = 0; + + char* binding; + char* ptr1; + for (binding = strtok_r(config, ",", &ptr1); + binding != NULL; + binding = strtok_r(NULL, ",", &ptr1)) { + + /* Each binding looks like foo=bar */ + char * sep = strchr(binding, '='); + if (sep == NULL) + { + errmsg.LogError(0, NO_ERRCODE, + "Invalid argument format %s, ignoring ...", + binding); + continue; + } + + /* Replace '=' with '\0'. */ + *sep = '\0'; + + char * val = sep + 1; + + if (strcmp(binding, "action") == 0) { + info->action = getSocketAction(val); + } else if (strcmp(binding, "type") == 0) { + info->type = getSocketType(val); + } else if (strcmp(binding, "description") == 0) { + info->description = strdup(val); + } else if (strcmp(binding, "sndHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "rcvHWM") == 0) { + info->sndHWM = atoi(val); + } else if (strcmp(binding, "subscribe") == 0) { + /* Add the subscription value to the list.*/ + char * substr = NULL; + substr = strdup(val); + info->subscriptions = realloc(info->subscriptions, sizeof(char *) * nsubs + 1); + info->subscriptions[nsubs] = substr; + ++nsubs; + } else if (strcmp(binding, "sndBuf") == 0) { + info->sndBuf = atoi(val); + } else if (strcmp(binding, "rcvBuf") == 0) { + info->rcvBuf = atoi(val); + } else if (strcmp(binding, "linger") == 0) { + info->linger = atoi(val); + } else if (strcmp(binding, "backlog") == 0) { + info->backlog = atoi(val); + } else if (strcmp(binding, "sndTimeout") == 0) { + info->sndTimeout = atoi(val); + } else if (strcmp(binding, "rcvTimeout") == 0) { + info->rcvTimeout = atoi(val); + } else if (strcmp(binding, "maxMsgSize") == 0) { + info->maxMsgSize = atoi(val); + } else if (strcmp(binding, "rate") == 0) { + info->rate = atoi(val); + } else if (strcmp(binding, "recoveryIVL") == 0) { + info->recoveryIVL = atoi(val); + } else if (strcmp(binding, "multicastHops") == 0) { + info->multicastHops = atoi(val); + } else if (strcmp(binding, "reconnectIVL") == 0) { + info->reconnectIVL = atoi(val); + } else if (strcmp(binding, "reconnectIVLMax") == 0) { + info->reconnectIVLMax = atoi(val); + } else if (strcmp(binding, "ipv4Only") == 0) { + info->ipv4Only = atoi(val); + } else if (strcmp(binding, "affinity") == 0) { + info->affinity = atoi(val); + } else { + errmsg.LogError(0, NO_ERRCODE, "Unknown argument %s", binding); + return RS_RET_INVALID_PARAMS; + } + } + + return RS_RET_OK; +} + +static rsRetVal validateConfig(socket_info* info) { + + if (info->type == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid type"); + return RS_RET_INVALID_PARAMS; + } + if (info->action == -1) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you entered an invalid action"); + return RS_RET_INVALID_PARAMS; + } + if (info->description == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "you didn't enter a description"); + return RS_RET_INVALID_PARAMS; + } + if(info->type == ZMQ_SUB && info->subscriptions == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "SUB sockets need at least one subscription"); + return RS_RET_INVALID_PARAMS; + } + if(info->type != ZMQ_SUB && info->subscriptions != NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "only SUB sockets can have subscriptions"); + return RS_RET_INVALID_PARAMS; + } + return RS_RET_OK; +} + +static rsRetVal createContext() { + if (s_context == NULL) { + errmsg.LogError(0, NO_ERRCODE, "creating zctx."); + s_context = zctx_new(); + + if (s_context == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, + "zctx_new failed: %s", + strerror(errno)); + /* DK: really should do better than invalid params...*/ + return RS_RET_INVALID_PARAMS; + } + + if (s_io_threads > 1) { + errmsg.LogError(0, NO_ERRCODE, "setting io worker threads to %d", s_io_threads); + zctx_set_iothreads(s_context, s_io_threads); + } + } + return RS_RET_OK; +} + +static rsRetVal createSocket(socket_info* info, void** sock) { + size_t ii; + int rv; + + *sock = zsocket_new(s_context, info->type); + if (!sock) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zsocket_new failed: %s, for type %d", + strerror(errno),info->type); + /* DK: invalid params seems right here */ + return RS_RET_INVALID_PARAMS; + } + + /* Set options *before* the connect/bind. */ + if (info->identity) zsocket_set_identity(*sock, info->identity); + if (info->sndBuf > -1) zsocket_set_sndbuf(*sock, info->sndBuf); + if (info->rcvBuf > -1) zsocket_set_rcvbuf(*sock, info->rcvBuf); + if (info->linger > -1) zsocket_set_linger(*sock, info->linger); + if (info->backlog > -1) zsocket_set_backlog(*sock, info->backlog); + if (info->sndTimeout > -1) zsocket_set_sndtimeo(*sock, info->sndTimeout); + if (info->rcvTimeout > -1) zsocket_set_rcvtimeo(*sock, info->rcvTimeout); + if (info->maxMsgSize > -1) zsocket_set_maxmsgsize(*sock, info->maxMsgSize); + if (info->rate > -1) zsocket_set_rate(*sock, info->rate); + if (info->recoveryIVL > -1) zsocket_set_recovery_ivl(*sock, info->recoveryIVL); + if (info->multicastHops > -1) zsocket_set_multicast_hops(*sock, info->multicastHops); + if (info->reconnectIVL > -1) zsocket_set_reconnect_ivl(*sock, info->reconnectIVL); + if (info->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(*sock, info->reconnectIVLMax); + if (info->ipv4Only > -1) zsocket_set_ipv4only(*sock, info->ipv4Only); + if (info->affinity > -1) zsocket_set_affinity(*sock, info->affinity); + + /* since HWM have defaults, we always set them. No return codes to check, either.*/ + zsocket_set_sndhwm(*sock, info->sndHWM); + zsocket_set_rcvhwm(*sock, info->rcvHWM); + + /* Set subscriptions.*/ + for (ii = 0; ii < sizeof(info->subscriptions)/sizeof(char*); ++ii) + zsocket_set_subscribe(*sock, info->subscriptions[ii]); + + + + /* Do the bind/connect... */ + if (info->action==ACTION_CONNECT) { + rv = zsocket_connect(*sock, info->description); + if (rv < 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_connect using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } else { + rv = zsocket_bind(*sock, info->description); + if (rv <= 0) { + errmsg.LogError(0, + RS_RET_INVALID_PARAMS, + "zmq_bind using %s failed: %s", + info->description, strerror(errno)); + return RS_RET_INVALID_PARAMS; + } + } + return RS_RET_OK; +} + +/* ---------------------------------------------------------------------------- + * Module endpoints + */ + +/* accept a new ruleset to bind. Checks if it exists and complains, if not. Note + * that this makes the assumption that after the bind ruleset is called in the config, + * another call will be made to add an endpoint. +*/ +static rsRetVal +set_ruleset(void __attribute__((unused)) *pVal, uchar *pszName) { + ruleset_t* ruleset_ptr; + rsRetVal localRet; + DEFiRet; + + localRet = ruleset.GetRuleset(ourConf, &ruleset_ptr, pszName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, NO_ERRCODE, "error: " + "ruleset '%s' not found - ignored", pszName); + } + CHKiRet(localRet); + s_ruleset = ruleset_ptr; + DBGPRINTF("imzmq3 current bind ruleset '%s'\n", pszName); + +finalize_it: + free(pszName); /* no longer needed */ + RETiRet; +} + +/* add an actual endpoint + */ +static rsRetVal add_endpoint(void __attribute__((unused)) * oldp, uchar * valp) { + DEFiRet; + + /* increment number of items and store old num items, as it will be handy.*/ + size_t idx = s_nitems++; + + /* allocate a new socket_info array to accomidate this new endpoint*/ + socket_info* tmpSocketInfo; + CHKmalloc(tmpSocketInfo = (socket_info*)MALLOC(sizeof(socket_info) * s_nitems)); + + /* copy existing socket_info across into new array, if any, and free old storage*/ + if(idx) { + memcpy(tmpSocketInfo, s_socketInfo, sizeof(socket_info) * idx); + free(s_socketInfo); + } + + /* set the static to hold the new array */ + s_socketInfo = tmpSocketInfo; + + /* point to the new one */ + socket_info* sockInfo = &s_socketInfo[idx]; + + /* set defaults for the new socket info */ + setDefaults(sockInfo); + + /* Make a writeable copy of the string so we can use strtok + in the parseConfig call */ + char * copy = NULL; + CHKmalloc(copy = strdup((char *) valp)); + + /* parse the config string */ + CHKiRet(parseConfig(copy, sockInfo)); + + /* validate it */ + CHKiRet(validateConfig(sockInfo)); + + /* bind to the current ruleset (if any)*/ + sockInfo->ruleset = s_ruleset; + +finalize_it: + free(valp); /* in any case, this is no longer needed */ + RETiRet; +} + + +static int handlePoll(zloop_t __attribute__((unused)) * loop, zmq_pollitem_t *poller, void* pd) { + msg_t* logmsg; + poller_data* pollerData = (poller_data*)pd; + + char* buf = zstr_recv(poller->socket); + if (msgConstruct(&logmsg) == RS_RET_OK) { + MsgSetRawMsg(logmsg, buf, strlen(buf)); + MsgSetInputName(logmsg, s_namep); + MsgSetFlowControlType(logmsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(logmsg, pollerData->ruleset); + logmsg->msgFlags = NEEDS_PARSING; + submitMsg(logmsg); + } + + if( pollerData->thread->bShallStop == TRUE) { + /* a handler that returns -1 will terminate the + czmq reactor loop + */ + return -1; + } + + return 0; +} + +/* called when runInput is called by rsyslog + */ +static rsRetVal rcv_loop(thrdInfo_t* pThrd){ + size_t i; + int rv; + zmq_pollitem_t* items; + poller_data* pollerData; + + DEFiRet; + + /* create the context*/ + CHKiRet(createContext()); + + /* create the poll items*/ + CHKmalloc(items = (zmq_pollitem_t*)MALLOC(sizeof(zmq_pollitem_t)*s_nitems)); + + /* create poller data (stuff to pass into the zmq closure called when we get a message)*/ + CHKmalloc(pollerData = (poller_data*)MALLOC(sizeof(poller_data)*s_nitems)); + + /* loop through and initialize the poll items and poller_data arrays...*/ + for(i=0; i<s_nitems;++i) { + /* create the socket, update items.*/ + createSocket(&s_socketInfo[i], &items[i].socket); + items[i].events = ZMQ_POLLIN; + + /* now update the poller_data for this item */ + pollerData[i].thread = pThrd; + pollerData[i].ruleset = s_socketInfo[i].ruleset; + } + + s_zloop = zloop_new(); + for(i=0; i<s_nitems; ++i) { + + rv = zloop_poller(s_zloop, &items[i], handlePoll, &pollerData[i]); + if (rv) { + errmsg.LogError(0, NO_ERRCODE, "imzmq3: zloop_poller failed for item %zu", i); + } + } + zloop_start(s_zloop); + zloop_destroy(&s_zloop); + finalize_it: + for(i=0; i< s_nitems; ++i) { + zsocket_destroy(s_context, items[i].socket); + } + + zctx_destroy(&s_context); + + free(items); + RETiRet; +} + +/* ---------------------------------------------------------------------------- + * input module functions + */ + +BEGINrunInput +CODESTARTrunInput + iRet = rcv_loop(pThrd); +ENDrunInput + + +/* initialize and return if will run or not */ +BEGINwillRun +CODESTARTwillRun + /* we need to create the inputName property (only once during our + lifetime) */ + CHKiRet(prop.Construct(&s_namep)); + CHKiRet(prop.SetString(s_namep, + UCHAR_CONSTANT("imzmq3"), + sizeof("imzmq3") - 1)); + CHKiRet(prop.ConstructFinalize(s_namep)); + +/* If there are no endpoints this is pointless ...*/ + if (s_nitems == 0) + ABORT_FINALIZE(RS_RET_NO_RUN); + +finalize_it: +ENDwillRun + + +BEGINafterRun +CODESTARTafterRun + /* do cleanup here */ + if(s_namep != NULL) + prop.Destruct(&s_namep); +ENDafterRun + + +BEGINmodExit +CODESTARTmodExit + /* release what we no longer need */ + objRelease(errmsg, CORE_COMPONENT); + objRelease(glbl, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); +ENDmodExit + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES +ENDqueryEtryPt + +static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, + void __attribute__((unused)) *pVal) { + return RS_RET_OK; +} +static rsRetVal setGlobalWorkerThreads(uchar __attribute__((unused)) *pp, int val) { + errmsg.LogError(0, NO_ERRCODE, "setGlobalWorkerThreads called with %d",val); + s_io_threads = val; + return RS_RET_OK; +} + +BEGINmodInit() +CODESTARTmodInit + /* we only support the current interface specification */ + *ipIFVersProvided = CURR_MOD_IF_VERSION; +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + CHKiRet(objUse(glbl, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); + + /* register config file handlers */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverbindruleset", + 0, eCmdHdlrGetWord, + set_ruleset, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3serverrun", + 0, eCmdHdlrGetWord, + add_endpoint, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", + 1, eCmdHdlrCustomHandler, + resetConfigVariables, NULL, + STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputzmq3globalWorkerThreads", + 1, eCmdHdlrInt, + setGlobalWorkerThreads, NULL, + STD_LOADABLE_MODULE_ID)); +ENDmodInit diff --git a/plugins/omzmq3/Makefile.am b/plugins/omzmq3/Makefile.am new file mode 100644 index 00000000..92cd7586 --- /dev/null +++ b/plugins/omzmq3/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omzmq3.la + +omzmq3_la_SOURCES = omzmq3.c +omzmq3_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CZMQ_CFLAGS) +omzmq3_la_LDFLAGS = -module -avoid-version +omzmq3_la_LIBADD = $(CZMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/omzmq3/README b/plugins/omzmq3/README new file mode 100644 index 00000000..ccc96c74 --- /dev/null +++ b/plugins/omzmq3/README @@ -0,0 +1,25 @@ +ZeroMQ 3.x Output Plugin + +Building this plugin: +Requires libzmq and libczmq. First, install libzmq from the HEAD on github: +http://github.com/zeromq/libzmq. You can clone the repository, build, then +install it. The directions for doing so are there in the readme. Then, do +the same for libczmq: http://github.com/zeromq/czmq. At some point, the 3.1 +version of libzmq will be released, and a supporting version of libczmq. +At that time, you could simply download and install the tarballs instead of +using git to clone the repositories. Those tarballs (when available) can +be found at http://download.zeromq.org. As of this writing (5/31/2012), the +most recent version of czmq (1.1.0) and libzmq (3.1.0-beta) will not compile +properly. + +Omzmq3 allows you to push data out of rsyslog from a zeromq socket. The example +below binds a PUB socket to port 7171, and any message fitting the criteria will +be output to the zmq socket. + +Example Rsyslog.conf snippet (NOTE: v6 format): +------------------------------------------------------------------------------- +if $msg then { + action(type="omzmq3", sockType="PUB", action="BIND", + description="tcp://*:7172) +} +------------------------------------------------------------------------------- diff --git a/plugins/omzmq3/omzmq3.c b/plugins/omzmq3/omzmq3.c new file mode 100644 index 00000000..e13011fb --- /dev/null +++ b/plugins/omzmq3/omzmq3.c @@ -0,0 +1,462 @@ +/* omzmq3.c + * Copyright 2012 Talksum, Inc + * Using the czmq interface to zeromq, we output + * to a zmq socket. + + +* +* This program is free software: you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public License +* as published by the Free Software Foundation, either version 3 of +* the License, or (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, but +* WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +* Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public +* License along with this program. If not, see +* <http://www.gnu.org/licenses/>. +* +* Author: David Kelly +* <davidk@talksum.com> +*/ + + +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <unistd.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include <czmq.h> + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omzmq3") + +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + +/* convienent symbols to denote a socket we want to bind + vs one we want to just connect to +*/ +#define ACTION_CONNECT 1 +#define ACTION_BIND 2 + + +/* ---------------------------------------------------------------------------- + * structs to describe sockets + */ +struct socket_type { + char* name; + int type; +}; + +/* more overkill, but seems nice to be consistent. */ +struct socket_action { + char* name; + int action; +}; + +typedef struct _instanceData { + void* socket; + uchar* description; + int type; + int action; + int sndHWM; + int rcvHWM; + uchar* identity; + int sndBuf; + int rcvBuf; + int linger; + int backlog; + int sndTimeout; + int rcvTimeout; + int maxMsgSize; + int rate; + int recoveryIVL; + int multicastHops; + int reconnectIVL; + int reconnectIVLMax; + int ipv4Only; + int affinity; + uchar* tplName; +} instanceData; + + +/* ---------------------------------------------------------------------------- + * Static definitions/initializations + */ + +/* only 1 zctx for all the sockets, with an adjustable number of + worker threads which may be useful if we use affinity in particular + sockets +*/ +static zctx_t* s_context = NULL; +static int s_workerThreads = -1; + +static struct socket_type types[] = { + {"PUB", ZMQ_PUB }, + {"PUSH", ZMQ_PUSH }, + {"XPUB", ZMQ_XPUB } +}; + +static struct socket_action actions[] = { + {"BIND", ACTION_BIND}, + {"CONNECT", ACTION_CONNECT}, +}; + +static struct cnfparamdescr actpdescr[] = { + { "description", eCmdHdlrGetWord, 0 }, + { "sockType", eCmdHdlrGetWord, 0 }, + { "action", eCmdHdlrGetWord, 0 }, + { "sndHWM", eCmdHdlrInt, 0 }, + { "rcvHWM", eCmdHdlrInt, 0 }, + { "identity", eCmdHdlrGetWord, 0 }, + { "sndBuf", eCmdHdlrInt, 0 }, + { "rcvBuf", eCmdHdlrInt, 0 }, + { "linger", eCmdHdlrInt, 0 }, + { "backlog", eCmdHdlrInt, 0 }, + { "sndTimeout", eCmdHdlrInt, 0 }, + { "rcvTimeout", eCmdHdlrInt, 0 }, + { "maxMsgSize", eCmdHdlrInt, 0 }, + { "rate", eCmdHdlrInt, 0 }, + { "recoveryIVL", eCmdHdlrInt, 0 }, + { "multicastHops", eCmdHdlrInt, 0 }, + { "reconnectIVL", eCmdHdlrInt, 0 }, + { "reconnectIVLMax", eCmdHdlrInt, 0 }, + { "ipv4Only", eCmdHdlrInt, 0 }, + { "affinity", eCmdHdlrInt, 0 }, + { "globalWorkerThreads", eCmdHdlrInt, 0 }, + { "template", eCmdHdlrGetWord, 1 } +}; + +static struct cnfparamblk actpblk = { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr +}; + +/* ---------------------------------------------------------------------------- + * Helper Functions + */ + +/* get the name of a socket type, return the ZMQ_XXX type + or -1 if not a supported type (see above) +*/ +int getSocketType(char* name) { + int type = -1; + uint i; + for(i=0; i<sizeof(types)/sizeof(struct socket_type); ++i) { + if( !strcmp(types[i].name, name) ) { + type = types[i].type; + break; + } + } + return type; +} + + +static int getSocketAction(char* name) { + int action = -1; + uint i; + for(i=0; i < sizeof(actions)/sizeof(struct socket_action); ++i) { + if(!strcmp(actions[i].name, name)) { + action = actions[i].action; + break; + } + } + return action; +} + +/* closeZMQ will destroy the context and + * associated socket + */ +static void closeZMQ(instanceData* pData) { + errmsg.LogError(0, NO_ERRCODE, "closeZMQ called"); + if(s_context && pData->socket) { + if(pData->socket != NULL) { + zsocket_destroy(s_context, pData->socket); + } + } +} + + +static rsRetVal initZMQ(instanceData* pData) { + DEFiRet; + + /* create the context if necessary. */ + if (NULL == s_context) { + s_context = zctx_new(); + if (s_workerThreads > 0) zctx_set_iothreads(s_context, s_workerThreads); + } + + pData->socket = zsocket_new(s_context, pData->type); + + /* ALWAYS set the HWM as the zmq3 default is 1000 and we default + to 0 (infinity) */ + zsocket_set_rcvhwm(pData->socket, pData->rcvHWM); + zsocket_set_sndhwm(pData->socket, pData->sndHWM); + + /* use czmq defaults for these, unless set to non-default values */ + if(pData->identity) zsocket_set_identity(pData->socket, (char*)pData->identity); + if(pData->sndBuf > -1) zsocket_set_sndbuf(pData->socket, pData->sndBuf); + if(pData->rcvBuf > -1) zsocket_set_sndbuf(pData->socket, pData->rcvBuf); + if(pData->linger > -1) zsocket_set_linger(pData->socket, pData->linger); + if(pData->backlog > -1) zsocket_set_backlog(pData->socket, pData->backlog); + if(pData->sndTimeout > -1) zsocket_set_sndtimeo(pData->socket, pData->sndTimeout); + if(pData->rcvTimeout > -1) zsocket_set_rcvtimeo(pData->socket, pData->rcvTimeout); + if(pData->maxMsgSize > -1) zsocket_set_maxmsgsize(pData->socket, pData->maxMsgSize); + if(pData->rate > -1) zsocket_set_rate(pData->socket, pData->rate); + if(pData->recoveryIVL > -1) zsocket_set_recovery_ivl(pData->socket, pData->recoveryIVL); + if(pData->multicastHops > -1) zsocket_set_multicast_hops(pData->socket, pData->multicastHops); + if(pData->reconnectIVL > -1) zsocket_set_reconnect_ivl(pData->socket, pData->reconnectIVL); + if(pData->reconnectIVLMax > -1) zsocket_set_reconnect_ivl_max(pData->socket, pData->reconnectIVLMax); + if(pData->ipv4Only > -1) zsocket_set_ipv4only(pData->socket, pData->ipv4Only); + if(pData->affinity != 1) zsocket_set_affinity(pData->socket, pData->affinity); + + /* bind or connect to it */ + if (pData->action == ACTION_BIND) { + /* bind asserts, so no need to test return val here + which isn't the greatest api -- oh well */ + zsocket_bind(pData->socket, (char*)pData->description); + } else { + if(zsocket_connect(pData->socket, (char*)pData->description) == -1) { + errmsg.LogError(0, RS_RET_SUSPENDED, "omzmq3: connect failed!"); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + } + finalize_it: + RETiRet; +} + +rsRetVal writeZMQ(uchar* msg, instanceData* pData) { + DEFiRet; + + /* initialize if necessary */ + if(NULL == pData->socket) + CHKiRet(initZMQ(pData)); + + /* send it */ + int result = zstr_send(pData->socket, (char*)msg); + + /* whine if things went wrong */ + if (result == -1) { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: send of %s failed with return %d", msg, result); + ABORT_FINALIZE(RS_RET_ERR); + } + finalize_it: + RETiRet; +} + +static inline void +setInstParamDefaults(instanceData* pData) { + pData->description = (uchar*)"tcp://*:7171"; + pData->socket = NULL; + pData->tplName = NULL; + pData->type = ZMQ_PUB; + pData->action = ACTION_BIND; + pData->sndHWM = 0; /*unlimited*/ + pData->rcvHWM = 0; /*unlimited*/ + pData->identity = NULL; + pData->sndBuf = -1; + pData->rcvBuf = -1; + pData->linger = -1; + pData->backlog = -1; + pData->sndTimeout = -1; + pData->rcvTimeout = -1; + pData->maxMsgSize = -1; + pData->rate = -1; + pData->recoveryIVL = -1; + pData->multicastHops = -1; + pData->reconnectIVL = -1; + pData->reconnectIVLMax = -1; + pData->ipv4Only = -1; + pData->affinity = 1; +} + + +/* ---------------------------------------------------------------------------- + * Output Module Functions + */ + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo +ENDdbgPrintInstInfo + +BEGINfreeInstance +CODESTARTfreeInstance + closeZMQ(pData); + free(pData->description); + free(pData->tplName); +ENDfreeInstance + +BEGINtryResume +CODESTARTtryResume + if(NULL == pData->socket) + iRet = initZMQ(pData); +ENDtryResume + +BEGINdoAction +CODESTARTdoAction +iRet = writeZMQ(ppString[0], pData); +ENDdoAction + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst +if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + +CHKiRet(createInstance(&pData)); +setInstParamDefaults(pData); + +CODE_STD_STRING_REQUESTparseSelectorAct(1) +for(i = 0 ; i < actpblk.nParams ; ++i) { + if(!pvals[i].bUsed) + continue; + if(!strcmp(actpblk.descr[i].name, "description")) { + pData->description = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sockType")){ + pData->type = getSocketType(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "action")){ + pData->action = getSocketAction(es_str2cstr(pvals[i].val.d.estr, NULL)); + } else if(!strcmp(actpblk.descr[i].name, "sndHWM")) { + pData->sndHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvHWM")) { + pData->rcvHWM = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "identity")){ + pData->identity = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "sndBuf")) { + pData->sndBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvBuf")) { + pData->rcvBuf = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "linger")) { + pData->linger = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "backlog")) { + pData->backlog = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "sndTimeout")) { + pData->sndTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rcvTimeout")) { + pData->rcvTimeout = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "maxMsgSize")) { + pData->maxMsgSize = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "rate")) { + pData->rate = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "recoveryIVL")) { + pData->recoveryIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "multicastHops")) { + pData->multicastHops = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVL")) { + pData->reconnectIVL = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "reconnectIVLMax")) { + pData->reconnectIVLMax = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ipv4Only")) { + pData->ipv4Only = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "affinity")) { + pData->affinity = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "globalWorkerThreads")) { + s_workerThreads = (int) pvals[i].val.d.n; + } else { + errmsg.LogError(0, NO_ERRCODE, "omzmq3: program error, non-handled " + "param '%s'\n", actpblk.descr[i].name); + } + } + +if(pData->tplName == NULL) { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG)); + } else { + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS)); + } + +if(pData->type == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket type."); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } +if(pData->action == -1) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, "omzmq3: unknown socket action"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + +/* tell the engine we only want one template string */ +CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omzmq3:", sizeof(":omzmq3:") - 1)) + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omzmq3 supports only v6 config format, use: " + "action(type=\"omzmq3\" serverport=...)"); + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + +BEGINinitConfVars /* (re)set config variables to defaults */ +CODESTARTinitConfVars +s_workerThreads = -1; +ENDinitConfVars + +BEGINmodExit +CODESTARTmodExit +if(NULL != s_context) { + zctx_destroy(&s_context); + s_context=NULL; + } +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt +CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* only supports rsyslog 6 configs */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); + INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + DBGPRINTF("omzmq3: module compiled with rsyslog version %s.\n", VERSION); + +INITLegCnfVars +CHKiRet(omsdRegCFSLineHdlr((uchar *)"omzmq3workerthreads", 0, eCmdHdlrInt, NULL, &s_workerThreads, STD_LOADABLE_MODULE_ID)); +ENDmodInit + + + diff --git a/runtime/debug.h b/runtime/debug.h index 26672c3e..5bd26bd8 100644 --- a/runtime/debug.h +++ b/runtime/debug.h @@ -106,8 +106,13 @@ void dbgPrintAllDebugInfo(void); void *dbgmalloc(size_t size); /* macros */ -#define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); } -#define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); } +#ifdef DEBUGLESS +# define DBGPRINTF(...) {} +# define DBGOPRINT(...) {} +#else +# define DBGPRINTF(...) if(Debug) { dbgprintf(__VA_ARGS__); } +# define DBGOPRINT(...) if(Debug) { dbgoprint(__VA_ARGS__); } +#endif #ifdef RTINST # define BEGINfunc static dbgFuncDB_t *pdbgFuncDB; int dbgCALLStaCK_POP_POINT = dbgEntrFunc(&pdbgFuncDB, __FILE__, __func__, __LINE__); # define ENDfunc dbgExitFunc(pdbgFuncDB, dbgCALLStaCK_POP_POINT, RS_RET_NO_IRET); diff --git a/runtime/parser.c b/runtime/parser.c index a79f2ce8..f0515484 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -180,7 +180,7 @@ AddDfltParser(uchar *pName) CHKiRet(FindParser(&pParser, pName)); CHKiRet(AddParserToList(&pDfltParsLst, pParser)); - dbgprintf("Parser '%s' added to default parser set.\n", pName); + DBGPRINTF("Parser '%s' added to default parser set.\n", pName); finalize_it: RETiRet; @@ -209,7 +209,7 @@ finalize_it: BEGINobjDestruct(parser) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(parser) - dbgprintf("destructing parser '%s'\n", pThis->pName); + DBGPRINTF("destructing parser '%s'\n", pThis->pName); free(pThis->pName); ENDobjDestruct(parser) @@ -521,7 +521,7 @@ ParseMsg(msg_t *pMsg) bIsSanitized = TRUE; } localRet = pParser->pModule->mod.pm.parse(pMsg); - dbgprintf("Parser '%s' returned %d\n", pParser->pName, localRet); + DBGPRINTF("Parser '%s' returned %d\n", pParser->pName, localRet); if(localRet != RS_RET_COULD_NOT_PARSE) break; pParserList = pParserList->pNext; diff --git a/runtime/queue.c b/runtime/queue.c index a2f80d29..34935403 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -131,7 +131,7 @@ static void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - dbgprintf("XXXXX: displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); } } @@ -1418,7 +1418,8 @@ DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ ATOMIC_SUB(&pThis->iQueueSize, nElem, &pThis->mutQueueSize); ATOMIC_SUB(&pThis->nLogDeq, nElem, &pThis->mutLogDeq); -dbgprintf("delete batch from store, new sizes: log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); + DBGPRINTF("delete batch from store, new sizes: log %d, phys %d\n", + getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); ++pThis->deqIDDel; /* one more batch dequeued */ RETiRet; @@ -1454,7 +1455,7 @@ DeleteBatchFromQStore(qqueue_t *pThis, batch_t *pBatch) DoDeleteBatchFromQStore(pThis, pBatch->nElem); } else { /* can not delete, insert into to-delete list */ - dbgprintf("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); + DBGPRINTF("not at head of to-delete list, enqueue %d\n", (int) pBatch->deqID); CHKiRet(tdlAdd(pThis, pBatch->deqID, pBatch->nElem)); } @@ -1484,7 +1485,6 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) pUsr = pBatch->pElem[i].pUsrp; if( pBatch->pElem[i].state == BATCH_STATE_RDY || pBatch->pElem[i].state == BATCH_STATE_SUB) { -dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch->nElem, pBatch->pElem[i].state); localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*) pUsr)); ++nEnqueued; @@ -1495,7 +1495,7 @@ dbgprintf("XXX: DeleteProcessedBatch re-enqueue %d of %d, state %d\n", i, pBatch objDestruct(pUsr); } - dbgprintf("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); + DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); if(nEnqueued > 0) qqueueChkPersist(pThis, nEnqueued); @@ -2680,7 +2680,7 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) } else if(!strcmp(pblk.descr[i].name, "queuedequeuetimend.")) { pThis->iDeqtWinToHr = pvals[i].val.d.n; } else { - dbgprintf("queue: program error, non-handled " + DBGPRINTF("queue: program error, non-handled " "param '%s'\n", pblk.descr[i].name); } } diff --git a/runtime/rsconf.c b/runtime/rsconf.c index a8c1b4b9..c1af7d22 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -686,9 +686,10 @@ runInputModules(void) node = module.GetNxtCnfType(runConf, NULL, eMOD_IN); while(node != NULL) { if(node->canRun) { - DBGPRINTF("running module %s with config %p\n", node->pMod->pszName, node); bNeedsCancel = (node->pMod->isCompatibleWithFeature(sFEATURENonCancelInputTermination) == RS_RET_OK) ? 0 : 1; + DBGPRINTF("running module %s with config %p, term mode: %s\n", node->pMod->pszName, node, + bNeedsCancel ? "cancel" : "cooperative/SIGTTIN"); thrdCreate(node->pMod->mod.im.runInput, node->pMod->mod.im.afterRun, bNeedsCancel, (node->pMod->cnfName == NULL) ? node->pMod->pszName : node->pMod->cnfName); } diff --git a/runtime/rule.c b/runtime/rule.c index 18199230..254f2f10 100644 --- a/runtime/rule.c +++ b/runtime/rule.c @@ -132,14 +132,14 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) } else if(pRule->eHostnameCmpMode == HN_COMP_MATCH) { if(rsCStrSzStrCmp(pRule->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { /* not equal, so we are already done... */ - dbgprintf("hostname filter '+%s' does not match '%s'\n", + DBGPRINTF("hostname filter '+%s' does not match '%s'\n", rsCStrGetSzStrNoNULL(pRule->pCSHostnameComp), getHOSTNAME(pMsg)); FINALIZE; } } else { /* must be -hostname */ if(!rsCStrSzStrCmp(pRule->pCSHostnameComp, (uchar*) getHOSTNAME(pMsg), getHOSTNAMELen(pMsg))) { - /* not equal, so we are already done... */ - dbgprintf("hostname filter '-%s' does not match '%s'\n", + /* not equal, SO WE ARe already done... */ + DBGPRINTF("hostname filter '-%s' does not match '%s'\n", rsCStrGetSzStrNoNULL(pRule->pCSHostnameComp), getHOSTNAME(pMsg)); FINALIZE; } @@ -171,7 +171,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) if(pRule->f_filter_type == FILTER_PRI) { /* skip messages that are incorrect priority */ - dbgprintf("testing filter, f_pmask %d\n", pRule->f_filterData.f_pmask[pMsg->iFacility]); + DBGPRINTF("testing filter, f_pmask %d\n", pRule->f_filterData.f_pmask[pMsg->iFacility]); if ( (pRule->f_filterData.f_pmask[pMsg->iFacility] == TABLE_NOPRI) || \ ((pRule->f_filterData.f_pmask[pMsg->iFacility] & (1<<pMsg->iSeverity)) == 0) ) bRet = 0; @@ -179,7 +179,7 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) bRet = 1; } else if(pRule->f_filter_type == FILTER_EXPR) { bRet = cnfexprEvalBool(pRule->f_filterData.expr, pMsg); - dbgprintf("result of rainerscript filter evaluation: %d\n", bRet); + DBGPRINTF("result of rainerscript filter evaluation: %d\n", bRet); } else { assert(pRule->f_filter_type == FILTER_PROP); /* assert() just in case... */ pszPropVal = MsgGetProp(pMsg, NULL, pRule->f_filterData.prop.propID, @@ -230,21 +230,21 @@ shouldProcessThisMessage(rule_t *pRule, msg_t *pMsg, sbool *bProcessMsg) char *cstr; if(pRule->f_filterData.prop.propID == PROP_CEE) { cstr = es_str2cstr(pRule->f_filterData.prop.propName, NULL); - dbgprintf("Filter: check for CEE property '%s' (value '%s') ", + DBGPRINTF("Filter: check for CEE property '%s' (value '%s') ", cstr, pszPropVal); free(cstr); } else { - dbgprintf("Filter: check for property '%s' (value '%s') ", + DBGPRINTF("Filter: check for property '%s' (value '%s') ", propIDToName(pRule->f_filterData.prop.propID), pszPropVal); } if(pRule->f_filterData.prop.isNegated) - dbgprintf("NOT "); + DBGPRINTF("NOT "); if(pRule->f_filterData.prop.operation == FIOP_ISEMPTY) { - dbgprintf("%s : %s\n", + DBGPRINTF("%s : %s\n", getFIOPName(pRule->f_filterData.prop.operation), bRet ? "TRUE" : "FALSE"); } else { - dbgprintf("%s '%s': %s\n", + DBGPRINTF("%s '%s': %s\n", getFIOPName(pRule->f_filterData.prop.operation), rsCStrGetSzStrNoNULL(pRule->f_filterData.prop.pCSCompValue), bRet ? "TRUE" : "FALSE"); @@ -335,7 +335,10 @@ CODESTARTobjDestruct(rule) rsCStrRegexDestruct(&pThis->f_filterData.prop.regex_cache); if(pThis->f_filterData.prop.propName != NULL) es_deleteStr(pThis->f_filterData.prop.propName); + } else if(pThis->f_filter_type == FILTER_EXPR) { + cnfexprDestruct(pThis->f_filterData.expr); } +#warning: need to destroy expression based filter! llDestroy(&pThis->llActList); ENDobjDestruct(rule) diff --git a/runtime/ruleset.c b/runtime/ruleset.c index c384663a..ecded4a3 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -143,9 +143,9 @@ DEFFUNC_llExecFunc(processBatchDoRules) { rsRetVal iRet; ISOBJ_TYPE_assert(pData, rule); - dbgprintf("Processing next rule\n"); + DBGPRINTF("Processing next rule\n"); iRet = rule.ProcessBatch((rule_t*) pData, (batch_t*) pParam); -dbgprintf("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); + DBGPRINTF("ruleset: get iRet %d from rule.ProcessMsg()\n", iRet); return iRet; } @@ -266,7 +266,7 @@ addRule(ruleset_t *pThis, rule_t **ppRule) rule.Destruct(ppRule); } else { CHKiRet(llAppend(&pThis->llRules, NULL, *ppRule)); - dbgprintf("selector line successfully processed, %d actions\n", iActionCnt); + DBGPRINTF("selector line successfully processed, %d actions\n", iActionCnt); } finalize_it: @@ -337,7 +337,7 @@ SetDefaultRuleset(rsconf_t *conf, uchar *pszName) CHKiRet(rulesetGetRuleset(conf, &pRuleset, pszName)); conf->rulesets.pDflt = pRuleset; - dbgprintf("default rule set changed to %p: '%s'\n", pRuleset, pszName); + DBGPRINTF("default rule set changed to %p: '%s'\n", pRuleset, pszName); finalize_it: RETiRet; @@ -355,7 +355,7 @@ SetCurrRuleset(rsconf_t *conf, uchar *pszName) CHKiRet(rulesetGetRuleset(conf, &pRuleset, pszName)); conf->rulesets.pCurr = pRuleset; - dbgprintf("current rule set changed to %p: '%s'\n", pRuleset, pszName); + DBGPRINTF("current rule set changed to %p: '%s'\n", pRuleset, pszName); finalize_it: RETiRet; @@ -414,7 +414,7 @@ finalize_it: /* destructor for the ruleset object */ BEGINobjDestruct(ruleset) /* be sure to specify the object type also in END and CODESTART macros! */ CODESTARTobjDestruct(ruleset) - dbgprintf("destructing ruleset %p, name %p\n", pThis, pThis->pszName); + DBGPRINTF("destructing ruleset %p, name %p\n", pThis, pThis->pszName); if(pThis->pQueue != NULL) { qqueueDestruct(&pThis->pQueue); } @@ -515,7 +515,7 @@ doRulesetCreateQueue(rsconf_t *conf, int *pNewVal) if(pNewVal == 0) FINALIZE; /* if it is turned off, we do not need to change anything ;) */ - dbgprintf("adding a ruleset-specific \"main\" queue"); + DBGPRINTF("adding a ruleset-specific \"main\" queue"); rulesetMainQName = (conf->rulesets.pCurr->pszName == NULL)? UCHAR_CONSTANT("ruleset") : conf->rulesets.pCurr->pszName; CHKiRet(createMainQueue(&conf->rulesets.pCurr->pQueue, rulesetMainQName)); @@ -560,8 +560,7 @@ doRulesetAddParser(rsconf_t *conf, uchar *pName) CHKiRet(parser.AddParserToList(&conf->rulesets.pCurr->pParserLst, pParser)); - dbgprintf("added parser '%s' to ruleset '%s'\n", pName, conf->rulesets.pCurr->pszName); -RUNLOG_VAR("%p", conf->rulesets.pCurr->pParserLst); + DBGPRINTF("added parser '%s' to ruleset '%s'\n", pName, conf->rulesets.pCurr->pszName); finalize_it: d_free(pName); /* no longer needed */ diff --git a/runtime/statsobj.c b/runtime/statsobj.c index a21614f6..25275616 100644 --- a/runtime/statsobj.c +++ b/runtime/statsobj.c @@ -168,15 +168,18 @@ finalize_it: /* get all the object's countes together as CEE. */ static rsRetVal -getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr) +getStatsLineCEE(statsobj_t *pThis, cstr_t **ppcstr, int cee_cookie) { cstr_t *pcstr; ctr_t *pCtr; DEFiRet; CHKiRet(cstrConstruct(&pcstr)); - rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: {"), 7); + if (cee_cookie == 1) + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("@cee: "), 6); + + rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("{"), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("name"), 4); rsCStrAppendStrWithLen(pcstr, UCHAR_CONSTANT("\""), 1); @@ -273,8 +276,11 @@ getAllStatsLines(rsRetVal(*cb)(void*, cstr_t*), void *usrptr, statsFmtType_t fmt case statsFmt_Legacy: CHKiRet(getStatsLine(o, &cstr)); break; + case statsFmt_CEE: + CHKiRet(getStatsLineCEE(o, &cstr, 1)); + break; case statsFmt_JSON: - CHKiRet(getStatsLineCEE(o, &cstr)); + CHKiRet(getStatsLineCEE(o, &cstr, 0)); break; } CHKiRet(cb(usrptr, cstr)); diff --git a/runtime/statsobj.h b/runtime/statsobj.h index f7de68ee..14b33215 100644 --- a/runtime/statsobj.h +++ b/runtime/statsobj.h @@ -46,7 +46,8 @@ typedef enum statsCtrType_e { /* stats line format types */ typedef enum statsFmtType_e { statsFmt_Legacy, - statsFmt_JSON + statsFmt_JSON, + statsFmt_CEE } statsFmtType_t; diff --git a/runtime/wti.c b/runtime/wti.c index e44086af..382f3668 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -121,7 +121,7 @@ wtiWakeupThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); + DBGPRINTF("sent SIGTTIN to worker thread 0x%x\n", (unsigned) pThis->thrdID); } RETiRet; @@ -148,13 +148,13 @@ wtiCancelThrd(wti_t *pThis) if(wtiGetState(pThis)) { /* we first try the cooperative "cancel" interface */ pthread_kill(pThis->thrdID, SIGTTIN); - dbgprintf("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); + DBGPRINTF("sent SIGTTIN to worker thread 0x%x, giving it a chance to terminate\n", (unsigned) pThis->thrdID); srSleep(0, 10000); } if(wtiGetState(pThis)) { - dbgprintf("cooperative worker termination failed, using cancellation...\n"); - dbgoprint((obj_t*) pThis, "canceling worker thread\n"); + DBGPRINTF("cooperative worker termination failed, using cancellation...\n"); + DBGOPRINT((obj_t*) pThis, "canceling worker thread\n"); pthread_cancel(pThis->thrdID); /* now wait until the thread terminates... */ while(wtiGetState(pThis)) { @@ -195,7 +195,7 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = FALSE; @@ -257,7 +257,7 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) *pbInactivityTOOccured = 1; /* indicate we had a timeout */ } } - dbgoprint((obj_t*) pThis, "worker awoke from idle processing\n"); + DBGOPRINT((obj_t*) pThis, "worker awoke from idle processing\n"); ENDfunc } @@ -300,7 +300,7 @@ wtiWorker(wti_t *pThis) if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); - dbgoprint((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", + DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); d_pthread_mutex_unlock(pWtp->pmutUsr); break; @@ -318,7 +318,7 @@ wtiWorker(wti_t *pThis) } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { d_pthread_mutex_unlock(pWtp->pmutUsr); - dbgoprint((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", + DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } diff --git a/tcps_sess.c b/tcps_sess.c index eb3740e4..e7149cb7 100644 --- a/tcps_sess.c +++ b/tcps_sess.c @@ -323,7 +323,7 @@ PrepareClose(tcps_sess_t *pThis) * of message may occur. As such, we process the message in * this case. */ - dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n"); + DBGPRINTF("Extra data at end of stream in legacy syslog/tcp message - processing\n"); datetime.getCurrTime(&stTime, &ttGenTime); defaultDoSubmitMessage(pThis, &stTime, ttGenTime, NULL); } @@ -382,21 +382,21 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt if(isdigit(c)) { pThis->iOctetsRemain = pThis->iOctetsRemain * 10 + c - '0'; } else { /* done with the octet count, so this must be the SP terminator */ - dbgprintf("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); + DBGPRINTF("TCP Message with octet-counter, size %d.\n", pThis->iOctetsRemain); if(c != ' ') { errmsg.LogError(0, NO_ERRCODE, "Framing Error in received TCP message: " "delimiter is not SP but has ASCII value %d.\n", c); } if(pThis->iOctetsRemain < 1) { /* TODO: handle the case where the octet count is 0! */ - dbgprintf("Framing Error: invalid octet count\n"); + DBGPRINTF("Framing Error: invalid octet count\n"); errmsg.LogError(0, NO_ERRCODE, "Framing Error in received TCP message: " "invalid octet count %d.\n", pThis->iOctetsRemain); } else if(pThis->iOctetsRemain > iMaxLine) { /* while we can not do anything against it, we can at least log an indication * that something went wrong) -- rgerhards, 2008-03-14 */ - dbgprintf("truncating message with %d octets - max msg size is %d\n", + DBGPRINTF("truncating message with %d octets - max msg size is %d\n", pThis->iOctetsRemain, iMaxLine); errmsg.LogError(0, NO_ERRCODE, "received oversize message: size is %d bytes, " "max msg size is %d, truncating...\n", pThis->iOctetsRemain, iMaxLine); @@ -407,7 +407,7 @@ processDataRcvd(tcps_sess_t *pThis, char c, struct syslogTime *stTime, time_t tt assert(pThis->inputState == eInMsg); if(pThis->iMsg >= iMaxLine) { /* emergency, we now need to flush, no matter if we are at end of message or not... */ - dbgprintf("error: message received is larger than max msg size, we split it\n"); + DBGPRINTF("error: message received is larger than max msg size, we split it\n"); defaultDoSubmitMessage(pThis, stTime, ttGenTime, pMultiSub); /* we might think if it is better to ignore the rest of the * message than to treat it as a new one. Maybe this is a good @@ -596,7 +596,6 @@ processWorksetItem(tcpsrv_t *pThis, nspoll_t *pPoll, int idx, void *pUsr) iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]); if(iRet == RS_RET_OK) { if(pPoll != NULL) { - dbgprintf("XXXXXX: processWorksetItem trying nspoll.ctl\n"); CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD)); } DBGPRINTF("New session created with NSD %p.\n", pNewSess); @@ -661,7 +660,7 @@ processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t int origEntries = numEntries; DEFiRet; - dbgprintf("tcpsrv: ready to process %d event entries\n", numEntries); + DBGPRINTF("tcpsrv: ready to process %d event entries\n", numEntries); while(numEntries > 0) { if(glbl.GetGlobalInputTermState() == 1) @@ -862,21 +861,21 @@ Run(tcpsrv_t *pThis) } if(localRet != RS_RET_OK) { /* fall back to select */ - dbgprintf("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); + DBGPRINTF("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet); iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t)); FINALIZE; } - dbgprintf("tcpsrv uses epoll() interface, nsdpoll driver found\n"); + DBGPRINTF("tcpsrv uses epoll() interface, nsdpoll driver found\n"); /* flag that we are in epoll mode */ pThis->bUsingEPoll = TRUE; /* Add the TCP listen sockets to the list of sockets to monitor */ for(i = 0 ; i < pThis->iLstnCurr ; ++i) { - dbgprintf("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); + DBGPRINTF("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn); CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD)); - dbgprintf("Added listener %d\n", i); + DBGPRINTF("Added listener %d\n", i); } while(1) { @@ -1064,7 +1063,7 @@ static rsRetVal SetKeepAlive(tcpsrv_t *pThis, int iVal) { DEFiRet; - dbgprintf("tcpsrv: keep-alive set to %d\n", iVal); + DBGPRINTF("tcpsrv: keep-alive set to %d\n", iVal); pThis->bUseKeepAlive = iVal; RETiRet; } diff --git a/tools/omfile.c b/tools/omfile.c index 68222883..f88ffce4 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -121,7 +121,7 @@ struct s_dynaFileCacheEntry { typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; -#define IOBUF_DFLT_SIZE 1024 /* default size for io buffers */ +#define IOBUF_DFLT_SIZE 4096 /* default size for io buffers */ #define FLUSH_INTRVL_DFLT 1 /* default buffer flush interval (in seconds) */ #define USE_ASYNCWRITER_DFLT 0 /* default buffer use async writer */ #define FLUSHONTX_DFLT 1 /* default for flush on TX end */ @@ -647,7 +647,7 @@ doWrite(instanceData *pData, uchar *pszBuf, int lenBuf) ASSERT(pData != NULL); ASSERT(pszBuf != NULL); -dbgprintf("write to stream, pData->pStrm %p, lenBuf %d\n", pData->pStrm, lenBuf); + DBGPRINTF("write to stream, pData->pStrm %p, lenBuf %d\n", pData->pStrm, lenBuf); if(pData->pStrm != NULL){ CHKiRet(strm.Write(pData->pStrm, pszBuf, lenBuf)); FINALIZE; diff --git a/tools/pmrfc3164.c b/tools/pmrfc3164.c index 2657780d..bcded428 100644 --- a/tools/pmrfc3164.c +++ b/tools/pmrfc3164.c @@ -79,7 +79,7 @@ BEGINparse uchar bufParseTAG[CONF_TAG_MAXSIZE]; uchar bufParseHOSTNAME[CONF_HOSTNAME_MAXSIZE]; CODESTARTparse - dbgprintf("Message will now be parsed by the legacy syslog parser (one size fits all... ;)).\n"); + DBGPRINTF("Message will now be parsed by the legacy syslog parser (one size fits all... ;)).\n"); assert(pMsg != NULL); assert(pMsg->pszRawMsg != NULL); lenMsg = pMsg->iLenRawMsg - pMsg->offAfterPRI; /* note: offAfterPRI is already the number of PRI chars (do not add one!) */ @@ -229,7 +229,7 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(parser, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); - dbgprintf("rfc3164 parser init called\n"); + DBGPRINTF("rfc3164 parser init called\n"); bParseHOSTNAMEandTAG = glbl.GetParseHOSTNAMEandTAG(); /* cache value, is set only during rsyslogd option processing */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 44e60b1c..b84aae22 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -125,6 +125,7 @@ #include "rsconf.h" #include "dnscache.h" #include "sd-daemon.h" +#include "rainerscript.h" /* definitions for objects we access */ DEFobjCurrIf(obj) @@ -448,7 +449,6 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) MsgSetRawMsgWOSize(pMsg, (char*)msg); MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp()); -dbgprintf("ZZZZ: pLocalHostIPIF used!\n"); MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP()); MsgSetMSGoffs(pMsg, 0); /* check if we have an error code associated and, if so, @@ -1446,6 +1446,7 @@ InitGlobalClasses(void) pErrObj = "net"; CHKiRet(objUse(net, LM_NET_FILENAME)); dnscacheInit(); + initRainerscript(); finalize_it: if(iRet != RS_RET_OK) { |