summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imptcp/imptcp.c180
-rw-r--r--plugins/imrelp/imrelp.c220
-rw-r--r--plugins/imuxsock/imuxsock.c4
-rw-r--r--plugins/mmcount/Makefile.am8
-rw-r--r--plugins/mmcount/mmcount.c342
-rw-r--r--plugins/mmfields/Makefile.am8
-rw-r--r--plugins/mmfields/mmfields.c274
-rw-r--r--plugins/omprog/omprog.c132
-rw-r--r--plugins/omrelp/omrelp.c205
9 files changed, 1321 insertions, 52 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 906521dd..d3a29470 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -10,7 +10,7 @@
*
* File begun on 2010-08-10 by RGerhards
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -50,6 +50,8 @@
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
+#include <stdint.h>
+#include <zlib.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -93,6 +95,11 @@ static void * wrkr(void *myself);
#define DFLT_wrkrMax 2
+#define COMPRESS_NEVER 0
+#define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */
+/* all other settings are for stream-compression */
+#define COMPRESS_STREAM_ALWAYS 2
+
/* config settings */
typedef struct configSettings_s {
int bKeepAlive; /* support keep-alive packets */
@@ -117,6 +124,7 @@ struct instanceConf_s {
int bEmitMsgOnClose;
int bSuppOctetFram; /* support octet-counted framing? */
int iAddtlFrameDelim;
+ uint8_t compressionMode;
uchar *pszBindPort; /* port to bind to */
uchar *pszBindAddr; /* IP to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
@@ -156,6 +164,7 @@ static struct cnfparamdescr inppdescr[] = {
{ "ruleset", eCmdHdlrString, 0 },
{ "supportoctetcountedframing", eCmdHdlrBinary, 0 },
{ "notifyonconnectionclose", eCmdHdlrBinary, 0 },
+ { "compression.mode", eCmdHdlrGetWord, 0 },
{ "keepalive", eCmdHdlrBinary, 0 },
{ "keepalive.probes", eCmdHdlrInt, 0 },
{ "keepalive.time", eCmdHdlrInt, 0 },
@@ -191,6 +200,7 @@ struct ptcpsrv_s {
int iKeepAliveIntvl;
int iKeepAliveProbes;
int iKeepAliveTime;
+ uint8_t compressionMode;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
ruleset_t *pRuleset;
@@ -207,11 +217,13 @@ struct ptcpsrv_s {
* includes support for doubly-linked list.
*/
struct ptcpsess_s {
-// ptcpsrv_t *pSrv; /* our server TODO: check remove! */
ptcplstn_t *pLstn; /* our listener */
ptcpsess_t *prev, *next;
int sock;
epolld_t *epd;
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ z_stream zstrm; /* zip stream to use for tcp compression */
+ uint8_t compressionMode;
//--- from tcps_sess.h
int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
@@ -239,6 +251,8 @@ struct ptcplstn_s {
sbool bSuppOctetFram;
epolld_t *epd;
statsobj_t *stats; /* listener stats */
+ intctr_t rcvdBytes;
+ intctr_t rcvdDecompressed;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
};
@@ -806,19 +820,19 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* EXTRACT from tcps_sess.c
*/
static rsRetVal
-DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, size_t iLen, time_t ttGenTime)
{
multi_submit_t multiSub;
msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
- time_t ttGenTime;
char *pEnd;
DEFiRet;
assert(pData != NULL);
assert(iLen > 0);
- datetime.getCurrTime(&stTime, &ttGenTime);
+ if(ttGenTime == 0)
+ datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
@@ -836,6 +850,71 @@ finalize_it:
RETiRet;
}
+static rsRetVal
+DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
+{
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ int zRet; /* zlib return state */
+ unsigned outavail;
+ uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!)
+ DEFiRet;
+ // TODO: can we do stats counters? Even if they are not 100% correct under all cases,
+ // by simply updating the input and output sizes?
+ uint64_t outtotal;
+
+ assert(iLen > 0);
+
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ outtotal = 0;
+
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ zRet = inflateInit(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
+ }
+
+ pThis->zstrm.next_in = (Bytef*) buf;
+ pThis->zstrm.avail_in = len;
+ /* run inflate() on buffer until everything has been uncompressed */
+ do {
+ DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = sizeof(zipBuf);
+ pThis->zstrm.next_out = zipBuf;
+ zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ outtotal += outavail;
+ pThis->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, ttGenTime));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+ dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len, outtotal);
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+{
+ DEFiRet;
+ pThis->pLstn->rcvdBytes += iLen;
+ if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ iRet = DataRcvdCompressed(pThis, pData, iLen);
+ else
+ iRet = DataRcvdUncompressed(pThis, pData, iLen, 0);
+ RETiRet;
+}
+
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -936,6 +1015,14 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pLstn->ctrSubmit)));
+ /* the following counters are not protected by mutexes; we accept
+ * that they may not be 100% correct */
+ pLstn->rcvdBytes = 0,
+ pLstn->rcvdDecompressed = 0;
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"),
+ ctrType_IntCtr, &(pLstn->rcvdBytes)));
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"),
+ ctrType_IntCtr, &(pLstn->rcvdDecompressed)));
CHKiRet(statsobj.ConstructFinalize(pLstn->stats));
/* add to start of server's listener list */
@@ -948,6 +1035,7 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd);
finalize_it:
+dbgprintf("DDDD: addLstn return %d\n", iRet);
RETiRet;
}
@@ -968,9 +1056,11 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
pSess->bSuppOctetFram = pLstn->bSuppOctetFram;
pSess->inputState = eAtStrtFram;
pSess->iMsg = 0;
+ pSess->bzInitDone = 0;
pSess->bAtStrtOfFram = 1;
pSess->peerName = peerName;
pSess->peerIP = peerIP;
+ pSess->compressionMode = pLstn->pSrv->compressionMode;
/* add to start of server's listener list */
pSess->prev = NULL;
@@ -988,6 +1078,44 @@ finalize_it:
}
+/* finish zlib buffer, to be called before closing the session.
+ */
+static rsRetVal
+doZipFinish(ptcpsess_t *pSess)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ uchar zipBuf[32*1024]; // TODO: use "global" one from pSess
+
+ if(!pSess->bzInitDone)
+ goto done;
+
+ pSess->zstrm.avail_in = 0;
+ /* run inflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("doZipFinish: in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in, pSess->zstrm.total_in);
+ pSess->zstrm.avail_out = sizeof(zipBuf);
+ pSess->zstrm.next_out = zipBuf;
+ zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pSess->zstrm.avail_out;
+ if(outavail != 0) {
+ pSess->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, 0)); // TODO: query time!
+ }
+ } while (pSess->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = inflateEnd(&pSess->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateEnd()\n", zRet);
+ }
+
+ pSess->bzInitDone = 0;
+done: RETiRet;
+}
+
/* close/remove a session
* NOTE: we must first remove the fd from the epoll set and then close it -- else we
* get an error "bad file descriptor" from epoll.
@@ -998,6 +1126,9 @@ closeSess(ptcpsess_t *pSess)
int sock;
DEFiRet;
+ if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ doZipFinish(pSess);
+
sock = pSess->sock;
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
@@ -1048,6 +1179,7 @@ createInstance(instanceConf_t **pinst)
inst->pBindRuleset = NULL;
inst->ratelimitBurst = 10000; /* arbitrary high limit */
inst->ratelimitInterval = 0; /* off */
+ inst->compressionMode = COMPRESS_SINGLE_MSG;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1127,6 +1259,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ pSrv->compressionMode = inst->compressionMode;
CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
ratelimitSetThreadSafe(pSrv->ratelimiter);
@@ -1269,6 +1402,10 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ uchar *peerName;
+ int lenPeer;
+ int remsock = 0; /* init just to keep compiler happy... :-( */
+ sbool bEmitOnClose = 0;
char rcvBuf[128*1024];
DEFiRet;
@@ -1285,13 +1422,15 @@ sessActivity(ptcpsess_t *pSess)
} else if (lenRcv == 0) {
/* session was closed, do clean-up */
if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
- uchar *peerName;
- int lenPeer;
- prop.GetString(pSess->peerName, &peerName, &lenPeer);
- errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by remote peer %s.\n",
- pSess->sock, peerName);
+ prop.GetString(pSess->peerName, &peerName, &lenPeer),
+ remsock = pSess->sock;
+ bEmitOnClose = 1;
+ }
+ CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */
+ if(bEmitOnClose) {
+ errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by "
+ "remote peer %s.\n", remsock, peerName);
}
- CHKiRet(closeSess(pSess));
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK)
@@ -1415,6 +1554,7 @@ wrkr(void *myself)
BEGINnewInpInst
struct cnfparamvals *pvals;
instanceConf_t *inst;
+ char *cstr;
int i;
CODESTARTnewInpInst
DBGPRINTF("newInpInst (imptcp)\n");
@@ -1443,6 +1583,19 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportoctetcountedframing")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "compression.mode")) {
+ cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(!strcasecmp(cstr, "stream:always")) {
+ inst->compressionMode = COMPRESS_STREAM_ALWAYS;
+ } else if(!strcasecmp(cstr, "none")) {
+ inst->compressionMode = COMPRESS_NEVER;
+ } else {
+ errmsg.LogError(0, RS_RET_PARAM_ERROR, "omfwd: invalid value for 'compression.mode' "
+ "parameter (given is '%s')", cstr);
+ free(cstr);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ free(cstr);
} else if(!strcmp(inppblk.descr[i].name, "keepalive")) {
inst->bKeepAlive = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) {
@@ -1650,6 +1803,7 @@ shutdownSrv(ptcpsrv_t *pSrv)
ptcplstn_t *pLstn, *lstnDel;
ptcpsess_t *pSess, *sessDel;
+dbgprintf("DDDD: enter shutdownSrv\n");
/* listeners */
pLstn = pSrv->pLstn;
while(pLstn != NULL) {
@@ -1658,7 +1812,9 @@ shutdownSrv(ptcpsrv_t *pSrv)
/* now unlink listner */
lstnDel = pLstn;
pLstn = pLstn->next;
- DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock);
+ DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, "
+ "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes,
+ lstnDel->rcvdDecompressed);
free(lstnDel->epd);
free(lstnDel);
}
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index 5e0ae552..d04e41e1 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -4,7 +4,7 @@
*
* File begun on 2008-03-13 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -47,6 +47,7 @@
#include "prop.h"
#include "ruleset.h"
#include "glbl.h"
+#include "statsobj.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -59,6 +60,7 @@ DEFobjCurrIf(prop)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(ruleset)
DEFobjCurrIf(glbl)
+DEFobjCurrIf(statsobj)
/* forward definitions */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
@@ -74,7 +76,31 @@ static struct configSettings_s {
struct instanceConf_s {
uchar *pszBindPort; /* port to bind to */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
+ int dhBits;
+ uchar *pristring; /* GnuTLS priority string (NULL if not to be provided) */
+ uchar *authmode; /* TLS auth mode */
+ uchar *caCertFile;
+ uchar *myCertFile;
+ uchar *myPrivKeyFile;
+ struct {
+ int nmemb;
+ uchar **name;
+ } permittedPeers;
+
struct instanceConf_s *next;
+ /* with librelp, this module does not have any own specific session
+ * or listener active data item. As a "work-around", we keep some
+ * data items inside the configuration object. To keep things
+ * decently clean, we put them all into their dedicated struct. So
+ * it is easy to judge what is actual configuration and what is
+ * dynamic runtime data. -- rgerhards, 2013-06-18
+ */
+ struct {
+ statsobj_t *stats; /* listener stats */
+ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
+ } data;
};
@@ -88,9 +114,28 @@ struct modConfData_s {
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "ruleset", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
/* input instance parameters */
static struct cnfparamdescr inppdescr[] = {
- { "port", eCmdHdlrString, CNFPARAM_REQUIRED }
+ { "port", eCmdHdlrString, CNFPARAM_REQUIRED },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.permittedpeer", eCmdHdlrArray, 0 },
+ { "tls.authmode", eCmdHdlrString, 0 },
+ { "tls.dhbits", eCmdHdlrInt, 0 },
+ { "tls.prioritystring", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -102,6 +147,30 @@ static struct cnfparamblk inppblk =
/* ------------------------------ callbacks ------------------------------ */
+static void
+onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ instanceConf_t *inst = (instanceConf_t*) pUsr;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "imrelp[%s]: error '%s', object "
+ " '%s' - input may not work as intended",
+ inst->pszBindPort, errmesg, objinfo);
+}
+
+static void
+onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ errmsg.LogError(0, RS_RET_RELP_ERR, "imrelp: librelp error '%s', object "
+ " '%s' - input may not work as intended", errmesg, objinfo);
+}
+
+static void
+onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ instanceConf_t *inst = (instanceConf_t*) pUsr;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "imrelp[%s]: authentication error '%s', peer "
+ "is '%s'", inst->pszBindPort, errmesg, authinfo);
+}
+
/* callback for receiving syslog messages. This function is invoked from the
* RELP engine when a syslog message arrived. It must return a relpRetVal,
* with anything else but RELP_RET_OK terminating the relp session. Please note
@@ -113,10 +182,11 @@ static struct cnfparamblk inppblk =
* we will only see the hostname (twice). -- rgerhards, 2009-10-14
*/
static relpRetVal
-onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg)
+onSyslogRcv(void *pUsr, uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg)
{
prop_t *pProp = NULL;
msg_t *pMsg;
+ instanceConf_t *inst = (instanceConf_t*) pUsr;
DEFiRet;
CHKiRet(msgConstruct(&pMsg));
@@ -134,6 +204,7 @@ onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg)
CHKiRet(MsgSetRcvFromIPStr(pMsg, pIP, ustrlen(pIP), &pProp));
CHKiRet(prop.Destruct(&pProp));
CHKiRet(submitMsg2(pMsg));
+ STATSCOUNTER_INC(inst->data.ctrSubmit, inst->data.mutCtrSubmit);
finalize_it:
@@ -155,6 +226,15 @@ createInstance(instanceConf_t **pinst)
inst->next = NULL;
inst->pszBindPort = NULL;
+ inst->bEnableTLS = 0;
+ inst->bEnableTLSZip = 0;
+ inst->dhBits = 0;
+ inst->pristring = NULL;
+ inst->authmode = NULL;
+ inst->permittedPeers.nmemb = 0;
+ inst->caCertFile = NULL;
+ inst->myCertFile = NULL;
+ inst->myPrivKeyFile = NULL;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -179,7 +259,7 @@ std_checkRuleset_genErrMsg(modConfData_t *modConf, __attribute__((unused)) insta
}
-/* This function is called when a new listener instace shall be added to
+/* This function is called when a new listener instance shall be added to
* the current config object via the legacy config system. It just shuffles
* all parameters to the listener in-memory instance.
* rgerhards, 2011-05-04
@@ -204,19 +284,63 @@ finalize_it:
static rsRetVal
addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
{
+ relpSrv_t *pSrv;
+ uchar statname[64];
+ int i;
DEFiRet;
if(pRelpEngine == NULL) {
CHKiRet(relpEngineConstruct(&pRelpEngine));
CHKiRet(relpEngineSetDbgprint(pRelpEngine, dbgprintf));
CHKiRet(relpEngineSetFamily(pRelpEngine, glbl.GetDefPFFamily()));
CHKiRet(relpEngineSetEnableCmd(pRelpEngine, (uchar*) "syslog", eRelpCmdState_Required));
- CHKiRet(relpEngineSetSyslogRcv(pRelpEngine, onSyslogRcv));
+ CHKiRet(relpEngineSetSyslogRcv2(pRelpEngine, onSyslogRcv));
+ CHKiRet(relpEngineSetOnErr(pRelpEngine, onErr));
+ CHKiRet(relpEngineSetOnGenericErr(pRelpEngine, onGenericErr));
+ CHKiRet(relpEngineSetOnAuthErr(pRelpEngine, onAuthErr));
if (!glbl.GetDisableDNS()) {
CHKiRet(relpEngineSetDnsLookupMode(pRelpEngine, 1));
}
}
- CHKiRet(relpEngineAddListner(pRelpEngine, inst->pszBindPort));
+ CHKiRet(relpEngineListnerConstruct(pRelpEngine, &pSrv));
+ CHKiRet(relpSrvSetLstnPort(pSrv, inst->pszBindPort));
+ /* support statistics gathering */
+ CHKiRet(statsobj.Construct(&(inst->data.stats)));
+ snprintf((char*)statname, sizeof(statname), "imrelp(%s)",
+ inst->pszBindPort);
+ statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(statsobj.SetName(inst->data.stats, statname));
+ STATSCOUNTER_INIT(inst->data.ctrSubmit, inst->data.mutCtrSubmit);
+ CHKiRet(statsobj.AddCounter(inst->data.stats, UCHAR_CONSTANT("submitted"),
+ ctrType_IntCtr, &(inst->data.ctrSubmit)));
+ CHKiRet(statsobj.ConstructFinalize(inst->data.stats));
+ /* end stats counters */
+ relpSrvSetUsrPtr(pSrv, inst);
+ if(inst->bEnableTLS) {
+ relpSrvEnableTLS(pSrv);
+ if(inst->bEnableTLSZip) {
+ relpSrvEnableTLSZip(pSrv);
+ }
+ if(inst->dhBits) {
+ relpSrvSetDHBits(pSrv, inst->dhBits);
+ }
+ relpSrvSetGnuTLSPriString(pSrv, (char*)inst->pristring);
+ if(relpSrvSetAuthMode(pSrv, (char*)inst->authmode) != RELP_RET_OK) {
+ errmsg.LogError(0, RS_RET_RELP_ERR,
+ "imrelp: invalid auth mode '%s'\n", inst->authmode);
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpSrvSetCACert(pSrv, (char*) inst->caCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpSrvSetOwnCert(pSrv, (char*) inst->myCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpSrvSetPrivKey(pSrv, (char*) inst->myPrivKeyFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ for(i = 0 ; i < inst->permittedPeers.nmemb ; ++i) {
+ relpSrvAddPermittedPeer(pSrv, (char*)inst->permittedPeers.name[i]);
+ }
+ }
+ CHKiRet(relpEngineListnerConstructFinalize(pRelpEngine, pSrv));
finalize_it:
RETiRet;
@@ -226,7 +350,7 @@ finalize_it:
BEGINnewInpInst
struct cnfparamvals *pvals;
instanceConf_t *inst;
- int i;
+ int i,j;
CODESTARTnewInpInst
DBGPRINTF("newInpInst (imrelp)\n");
@@ -249,6 +373,29 @@ CODESTARTnewInpInst
continue;
if(!strcmp(inppblk.descr[i].name, "port")) {
inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls")) {
+ inst->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.dhbits")) {
+ inst->dhBits = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.prioritystring")) {
+ inst->pristring = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.authmode")) {
+ inst->authmode = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.compression")) {
+ inst->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.cacert")) {
+ inst->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.mycert")) {
+ inst->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.myprivkey")) {
+ inst->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.permittedpeer")) {
+ inst->permittedPeers.nmemb = pvals[i].val.d.ar->nmemb;
+ CHKmalloc(inst->permittedPeers.name =
+ malloc(sizeof(uchar*) * inst->permittedPeers.nmemb));
+ for(j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
+ inst->permittedPeers.name[j] = (uchar*)es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
+ }
} else {
dbgprintf("imrelp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -264,19 +411,58 @@ BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
pModConf->pConf = pConf;
+ pModConf->pszBindRuleset = NULL;
+ pModConf->pBindRuleset = NULL;
/* init legacy config variables */
cs.pszBindRuleset = NULL;
ENDbeginCnfLoad
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imrelp:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "ruleset")) {
+ loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("imrelp: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
BEGINendCnfLoad
CODESTARTendCnfLoad
- if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
- loadModConf->pszBindRuleset = NULL;
+ if(loadModConf->pszBindRuleset == NULL) {
+ if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
+ loadModConf->pszBindRuleset = NULL;
+ } else {
+ CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ }
} else {
- CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ if((cs.pszBindRuleset != NULL) && (cs.pszBindRuleset[0] != '\0')) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "imrelp: warning: ruleset "
+ "set via legacy directive ignored");
+ }
}
- loadModConf->pBindRuleset = NULL;
finalize_it:
free(cs.pszBindRuleset);
loadModConf = NULL; /* done loading */
@@ -293,6 +479,7 @@ CODESTARTcheckCnf
if(pModConf->pszBindRuleset == NULL) {
pModConf->pBindRuleset = NULL;
} else {
+ DBGPRINTF("imrelp: using ruleset '%s'\n", pModConf->pszBindRuleset);
localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, pModConf->pszBindRuleset);
if(localRet == RS_RET_NOT_FOUND) {
std_checkRuleset_genErrMsg(pModConf, NULL);
@@ -323,13 +510,21 @@ ENDactivateCnf
BEGINfreeCnf
instanceConf_t *inst, *del;
+ int i;
CODESTARTfreeCnf
for(inst = pModConf->root ; inst != NULL ; ) {
free(inst->pszBindPort);
+ free(inst->pristring);
+ free(inst->authmode);
+ statsobj.Destruct(&(inst->data.stats));
+ for(i = 0 ; i < inst->permittedPeers.nmemb ; ++i) {
+ free(inst->permittedPeers.name[i]);
+ }
del = inst;
inst = inst->next;
free(del);
}
+ free(pModConf->pszBindRuleset);
ENDfreeCnf
/* This is used to terminate the plugin. Note that the signal handler blocks
@@ -390,6 +585,7 @@ CODESTARTmodExit
prop.Destruct(&pInputName);
/* release objects we used */
+ objRelease(statsobj, CORE_COMPONENT);
objRelease(ruleset, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
objRelease(prop, CORE_COMPONENT);
@@ -420,6 +616,7 @@ CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
@@ -435,6 +632,7 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
CHKiRet(objUse(ruleset, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrelpserverbindruleset", 0, eCmdHdlrGetWord,
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c503852c..dad09ab4 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -1284,6 +1284,8 @@ BEGINactivateCnfPrePrivDrop
instanceConf_t *inst;
CODESTARTactivateCnfPrePrivDrop
runModConf = pModConf;
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
addListner(inst);
}
@@ -1325,6 +1327,8 @@ BEGINrunInput
#endif
CODESTARTrunInput
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
diff --git a/plugins/mmcount/Makefile.am b/plugins/mmcount/Makefile.am
new file mode 100644
index 00000000..9c8c99db
--- /dev/null
+++ b/plugins/mmcount/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmcount.la
+
+mmcount_la_SOURCES = mmcount.c
+mmcount_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmcount_la_LDFLAGS = -module -avoid-version
+mmcount_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmcount/mmcount.c b/plugins/mmcount/mmcount.c
new file mode 100644
index 00000000..56a4de55
--- /dev/null
+++ b/plugins/mmcount/mmcount.c
@@ -0,0 +1,342 @@
+/* mmcount.c
+ * count messages by priority or json property of given app-name.
+ *
+ * Copyright 2013 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <json/json.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "hashtable.h"
+
+#define JSON_COUNT_NAME "!mmcount"
+#define SEVERITY_COUNT 8
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmcount")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+typedef struct _instanceData {
+ char *pszAppName;
+ int severity[SEVERITY_COUNT];
+ char *pszKey;
+ char *pszValue;
+ int valueCounter;
+ struct hashtable *ht;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "appname", eCmdHdlrGetWord, 0 },
+ { "key", eCmdHdlrGetWord, 0 },
+ { "value", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ int i;
+
+ pData->pszAppName = NULL;
+ for (i = 0; i < SEVERITY_COUNT; i++)
+ pData->severity[i] = 0;
+ pData->pszKey = NULL;
+ pData->pszValue = NULL;
+ pData->valueCounter = 0;
+ pData->ht = NULL;
+}
+
+static unsigned int
+hash_from_key_fn(void *k)
+{
+ return *(unsigned int *)k;
+}
+
+static int
+key_equals_fn(void *k1, void *k2)
+{
+ return (*(unsigned int *)k1 == *(unsigned int *)k2);
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmcount)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "appname")) {
+ pData->pszAppName = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "key")) {
+ pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "value")) {
+ pData->pszValue = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ dbgprintf("mmcount: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+
+ if(pData->pszAppName == NULL) {
+ dbgprintf("mmcount: action requires a appname");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(pData->pszKey != NULL && pData->pszValue == NULL) {
+ if(NULL == (pData->ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL))) {
+ DBGPRINTF("mmcount: error creating hash table!\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+static int *
+getCounter(struct hashtable *ht, char *str) {
+ unsigned int key;
+ int *pCounter;
+ unsigned int *pKey;
+
+ /* we dont store str as key, instead we store hash of the str
+ as key to reduce memory usage */
+ key = hash_from_string(str);
+ pCounter = hashtable_search(ht, &key);
+ if(pCounter) {
+ return pCounter;
+ }
+
+ /* counter is not found for the str, so add new entry and
+ return the counter */
+ if(NULL == (pKey = (unsigned int*)malloc(sizeof(unsigned int)))) {
+ DBGPRINTF("mmcount: memory allocation for key failed\n");
+ return NULL;
+ }
+ *pKey = key;
+
+ if(NULL == (pCounter = (int*)malloc(sizeof(int)))) {
+ DBGPRINTF("mmcount: memory allocation for value failed\n");
+ free(pKey);
+ return NULL;
+ }
+ *pCounter = 0;
+
+ if(!hashtable_insert(ht, pKey, pCounter)) {
+ DBGPRINTF("mmcount: inserting element into hashtable failed\n");
+ free(pKey);
+ free(pCounter);
+ return NULL;
+ }
+ return pCounter;
+}
+
+BEGINdoAction
+ msg_t *pMsg;
+ char *appname;
+ struct json_object *json = NULL;
+ es_str_t *estr = NULL;
+ struct json_object *keyjson = NULL;
+ char *pszValue;
+ int *pCounter;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ appname = getAPPNAME(pMsg, LOCK_MUTEX);
+
+ if(0 != strcmp(appname, pData->pszAppName)) {
+ /* we are not working for this appname. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ if(!pData->pszKey) {
+ /* no key given for count, so we count severity */
+ if(pMsg->iSeverity <= SEVERITY_COUNT) {
+ pData->severity[pMsg->iSeverity]++;
+ json = json_object_new_int(pData->severity[pMsg->iSeverity]);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key is given, so get the property json */
+ estr = es_newStrFromBuf(pData->pszKey, strlen(pData->pszKey));
+ if(msgGetCEEPropJSON(pMsg, estr, &keyjson) != RS_RET_OK) {
+ /* key not found in the message. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key found, so get the value */
+ pszValue = (char*)json_object_get_string(keyjson);
+
+ if(pData->pszValue) {
+ /* value also given for count */
+ if(!strcmp(pszValue, pData->pszValue)) {
+ /* count for (value and key and appname) matched */
+ pData->valueCounter++;
+ json = json_object_new_int(pData->valueCounter);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* value is not given, so we count for each value of given key */
+ pCounter = getCounter(pData->ht, pszValue);
+ if(pCounter) {
+ (*pCounter)++;
+ json = json_object_new_int(*pCounter);
+ }
+finalize_it:
+ if(estr) {
+ es_deleteStr(estr);
+ }
+
+ if(json) {
+ msgAddJSON(pMsg, (uchar *)JSON_COUNT_NAME, json);
+ }
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmcount:", sizeof(":mmcount:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmcount supports only v6+ config format, use: "
+ "action(type=\"mmcount\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmcount: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/mmfields/Makefile.am b/plugins/mmfields/Makefile.am
new file mode 100644
index 00000000..08170d52
--- /dev/null
+++ b/plugins/mmfields/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmfields.la
+
+mmfields_la_SOURCES = mmfields.c
+mmfields_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmfields_la_LDFLAGS = -module -avoid-version
+mmfields_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c
new file mode 100644
index 00000000..fa7fa100
--- /dev/null
+++ b/plugins/mmfields/mmfields.c
@@ -0,0 +1,274 @@
+/* mmfields.c
+ * Parse all fields of the message into structured data inside the
+ * JSON tree.
+ *
+ * Copyright 2013 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmfields")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+/* define operation modes we have */
+#define SIMPLE_MODE 0 /* just overwrite */
+#define REWRITE_MODE 1 /* rewrite IP address, canoninized */
+typedef struct _instanceData {
+ char separator;
+ uchar *jsonRoot; /**< container where to store fields */
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "separator", eCmdHdlrGetChar, 0 },
+ { "jsonroot", eCmdHdlrString, 0 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ free(pData->jsonRoot);
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->separator = ',';
+ pData->jsonRoot = NULL;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmfields)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "separator")) {
+ pData->separator = es_getBufAddr(pvals[i].val.d.estr)[0];
+ } else if(!strcmp(actpblk.descr[i].name, "jsonroot")) {
+ pData->jsonRoot = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("mmfields: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+ if(pData->jsonRoot == NULL) {
+ CHKmalloc(pData->jsonRoot = (uchar*) strdup("!"));
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+
+static inline rsRetVal
+extractField(instanceData *pData, uchar *msgtext, int lenMsg, int *curridx, uchar *fieldbuf)
+{
+ int i, j;
+ DEFiRet;
+ i = *curridx;
+ j = 0;
+ while(i < lenMsg && msgtext[i] != pData->separator) {
+ fieldbuf[j++] = msgtext[i++];
+ }
+ fieldbuf[j] = '\0';
+ if(i < lenMsg)
+ ++i;
+ *curridx = i;
+
+ RETiRet;
+}
+
+
+static inline rsRetVal
+parse_fields(instanceData *pData, msg_t *pMsg, uchar *msgtext, int lenMsg)
+{
+ uchar fieldbuf[32*1024];
+ uchar fieldname[512];
+ struct json_object *json;
+ struct json_object *jval;
+ int field;
+ uchar *buf;
+ int currIdx = 0;
+ DEFiRet;
+
+ if(lenMsg < (int) sizeof(fieldbuf)) {
+ buf = fieldbuf;
+ } else {
+ CHKmalloc(buf = malloc(lenMsg+1));
+ }
+
+ json = json_object_new_object();
+ if(json == NULL) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ field = 1;
+ while(currIdx < lenMsg) {
+ CHKiRet(extractField(pData, msgtext, lenMsg, &currIdx, buf));
+ DBGPRINTF("mmfields: field %d: '%s'\n", field, buf);
+ snprintf((char*)fieldname, sizeof(fieldname), "f%d", field);
+ fieldname[sizeof(fieldname)-1] = '\0';
+ jval = json_object_new_string((char*)fieldbuf);
+ json_object_object_add(json, (char*)fieldname, jval);
+ field++;
+ }
+ msgAddJSON(pMsg, pData->jsonRoot, json);
+finalize_it:
+ RETiRet;
+}
+
+
+BEGINdoAction
+ msg_t *pMsg;
+ uchar *msg;
+ int lenMsg;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ lenMsg = getMSGLen(pMsg);
+ msg = getMSG(pMsg);
+ CHKiRet(parse_fields(pData, pMsg, msg, lenMsg));
+finalize_it:
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmfields:", sizeof(":mmfields:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmfields supports only v6+ config format, use: "
+ "action(type=\"mmfields\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmfields: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c
index e425b428..7a6ef056 100644
--- a/plugins/omprog/omprog.c
+++ b/plugins/omprog/omprog.c
@@ -54,10 +54,12 @@ DEFobjCurrIf(errmsg)
typedef struct _instanceData {
uchar *szBinary; /* name of binary to call */
+ char **aParams; /* Optional Parameters for binary command */
uchar *tplName; /* assigned output template */
- pid_t pid; /* pid of currently running process */
- int fdPipe; /* file descriptor to write to */
+ pid_t pid; /* pid of currently running process */
+ int fdPipe; /* file descriptor to write to */
int bIsRunning; /* is binary currently running? 0-no, 1-yes */
+ int iParams; /* Holds the count of parameters if set*/
} instanceData;
typedef struct configSettings_s {
@@ -98,9 +100,16 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
+ int i;
CODESTARTfreeInstance
if(pData->szBinary != NULL)
free(pData->szBinary);
+ if(pData->aParams != NULL) {
+ for (i = 0; i < pData->iParams; i++) {
+ free(pData->aParams[i]);
+ }
+ free(pData->aParams);
+ }
ENDfreeInstance
@@ -120,9 +129,8 @@ ENDtryResume
static void execBinary(instanceData *pData, int fdStdin)
{
- int i;
+ int i, iRet;
struct sigaction sigAct;
- char *newargv[] = { NULL };
char *newenviron[] = { NULL };
assert(pData != NULL);
@@ -134,7 +142,7 @@ static void execBinary(instanceData *pData, int fdStdin)
* gets some more widespread use...
*/
}
- //fclose(stdout);
+ /*fclose(stdout);*/
/* we close all file handles as we fork soon
* Is there a better way to do this? - mail me! rgerhards@adiscon.com
@@ -154,11 +162,11 @@ static void execBinary(instanceData *pData, int fdStdin)
alarm(0);
/* finally exec child */
- execve((char*)pData->szBinary, newargv, newenviron);
- /* switch to?
- execlp((char*)program, (char*) program, (char*)arg, NULL);
- */
-
+ iRet = execve((char*)pData->szBinary, pData->aParams, newenviron);
+ if (iRet == -1) {
+ dbgprintf("omprog: failed to execute binary '%s' with return code: %d\n", pData->szBinary, errno);
+ }
+
/* we should never reach this point, but if we do, we terminate */
exit(1);
}
@@ -180,7 +188,7 @@ openPipe(instanceData *pData)
ABORT_FINALIZE(RS_RET_ERR_CREAT_PIPE);
}
- DBGPRINTF("executing program '%s'\n", pData->szBinary);
+ DBGPRINTF("omprog: executing program '%s' with '%d' parameters\n", pData->szBinary, pData->iParams);
/* NO OUTPUT AFTER FORK! */
@@ -198,7 +206,7 @@ openPipe(instanceData *pData)
/*NO CODE HERE - WILL NEVER BE REACHED!*/
}
- DBGPRINTF("child has pid %d\n", (int) cpid);
+ DBGPRINTF("omprog: child has pid %d\n", (int) cpid);
pData->fdPipe = pipefd[1];
pData->pid = cpid;
close(pipefd[0]);
@@ -223,11 +231,11 @@ cleanup(instanceData *pData)
ret = waitpid(pData->pid, &status, 0);
if(ret != pData->pid) {
/* if waitpid() fails, we can not do much - try to ignore it... */
- DBGPRINTF("waitpid() returned state %d[%s], future malfunction may happen\n", ret,
+ DBGPRINTF("omprog: waitpid() returned state %d[%s], future malfunction may happen\n", ret,
rs_strerror_r(errno, errStr, sizeof(errStr)));
} else {
/* check if we should print out some diagnostic information */
- DBGPRINTF("waitpid status return for program '%s': %2.2x\n",
+ DBGPRINTF("omprog: waitpid status return for program '%s': %2.2x\n",
pData->szBinary, status);
if(WIFEXITED(status)) {
errmsg.LogError(0, NO_ERRCODE, "program '%s' exited normally, state %d",
@@ -282,13 +290,13 @@ writePipe(instanceData *pData, uchar *szMsg)
if(lenWritten == -1) {
switch(errno) {
case EPIPE:
- DBGPRINTF("Program '%s' terminated, trying to restart\n",
+ DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
pData->szBinary);
CHKiRet(cleanup(pData));
CHKiRet(tryRestart(pData));
break;
default:
- DBGPRINTF("error %d writing to pipe: %s\n", errno,
+ DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
rs_strerror_r(errno, errStr, sizeof(errStr)));
ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
break;
@@ -321,13 +329,23 @@ static inline void
setInstParamDefaults(instanceData *pData)
{
pData->szBinary = NULL;
+ pData->aParams = NULL;
+ pData->iParams = 0;
pData->fdPipe = -1;
pData->bIsRunning = 0;
}
BEGINnewActInst
struct cnfparamvals *pvals;
+ sbool bInQuotes;
int i;
+ int iPrm;
+ unsigned char *c;
+ es_size_t iCnt;
+ es_size_t iStr;
+ es_str_t *estrBinary;
+ es_str_t *estrParams;
+ es_str_t *estrTmp;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@@ -341,23 +359,97 @@ CODESTARTnewActInst
if(!pvals[i].bUsed)
continue;
if(!strcmp(actpblk.descr[i].name, "binary")) {
- pData->szBinary = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ estrBinary = pvals[i].val.d.estr;
+ estrParams = NULL;
+
+ /* Search for space */
+ c = es_getBufAddr(pvals[i].val.d.estr);
+ iCnt = 0;
+ while(iCnt < es_strlen(pvals[i].val.d.estr) ) {
+ if (c[iCnt] == ' ') {
+ /* Split binary name from parameters */
+ estrBinary = es_newStrFromSubStr ( pvals[i].val.d.estr, 0, iCnt );
+ estrParams = es_newStrFromSubStr ( pvals[i].val.d.estr, iCnt+1, es_strlen(pvals[i].val.d.estr));
+ break;
+ }
+ iCnt++;
+ }
+ /* Assign binary and params */
+ pData->szBinary = (uchar*)es_str2cstr(estrBinary, NULL);
+ dbgprintf("omprog: szBinary = '%s'\n", pData->szBinary);
+ /* Check for Params! */
+ if (estrParams != NULL) {
+ dbgprintf("omprog: szParams = '%s'\n", es_str2cstr(estrParams, NULL) );
+
+ /* Count parameters if set */
+ c = es_getBufAddr(estrParams); /* Reset to beginning */
+ pData->iParams = 2; /* Set default to 2, first parameter for binary and second parameter at least from config*/
+ iCnt = 0;
+ while(iCnt < es_strlen(estrParams) ) {
+ if (c[iCnt] == ' ' && c[iCnt-1] != '\\')
+ pData->iParams++;
+ iCnt++;
+ }
+ dbgprintf("omprog: iParams = '%d'\n", pData->iParams);
+
+ /* Create argv Array */
+ CHKmalloc(pData->aParams = malloc( (pData->iParams+1) * sizeof(char*))); /* One more for first param */
+
+ /* Second Loop, create parameter array*/
+ c = es_getBufAddr(estrParams); /* Reset to beginning */
+ iCnt = iStr = iPrm = 0;
+ estrTmp = NULL;
+ bInQuotes = FALSE;
+ /* Set first parameter to binary */
+ pData->aParams[iPrm] = strdup((char*)pData->szBinary);
+ dbgprintf("omprog: Param (%d): '%s'\n", iPrm, pData->aParams[iPrm]);
+ iPrm++;
+ while(iCnt < es_strlen(estrParams) ) {
+ if ( c[iCnt] == ' ' && !bInQuotes ) {
+ /* Copy into Param Array! */
+ estrTmp = es_newStrFromSubStr( estrParams, iStr, iCnt-iStr);
+ }
+ else if ( iCnt+1 >= es_strlen(estrParams) ) {
+ /* Copy rest of string into Param Array! */
+ estrTmp = es_newStrFromSubStr( estrParams, iStr, iCnt-iStr+1);
+ }
+ else if (c[iCnt] == '"') {
+ /* switch inQuotes Mode */
+ bInQuotes = !bInQuotes;
+ }
+
+ if ( estrTmp != NULL ) {
+ pData->aParams[iPrm] = es_str2cstr(estrTmp, NULL);
+ iStr = iCnt+1; /* Set new start */
+ dbgprintf("omprog: Param (%d): '%s'\n", iPrm, pData->aParams[iPrm]);
+ es_deleteStr( estrTmp );
+ estrTmp = NULL;
+ iPrm++;
+ }
+
+ /*Next char*/
+ iCnt++;
+ }
+ /* NULL last parameter! */
+ pData->aParams[iPrm] = NULL;
+
+ }
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
- dbgprintf("omprog: program error, non-handled "
- "param '%s'\n", actpblk.descr[i].name);
+ dbgprintf("omprog: program error, non-handled param '%s'\n", actpblk.descr[i].name);
}
}
if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) "RSYSLOG_FileFormat",
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup("RSYSLOG_FileFormat"),
OMSR_NO_RQD_TPL_OPTS));
} else {
CHKiRet(OMSRsetEntry(*ppOMSR, 0,
(uchar*) strdup((char*) pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
}
+
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 3e355464..34511e46 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -55,6 +55,9 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
+#define DFLT_ENABLE_TLS 0
+#define DFLT_ENABLE_TLSZIP 0
+
static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
@@ -62,9 +65,24 @@ typedef struct _instanceData {
uchar *port;
int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ int sizeWindow; /**< the RELP window size - 0=use default */
unsigned timeout;
+ unsigned rebindInterval;
+ unsigned nSent;
relpClt_t *pRelpClt; /* relp client for this instance */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
+ sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */
+ uchar *pristring; /* GnuTLS priority string (NULL if not to be provided) */
+ uchar *authmode;
+ uchar *caCertFile;
+ uchar *myCertFile;
+ uchar *myPrivKeyFile;
uchar *tplName;
+ struct {
+ int nmemb;
+ uchar **name;
+ } permittedPeers;
} instanceData;
typedef struct configSettings_s {
@@ -77,7 +95,17 @@ static configSettings_t __attribute__((unused)) cs;
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "target", eCmdHdlrGetWord, 1 },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 },
+ { "tls.prioritystring", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "tls.authmode", eCmdHdlrString, 0 },
+ { "tls.permittedpeer", eCmdHdlrArray, 0 },
{ "port", eCmdHdlrGetWord, 0 },
+ { "rebindinterval", eCmdHdlrInt, 0 },
+ { "windowsize", eCmdHdlrInt, 0 },
{ "timeout", eCmdHdlrInt, 0 },
{ "template", eCmdHdlrGetWord, 0 }
};
@@ -104,32 +132,111 @@ static uchar *getRelpPt(instanceData *pData)
return(pData->port);
}
+static void
+onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ instanceData *pData = (instanceData*) pUsr;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
+ " '%s' - action may not work as intended",
+ pData->target, pData->port, errmesg, objinfo);
+}
+
+static void
+onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ errmsg.LogError(0, RS_RET_RELP_ERR, "omrelp: librelp error '%s', object "
+ "'%s' - action may not work as intended",
+ errmesg, objinfo);
+}
+
+static void
+onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ instanceData *pData = (instanceData*) pUsr;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
+ "is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
+ pData->bHadAuthFail = 1;
+}
+
static inline rsRetVal
doCreateRelpClient(instanceData *pData)
{
+ int i;
DEFiRet;
if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLS) {
+ if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLSZip) {
+ if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+ errmsg.LogError(0, RS_RET_RELP_ERR,
+ "omrelp: invalid auth mode '%s'\n", pData->authmode);
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
+ relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+ }
+ }
+ if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
+ if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ pData->bInitialConnect = 1;
+ pData->nSent = 0;
finalize_it:
RETiRet;
}
-
BEGINcreateInstance
CODESTARTcreateInstance
- pData->bInitialConnect = 1;
+ pData->sizeWindow = 0;
pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->bHadAuthFail = 0;
+ pData->pristring = NULL;
+ pData->authmode = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
+ pData->permittedPeers.nmemb = 0;
ENDcreateInstance
BEGINfreeInstance
+ int i;
CODESTARTfreeInstance
if(pData->pRelpClt != NULL)
relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
free(pData->target);
free(pData->port);
free(pData->tplName);
+ free(pData->pristring);
+ free(pData->authmode);
+ free(pData->caCertFile);
+ free(pData->myCertFile);
+ free(pData->myPrivKeyFile);
+ for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
+ free(pData->permittedPeers.name[i]);
+ }
ENDfreeInstance
static inline void
@@ -139,12 +246,22 @@ setInstParamDefaults(instanceData *pData)
pData->port = NULL;
pData->tplName = NULL;
pData->timeout = 90;
+ pData->sizeWindow = 0;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->pristring = NULL;
+ pData->authmode = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
+ pData->permittedPeers.nmemb = 0;
}
BEGINnewActInst
struct cnfparamvals *pvals;
- int i;
+ int i,j;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@@ -164,6 +281,31 @@ CODESTARTnewActInst
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
pData->timeout = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
+ pData->rebindInterval = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "windowsize")) {
+ pData->sizeWindow = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls")) {
+ pData->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.compression")) {
+ pData->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.prioritystring")) {
+ pData->pristring = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
+ pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
+ pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
+ pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.authmode")) {
+ pData->authmode = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.permittedpeer")) {
+ pData->permittedPeers.nmemb = pvals[i].val.d.ar->nmemb;
+ CHKmalloc(pData->permittedPeers.name =
+ malloc(sizeof(uchar*) * pData->permittedPeers.nmemb));
+ for(j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
+ pData->permittedPeers.name[j] = (uchar*)es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
+ }
} else {
dbgprintf("omrelp: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -230,9 +372,34 @@ static rsRetVal doConnect(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
+ if(pData->bHadAuthFail) {
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
+ }
iRet = doConnect(pData);
+finalize_it:
ENDtryResume
+static inline rsRetVal
+doRebind(instanceData *pData)
+{
+ DEFiRet;
+ DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
+ pData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pData));
+finalize_it:
+ RETiRet;
+}
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omrelp: beginTransaction\n");
+ if(!pData->bIsConnected) {
+ CHKiRet(doConnect(pData));
+ }
+ relpCltHintBurstBegin(pData->pRelpClt);
+finalize_it:
+ENDbeginTransaction
BEGINdoAction
uchar *pMsg; /* temporary buffering */
@@ -248,7 +415,7 @@ CODESTARTdoAction
pMsg = ppString[0];
lenMsg = strlen((char*) pMsg); /* TODO: don't we get this? */
- /* TODO: think about handling oversize messages! */
+ /* we need to truncate oversize msgs - no way around that... */
if((int) lenMsg > glbl.GetMaxLine())
lenMsg = glbl.GetMaxLine();
@@ -257,13 +424,33 @@ CODESTARTdoAction
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ if(pData->rebindInterval != 0 &&
+ (++pData->nSent >= pData->rebindInterval)) {
+ doRebind(pData);
+ }
finalize_it:
+ if(pData->bHadAuthFail)
+ iRet = RS_RET_DISABLE_ACTION;
+ if(iRet == RS_RET_OK) {
+ /* we mimic non-commit, as otherwise our endTransaction handler
+ * will not get called. While this is not 100% correct, the worst
+ * that can happen is some message duplication, something that
+ * rsyslog generally accepts and prefers over message loss.
+ */
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ }
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+ dbgprintf("omrelp: endTransaction\n");
+ relpCltHintBurstEnd(pData->pRelpClt);
+ENDendTransaction
+
BEGINparseSelectorAct
uchar *q;
int i;
@@ -329,7 +516,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
++p;
}
- /* TODO: make this if go away! */
if(*p == ';') {
*p = '\0'; /* trick to obtain hostname (later)! */
CHKmalloc(pData->target = ustrdup(q));
@@ -362,6 +548,7 @@ CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES
CODEqueryEtryPt_SetShutdownImmdtPtr
ENDqueryEtryPt
@@ -374,12 +561,12 @@ CODEmodInit_QueryRegCFSLineHdlr
/* create our relp engine */
CHKiRet(relpEngineConstruct(&pRelpEngine));
CHKiRet(relpEngineSetDbgprint(pRelpEngine, dbgprintf));
+ CHKiRet(relpEngineSetOnAuthErr(pRelpEngine, onAuthErr));
+ CHKiRet(relpEngineSetOnGenericErr(pRelpEngine, onGenericErr));
+ CHKiRet(relpEngineSetOnErr(pRelpEngine, onErr));
CHKiRet(relpEngineSetEnableCmd(pRelpEngine, (uchar*) "syslog", eRelpCmdState_Required));
/* tell which objects we need */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDmodInit
-
-/* vim:set ai:
- */