summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dirty.h2
-rw-r--r--doc/v7compatibility.html19
-rw-r--r--plugins/imptcp/imptcp.c19
-rw-r--r--plugins/imuxsock/imuxsock.c5
-rw-r--r--runtime/ratelimit.c49
-rw-r--r--tools/syslogd.c43
6 files changed, 111 insertions, 26 deletions
diff --git a/dirty.h b/dirty.h
index 3c602caa..d82d5524 100644
--- a/dirty.h
+++ b/dirty.h
@@ -31,6 +31,8 @@ rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub);
rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg);
rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub, ratelimit_t *ratelimit);
rsRetVal submitMsg2(msg_t *pMsg, ratelimit_t *ratelimit);
+rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub);
+rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
rsRetVal parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
diff --git a/doc/v7compatibility.html b/doc/v7compatibility.html
index bf4c0eba..e954ca1d 100644
--- a/doc/v7compatibility.html
+++ b/doc/v7compatibility.html
@@ -27,9 +27,12 @@ problems to anyone, especially as in v6 this was announced as a missing feature.
<h2>"last message repeated n times" Processing</h2>
<p>This processing has been optimized and moved to the input side. This results
-in far better performance and also de-couples different sources from the same
+in usually far better performance and also de-couples different sources
+from the same
processing. It is now also integrated in to the more generic rate-limiting
-processing. The code works almost as before, with two exceptions:
+processing.
+<h3>User-Noticable Changes</h3>
+The code works almost as before, with two exceptions:
<ul>
<li>The supression amount can be different, as the new algorithm
precisely check's a single source, and while that source is being
@@ -46,6 +49,18 @@ messages will most probably not be detected. Upgrading the module code is
simple, and all rsyslog-provided plugins support the new method, so this
should not be a real problem (crafting a solution would result in rather
complex code - for a case that most probably would never happen).
+<h3>Performance Implications</h3>
+<p>In general, the new method enables far faster output procesing. However, it
+needs to be noted that the "last message repeated n" processing needs parsed
+messages in order to detect duplicated. Consequently, if it is enabled the
+parser step cannot be deferred to the main queue processing thread and
+thus must be done during input processing. The changes workload distribution
+and may have (good or bad) effect on the overall performance. If you have
+a very high performance installation, it is suggested to check the performance
+profike before deploying the new version. Note: for high-performance
+environments it is highly recommended NOT to use "last message repeated n times"
+processing but rather the other (more efficient) rate-limiting methods. These
+also do NOT require the parsing step to be done during input processing.
<p><font size="2">This documentation is part of the
<a href="http://www.rsyslog.com/">rsyslog</a> project.<br>
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index f804a70e..6d2ecd2f 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -195,6 +196,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +297,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -661,6 +664,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
msg_t *pMsg;
ptcpsrv_t *pSrv;
DEFiRet;
+dbgprintf("DDDD: in imptcp doSubmitMSg()\n");
if(pThis->iMsg == 0) {
DBGPRINTF("discarding zero-sized message\n");
@@ -679,14 +683,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg2(pMsg, NULL));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg2(pMultiSub, NULL));
- }
-
+ multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter);
finalize_it:
/* reset status variables */
@@ -831,10 +828,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg2(&multiSub, NULL));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
@@ -1130,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter));
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 392709a0..1ff89353 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -812,6 +812,11 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
struct syslogTime dummyTS;
struct json_object *json = NULL, *jval;
DEFiRet;
+#warning experimental code needs to be made production-ready!
+/* we need to decide how many ratelimiters we use --> hashtable
+ also remove current homegrown ratelimiting functionality and
+ replace it with the new one.
+ */
rsRetVal localRet;
static ratelimit_t *ratelimit = NULL;
if(ratelimit == NULL)
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
index 6372602f..03b9631e 100644
--- a/runtime/ratelimit.c
+++ b/runtime/ratelimit.c
@@ -29,14 +29,17 @@
#include "errmsg.h"
#include "ratelimit.h"
#include "datetime.h"
+#include "parser.h"
#include "unicode-helper.h"
#include "msg.h"
+#include "dirty.h"
/* definitions for objects we access */
DEFobjStaticHelpers
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(datetime)
+DEFobjCurrIf(parser)
/* static data */
@@ -58,8 +61,17 @@ DEFobjCurrIf(datetime)
rsRetVal
ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
{
+ rsRetVal localRet;
DEFiRet;
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
+
/* suppress duplicate messages */
if( ratelimit->pMsg != NULL &&
getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
@@ -94,6 +106,7 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
}
finalize_it:
+dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet);
RETiRet;
}
@@ -119,16 +132,9 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n");
goto done;
}
-#warning remove/enable after mailing list feedback
-#if 0
- if(pAction->bRepMsgHasMsg == 0) { /* old format repeat message? */
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " last message repeated %d times",
- pAction->f_prevcount);
- } else {
-#endif
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), " message repeated %d times: [%.800s]",
- ratelimit->nsupp, getMSG(ratelimit->repMsg));
-// }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->repMsg));
/* We now need to update the other message properties. Please note that digital
* signatures inside the message are invalidated. */
@@ -143,6 +149,7 @@ dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n");
done: return repMsg;
}
+
rsRetVal
ratelimitNew(ratelimit_t **ppThis)
{
@@ -156,9 +163,25 @@ finalize_it:
}
void
-ratelimitDestruct(ratelimit_t *pThis)
+ratelimitDestruct(ratelimit_t *ratelimit)
{
- free(pThis);
+ msg_t *pMsg;
+ if(ratelimit->pMsg != NULL) {
+ if(ratelimit->nsupp > 0) {
+ if(ratelimit->repMsg != NULL) {
+ dbgprintf("ratelimiter/destuct: call sequence error, have "
+ "previous repeat message - discarding\n");
+ msgDestruct(&ratelimit->repMsg);
+ }
+ ratelimit->repMsg = ratelimit->pMsg;
+ pMsg = ratelimitGetRepeatMsg(ratelimit);
+ if(pMsg != NULL)
+ submitMsg(pMsg);
+ }
+ } else {
+ msgDestruct(&ratelimit->pMsg);
+ }
+ free(ratelimit);
}
void
@@ -167,6 +190,7 @@ ratelimitModExit(void)
objRelease(datetime, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(errmsg, CORE_COMPONENT);
+ objRelease(parser, CORE_COMPONENT);
}
rsRetVal
@@ -177,6 +201,7 @@ ratelimitModInit(void)
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ CHKiRet(objUse(parser, CORE_COMPONENT));
finalize_it:
RETiRet;
}
diff --git a/tools/syslogd.c b/tools/syslogd.c
index bfdb5081..e8957e13 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -699,6 +699,49 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */
}
+/* add a message to a multisubmit structure. This handles ratelimiting. IF
+ * pMultiSub == NULL, a single-message enqueue happens. */
+rsRetVal
+multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit)
+{
+ rsRetVal localRet;
+ DEFiRet;
+
+ if(pMultiSub == NULL) {
+dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n");
+ CHKiRet(submitMsg(pMsg));
+ } else {
+dbgprintf("DDDD: have multisub!\n");
+ localRet = ratelimitMsg(pMsg, ratelimit);
+ if(localRet == RS_RET_OK_HAVE_REPMSG) {
+dbgprintf("DDDD: doing repeat submit!\n");
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit);
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub, NULL));
+ localRet = RS_RET_OK;
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub, NULL));
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+/* flush multiSubmit, e.g. at end of read records */
+rsRetVal
+multiSubmitFlush(multi_submit_t *pMultiSub)
+{
+ DEFiRet;
+dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem);
+ if(pMultiSub->nElem > 0) {
+ iRet = multiSubmitMsg(pMultiSub);
+ }
+ RETiRet;
+}
static void