diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2008-10-08 18:55:11 +0200 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2008-10-08 18:55:11 +0200 |
commit | ace4f2f75202aec39449dac11b9eb1deca7428d7 (patch) | |
tree | 86637d21fafb06b262a30ff2f57dee32cd6483df /plugins/imudp/imudp.c | |
parent | 82b583c4f99dd9beb30360f222c4d2a1152f75e1 (diff) | |
download | rsyslog-ace4f2f75202aec39449dac11b9eb1deca7428d7.tar.gz rsyslog-ace4f2f75202aec39449dac11b9eb1deca7428d7.tar.bz2 rsyslog-ace4f2f75202aec39449dac11b9eb1deca7428d7.zip |
reordered imudp processing.
Message parsing is now done as part of main message queue worker
processing (was part of the input thread) This should also improve
performance, as potentially more work is done in parallel.
Diffstat (limited to 'plugins/imudp/imudp.c')
-rw-r--r-- | plugins/imudp/imudp.c | 27 |
1 files changed, 21 insertions, 6 deletions
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 7f9afb68..aab201a9 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -40,6 +40,8 @@ #include "srUtils.h" #include "errmsg.h" #include "glbl.h" +#include "msg.h" +#include "parser.h" #include "datetime.h" MODULE_TYPE_INPUT @@ -153,15 +155,16 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, time_t ttGenTime; struct syslogTime stTime; socklen_t socklen; - ssize_t l; + ssize_t lenRcvBuf; struct sockaddr_storage frominet; + msg_t *pMsg; char errStr[1024]; iNbrTimeUsed = 0; while(1) { /* loop is terminated if we have a bad receive, done below in the body */ socklen = sizeof(struct sockaddr_storage); - l = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); - if(l < 0) { + lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen); + if(lenRcvBuf < 0) { if(errno != EINTR && errno != EAGAIN) { rs_strerror_r(errno, errStr, sizeof(errStr)); DBGPRINTF("INET socket error: %d = %s.\n", errno, errStr); @@ -193,13 +196,25 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, } } - DBGPRINTF("Message from inetd socket: #%d, host: %s, isPermitted: %d\n", fd, fromHost, *pbIsPermitted); + DBGPRINTF("recv(%d,%d)/%s,acl:%d,msg:%.80s\n", fd, (int) lenRcvBuf, fromHost, *pbIsPermitted, pRcvBuf); + if(*pbIsPermitted) { if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) { datetime.getCurrTime(&stTime, &ttGenTime); } - parseAndSubmitMessage(fromHost, fromHostIP, pRcvBuf, l, - MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY, (uchar*)"imudp", &stTime, ttGenTime); + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); + /* first trim the buffer to what we have actually received */ + CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar)* lenRcvBuf)); + memcpy(pMsg->pszRawMsg, pRcvBuf, lenRcvBuf); + pMsg->iLenRawMsg = lenRcvBuf; + MsgSetInputName(pMsg, "imudp"); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + pMsg->bParseHOSTNAME = MSG_PARSE_HOSTNAME; + pMsg->msgFlags = NOFLAG; + MsgSetRcvFrom(pMsg, (char*)fromHost); + CHKiRet(MsgSetRcvFromIP(pMsg, fromHostIP)); + CHKiRet(submitMsg(pMsg)); } } |