diff options
Diffstat (limited to 'plugins/imjournal')
-rwxr-xr-x | plugins/imjournal/imjournal.c | 79 |
1 files changed, 62 insertions, 17 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index 8f1824c9..74ef7115 100755 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -3,7 +3,7 @@ * To test under Linux: * emmit log message into systemd journal * - * Copyright (C) 2008-2012 Adiscon GmbH + * Copyright (C) 2008-2013 Adiscon GmbH * * This file is part of rsyslog. * @@ -33,6 +33,7 @@ #include <sys/poll.h> #include <sys/socket.h> #include <errno.h> +#include <systemd/sd-journal.h> #include "dirty.h" #include "cfsysline.h" @@ -47,7 +48,7 @@ #include "errmsg.h" #include "srUtils.h" #include "unicode-helper.h" -#include <systemd/sd-journal.h> +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -64,11 +65,15 @@ DEFobjCurrIf(errmsg) static struct configSettings_s { char *stateFile; int iPersistStateInterval; + int ratelimitInterval; + int ratelimitBurst; } cs; -/* module-gloval parameters */ +/* module-global parameters */ static struct cnfparamdescr modpdescr[] = { { "statefile", eCmdHdlrGetWord, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, { "persiststateinterval", eCmdHdlrInt, 0 } }; static struct cnfparamblk modpblk = @@ -84,6 +89,7 @@ static int bLegacyCnfModGlobalsPermitted = 1;/* are legacy module-global config static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */ +static ratelimit_t *ratelimiter = NULL; static sd_journal *j; /* enqueue the the journal message into the message queue. @@ -121,7 +127,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval * msgAddJSON(pMsg, (uchar*)"!", json); } - CHKiRet(submitMsg2(pMsg)); + CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg)); finalize_it: RETiRet; @@ -244,7 +250,14 @@ readjournal() { SD_JOURNAL_FOREACH_DATA(j, get, l) { /* locate equal sign, this is always present */ equal_sign = memchr(get, '=', l); - assert (equal_sign != NULL); + + /* ... but we know better than to trust the specs */ + if (equal_sign == NULL) { + errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()" + " returned a malformed field (has no '='): '%s'", + (char*)get); + continue; /* skip the entry */ + } /* get length of journal data prefix */ prefixlen = ((char *)equal_sign - (char *)get); @@ -427,12 +440,13 @@ finalize_it: } -BEGINrunInput -CODESTARTrunInput - /* this is an endless loop - it is terminated when the thread is - * signalled to do so. This, however, is handled by the framework, - * right into the sleep below. - */ +/* This function loads a journal cursor from the state file. + */ +static rsRetVal +loadJournalState() +{ + DEFiRet; + if (cs.stateFile[0] != '/') { char *new_stateFile; @@ -472,6 +486,22 @@ CODESTARTrunInput } } +finalize_it: + RETiRet; +} + +BEGINrunInput +CODESTARTrunInput + CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL)); + ratelimitSetLinuxLike(ratelimiter, cs.ratelimitInterval, cs.ratelimitBurst); + + if (cs.stateFile) { + CHKiRet(loadJournalState()); + } + + /* this is an endless loop - it is terminated when the thread is + * signalled to do so. This, however, is handled by the framework. + */ while (glbl.GetGlobalInputTermState() == 0) { int count = 0, r; @@ -492,11 +522,13 @@ CODESTARTrunInput } CHKiRet(readjournal()); - /* TODO: This could use some finer metric. */ - count++; - if (count == cs.iPersistStateInterval) { - count = 0; - persistJournalState(); + if (cs.stateFile) { /* can't persist without a state file */ + /* TODO: This could use some finer metric. */ + count++; + if (count == cs.iPersistStateInterval) { + count = 0; + persistJournalState(); + } } } @@ -510,6 +542,8 @@ CODESTARTbeginCnfLoad cs.iPersistStateInterval = DFLT_persiststateinterval; cs.stateFile = NULL; + cs.ratelimitBurst = 20000; + cs.ratelimitInterval = 600; ENDbeginCnfLoad @@ -545,13 +579,16 @@ ENDwillRun /* close journal */ BEGINafterRun CODESTARTafterRun - persistJournalState(); + if (cs.stateFile) { /* can't persist without a state file */ + persistJournalState(); + } sd_journal_close(j); ENDafterRun BEGINmodExit CODESTARTmodExit + ratelimitDestruct(ratelimiter); if(pInputName != NULL) prop.Destruct(&pInputName); if(pLocalHostIP != NULL) @@ -589,6 +626,10 @@ CODESTARTsetModCnf cs.iPersistStateInterval = (int) pvals[i].val.d.n; } else if (!strcmp(modpblk.descr[i].name, "statefile")) { cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) { + cs.ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) { + cs.ratelimitInterval = (int) pvals[i].val.d.n; } else { dbgprintf("imjournal: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); @@ -634,6 +675,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalpersiststateinterval", 0, eCmdHdlrInt, NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitinterval", 0, eCmdHdlrInt, + NULL, &cs.ratelimitInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalratelimitburst", 0, eCmdHdlrInt, + NULL, &cs.ratelimitBurst, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournalstatefile", 0, eCmdHdlrGetWord, NULL, &cs.stateFile, STD_LOADABLE_MODULE_ID)); |