summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/imfile/imfile.c2
-rw-r--r--plugins/imptcp/imptcp.c3
-rw-r--r--plugins/imudp/imudp.c10
-rw-r--r--plugins/imuxsock/imuxsock.c106
-rw-r--r--runtime/msg.c3
-rw-r--r--runtime/ratelimit.c119
-rw-r--r--runtime/ratelimit.h7
-rw-r--r--tcpsrv.c2
-rw-r--r--tools/syslogd.c1
9 files changed, 143 insertions, 110 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 99210b76..9a8a0373 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -412,7 +412,7 @@ addListner(instanceConf_t *inst)
pThis->lenTag = ustrlen(pThis->pszTag);
pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile);
- CHKiRet(ratelimitNew(&pThis->ratelimiter));
+ CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", inst->pszFileName));
CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*)));
pThis->multiSub.maxElem = inst->nMultiSub;
pThis->multiSub.nElem = 0;
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 8495b293..aba4c439 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -1121,7 +1121,8 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
- CHKiRet(ratelimitNew(&pSrv->ratelimiter));
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+//TODO: add!ratelimitSetLinuxLike(pSrv->ratelimiter, 3, 2);
ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 7f9e00dc..aba7d69d 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -223,7 +223,7 @@ addListner(instanceConf_t *inst)
struct lstn_s *newlcnfinfo;
uchar *bindName;
uchar *port;
- uchar statname[64];
+ uchar dispname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -248,12 +248,12 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
- CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter));
+ snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port);
+ dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, dispname, NULL));
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
- snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
- statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
- CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname));
STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 6aa2b1e8..c871391c 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -106,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
-struct rs_ratelimit_state {
- unsigned short interval;
- unsigned short burst;
- unsigned done;
- unsigned missed;
- time_t begin;
-};
-typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -272,74 +263,9 @@ static struct cnfparamblk inppblk =
/* we do not use this, because we do not bind to a ruleset so far
* enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */
-static void
-initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
-{
- rs->interval = interval;
- rs->burst = burst;
- rs->done = 0;
- rs->missed = 0;
- rs->begin = 0;
-}
-
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
-/* ratelimiting support, modelled after the linux kernel
- * returns 1 if message is within rate limit and shall be
- * processed, 0 otherwise.
- * This implementation is NOT THREAD-SAFE and must not
- * be called concurrently.
- */
-static inline int
-withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
-{
- int ret;
- uchar msgbuf[1024];
-
- if(rs->interval == 0) {
- ret = 1;
- goto finalize_it;
- }
-
- assert(rs->burst != 0);
-
- if(rs->begin == 0)
- rs->begin = tt;
-
- /* resume if we go out of out time window */
- if(tt > rs->begin + rs->interval) {
- if(rs->missed) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock lost %u messages from pid %lu due to rate-limiting",
- rs->missed, (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- rs->missed = 0;
- }
- rs->begin = 0;
- rs->done = 0;
- }
-
- /* do actual limit check */
- if(rs->burst > rs->done) {
- rs->done++;
- ret = 1;
- } else {
- if(rs->missed == 0) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock begins to drop messages from pid %lu due to rate-limiting",
- (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- }
- rs->missed++;
- ret = 0;
- }
-
-finalize_it:
- return ret;
-}
-
-
/* create input instance, set default paramters, and
* add it to the list of instances.
*/
@@ -446,7 +372,8 @@ addListner(instanceConf_t *inst)
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
}
if(inst->ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
/* in this case, we simply turn off rate-limiting */
DBGPRINTF("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -605,19 +532,22 @@ finalize_it:
* listener (the latter being a performance enhancement).
*/
static inline rsRetVal
-findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl)
{
- rs_ratelimit_state_t *rl;
+ ratelimit_t *rl;
int r;
pid_t *keybuf;
+ char pidbuf[256];
DEFiRet;
if(cred == NULL)
FINALIZE;
+#if 0 // TODO: check deactivated?
if(pLstn->ratelimitInterval == 0) {
*prl = NULL;
FINALIZE;
}
+#endif
rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
@@ -625,10 +555,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n",
(unsigned long) cred->pid);
STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
- CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ snprintf(pidbuf, sizeof(pidbuf), "pid %lu",
+ (unsigned long) cred->pid);
+ pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */
+ CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf));
+ ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -781,8 +714,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter = NULL;
int lenProp;
+ ratelimit_t *ratelimiter = NULL;
uchar propBuf[1024];
uchar msgbuf[8192];
uchar *pmsgbuf;
@@ -790,14 +723,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
struct syslogTime dummyTS;
struct json_object *json = NULL, *jval;
DEFiRet;
-#warning experimental code needs to be made production-ready!
-/* we need to decide how many ratelimiters we use --> hashtable
- also remove current homegrown ratelimiting functionality and
- replace it with the new one.
- */
-static ratelimit_t *ratelimit = NULL;
-if(ratelimit == NULL)
- ratelimitNew(&ratelimit);
/* TODO: handle format errors?? */
/* we need to parse the pri first, because we need the severity for
@@ -829,10 +754,12 @@ if(ratelimit == NULL)
tt = ts->tv_sec;
}
+#if 0 // TODO: think about stats counters (or wait for request...?)
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+#endif
/* created trusted properties */
if(cred != NULL && pLstn->bAnnotate) {
@@ -939,7 +866,6 @@ if(ratelimit == NULL)
parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */
/* update "counters" to reflect processed timestamp */
parse += 16;
- lenMsg -= 16;
}
}
@@ -969,7 +895,7 @@ if(ratelimit == NULL)
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- ratelimitAddMsg(ratelimit, NULL, pMsg);
+ ratelimitAddMsg(ratelimiter, NULL, pMsg);
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
diff --git a/runtime/msg.c b/runtime/msg.c
index d874178b..6ba20de6 100644
--- a/runtime/msg.c
+++ b/runtime/msg.c
@@ -2259,6 +2259,9 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs)
ISOBJ_TYPE_assert(pMsg, msg);
pMsg->offMSG = offs;
if(offs > pMsg->iLenRawMsg) {
+ if(!(offs - 1 == pMsg->iLenRawMsg)) {
+ *((char*)(0)) = "abd";
+ }
assert(offs - 1 == pMsg->iLenRawMsg);
pMsg->iLenMSG = 0;
} else {
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
index e94ad1bc..22e785a8 100644
--- a/runtime/ratelimit.c
+++ b/runtime/ratelimit.c
@@ -73,8 +73,16 @@ static inline rsRetVal
doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
int bNeedUnlockMutex = 0;
+ rsRetVal localRet;
DEFiRet;
+ if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
+ if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
+ DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ ABORT_FINALIZE(RS_RET_DISCARDMSG);
+ }
+ }
+
if(ratelimit->bThreadSafe) {
pthread_mutex_lock(&ratelimit->mut);
bNeedUnlockMutex = 1;
@@ -110,6 +118,61 @@ finalize_it:
RETiRet;
}
+/* Linux-like ratelimiting, modelled after the linux kernel
+ * returns 1 if message is within rate limit and shall be
+ * processed, 0 otherwise.
+ * This implementation is NOT THREAD-SAFE and must not
+ * be called concurrently.
+ */
+static inline int
+withinRatelimit(ratelimit_t *ratelimit, time_t tt)
+{
+ int ret;
+ uchar msgbuf[1024];
+
+ if(ratelimit->interval == 0) {
+ ret = 1;
+ goto finalize_it;
+ }
+
+ assert(ratelimit->burst != 0);
+
+ if(ratelimit->begin == 0)
+ ratelimit->begin = tt;
+
+ /* resume if we go out of out time window */
+ if(tt > ratelimit->begin + ratelimit->interval) {
+ if(ratelimit->missed) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: %u messages lost due to rate-limiting",
+ ratelimit->name, ratelimit->missed);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ ratelimit->missed = 0;
+ }
+ ratelimit->begin = 0;
+ ratelimit->done = 0;
+ }
+
+ /* do actual limit check */
+ if(ratelimit->burst > ratelimit->done) {
+ ratelimit->done++;
+ ret = 1;
+ } else {
+ if(ratelimit->missed == 0) {
+ snprintf((char*)msgbuf, sizeof(msgbuf),
+ "%s: begin to drop messages due to rate-limiting",
+ ratelimit->name);
+ logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
+ }
+ ratelimit->missed++;
+ ret = 0;
+ }
+
+finalize_it:
+ return ret;
+}
+
+
/* ratelimit a message, that means:
* - handle "last message repeated n times" logic
* - handle actual (discarding) rate-limiting
@@ -127,19 +190,13 @@ finalize_it:
rsRetVal
ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
- rsRetVal localRet;
DEFiRet;
-#warning be sure to parse only when actually required!
- if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
- if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
- DBGPRINTF("Message discarded, parsing error %d\n", localRet);
+ *ppRepMsg = NULL;
+ if(ratelimit->bLinuxLike) {
+ if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0)
ABORT_FINALIZE(RS_RET_DISCARDMSG);
- }
}
-
- *ppRepMsg = NULL;
-
if(ratelimit->bReduceRepeatMsgs) {
CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
}
@@ -147,6 +204,13 @@ finalize_it:
RETiRet;
}
+/* returns 1, if the ratelimiter performs any checks and 0 otherwise */
+int
+ratelimitChecked(ratelimit_t *ratelimit)
+{
+ return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs;
+}
+
/* add a message to a ratelimiter/multisubmit structure.
* ratelimiting is automatically handled according to the ratelimit
@@ -184,19 +248,53 @@ finalize_it:
RETiRet;
}
+
+/* modname must be a static name (usually expected to be the module
+ * name and MUST be present. dynname may be NULL and can be used for
+ * dynamic information, e.g. PID or listener IP, ...
+ * Both values should be kept brief.
+ */
rsRetVal
-ratelimitNew(ratelimit_t **ppThis)
+ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname)
{
ratelimit_t *pThis;
+ char namebuf[256];
DEFiRet;
CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
+ if(modname == NULL)
+ modname ="*ERROR:MODULE NAME MISSING*";
+
+ if(dynname == NULL) {
+ pThis->name = strdup(modname);
+ } else {
+ snprintf(namebuf, sizeof(namebuf), "%s[%s]",
+ modname, dynname);
+ namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
+ pThis->name = strdup(namebuf);
+ }
pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs;
+ if(pThis->bReduceRepeatMsgs)
+ pThis->bActive = 1;
*ppThis = pThis;
finalize_it:
RETiRet;
}
+
+/* enable linux-like ratelimiting */
+void
+ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst)
+{
+ ratelimit->interval = interval;
+ ratelimit->burst = burst;
+ ratelimit->done = 0;
+ ratelimit->missed = 0;
+ ratelimit->begin = 0;
+ ratelimit->bLinuxLike = 1;
+}
+
+
/* enable thread-safe operations mode. This make sure that
* a single ratelimiter can be called from multiple threads. As
* this causes some overhead and is not always required, it needs
@@ -224,6 +322,7 @@ ratelimitDestruct(ratelimit_t *ratelimit)
}
if(ratelimit->bThreadSafe)
pthread_mutex_destroy(&ratelimit->mut);
+ free(ratelimit->name);
free(ratelimit);
}
diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h
index 5fa884a8..a7959dfe 100644
--- a/runtime/ratelimit.h
+++ b/runtime/ratelimit.h
@@ -22,7 +22,10 @@
#define INCLUDED_RATELIMIT_H
struct ratelimit_s {
+ int bActive; /**< any rate-limiting at all desired? */
+ char *name; /**< rate limiter name, e.g. for user messages */
/* support for Linux kernel-type ratelimiting */
+ int bLinuxLike; /**< Linux-like rate limiting enabled? */
unsigned short interval;
unsigned short burst;
unsigned done;
@@ -37,11 +40,13 @@ struct ratelimit_s {
};
/* prototypes */
-rsRetVal ratelimitNew(ratelimit_t **ppThis);
+rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname);
void ratelimitSetThreadSafe(ratelimit_t *ratelimit);
+void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst);
rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep);
rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg);
void ratelimitDestruct(ratelimit_t *pThis);
+int ratelimitChecked(ratelimit_t *ratelimit);
rsRetVal ratelimitModInit(void);
void ratelimitModExit(void);
diff --git a/tcpsrv.c b/tcpsrv.c
index 6b134017..89ad7325 100644
--- a/tcpsrv.c
+++ b/tcpsrv.c
@@ -152,7 +152,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram)
snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort);
statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
CHKiRet(statsobj.SetName(pEntry->stats, statname));
- CHKiRet(ratelimitNew(&pEntry->ratelimiter));
+ CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL));
STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pEntry->ctrSubmit)));
diff --git a/tools/syslogd.c b/tools/syslogd.c
index a56cea94..f76fe8cb 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -705,7 +705,6 @@ rsRetVal
multiSubmitFlush(multi_submit_t *pMultiSub)
{
DEFiRet;
-dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem);
if(pMultiSub->nElem > 0) {
iRet = multiSubmitMsg2(pMultiSub);
}