diff options
Diffstat (limited to 'runtime/ratelimit.c')
-rw-r--r-- | runtime/ratelimit.c | 141 |
1 files changed, 83 insertions, 58 deletions
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c index e6ea6f4e..6b637677 100644 --- a/runtime/ratelimit.c +++ b/runtime/ratelimit.c @@ -43,6 +43,31 @@ DEFobjCurrIf(parser) /* static data */ +/* generate a "repeated n times" message */ +static inline msg_t * +ratelimitGenRepMsg(ratelimit_t *ratelimit) +{ + msg_t *repMsg; + size_t lenRepMsg; + uchar szRepMsg[1024]; + + if(ratelimit->nsupp == 1) { /* we simply use the original message! */ + repMsg = MsgAddRef(ratelimit->pMsg); + } else {/* we need to duplicate, original message may still be in use in other + * parts of the system! */ + if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) { + DBGPRINTF("Message duplication failed, dropping repeat message.\n"); + goto done; + } + lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), + " message repeated %d times: [%.800s]", + ratelimit->nsupp, getMSG(ratelimit->pMsg)); + MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); + } + +done: return repMsg; +} + /* ratelimit a message, that means: * - handle "last message repeated n times" logic * - handle actual (discarding) rate-limiting @@ -54,14 +79,14 @@ DEFobjCurrIf(parser) * cooperative mode can enable a faulty caller to thrash up part * of the system, but we accept that risk (a faulty caller can * always do all sorts of evil, so...) - * Note that the message pointer may be updated upon return. In - * this case, the ratelimiter is reponsible for handling the - * original message. + * If *ppRepMsg != NULL on return, the caller must enqueue that + * message before the original message. */ rsRetVal -ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) +ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg) { rsRetVal localRet; + int bNeedUnlockMutex = 0; DEFiRet; if((pMsg->msgFlags & NEEDS_PARSING) != 0) { @@ -71,7 +96,12 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) } } + if(ratelimit->bThreadSafe) { + pthread_mutex_lock(&ratelimit->mut); + bNeedUnlockMutex = 1; + } + *ppRepMsg = NULL; /* suppress duplicate messages */ if( ratelimit->pMsg != NULL && getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) && @@ -84,72 +114,59 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit) /* use current message, so we have the new timestamp * (means we need to discard previous one) */ msgDestruct(&ratelimit->pMsg); - ratelimit->pMsg = MsgAddRef(pMsg); + ratelimit->pMsg = pMsg; ABORT_FINALIZE(RS_RET_DISCARDMSG); - } else {/* new message, save it */ - /* first check if we have a previous message stored - * if so, emit and then discard it first - */ + } else {/* new message, do "repeat processing" & save it */ if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - dbgprintf("DDDD: would need to emit 'repeat' message\n"); - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - iRet = RS_RET_OK_HAVE_REPMSG; + *ppRepMsg = ratelimitGenRepMsg(ratelimit); + ratelimit->nsupp = 0; } } ratelimit->pMsg = MsgAddRef(pMsg); } finalize_it: -dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet); + if(bNeedUnlockMutex) + pthread_mutex_unlock(&ratelimit->mut); RETiRet; } -/* return the current repeat message or NULL, if none is present. This MUST - * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is - * important that the message returned by us is enqueued BEFORE the original - * message, otherwise users my imply different order. - * If a message object is returned, the caller must destruct it if no longer - * needed. + +/* add a message to a ratelimiter/multisubmit structure. + * ratelimiting is automatically handled according to the ratelimit + * settings. + * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration) */ -msg_t * -ratelimitGetRepeatMsg(ratelimit_t *ratelimit) +rsRetVal +ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg) { + rsRetVal localRet; msg_t *repMsg; - size_t lenRepMsg; - uchar szRepMsg[1024]; -dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n"); + DEFiRet; - /* we need to duplicate, original message may still be in use in other - * parts of the system! */ - if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) { - DBGPRINTF("Message duplication failed, dropping repeat message.\n"); - goto done; + if(pMultiSub == NULL) { +dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n"); +#warning missing multisub Implementation? + CHKiRet(submitMsg(pMsg)); + } else { + localRet = ratelimitMsg(ratelimit, pMsg, &repMsg); + if(repMsg != NULL) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } + if(localRet == RS_RET_OK) { + pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg; + if(pMultiSub->nElem == pMultiSub->maxElem) + CHKiRet(multiSubmitMsg2(pMultiSub)); + } } - lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg), - " message repeated %d times: [%.800s]", - ratelimit->nsupp, getMSG(ratelimit->repMsg)); - - /* We now need to update the other message properties. Please note that digital - * signatures inside the message are invalidated. */ - datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime)); - memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime)); - MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg); - - if(ratelimit->repMsg != NULL) { - ratelimit->repMsg = NULL; - ratelimit->nsupp = 0; - } -done: return repMsg; +finalize_it: + RETiRet; } - rsRetVal ratelimitNew(ratelimit_t **ppThis) { @@ -162,25 +179,33 @@ finalize_it: RETiRet; } +/* enable thread-safe operations mode. This make sure that + * a single ratelimiter can be called from multiple threads. As + * this causes some overhead and is not always required, it needs + * to be explicitely enabled. This operation cannot be undone + * (think: why should one do that???) + */ +void +ratelimitSetThreadSafe(ratelimit_t *ratelimit) +{ + ratelimit->bThreadSafe = 1; + pthread_mutex_init(&ratelimit->mut, NULL); +} + void ratelimitDestruct(ratelimit_t *ratelimit) { msg_t *pMsg; if(ratelimit->pMsg != NULL) { if(ratelimit->nsupp > 0) { - if(ratelimit->repMsg != NULL) { - dbgprintf("ratelimiter/destuct: call sequence error, have " - "previous repeat message - discarding\n"); - msgDestruct(&ratelimit->repMsg); - } - ratelimit->repMsg = ratelimit->pMsg; - pMsg = ratelimitGetRepeatMsg(ratelimit); + pMsg = ratelimitGenRepMsg(ratelimit); if(pMsg != NULL) submitMsg2(pMsg); } - } else { msgDestruct(&ratelimit->pMsg); } + if(ratelimit->bThreadSafe) + pthread_mutex_destroy(&ratelimit->mut); free(ratelimit); } |