diff options
Diffstat (limited to 'plugins/imuxsock/imuxsock.c')
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 135 |
1 files changed, 30 insertions, 105 deletions
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 871a1fa5..66a1c648 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -55,6 +55,7 @@ #include "statsobj.h" #include "datetime.h" #include "hashtable.h" +#include "ratelimit.h" MODULE_TYPE_INPUT MODULE_TYPE_NOKEEP @@ -105,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit) STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters) -struct rs_ratelimit_state { - unsigned short interval; - unsigned short burst; - unsigned done; - unsigned missed; - time_t begin; -}; -typedef struct rs_ratelimit_state rs_ratelimit_state_t; - /* a very simple "hash function" for process IDs - we simply use the * pid itself: it is quite expected that all pids may log some time, but @@ -143,6 +135,7 @@ typedef struct lstn_s { int flowCtl; /* flow control settings for this socket */ int ratelimitInterval; int ratelimitBurst; + ratelimit_t *dflt_ratelimiter;/*ratelimiter to apply if none else is to be used */ intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */ struct hashtable *ht; /* our hashtable for rate-limiting */ sbool bParseHost; /* should parser parse host name? read-only after startup */ @@ -271,74 +264,9 @@ static struct cnfparamblk inppblk = /* we do not use this, because we do not bind to a ruleset so far * enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */ -static void -initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst) -{ - rs->interval = interval; - rs->burst = burst; - rs->done = 0; - rs->missed = 0; - rs->begin = 0; -} - static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ -/* ratelimiting support, modelled after the linux kernel - * returns 1 if message is within rate limit and shall be - * processed, 0 otherwise. - * This implementation is NOT THREAD-SAFE and must not - * be called concurrently. - */ -static inline int -withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid) -{ - int ret; - uchar msgbuf[1024]; - - if(rs->interval == 0) { - ret = 1; - goto finalize_it; - } - - assert(rs->burst != 0); - - if(rs->begin == 0) - rs->begin = tt; - - /* resume if we go out of out time window */ - if(tt > rs->begin + rs->interval) { - if(rs->missed) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock lost %u messages from pid %lu due to rate-limiting", - rs->missed, (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - rs->missed = 0; - } - rs->begin = 0; - rs->done = 0; - } - - /* do actual limit check */ - if(rs->burst > rs->done) { - rs->done++; - ret = 1; - } else { - if(rs->missed == 0) { - snprintf((char*)msgbuf, sizeof(msgbuf), - "imuxsock begins to drop messages from pid %lu due to rate-limiting", - (unsigned long) pid); - logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0); - } - rs->missed++; - ret = 0; - } - -finalize_it: - return ret; -} - - /* create input instance, set default paramters, and * add it to the list of instances. */ @@ -445,7 +373,8 @@ addListner(instanceConf_t *inst) CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName)); } if(inst->ratelimitInterval > 0) { - if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) { + if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, + (void(*)(void*))ratelimitDestruct)) == NULL) { /* in this case, we simply turn off rate-limiting */ DBGPRINTF("imuxsock: turning off rate limiting because we could not " "create hash table\n"); @@ -464,6 +393,10 @@ addListner(instanceConf_t *inst) listeners[nfd].bParseTrusted = inst->bParseTrusted; listeners[nfd].bWritePid = inst->bWritePid; listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp; + CHKiRet(ratelimitNew(&listeners[nfd].dflt_ratelimiter, "imuxsock", NULL)); + ratelimitSetLinuxLike(listeners[nfd].dflt_ratelimiter, + listeners[nfd].ratelimitInterval, + listeners[nfd].ratelimitBurst); nfd++; } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", @@ -475,7 +408,7 @@ finalize_it: } -/* discard all log sockets except for "socket" 0. Data for it comes from +/* discard/Destruct all log sockets except for "socket" 0. Data for it comes from * the constant memory pool - and if not, it is freeed via some other pointer. */ static rsRetVal discardLogSockets(void) @@ -493,6 +426,7 @@ static rsRetVal discardLogSockets(void) if(listeners[i].ht != NULL) { hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */ } + ratelimitDestruct(listeners[i].dflt_ratelimiter); } return RS_RET_OK; @@ -604,19 +538,22 @@ finalize_it: * listener (the latter being a performance enhancement). */ static inline rsRetVal -findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) +findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl) { - rs_ratelimit_state_t *rl; + ratelimit_t *rl; int r; pid_t *keybuf; + char pidbuf[256]; DEFiRet; if(cred == NULL) FINALIZE; +#if 0 // TODO: check deactivated? if(pLstn->ratelimitInterval == 0) { *prl = NULL; FINALIZE; } +#endif rl = hashtable_search(pLstn->ht, &cred->pid); if(rl == NULL) { @@ -624,10 +561,13 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n", (unsigned long) cred->pid); STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters); - CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t))); + snprintf(pidbuf, sizeof(pidbuf), "pid %lu", + (unsigned long) cred->pid); + pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */ + CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf)); + ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); CHKmalloc(keybuf = malloc(sizeof(pid_t))); *keybuf = cred->pid; - initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst); r = hashtable_insert(pLstn->ht, keybuf, rl); if(r == 0) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); @@ -636,6 +576,8 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl) *prl = rl; finalize_it: + if(*prl == NULL) + *prl = pLstn->dflt_ratelimiter; RETiRet; } @@ -762,28 +704,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen) } -#if 0 -/* Creates new field to be added to event - * used for SystemLogParseTrusted parsing - */ -struct ee_field * -createNewField(char *fieldname, char *value, int lenValue) { - es_str_t *newStr; - struct ee_value *newVal; - struct ee_field *newField; - - newStr = es_newStrFromBuf(value, (es_size_t) lenValue); - - newVal = ee_newValue(ctxee); - ee_setStrValue(newVal, newStr); - - newField = ee_newFieldFromNV(ctxee, fieldname, newVal); - - return newField; -} -#endif - - /* submit received message to the queue engine * We now parse the message according to expected format so that we * can also mangle it if necessary. @@ -802,8 +722,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim uchar bufParseTAG[CONF_TAG_MAXSIZE]; struct syslogTime st; time_t tt; - rs_ratelimit_state_t *ratelimiter = NULL; int lenProp; + ratelimit_t *ratelimiter = NULL; uchar propBuf[1024]; uchar msgbuf[8192]; uchar *pmsgbuf; @@ -842,10 +762,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim tt = ts->tv_sec; } +#if 0 // TODO: think about stats counters (or wait for request...?) if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) { STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit); FINALIZE; } +#endif /* created trusted properties */ if(cred != NULL && pLstn->bAnnotate) { @@ -980,8 +902,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - CHKiRet(submitMsg(pMsg)); - + ratelimitAddMsg(ratelimiter, NULL, pMsg); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: RETiRet; @@ -1123,6 +1044,10 @@ activateListeners() listeners[0].bUseSysTimeStamp = runModConf->bUseSysTimeStamp; listeners[0].flags = runModConf->bIgnoreTimestamp ? IGNDATE : NOFLAG; listeners[0].flowCtl = runModConf->bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; + CHKiRet(ratelimitNew(&listeners[0].dflt_ratelimiter, "imuxsock", NULL)); + ratelimitSetLinuxLike(listeners[0].dflt_ratelimiter, + listeners[0].ratelimitInterval, + listeners[0].ratelimitBurst); sd_fds = sd_listen_fds(0); if(sd_fds < 0) { |