summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c1
-rw-r--r--plugins/imfile/imfile.c1
-rw-r--r--plugins/imklog/imklog.c1
-rw-r--r--plugins/immark/immark.c18
-rw-r--r--plugins/imrelp/imrelp.c8
-rw-r--r--plugins/imtemplate/imtemplate.c36
-rw-r--r--plugins/imudp/imudp.c85
-rw-r--r--plugins/imuxsock/imuxsock.c78
8 files changed, 139 insertions, 89 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 9602f50d..4359cda1 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -213,7 +213,6 @@ doInjectMsg(int iNum)
MsgSetInputName(pMsg, pInputName);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
- pMsg->bParseHOSTNAME = 1;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
CHKiRet(submitMsg(pMsg));
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 7c588f90..7cfde940 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -107,7 +107,6 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag);
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
- pMsg->bParseHOSTNAME = 0;
CHKiRet(submitMsg(pMsg));
finalize_it:
RETiRet;
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 7994c5eb..c59ce04f 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -111,7 +111,6 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity)
MsgSetTAG(pMsg, pszTag, ustrlen(pszTag));
pMsg->iFacility = LOG_FAC(iFacility);
pMsg->iSeverity = LOG_PRI(iSeverity);
- pMsg->bParseHOSTNAME = 0;
CHKiRet(submitMsg(pMsg));
finalize_it:
diff --git a/plugins/immark/immark.c b/plugins/immark/immark.c
index 8504f872..19f43456 100644
--- a/plugins/immark/immark.c
+++ b/plugins/immark/immark.c
@@ -42,6 +42,7 @@
#include "module-template.h"
#include "errmsg.h"
#include "msg.h"
+#include "glbl.h"
MODULE_TYPE_INPUT
@@ -50,8 +51,16 @@ MODULE_TYPE_INPUT
/* Module static data */
DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(glbl)
static int iMarkMessagePeriod = DEFAULT_MARK_PERIOD;
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
/* This function is called to gather input. It must terminate only
* a) on failure (iRet set accordingly)
* b) on termination of the input module (as part of the unload process)
@@ -77,6 +86,10 @@ CODESTARTrunInput
* rgerhards, 2007-12-17
*/
CHKiRet(thrdSleep(pThrd, iMarkMessagePeriod, 0)); /* seconds, micro seconds */
+
+ if(glbl.GetGlobalInputTermState() == 1)
+ break; /* terminate input! */
+
logmsgInternal(NO_ERRCODE, LOG_INFO, (uchar*)"-- MARK --", MARK);
}
finalize_it:
@@ -106,6 +119,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
@@ -119,9 +133,9 @@ BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"markmessageperiod", 0, eCmdHdlrInt, NULL, &iMarkMessagePeriod, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit
-/*
- * vi:set ai:
+/* vi:set ai:
*/
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index b9e7b2f8..9be38f8f 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -81,14 +81,14 @@ isPermittedHost(struct sockaddr *addr, char *fromHostFQDN, void __attribute__((u
* are different from our rsRetVal. So we can simply use our own iRet system
* to fulfill the requirement.
* rgerhards, 2008-03-21
- * TODO: we currently do not receive the remote hosts's IP. As a work-around, we
- * use "???" for the time being. -- rgerhards, 2008-05-16
+ * Note: librelp 1.0.0 is required in order to receive the IP address, otherwise
+ * we will only see the hostname (twice). -- rgerhards, 2009-10-14
*/
static relpRetVal
-onSyslogRcv(uchar *pHostname, uchar __attribute__((unused)) *pIP, uchar *pMsg, size_t lenMsg)
+onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg)
{
DEFiRet;
- parseAndSubmitMessage(pHostname, (uchar*) "[unset]", pMsg, lenMsg, PARSE_HOSTNAME,
+ parseAndSubmitMessage(pHostname, pIP, pMsg, lenMsg, PARSE_HOSTNAME,
eFLOWCTL_LIGHT_DELAY, pInputName, NULL, 0);
RETiRet;
diff --git a/plugins/imtemplate/imtemplate.c b/plugins/imtemplate/imtemplate.c
index 366408a0..82084745 100644
--- a/plugins/imtemplate/imtemplate.c
+++ b/plugins/imtemplate/imtemplate.c
@@ -231,49 +231,25 @@ CODESTARTrunInput
* logs an error message as syslogd, just as printf, e.g.
* errmsg.LogError(NO_ERRCODE, "Error %d occured during %s", 1, "test");
*
- * There are several ways how a message can be enqueued. This part of the
- * interface is currently underspecified. Have a look at the function definitions
- * in syslogd.c (sorry, folks...).
- *
- * If you received a full syslog message that must be decoded by a message
- * parser, parseAndSubmitMessage() is the way to go. It's not just a funny name
- * but also a quite some legacy. Consequently, its interface is, ummm, not
- * well designed.
- * parseAndSubmitMessage((char*)fromHost, (char*) pRcvBuf, lenRcvd, bParseHost);
- * fromHost
- * is the host that we received the message from (a string)
- * pRcvBuf
- * is the received (to-be-decoded) message
- * lenRcvd
- * is the length of the received message. Please note that pRcvBuf is
- * NOT a standard C-string. Most importantly it is NOT expected to be
- * \0-terminated. Thus the lenght is vitally imporant (if it is wrong,
- * rsyslogd will probably segfault).
- * bParseHost
- * is a boolean (0-no, 1-yes). It tells the parser whether or not
- * a hostname should be parsed from the message. This is important
- * for sources that are known not to provide a hostname.
- * Use define MSG_PARSE_HOSTNAME and MSG_DONT_PARSE_HOSTNAME
- *
- * Another, more elaborate, way is to create the message object ourselves and
- * pass it to the rule engine. That way is more appropriate if the message
+ * To submit the message to the queue engine, we must create the message
+ * object and fill it with data. If it contains a syslog message that must
+ * be parsed, we can add a flag that requests parsing. Otherwise, we must
+ * fill the properties ourselves. That is appropriate if the message
* does not need to be parsed, for example when reading text (log) files. In that way,
* we can set the message properties as of our liking. This is how it works:
*
msg_t *pMsg;
CHKiRet(msgConstruct(&pMsg));
- MsgSetUxTradMsg(pMsg, msg);
MsgSetRawMsg(pMsg, msg);
MsgSetHOSTNAME(pMsg, LocalHostName);
MsgSetTAG(pMsg, "rsyslogd:");
pMsg->iFacility = LOG_FAC(pri);
pMsg->iSeverity = LOG_PRI(pri);
- pMsg->bParseHOSTNAME = 0;
flags |= INTERNAL_MSG;
logmsg(pMsg, flags); / * some time, CHKiRet() will work here, too [today NOT!] * /
*
- * Note that UxTradMsg is a wild construct. For the time being, set it to
- * the raw message text. I am hard thinking at dropping that beast at all...
+ * NOTE: for up-to-date usage samples, see the other provided input modules.
+ * A good starting point is probably imuxsock.
*
* This example probably does not set all message properties (but the ones
* that are of practical importance). If you need all, check msg.h. Use
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 3fabf1a2..59d23adb 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -47,6 +47,7 @@
#include "parser.h"
#include "datetime.h"
#include "prop.h"
+#include "ruleset.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -60,6 +61,7 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
static int iMaxLine; /* maximum UDP message size supported */
static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitted sender was last discarded
@@ -68,13 +70,14 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
*/
static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements
* read-only after init(), but beware of restart! */
+static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */
static uchar *pszBindAddr = NULL; /* IP to bind socket to */
static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
* it so that we can check available memory in willRun() and request
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
-// TODO: static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
+static ruleset_t *pBindRuleset = NULL; /* ruleset to bind listener to (use system default if unspecified) */
#define TIME_REQUERY_DFLT 2
static int iTimeRequery = TIME_REQUERY_DFLT;/* how often is time to be queried inside tight recv loop? 0=always */
@@ -93,6 +96,7 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
int *newSocks;
int *tmpSocks;
int iSrc, iDst;
+ ruleset_t **tmpRulesets;
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -113,26 +117,39 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
if(udpLstnSocks == NULL) {
/* esay, we can just replace it */
udpLstnSocks = newSocks;
+ CHKmalloc(udpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (newSocks[0] + 1)));
+ for(iDst = 1 ; iDst < newSocks[0] ; ++iDst)
+ udpRulesets[iDst] = pBindRuleset;
} else {
/* we need to add them */
- if((tmpSocks = malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]))) == NULL) {
+ tmpSocks = (int*) malloc(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]));
+ tmpRulesets = (ruleset_t**) malloc(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0]));
+ if(tmpSocks == NULL || tmpRulesets == NULL) {
DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
/* in this case, we discard the new sockets but continue with what we
* already have
*/
free(newSocks);
+ free(tmpSocks);
+ free(tmpRulesets);
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
} else {
/* ready to copy */
iDst = 1;
- for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc)
+ for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc) {
tmpSocks[iDst++] = udpLstnSocks[iSrc];
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc)
+ tmpRulesets[iDst++] = udpRulesets[iSrc];
+ }
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) {
tmpSocks[iDst++] = newSocks[iSrc];
+ tmpRulesets[iDst++] = pBindRuleset;
+ }
tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
free(newSocks);
free(udpLstnSocks);
udpLstnSocks = tmpSocks;
+ free(udpRulesets);
+ udpRulesets = tmpRulesets;
}
}
}
@@ -144,7 +161,6 @@ finalize_it:
}
-#if 0 /* TODO: implement when tehre is time, requires restructure of socket array! */
/* accept a new ruleset to bind. Checks if it exists and complains, if not */
static rsRetVal
setRuleset(void __attribute__((unused)) *pVal, uchar *pszName)
@@ -165,7 +181,6 @@ finalize_it:
free(pszName); /* no longer needed */
RETiRet;
}
-#endif
/* This function is a helper to runInput. I have extracted it
@@ -184,7 +199,7 @@ finalize_it:
*/
static inline rsRetVal
processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP)
+ uchar *fromHost, uchar *fromHostFQDN, uchar *fromHostIP, ruleset_t *pRuleset)
{
DEFiRet;
int iNbrTimeUsed;
@@ -255,9 +270,9 @@ processSocket(int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
+ MsgSetRuleset(pMsg, pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
- pMsg->bParseHOSTNAME = 1;
MsgSetRcvFromStr(pMsg, fromHost, ustrlen(fromHost), &propFromHost);
CHKiRet(MsgSetRcvFromIPStr(pMsg, fromHostIP, ustrlen(fromHostIP), &propFromHostIP));
CHKiRet(submitMsg(pMsg));
@@ -279,16 +294,14 @@ finalize_it:
* interface. ./configure settings control which one is used.
* rgerhards, 2009-09-09
*/
-#if HAVE_EPOLL_CREATE1
+#if defined(HAVE_EPOLL_CREATE1) || defined(HAVE_EPOLL_CREATE)
#define NUM_EPOLL_EVENTS 10
-rsRetVal rcvMainLoop()
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
{
DEFiRet;
- int maxfds;
int nfds;
int efd;
int i;
- fd_set readfds;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
uchar fromHost[NI_MAXHOST];
@@ -306,24 +319,25 @@ rsRetVal rcvMainLoop()
CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));
- efd = epoll_create1(EPOLL_CLOEXEC);
+# if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ DBGPRINTF("imudp uses epoll_create1()\n");
+ efd = epoll_create1(EPOLL_CLOEXEC);
+# else
+ DBGPRINTF("imudp uses epoll_create()\n");
+ efd = epoll_create(NUM_EPOLL_EVENTS);
+# endif
if(efd < 0) {
DBGPRINTF("epoll_create1() could not create fd\n");
- // TODO: "good" error message
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
/* fill the epoll set - we need to do this only once, as the set
* can not change dyamically.
*/
- maxfds = 0;
- FD_ZERO (&readfds);
-
- /* Add the UDP listen sockets to the list of read descriptors. */
for (i = 0; i < *udpLstnSocks; i++) {
if (udpLstnSocks[i+1] != -1) {
udpEPollEvt[i].events = EPOLLIN | EPOLLET;
- udpEPollEvt[i].data.fd = udpLstnSocks[i+1];
+ udpEPollEvt[i].data.u64 = i+1;
if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
rs_strerror_r(errno, errStr, sizeof(errStr));
errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
@@ -337,18 +351,24 @@ rsRetVal rcvMainLoop()
nfds = epoll_wait(efd, currEvt, NUM_EPOLL_EVENTS, -1);
DBGPRINTF("imudp: epoll_wait() returned with %d fds\n", nfds);
+ if(pThrd->bShallStop == TRUE)
+ break; /* terminate input! */
+
for(i = 0 ; i < nfds ; ++i) {
- processSocket(currEvt[i].data.fd, &frominetPrev, &bIsPermitted,
- fromHost, fromHostFQDN, fromHostIP);
+ processSocket(udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted,
+ fromHost, fromHostFQDN, fromHostIP, udpRulesets[currEvt[i].data.u64]);
}
}
finalize_it:
+ if(udpEPollEvt != NULL)
+ free(udpEPollEvt);
+
RETiRet;
}
#else /* #if HAVE_EPOLL_CREATE1 */
/* this is the code for the select() interface */
-rsRetVal rcvMainLoop()
+rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
{
DEFiRet;
int maxfds;
@@ -366,6 +386,7 @@ rsRetVal rcvMainLoop()
*/
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
+ DBGPRINTF("imudp uses select()\n");
while(1) {
/* Add the Unix Domain Sockets to the list of read
@@ -402,7 +423,7 @@ rsRetVal rcvMainLoop()
for(i = 0; nfds && i < *udpLstnSocks; i++) {
if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
processSocket(udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
- fromHost, fromHostFQDN, fromHostIP);
+ fromHost, fromHostFQDN, fromHostIP, udpRulesets[i+1]);
--nfds; /* indicate we have processed one descriptor */
}
}
@@ -423,7 +444,7 @@ CODESTARTrunInput
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
- iRet = rcvMainLoop();
+ iRet = rcvMainLoop(pThrd);
ENDrunInput
@@ -443,9 +464,7 @@ CODESTARTwillRun
iMaxLine = glbl.GetMaxLine();
- if((pRcvBuf = malloc((iMaxLine + 1) * sizeof(char))) == NULL) {
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- }
+ CHKmalloc(pRcvBuf = malloc((iMaxLine + 1) * sizeof(char)));
finalize_it:
ENDwillRun
@@ -457,6 +476,8 @@ CODESTARTafterRun
if(udpLstnSocks != NULL) {
net.closeUDPListenSockets(udpLstnSocks);
udpLstnSocks = NULL;
+ free(udpRulesets);
+ udpRulesets = NULL;
}
if(pRcvBuf != NULL) {
free(pRcvBuf);
@@ -474,6 +495,7 @@ CODESTARTmodExit
objRelease(glbl, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
objRelease(net, LM_NET_FILENAME);
ENDmodExit
@@ -497,10 +519,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
free(pszBindAddr);
pszBindAddr = NULL;
}
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
- }
iTimeRequery = TIME_REQUERY_DFLT;/* the default is to query only every second time */
return RS_RET_OK;
}
@@ -514,13 +532,12 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
/* register config file handlers */
- /* TODO: add - but this requires more changes, no time right now...
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverbindruleset", 0, eCmdHdlrGetWord,
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord,
setRuleset, NULL, STD_LOADABLE_MODULE_ID));
- */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserverrun", 0, eCmdHdlrGetWord,
addListner, NULL, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"udpserveraddress", 0, eCmdHdlrGetWord,
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index b8da4966..b8546ce3 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-20 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -70,6 +70,7 @@ DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+static prop_t *pLocalHostIP = NULL; /* there is only one global IP for all internally-generated messages */
static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
static int startIndexUxLocalSockets; /* process funix from that index on (used to
* suppress local logging. rgerhards 2005-08-01
@@ -79,7 +80,7 @@ static int funixParseHost[MAXFUNIX] = { 0, }; /* should parser parse host name?
static int funixFlags[MAXFUNIX] = { IGNDATE, }; /* should parser parse host name? read-only after startup */
static int funixCreateSockPath[MAXFUNIX] = { 0, }; /* auto-creation of socket directory? */
static uchar *funixn[MAXFUNIX] = { (uchar*) _PATH_LOG }; /* read-only after startup */
-static uchar *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
+static prop_t *funixHName[MAXFUNIX] = { NULL, }; /* host-name override - if set, use this instead of actual name */
static int funixFlowCtl[MAXFUNIX] = { eFLOWCTL_NO_DELAY, }; /* flow control settings for this socket */
static int funix[MAXFUNIX] = { -1, }; /* read-only after startup */
static int nfunix = 1; /* number of Unix sockets open / read-only after startup */
@@ -122,30 +123,41 @@ static rsRetVal setSystemLogFlowControl(void __attribute__((unused)) *pVal, int
* rgerhards, 2007-12-20
* added capability to specify hostname for socket -- rgerhards, 2008-08-01
*/
-static rsRetVal addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
+static rsRetVal
+addLstnSocketName(void __attribute__((unused)) *pVal, uchar *pNewVal)
{
+ DEFiRet;
+
if(nfunix < MAXFUNIX) {
if(*pNewVal == ':') {
funixParseHost[nfunix] = 1;
- }
- else {
+ } else {
funixParseHost[nfunix] = 0;
}
- funixHName[nfunix] = pLogHostName;
- pLogHostName = NULL; /* re-init for next, not freed because funixHName[] now owns it */
+ CHKiRet(prop.Construct(&(funixHName[nfunix])));
+ if(pLogHostName == NULL) {
+ CHKiRet(prop.SetString(funixHName[nfunix], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ } else {
+ CHKiRet(prop.SetString(funixHName[nfunix], pLogHostName, ustrlen(pLogHostName)));
+ /* reset hostname for next socket */
+ free(pLogHostName);
+ pLogHostName = NULL;
+ }
+ CHKiRet(prop.ConstructFinalize(funixHName[nfunix]));
funixFlowCtl[nfunix] = bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
funixFlags[nfunix] = bIgnoreTimestamp ? IGNDATE : NOFLAG;
funixCreateSockPath[nfunix] = bCreateSockPath;
funixn[nfunix++] = pNewVal;
- }
- else {
+ } else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
pNewVal);
}
- return RS_RET_OK;
+finalize_it:
+ RETiRet;
}
+
/* free the funixn[] socket names - needed as cleanup on several places
* note that nfunix is NOT reset! funixn[0] is never freed, as it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
@@ -160,8 +172,7 @@ static rsRetVal discardFunixn(void)
funixn[i] = NULL;
}
if(funixHName[i] != NULL) {
- free(funixHName[i]);
- funixHName[i] = NULL;
+ prop.Destruct(&(funixHName[i]));
}
}
@@ -197,6 +208,35 @@ static int create_unix_socket(const char *path, int bCreatePath)
}
+/* submit received message to the queue engine
+ */
+static inline rsRetVal
+SubmitMsg(uchar *pRcv, int lenRcv, int iSock)
+{
+ msg_t *pMsg;
+ DEFiRet;
+
+ /* we now create our own message object and submit it to the queue */
+ CHKiRet(msgConstruct(&pMsg));
+ MsgSetRawMsg(pMsg, (char*)pRcv, lenRcv);
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetFlowControlType(pMsg, funixFlowCtl[iSock]);
+
+ if(funixParseHost[iSock]) {
+ pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING | PARSE_HOSTNAME;
+ } else {
+ pMsg->msgFlags = funixFlags[iSock] | NEEDS_PARSING;
+ }
+
+ MsgSetRcvFrom(pMsg, funixHName[iSock]);
+ CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
+ CHKiRet(submitMsg(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
+
/* This function receives data from a socket indicated to be ready
* to receive and submits the message received for processing.
* rgerhards, 2007-12-20
@@ -231,10 +271,7 @@ static rsRetVal readSocket(int fd, int iSock)
iRcvd = recv(fd, pRcv, iMaxLine, 0);
dbgprintf("Message from UNIX socket: #%d\n", fd);
if (iRcvd > 0) {
- parseAndSubmitMessage(funixHName[iSock] == NULL ? glbl.GetLocalHostName() : funixHName[iSock],
- (uchar*)"127.0.0.1", pRcv,
- iRcvd, funixParseHost[iSock] ? (funixFlags[iSock] | PARSE_HOSTNAME) : funixFlags[iSock],
- funixFlowCtl[iSock], pInputName, NULL, 0);
+ CHKiRet(SubmitMsg(pRcv, iRcvd, iSock));
} else if (iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -413,6 +450,15 @@ CODEmodInit_QueryRegCFSLineHdlr
funix[i] = -1;
}
+ CHKiRet(prop.Construct(&pLocalHostIP));
+ CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
+ CHKiRet(prop.ConstructFinalize(pLocalHostIP));
+
+ /* now init listen socket zero, the local log socket */
+ CHKiRet(prop.Construct(&(funixHName[0])));
+ CHKiRet(prop.SetString(funixHName[0], glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));
+ CHKiRet(prop.ConstructFinalize(funixHName[0]));
+
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"omitlocallogging", 0, eCmdHdlrBinary,
NULL, &bOmitLocalLogging, STD_LOADABLE_MODULE_ID));