diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imdiag/imdiag.c | 1 | ||||
-rw-r--r-- | plugins/imfile/imfile.c | 1 | ||||
-rw-r--r-- | plugins/imklog/imklog.c | 1 | ||||
-rw-r--r-- | plugins/immark/immark.c | 18 | ||||
-rw-r--r-- | plugins/imrelp/imrelp.c | 8 | ||||
-rw-r--r-- | plugins/imtemplate/imtemplate.c | 36 | ||||
-rw-r--r-- | plugins/imudp/imudp.c | 85 | ||||
-rw-r--r-- | plugins/imuxsock/imuxsock.c | 78 |
8 files changed, 139 insertions, 89 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c index 9602f50d..4359cda1 100644 --- a/plugins/imdiag/imdiag.c +++ b/plugins/imdiag/imdiag.c @@ -213,7 +213,6 @@ doInjectMsg(int iNum) MsgSetInputName(pMsg, pInputName); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; MsgSetRcvFrom(pMsg, pRcvDummy); CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy)); CHKiRet(submitMsg(pMsg)); diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index 7c588f90..7cfde940 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -107,7 +107,6 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag); pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); - pMsg->bParseHOSTNAME = 0; CHKiRet(submitMsg(pMsg)); finalize_it: RETiRet; diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c index 7994c5eb..c59ce04f 100644 --- a/plugins/imklog/imklog.c +++ b/plugins/imklog/imklog.c @@ -111,7 +111,6 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity) MsgSetTAG(pMsg, pszTag, ustrlen(pszTag)); pMsg->iFacility = LOG_FAC(iFacility); pMsg->iSeverity = LOG_PRI(iSeverity); - pMsg->bParseHOSTNAME = 0; CHKiRet(submitMsg(pMsg)); finalize_it: diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c index 8504f872..19f43456 100644 --- a/plugins/immark/immark.c +++ b/plugins/immark/immark.c @@ -42,6 +42,7 @@ #include "module-template.h" #include "errmsg.h" #include "msg.h" +#include "glbl.h" MODULE_TYPE_INPUT @@ -50,8 +51,16 @@ MODULE_TYPE_INPUT /* Module static data */ DEF_IMOD_STATIC_DATA +DEFobjCurrIf(glbl) static int iMarkMessagePeriod = DEFAULT_MARK_PERIOD; +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + /* This function is called to gather input. It must terminate only * a) on failure (iRet set accordingly) * b) on termination of the input module (as part of the unload process) @@ -77,6 +86,10 @@ CODESTARTrunInput * rgerhards, 2007-12-17 */ CHKiRet(thrdSleep(pThrd, iMarkMessagePeriod, 0)); /* seconds, micro seconds */ + + if(glbl.GetGlobalInputTermState() == 1) + break; /* terminate input! */ + logmsgInternal(NO_ERRCODE, LOG_INFO, (uchar*)"-- MARK --", MARK); } finalize_it: @@ -106,6 +119,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) @@ -119,9 +133,9 @@ BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID)); ENDmodInit -/* - * vi:set ai: +/* vi:set ai: */ diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c index b9e7b2f8..9be38f8f 100644 --- a/plugins/imrelp/imrelp.c +++ b/plugins/imrelp/imrelp.c @@ -81,14 +81,14 @@ isPermittedHost(struct sockaddr *addr, char *fromHostFQDN, void __attribute__((u * are different from our rsRetVal. So we can simply use our own iRet system * to fulfill the requirement. * rgerhards, 2008-03-21 - * TODO: we currently do not receive the remote hosts's IP. As a work-around, we - * use "???" for the time being. -- rgerhards, 2008-05-16 + * Note: librelp 1.0.0 is required in order to receive the IP address, otherwise + * we will only see the hostname (twice). -- rgerhards, 2009-10-14 */ static relpRetVal -onSyslogRcv(uchar *pHostname, uchar __attribute__((unused)) *pIP, uchar *pMsg, size_t lenMsg) +onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg) { DEFiRet; - parseAndSubmitMessage(pHostname, (uchar*) "[unset]", pMsg, lenMsg, PARSE_HOSTNAME, + parseAndSubmitMessage(pHostname, pIP, pMsg, lenMsg, PARSE_HOSTNAME, eFLOWCTL_LIGHT_DELAY, pInputName, NULL, 0); RETiRet; diff --git a/plugins/imtemplate/imtemplate.c b/plugins/imtemplate/imtemplate.c index 366408a0..82084745 100644 --- a/plugins/imtemplate/imtemplate.c +++ b/plugins/imtemplate/imtemplate.c @@ -231,49 +231,25 @@ CODESTARTrunInput * logs an error message as syslogd, just as printf, e.g. * errmsg.LogError(NO_ERRCODE, "Error %d occured during %s", 1, "test"); * - * There are several ways how a message can be enqueued. This part of the - * interface is currently underspecified. Have a look at the function definitions - * in syslogd.c (sorry, folks...). - * - * If you received a full syslog message that must be decoded by a message - * parser, parseAndSubmitMessage() is the way to go. It's not just a funny name - * but also a quite some legacy. Consequently, its interface is, ummm, not - * well designed. - * parseAndSubmitMessage((char*)fromHost, (char*) pRcvBuf, lenRcvd, bParseHost); - * fromHost - * is the host that we received the message from (a string) - * pRcvBuf - * is the received (to-be-decoded) message - * lenRcvd - * is the length of the received message. Please note that pRcvBuf is - * NOT a standard C-string. Most importantly it is NOT expected to be - * \0-terminated. Thus the lenght is vitally imporant (if it is wrong, - * rsyslogd will probably segfault). - * bParseHost - * is a boolean (0-no, 1-yes). It tells the parser whether or not - * a hostname should be parsed from the message. This is important - * for sources that are known not to provide a hostname. - * Use define MSG_PARSE_HOSTNAME and MSG_DONT_PARSE_HOSTNAME - * - * Another, more elaborate, way is to create the message object ourselves and - * pass it to the rule engine. That way is more appropriate if the message + * To submit the message to the queue engine, we must create the message + * object and fill it with data. If it contains a syslog message that must + * be parsed, we can add a flag that requests parsing. Otherwise, we must + * fill the properties ourselves. That is appropriate if the message * does not need to be parsed, for example when reading text (log) files. In that way, * we can set the message properties as of our liking. This is how it works: * msg_t *pMsg; CHKiRet(msgConstruct(&pMsg)); - MsgSetUxTradMsg(pMsg, msg); MsgSetRawMsg(pMsg, msg); MsgSetHOSTNAME(pMsg, LocalHostName); MsgSetTAG(pMsg, "rsyslogd:"); pMsg->iFacility = LOG_FAC(pri); pMsg->iSeverity = LOG_PRI(pri); - pMsg->bParseHOSTNAME = 0; flags |= INTERNAL_MSG; logmsg(pMsg, flags); / * some time, CHKiRet() will work here, too [today NOT!] * / * - * Note that UxTradMsg is a wild construct. For the time being, set it to - * the raw message text. I am hard thinking at dropping that beast at all... + * NOTE: for up-to-date usage samples, see the other provided input modules. + * A good starting point is probably imuxsock. * * This example probably does not set all message properties (but the ones * that are of practical importance). If you need all, check msg.h. Use diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c index 3fabf1a2..59d23adb 100644 --- a/plugins/imudp/imudp.c +++ b/plugins/imudp/imudp.c @@ -47,6 +47,7 @@ #include "parser.h" #include "datetime.h" #include "prop.h" +#include "ruleset.h" #include "unicode-helper.h" MODULE_TYPE_INPUT @@ -60,6 +61,7 @@ DEFobjCurrIf(glbl) DEFobjCurrIf(net) DEFobjCurrIf(datetime) DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) static int iMaxLine; /* maximum UDP message size supported */ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded @@ -68,13 +70,14 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte */ static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements * read-only after init(), but beware of restart! */ +static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */ static uchar *pszBindAddr = NULL; /* IP to bind socket to */ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc * it so that we can check available memory in willRun() and request * termination if we can not get it. -- rgerhards, 2007-12-27 */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ -// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ +static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */ #define TIME_REQUERY_DFLT 2 static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */ @@ -93,6 +96,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) int *newSocks; int *tmpSocks; int iSrc, iDst; + ruleset_t **tmpRulesets; /* check which address to bind to. We could do this more compact, but have not * done so in order to make the code more readable. -- rgerhards, 2007-12-27 @@ -113,26 +117,39 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal) if(udpLstnSocks == NULL) { /* esay, we can just replace it */ udpLstnSocks = newSocks; + CHKmalloc(udpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (newSocks[0] + 1))); + for(iDst = 1 ; iDst < newSocks[0] ; ++iDst) + udpRulesets[iDst] = pBindRuleset; } else { /* we need to add them */ - if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) { + tmpSocks = (int*) malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0])); + tmpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0])); + if(tmpSocks == NULL || tmpRulesets == NULL) { DBGPRINTF("out of memory trying to allocate udp listen socket array\n"); /* in this case, we discard the new sockets but continue with what we * already have */ free(newSocks); + free(tmpSocks); + free(tmpRulesets); ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); } else { /* ready to copy */ iDst = 1; - for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) + for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) { tmpSocks[iDst++] = udpLstnSocks[iSrc]; - for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) + tmpRulesets[iDst++] = udpRulesets[iSrc]; + } + for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) { tmpSocks[iDst++] = newSocks[iSrc]; + tmpRulesets[iDst++] = pBindRuleset; + } tmpSocks[0] = udpLstnSocks[0] + newSocks[0]; free(newSocks); free(udpLstnSocks); udpLstnSocks = tmpSocks; + free(udpRulesets); + udpRulesets = tmpRulesets; } } } @@ -144,7 +161,6 @@ finalize_it: } -#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */ /* accept a new ruleset to bind. Checks if it exists and complains, if not */ static rsRetVal setRuleset(void __attribute__((unused)) *pVal, uchar *pszName) @@ -165,7 +181,6 @@ finalize_it: free(pszName); /* no longer needed */ RETiRet; } -#endif /* This function is a helper to runInput. I have extracted it @@ -184,7 +199,7 @@ finalize_it: */ static inline rsRetVal processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, - uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP) + uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP, ruleset_t *pRuleset) { DEFiRet; int iNbrTimeUsed; @@ -255,9 +270,9 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted, CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime)); MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf); MsgSetInputName(pMsg, pInputName); + MsgSetRuleset(pMsg, pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME; - pMsg->bParseHOSTNAME = 1; MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost); CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP)); CHKiRet(submitMsg(pMsg)); @@ -279,16 +294,14 @@ finalize_it: * interface. ./configure settings control which one is used. * rgerhards, 2009-09-09 */ -#if HAVE_EPOLL_CREATE1 +#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE) #define NUM_EPOLL_EVENTS 10 -rsRetVal rcvMainLoop() +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) { DEFiRet; - int maxfds; int nfds; int efd; int i; - fd_set readfds; struct sockaddr_storage frominetPrev; int bIsPermitted; uchar fromHost[NI_MAXHOST]; @@ -306,24 +319,25 @@ rsRetVal rcvMainLoop() CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event))); - efd = epoll_create1(EPOLL_CLOEXEC); +# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + DBGPRINTF("imudp uses epoll_create1()\n"); + efd = epoll_create1(EPOLL_CLOEXEC); +# else + DBGPRINTF("imudp uses epoll_create()\n"); + efd = epoll_create(NUM_EPOLL_EVENTS); +# endif if(efd < 0) { DBGPRINTF("epoll_create1() could not create fd\n"); - // TODO: "good" error message ABORT_FINALIZE(RS_RET_IO_ERROR); } /* fill the epoll set - we need to do this only once, as the set * can not change dyamically. */ - maxfds = 0; - FD_ZERO (&readfds); - - /* Add the UDP listen sockets to the list of read descriptors. */ for (i = 0; i < *udpLstnSocks; i++) { if (udpLstnSocks[i+1] != -1) { udpEPollEvt[i].events = EPOLLIN | EPOLLET; - udpEPollEvt[i].data.fd = udpLstnSocks[i+1]; + udpEPollEvt[i].data.u64 = i+1; if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) { rs_strerror_r(errno, errStr, sizeof(errStr)); errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n", @@ -337,18 +351,24 @@ rsRetVal rcvMainLoop() nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1); DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds); + if(pThrd->bShallStop == TRUE) + break; /* terminate input! */ + for(i = 0 ; i < nfds ; ++i) { - processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted, - fromHost, fromHostFQDN, fromHostIP); + processSocket(udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted, + fromHost, fromHostFQDN, fromHostIP, udpRulesets[currEvt[i].data.u64]); } } finalize_it: + if(udpEPollEvt != NULL) + free(udpEPollEvt); + RETiRet; } #else /* #if HAVE_EPOLL_CREATE1 */ /* this is the code for the select() interface */ -rsRetVal rcvMainLoop() +rsRetVal rcvMainLoop(thrdInfo_t *pThrd) { DEFiRet; int maxfds; @@ -366,6 +386,7 @@ rsRetVal rcvMainLoop() */ bIsPermitted = 0; memset(&frominetPrev, 0, sizeof(frominetPrev)); + DBGPRINTF("imudp uses select()\n"); while(1) { /* Add the Unix Domain Sockets to the list of read @@ -402,7 +423,7 @@ rsRetVal rcvMainLoop() for(i = 0; nfds && i < *udpLstnSocks; i++) { if(FD_ISSET(udpLstnSocks[i+1], &readfds)) { processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted, - fromHost, fromHostFQDN, fromHostIP); + fromHost, fromHostFQDN, fromHostIP, udpRulesets[i+1]); --nfds; /* indicate we have processed one descriptor */ } } @@ -423,7 +444,7 @@ CODESTARTrunInput * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - iRet = rcvMainLoop(); + iRet = rcvMainLoop(pThrd); ENDrunInput @@ -443,9 +464,7 @@ CODESTARTwillRun iMaxLine = glbl.GetMaxLine(); - if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) { - ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); - } + CHKmalloc(pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))); finalize_it: ENDwillRun @@ -457,6 +476,8 @@ CODESTARTafterRun if(udpLstnSocks != NULL) { net.closeUDPListenSockets(udpLstnSocks); udpLstnSocks = NULL; + free(udpRulesets); + udpRulesets = NULL; } if(pRcvBuf != NULL) { free(pRcvBuf); @@ -474,6 +495,7 @@ CODESTARTmodExit objRelease(glbl, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); objRelease(net, LM_NET_FILENAME); ENDmodExit @@ -497,10 +519,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a free(pszBindAddr); pszBindAddr = NULL; } - if(udpLstnSocks != NULL) { - net.closeUDPListenSockets(udpLstnSocks); - udpLstnSocks = NULL; - } iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */ return RS_RET_OK; } @@ -514,13 +532,12 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); CHKiRet(objUse(net, LM_NET_FILENAME)); /* register config file handlers */ - /* TODO: add - but this requires more changes, no time right now... - CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord, + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord, setRuleset, NULL, STD_LOADABLE_MODULE_ID)); - */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord, addListner, NULL, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord, diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index b8da4966..b8546ce3 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -6,7 +6,7 @@ * * File begun on 2007-12-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,6 +70,7 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(prop) +static prop_t *pLocalHostIP = NULL; /* there is only one global IP for all internally-generated messages */ static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */ static int startIndexUxLocalSockets; /* process funix from that index on (used to * suppress local logging. rgerhards 2005-08-01 @@ -79,7 +80,7 @@ static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name? static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */ static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */ static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */ -static uchar *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */ +static prop_t *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */ static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */ static int funix[MAXFUNIX] = { -1, }; /* read-only after startup */ static int nfunix = 1; /* number of Unix sockets open / read-only after startup */ @@ -122,30 +123,41 @@ static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int * rgerhards, 2007-12-20 * added capability to specify hostname for socket -- rgerhards, 2008-08-01 */ -static rsRetVal addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) +static rsRetVal +addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal) { + DEFiRet; + if(nfunix < MAXFUNIX) { if(*pNewVal == ':') { funixParseHost[nfunix] = 1; - } - else { + } else { funixParseHost[nfunix] = 0; } - funixHName[nfunix] = pLogHostName; - pLogHostName = NULL; /* re-init for next, not freed because funixHName[] now owns it */ + CHKiRet(prop.Construct(&(funixHName[nfunix]))); + if(pLogHostName == NULL) { + CHKiRet(prop.SetString(funixHName[nfunix], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); + } else { + CHKiRet(prop.SetString(funixHName[nfunix], pLogHostName, ustrlen(pLogHostName))); + /* reset hostname for next socket */ + free(pLogHostName); + pLogHostName = NULL; + } + CHKiRet(prop.ConstructFinalize(funixHName[nfunix])); funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY; funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG; funixCreateSockPath[nfunix] = bCreateSockPath; funixn[nfunix++] = pNewVal; - } - else { + } else { errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n", pNewVal); } - return RS_RET_OK; +finalize_it: + RETiRet; } + /* free the funixn[] socket names - needed as cleanup on several places * note that nfunix is NOT reset! funixn[0] is never freed, as it comes from * the constant memory pool - and if not, it is freeed via some other pointer. @@ -160,8 +172,7 @@ static rsRetVal discardFunixn(void) funixn[i] = NULL; } if(funixHName[i] != NULL) { - free(funixHName[i]); - funixHName[i] = NULL; + prop.Destruct(&(funixHName[i])); } } @@ -197,6 +208,35 @@ static int create_unix_socket(const char *path, int bCreatePath) } +/* submit received message to the queue engine + */ +static inline rsRetVal +SubmitMsg(uchar *pRcv, int lenRcv, int iSock) +{ + msg_t *pMsg; + DEFiRet; + + /* we now create our own message object and submit it to the queue */ + CHKiRet(msgConstruct(&pMsg)); + MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv); + MsgSetInputName(pMsg, pInputName); + MsgSetFlowControlType(pMsg, funixFlowCtl[iSock]); + + if(funixParseHost[iSock]) { + pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING | PARSE_HOSTNAME; + } else { + pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING; + } + + MsgSetRcvFrom(pMsg, funixHName[iSock]); + CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP)); + CHKiRet(submitMsg(pMsg)); + +finalize_it: + RETiRet; +} + + /* This function receives data from a socket indicated to be ready * to receive and submits the message received for processing. * rgerhards, 2007-12-20 @@ -231,10 +271,7 @@ static rsRetVal readSocket(int fd, int iSock) iRcvd = recv(fd, pRcv, iMaxLine, 0); dbgprintf("Message from UNIX socket: #%d\n", fd); if (iRcvd > 0) { - parseAndSubmitMessage(funixHName[iSock] == NULL ? glbl.GetLocalHostName() : funixHName[iSock], - (uchar*)"127.0.0.1", pRcv, - iRcvd, funixParseHost[iSock] ? (funixFlags[iSock] | PARSE_HOSTNAME) : funixFlags[iSock], - funixFlowCtl[iSock], pInputName, NULL, 0); + CHKiRet(SubmitMsg(pRcv, iRcvd, iSock)); } else if (iRcvd < 0 && errno != EINTR) { char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); @@ -413,6 +450,15 @@ CODEmodInit_QueryRegCFSLineHdlr funix[i] = -1; } + CHKiRet(prop.Construct(&pLocalHostIP)); + CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); + CHKiRet(prop.ConstructFinalize(pLocalHostIP)); + + /* now init listen socket zero, the local log socket */ + CHKiRet(prop.Construct(&(funixHName[0]))); + CHKiRet(prop.SetString(funixHName[0], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()))); + CHKiRet(prop.ConstructFinalize(funixHName[0])); + /* register config file handlers */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary, NULL, &bOmitLocalLogging, STD_LOADABLE_MODULE_ID)); |