diff options
-rw-r--r-- | plugins/imfile/imfile.c | 2 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 3 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 10 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 106 | ||||
-rw-r--r-- | runtime/msg.c | 3 | ||||
-rw-r--r-- | runtime/ratelimit.c | 119 | ||||
-rw-r--r-- | runtime/ratelimit.h | 7 | ||||
-rw-r--r-- | tcpsrv.c | 2 | ||||
-rw-r--r-- | tools/syslogd.c | 1 |
9 files changed, 143 insertions, 110 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 99210b76..9a8a0373 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -412,7 +412,7 @@ addListner(instanceConf_t *inst) pThis->lenTag = ustrlen(pThis->pszTag); pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile); - CHKiRet(ratelimitNew(&pThis->ratelimiter)); + CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", inst->pszFileName)); CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*))); pThis->multiSub.maxElem = inst->nMultiSub; pThis->multiSub.nElem = 0; diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index 8495b293..aba4c439 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -1121,7 +1121,8 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveProbes = inst->iKeepAliveProbes; pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; - CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort)); +//TODO: add!ratelimitSetLinuxLike(pSrv->ratelimiter, 3, 2); ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 7f9e00dc..aba7d69d 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -223,7 +223,7 @@ addListner(instanceConf_t *inst) struct lstn_s *newlcnfinfo; uchar *bindName; uchar *port; - uchar statname[64]; + uchar dispname[64]; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -248,12 +248,12 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; - CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter)); + snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); + dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, dispname, NULL)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); - snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); - statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 6aa2b1e8..c871391c 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -106,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit) STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters) -struct rs_ratelimit_state { - unsigned short interval; - unsigned short burst; - unsigned done; - unsigned missed; - time_t begin; -}; -typedef struct rs_ratelimit_state rs_ratelimit_state_t; - /* a very simple "hash function" for process IDs - we simply use the * pid itself: it is quite expected that all pids may log some time, but @@ -272,74 +263,9 @@ static struct cnfparamblk inppblk = /* we do not use this, because we do not bind to a ruleset so far * enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */ -static void -initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) -{ - rs->interval = interval; - rs->burst = burst; - rs->done = 0; - rs->missed = 0; - rs->begin = 0; -} - static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ -/* ratelimiting support, modelled after the linux kernel - * returns 1 if message is within rate limit and shall be - * processed, 0 otherwise. - * This implementation is NOT THREAD-SAFE and must not - * be called concurrently. - */ -static inline int -withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid) -{ - int ret; - uchar msgbuf[1024]; - - if(rs->interval == 0) { - ret = 1; - goto finalize_it; - } - - assert(rs->burst != 0); - - if(rs->begin == 0) - rs->begin = tt; - - /* resume if we go out of out time window */ - if(tt > rs->begin + rs->interval) { - if(rs->missed) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock lost %u messages from pid %lu due to rate-limiting", - rs->missed, (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - rs->missed = 0; - } - rs->begin = 0; - rs->done = 0; - } - - /* do actual limit check */ - if(rs->burst > rs->done) { - rs->done++; - ret = 1; - } else { - if(rs->missed == 0) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock begins to drop messages from pid %lu due to rate-limiting", - (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - } - rs->missed++; - ret = 0; - } - -finalize_it: - return ret; -} - - /* create input instance, set default paramters, and * add it to the list of instances. */ @@ -446,7 +372,8 @@ addListner(instanceConf_t *inst) CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName)); } if(inst->ratelimitInterval > 0) { - if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { + if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, + (void(*)(void*))ratelimitDestruct)) == NULL) { /* in this case, we simply turn off rate-limiting */ DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); @@ -605,19 +532,22 @@ finalize_it: * listener (the latter being a performance enhancement). */ static inline rsRetVal -findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) +findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl) { - rs_ratelimit_state_t *rl; + ratelimit_t *rl; int r; pid_t *keybuf; + char pidbuf[256]; DEFiRet; if(cred == NULL) FINALIZE; +#if 0 // TODO: check deactivated? if(pLstn->ratelimitInterval == 0) { *prl = NULL; FINALIZE; } +#endif rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { @@ -625,10 +555,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); - CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); + snprintf(pidbuf, sizeof(pidbuf), "pid %lu", + (unsigned long) cred->pid); + pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */ + CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf)); + ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); CHKmalloc(keybuf = malloc(sizeof(pid_t))); *keybuf = cred->pid; - initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); r = hashtable_insert(pLstn->ht, keybuf, rl); if(r == 0) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -781,8 +714,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim uchar bufParseTAG[CONF_TAG_MAXSIZE]; struct syslogTime st; time_t tt; - rs_ratelimit_state_t *ratelimiter = NULL; int lenProp; + ratelimit_t *ratelimiter = NULL; uchar propBuf[1024]; uchar msgbuf[8192]; uchar *pmsgbuf; @@ -790,14 +723,6 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim struct syslogTime dummyTS; struct json_object *json = NULL, *jval; DEFiRet; -#warning experimental code needs to be made production-ready! -/* we need to decide how many ratelimiters we use --> hashtable - also remove current homegrown ratelimiting functionality and - replace it with the new one. - */ -static ratelimit_t *ratelimit = NULL; -if(ratelimit == NULL) - ratelimitNew(&ratelimit); /* TODO: handle format errors?? */ /* we need to parse the pri first, because we need the severity for @@ -829,10 +754,12 @@ if(ratelimit == NULL) tt = ts->tv_sec; } +#if 0 // TODO: think about stats counters (or wait for request...?) if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); FINALIZE; } +#endif /* created trusted properties */ if(cred != NULL && pLstn->bAnnotate) { @@ -939,7 +866,6 @@ if(ratelimit == NULL) parse[15] = ' '; /* re-write \0 from fromatTimestamp3164 by SP */ /* update "counters" to reflect processed timestamp */ parse += 16; - lenMsg -= 16; } } @@ -969,7 +895,7 @@ if(ratelimit == NULL) MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - ratelimitAddMsg(ratelimit, NULL, pMsg); + ratelimitAddMsg(ratelimiter, NULL, pMsg); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: RETiRet; diff --git a/runtime/msg.c b/runtime/msg.c index d874178b..6ba20de6 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -2259,6 +2259,9 @@ void MsgSetMSGoffs(msg_t *pMsg, short offs) ISOBJ_TYPE_assert(pMsg, msg); pMsg->offMSG = offs; if(offs > pMsg->iLenRawMsg) { + if(!(offs - 1 == pMsg->iLenRawMsg)) { + *((char*)(0)) = "abd"; + } assert(offs - 1 == pMsg->iLenRawMsg); pMsg->iLenMSG = 0; } else { diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e94ad1bc..22e785a8 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -73,8 +73,16 @@ static inline rsRetVal doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { int bNeedUnlockMutex = 0; + rsRetVal localRet; DEFiRet; + if((pMsg->msgFlags & NEEDS_PARSING) != 0) { + if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { + DBGPRINTF("Message discarded, parsing error %d\n", localRet); + ABORT_FINALIZE(RS_RET_DISCARDMSG); + } + } + if(ratelimit->bThreadSafe) { pthread_mutex_lock(&ratelimit->mut); bNeedUnlockMutex = 1; @@ -110,6 +118,61 @@ finalize_it: RETiRet; } +/* Linux-like ratelimiting, modelled after the linux kernel + * returns 1 if message is within rate limit and shall be + * processed, 0 otherwise. + * This implementation is NOT THREAD-SAFE and must not + * be called concurrently. + */ +static inline int +withinRatelimit(ratelimit_t *ratelimit, time_t tt) +{ + int ret; + uchar msgbuf[1024]; + + if(ratelimit->interval == 0) { + ret = 1; + goto finalize_it; + } + + assert(ratelimit->burst != 0); + + if(ratelimit->begin == 0) + ratelimit->begin = tt; + + /* resume if we go out of out time window */ + if(tt > ratelimit->begin + ratelimit->interval) { + if(ratelimit->missed) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: %u messages lost due to rate-limiting", + ratelimit->name, ratelimit->missed); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + ratelimit->missed = 0; + } + ratelimit->begin = 0; + ratelimit->done = 0; + } + + /* do actual limit check */ + if(ratelimit->burst > ratelimit->done) { + ratelimit->done++; + ret = 1; + } else { + if(ratelimit->missed == 0) { + snprintf((char*)msgbuf, sizeof(msgbuf), + "%s: begin to drop messages due to rate-limiting", + ratelimit->name); + logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); + } + ratelimit->missed++; + ret = 0; + } + +finalize_it: + return ret; +} + + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -127,19 +190,13 @@ finalize_it: rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { - rsRetVal localRet; DEFiRet; -#warning be sure to parse only when actually required! - if((pMsg->msgFlags & NEEDS_PARSING) != 0) { - if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) { - DBGPRINTF("Message discarded, parsing error %d\n", localRet); + *ppRepMsg = NULL; + if(ratelimit->bLinuxLike) { + if(withinRatelimit(ratelimit, pMsg->ttGenTime) == 0) ABORT_FINALIZE(RS_RET_DISCARDMSG); - } } - - *ppRepMsg = NULL; - if(ratelimit->bReduceRepeatMsgs) { CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg)); } @@ -147,6 +204,13 @@ finalize_it: RETiRet; } +/* returns 1, if the ratelimiter performs any checks and 0 otherwise */ +int +ratelimitChecked(ratelimit_t *ratelimit) +{ + return ratelimit->bLinuxLike || ratelimit->bReduceRepeatMsgs; +} + /* add a message to a ratelimiter/multisubmit structure. * ratelimiting is automatically handled according to the ratelimit @@ -184,19 +248,53 @@ finalize_it: RETiRet; } + +/* modname must be a static name (usually expected to be the module + * name and MUST be present. dynname may be NULL and can be used for + * dynamic information, e.g. PID or listener IP, ... + * Both values should be kept brief. + */ rsRetVal -ratelimitNew(ratelimit_t **ppThis) +ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname) { ratelimit_t *pThis; + char namebuf[256]; DEFiRet; CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t))); + if(modname == NULL) + modname ="*ERROR:MODULE NAME MISSING*"; + + if(dynname == NULL) { + pThis->name = strdup(modname); + } else { + snprintf(namebuf, sizeof(namebuf), "%s[%s]", + modname, dynname); + namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */ + pThis->name = strdup(namebuf); + } pThis->bReduceRepeatMsgs = runConf->globals.bReduceRepeatMsgs; + if(pThis->bReduceRepeatMsgs) + pThis->bActive = 1; *ppThis = pThis; finalize_it: RETiRet; } + +/* enable linux-like ratelimiting */ +void +ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst) +{ + ratelimit->interval = interval; + ratelimit->burst = burst; + ratelimit->done = 0; + ratelimit->missed = 0; + ratelimit->begin = 0; + ratelimit->bLinuxLike = 1; +} + + /* enable thread-safe operations mode. This make sure that * a single ratelimiter can be called from multiple threads. As * this causes some overhead and is not always required, it needs @@ -224,6 +322,7 @@ ratelimitDestruct(ratelimit_t *ratelimit) } if(ratelimit->bThreadSafe) pthread_mutex_destroy(&ratelimit->mut); + free(ratelimit->name); free(ratelimit); } diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index 5fa884a8..a7959dfe 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -22,7 +22,10 @@ #define INCLUDED_RATELIMIT_H struct ratelimit_s { + int bActive; /**< any rate-limiting at all desired? */ + char *name; /**< rate limiter name, e.g. for user messages */ /* support for Linux kernel-type ratelimiting */ + int bLinuxLike; /**< Linux-like rate limiting enabled? */ unsigned short interval; unsigned short burst; unsigned done; @@ -37,11 +40,13 @@ struct ratelimit_s { }; /* prototypes */ -rsRetVal ratelimitNew(ratelimit_t **ppThis); +rsRetVal ratelimitNew(ratelimit_t **ppThis, char *modname, char *dynname); void ratelimitSetThreadSafe(ratelimit_t *ratelimit); +void ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned short interval, unsigned short burst); rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep); rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg); void ratelimitDestruct(ratelimit_t *pThis); +int ratelimitChecked(ratelimit_t *ratelimit); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); @@ -152,7 +152,7 @@ addNewLstnPort(tcpsrv_t *pThis, uchar *pszPort, int bSuppOctetFram) snprintf((char*)statname, sizeof(statname), "%s(%s)", pThis->pszInputName, pszPort); statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ CHKiRet(statsobj.SetName(pEntry->stats, statname)); - CHKiRet(ratelimitNew(&pEntry->ratelimiter)); + CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL)); STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit); CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(pEntry->ctrSubmit))); diff --git a/tools/syslogd.c b/tools/syslogd.c index a56cea94..f76fe8cb 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -705,7 +705,6 @@ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub) { DEFiRet; -dbgprintf("DDDD: multiSubmitFlish, nElem %d\n", pMultiSub->nElem); if(pMultiSub->nElem > 0) { iRet = multiSubmitMsg2(pMultiSub); } |