diff options
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 90 |
1 files changed, 64 insertions, 26 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 06e1471d..312645bd 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -4,8 +4,6 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c) - * * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. @@ -53,6 +51,7 @@ #include "prop.h" #include "ruleset.h" #include "statsobj.h" +#include "ratelimit.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -76,7 +75,9 @@ static struct lstn_s { struct lstn_s *next; int sock; /* socket */ ruleset_t *pRuleset; /* bound ruleset */ + prop_t *pInputName; statsobj_t *stats; /* listener stats */ + ratelimit_t *ratelimiter; STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit) } *lcnfRoot = NULL, *lcnfLast = NULL; @@ -91,7 +92,6 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ -static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ #define TIME_REQUERY_DFLT 2 #define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */ @@ -108,8 +108,12 @@ struct instanceConf_s { uchar *pszBindAddr; /* IP to bind socket to */ uchar *pszBindPort; /* Port to bind socket to */ uchar *pszBindRuleset; /* name of ruleset to bind to */ + uchar *inputname; ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */ + int ratelimitInterval; + int ratelimitBurst; struct instanceConf_s *next; + sbool bAppendPortToInpname; }; struct modConfData_s { @@ -139,8 +143,12 @@ static struct cnfparamblk modpblk = /* input instance parameters */ static struct cnfparamdescr inppdescr[] = { { "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */ + { "inputname", eCmdHdlrGetWord, 0 }, + { "inputname.appendport", eCmdHdlrBinary, 0 }, { "address", eCmdHdlrString, 0 }, - { "ruleset", eCmdHdlrString, 0 } + { "ruleset", eCmdHdlrString, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 } }; static struct cnfparamblk inppblk = { CNFPARAMBLK_VERSION, @@ -165,6 +173,10 @@ createInstance(instanceConf_t **pinst) inst->pszBindPort = NULL; inst->pszBindAddr = NULL; inst->pszBindRuleset = NULL; + inst->inputname = NULL; + inst->bAppendPortToInpname = 0; + inst->ratelimitBurst = 10000; /* arbitrary high limit */ + inst->ratelimitInterval = 0; /* off */ /* node created, let's add to config */ if(loadModConf->tail == NULL) { @@ -223,7 +235,8 @@ addListner(instanceConf_t *inst) struct lstn_s *newlcnfinfo; uchar *bindName; uchar *port; - uchar statname[64]; + uchar dispname[64], inpnameBuf[128]; + uchar *inputname; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -248,11 +261,29 @@ addListner(instanceConf_t *inst) newlcnfinfo->next = NULL; newlcnfinfo->sock = newSocks[iSrc]; newlcnfinfo->pRuleset = inst->pBindRuleset; + snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port); + dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */ + CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL)); + if(inst->inputname == NULL) { + inputname = (uchar*)"imudp"; + } else { + inputname = inst->inputname; + } + if(inst->bAppendPortToInpname) { + snprintf((char*)inpnameBuf, sizeof(inpnameBuf), "%s%s", + inputname, port); + inpnameBuf[sizeof(inpnameBuf)-1] = '\0'; + inputname = inpnameBuf; + } + CHKiRet(prop.Construct(&newlcnfinfo->pInputName)); + CHKiRet(prop.SetString(newlcnfinfo->pInputName, + inputname, ustrlen(inputname))); + CHKiRet(prop.ConstructFinalize(newlcnfinfo->pInputName)); + ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval, + inst->ratelimitBurst); /* support statistics gathering */ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats))); - snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port); - statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */ - CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname)); + CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname)); STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit); CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"), ctrType_IntCtr, &(newlcnfinfo->ctrSubmit))); @@ -304,7 +335,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta static inline rsRetVal processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted) { - DEFiRet; int iNbrTimeUsed; time_t ttGenTime; struct syslogTime stTime; @@ -314,9 +344,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f msg_t *pMsg; prop_t *propFromHost = NULL; prop_t *propFromHostIP = NULL; + multi_submit_t multiSub; + msg_t *pMsgs[CONF_NUM_MULTISUB]; char errStr[1024]; + DEFiRet; assert(pThrd != NULL); + multiSub.ppMsgs = pMsgs; + multiSub.maxElem = CONF_NUM_MULTISUB; + multiSub.nElem = 0; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ if(pThrd->bShallStop == RSTRUE) @@ -367,7 +403,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f *pbIsPermitted = 1; /* no check -> everything permitted */ } - DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); + DBGPRINTF("imudp:recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf); if(*pbIsPermitted != 0) { if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) { @@ -376,19 +412,22 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f /* we now create our own message object and submit it to the queue */ CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); - MsgSetInputName(pMsg, pInputName); + MsgSetInputName(pMsg, lstn->pInputName); MsgSetRuleset(pMsg, lstn->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL; if(*pbIsPermitted == 2) pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */ CHKiRet(msgSetFromSockinfo(pMsg, &frominet)); - CHKiRet(submitMsg(pMsg)); + CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg)); STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit); } } + finalize_it: + multiSubmitFlush(&multiSub); + if(propFromHost != NULL) prop.Destruct(&propFromHost); if(propFromHostIP != NULL) @@ -678,10 +717,18 @@ createListner(es_str_t *port, struct cnfparamvals *pvals) continue; if(!strcmp(inppblk.descr[i].name, "port")) { continue; /* array, handled by caller */ + } else if(!strcmp(inppblk.descr[i].name, "inputname")) { + inst->inputname = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "inputname.appendport")) { + inst->bAppendPortToInpname = (int) pvals[i].val.d.n; } else if(!strcmp(inppblk.descr[i].name, "address")) { inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(inppblk.descr[i].name, "ruleset")) { inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) { + inst->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) { + inst->ratelimitInterval = (int) pvals[i].val.d.n; } else { dbgprintf("imudp: program error, non-handled " "param '%s'\n", inppblk.descr[i].name); @@ -699,10 +746,7 @@ BEGINnewInpInst CODESTARTnewInpInst DBGPRINTF("newInpInst (imudp)\n"); - pvals = nvlstGetParams(lst, &inppblk, NULL); - if(pvals == NULL) { - errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, - "imudp: required parameter are missing\n"); + if((pvals = nvlstGetParams(lst, &inppblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } if(Debug) { @@ -825,7 +869,7 @@ CODESTARTactivateCnfPrePrivDrop for(inst = runModConf->root ; inst != NULL ; inst = inst->next) { addListner(inst); } - /* if we could not set up any listners, there is no point in running... */ + /* if we could not set up any listeners, there is no point in running... */ if(lcnfRoot == NULL) { errmsg.LogError(0, NO_ERRCODE, "imudp: no listeners could be started, " "input not activated.\n"); @@ -851,7 +895,7 @@ CODESTARTfreeCnf for(inst = pModConf->root ; inst != NULL ; ) { free(inst->pszBindPort); free(inst->pszBindAddr); - free(inst->pBindRuleset); + free(inst->inputname); del = inst; inst = inst->next; free(del); @@ -892,7 +936,9 @@ CODESTARTafterRun net.clearAllowedSenders((uchar*)"UDP"); for(lstn = lcnfRoot ; lstn != NULL ; ) { statsobj.Destruct(&(lstn->stats)); + ratelimitDestruct(lstn->ratelimiter); close(lstn->sock); + prop.Destruct(&lstn->pInputName); lstnDel = lstn; lstn = lstn->next; free(lstnDel); @@ -907,9 +953,6 @@ ENDafterRun BEGINmodExit CODESTARTmodExit - if(pInputName != NULL) - prop.Destruct(&pInputName); - /* release what we no longer need */ objRelease(errmsg, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); @@ -964,11 +1007,6 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); - /* we need to create the inputName property (only once during our lifetime) */ - CHKiRet(prop.Construct(&pInputName)); - CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imudp"), sizeof("imudp") - 1)); - CHKiRet(prop.ConstructFinalize(pInputName)); - /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID)); |