diff options
Diffstat (limited to 'tools')
-rw-r--r-- | tools/omdiscard.c | 4 | ||||
-rw-r--r-- | tools/omfile.c | 15 | ||||
-rw-r--r-- | tools/syslogd.c | 167 |
3 files changed, 60 insertions, 126 deletions
diff --git a/tools/omdiscard.c b/tools/omdiscard.c index 08cd7491..15c6ea82 100644 --- a/tools/omdiscard.c +++ b/tools/omdiscard.c @@ -95,13 +95,9 @@ CODE_STD_STRING_REQUESTparseSelectorAct(0) if(*p == '~') { dbgprintf("discard\n"); - /* re-enable in v7.3: requires action list to support - * action-like statements, something that is too late to - * do in 7.1. errmsg.LogError(0, RS_RET_DEPRECATED, "warning: ~ action " "is deprecated, consider using the 'stop' " "statement instead"); - */ } else { iRet = RS_RET_CONFLINE_UNPROCESSED; } diff --git a/tools/omfile.c b/tools/omfile.c index 5b0bfb46..c7e0dc25 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -156,6 +156,7 @@ typedef struct _instanceData { int iFlushInterval; /* how fast flush buffer on inactivity? */ sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */ sbool bUseAsyncWriter; /* use async stream writer? */ + sbool bVeryRobustZip; } instanceData; @@ -205,6 +206,7 @@ static struct cnfparamdescr actpdescr[] = { { "ziplevel", eCmdHdlrInt, 0 }, /* legacy: omfileziplevel */ { "flushinterval", eCmdHdlrInt, 0 }, /* legacy: omfileflushinterval */ { "asyncwriting", eCmdHdlrBinary, 0 }, /* legacy: omfileasyncwriting */ + { "veryrobustzip", eCmdHdlrBinary, 0 }, { "flushontxend", eCmdHdlrBinary, 0 }, /* legacy: omfileflushontxend */ { "iobuffersize", eCmdHdlrSize, 0 }, /* legacy: omfileiobuffersize */ { "dirowner", eCmdHdlrUID, 0 }, /* legacy: dirowner */ @@ -269,7 +271,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tflush on TX end=%d\n", pData->bFlushOnTXEnd); dbgprintf("\tflush interval=%d\n", pData->iFlushInterval); dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize); - dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no"); + dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "on" : "off"); + dbgprintf("\tvery robust zip: %s\n", pData->bCreateDirs ? "on" : "off"); dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID); dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID); dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n", @@ -292,7 +295,7 @@ setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) if(loadModConf != NULL && loadModConf->tplName != NULL) { free(newVal); - errmsg.LogError(0, RS_RET_ERR, "omfile default template already set via module " + errmsg.LogError(0, RS_RET_ERR, "omfile: default template already set via module " "global parameter - can no longer be changed"); ABORT_FINALIZE(RS_RET_ERR); } @@ -536,6 +539,7 @@ prepareFile(instanceData *pData, uchar *newFileName) CHKiRet(strm.SetFName(pData->pStrm, szBaseName, ustrlen(szBaseName))); CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName))); CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel)); + CHKiRet(strm.SetbVeryReliableZip(pData->pStrm, pData->bVeryRobustZip)); CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize)); CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND)); CHKiRet(strm.SettOpenMode(pData->pStrm, cs.fCreateMode)); @@ -843,6 +847,7 @@ BEGINendTransaction CODESTARTendTransaction /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { +dbgprintf("AAAA: flusing stream, endTx\n"); CHKiRet(strm.Flush(pData->pStrm)); } finalize_it: @@ -854,6 +859,7 @@ CODESTARTdoAction DBGPRINTF("file to log to: %s\n", pData->f_fname); CHKiRet(writeFile(ppString, iMsgOpts, pData)); if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { +dbgprintf("AAAA: flusing stream, in Tx\n"); CHKiRet(strm.Flush(pData->pStrm)); } finalize_it: @@ -878,6 +884,7 @@ setInstParamDefaults(instanceData *pData) pData->bCreateDirs = 1; pData->bSyncFile = 0; pData->iZipLevel = 0; + pData->bVeryRobustZip = 0; pData->bFlushOnTXEnd = FLUSHONTX_DFLT; pData->iIOBufSize = IOBUF_DFLT_SIZE; pData->iFlushInterval = FLUSH_INTRVL_DFLT; @@ -915,6 +922,8 @@ CODESTARTnewActInst pData->iZipLevel = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "flushinterval")) { pData->iFlushInterval = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "veryrobustzip")) { + pData->bVeryRobustZip = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "asyncwriting")) { pData->bUseAsyncWriter = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "flushontxend")) { @@ -1064,6 +1073,7 @@ CODESTARTparseSelectorAct pData->iIOBufSize = (int) cs.iIOBufSize; pData->iFlushInterval = cs.iFlushInterval; pData->bUseAsyncWriter = cs.bUseAsyncWriter; + pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */ CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -1090,7 +1100,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a cs.bUseAsyncWriter = USE_ASYNCWRITER_DFLT; free(pszFileDfltTplName); pszFileDfltTplName = NULL; - return RS_RET_OK; } diff --git a/tools/syslogd.c b/tools/syslogd.c index 05cbfc13..e347794b 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -44,7 +44,6 @@ #include "rsyslog.h" #define DEFUPRI (LOG_USER|LOG_NOTICE) -#define TIMERINTVL 30 /* interval for checking flush, mark */ #include <unistd.h> #include <stdlib.h> @@ -127,6 +126,7 @@ extern int yydebug; /* interface to flex */ #include "dnscache.h" #include "sd-daemon.h" #include "rainerscript.h" +#include "ratelimit.h" /* definitions for objects we access */ DEFobjCurrIf(obj) @@ -205,13 +205,6 @@ static int bFinished = 0; /* used by termination signal handler, read-only excep */ int iConfigVerify = 0; /* is this just a config verify run? */ -/* Intervals at which we flush out "message repeated" messages, - * in seconds after previous message is logged. After each flush, - * we move to the next interval until we reach the largest. - * TODO: this shall go into action object! -- rgerhards, 2008-01-29 - */ -int repeatinterval[2] = { 30, 60 }; /* # of secs before flush */ - #define LIST_DELIMITER ':' /* delimiter between two hosts */ static pid_t ppid; /* This is a quick and dirty hack used for spliting main/startup thread */ @@ -222,6 +215,8 @@ struct queuefilenames_s { } *queuefilenames = NULL; +static ratelimit_t *dflt_ratelimiter = NULL; /* ratelimiter for submits without explicit one */ +static ratelimit_t *internalMsg_ratelimiter = NULL; /* ratelimiter for rsyslog-own messages */ int MarkInterval = 20 * 60; /* interval between marks in seconds - read-only after startup */ int send_to_all = 0; /* send message to all IPv4/IPv6 addresses */ static int NoFork = 0; /* don't fork - don't run in daemon mode - read-only after startup */ @@ -413,7 +408,7 @@ parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int fla CHKiRet(prop.Destruct(&pProp)); CHKiRet(MsgSetRcvFromIPStr(pMsg, hnameIP, ustrlen(hnameIP), &pProp)); CHKiRet(prop.Destruct(&pProp)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg)); finalize_it: RETiRet; @@ -433,6 +428,12 @@ submitErrMsg(int iErr, uchar *msg) } +static inline rsRetVal +submitMsgWithDfltRatelimiter(msg_t *pMsg) +{ + return ratelimitAddMsg(dflt_ratelimiter, NULL, pMsg); +} + /* rgerhards 2004-11-09: the following is a function that can be used * to log a message orginating from the syslogd itself. */ @@ -484,50 +485,13 @@ logmsgInternal(int iErr, int pri, uchar *msg, int flags) /* we have the queue, so we can simply provide the * message to the queue engine. */ - submitMsg(pMsg); + ratelimitAddMsg(internalMsg_ratelimiter, NULL, pMsg); + //submitMsgWithDfltRatelimiter(pMsg); } finalize_it: RETiRet; } -/* check message against ACL set - * rgerhards, 2009-11-16 - */ -#if 0 -static inline rsRetVal -chkMsgAgainstACL() { - /* if we reach this point, we had a good receive and can process the packet received */ - /* check if we have a different sender than before, if so, we need to query some new values */ - if(net.CmpHost(&frominet, frominetPrev, socklen) != 0) { - CHKiRet(net.cvthname(&frominet, fromHost, fromHostFQDN, fromHostIP)); - memcpy(frominetPrev, &frominet, socklen); /* update cache indicator */ - /* Here we check if a host is permitted to send us - * syslog messages. If it isn't, we do not further - * process the message but log a warning (if we are - * configured to do this). - * rgerhards, 2005-09-26 - */ - *pbIsPermitted = net.isAllowedSender((uchar*)"UDP", - (struct sockaddr *)&frominet, (char*)fromHostFQDN); - - if(!*pbIsPermitted) { - DBGPRINTF("%s is not an allowed sender\n", (char*)fromHostFQDN); - if(glbl.GetOption_DisallowWarning) { - time_t tt; - - datetime.GetTime(&tt); - if(tt > ttLastDiscard + 60) { - ttLastDiscard = tt; - errmsg.LogError(0, NO_ERRCODE, - "UDP message from disallowed sender %s discarded", - (char*)fromHost); - } - } - } - } -} -#endif - /* preprocess a batch of messages, that is ready them for actual processing. This is done * as a first stage and totally in parallel to any other worker active in the system. So @@ -620,7 +584,7 @@ int i; * rgerhards, 2008-02-13 */ rsRetVal -submitMsg(msg_t *pMsg) +submitMsg2(msg_t *pMsg) { qqueue_t *pQueue; ruleset_t *pRuleset; @@ -633,7 +597,7 @@ submitMsg(msg_t *pMsg) /* if a plugin logs a message during shutdown, the queue may no longer exist */ if(pQueue == NULL) { - DBGPRINTF("submitMsg() could not submit message - " + DBGPRINTF("submitMsg2() could not submit message - " "queue does (no longer?) exist - ignored\n"); FINALIZE; } @@ -645,13 +609,19 @@ finalize_it: RETiRet; } +rsRetVal +submitMsg(msg_t *pMsg) +{ + return submitMsgWithDfltRatelimiter(pMsg); +} + /* submit multiple messages at once, very similar to submitMsg, just * for multi_submit_t. All messages need to go into the SAME queue! * rgerhards, 2009-06-16 */ rsRetVal -multiSubmitMsg(multi_submit_t *pMultiSub) +multiSubmitMsg2(multi_submit_t *pMultiSub) { int i; qqueue_t *pQueue; @@ -682,8 +652,23 @@ multiSubmitMsg(multi_submit_t *pMultiSub) finalize_it: RETiRet; } +rsRetVal +multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ +{ + return multiSubmitMsg2(pMultiSub); +} +/* flush multiSubmit, e.g. at end of read records */ +rsRetVal +multiSubmitFlush(multi_submit_t *pMultiSub) +{ + DEFiRet; + if(pMultiSub->nElem > 0) { + iRet = multiSubmitMsg2(pMultiSub); + } + RETiRet; +} static void @@ -702,43 +687,6 @@ reapchild() } -/* helper to doFlushRptdMsgs() to flush the individual action links via llExecFunc - * rgerhards, 2007-08-02 - */ -DEFFUNC_llExecFunc(flushRptdMsgsActions) -{ - action_t *pAction = (action_t*) pData; - assert(pAction != NULL); - - BEGINfunc - d_pthread_mutex_lock(&pAction->mutAction); - /* TODO: time() performance: the call below could be moved to - * the beginn of the llExec(). This makes it slightly less correct, but - * in an acceptable way. -- rgerhards, 2008-09-16 - */ - if (pAction->f_prevcount && datetime.GetTime(NULL) >= REPEATTIME(pAction)) { - DBGPRINTF("flush %s: repeated %d times, %d sec.\n", - module.GetStateName(pAction->pMod), pAction->f_prevcount, - repeatinterval[pAction->f_repeatcount]); - actionWriteToAction(pAction); - BACKOFF(pAction); - } - d_pthread_mutex_unlock(&pAction->mutAction); - - ENDfunc - return RS_RET_OK; /* we ignore errors, we can not do anything either way */ -} - - -/* This method flushes repeat messages. - */ -static void -doFlushRptdMsgs(void) -{ - ruleset.IterateAllActions(runConf, flushRptdMsgsActions, NULL); -} - - static void debug_switch() { time_t tTime; @@ -1264,7 +1212,7 @@ static inline void processImInternal(void) msg_t *pMsg; while(iminternalRemoveMsg(&pMsg) == RS_RET_OK) { - submitMsg(pMsg); + submitMsgWithDfltRatelimiter(pMsg); } } @@ -1331,49 +1279,22 @@ mainloop(void) while(!bFinished){ /* this is now just a wait - please note that we do use a near-"eternal" - * timeout of 1 day if we do not have repeated message reduction turned on - * (which it is not by default). This enables us to help safe the environment + * timeout of 1 day. This enables us to help safe the environment * by not unnecessarily awaking rsyslog on a regular tick (just think * powertop, for example). In that case, we primarily wait for a signal, * but a once-a-day wakeup should be quite acceptable. -- rgerhards, 2008-06-09 */ - tvSelectTimeout.tv_sec = (runConf->globals.bReduceRepeatMsgs == 1) ? TIMERINTVL : 86400 /*1 day*/; - //tvSelectTimeout.tv_sec = TIMERINTVL; /* TODO: change this back to the above code when we have a better solution for apc */ + tvSelectTimeout.tv_sec = 86400 /*1 day*/; tvSelectTimeout.tv_usec = 0; select(1, NULL, NULL, NULL, &tvSelectTimeout); if(bFinished) - break; /* exit as quickly as possible - see long comment below */ - - /* If we received a HUP signal, we call doFlushRptdMsgs() a bit early. This - * doesn't matter, because doFlushRptdMsgs() checks timestamps. What may happen, - * however, is that the too-early call may lead to a bit too-late output - * of "last message repeated n times" messages. But that is quite acceptable. - * rgerhards, 2007-12-21 - * ... and just to explain, we flush here because that is exactly what the mainloop - * shall do - provide a periodic interval in which not-yet-flushed messages will - * be flushed. Be careful, there is a potential race condition: doFlushRptdMsgs() - * needs to aquire a lock on the action objects. If, however, long-running consumers - * cause the main queue worker threads to lock them for a long time, we may receive - * a starvation condition, resulting in the mainloop being held on lock for an extended - * period of time. That, in turn, could lead to unresponsiveness to termination - * requests. It is especially important that the bFinished flag is checked before - * doFlushRptdMsgs() is called (I know because I ran into that situation). I am - * not yet sure if the remaining probability window of a termination-related - * problem is large enough to justify changing the code - I would consider it - * extremely unlikely that the problem ever occurs in practice. Fixing it would - * require not only a lot of effort but would cost considerable performance. So - * for the time being, I think the remaining risk can be accepted. - * rgerhards, 2008-01-10 - */ - if(runConf->globals.bReduceRepeatMsgs == 1) - doFlushRptdMsgs(); + break; /* exit as quickly as possible */ if(bHadHUP) { doHUP(); bHadHUP = 0; continue; } - // TODO: remove execScheduled(); /* handle Apc calls (if any) */ } ENDfunc } @@ -1469,6 +1390,7 @@ InitGlobalClasses(void) CHKiRet(objUse(net, LM_NET_FILENAME)); dnscacheInit(); initRainerscript(); + ratelimitModInit(); finalize_it: if(iRet != RS_RET_OK) { @@ -1507,6 +1429,7 @@ GlobalClassExit(void) /* TODO: implement the rest of the deinit */ /* dummy "classes */ strExit(); + ratelimitModExit(); #if 0 CHKiRet(objGetObjInterface(&obj)); /* this provides the root pointer for all other queries */ @@ -2043,6 +1966,12 @@ int realMain(int argc, char **argv) } CHKiRet(localRet); + CHKiRet(ratelimitNew(&dflt_ratelimiter, "rsyslogd", "dflt")); + /* TODO: add linux-type limiting capability */ + CHKiRet(ratelimitNew(&internalMsg_ratelimiter, "rsyslogd", "internal_messages")); + ratelimitSetLinuxLike(internalMsg_ratelimiter, 5, 500); + /* TODO: make internalMsg ratelimit settings configurable */ + if(bChDirRoot) { if(chdir("/") != 0) fprintf(stderr, "Can not do 'cd /' - still trying to run\n"); |