diff options
-rw-r--r-- | dirty.h | 2 | ||||
-rw-r--r-- | plugins/imfile/imfile.c | 2 | ||||
-rw-r--r-- | plugins/imptcp/imptcp.c | 3 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 4 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 12 | ||||
-rw-r--r-- | runtime/ratelimit.c | 141 | ||||
-rw-r--r-- | runtime/ratelimit.h | 9 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | tools/syslogd.c | 32 |
9 files changed, 101 insertions, 105 deletions
@@ -28,10 +28,10 @@ #define DIRTY_H_INCLUDED 1 rsRetVal __attribute__((deprecated)) multiSubmitMsg(multi_submit_t *pMultiSub); +rsRetVal multiSubmitMsg2(multi_submit_t *pMultiSub); /* friends only! */ rsRetVal submitMsg2(msg_t *pMsg); rsRetVal __attribute__((deprecated)) submitMsg(msg_t *pMsg); rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub); -rsRetVal multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit); rsRetVal logmsgInternal(int iErr, int pri, uchar *msg, int flags); rsRetVal __attribute__((deprecated)) parseAndSubmitMessage(uchar *hname, uchar *hnameIP, uchar *msg, int len, int flags, flowControl_t flowCtlTypeu, prop_t *pInputName, struct syslogTime *stTime, time_t ttGenTime, ruleset_t *pRuleset); rsRetVal diagGetMainMsgQSize(int *piSize); /* for imdiag */ diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index d6e6ad0e..453b6b05 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -306,7 +306,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) finalize_it: if(pThis->multiSub.nElem > 0) { /* submit everything that was not yet submitted */ - CHKiRet(multiSubmitMsg2(&pThis->multiSub, NULL)); + CHKiRet(multiSubmitMsg(&pThis->multiSub)); } ; /*EMPTY STATEMENT - needed to keep compiler happy - see below! */ /* Note: the problem above is that pthread:cleanup_pop() is a macro which diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c index e7650812..22d3ff25 100644 --- a/plugins/imptcp/imptcp.c +++ b/plugins/imptcp/imptcp.c @@ -682,7 +682,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult MsgSetRuleset(pMsg, pSrv->pRuleset); STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit); - multiSubmitAddMsg(pMultiSub, pMsg, pSrv->ratelimiter); + ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg); finalize_it: /* reset status variables */ @@ -1124,6 +1124,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst) pSrv->iKeepAliveTime = inst->iKeepAliveTime; pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose; CHKiRet(ratelimitNew(&pSrv->ratelimiter)); + ratelimitSetThreadSafe(pSrv->ratelimiter); CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort)); pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim; if(inst->pszBindAddr == NULL) diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 382ccbc2..2ddc10bb 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -51,6 +51,7 @@ #include "prop.h" #include "ruleset.h" #include "statsobj.h" +#include "ratelimit.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -75,6 +76,7 @@ static struct lstn_s { int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ statsobj_t *stats; /* listener stats */ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; @@ -246,6 +248,7 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter)); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); @@ -882,6 +885,7 @@ CODESTARTafterRun net.clearAllowedSenders((uchar*)"UDP"); for(lstn = lcnfRoot ; lstn != NULL ; ) { statsobj.Destruct(&(lstn->stats)); + ratelimitDestruct(lstn->ratelimiter); close(lstn->sock); lstnDel = lstn; lstn = lstn->next; diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 2f7daf87..3421863d 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -811,6 +811,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim int toffs; /* offset for trusted properties */ struct syslogTime dummyTS; struct json_object *json = NULL, *jval; + msg_t *repMsg; DEFiRet; #warning experimental code needs to be made production-ready! /* we need to decide how many ratelimiters we use --> hashtable @@ -992,14 +993,11 @@ if(ratelimit == NULL) MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName); CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); - localRet = ratelimitMsg(pMsg, ratelimit); - if(localRet == RS_RET_OK_HAVE_REPMSG) { -dbgprintf("DDDD: doing repeat submit!\n"); - CHKiRet(submitMsg2(ratelimitGetRepeatMsg(ratelimit))); - localRet = RS_RET_OK; - } + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) + CHKiRet(submitMsg(repMsg)); if(localRet == RS_RET_OK) - CHKiRet(submitMsg2(pMsg)); + CHKiRet(submitMsg(pMsg)); STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit); finalize_it: diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e6ea6f4e..6b637677 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -43,6 +43,31 @@ DEFobjCurrIf(parser) /* static data */ +/* generate a "repeated n times" message */ +static inline msg_t * +ratelimitGenRepMsg(ratelimit_t *ratelimit) +{ + msg_t *repMsg; + size_t lenRepMsg; + uchar szRepMsg[1024]; + + if(ratelimit->nsupp == 1) { /* we simply use the original message! */ + repMsg = MsgAddRef(ratelimit->pMsg); + } else {/* we need to duplicate, original message may still be in use in other + * parts of the system! */ + if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) { + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); + goto done; + } + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), + " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->pMsg)); + MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); + } + +done: return repMsg; +} + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -54,14 +79,14 @@ DEFobjCurrIf(parser) * cooperative mode can enable a faulty caller to thrash up part * of the system, but we accept that risk (a faulty caller can * always do all sorts of evil, so...) - * Note that the message pointer may be updated upon return. In - * this case, the ratelimiter is reponsible for handling the - * original message. + * If *ppRepMsg != NULL on return, the caller must enqueue that + * message before the original message. */ rsRetVal -ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) +ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { rsRetVal localRet; + int bNeedUnlockMutex = 0; DEFiRet; if((pMsg->msgFlags & NEEDS_PARSING) != 0) { @@ -71,7 +96,12 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) } } + if(ratelimit->bThreadSafe) { + pthread_mutex_lock(&ratelimit->mut); + bNeedUnlockMutex = 1; + } + *ppRepMsg = NULL; /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && @@ -84,72 +114,59 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) /* use current message, so we have the new timestamp * (means we need to discard previous one) */ msgDestruct(&ratelimit->pMsg); - ratelimit->pMsg = MsgAddRef(pMsg); + ratelimit->pMsg = pMsg; ABORT_FINALIZE(RS_RET_DISCARDMSG); - } else {/* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ + } else {/* new message, do "repeat processing" & save it */ if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - dbgprintf("DDDD: would need to emit 'repeat' message\n"); - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - iRet = RS_RET_OK_HAVE_REPMSG; + *ppRepMsg = ratelimitGenRepMsg(ratelimit); + ratelimit->nsupp = 0; } } ratelimit->pMsg = MsgAddRef(pMsg); } finalize_it: -dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet); + if(bNeedUnlockMutex) + pthread_mutex_unlock(&ratelimit->mut); RETiRet; } -/* return the current repeat message or NULL, if none is present. This MUST - * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is - * important that the message returned by us is enqueued BEFORE the original - * message, otherwise users my imply different order. - * If a message object is returned, the caller must destruct it if no longer - * needed. + +/* add a message to a ratelimiter/multisubmit structure. + * ratelimiting is automatically handled according to the ratelimit + * settings. + * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration) */ -msg_t * -ratelimitGetRepeatMsg(ratelimit_t *ratelimit) +rsRetVal +ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg) { + rsRetVal localRet; msg_t *repMsg; - size_t lenRepMsg; - uchar szRepMsg[1024]; -dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); + DEFiRet; - /* we need to duplicate, original message may still be in use in other - * parts of the system! */ - if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) { - DBGPRINTF("Message duplication failed, dropping repeat message.\n"); - goto done; + if(pMultiSub == NULL) { +dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); +#warning missing multisub Implementation? + CHKiRet(submitMsg(pMsg)); + } else { + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } + if(localRet == RS_RET_OK) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } } - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), - " message repeated %d times: [%.800s]", - ratelimit->nsupp, getMSG(ratelimit->repMsg)); - - /* We now need to update the other message properties. Please note that digital - * signatures inside the message are invalidated. */ - datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime)); - memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); - - if(ratelimit->repMsg != NULL) { - ratelimit->repMsg = NULL; - ratelimit->nsupp = 0; - } -done: return repMsg; +finalize_it: + RETiRet; } - rsRetVal ratelimitNew(ratelimit_t **ppThis) { @@ -162,25 +179,33 @@ finalize_it: RETiRet; } +/* enable thread-safe operations mode. This make sure that + * a single ratelimiter can be called from multiple threads. As + * this causes some overhead and is not always required, it needs + * to be explicitely enabled. This operation cannot be undone + * (think: why should one do that???) + */ +void +ratelimitSetThreadSafe(ratelimit_t *ratelimit) +{ + ratelimit->bThreadSafe = 1; + pthread_mutex_init(&ratelimit->mut, NULL); +} + void ratelimitDestruct(ratelimit_t *ratelimit) { msg_t *pMsg; if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter/destuct: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - pMsg = ratelimitGetRepeatMsg(ratelimit); + pMsg = ratelimitGenRepMsg(ratelimit); if(pMsg != NULL) submitMsg2(pMsg); } - } else { msgDestruct(&ratelimit->pMsg); } + if(ratelimit->bThreadSafe) + pthread_mutex_destroy(&ratelimit->mut); free(ratelimit); } diff --git a/runtime/ratelimit.h b/runtime/ratelimit.h index ba2c6b23..37dad900 100644 --- a/runtime/ratelimit.h +++ b/runtime/ratelimit.h @@ -24,14 +24,15 @@ struct ratelimit_s { unsigned nsupp; /**< nbr of msgs suppressed */ msg_t *pMsg; - msg_t *repMsg; /**< repeat message, temporary buffer */ - /* dummy field list - TODO: implement */ + sbool bThreadSafe; /**< do we need to operate in Thread-Safe mode? */ + pthread_mutex_t mut; /**< mutex if thread-safe operation desired */ }; /* prototypes */ rsRetVal ratelimitNew(ratelimit_t **ppThis); -rsRetVal ratelimitMsg(msg_t *ppMsg, ratelimit_t *ratelimit); -msg_t * ratelimitGetRepeatMsg(ratelimit_t *ratelimit); +void ratelimitSetThreadSafe(ratelimit_t *ratelimit); +rsRetVal ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRep); +rsRetVal ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg); void ratelimitDestruct(ratelimit_t *pThis); rsRetVal ratelimitModInit(void); void ratelimitModExit(void); diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index e98a9b8c..4404c475 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -392,7 +392,6 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_JNAME_NOTFOUND = -2305, /**< JSON name not found (does not exist) */ RS_RET_INVLD_SETOP = -2305, /**< invalid variable set operation, incompatible type */ RS_RET_RULESET_EXISTS = -2306,/**< ruleset already exists */ - RS_RET_OK_HAVE_REPMSG = -2307,/**< processing was OK, but we do have a repeat message (ratelimiter status) */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ diff --git a/tools/syslogd.c b/tools/syslogd.c index 1eac6ccb..a56cea94 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -700,38 +700,6 @@ multiSubmitMsg(multi_submit_t *pMultiSub) /* backward compat. level */ } -/* add a message to a multisubmit structure. This handles ratelimiting. IF - * pMultiSub == NULL, a single-message enqueue happens. */ -rsRetVal -multiSubmitAddMsg(multi_submit_t *pMultiSub, msg_t *pMsg, ratelimit_t *ratelimit) -{ - rsRetVal localRet; - DEFiRet; - - if(pMultiSub == NULL) { -dbgprintf("DDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); - CHKiRet(submitMsg(pMsg)); - } else { -dbgprintf("DDDD: have multisub!\n"); - localRet = ratelimitMsg(pMsg, ratelimit); - if(localRet == RS_RET_OK_HAVE_REPMSG) { -dbgprintf("DDDD: doing repeat submit!\n"); - pMultiSub->ppMsgs[pMultiSub->nElem++] = ratelimitGetRepeatMsg(ratelimit); - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub)); - localRet = RS_RET_OK; - } - if(localRet == RS_RET_OK) { - pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; - if(pMultiSub->nElem == pMultiSub->maxElem) - CHKiRet(multiSubmitMsg2(pMultiSub)); - } - } - -finalize_it: - RETiRet; -} - /* flush multiSubmit, e.g. at end of read records */ rsRetVal multiSubmitFlush(multi_submit_t *pMultiSub) |