diff options
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 102 |
1 files changed, 75 insertions, 27 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index c64c06ca..87f4f37c 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -4,8 +4,6 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) - * * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. @@ -54,6 +52,7 @@ #include "prop.h" #include "ruleset.h" #include "statsobj.h" +#include "ratelimit.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -77,7 +76,9 @@ static struct lstn_s { struct lstn_s *next; int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ + prop_t *pInputName; statsobj_t *stats; /* listener stats */ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; @@ -97,7 +98,6 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ -static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ #define BATCH_SIZE_DFLT 32 /* do not overdo, has heavy toll on memory, especially with large msgs */ #define TIME_REQUERY_DFLT 2 @@ -115,9 +115,13 @@ struct instanceConf_s { uchar *pszBindAddr; /* IP to bind socket to */ uchar *pszBindPort; /* Port to bind socket to */ uchar *pszBindRuleset; /* name of ruleset to bind to */ + uchar *inputname; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int ratelimitInterval; + int ratelimitBurst; int rcvbuf; /* 0 means: do not set, keep OS default */ struct instanceConf_s *next; + sbool bAppendPortToInpname; }; struct modConfData_s { @@ -149,7 +153,11 @@ static struct cnfparamblk modpblk = /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "inputname", eCmdHdlrGetWord, 0 }, + { "inputname.appendport", eCmdHdlrBinary, 0 }, { "address", eCmdHdlrString, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, { "rcvbufsize", eCmdHdlrSize, 0 }, { "ruleset", eCmdHdlrString, 0 } }; @@ -176,6 +184,10 @@ createInstance(instanceConf_t **pinst) inst->pszBindPort = NULL; inst->pszBindAddr = NULL; inst->pszBindRuleset = NULL; + inst->inputname = NULL; + inst->bAppendPortToInpname = 0; + inst->ratelimitBurst = 10000; /* arbitrary high limit */ + inst->ratelimitInterval = 0; /* off */ inst->rcvbuf = 0; /* node created, let's add to config */ @@ -235,7 +247,8 @@ addListner(instanceConf_t *inst) struct lstn_s *newlcnfinfo; uchar *bindName; uchar *port; - uchar statname[64]; + uchar dispname[64], inpnameBuf[128]; + uchar *inputname; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -260,11 +273,29 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; + snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); + dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL)); + if(inst->inputname == NULL) { + inputname = (uchar*)"imudp"; + } else { + inputname = inst->inputname; + } + if(inst->bAppendPortToInpname) { + snprintf((char*)inpnameBuf, sizeof(inpnameBuf), "%s%s", + inputname, port); + inpnameBuf[sizeof(inpnameBuf)-1] = '\0'; + inputname = inpnameBuf; + } + CHKiRet(prop.Construct(&newlcnfinfo->pInputName)); + CHKiRet(prop.SetString(newlcnfinfo->pInputName, + inputname, ustrlen(inputname))); + CHKiRet(prop.ConstructFinalize(newlcnfinfo->pInputName)); + ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval, + inst->ratelimitBurst); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); - snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); - statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); @@ -305,7 +336,7 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta static inline rsRetVal processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, uchar *rcvBuf, ssize_t lenRcvBuf, struct syslogTime *stTime, time_t ttGenTime, - struct sockaddr_storage *frominet, socklen_t socklen) + struct sockaddr_storage *frominet, socklen_t socklen, multi_submit_t *multiSub) { DEFiRet; msg_t *pMsg; @@ -354,14 +385,14 @@ processPacket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)rcvBuf, lenRcvBuf); - MsgSetInputName(pMsg, pInputName); + MsgSetInputName(pMsg, lstn->pInputName); MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, frominet)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, multiSub, pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } @@ -386,10 +417,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f struct syslogTime stTime; struct sockaddr_storage frominet; char errStr[1024]; + msg_t *pMsgs[CONF_NUM_MULTISUB]; + multi_submit_t multiSub; int nelem; int i; assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a "bad" receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) @@ -430,11 +466,13 @@ dbgprintf("DDDD: recvmmsg returnd %d: %s\n", nelem, rs_strerror_r(errno, errStr, for(i = 0 ; i < nelem ; ++i) { processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, recvmsg_mmh[i].msg_hdr.msg_iov->iov_base, - recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, recvmsg_mmh[i].msg_hdr.msg_namelen); + recvmsg_mmh[i].msg_len, &stTime, ttGenTime, &frominet, + recvmsg_mmh[i].msg_hdr.msg_namelen, &multiSub); } } finalize_it: + multiSubmitFlush(&multiSub); RETiRet; } #else /* we do not have recvmmsg() */ @@ -455,17 +493,26 @@ finalize_it: static inline rsRetVal processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { - DEFiRet; int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; ssize_t lenRcvBuf; struct sockaddr_storage frominet; + msg_t *pMsg; + prop_t *propFromHost = NULL; + prop_t *propFromHostIP = NULL; + multi_submit_t multiSub; + msg_t *pMsgs[CONF_NUM_MULTISUB]; + char errStr[1024]; + DEFiRet; char errStr[1024]; struct msghdr mh; struct iovec iov[1]; assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) @@ -493,10 +540,12 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f } CHKiRet(processPacket(pThrd, lstn, frominetPrev, pbIsPermitted, pRcvBuf, lenRcvBuf, &stTime, - ttGenTime, &frominet, mh.msg_namelen)); + ttGenTime, &frominet, mh.msg_namelen, &multiSub)); } + finalize_it: + multiSubmitFlush(&multiSub); RETiRet; } #endif /* #ifdef HAVE_RECVMMSG */ @@ -782,10 +831,18 @@ createListner(es_str_t *port, struct cnfparamvals *pvals) continue; if(!strcmp(inppblk.descr[i].name, "port")) { continue; /* array, handled by caller */ + } else if(!strcmp(inppblk.descr[i].name, "inputname")) { + inst->inputname = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "inputname.appendport")) { + inst->bAppendPortToInpname = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "address")) { inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "rcvbufsize")) { inst->rcvbuf = (int) pvals[i].val.d.n; } else { @@ -805,10 +862,7 @@ BEGINnewInpInst CODESTARTnewInpInst DBGPRINTF("newInpInst (imudp)\n"); - pvals = nvlstGetParams(lst, &inppblk, NULL); - if(pvals == NULL) { - errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, - "imudp: required parameter are missing\n"); + if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { @@ -934,7 +988,7 @@ CODESTARTactivateCnfPrePrivDrop for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { addListner(inst); } - /* if we could not set up any listners, there is no point in running... */ + /* if we could not set up any listeners, there is no point in running... */ if(lcnfRoot == NULL) { errmsg.LogError(0, NO_ERRCODE, "imudp: no listeners could be started, " "input not activated.\n"); @@ -968,7 +1022,7 @@ CODESTARTfreeCnf for(inst = pModConf->root ; inst != NULL ; ) { free(inst->pszBindPort); free(inst->pszBindAddr); - free(inst->pBindRuleset); + free(inst->inputname); del = inst; inst = inst->next; free(del); @@ -1009,7 +1063,9 @@ CODESTARTafterRun net.clearAllowedSenders((uchar*)"UDP"); for(lstn = lcnfRoot ; lstn != NULL ; ) { statsobj.Destruct(&(lstn->stats)); + ratelimitDestruct(lstn->ratelimiter); close(lstn->sock); + prop.Destruct(&lstn->pInputName); lstnDel = lstn; lstn = lstn->next; free(lstnDel); @@ -1024,9 +1080,6 @@ ENDafterRun BEGINmodExit CODESTARTmodExit - if(pInputName != NULL) - prop.Destruct(&pInputName); - /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -1081,11 +1134,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); - /* we need to create the inputName property (only once during our lifetime) */ - CHKiRet(prop.Construct(&pInputName)); - CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imudp"), sizeof("imudp") - 1)); - CHKiRet(prop.ConstructFinalize(pInputName)); - /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); |