From ace4f2f75202aec39449dac11b9eb1deca7428d7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 8 Oct 2008 18:55:11 +0200 Subject: reordered imudp processing. Message parsing is now done as part of main message queue worker processing (was part of the input thread) This should also improve performance, as potentially more work is done in parallel. --- plugins/imudp/imudp.c | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) (limited to 'plugins/imudp/imudp.c') diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 7f9afb68..aab201a9 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -40,6 +40,8 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "msg.h" +#include "parser.h" #include "datetime.h" MODULE_TYPE_INPUT @@ -153,15 +155,16 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, time_t ttGenTime; struct syslogTime stTime; socklen_t socklen; - ssize_t l; + ssize_t lenRcvBuf; struct sockaddr_storage frominet; + msg_t *pMsg; char errStr[1024]; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ socklen = sizeof(struct sockaddr_storage); - l = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); - if(l < 0) { + lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); @@ -193,13 +196,25 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, } } - DBGPRINTF("Message from inetd socket: #%d, host: %s, isPermitted: %d\n", fd, fromHost, *pbIsPermitted); + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + if(*pbIsPermitted) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); } - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime, ttGenTime); + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf)); + memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf); + pMsg->iLenRawMsg = lenRcvBuf; + MsgSetInputName(pMsg, "imudp"); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->bParseHOSTNAME = MSG_PARSE_HOSTNAME; + pMsg->msgFlags = NOFLAG; + MsgSetRcvFrom(pMsg, (char*)fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP)); + CHKiRet(submitMsg(pMsg)); } } -- cgit v1.2.3