summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--dirty.h2
-rw-r--r--plugins/imfile/imfile.c2
-rw-r--r--plugins/imptcp/imptcp.c3
-rw-r--r--plugins/imudp/imudp.c4
-rw-r--r--plugins/imuxsock/imuxsock.c12
-rw-r--r--runtime/ratelimit.c141
-rw-r--r--runtime/ratelimit.h9
-rw-r--r--runtime/rsyslog.h1
-rw-r--r--tools/syslogd.c32
9 files changed, 101 insertions, 105 deletions
diff --git a/dirty.h b/dirty.h
index e49730a1..30b30bec 100644
--- a/dirty.h
+++ b/dirty.h
@@ -28,10 +28,10 @@
#define DIRTY_H_INCLUDED 1
rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub);
+rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub); /* friends only! */
rsRetVal submitMsg2(msg_t *pMsg);
rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg);
rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub);
-rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit);
rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags);
rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset);
rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index d6e6ad0e..453b6b05 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -306,7 +306,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
finalize_it:
if(pThis->multiSub.nElem > 0) {
/* submit everything that was not yet submitted */
- CHKiRet(multiSubmitMsg2(&pThis->multiSub, NULL));
+ CHKiRet(multiSubmitMsg(&pThis->multiSub));
}
; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */
/* Note: the problem above is that pthread:cleanup_pop() is a macro which
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index e7650812..22d3ff25 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -682,7 +682,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter);
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -1124,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
CHKiRet(ratelimitNew(&pSrv->ratelimiter));
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 382ccbc2..2ddc10bb 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -51,6 +51,7 @@
#include "prop.h"
#include "ruleset.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -75,6 +76,7 @@ static struct lstn_s {
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
statsobj_t *stats; /* listener stats */
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
@@ -246,6 +248,7 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter));
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
@@ -882,6 +885,7 @@ CODESTARTafterRun
net.clearAllowedSenders((uchar*)"UDP");
for(lstn = lcnfRoot ; lstn != NULL ; ) {
statsobj.Destruct(&(lstn->stats));
+ ratelimitDestruct(lstn->ratelimiter);
close(lstn->sock);
lstnDel = lstn;
lstn = lstn->next;
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 2f7daf87..3421863d 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -811,6 +811,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
int toffs; /* offset for trusted properties */
struct syslogTime dummyTS;
struct json_object *json = NULL, *jval;
+ msg_t *repMsg;
DEFiRet;
#warning experimental code needs to be made production-ready!
/* we need to decide how many ratelimiters we use --> hashtable
@@ -992,14 +993,11 @@ if(ratelimit == NULL)
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- localRet = ratelimitMsg(pMsg, ratelimit);
- if(localRet == RS_RET_OK_HAVE_REPMSG) {
-dbgprintf("DDDD: doing repeat submit!\n");
- CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit)));
- localRet = RS_RET_OK;
- }
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL)
+ CHKiRet(submitMsg(repMsg));
if(localRet == RS_RET_OK)
- CHKiRet(submitMsg2(pMsg));
+ CHKiRet(submitMsg(pMsg));
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
index e6ea6f4e..6b637677 100644
--- a/runtime/ratelimit.c
+++ b/runtime/ratelimit.c
@@ -43,6 +43,31 @@ DEFobjCurrIf(parser)
/* static data */
+/* generate a "repeated n times" message */
+static inline msg_t *
+ratelimitGenRepMsg(ratelimit_t *ratelimit)
+{
+ msg_t *repMsg;
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
+
+ if(ratelimit->nsupp == 1) { /* we simply use the original message! */
+ repMsg = MsgAddRef(ratelimit->pMsg);
+ } else {/* we need to duplicate, original message may still be in use in other
+ * parts of the system! */
+ if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
+ DBGPRINTF("Message duplication failed, dropping repeat message.\n");
+ goto done;
+ }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->pMsg));
+ MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
+ }
+
+done: return repMsg;
+}
+
/* ratelimit a message, that means:
* - handle "last message repeated n times" logic
* - handle actual (discarding) rate-limiting
@@ -54,14 +79,14 @@ DEFobjCurrIf(parser)
* cooperative mode can enable a faulty caller to thrash up part
* of the system, but we accept that risk (a faulty caller can
* always do all sorts of evil, so...)
- * Note that the message pointer may be updated upon return. In
- * this case, the ratelimiter is reponsible for handling the
- * original message.
+ * If *ppRepMsg != NULL on return, the caller must enqueue that
+ * message before the original message.
*/
rsRetVal
-ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
+ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
rsRetVal localRet;
+ int bNeedUnlockMutex = 0;
DEFiRet;
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
@@ -71,7 +96,12 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
}
}
+ if(ratelimit->bThreadSafe) {
+ pthread_mutex_lock(&ratelimit->mut);
+ bNeedUnlockMutex = 1;
+ }
+ *ppRepMsg = NULL;
/* suppress duplicate messages */
if( ratelimit->pMsg != NULL &&
getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
@@ -84,72 +114,59 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
/* use current message, so we have the new timestamp
* (means we need to discard previous one) */
msgDestruct(&ratelimit->pMsg);
- ratelimit->pMsg = MsgAddRef(pMsg);
+ ratelimit->pMsg = pMsg;
ABORT_FINALIZE(RS_RET_DISCARDMSG);
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
+ } else {/* new message, do "repeat processing" & save it */
if(ratelimit->pMsg != NULL) {
if(ratelimit->nsupp > 0) {
- dbgprintf("DDDD: would need to emit 'repeat' message\n");
- if(ratelimit->repMsg != NULL) {
- dbgprintf("ratelimiter: call sequence error, have "
- "previous repeat message - discarding\n");
- msgDestruct(&ratelimit->repMsg);
- }
- ratelimit->repMsg = ratelimit->pMsg;
- iRet = RS_RET_OK_HAVE_REPMSG;
+ *ppRepMsg = ratelimitGenRepMsg(ratelimit);
+ ratelimit->nsupp = 0;
}
}
ratelimit->pMsg = MsgAddRef(pMsg);
}
finalize_it:
-dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet);
+ if(bNeedUnlockMutex)
+ pthread_mutex_unlock(&ratelimit->mut);
RETiRet;
}
-/* return the current repeat message or NULL, if none is present. This MUST
- * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is
- * important that the message returned by us is enqueued BEFORE the original
- * message, otherwise users my imply different order.
- * If a message object is returned, the caller must destruct it if no longer
- * needed.
+
+/* add a message to a ratelimiter/multisubmit structure.
+ * ratelimiting is automatically handled according to the ratelimit
+ * settings.
+ * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
*/
-msg_t *
-ratelimitGetRepeatMsg(ratelimit_t *ratelimit)
+rsRetVal
+ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg)
{
+ rsRetVal localRet;
msg_t *repMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n");
+ DEFiRet;
- /* we need to duplicate, original message may still be in use in other
- * parts of the system! */
- if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) {
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- goto done;
+ if(pMultiSub == NULL) {
+dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n");
+#warning missing multisub Implementation?
+ CHKiRet(submitMsg(pMsg));
+ } else {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
}
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
- " message repeated %d times: [%.800s]",
- ratelimit->nsupp, getMSG(ratelimit->repMsg));
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are invalidated. */
- datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime));
- memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
-
- if(ratelimit->repMsg != NULL) {
- ratelimit->repMsg = NULL;
- ratelimit->nsupp = 0;
- }
-done: return repMsg;
+finalize_it:
+ RETiRet;
}
-
rsRetVal
ratelimitNew(ratelimit_t **ppThis)
{
@@ -162,25 +179,33 @@ finalize_it:
RETiRet;
}
+/* enable thread-safe operations mode. This make sure that
+ * a single ratelimiter can be called from multiple threads. As
+ * this causes some overhead and is not always required, it needs
+ * to be explicitely enabled. This operation cannot be undone
+ * (think: why should one do that???)
+ */
+void
+ratelimitSetThreadSafe(ratelimit_t *ratelimit)
+{
+ ratelimit->bThreadSafe = 1;
+ pthread_mutex_init(&ratelimit->mut, NULL);
+}
+
void
ratelimitDestruct(ratelimit_t *ratelimit)
{
msg_t *pMsg;
if(ratelimit->pMsg != NULL) {
if(ratelimit->nsupp > 0) {
- if(ratelimit->repMsg != NULL) {
- dbgprintf("ratelimiter/destuct: call sequence error, have "
- "previous repeat message - discarding\n");
- msgDestruct(&ratelimit->repMsg);
- }
- ratelimit->repMsg = ratelimit->pMsg;
- pMsg = ratelimitGetRepeatMsg(ratelimit);
+ pMsg = ratelimitGenRepMsg(ratelimit);
if(pMsg != NULL)
submitMsg2(pMsg);
}
- } else {
msgDestruct(&ratelimit->pMsg);
}
+ if(ratelimit->bThreadSafe)
+ pthread_mutex_destroy(&ratelimit->mut);
free(ratelimit);
}
diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h
index ba2c6b23..37dad900 100644
--- a/runtime/ratelimit.h
+++ b/runtime/ratelimit.h
@@ -24,14 +24,15 @@
struct ratelimit_s {
unsigned nsupp; /**< nbr of msgs suppressed */
msg_t *pMsg;
- msg_t *repMsg; /**< repeat message, temporary buffer */
- /* dummy field list - TODO: implement */
+ sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */
+ pthread_mutex_t mut; /**< mutex if thread-safe operation desired */
};
/* prototypes */
rsRetVal ratelimitNew(ratelimit_t **ppThis);
-rsRetVal ratelimitMsg(msg_t *ppMsg, ratelimit_t *ratelimit);
-msg_t * ratelimitGetRepeatMsg(ratelimit_t *ratelimit);
+void ratelimitSetThreadSafe(ratelimit_t *ratelimit);
+rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep);
+rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg);
void ratelimitDestruct(ratelimit_t *pThis);
rsRetVal ratelimitModInit(void);
void ratelimitModExit(void);
diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h
index e98a9b8c..4404c475 100644
--- a/runtime/rsyslog.h
+++ b/runtime/rsyslog.h
@@ -392,7 +392,6 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_JNAME_NOTFOUND = -2305, /**< JSON name not found (does not exist) */
RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */
RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */
- RS_RET_OK_HAVE_REPMSG = -2307,/**< processing was OK, but we do have a repeat message (ratelimiter status) */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
diff --git a/tools/syslogd.c b/tools/syslogd.c
index 1eac6ccb..a56cea94 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -700,38 +700,6 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */
}
-/* add a message to a multisubmit structure. This handles ratelimiting. IF
- * pMultiSub == NULL, a single-message enqueue happens. */
-rsRetVal
-multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit)
-{
- rsRetVal localRet;
- DEFiRet;
-
- if(pMultiSub == NULL) {
-dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n");
- CHKiRet(submitMsg(pMsg));
- } else {
-dbgprintf("DDDD: have multisub!\n");
- localRet = ratelimitMsg(pMsg, ratelimit);
- if(localRet == RS_RET_OK_HAVE_REPMSG) {
-dbgprintf("DDDD: doing repeat submit!\n");
- pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit);
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg2(pMultiSub));
- localRet = RS_RET_OK;
- }
- if(localRet == RS_RET_OK) {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg2(pMultiSub));
- }
- }
-
-finalize_it:
- RETiRet;
-}
-
/* flush multiSubmit, e.g. at end of read records */
rsRetVal
multiSubmitFlush(multi_submit_t *pMultiSub)