summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c3
-rw-r--r--plugins/imgssapi/imgssapi.c2
-rw-r--r--plugins/imklog/imklog.c9
-rw-r--r--plugins/impstats/impstats.c8
-rw-r--r--plugins/imptcp/imptcp.c76
-rw-r--r--plugins/imtcp/imtcp.c6
-rw-r--r--plugins/imudp/imudp.c174
-rw-r--r--plugins/imuxsock/imuxsock.c10
8 files changed, 171 insertions, 117 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index b3468921..bcbbab09 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -376,7 +376,8 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
/* initialized, now add socket */
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ?
UCHAR_CONSTANT("imdiag") : pszInputName));
- tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal);
+ /* we support octect-cuunted frame (constant 1 below) */
+ tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1);
finalize_it:
if(iRet != RS_RET_OK) {
diff --git a/plugins/imgssapi/imgssapi.c b/plugins/imgssapi/imgssapi.c
index 446795d6..cc969aa5 100644
--- a/plugins/imgssapi/imgssapi.c
+++ b/plugins/imgssapi/imgssapi.c
@@ -335,7 +335,7 @@ addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal)
CHKiRet(tcpsrv.SetCBOnRegularClose(pOurTcpsrv, onRegularClose));
CHKiRet(tcpsrv.SetCBOnErrClose(pOurTcpsrv, onErrClose));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, UCHAR_CONSTANT("imgssapi")));
- tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal);
+ tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, 1);
CHKiRet(tcpsrv.ConstructFinalize(pOurTcpsrv));
}
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index 16adbc21..b6fd7c79 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -44,6 +44,7 @@
#include <stdarg.h>
#include <ctype.h>
#include <stdlib.h>
+#include <sys/socket.h>
#include "dirty.h"
#include "cfsysline.h"
@@ -52,6 +53,7 @@
#include "module-template.h"
#include "datetime.h"
#include "imklog.h"
+#include "net.h"
#include "glbl.h"
#include "prop.h"
#include "unicode-helper.h"
@@ -64,6 +66,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(datetime)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(net)
/* configuration settings */
int dbgPrintSymbols = 0; /* this one is extern so the helpers can access it! */
@@ -236,7 +239,7 @@ BEGINwillRun
CODESTARTwillRun
/* we need to create the inputName property (only once during our lifetime) */
CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imklog"), sizeof("imklog") - 1));
- CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
+ pLocalHostIP = glbl.GetLocalHostIP();
iRet = klogWillRun();
finalize_it:
@@ -249,8 +252,6 @@ CODESTARTafterRun
if(pInputName != NULL)
prop.Destruct(&pInputName);
- if(pLocalHostIP != NULL)
- prop.Destruct(&pLocalHostIP);
ENDafterRun
@@ -258,6 +259,7 @@ BEGINmodExit
CODESTARTmodExit
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
+ objRelease(net, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
if(pszPath != NULL)
@@ -293,6 +295,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(net, CORE_COMPONENT));
iFacilIntMsg = klogFacilIntMsg();
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 3012136c..ba497e01 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -62,7 +62,6 @@ typedef struct configSettings_s {
static configSettings_t cs;
static prop_t *pInputName = NULL;
-static prop_t *pLocalHostIP = NULL;
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -92,7 +91,7 @@ doSubmitMsg(uchar *line)
MsgSetRawMsgWOSize(pMsg, (char*)line);
MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
- MsgSetRcvFromIP(pMsg, pLocalHostIP);
+ MsgSetRcvFromIP(pMsg, glbl.GetLocalHostIP());
MsgSetMSGoffs(pMsg, 0);
MsgSetTAG(pMsg, UCHAR_CONSTANT("rsyslogd-pstats:"), sizeof("rsyslogd-pstats:") - 1);
pMsg->iFacility = cs.iFacility;
@@ -169,7 +168,6 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
prop.Destruct(&pInputName);
- prop.Destruct(&pLocalHostIP);
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -212,10 +210,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(prop.Construct(&pInputName));
CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("impstats"), sizeof("impstats") - 1));
CHKiRet(prop.ConstructFinalize(pInputName));
-
- CHKiRet(prop.Construct(&pLocalHostIP));
- CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
- CHKiRet(prop.ConstructFinalize(pLocalHostIP));
ENDmodInit
/* vi:set ai:
*/
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 65fe703c..3973041b 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -65,6 +65,7 @@
#include "datetime.h"
#include "ruleset.h"
#include "msg.h"
+#include "statsobj.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -82,12 +83,13 @@ DEFobjCurrIf(prop)
DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
-
+DEFobjCurrIf(statsobj)
/* config settings */
typedef struct configSettings_s {
int bEmitMsgOnClose; /* emit an informational message on close by remote peer */
+ int bSuppOctetFram; /* support octet-counted framing? */
int iAddtlFrameDelim; /* addtl frame delimiter, e.g. for netscreen, default none */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
uchar *lstnIP; /* which IP we should listen on? */
@@ -110,7 +112,8 @@ struct ptcpsrv_s {
ptcpsrv_t *pNext; /* linked list maintenance */
uchar *port; /* Port to listen to */
uchar *lstnIP; /* which IP we should listen on? */
- int bEmitMsgOnClose;
+ sbool bEmitMsgOnClose;
+ sbool bSuppOctetFram;
int iAddtlFrameDelim;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
@@ -123,13 +126,15 @@ struct ptcpsrv_s {
* includes support for doubly-linked list.
*/
struct ptcpsess_s {
- ptcpsrv_t *pSrv; /* our server */
+// ptcpsrv_t *pSrv; /* our server TODO: check remove! */
+ ptcplstn_t *pLstn; /* our listener */
ptcpsess_t *prev, *next;
int sock;
epolld_t *epd;
//--- from tcps_sess.h
int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
+ sbool bSuppOctetFram; /**< copy from listener, to speed up access */
enum {
eAtStrtFram,
eInOctetCnt,
@@ -150,7 +155,10 @@ struct ptcplstn_s {
ptcpsrv_t *pSrv; /* our server */
ptcplstn_t *prev, *next;
int sock;
+ sbool bSuppOctetFram;
epolld_t *epd;
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
};
@@ -188,7 +196,7 @@ static char rcvBuf[128*1024];
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
-static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock);
+static rsRetVal addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6);
/* some simple constructors/destructors */
@@ -233,6 +241,7 @@ startupSrv(ptcpsrv_t *pSrv)
int sockflags;
struct addrinfo hints, *res = NULL, *r;
uchar *lstnIP;
+ int isIPv6 = 0;
lstnIP = pSrv->lstnIP == NULL ? UCHAR_CONSTANT("") : pSrv->lstnIP;
@@ -265,8 +274,9 @@ startupSrv(ptcpsrv_t *pSrv)
continue;
}
-#ifdef IPV6_V6ONLY
if(r->ai_family == AF_INET6) {
+ isIPv6 = 1;
+#ifdef IPV6_V6ONLY
int iOn = 1;
if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
(char *)&iOn, sizeof (iOn)) < 0) {
@@ -274,8 +284,10 @@ startupSrv(ptcpsrv_t *pSrv)
sock = -1;
continue;
}
- }
#endif
+ } else {
+ isIPv6 = 0;
+ }
if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0 ) {
DBGPRINTF("error %d setting tcp socket option\n", errno);
close(sock);
@@ -337,7 +349,7 @@ startupSrv(ptcpsrv_t *pSrv)
/* if we reach this point, we were able to obtain a valid socket, so we can
* create our listener object. -- rgerhards, 2010-08-10
*/
- CHKiRet(addLstn(pSrv, sock));
+ CHKiRet(addLstn(pSrv, sock, isIPv6));
++numSocks;
}
@@ -492,22 +504,25 @@ static rsRetVal
doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, multi_submit_t *pMultiSub)
{
msg_t *pMsg;
+ ptcpsrv_t *pSrv;
DEFiRet;
if(pThis->iMsg == 0) {
DBGPRINTF("discarding zero-sized message\n");
FINALIZE;
}
+ pSrv = pThis->pLstn->pSrv;
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pThis->pMsg, pThis->iMsg);
- MsgSetInputName(pMsg, pThis->pSrv->pInputName);
+ MsgSetInputName(pMsg, pSrv->pInputName);
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pThis->peerName);
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->peerIP));
- MsgSetRuleset(pMsg, pThis->pSrv->pRuleset);
+ MsgSetRuleset(pMsg, pSrv->pRuleset);
+ STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
if(pMultiSub == NULL) {
CHKiRet(submitMsg(pMsg));
@@ -540,7 +555,7 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
DEFiRet;
if(pThis->inputState == eAtStrtFram) {
- if(isdigit((int) c)) {
+ if(pThis->bSuppOctetFram && isdigit((int) c)) {
pThis->inputState = eInOctetCnt;
pThis->iOctetsRemain = 0;
pThis->eFraming = TCP_FRAMING_OCTET_COUNTING;
@@ -589,7 +604,8 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
}
if(( (c == '\n')
- || ((pThis->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->iAddtlFrameDelim))
+ || ((pThis->pLstn->pSrv->iAddtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER)
+ && (c == pThis->pLstn->pSrv->iAddtlFrameDelim))
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
doSubmitMsg(pThis, stTime, ttGenTime, pMultiSub);
pThis->inputState = eAtStrtFram;
@@ -678,6 +694,7 @@ static inline void
initConfigSettings(void)
{
cs.bEmitMsgOnClose = 0;
+ cs.bSuppOctetFram = 1;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
cs.pszInputName = NULL;
cs.pRuleset = NULL;
@@ -693,7 +710,7 @@ addEPollSock(epolld_type_t typ, void *ptr, int sock, epolld_t **pEpd)
DEFiRet;
epolld_t *epd = NULL;
- CHKmalloc(epd = malloc(sizeof(epolld_t)));
+ CHKmalloc(epd = calloc(sizeof(epolld_t), 1));
epd->typ = typ;
epd->ptr = ptr;
*pEpd = epd;
@@ -747,14 +764,27 @@ finalize_it:
/* add a listener to the server
*/
static rsRetVal
-addLstn(ptcpsrv_t *pSrv, int sock)
+addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
{
DEFiRet;
ptcplstn_t *pLstn;
+ uchar statname[64];
CHKmalloc(pLstn = malloc(sizeof(ptcplstn_t)));
pLstn->pSrv = pSrv;
+ pLstn->bSuppOctetFram = pSrv->bSuppOctetFram;
pLstn->sock = sock;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(pLstn->stats)));
+ snprintf((char*)statname, sizeof(statname), "imptcp(%s/%s/%s)",
+ (pSrv->lstnIP == NULL) ? "*" : (char*)pSrv->lstnIP, pSrv->port,
+ isIPv6 ? "IPv6" : "IPv4");
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(pLstn->stats, statname));
+ STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(pLstn->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(pLstn->stats));
/* add to start of server's listener list */
pLstn->prev = NULL;
@@ -773,15 +803,17 @@ finalize_it:
/* add a session to the server
*/
static rsRetVal
-addSess(ptcpsrv_t *pSrv, int sock, prop_t *peerName, prop_t *peerIP)
+addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
{
DEFiRet;
ptcpsess_t *pSess = NULL;
+ ptcpsrv_t *pSrv = pLstn->pSrv;
CHKmalloc(pSess = malloc(sizeof(ptcpsess_t)));
CHKmalloc(pSess->pMsg = malloc(iMaxLine * sizeof(uchar)));
- pSess->pSrv = pSrv;
+ pSess->pLstn = pLstn;
pSess->sock = sock;
+ pSess->bSuppOctetFram = pLstn->bSuppOctetFram;
pSess->inputState = eAtStrtFram;
pSess->iMsg = 0;
pSess->bAtStrtOfFram = 1;
@@ -824,7 +856,7 @@ closeSess(ptcpsess_t *pSess)
pSess->next->prev = pSess->prev;
if(pSess->prev == NULL) {
/* need to update root! */
- pSess->pSrv->pSess = pSess->next;
+ pSess->pLstn->pSrv->pSess = pSess->next;
} else {
pSess->prev->next = pSess->next;
}
@@ -882,6 +914,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKmalloc(pSrv = malloc(sizeof(ptcpsrv_t)));
pSrv->pSess = NULL;
pSrv->pLstn = NULL;
+ pSrv->bSuppOctetFram = cs.bSuppOctetFram;
pSrv->bEmitMsgOnClose = cs.bEmitMsgOnClose;
pSrv->port = pNewVal;
pSrv->iAddtlFrameDelim = cs.iAddtlFrameDelim;
@@ -949,7 +982,7 @@ lstnActivity(ptcplstn_t *pLstn)
if(localRet == RS_RET_NO_MORE_DATA)
break;
CHKiRet(localRet);
- CHKiRet(addSess(pLstn->pSrv, newSock, peerName, peerIP));
+ CHKiRet(addSess(pLstn, newSock, peerName, peerIP));
}
finalize_it:
@@ -979,7 +1012,7 @@ sessActivity(ptcpsess_t *pSess)
CHKiRet(DataRcvd(pSess, rcvBuf, lenRcv));
} else if (lenRcv == 0) {
/* session was closed, do clean-up */
- if(pSess->pSrv->bEmitMsgOnClose) {
+ if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
uchar *peerName;
int lenPeer;
prop.GetString(pSess->peerName, &peerName, &lenPeer);
@@ -1085,6 +1118,8 @@ shutdownSrv(ptcpsrv_t *pSrv)
pLstn = pSrv->pLstn;
while(pLstn != NULL) {
close(pLstn->sock);
+ statsobj.Destruct(&(pLstn->stats));
+ /* now unlink listner */
lstnDel = pLstn;
pLstn = pLstn->next;
DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock);
@@ -1132,6 +1167,7 @@ CODESTARTmodExit
/* release objects we used */
objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
objRelease(net, LM_NET_FILENAME);
objRelease(datetime, CORE_COMPONENT);
@@ -1144,6 +1180,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
cs.bEmitMsgOnClose = 0;
+ cs.bSuppOctetFram = 1;
cs.iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
free(cs.pszInputName);
cs.pszInputName = NULL;
@@ -1167,6 +1204,7 @@ CODEmodInit_QueryRegCFSLineHdlr
initConfigSettings();
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
@@ -1176,6 +1214,8 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary,
+ NULL, &cs.bSuppOctetFram, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpservernotifyonconnectionclose"), 0,
eCmdHdlrBinary, NULL, &cs.bEmitMsgOnClose, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputptcpserveraddtlframedelimiter"), 0, eCmdHdlrInt,
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 6ab39477..2239d0f4 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -83,6 +83,7 @@ static permittedPeers_t *pPermPeersRoot = NULL;
/* config settings */
static int iTCPSessMax = 200; /* max number of sessions */
+static int bSuppOctetFram = 1; /* octet counted TCP framing supported? */
static int iTCPLstnMax = 20; /* max number of sessions */
static int iStrmDrvrMode = 0; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
static int bEmitMsgOnClose = 0; /* emit an informational message on close by remote peer */
@@ -215,7 +216,7 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, pBindRuleset));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : pszInputName));
- tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal);
+ tcpsrv.configureTCPListen(pOurTcpsrv, pNewVal, bSuppOctetFram);
finalize_it:
if(iRet != RS_RET_OK) {
@@ -287,6 +288,7 @@ static rsRetVal
resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
iTCPSessMax = 200;
+ bSuppOctetFram = 1;
iTCPLstnMax = 20;
iStrmDrvrMode = 0;
bEmitMsgOnClose = 0;
@@ -324,6 +326,8 @@ CODEmodInit_QueryRegCFSLineHdlr
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserverrun"), 0, eCmdHdlrGetWord,
addTCPListener, NULL, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpserversupportoctetcountedframing"), 0, eCmdHdlrBinary,
+ NULL, &bSuppOctetFram, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxsessions"), 0, eCmdHdlrInt,
NULL, &iTCPSessMax, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr(UCHAR_CONSTANT("inputtcpmaxlisteners"), 0, eCmdHdlrInt,
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index a5002591..46631e97 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2011 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -26,6 +26,7 @@
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
+#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
@@ -51,6 +52,7 @@
#include "datetime.h"
#include "prop.h"
#include "ruleset.h"
+#include "statsobj.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -66,6 +68,16 @@ DEFobjCurrIf(net)
DEFobjCurrIf(datetime)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)
+DEFobjCurrIf(statsobj)
+
+
+static struct lstn_s {
+ struct lstn_s *next;
+ int sock; /* socket */
+ ruleset_t *pRuleset; /* bound ruleset */
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+} *lcnfRoot = NULL, *lcnfLast = NULL;
static int bDoACLCheck; /* are ACL checks neeed? Cached once immediately before listener startup */
static int iMaxLine; /* maximum UDP message size supported */
@@ -73,9 +85,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
* This shall prevent remote DoS when the "discard on disallowed sender"
* message is configured to be logged on occurance of such a case.
*/
-static int *udpLstnSocks = NULL; /* Internet datagram sockets, first element is nbr of elements
- * read-only after init(), but beware of restart! */
-static ruleset_t **udpRulesets = NULL; /* ruleset to be used with sockets in question (entry 0 is empty) */
+
static uchar *pszBindAddr = NULL; /* IP to bind socket to */
static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a global and alloc
* it so that we can check available memory in willRun() and request
@@ -190,9 +200,11 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
DEFiRet;
uchar *bindAddr;
int *newSocks;
- int *tmpSocks;
- int iSrc, iDst;
- ruleset_t **tmpRulesets;
+ int iSrc;
+ struct lstn_s *newlcnfinfo;
+ uchar *bindName;
+ uchar *port;
+ uchar statname[64];
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -203,55 +215,46 @@ static rsRetVal addListner(void __attribute__((unused)) *pVal, uchar *pNewVal)
bindAddr = NULL;
else
bindAddr = pszBindAddr;
+ bindName = (bindAddr == NULL) ? (uchar*)"*" : bindAddr;
- DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n",
- (bindAddr == NULL) ? (uchar*)"*" : bindAddr, pNewVal);
+ DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, pNewVal);
- newSocks = net.create_udp_socket(bindAddr, (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal, 1);
+ port = (pNewVal == NULL || *pNewVal == '\0') ? (uchar*) "514" : pNewVal;
+ newSocks = net.create_udp_socket(bindAddr, port, 1);
if(newSocks != NULL) {
/* we now need to add the new sockets to the existing set */
- if(udpLstnSocks == NULL) {
- /* esay, we can just replace it */
- udpLstnSocks = newSocks;
- CHKmalloc(udpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (newSocks[0] + 1)));
- for(iDst = 1 ; iDst <= newSocks[0] ; ++iDst)
- udpRulesets[iDst] = pBindRuleset;
- } else {
- /* we need to add them */
- tmpSocks = (int*) MALLOC(sizeof(int) * (1 + newSocks[0] + udpLstnSocks[0]));
- tmpRulesets = (ruleset_t**) MALLOC(sizeof(ruleset_t*) * (1 + newSocks[0] + udpLstnSocks[0]));
- if(tmpSocks == NULL || tmpRulesets == NULL) {
- DBGPRINTF("out of memory trying to allocate udp listen socket array\n");
- /* in this case, we discard the new sockets but continue with what we
- * already have
- */
- free(newSocks);
- free(tmpSocks);
- free(tmpRulesets);
- ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- } else {
- /* ready to copy */
- iDst = 1;
- for(iSrc = 1 ; iSrc <= udpLstnSocks[0] ; ++iSrc, ++iDst) {
- tmpSocks[iDst] = udpLstnSocks[iSrc];
- tmpRulesets[iDst] = udpRulesets[iSrc];
- }
- for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc, ++iDst) {
- tmpSocks[iDst] = newSocks[iSrc];
- tmpRulesets[iDst] = pBindRuleset;
- }
- tmpSocks[0] = udpLstnSocks[0] + newSocks[0];
- free(newSocks);
- free(udpLstnSocks);
- udpLstnSocks = tmpSocks;
- free(udpRulesets);
- udpRulesets = tmpRulesets;
+ /* ready to copy */
+ for(iSrc = 1 ; iSrc <= newSocks[0] ; ++iSrc) {
+ CHKmalloc(newlcnfinfo = (struct lstn_s*) MALLOC(sizeof(struct lstn_s)));
+ newlcnfinfo->next = NULL;
+ newlcnfinfo->sock = newSocks[iSrc];
+ newlcnfinfo->pRuleset = pBindRuleset;
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
+ snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(newlcnfinfo->stats));
+ /* link to list. Order must be preserved to take care for
+ * conflicting matches.
+ */
+ if(lcnfRoot == NULL)
+ lcnfRoot = newlcnfinfo;
+ if(lcnfLast == NULL)
+ lcnfLast = newlcnfinfo;
+ else {
+ lcnfLast->next = newlcnfinfo;
+ lcnfLast = newlcnfinfo;
}
}
}
finalize_it:
free(pNewVal); /* in any case, this is no longer needed */
+ free(newSocks);
RETiRet;
}
@@ -294,8 +297,7 @@ finalize_it:
* on scheduling order. -- rgerhards, 2008-10-02
*/
static inline rsRetVal
-processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev, int *pbIsPermitted,
- ruleset_t *pRuleset)
+processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
DEFiRet;
int iNbrTimeUsed;
@@ -315,7 +317,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
if(pThrd->bShallStop == TRUE)
ABORT_FINALIZE(RS_RET_FORCE_TERM);
socklen = sizeof(struct sockaddr_storage);
- lenRcvBuf = recvfrom(fd, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
+ lenRcvBuf = recvfrom(lstn->sock, (char*) pRcvBuf, iMaxLine, 0, (struct sockaddr *)&frominet, &socklen);
if(lenRcvBuf < 0) {
if(errno != EINTR && errno != EAGAIN) {
rs_strerror_r(errno, errStr, sizeof(errStr));
@@ -360,7 +362,7 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", fd, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
if(*pbIsPermitted != 0) {
if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
@@ -370,13 +372,14 @@ processSocket(thrdInfo_t *pThrd, int fd, struct sockaddr_storage *frominetPrev,
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
MsgSetInputName(pMsg, pInputName);
- MsgSetRuleset(pMsg, pRuleset);
+ MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
CHKiRet(submitMsg(pMsg));
+ STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
@@ -443,6 +446,8 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
struct epoll_event *udpEPollEvt = NULL;
struct epoll_event currEvt[NUM_EPOLL_EVENTS];
char errStr[1024];
+ struct lstn_s *lstn;
+ int nLstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -451,7 +456,11 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
bIsPermitted = 0;
memset(&frominetPrev, 0, sizeof(frominetPrev));
- CHKmalloc(udpEPollEvt = calloc(udpLstnSocks[0], sizeof(struct epoll_event)));
+ /* count num listeners -- do it here in order to avoid inconsistency */
+ nLstn = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next)
+ ++nLstn;
+ CHKmalloc(udpEPollEvt = calloc(nLstn, sizeof(struct epoll_event)));
#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
DBGPRINTF("imudp uses epoll_create1()\n");
@@ -471,16 +480,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
/* fill the epoll set - we need to do this only once, as the set
* can not change dyamically.
*/
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
+ i = 0;
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if(lstn->sock != -1) {
udpEPollEvt[i].events = EPOLLIN | EPOLLET;
- udpEPollEvt[i].data.u64 = i+1;
- if(epoll_ctl(efd, EPOLL_CTL_ADD, udpLstnSocks[i+1], &(udpEPollEvt[i])) < 0) {
+ udpEPollEvt[i].data.u64 = (long long unsigned) lstn;
+ if(epoll_ctl(efd, EPOLL_CTL_ADD, lstn->sock, &(udpEPollEvt[i])) < 0) {
rs_strerror_r(errno, errStr, sizeof(errStr));
errmsg.LogError(errno, NO_ERRCODE, "epoll_ctrl failed on fd %d with %s\n",
- udpLstnSocks[i+1], errStr);
+ lstn->sock, errStr);
}
}
+ i++;
}
while(1) {
@@ -492,8 +503,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
break; /* terminate input! */
for(i = 0 ; i < nfds ; ++i) {
- processSocket(pThrd, udpLstnSocks[currEvt[i].data.u64], &frominetPrev, &bIsPermitted,
- udpRulesets[currEvt[i].data.u64]);
+ processSocket(pThrd, (struct lstn_s*)currEvt[i].data.u64, &frominetPrev, &bIsPermitted);
}
}
@@ -510,10 +520,10 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DEFiRet;
int maxfds;
int nfds;
- int i;
fd_set readfds;
struct sockaddr_storage frominetPrev;
int bIsPermitted;
+ struct lstn_s *lstn;
/* start "name caching" algo by making sure the previous system indicator
* is invalidated.
@@ -524,22 +534,18 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
DBGPRINTF("imudp uses select()\n");
while(1) {
- /* Add the Unix Domain Sockets to the list of read
- * descriptors.
- * rgerhards 2005-08-01: we must now check if there are
- * any local sockets to listen to at all. If the -o option
- * is given without -a, we do not need to listen at all..
+ /* Add the Unix Domain Sockets to the list of read descriptors.
*/
maxfds = 0;
FD_ZERO(&readfds);
/* Add the UDP listen sockets to the list of read descriptors. */
- for (i = 0; i < *udpLstnSocks; i++) {
- if (udpLstnSocks[i+1] != -1) {
+ for(lstn = lcnfRoot ; lstn != NULL ; lstn = lstn->next) {
+ if (lstn->sock != -1) {
if(Debug)
- net.debugListenInfo(udpLstnSocks[i+1], "UDP");
- FD_SET(udpLstnSocks[i+1], &readfds);
- if(udpLstnSocks[i+1]>maxfds) maxfds=udpLstnSocks[i+1];
+ net.debugListenInfo(lstn->sock, "UDP");
+ FD_SET(lstn->sock, &readfds);
+ if(lstn->sock>maxfds) maxfds=lstn->sock;
}
}
if(Debug) {
@@ -555,10 +561,9 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
- for(i = 0; nfds && i < *udpLstnSocks; i++) {
- if(FD_ISSET(udpLstnSocks[i+1], &readfds)) {
- processSocket(pThrd, udpLstnSocks[i+1], &frominetPrev, &bIsPermitted,
- udpRulesets[i+1]);
+ for(lstn = lcnfRoot ; nfds && lstn != NULL ; lstn = lstn->next) {
+ if(FD_ISSET(lstn->sock, &readfds)) {
+ processSocket(pThrd, lstn, &frominetPrev, &bIsPermitted);
--nfds; /* indicate we have processed one descriptor */
}
}
@@ -570,7 +575,7 @@ rsRetVal rcvMainLoop(thrdInfo_t *pThrd)
#endif /* #if HAVE_EPOLL_CREATE1 */
/* This function is called to gather input.
- * Note that udpLstnSocks must be non-NULL because otherwise we would not have
+ * Note that sock must be non-NULL because otherwise we would not have
* indicated that we want to run (or we have a programming error ;)). -- rgerhards, 2008-10-02
*/
BEGINrunInput
@@ -591,8 +596,10 @@ CODESTARTwillRun
net.HasRestrictions(UCHAR_CONSTANT("UDP"), &bDoACLCheck); /* UDP */
/* if we could not set up any listners, there is no point in running... */
- if(udpLstnSocks == NULL)
+ if(lcnfRoot == NULL) {
+ DBGPRINTF("imudp: no listeners configured, will not run\n");
ABORT_FINALIZE(RS_RET_NO_RUN);
+ }
iMaxLine = glbl.GetMaxLine();
@@ -602,15 +609,18 @@ ENDwillRun
BEGINafterRun
+ struct lstn_s *lstn, *lstnDel;
CODESTARTafterRun
/* do cleanup here */
net.clearAllowedSenders((uchar*)"UDP");
- if(udpLstnSocks != NULL) {
- net.closeUDPListenSockets(udpLstnSocks);
- udpLstnSocks = NULL;
- free(udpRulesets);
- udpRulesets = NULL;
+ for(lstn = lcnfRoot ; lstn != NULL ; ) {
+ statsobj.Destruct(&(lstn->stats));
+ close(lstn->sock);
+ lstnDel = lstn;
+ lstn = lstn->next;
+ free(lstnDel);
}
+ lcnfRoot = lcnfLast = NULL;
if(pRcvBuf != NULL) {
free(pRcvBuf);
pRcvBuf = NULL;
@@ -625,6 +635,7 @@ CODESTARTmodExit
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(datetime, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
@@ -662,6 +673,7 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 5867f1c9..ad290331 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -89,6 +89,7 @@ DEF_IMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
DEFobjCurrIf(prop)
+DEFobjCurrIf(net)
DEFobjCurrIf(parser)
DEFobjCurrIf(datetime)
DEFobjCurrIf(statsobj)
@@ -784,7 +785,7 @@ CODESTARTwillRun
else if(sd_booted()) {
struct stat st;
if(stat(SYSTEMD_JOURNAL, &st) != -1 && S_ISDIR(st.st_mode)) {
- listeners[0].sockName = (uchar*) SYSTEMD_PATH_LOG;
+ listeners[0].sockName = (uchar*)SYSTEMD_PATH_LOG;
}
}
if(ratelimitIntervalSysSock > 0) {
@@ -826,6 +827,8 @@ CODESTARTwillRun
CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imuxsock"), sizeof("imuxsock") - 1));
CHKiRet(prop.ConstructFinalize(pInputName));
+ pLocalHostIP = glbl.GetLocalHostIP();
+
finalize_it:
ENDwillRun
@@ -931,6 +934,7 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(net, CORE_COMPONENT));
CHKiRet(objUse(prop, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
CHKiRet(objUse(datetime, CORE_COMPONENT));
@@ -954,10 +958,6 @@ CODEmodInit_QueryRegCFSLineHdlr
listeners[i].fd = -1;
}
- CHKiRet(prop.Construct(&pLocalHostIP));
- CHKiRet(prop.SetString(pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
- CHKiRet(prop.ConstructFinalize(pLocalHostIP));
-
/* now init listen socket zero, the local log socket */
CHKiRet(prop.Construct(&(listeners[0].hostName)));
CHKiRet(prop.SetString(listeners[0].hostName, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())));