summaryrefslogtreecommitdiffstats
path: root/runtime/ratelimit.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ratelimit.c')
-rw-r--r--runtime/ratelimit.c141
1 files changed, 83 insertions, 58 deletions
diff --git a/runtime/ratelimit.c b/runtime/ratelimit.c
index e6ea6f4e..6b637677 100644
--- a/runtime/ratelimit.c
+++ b/runtime/ratelimit.c
@@ -43,6 +43,31 @@ DEFobjCurrIf(parser)
/* static data */
+/* generate a "repeated n times" message */
+static inline msg_t *
+ratelimitGenRepMsg(ratelimit_t *ratelimit)
+{
+ msg_t *repMsg;
+ size_t lenRepMsg;
+ uchar szRepMsg[1024];
+
+ if(ratelimit->nsupp == 1) { /* we simply use the original message! */
+ repMsg = MsgAddRef(ratelimit->pMsg);
+ } else {/* we need to duplicate, original message may still be in use in other
+ * parts of the system! */
+ if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
+ DBGPRINTF("Message duplication failed, dropping repeat message.\n");
+ goto done;
+ }
+ lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
+ " message repeated %d times: [%.800s]",
+ ratelimit->nsupp, getMSG(ratelimit->pMsg));
+ MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
+ }
+
+done: return repMsg;
+}
+
/* ratelimit a message, that means:
* - handle "last message repeated n times" logic
* - handle actual (discarding) rate-limiting
@@ -54,14 +79,14 @@ DEFobjCurrIf(parser)
* cooperative mode can enable a faulty caller to thrash up part
* of the system, but we accept that risk (a faulty caller can
* always do all sorts of evil, so...)
- * Note that the message pointer may be updated upon return. In
- * this case, the ratelimiter is reponsible for handling the
- * original message.
+ * If *ppRepMsg != NULL on return, the caller must enqueue that
+ * message before the original message.
*/
rsRetVal
-ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
+ratelimitMsg(ratelimit_t *ratelimit, msg_t *pMsg, msg_t **ppRepMsg)
{
rsRetVal localRet;
+ int bNeedUnlockMutex = 0;
DEFiRet;
if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
@@ -71,7 +96,12 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
}
}
+ if(ratelimit->bThreadSafe) {
+ pthread_mutex_lock(&ratelimit->mut);
+ bNeedUnlockMutex = 1;
+ }
+ *ppRepMsg = NULL;
/* suppress duplicate messages */
if( ratelimit->pMsg != NULL &&
getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
@@ -84,72 +114,59 @@ ratelimitMsg(msg_t *pMsg, ratelimit_t *ratelimit)
/* use current message, so we have the new timestamp
* (means we need to discard previous one) */
msgDestruct(&ratelimit->pMsg);
- ratelimit->pMsg = MsgAddRef(pMsg);
+ ratelimit->pMsg = pMsg;
ABORT_FINALIZE(RS_RET_DISCARDMSG);
- } else {/* new message, save it */
- /* first check if we have a previous message stored
- * if so, emit and then discard it first
- */
+ } else {/* new message, do "repeat processing" & save it */
if(ratelimit->pMsg != NULL) {
if(ratelimit->nsupp > 0) {
- dbgprintf("DDDD: would need to emit 'repeat' message\n");
- if(ratelimit->repMsg != NULL) {
- dbgprintf("ratelimiter: call sequence error, have "
- "previous repeat message - discarding\n");
- msgDestruct(&ratelimit->repMsg);
- }
- ratelimit->repMsg = ratelimit->pMsg;
- iRet = RS_RET_OK_HAVE_REPMSG;
+ *ppRepMsg = ratelimitGenRepMsg(ratelimit);
+ ratelimit->nsupp = 0;
}
}
ratelimit->pMsg = MsgAddRef(pMsg);
}
finalize_it:
-dbgprintf("DDDD: in ratelimitMsg(): %d\n", iRet);
+ if(bNeedUnlockMutex)
+ pthread_mutex_unlock(&ratelimit->mut);
RETiRet;
}
-/* return the current repeat message or NULL, if none is present. This MUST
- * be called after ratelimitMsg() returned RS_RET_OK_HAVE_REPMSG. It is
- * important that the message returned by us is enqueued BEFORE the original
- * message, otherwise users my imply different order.
- * If a message object is returned, the caller must destruct it if no longer
- * needed.
+
+/* add a message to a ratelimiter/multisubmit structure.
+ * ratelimiting is automatically handled according to the ratelimit
+ * settings.
+ * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
*/
-msg_t *
-ratelimitGetRepeatMsg(ratelimit_t *ratelimit)
+rsRetVal
+ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, msg_t *pMsg)
{
+ rsRetVal localRet;
msg_t *repMsg;
- size_t lenRepMsg;
- uchar szRepMsg[1024];
-dbgprintf("DDDD: in ratelimitGetRepeatMsg()\n");
+ DEFiRet;
- /* we need to duplicate, original message may still be in use in other
- * parts of the system! */
- if((repMsg = MsgDup(ratelimit->repMsg)) == NULL) {
- DBGPRINTF("Message duplication failed, dropping repeat message.\n");
- goto done;
+ if(pMultiSub == NULL) {
+dbgprintf("DDDDD: multiSubmitAddMsg, not checking ratelimiter for single submit!\n");
+#warning missing multisub Implementation?
+ CHKiRet(submitMsg(pMsg));
+ } else {
+ localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
+ if(repMsg != NULL) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
+ if(localRet == RS_RET_OK) {
+ pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
+ if(pMultiSub->nElem == pMultiSub->maxElem)
+ CHKiRet(multiSubmitMsg2(pMultiSub));
+ }
}
- lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
- " message repeated %d times: [%.800s]",
- ratelimit->nsupp, getMSG(ratelimit->repMsg));
-
- /* We now need to update the other message properties. Please note that digital
- * signatures inside the message are invalidated. */
- datetime.getCurrTime(&(repMsg->tRcvdAt), &(repMsg->ttGenTime));
- memcpy(&repMsg->tTIMESTAMP, &repMsg->tRcvdAt, sizeof(struct syslogTime));
- MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
-
- if(ratelimit->repMsg != NULL) {
- ratelimit->repMsg = NULL;
- ratelimit->nsupp = 0;
- }
-done: return repMsg;
+finalize_it:
+ RETiRet;
}
-
rsRetVal
ratelimitNew(ratelimit_t **ppThis)
{
@@ -162,25 +179,33 @@ finalize_it:
RETiRet;
}
+/* enable thread-safe operations mode. This make sure that
+ * a single ratelimiter can be called from multiple threads. As
+ * this causes some overhead and is not always required, it needs
+ * to be explicitely enabled. This operation cannot be undone
+ * (think: why should one do that???)
+ */
+void
+ratelimitSetThreadSafe(ratelimit_t *ratelimit)
+{
+ ratelimit->bThreadSafe = 1;
+ pthread_mutex_init(&ratelimit->mut, NULL);
+}
+
void
ratelimitDestruct(ratelimit_t *ratelimit)
{
msg_t *pMsg;
if(ratelimit->pMsg != NULL) {
if(ratelimit->nsupp > 0) {
- if(ratelimit->repMsg != NULL) {
- dbgprintf("ratelimiter/destuct: call sequence error, have "
- "previous repeat message - discarding\n");
- msgDestruct(&ratelimit->repMsg);
- }
- ratelimit->repMsg = ratelimit->pMsg;
- pMsg = ratelimitGetRepeatMsg(ratelimit);
+ pMsg = ratelimitGenRepMsg(ratelimit);
if(pMsg != NULL)
submitMsg2(pMsg);
}
- } else {
msgDestruct(&ratelimit->pMsg);
}
+ if(ratelimit->bThreadSafe)
+ pthread_mutex_destroy(&ratelimit->mut);
free(ratelimit);
}