summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imptcp/imptcp.c179
-rw-r--r--plugins/imrelp/imrelp.c118
-rw-r--r--plugins/imuxsock/imuxsock.c4
-rw-r--r--plugins/mmcount/Makefile.am8
-rw-r--r--plugins/mmcount/mmcount.c342
-rw-r--r--plugins/mmfields/Makefile.am8
-rw-r--r--plugins/mmfields/mmfields.c265
-rw-r--r--plugins/omrelp/omrelp.c93
8 files changed, 993 insertions, 24 deletions
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 5c8bb67a..e9a20c1c 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -10,7 +10,7 @@
*
* File begun on 2010-08-10 by RGerhards
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -50,6 +50,8 @@
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
+#include <stdint.h>
+#include <zlib.h>
#if HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -93,6 +95,11 @@ static void * wrkr(void *myself);
#define DFLT_wrkrMax 2
+#define COMPRESS_NEVER 0
+#define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */
+/* all other settings are for stream-compression */
+#define COMPRESS_STREAM_ALWAYS 2
+
/* config settings */
typedef struct configSettings_s {
int bKeepAlive; /* support keep-alive packets */
@@ -117,6 +124,7 @@ struct instanceConf_s {
int bEmitMsgOnClose;
int bSuppOctetFram; /* support octet-counted framing? */
int iAddtlFrameDelim;
+ uint8_t compressionMode;
uchar *pszBindPort; /* port to bind to */
uchar *pszBindAddr; /* IP to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
@@ -156,6 +164,7 @@ static struct cnfparamdescr inppdescr[] = {
{ "ruleset", eCmdHdlrString, 0 },
{ "supportoctetcountedframing", eCmdHdlrBinary, 0 },
{ "notifyonconnectionclose", eCmdHdlrBinary, 0 },
+ { "compression.mode", eCmdHdlrGetWord, 0 },
{ "keepalive", eCmdHdlrBinary, 0 },
{ "keepalive.probes", eCmdHdlrInt, 0 },
{ "keepalive.time", eCmdHdlrInt, 0 },
@@ -191,6 +200,7 @@ struct ptcpsrv_s {
int iKeepAliveIntvl;
int iKeepAliveProbes;
int iKeepAliveTime;
+ uint8_t compressionMode;
uchar *pszInputName;
prop_t *pInputName; /* InputName in (fast to process) property format */
ruleset_t *pRuleset;
@@ -207,11 +217,13 @@ struct ptcpsrv_s {
* includes support for doubly-linked list.
*/
struct ptcpsess_s {
-// ptcpsrv_t *pSrv; /* our server TODO: check remove! */
ptcplstn_t *pLstn; /* our listener */
ptcpsess_t *prev, *next;
int sock;
epolld_t *epd;
+ sbool bzInitDone; /* did we do an init of zstrm already? */
+ z_stream zstrm; /* zip stream to use for tcp compression */
+ uint8_t compressionMode;
//--- from tcps_sess.h
int iMsg; /* index of next char to store in msg */
int bAtStrtOfFram; /* are we at the very beginning of a new frame? */
@@ -239,6 +251,8 @@ struct ptcplstn_s {
sbool bSuppOctetFram;
epolld_t *epd;
statsobj_t *stats; /* listener stats */
+ intctr_t rcvdBytes;
+ intctr_t rcvdDecompressed;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
};
@@ -806,19 +820,19 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* EXTRACT from tcps_sess.c
*/
static rsRetVal
-DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+DataRcvdUncompressed(ptcpsess_t *pThis, char *pData, size_t iLen, time_t ttGenTime)
{
multi_submit_t multiSub;
msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
- time_t ttGenTime;
char *pEnd;
DEFiRet;
assert(pData != NULL);
assert(iLen > 0);
- datetime.getCurrTime(&stTime, &ttGenTime);
+ if(ttGenTime == 0)
+ datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
@@ -836,6 +850,71 @@ finalize_it:
RETiRet;
}
+static rsRetVal
+DataRcvdCompressed(ptcpsess_t *pThis, char *buf, size_t len)
+{
+ struct syslogTime stTime;
+ time_t ttGenTime;
+ int zRet; /* zlib return state */
+ unsigned outavail;
+ uchar zipBuf[64*1024]; // TODO: alloc on heap, and much larger (512KiB? batch size!)
+ DEFiRet;
+ // TODO: can we do stats counters? Even if they are not 100% correct under all cases,
+ // by simply updating the input and output sizes?
+ uint64_t outtotal;
+
+ assert(iLen > 0);
+
+ datetime.getCurrTime(&stTime, &ttGenTime);
+ outtotal = 0;
+
+ if(!pThis->bzInitDone) {
+ /* allocate deflate state */
+ pThis->zstrm.zalloc = Z_NULL;
+ pThis->zstrm.zfree = Z_NULL;
+ pThis->zstrm.opaque = Z_NULL;
+ zRet = inflateInit(&pThis->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateInit()\n", zRet);
+ ABORT_FINALIZE(RS_RET_ZLIB_ERR);
+ }
+ pThis->bzInitDone = RSTRUE;
+ }
+
+ pThis->zstrm.next_in = (Bytef*) buf;
+ pThis->zstrm.avail_in = len;
+ /* run inflate() on buffer until everything has been uncompressed */
+ do {
+ DBGPRINTF("imptcp: in inflate() loop, avail_in %d, total_in %ld\n", pThis->zstrm.avail_in, pThis->zstrm.total_in);
+ pThis->zstrm.avail_out = sizeof(zipBuf);
+ pThis->zstrm.next_out = zipBuf;
+ zRet = inflate(&pThis->zstrm, Z_NO_FLUSH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pThis->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pThis->zstrm.avail_out;
+ if(outavail != 0) {
+ outtotal += outavail;
+ pThis->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pThis, (char*)zipBuf, outavail, ttGenTime));
+ }
+ } while (pThis->zstrm.avail_out == 0);
+
+ dbgprintf("end of DataRcvCompress, sizes: in %lld, out %llu\n", (long long) len, outtotal);
+finalize_it:
+ RETiRet;
+}
+
+static rsRetVal
+DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
+{
+ DEFiRet;
+ pThis->pLstn->rcvdBytes += iLen;
+ if(pThis->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ iRet = DataRcvdCompressed(pThis, pData, iLen);
+ else
+ iRet = DataRcvdUncompressed(pThis, pData, iLen, 0);
+ RETiRet;
+}
+
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -936,6 +1015,14 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
STATSCOUNTER_INIT(pLstn->ctrSubmit, pLstn->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(pLstn->ctrSubmit)));
+ /* the following counters are not protected by mutexes; we accept
+ * that they may not be 100% correct */
+ pLstn->rcvdBytes = 0,
+ pLstn->rcvdDecompressed = 0;
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.received"),
+ ctrType_IntCtr, &(pLstn->rcvdBytes)));
+ CHKiRet(statsobj.AddCounter(pLstn->stats, UCHAR_CONSTANT("bytes.decompressed"),
+ ctrType_IntCtr, &(pLstn->rcvdDecompressed)));
CHKiRet(statsobj.ConstructFinalize(pLstn->stats));
/* add to start of server's listener list */
@@ -948,6 +1035,7 @@ addLstn(ptcpsrv_t *pSrv, int sock, int isIPv6)
iRet = addEPollSock(epolld_lstn, pLstn, sock, &pLstn->epd);
finalize_it:
+dbgprintf("DDDD: addLstn return %d\n", iRet);
RETiRet;
}
@@ -971,6 +1059,7 @@ addSess(ptcplstn_t *pLstn, int sock, prop_t *peerName, prop_t *peerIP)
pSess->bAtStrtOfFram = 1;
pSess->peerName = peerName;
pSess->peerIP = peerIP;
+ pSess->compressionMode = pLstn->pSrv->compressionMode;
/* add to start of server's listener list */
pSess->prev = NULL;
@@ -988,6 +1077,44 @@ finalize_it:
}
+/* finish zlib buffer, to be called before closing the session.
+ */
+static rsRetVal
+doZipFinish(ptcpsess_t *pSess)
+{
+ int zRet; /* zlib return state */
+ DEFiRet;
+ unsigned outavail;
+ uchar zipBuf[32*1024]; // TODO: use "global" one from pSess
+
+ if(!pSess->bzInitDone)
+ goto done;
+
+ pSess->zstrm.avail_in = 0;
+ /* run inflate() on buffer until everything has been compressed */
+ do {
+ DBGPRINTF("doZipFinish: in inflate() loop, avail_in %d, total_in %ld\n", pSess->zstrm.avail_in, pSess->zstrm.total_in);
+ pSess->zstrm.avail_out = sizeof(zipBuf);
+ pSess->zstrm.next_out = zipBuf;
+ zRet = inflate(&pSess->zstrm, Z_FINISH); /* no bad return value */
+ DBGPRINTF("after inflate, ret %d, avail_out %d\n", zRet, pSess->zstrm.avail_out);
+ outavail = sizeof(zipBuf) - pSess->zstrm.avail_out;
+ if(outavail != 0) {
+ pSess->pLstn->rcvdDecompressed += outavail;
+ CHKiRet(DataRcvdUncompressed(pSess, (char*)zipBuf, outavail, 0)); // TODO: query time!
+ }
+ } while (pSess->zstrm.avail_out == 0);
+
+finalize_it:
+ zRet = inflateEnd(&pSess->zstrm);
+ if(zRet != Z_OK) {
+ DBGPRINTF("imptcp: error %d returned from zlib/inflateEnd()\n", zRet);
+ }
+
+ pSess->bzInitDone = 0;
+done: RETiRet;
+}
+
/* close/remove a session
* NOTE: we must first remove the fd from the epoll set and then close it -- else we
* get an error "bad file descriptor" from epoll.
@@ -998,6 +1125,9 @@ closeSess(ptcpsess_t *pSess)
int sock;
DEFiRet;
+ if(pSess->compressionMode >= COMPRESS_STREAM_ALWAYS)
+ doZipFinish(pSess);
+
sock = pSess->sock;
CHKiRet(removeEPollSock(sock, pSess->epd));
close(sock);
@@ -1048,6 +1178,7 @@ createInstance(instanceConf_t **pinst)
inst->pBindRuleset = NULL;
inst->ratelimitBurst = 10000; /* arbitrary high limit */
inst->ratelimitInterval = 0; /* off */
+ inst->compressionMode = COMPRESS_SINGLE_MSG;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1127,6 +1258,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ pSrv->compressionMode = inst->compressionMode;
CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
ratelimitSetThreadSafe(pSrv->ratelimiter);
@@ -1269,6 +1401,10 @@ sessActivity(ptcpsess_t *pSess)
{
int lenRcv;
int lenBuf;
+ uchar *peerName;
+ int lenPeer;
+ int remsock = 0; /* init just to keep compiler happy... :-( */
+ sbool bEmitOnClose = 0;
char rcvBuf[128*1024];
DEFiRet;
@@ -1285,13 +1421,15 @@ sessActivity(ptcpsess_t *pSess)
} else if (lenRcv == 0) {
/* session was closed, do clean-up */
if(pSess->pLstn->pSrv->bEmitMsgOnClose) {
- uchar *peerName;
- int lenPeer;
- prop.GetString(pSess->peerName, &peerName, &lenPeer);
- errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by remote peer %s.\n",
- pSess->sock, peerName);
+ prop.GetString(pSess->peerName, &peerName, &lenPeer),
+ remsock = pSess->sock;
+ bEmitOnClose = 1;
+ }
+ CHKiRet(closeSess(pSess)); /* close may emit more messages in strmzip mode! */
+ if(bEmitOnClose) {
+ errmsg.LogError(0, RS_RET_PEER_CLOSED_CONN, "imptcp session %d closed by "
+ "remote peer %s.\n", remsock, peerName);
}
- CHKiRet(closeSess(pSess));
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK)
@@ -1415,6 +1553,7 @@ wrkr(void *myself)
BEGINnewInpInst
struct cnfparamvals *pvals;
instanceConf_t *inst;
+ char *cstr;
int i;
CODESTARTnewInpInst
DBGPRINTF("newInpInst (imptcp)\n");
@@ -1446,6 +1585,19 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportoctetcountedframing")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "compression.mode")) {
+ cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(!strcasecmp(cstr, "stream:always")) {
+ inst->compressionMode = COMPRESS_STREAM_ALWAYS;
+ } else if(!strcasecmp(cstr, "none")) {
+ inst->compressionMode = COMPRESS_NEVER;
+ } else {
+ errmsg.LogError(0, RS_RET_PARAM_ERROR, "omfwd: invalid value for 'compression.mode' "
+ "parameter (given is '%s')", cstr);
+ free(cstr);
+ ABORT_FINALIZE(RS_RET_PARAM_ERROR);
+ }
+ free(cstr);
} else if(!strcmp(inppblk.descr[i].name, "keepalive")) {
inst->bKeepAlive = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "keepalive.probes")) {
@@ -1653,6 +1805,7 @@ shutdownSrv(ptcpsrv_t *pSrv)
ptcplstn_t *pLstn, *lstnDel;
ptcpsess_t *pSess, *sessDel;
+dbgprintf("DDDD: enter shutdownSrv\n");
/* listeners */
pLstn = pSrv->pLstn;
while(pLstn != NULL) {
@@ -1661,7 +1814,9 @@ shutdownSrv(ptcpsrv_t *pSrv)
/* now unlink listner */
lstnDel = pLstn;
pLstn = pLstn->next;
- DBGPRINTF("imptcp shutdown listen socket %d\n", lstnDel->sock);
+ DBGPRINTF("imptcp shutdown listen socket %d (rcvd %lld bytes, "
+ "decompressed %lld)\n", lstnDel->sock, lstnDel->rcvdBytes,
+ lstnDel->rcvdDecompressed);
free(lstnDel->epd);
free(lstnDel);
}
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index 5e0ae552..06abde26 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -4,7 +4,7 @@
*
* File begun on 2008-03-13 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -74,6 +74,13 @@ static struct configSettings_s {
struct instanceConf_s {
uchar *pszBindPort; /* port to bind to */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
+ int dhBits;
+ uchar *pristring; /* GnuTLS priority string (NULL if not to be provided) */
+ uchar *caCertFile;
+ uchar *myCertFile;
+ uchar *myPrivKeyFile;
struct instanceConf_s *next;
};
@@ -88,9 +95,26 @@ struct modConfData_s {
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "ruleset", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
/* input instance parameters */
static struct cnfparamdescr inppdescr[] = {
- { "port", eCmdHdlrString, CNFPARAM_REQUIRED }
+ { "port", eCmdHdlrString, CNFPARAM_REQUIRED },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.dhbits", eCmdHdlrInt, 0 },
+ { "tls.prioritystring", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -155,6 +179,10 @@ createInstance(instanceConf_t **pinst)
inst->next = NULL;
inst->pszBindPort = NULL;
+ inst->bEnableTLS = 0;
+ inst->bEnableTLSZip = 0;
+ inst->dhBits = 0;
+ inst->pristring = NULL;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -179,7 +207,7 @@ std_checkRuleset_genErrMsg(modConfData_t *modConf, __attribute__((unused)) insta
}
-/* This function is called when a new listener instace shall be added to
+/* This function is called when a new listener instance shall be added to
* the current config object via the legacy config system. It just shuffles
* all parameters to the listener in-memory instance.
* rgerhards, 2011-05-04
@@ -204,6 +232,7 @@ finalize_it:
static rsRetVal
addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
{
+ relpSrv_t *pSrv;
DEFiRet;
if(pRelpEngine == NULL) {
CHKiRet(relpEngineConstruct(&pRelpEngine));
@@ -216,7 +245,25 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
}
}
- CHKiRet(relpEngineAddListner(pRelpEngine, inst->pszBindPort));
+ CHKiRet(relpEngineListnerConstruct(pRelpEngine, &pSrv));
+ CHKiRet(relpSrvSetLstnPort(pSrv, inst->pszBindPort));
+ if(inst->bEnableTLS) {
+ relpSrvEnableTLS(pSrv);
+ if(inst->bEnableTLSZip) {
+ relpSrvEnableTLSZip(pSrv);
+ }
+ if(inst->dhBits) {
+ relpSrvSetDHBits(pSrv, inst->dhBits);
+ }
+ relpSrvSetGnuTLSPriString(pSrv, (char*)inst->pristring);
+ if(relpSrvSetCACert(pSrv, (char*) inst->caCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpSrvSetOwnCert(pSrv, (char*) inst->myCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpSrvSetPrivKey(pSrv, (char*) inst->myPrivKeyFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ CHKiRet(relpEngineListnerConstructFinalize(pRelpEngine, pSrv));
finalize_it:
RETiRet;
@@ -249,6 +296,20 @@ CODESTARTnewInpInst
continue;
if(!strcmp(inppblk.descr[i].name, "port")) {
inst->pszBindPort = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls")) {
+ inst->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.dhbits")) {
+ inst->dhBits = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.prioritystring")) {
+ inst->pristring = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.compression")) {
+ inst->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "tls.cacert")) {
+ inst->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.mycert")) {
+ inst->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "tls.myprivkey")) {
+ inst->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
dbgprintf("imrelp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -264,19 +325,58 @@ BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
pModConf->pConf = pConf;
+ pModConf->pszBindRuleset = NULL;
+ pModConf->pBindRuleset = NULL;
/* init legacy config variables */
cs.pszBindRuleset = NULL;
ENDbeginCnfLoad
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for imrelp:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "ruleset")) {
+ loadModConf->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("imrelp: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
BEGINendCnfLoad
CODESTARTendCnfLoad
- if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
- loadModConf->pszBindRuleset = NULL;
+ if(loadModConf->pszBindRuleset == NULL) {
+ if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
+ loadModConf->pszBindRuleset = NULL;
+ } else {
+ CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ }
} else {
- CHKmalloc(loadModConf->pszBindRuleset = ustrdup(cs.pszBindRuleset));
+ if((cs.pszBindRuleset != NULL) && (cs.pszBindRuleset[0] != '\0')) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "imrelp: warning: ruleset "
+ "set via legacy directive ignored");
+ }
}
- loadModConf->pBindRuleset = NULL;
finalize_it:
free(cs.pszBindRuleset);
loadModConf = NULL; /* done loading */
@@ -293,6 +393,7 @@ CODESTARTcheckCnf
if(pModConf->pszBindRuleset == NULL) {
pModConf->pBindRuleset = NULL;
} else {
+ DBGPRINTF("imrelp: using ruleset '%s'\n", pModConf->pszBindRuleset);
localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, pModConf->pszBindRuleset);
if(localRet == RS_RET_NOT_FOUND) {
std_checkRuleset_genErrMsg(pModConf, NULL);
@@ -420,6 +521,7 @@ CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_PREPRIVDROP_QUERIES
CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index c503852c..dad09ab4 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -1284,6 +1284,8 @@ BEGINactivateCnfPrePrivDrop
instanceConf_t *inst;
CODESTARTactivateCnfPrePrivDrop
runModConf = pModConf;
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
for(inst = runModConf->root ; inst != NULL ; inst = inst->next) {
addListner(inst);
}
@@ -1325,6 +1327,8 @@ BEGINrunInput
#endif
CODESTARTrunInput
+ if(runModConf->bOmitLocalLogging && nfd == 1)
+ ABORT_FINALIZE(RS_RET_OK);
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
diff --git a/plugins/mmcount/Makefile.am b/plugins/mmcount/Makefile.am
new file mode 100644
index 00000000..9c8c99db
--- /dev/null
+++ b/plugins/mmcount/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmcount.la
+
+mmcount_la_SOURCES = mmcount.c
+mmcount_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmcount_la_LDFLAGS = -module -avoid-version
+mmcount_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmcount/mmcount.c b/plugins/mmcount/mmcount.c
new file mode 100644
index 00000000..56a4de55
--- /dev/null
+++ b/plugins/mmcount/mmcount.c
@@ -0,0 +1,342 @@
+/* mmcount.c
+ * count messages by priority or json property of given app-name.
+ *
+ * Copyright 2013 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include <json/json.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "hashtable.h"
+
+#define JSON_COUNT_NAME "!mmcount"
+#define SEVERITY_COUNT 8
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmcount")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+typedef struct _instanceData {
+ char *pszAppName;
+ int severity[SEVERITY_COUNT];
+ char *pszKey;
+ char *pszValue;
+ int valueCounter;
+ struct hashtable *ht;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "appname", eCmdHdlrGetWord, 0 },
+ { "key", eCmdHdlrGetWord, 0 },
+ { "value", eCmdHdlrGetWord, 0 },
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ int i;
+
+ pData->pszAppName = NULL;
+ for (i = 0; i < SEVERITY_COUNT; i++)
+ pData->severity[i] = 0;
+ pData->pszKey = NULL;
+ pData->pszValue = NULL;
+ pData->valueCounter = 0;
+ pData->ht = NULL;
+}
+
+static unsigned int
+hash_from_key_fn(void *k)
+{
+ return *(unsigned int *)k;
+}
+
+static int
+key_equals_fn(void *k1, void *k2)
+{
+ return (*(unsigned int *)k1 == *(unsigned int *)k2);
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmcount)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "appname")) {
+ pData->pszAppName = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "key")) {
+ pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ if(!strcmp(actpblk.descr[i].name, "value")) {
+ pData->pszValue = es_str2cstr(pvals[i].val.d.estr, NULL);
+ continue;
+ }
+ dbgprintf("mmcount: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+
+ if(pData->pszAppName == NULL) {
+ dbgprintf("mmcount: action requires a appname");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(pData->pszKey != NULL && pData->pszValue == NULL) {
+ if(NULL == (pData->ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL))) {
+ DBGPRINTF("mmcount: error creating hash table!\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+static int *
+getCounter(struct hashtable *ht, char *str) {
+ unsigned int key;
+ int *pCounter;
+ unsigned int *pKey;
+
+ /* we dont store str as key, instead we store hash of the str
+ as key to reduce memory usage */
+ key = hash_from_string(str);
+ pCounter = hashtable_search(ht, &key);
+ if(pCounter) {
+ return pCounter;
+ }
+
+ /* counter is not found for the str, so add new entry and
+ return the counter */
+ if(NULL == (pKey = (unsigned int*)malloc(sizeof(unsigned int)))) {
+ DBGPRINTF("mmcount: memory allocation for key failed\n");
+ return NULL;
+ }
+ *pKey = key;
+
+ if(NULL == (pCounter = (int*)malloc(sizeof(int)))) {
+ DBGPRINTF("mmcount: memory allocation for value failed\n");
+ free(pKey);
+ return NULL;
+ }
+ *pCounter = 0;
+
+ if(!hashtable_insert(ht, pKey, pCounter)) {
+ DBGPRINTF("mmcount: inserting element into hashtable failed\n");
+ free(pKey);
+ free(pCounter);
+ return NULL;
+ }
+ return pCounter;
+}
+
+BEGINdoAction
+ msg_t *pMsg;
+ char *appname;
+ struct json_object *json = NULL;
+ es_str_t *estr = NULL;
+ struct json_object *keyjson = NULL;
+ char *pszValue;
+ int *pCounter;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ appname = getAPPNAME(pMsg, LOCK_MUTEX);
+
+ if(0 != strcmp(appname, pData->pszAppName)) {
+ /* we are not working for this appname. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ if(!pData->pszKey) {
+ /* no key given for count, so we count severity */
+ if(pMsg->iSeverity <= SEVERITY_COUNT) {
+ pData->severity[pMsg->iSeverity]++;
+ json = json_object_new_int(pData->severity[pMsg->iSeverity]);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key is given, so get the property json */
+ estr = es_newStrFromBuf(pData->pszKey, strlen(pData->pszKey));
+ if(msgGetCEEPropJSON(pMsg, estr, &keyjson) != RS_RET_OK) {
+ /* key not found in the message. nothing to do */
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* key found, so get the value */
+ pszValue = (char*)json_object_get_string(keyjson);
+
+ if(pData->pszValue) {
+ /* value also given for count */
+ if(!strcmp(pszValue, pData->pszValue)) {
+ /* count for (value and key and appname) matched */
+ pData->valueCounter++;
+ json = json_object_new_int(pData->valueCounter);
+ }
+ ABORT_FINALIZE(RS_RET_OK);
+ }
+
+ /* value is not given, so we count for each value of given key */
+ pCounter = getCounter(pData->ht, pszValue);
+ if(pCounter) {
+ (*pCounter)++;
+ json = json_object_new_int(*pCounter);
+ }
+finalize_it:
+ if(estr) {
+ es_deleteStr(estr);
+ }
+
+ if(json) {
+ msgAddJSON(pMsg, (uchar *)JSON_COUNT_NAME, json);
+ }
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmcount:", sizeof(":mmcount:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmcount supports only v6+ config format, use: "
+ "action(type=\"mmcount\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmcount: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/mmfields/Makefile.am b/plugins/mmfields/Makefile.am
new file mode 100644
index 00000000..08170d52
--- /dev/null
+++ b/plugins/mmfields/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmfields.la
+
+mmfields_la_SOURCES = mmfields.c
+mmfields_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmfields_la_LDFLAGS = -module -avoid-version
+mmfields_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c
new file mode 100644
index 00000000..99c78916
--- /dev/null
+++ b/plugins/mmfields/mmfields.c
@@ -0,0 +1,265 @@
+/* mmfields.c
+ * Parse all fields of the message into structured data inside the
+ * JSON tree.
+ *
+ * Copyright 2013 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmfields")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+/* define operation modes we have */
+#define SIMPLE_MODE 0 /* just overwrite */
+#define REWRITE_MODE 1 /* rewrite IP address, canoninized */
+typedef struct _instanceData {
+ char separator;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "separator", eCmdHdlrGetChar, 0 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->separator = ',';
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmfields)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "separator")) {
+ pData->separator = es_getBufAddr(pvals[i].val.d.estr)[0];
+ } else {
+ dbgprintf("mmfields: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+
+static inline rsRetVal
+extractField(instanceData *pData, uchar *msgtext, int lenMsg, int *curridx, uchar *fieldbuf)
+{
+ int i, j;
+ DEFiRet;
+ i = *curridx;
+ j = 0;
+ while(i < lenMsg && msgtext[i] != pData->separator) {
+ fieldbuf[j++] = msgtext[i++];
+ }
+ fieldbuf[j] = '\0';
+ if(i < lenMsg)
+ ++i;
+ *curridx = i;
+
+ RETiRet;
+}
+
+
+static inline rsRetVal
+parse_fields(instanceData *pData, msg_t *pMsg, uchar *msgtext, int lenMsg)
+{
+ uchar fieldbuf[32*1024];
+ uchar fieldname[512];
+ struct json_object *json;
+ struct json_object *jval;
+ int field;
+ uchar *buf;
+ int currIdx = 0;
+ DEFiRet;
+
+ if(lenMsg < (int) sizeof(fieldbuf)) {
+ buf = fieldbuf;
+ } else {
+ CHKmalloc(buf = malloc(lenMsg+1));
+ }
+
+ json = json_object_new_object();
+ if(json == NULL) {
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ field = 1;
+ while(currIdx < lenMsg) {
+ CHKiRet(extractField(pData, msgtext, lenMsg, &currIdx, buf));
+ DBGPRINTF("mmfields: field %d: '%s'\n", field, buf);
+ snprintf(fieldname, sizeof(fieldname), "f%d", (char*)field);
+ fieldname[sizeof(fieldname)-1] = '\0';
+ jval = json_object_new_string((char*)fieldbuf);
+ json_object_object_add(json, (char*)fieldname, jval);
+ field++;
+ }
+ msgAddJSON(pMsg, (uchar*)"!", json);
+finalize_it:
+ RETiRet;
+}
+
+
+BEGINdoAction
+ msg_t *pMsg;
+ uchar *msg;
+ int lenMsg;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ lenMsg = getMSGLen(pMsg);
+ msg = getMSG(pMsg);
+ CHKiRet(parse_fields(pData, pMsg, msg, lenMsg));
+finalize_it:
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmfields:", sizeof(":mmfields:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmfields supports only v6+ config format, use: "
+ "action(type=\"mmfields\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmfields: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index ae65f40f..e0650c62 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -55,6 +55,9 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
+#define DFLT_ENABLE_TLS 0
+#define DFLT_ENABLE_TLSZIP 0
+
static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
@@ -63,7 +66,15 @@ typedef struct _instanceData {
int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
unsigned timeout;
+ unsigned rebindInterval;
+ unsigned nSent;
relpClt_t *pRelpClt; /* relp client for this instance */
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
+ uchar *pristring; /* GnuTLS priority string (NULL if not to be provided) */
+ uchar *caCertFile;
+ uchar *myCertFile;
+ uchar *myPrivKeyFile;
uchar *tplName;
} instanceData;
@@ -77,7 +88,14 @@ static configSettings_t __attribute__((unused)) cs;
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "target", eCmdHdlrGetWord, 1 },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 },
+ { "tls.prioritystring", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
{ "port", eCmdHdlrGetWord, 0 },
+ { "rebindinterval", eCmdHdlrInt, 0 },
{ "timeout", eCmdHdlrInt, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
@@ -112,6 +130,28 @@ doCreateRelpClient(instanceData *pData)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLS) {
+ if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLSZip) {
+ if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
+ if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ pData->bInitialConnect = 1;
+ pData->nSent = 0;
finalize_it:
RETiRet;
}
@@ -119,8 +159,14 @@ finalize_it:
BEGINcreateInstance
CODESTARTcreateInstance
- pData->bInitialConnect = 1;
pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->pristring = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
ENDcreateInstance
BEGINfreeInstance
@@ -130,6 +176,10 @@ CODESTARTfreeInstance
free(pData->target);
free(pData->port);
free(pData->tplName);
+ free(pData->pristring);
+ free(pData->caCertFile);
+ free(pData->myCertFile);
+ free(pData->myPrivKeyFile);
ENDfreeInstance
static inline void
@@ -139,6 +189,13 @@ setInstParamDefaults(instanceData *pData)
pData->port = NULL;
pData->tplName = NULL;
pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->pristring = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
}
@@ -164,6 +221,20 @@ CODESTARTnewActInst
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
pData->timeout = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
+ pData->rebindInterval = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls")) {
+ pData->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.compression")) {
+ pData->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.prioritystring")) {
+ pData->pristring = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
+ pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
+ pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
+ pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
dbgprintf("omrelp: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -232,6 +303,17 @@ CODESTARTtryResume
iRet = doConnect(pData);
ENDtryResume
+static inline rsRetVal
+doRebind(instanceData *pData)
+{
+ DEFiRet;
+ DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
+ pData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pData));
+finalize_it:
+ RETiRet;
+}
BEGINdoAction
uchar *pMsg; /* temporary buffering */
@@ -247,7 +329,7 @@ CODESTARTdoAction
pMsg = ppString[0];
lenMsg = strlen((char*) pMsg); /* TODO: don't we get this? */
- /* TODO: think about handling oversize messages! */
+ /* we need to truncate oversize msgs - no way around that... */
if((int) lenMsg > glbl.GetMaxLine())
lenMsg = glbl.GetMaxLine();
@@ -256,9 +338,13 @@ CODESTARTdoAction
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ if(pData->rebindInterval != 0 &&
+ (++pData->nSent >= pData->rebindInterval)) {
+ doRebind(pData);
+ }
finalize_it:
ENDdoAction
@@ -328,7 +414,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
++p;
}
- /* TODO: make this if go away! */
if(*p == ';') {
*p = '\0'; /* trick to obtain hostname (later)! */
CHKmalloc(pData->target = ustrdup(q));