summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imdiag/imdiag.c10
-rw-r--r--plugins/imfile/imfile.c25
-rw-r--r--plugins/imklog/bsd.c5
-rw-r--r--plugins/imklog/imklog.c8
-rw-r--r--plugins/imklog/imklog.h2
-rw-r--r--plugins/impstats/impstats.c79
-rw-r--r--plugins/imptcp/imptcp.c36
-rw-r--r--plugins/imrelp/imrelp.c25
-rw-r--r--plugins/imtcp/imtcp.c25
-rw-r--r--plugins/imudp/imudp.c82
-rw-r--r--plugins/imuxsock/imuxsock.c205
-rw-r--r--plugins/mmanon/Makefile.am8
-rw-r--r--plugins/mmanon/mmanon.c401
-rw-r--r--plugins/mmjsonparse/mmjsonparse.c74
-rw-r--r--plugins/mmnormalize/mmnormalize.c180
-rw-r--r--plugins/omelasticsearch/Makefile.am3
-rw-r--r--plugins/omelasticsearch/README17
-rw-r--r--plugins/omelasticsearch/cJSON/README247
-rw-r--r--plugins/omelasticsearch/cJSON/cjson.c514
-rw-r--r--plugins/omelasticsearch/cJSON/cjson.h130
-rw-r--r--plugins/omelasticsearch/cJSON/test.c156
-rw-r--r--plugins/omelasticsearch/cJSON/tests/test122
-rw-r--r--plugins/omelasticsearch/cJSON/tests/test211
-rw-r--r--plugins/omelasticsearch/cJSON/tests/test326
-rw-r--r--plugins/omelasticsearch/cJSON/tests/test488
-rw-r--r--plugins/omelasticsearch/cJSON/tests/test527
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c260
-rw-r--r--plugins/omjournal/Makefile.am8
-rw-r--r--plugins/omjournal/omjournal.c185
-rw-r--r--plugins/omlibdbi/omlibdbi.c202
-rw-r--r--plugins/ommongodb/ommongodb.c10
-rw-r--r--plugins/ommysql/ommysql.c32
-rw-r--r--plugins/omrelp/omrelp.c146
-rw-r--r--plugins/omruleset/omruleset.c10
-rw-r--r--plugins/omstdout/omstdout.c11
-rw-r--r--plugins/omudpspoof/omudpspoof.c312
36 files changed, 3131 insertions, 451 deletions
diff --git a/plugins/imdiag/imdiag.c b/plugins/imdiag/imdiag.c
index 640c9e1b..5fdc6ef1 100644
--- a/plugins/imdiag/imdiag.c
+++ b/plugins/imdiag/imdiag.c
@@ -53,6 +53,7 @@
#include "srUtils.h"
#include "msg.h"
#include "datetime.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
MODULE_TYPE_INPUT
@@ -199,7 +200,7 @@ finalize_it:
/* actually submit a message to the rsyslog core
*/
static rsRetVal
-doInjectMsg(int iNum)
+doInjectMsg(int iNum, ratelimit_t *ratelimiter)
{
uchar szMsg[1024];
msg_t *pMsg;
@@ -219,7 +220,7 @@ doInjectMsg(int iNum)
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pRcvDummy);
CHKiRet(MsgSetRcvFromIP(pMsg, pRcvIPDummy));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(ratelimiter, NULL, pMsg));
finalize_it:
RETiRet;
@@ -237,6 +238,7 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
int iFrom;
int nMsgs;
int i;
+ ratelimit_t *ratelimit;
DEFiRet;
/* we do not check errors here! */
@@ -244,13 +246,15 @@ injectMsg(uchar *pszCmd, tcps_sess_t *pSess)
iFrom = atoi((char*)wordBuf);
getFirstWord(&pszCmd, wordBuf, sizeof(wordBuf)/sizeof(uchar), TO_LOWERCASE);
nMsgs = atoi((char*)wordBuf);
+ ratelimitNew(&ratelimit, "imdiag", "injectmsg");
for(i = 0 ; i < nMsgs ; ++i) {
- doInjectMsg(i + iFrom);
+ doInjectMsg(i + iFrom, ratelimit);
}
CHKiRet(sendResponse(pSess, "%d messages injected\n", nMsgs));
DBGPRINTF("imdiag: %d messages injected\n", nMsgs);
+ ratelimitDestruct(ratelimit);
finalize_it:
RETiRet;
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 188d692b..349acead 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -48,6 +48,7 @@
#include "prop.h"
#include "stringbuf.h"
#include "ruleset.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT /* must be present for input modules, do not remove */
MODULE_TYPE_NOKEEP
@@ -82,6 +83,7 @@ typedef struct fileInfo_s {
strm_t *pStrm; /* its stream (NULL if not assigned) */
int readMode; /* which mode to use in ReadMulteLine call? */
ruleset_t *pRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ ratelimit_t *ratelimiter;
multi_submit_t multiSub;
} fileInfo_t;
@@ -189,9 +191,7 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine)
pMsg->iFacility = LOG_FAC(pInfo->iFacility);
pMsg->iSeverity = LOG_PRI(pInfo->iSeverity);
MsgSetRuleset(pMsg, pInfo->pRuleset);
- pInfo->multiSub.ppMsgs[pInfo->multiSub.nElem++] = pMsg;
- if(pInfo->multiSub.nElem == pInfo->multiSub.maxElem)
- CHKiRet(multiSubmitMsg(&pInfo->multiSub));
+ ratelimitAddMsg(pInfo->ratelimiter, &pInfo->multiSub, pMsg);
finalize_it:
RETiRet;
}
@@ -235,6 +235,7 @@ openFile(fileInfo_t *pThis)
/* read back in the object */
CHKiRet(obj.Deserialize(&pThis->pStrm, (uchar*) "strm", psSF, NULL, pThis));
+ strm.CheckFileChange(pThis->pStrm);
CHKiRet(strm.SeekCurrOffs(pThis->pStrm));
/* note: we do not delete the state file, so that the last position remains
@@ -246,6 +247,8 @@ finalize_it:
strm.Destruct(&psSF);
if(iRet != RS_RET_OK) {
+ if(pThis->pStrm != NULL)
+ strm.Destruct(&pThis->pStrm);
CHKiRet(strm.Construct(&pThis->pStrm));
CHKiRet(strm.SettOperationsMode(pThis->pStrm, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->pStrm, STREAMTYPE_FILE_MONITOR));
@@ -304,10 +307,7 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData)
}
finalize_it:
- if(pThis->multiSub.nElem > 0) {
- /* submit everything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&pThis->multiSub));
- }
+ multiSubmitFlush(&pThis->multiSub);
pthread_cleanup_pop(0);
if(pCStr != NULL) {
@@ -415,6 +415,7 @@ addListner(instanceConf_t *inst)
pThis->lenTag = ustrlen(pThis->pszTag);
pThis->pszStateFile = (uchar*) strdup((char*) inst->pszStateFile);
+ CHKiRet(ratelimitNew(&pThis->ratelimiter, "imfile", (char*)inst->pszFileName));
CHKmalloc(pThis->multiSub.ppMsgs = MALLOC(inst->nMultiSub * sizeof(msg_t*)));
pThis->multiSub.maxElem = inst->nMultiSub;
pThis->multiSub.nElem = 0;
@@ -738,12 +739,20 @@ persistStrmState(fileInfo_t *pInfo)
CHKiRet(strm.ConstructFinalize(psSF));
CHKiRet(strm.Serialize(pInfo->pStrm, psSF));
+ CHKiRet(strm.Flush(psSF));
CHKiRet(strm.Destruct(&psSF));
finalize_it:
if(psSF != NULL)
strm.Destruct(&psSF);
+
+ if(iRet != RS_RET_OK) {
+ errmsg.LogError(0, iRet, "imfile: could not persist state "
+ "file %s - data may be repeated on next "
+ "startup. Is WorkDirectory set?",
+ pInfo->pszStateFile);
+ }
RETiRet;
}
@@ -765,6 +774,8 @@ CODESTARTafterRun
persistStrmState(&files[i]);
strm.Destruct(&(files[i].pStrm));
}
+ ratelimitDestruct(files[i].ratelimiter);
+ free(files[i].multiSub.ppMsgs);
free(files[i].pszFileName);
free(files[i].pszTag);
free(files[i].pszStateFile);
diff --git a/plugins/imklog/bsd.c b/plugins/imklog/bsd.c
index cddc6737..9c2eebb2 100644
--- a/plugins/imklog/bsd.c
+++ b/plugins/imklog/bsd.c
@@ -58,9 +58,6 @@ static int fklog = -1; /* kernel log fd */
#ifdef OS_LINUX
/* submit a message to imklog Syslog() API. In this function, we check if
* a kernel timestamp is present and, if so, extract and strip it.
- * Note: this is an extra processing step. We should revisit the whole
- * idea in v6 and remove all that old stuff that we do not longer need
- * (like symbol resolution). <-- TODO
* Note that this is heavily Linux specific and thus is not compiled or
* used for BSD.
* Special thanks to Lennart Poettering for suggesting on how to convert
@@ -175,7 +172,7 @@ klogWillRun(modConfData_t *pModConf)
fklog = open((char*)GetPath(pModConf), O_RDONLY, 0);
if (fklog < 0) {
- imklogLogIntMsg(RS_RET_ERR_OPEN_KLOG, "imklog: cannot open kernel log(%s): %s.",
+ imklogLogIntMsg(LOG_ERR, "imklog: cannot open kernel log(%s): %s.",
GetPath(pModConf), rs_strerror_r(errno, errmsg, sizeof(errmsg)));
ABORT_FINALIZE(RS_RET_ERR_OPEN_KLOG);
}
diff --git a/plugins/imklog/imklog.c b/plugins/imklog/imklog.c
index a24fc63b..810ac264 100644
--- a/plugins/imklog/imklog.c
+++ b/plugins/imklog/imklog.c
@@ -95,7 +95,7 @@ static struct cnfparamdescr modpdescr[] = {
{ "permitnonkernelfacility", eCmdHdlrBinary, 0 },
{ "consoleloglevel", eCmdHdlrInt, 0 },
{ "parsekerneltimestamp", eCmdHdlrBinary, 0 },
- { "keepkerneltimestamp", eCmdHdlrBinary, 0 },
+ { "keepkerneltimestamp", eCmdHdlrBinary, 0 },
{ "internalmsgfacility", eCmdHdlrFacility, 0 }
};
static struct cnfparamblk modpblk =
@@ -105,7 +105,7 @@ static struct cnfparamblk modpblk =
};
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
-static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
+static prop_t *pLocalHostIP = NULL;
static inline void
initConfigSettings(void)
@@ -150,7 +150,8 @@ enqMsg(uchar *msg, uchar* pszTag, int iFacility, int iSeverity, struct timeval *
MsgSetTAG(pMsg, pszTag, ustrlen(pszTag));
pMsg->iFacility = iFacility;
pMsg->iSeverity = iSeverity;
- CHKiRet(submitMsg(pMsg));
+ /* note: we do NOT use rate-limiting, as the kernel itself does rate-limiting */
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
@@ -294,6 +295,7 @@ CODESTARTbeginCnfLoad
pModConf->bParseKernelStamp = 0;
pModConf->bKeepKernelStamp = 0;
pModConf->console_log_level = -1;
+ pModConf->bKeepKernelStamp = 0;
pModConf->iFacilIntMsg = klogFacilIntMsg();
loadModConf->configSetViaV2Method = 0;
bLegacyCnfModGlobalsPermitted = 1;
diff --git a/plugins/imklog/imklog.h b/plugins/imklog/imklog.h
index fa517ccc..1cf9b05a 100644
--- a/plugins/imklog/imklog.h
+++ b/plugins/imklog/imklog.h
@@ -35,9 +35,9 @@ struct modConfData_s {
int iFacilIntMsg;
uchar *pszPath;
int console_log_level;
- sbool bPermitNonKernel;
sbool bParseKernelStamp;
sbool bKeepKernelStamp;
+ sbool bPermitNonKernel;
sbool configSetViaV2Method;
};
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 62599969..cdd205fd 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -27,6 +27,8 @@
#include <signal.h>
#include <string.h>
#include <pthread.h>
+#include <fcntl.h>
+#include <sys/uio.h>
#include "dirty.h"
#include "cfsysline.h"
#include "module-template.h"
@@ -63,11 +65,14 @@ typedef struct configSettings_s {
} configSettings_t;
struct modConfData_s {
- rsconf_t *pConf; /* our overall config object */
+ rsconf_t *pConf; /* our overall config object */
int iStatsInterval;
int iFacility;
int iSeverity;
+ int logfd; /* fd if logging to file, or -1 if closed */
statsFmtType_t statsFmt;
+ sbool bLogToSyslog;
+ char *logfile;
sbool configSetViaV2Method;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
@@ -82,6 +87,8 @@ static struct cnfparamdescr modpdescr[] = {
{ "interval", eCmdHdlrInt, 0 },
{ "facility", eCmdHdlrInt, 0 },
{ "severity", eCmdHdlrInt, 0 },
+ { "log.syslog", eCmdHdlrBinary, 0 },
+ { "log.file", eCmdHdlrGetWord, 0 },
{ "format", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk modpblk =
@@ -120,7 +127,7 @@ initConfigSettings(void)
/* actually submit a message to the rsyslog core
*/
-static inline rsRetVal
+static inline void
doSubmitMsg(uchar *line)
{
msg_t *pMsg;
@@ -138,11 +145,55 @@ doSubmitMsg(uchar *line)
pMsg->iSeverity = runModConf->iSeverity;
pMsg->msgFlags = 0;
- submitMsg(pMsg);
+ /* we do not use rate-limiting, as the stats message always need to be emitted */
+ submitMsg2(pMsg);
+ DBGPRINTF("impstats: submit [%d,%d] msg '%s'\n", runModConf->iFacility,
+ runModConf->iSeverity, line);
finalize_it:
- RETiRet;
+ return;
+}
+
+
+/* log stats message to file; limited error handling done */
+static inline void
+doLogToFile(cstr_t *cstr)
+{
+ struct iovec iov[4];
+ ssize_t nwritten;
+ ssize_t nexpect;
+ time_t t;
+ char timebuf[32];
+
+ if(cstrLen(cstr) == 0)
+ goto done;
+ if(runModConf->logfd == -1) {
+ runModConf->logfd = open(runModConf->logfile, O_WRONLY|O_CREAT|O_APPEND|O_CLOEXEC, S_IRUSR|S_IWUSR);
+ if(runModConf->logfd == -1) {
+ dbgprintf("error opening stats file %s\n", runModConf->logfile);
+ goto done;
+ }
+ }
+ time(&t);
+ iov[0].iov_base = ctime_r(&t, timebuf);
+ iov[0].iov_len = nexpect = strlen(iov[0].iov_base) - 1; /* -1: strip \n */
+ iov[1].iov_base = ": ";
+ iov[1].iov_len = 2;
+ nexpect += 2;
+ iov[2].iov_base = rsCStrGetSzStrNoNULL(cstr);
+ iov[2].iov_len = (size_t) cstrLen(cstr);
+ nexpect += cstrLen(cstr);
+ iov[3].iov_base = "\n";
+ iov[3].iov_len = 1;
+ nexpect++;
+ nwritten = writev(runModConf->logfd, iov, 4);
+
+ if(nwritten != nexpect) {
+ dbgprintf("error writing stats file %s, nwritten %lld, expected %lld\n",
+ runModConf->logfile, (long long) nwritten, (long long) nexpect);
+ }
+done: return;
}
@@ -153,7 +204,10 @@ static rsRetVal
doStatsLine(void __attribute__((unused)) *usrptr, cstr_t *cstr)
{
DEFiRet;
- doSubmitMsg(rsCStrGetSzStrNoNULL(cstr));
+ if(runModConf->bLogToSyslog)
+ doSubmitMsg(rsCStrGetSzStrNoNULL(cstr));
+ if(runModConf->logfile != NULL)
+ doLogToFile(cstr);
RETiRet;
}
@@ -178,6 +232,9 @@ CODESTARTbeginCnfLoad
loadModConf->iFacility = DEFAULT_FACILITY;
loadModConf->iSeverity = DEFAULT_SEVERITY;
loadModConf->statsFmt = statsFmt_Legacy;
+ loadModConf->logfd = -1;
+ loadModConf->logfile = NULL;
+ loadModConf->bLogToSyslog = 1;
bLegacyCnfModGlobalsPermitted = 1;
/* init legacy config vars */
initConfigSettings();
@@ -210,6 +267,10 @@ CODESTARTsetModCnf
loadModConf->iFacility = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "severity")) {
loadModConf->iSeverity = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "log.syslog")) {
+ loadModConf->bLogToSyslog = (sbool) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "log.file")) {
+ loadModConf->logfile = es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(modpblk.descr[i].name, "format")) {
mode = es_str2cstr(pvals[i].val.d.estr, NULL);
if(!strcasecmp(mode, "json")) {
@@ -270,7 +331,9 @@ BEGINactivateCnf
rsRetVal localRet;
CODESTARTactivateCnf
runModConf = pModConf;
- DBGPRINTF("impstats: stats interval %d seconds\n", runModConf->iStatsInterval);
+ DBGPRINTF("impstats: stats interval %d seconds, logToSyslog %d, logFile %s\n",
+ runModConf->iStatsInterval, runModConf->bLogToSyslog,
+ runModConf->logfile == NULL ? "deactivated" : (char*)runModConf->logfile);
localRet = statsobj.EnableStats();
if(localRet != RS_RET_OK) {
errmsg.LogError(0, localRet, "impstats: error enabling statistics gathering");
@@ -282,6 +345,9 @@ ENDactivateCnf
BEGINfreeCnf
CODESTARTfreeCnf
+ if(runModConf->logfd != -1)
+ close(runModConf->logfd);
+ free(runModConf->logfile);
ENDfreeCnf
@@ -297,6 +363,7 @@ CODESTARTrunInput
if(glbl.GetGlobalInputTermState() == 1)
break; /* terminate input! */
+ DBGPRINTF("impstats: woke up, generating messages\n");
generateStatsMsgs();
}
ENDrunInput
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 9888086f..5c8bb67a 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -67,6 +67,7 @@
#include "ruleset.h"
#include "msg.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "net.h" /* for permittedPeers, may be removed when this is removed */
/* the define is from tcpsrv.h, we need to find a new (but easier!!!) abstraction layer some time ... */
@@ -121,6 +122,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
};
@@ -158,6 +161,8 @@ static struct cnfparamdescr inppdescr[] = {
{ "keepalive.time", eCmdHdlrInt, 0 },
{ "keepalive.interval", eCmdHdlrInt, 0 },
{ "addtlframedelimiter", eCmdHdlrInt, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -195,6 +200,7 @@ struct ptcpsrv_s {
sbool bKeepAlive; /* support keep-alive packets */
sbool bEmitMsgOnClose;
sbool bSuppOctetFram;
+ ratelimit_t *ratelimiter;
};
/* the ptcp session object. Describes a single active session.
@@ -295,6 +301,7 @@ destructSess(ptcpsess_t *pSess)
static void
destructSrv(ptcpsrv_t *pSrv)
{
+ ratelimitDestruct(pSrv->ratelimiter);
prop.Destruct(&pSrv->pInputName);
pthread_mutex_destroy(&pSrv->mutSessLst);
free(pSrv->pszInputName);
@@ -679,14 +686,7 @@ doSubmitMsg(ptcpsess_t *pThis, struct syslogTime *stTime, time_t ttGenTime, mult
MsgSetRuleset(pMsg, pSrv->pRuleset);
STATSCOUNTER_INC(pThis->pLstn->ctrSubmit, pThis->pLstn->mutCtrSubmit);
- if(pMultiSub == NULL) {
- CHKiRet(submitMsg(pMsg));
- } else {
- pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
- if(pMultiSub->nElem == pMultiSub->maxElem)
- CHKiRet(multiSubmitMsg(pMultiSub));
- }
-
+ ratelimitAddMsg(pSrv->ratelimiter, pMultiSub, pMsg);
finalize_it:
/* reset status variables */
@@ -805,12 +805,11 @@ processDataRcvd(ptcpsess_t *pThis, char c, struct syslogTime *stTime, time_t ttG
* we have just received a bunch of data! -- rgerhards, 2009-06-16
* EXTRACT from tcps_sess.c
*/
-#define NUM_MULTISUB 1024
static rsRetVal
DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
{
multi_submit_t multiSub;
- msg_t *pMsgs[NUM_MULTISUB];
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
struct syslogTime stTime;
time_t ttGenTime;
char *pEnd;
@@ -821,7 +820,7 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
datetime.getCurrTime(&stTime, &ttGenTime);
multiSub.ppMsgs = pMsgs;
- multiSub.maxElem = NUM_MULTISUB;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
multiSub.nElem = 0;
/* We now copy the message to the session buffer. */
@@ -831,15 +830,11 @@ DataRcvd(ptcpsess_t *pThis, char *pData, size_t iLen)
CHKiRet(processDataRcvd(pThis, *pData++, &stTime, ttGenTime, &multiSub));
}
- if(multiSub.nElem > 0) {
- /* submit anything that was not yet submitted */
- CHKiRet(multiSubmitMsg(&multiSub));
- }
+ iRet = multiSubmitFlush(&multiSub);
finalize_it:
RETiRet;
}
-#undef NUM_MULTISUB
/****************************************** --END-- TCP SUPPORT FUNCTIONS ***********************************/
@@ -1051,6 +1046,8 @@ createInstance(instanceConf_t **pinst)
inst->bEmitMsgOnClose = 0;
inst->iAddtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
inst->pBindRuleset = NULL;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -1130,6 +1127,9 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
pSrv->iKeepAliveProbes = inst->iKeepAliveProbes;
pSrv->iKeepAliveTime = inst->iKeepAliveTime;
pSrv->bEmitMsgOnClose = inst->bEmitMsgOnClose;
+ CHKiRet(ratelimitNew(&pSrv->ratelimiter, "imtcp", (char*)inst->pszBindPort));
+ ratelimitSetLinuxLike(pSrv->ratelimiter, inst->ratelimitInterval, inst->ratelimitBurst);
+ ratelimitSetThreadSafe(pSrv->ratelimiter);
CHKmalloc(pSrv->port = ustrdup(inst->pszBindPort));
pSrv->iAddtlFrameDelim = inst->iAddtlFrameDelim;
if(inst->pszBindAddr == NULL)
@@ -1458,6 +1458,10 @@ CODESTARTnewInpInst
inst->iAddtlFrameDelim = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "notifyonconnectionclose")) {
inst->bEmitMsgOnClose = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imptcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
diff --git a/plugins/imrelp/imrelp.c b/plugins/imrelp/imrelp.c
index 5ee3b4b9..5e0ae552 100644
--- a/plugins/imrelp/imrelp.c
+++ b/plugins/imrelp/imrelp.c
@@ -113,11 +113,29 @@ static struct cnfparamblk inppblk =
* we will only see the hostname (twice). -- rgerhards, 2009-10-14
*/
static relpRetVal
-onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *pMsg, size_t lenMsg)
+onSyslogRcv(uchar *pHostname, uchar *pIP, uchar *msg, size_t lenMsg)
{
+ prop_t *pProp = NULL;
+ msg_t *pMsg;
DEFiRet;
- parseAndSubmitMessage(pHostname, pIP, pMsg, lenMsg, PARSE_HOSTNAME,
- eFLOWCTL_LIGHT_DELAY, pInputName, NULL, 0, runModConf->pBindRuleset);
+
+ CHKiRet(msgConstruct(&pMsg));
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetRawMsg(pMsg, (char*)msg, lenMsg);
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ MsgSetRuleset(pMsg, runModConf->pBindRuleset);
+ pMsg->msgFlags = PARSE_HOSTNAME | NEEDS_PARSING;
+
+ /* TODO: optimize this, we can store it inside the session, requires
+ * changes to librelp --> next librelp iteration?. rgerhards, 2012-10-29
+ */
+ MsgSetRcvFromStr(pMsg, pHostname, ustrlen(pHostname), &pProp);
+ CHKiRet(prop.Destruct(&pProp));
+ CHKiRet(MsgSetRcvFromIPStr(pMsg, pIP, ustrlen(pIP), &pProp));
+ CHKiRet(prop.Destruct(&pProp));
+ CHKiRet(submitMsg2(pMsg));
+
+finalize_it:
RETiRet;
}
@@ -190,6 +208,7 @@ addListner(modConfData_t __attribute__((unused)) *modConf, instanceConf_t *inst)
if(pRelpEngine == NULL) {
CHKiRet(relpEngineConstruct(&pRelpEngine));
CHKiRet(relpEngineSetDbgprint(pRelpEngine, dbgprintf));
+ CHKiRet(relpEngineSetFamily(pRelpEngine, glbl.GetDefPFFamily()));
CHKiRet(relpEngineSetEnableCmd(pRelpEngine, (uchar*) "syslog", eRelpCmdState_Required));
CHKiRet(relpEngineSetSyslogRcv(pRelpEngine, onSyslogRcv));
if (!glbl.GetDisableDNS()) {
diff --git a/plugins/imtcp/imtcp.c b/plugins/imtcp/imtcp.c
index 0cecb704..fc22d452 100644
--- a/plugins/imtcp/imtcp.c
+++ b/plugins/imtcp/imtcp.c
@@ -105,6 +105,8 @@ struct instanceConf_s {
uchar *pszBindRuleset; /* name of ruleset to bind to */
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
uchar *pszInputName; /* value for inputname property, NULL is OK and handled by core engine */
+ int ratelimitInterval;
+ int ratelimitBurst;
int bSuppOctetFram;
struct instanceConf_s *next;
};
@@ -155,7 +157,9 @@ static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
{ "name", eCmdHdlrString, 0 },
{ "ruleset", eCmdHdlrString, 0 },
- { "supportOctetCountedFraming", eCmdHdlrBinary, 0 }
+ { "supportOctetCountedFraming", eCmdHdlrBinary, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -251,6 +255,8 @@ createInstance(instanceConf_t **pinst)
inst->pszBindRuleset = NULL;
inst->pszInputName = NULL;
inst->bSuppOctetFram = 1;
+ inst->ratelimitInterval = 0;
+ inst->ratelimitBurst = 10000;
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -334,6 +340,7 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
CHKiRet(tcpsrv.SetRuleset(pOurTcpsrv, inst->pBindRuleset));
CHKiRet(tcpsrv.SetInputName(pOurTcpsrv, inst->pszInputName == NULL ?
UCHAR_CONSTANT("imtcp") : inst->pszInputName));
+ CHKiRet(tcpsrv.SetLinuxLikeRatelimiters(pOurTcpsrv, inst->ratelimitInterval, inst->ratelimitBurst));
tcpsrv.configureTCPListen(pOurTcpsrv, inst->pszBindPort, inst->bSuppOctetFram);
finalize_it:
@@ -376,6 +383,10 @@ CODESTARTnewInpInst
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "supportOctetCountedFraming")) {
inst->bSuppOctetFram = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imtcp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -487,10 +498,10 @@ CODESTARTendCnfLoad
loadModConf->pszStrmDrvrAuthMode = NULL;
} else {
loadModConf->pszStrmDrvrAuthMode = cs.pszStrmDrvrAuthMode;
+ cs.pszStrmDrvrAuthMode = NULL;
}
}
- if((cs.pszStrmDrvrAuthMode == NULL) || (cs.pszStrmDrvrAuthMode[0] == '\0'))
- free(cs.pszStrmDrvrAuthMode);
+ free(cs.pszStrmDrvrAuthMode);
cs.pszStrmDrvrAuthMode = NULL;
loadModConf = NULL; /* done loading */
@@ -550,6 +561,7 @@ ENDactivateCnf
BEGINfreeCnf
instanceConf_t *inst, *del;
CODESTARTfreeCnf
+ free(pModConf->pszStrmDrvrAuthMode);
if(pModConf->permittedPeers != NULL) {
cnfarrayContentDestruct(pModConf->permittedPeers);
free(pModConf->permittedPeers);
@@ -580,7 +592,9 @@ ENDwillRun
BEGINafterRun
CODESTARTafterRun
- /* do cleanup here */
+ if(pOurTcpsrv != NULL)
+ iRet = tcpsrv.Destruct(&pOurTcpsrv);
+
net.clearAllowedSenders(UCHAR_CONSTANT("TCP"));
ENDafterRun
@@ -594,9 +608,6 @@ ENDisCompatibleWithFeature
BEGINmodExit
CODESTARTmodExit
- if(pOurTcpsrv != NULL)
- iRet = tcpsrv.Destruct(&pOurTcpsrv);
-
if(pPermPeersRoot != NULL) {
net.DestructPermittedPeers(&pPermPeersRoot);
}
diff --git a/plugins/imudp/imudp.c b/plugins/imudp/imudp.c
index 06e1471d..7bf1473a 100644
--- a/plugins/imudp/imudp.c
+++ b/plugins/imudp/imudp.c
@@ -4,8 +4,6 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c)
- *
* Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
@@ -53,6 +51,7 @@
#include "prop.h"
#include "ruleset.h"
#include "statsobj.h"
+#include "ratelimit.h"
#include "unicode-helper.h"
MODULE_TYPE_INPUT
@@ -76,7 +75,9 @@ static struct lstn_s {
struct lstn_s *next;
int sock; /* socket */
ruleset_t *pRuleset; /* bound ruleset */
+ prop_t *pInputName;
statsobj_t *stats; /* listener stats */
+ ratelimit_t *ratelimiter;
STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
} *lcnfRoot = NULL, *lcnfLast = NULL;
@@ -91,7 +92,6 @@ static uchar *pRcvBuf = NULL; /* receive buffer (for a single packet). We use a
* it so that we can check available memory in willRun() and request
* termination if we can not get it. -- rgerhards, 2007-12-27
*/
-static prop_t *pInputName = NULL; /* our inputName currently is always "imudp", and this will hold it */
#define TIME_REQUERY_DFLT 2
#define SCHED_PRIO_UNSET -12345678 /* a value that indicates that the scheduling priority has not been set */
@@ -108,8 +108,12 @@ struct instanceConf_s {
uchar *pszBindAddr; /* IP to bind socket to */
uchar *pszBindPort; /* Port to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
+ uchar *inputname;
ruleset_t *pBindRuleset; /* ruleset to bind listener to (use system default if unspecified) */
+ int ratelimitInterval;
+ int ratelimitBurst;
struct instanceConf_s *next;
+ sbool bAppendPortToInpname;
};
struct modConfData_s {
@@ -139,8 +143,12 @@ static struct cnfparamblk modpblk =
/* input instance parameters */
static struct cnfparamdescr inppdescr[] = {
{ "port", eCmdHdlrArray, CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
+ { "inputname", eCmdHdlrGetWord, 0 },
+ { "inputname.appendport", eCmdHdlrBinary, 0 },
{ "address", eCmdHdlrString, 0 },
- { "ruleset", eCmdHdlrString, 0 }
+ { "ruleset", eCmdHdlrString, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
@@ -165,6 +173,10 @@ createInstance(instanceConf_t **pinst)
inst->pszBindPort = NULL;
inst->pszBindAddr = NULL;
inst->pszBindRuleset = NULL;
+ inst->inputname = NULL;
+ inst->bAppendPortToInpname = 0;
+ inst->ratelimitBurst = 10000; /* arbitrary high limit */
+ inst->ratelimitInterval = 0; /* off */
/* node created, let's add to config */
if(loadModConf->tail == NULL) {
@@ -223,7 +235,8 @@ addListner(instanceConf_t *inst)
struct lstn_s *newlcnfinfo;
uchar *bindName;
uchar *port;
- uchar statname[64];
+ uchar dispname[64], inpnameBuf[128];
+ uchar *inputname;
/* check which address to bind to. We could do this more compact, but have not
* done so in order to make the code more readable. -- rgerhards, 2007-12-27
@@ -248,11 +261,29 @@ addListner(instanceConf_t *inst)
newlcnfinfo->next = NULL;
newlcnfinfo->sock = newSocks[iSrc];
newlcnfinfo->pRuleset = inst->pBindRuleset;
+ snprintf((char*)dispname, sizeof(dispname), "imudp(%s:%s)", bindName, port);
+ dispname[sizeof(dispname)-1] = '\0'; /* just to be on the save side... */
+ CHKiRet(ratelimitNew(&newlcnfinfo->ratelimiter, (char*)dispname, NULL));
+ if(inst->inputname == NULL) {
+ inputname = (uchar*)"imudp";
+ } else {
+ inputname = inst->inputname;
+ }
+ if(inst->bAppendPortToInpname) {
+ snprintf((char*)inpnameBuf, sizeof(inpnameBuf), "%s%s",
+ inputname, port);
+ inpnameBuf[sizeof(inpnameBuf)-1] = '\0';
+ inputname = inpnameBuf;
+ }
+ CHKiRet(prop.Construct(&newlcnfinfo->pInputName));
+ CHKiRet(prop.SetString(newlcnfinfo->pInputName,
+ inputname, ustrlen(inputname)));
+ CHKiRet(prop.ConstructFinalize(newlcnfinfo->pInputName));
+ ratelimitSetLinuxLike(newlcnfinfo->ratelimiter, inst->ratelimitInterval,
+ inst->ratelimitBurst);
/* support statistics gathering */
CHKiRet(statsobj.Construct(&(newlcnfinfo->stats)));
- snprintf((char*)statname, sizeof(statname), "imudp(%s:%s)", bindName, port);
- statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
- CHKiRet(statsobj.SetName(newlcnfinfo->stats, statname));
+ CHKiRet(statsobj.SetName(newlcnfinfo->stats, dispname));
STATSCOUNTER_INIT(newlcnfinfo->ctrSubmit, newlcnfinfo->mutCtrSubmit);
CHKiRet(statsobj.AddCounter(newlcnfinfo->stats, UCHAR_CONSTANT("submitted"),
ctrType_IntCtr, &(newlcnfinfo->ctrSubmit)));
@@ -304,7 +335,6 @@ std_checkRuleset_genErrMsg(__attribute__((unused)) modConfData_t *modConf, insta
static inline rsRetVal
processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *frominetPrev, int *pbIsPermitted)
{
- DEFiRet;
int iNbrTimeUsed;
time_t ttGenTime;
struct syslogTime stTime;
@@ -314,9 +344,15 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
msg_t *pMsg;
prop_t *propFromHost = NULL;
prop_t *propFromHostIP = NULL;
+ multi_submit_t multiSub;
+ msg_t *pMsgs[CONF_NUM_MULTISUB];
char errStr[1024];
+ DEFiRet;
assert(pThrd != NULL);
+ multiSub.ppMsgs = pMsgs;
+ multiSub.maxElem = CONF_NUM_MULTISUB;
+ multiSub.nElem = 0;
iNbrTimeUsed = 0;
while(1) { /* loop is terminated if we have a bad receive, done below in the body */
if(pThrd->bShallStop == RSTRUE)
@@ -367,7 +403,7 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
*pbIsPermitted = 1; /* no check -> everything permitted */
}
- DBGPRINTF("recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
+ DBGPRINTF("imudp:recv(%d,%d),acl:%d,msg:%s\n", lstn->sock, (int) lenRcvBuf, *pbIsPermitted, pRcvBuf);
if(*pbIsPermitted != 0) {
if((runModConf->iTimeRequery == 0) || (iNbrTimeUsed++ % runModConf->iTimeRequery) == 0) {
@@ -376,19 +412,22 @@ processSocket(thrdInfo_t *pThrd, struct lstn_s *lstn, struct sockaddr_storage *f
/* we now create our own message object and submit it to the queue */
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
MsgSetRawMsg(pMsg, (char*)pRcvBuf, lenRcvBuf);
- MsgSetInputName(pMsg, pInputName);
+ MsgSetInputName(pMsg, lstn->pInputName);
MsgSetRuleset(pMsg, lstn->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME | NEEDS_DNSRESOL;
if(*pbIsPermitted == 2)
pMsg->msgFlags |= NEEDS_ACLCHK_U; /* request ACL check after resolution */
CHKiRet(msgSetFromSockinfo(pMsg, &frominet));
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(ratelimitAddMsg(lstn->ratelimiter, &multiSub, pMsg));
STATSCOUNTER_INC(lstn->ctrSubmit, lstn->mutCtrSubmit);
}
}
+
finalize_it:
+ multiSubmitFlush(&multiSub);
+
if(propFromHost != NULL)
prop.Destruct(&propFromHost);
if(propFromHostIP != NULL)
@@ -678,10 +717,18 @@ createListner(es_str_t *port, struct cnfparamvals *pvals)
continue;
if(!strcmp(inppblk.descr[i].name, "port")) {
continue; /* array, handled by caller */
+ } else if(!strcmp(inppblk.descr[i].name, "inputname")) {
+ inst->inputname = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "inputname.appendport")) {
+ inst->bAppendPortToInpname = (int) pvals[i].val.d.n;
} else if(!strcmp(inppblk.descr[i].name, "address")) {
inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
+ inst->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(inppblk.descr[i].name, "ratelimit.interval")) {
+ inst->ratelimitInterval = (int) pvals[i].val.d.n;
} else {
dbgprintf("imudp: program error, non-handled "
"param '%s'\n", inppblk.descr[i].name);
@@ -852,6 +899,7 @@ CODESTARTfreeCnf
free(inst->pszBindPort);
free(inst->pszBindAddr);
free(inst->pBindRuleset);
+ free(inst->inputname);
del = inst;
inst = inst->next;
free(del);
@@ -892,7 +940,9 @@ CODESTARTafterRun
net.clearAllowedSenders((uchar*)"UDP");
for(lstn = lcnfRoot ; lstn != NULL ; ) {
statsobj.Destruct(&(lstn->stats));
+ ratelimitDestruct(lstn->ratelimiter);
close(lstn->sock);
+ prop.Destruct(&lstn->pInputName);
lstnDel = lstn;
lstn = lstn->next;
free(lstnDel);
@@ -907,9 +957,6 @@ ENDafterRun
BEGINmodExit
CODESTARTmodExit
- if(pInputName != NULL)
- prop.Destruct(&pInputName);
-
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
objRelease(glbl, CORE_COMPONENT);
@@ -964,11 +1011,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(ruleset, CORE_COMPONENT));
CHKiRet(objUse(net, LM_NET_FILENAME));
- /* we need to create the inputName property (only once during our lifetime) */
- CHKiRet(prop.Construct(&pInputName));
- CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imudp"), sizeof("imudp") - 1));
- CHKiRet(prop.ConstructFinalize(pInputName));
-
/* register config file handlers */
CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputudpserverbindruleset", 0, eCmdHdlrGetWord,
NULL, &cs.pszBindRuleset, STD_LOADABLE_MODULE_ID));
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 1a2b696e..c503852c 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-12-20 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -55,6 +55,7 @@
#include "statsobj.h"
#include "datetime.h"
#include "hashtable.h"
+#include "ratelimit.h"
MODULE_TYPE_INPUT
MODULE_TYPE_NOKEEP
@@ -105,15 +106,6 @@ STATSCOUNTER_DEF(ctrSubmit, mutCtrSubmit)
STATSCOUNTER_DEF(ctrLostRatelimit, mutCtrLostRatelimit)
STATSCOUNTER_DEF(ctrNumRatelimiters, mutCtrNumRatelimiters)
-struct rs_ratelimit_state {
- unsigned short interval;
- unsigned short burst;
- unsigned done;
- unsigned missed;
- time_t begin;
-};
-typedef struct rs_ratelimit_state rs_ratelimit_state_t;
-
/* a very simple "hash function" for process IDs - we simply use the
* pid itself: it is quite expected that all pids may log some time, but
@@ -143,6 +135,7 @@ typedef struct lstn_s {
int flowCtl; /* flow control settings for this socket */
int ratelimitInterval;
int ratelimitBurst;
+ ratelimit_t *dflt_ratelimiter;/*ratelimiter to apply if none else is to be used */
intTiny ratelimitSev; /* severity level (and below) for which rate-limiting shall apply */
struct hashtable *ht; /* our hashtable for rate-limiting */
sbool bParseHost; /* should parser parse host name? read-only after startup */
@@ -151,7 +144,9 @@ typedef struct lstn_s {
sbool bAnnotate; /* annotate events with trusted properties */
sbool bParseTrusted; /* parse trusted properties */
sbool bWritePid; /* write original PID into tag */
+ sbool bDiscardOwnMsgs; /* discard messages that originated from ourselves */
sbool bUseSysTimeStamp; /* use timestamp from system (instead of from message) */
+ sbool bUnlink; /* unlink&re-create socket at start and end of processing */
} lstn_t;
static lstn_t listeners[MAXFUNIX];
@@ -206,6 +201,8 @@ struct instanceConf_s {
int ratelimitSeverity;
int bAnnotate; /* annotate trusted properties */
int bParseTrusted; /* parse trusted properties */
+ sbool bDiscardOwnMsgs; /* discard messages that originated from our own pid? */
+ sbool bUnlink;
struct instanceConf_s *next;
};
@@ -223,7 +220,9 @@ struct modConfData_s {
sbool bOmitLocalLogging;
sbool bWritePidSysSock;
sbool bUseSysTimeStamp;
+ sbool bDiscardOwnMsgs;
sbool configSetViaV2Method;
+ sbool bUnlink;
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current load process */
@@ -232,7 +231,9 @@ static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current lo
static struct cnfparamdescr modpdescr[] = {
{ "syssock.use", eCmdHdlrBinary, 0 },
{ "syssock.name", eCmdHdlrGetWord, 0 },
+ { "syssock.unlink", eCmdHdlrBinary, 0 },
{ "syssock.ignoretimestamp", eCmdHdlrBinary, 0 },
+ { "syssock.ignoreownmessages", eCmdHdlrBinary, 0 },
{ "syssock.flowcontrol", eCmdHdlrBinary, 0 },
{ "syssock.usesystimestamp", eCmdHdlrBinary, 0 },
{ "syssock.annotate", eCmdHdlrBinary, 0 },
@@ -251,8 +252,10 @@ static struct cnfparamblk modpblk =
/* input instance parameters */
static struct cnfparamdescr inppdescr[] = {
{ "socket", eCmdHdlrString, CNFPARAM_REQUIRED }, /* legacy: addunixlistensocket */
+ { "unlink", eCmdHdlrBinary, 0 },
{ "createpath", eCmdHdlrBinary, 0 },
{ "parsetrusted", eCmdHdlrBinary, 0 },
+ { "ignoreownmessages", eCmdHdlrBinary, 0 },
{ "hostname", eCmdHdlrString, 0 },
{ "ignoretimestamp", eCmdHdlrBinary, 0 },
{ "flowcontrol", eCmdHdlrBinary, 0 },
@@ -272,74 +275,9 @@ static struct cnfparamblk inppblk =
/* we do not use this, because we do not bind to a ruleset so far
* enable when this is changed: #include "im-helper.h" */ /* must be included AFTER the type definitions! */
-static void
-initRatelimitState(struct rs_ratelimit_state *rs, unsigned short interval, unsigned short burst)
-{
- rs->interval = interval;
- rs->burst = burst;
- rs->done = 0;
- rs->missed = 0;
- rs->begin = 0;
-}
-
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
-/* ratelimiting support, modelled after the linux kernel
- * returns 1 if message is within rate limit and shall be
- * processed, 0 otherwise.
- * This implementation is NOT THREAD-SAFE and must not
- * be called concurrently.
- */
-static inline int
-withinRatelimit(struct rs_ratelimit_state *rs, time_t tt, pid_t pid)
-{
- int ret;
- uchar msgbuf[1024];
-
- if(rs->interval == 0) {
- ret = 1;
- goto finalize_it;
- }
-
- assert(rs->burst != 0);
-
- if(rs->begin == 0)
- rs->begin = tt;
-
- /* resume if we go out of out time window */
- if(tt > rs->begin + rs->interval) {
- if(rs->missed) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock lost %u messages from pid %lu due to rate-limiting",
- rs->missed, (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- rs->missed = 0;
- }
- rs->begin = 0;
- rs->done = 0;
- }
-
- /* do actual limit check */
- if(rs->burst > rs->done) {
- rs->done++;
- ret = 1;
- } else {
- if(rs->missed == 0) {
- snprintf((char*)msgbuf, sizeof(msgbuf),
- "imuxsock begins to drop messages from pid %lu due to rate-limiting",
- (unsigned long) pid);
- logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
- }
- rs->missed++;
- ret = 0;
- }
-
-finalize_it:
- return ret;
-}
-
-
/* create input instance, set default paramters, and
* add it to the list of instances.
*/
@@ -352,7 +290,7 @@ createInstance(instanceConf_t **pinst)
inst->sockName = NULL;
inst->pLogHostName = NULL;
inst->ratelimitInterval = DFLT_ratelimitInterval;
- inst->ratelimitBurst = DFLT_ratelimitSeverity;
+ inst->ratelimitBurst = DFLT_ratelimitBurst;
inst->ratelimitSeverity = DFLT_ratelimitSeverity;
inst->bUseFlowCtl = 0;
inst->bIgnoreTimestamp = 1;
@@ -361,6 +299,8 @@ createInstance(instanceConf_t **pinst)
inst->bWritePid = 0;
inst->bAnnotate = 0;
inst->bParseTrusted = 0;
+ inst->bDiscardOwnMsgs = 1;
+ inst->bUnlink = 1;
inst->next = NULL;
/* node created, let's add to config */
@@ -446,7 +386,8 @@ addListner(instanceConf_t *inst)
CHKiRet(prop.ConstructFinalize(listeners[nfd].hostName));
}
if(inst->ratelimitInterval > 0) {
- if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn, NULL)) == NULL) {
+ if((listeners[nfd].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
/* in this case, we simply turn off rate-limiting */
DBGPRINTF("imuxsock: turning off rate limiting because we could not "
"create hash table\n");
@@ -460,11 +401,19 @@ addListner(instanceConf_t *inst)
listeners[nfd].flags = inst->bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[nfd].bCreatePath = inst->bCreatePath;
listeners[nfd].sockName = ustrdup(inst->sockName);
- listeners[nfd].bUseCreds = (inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0;
+ listeners[nfd].bUseCreds = (inst->bDiscardOwnMsgs || inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0;
listeners[nfd].bAnnotate = inst->bAnnotate;
listeners[nfd].bParseTrusted = inst->bParseTrusted;
+ listeners[nfd].bDiscardOwnMsgs = inst->bDiscardOwnMsgs;
+ listeners[nfd].bUnlink = inst->bUnlink;
listeners[nfd].bWritePid = inst->bWritePid;
listeners[nfd].bUseSysTimeStamp = inst->bUseSysTimeStamp;
+ CHKiRet(ratelimitNew(&listeners[nfd].dflt_ratelimiter, "imuxsock", NULL));
+ ratelimitSetLinuxLike(listeners[nfd].dflt_ratelimiter,
+ listeners[nfd].ratelimitInterval,
+ listeners[nfd].ratelimitBurst);
+ ratelimitSetSeverity(listeners[nfd].dflt_ratelimiter,
+ listeners[nfd].ratelimitSev);
nfd++;
} else {
errmsg.LogError(0, NO_ERRCODE, "Out of unix socket name descriptors, ignoring %s\n",
@@ -476,7 +425,7 @@ finalize_it:
}
-/* discard all log sockets except for "socket" 0. Data for it comes from
+/* discard/Destruct all log sockets except for "socket" 0. Data for it comes from
* the constant memory pool - and if not, it is freeed via some other pointer.
*/
static rsRetVal discardLogSockets(void)
@@ -494,6 +443,7 @@ static rsRetVal discardLogSockets(void)
if(listeners[i].ht != NULL) {
hashtable_destroy(listeners[i].ht, 1); /* 1 => free all values automatically */
}
+ ratelimitDestruct(listeners[i].dflt_ratelimiter);
}
return RS_RET_OK;
@@ -508,7 +458,8 @@ createLogSocket(lstn_t *pLstn)
struct sockaddr_un sunx;
DEFiRet;
- unlink((char*)pLstn->sockName);
+ if(pLstn->bUnlink)
+ unlink((char*)pLstn->sockName);
memset(&sunx, 0, sizeof(sunx));
sunx.sun_family = AF_UNIX;
if(pLstn->bCreatePath) {
@@ -605,19 +556,26 @@ finalize_it:
* listener (the latter being a performance enhancement).
*/
static inline rsRetVal
-findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
+findRatelimiter(lstn_t *pLstn, struct ucred *cred, ratelimit_t **prl)
{
- rs_ratelimit_state_t *rl;
+ ratelimit_t *rl;
int r;
pid_t *keybuf;
+ char pidbuf[256];
DEFiRet;
if(cred == NULL)
FINALIZE;
+#if 0 // TODO: check deactivated?
if(pLstn->ratelimitInterval == 0) {
*prl = NULL;
FINALIZE;
}
+#endif
+ if(pLstn->ht == NULL) {
+ *prl = NULL;
+ FINALIZE;
+ }
rl = hashtable_search(pLstn->ht, &cred->pid);
if(rl == NULL) {
@@ -625,10 +583,14 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
DBGPRINTF("imuxsock: no ratelimiter for pid %lu, creating one\n",
(unsigned long) cred->pid);
STATSCOUNTER_INC(ctrNumRatelimiters, mutCtrNumRatelimiters);
- CHKmalloc(rl = malloc(sizeof(rs_ratelimit_state_t)));
+ snprintf(pidbuf, sizeof(pidbuf), "pid %lu",
+ (unsigned long) cred->pid);
+ pidbuf[sizeof(pidbuf)-1] = '\0'; /* to be on safe side */
+ CHKiRet(ratelimitNew(&rl, "imuxsock", pidbuf));
+ ratelimitSetLinuxLike(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
+ ratelimitSetSeverity(rl, pLstn->ratelimitSev);
CHKmalloc(keybuf = malloc(sizeof(pid_t)));
*keybuf = cred->pid;
- initRatelimitState(rl, pLstn->ratelimitInterval, pLstn->ratelimitBurst);
r = hashtable_insert(pLstn->ht, keybuf, rl);
if(r == 0)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
@@ -637,6 +599,8 @@ findRatelimiter(lstn_t *pLstn, struct ucred *cred, rs_ratelimit_state_t **prl)
*prl = rl;
finalize_it:
+ if(*prl == NULL)
+ *prl = pLstn->dflt_ratelimiter;
RETiRet;
}
@@ -763,28 +727,6 @@ copyescaped(uchar *dstbuf, uchar *inbuf, int inlen)
}
-#if 0
-/* Creates new field to be added to event
- * used for SystemLogParseTrusted parsing
- */
-struct ee_field *
-createNewField(char *fieldname, char *value, int lenValue) {
- es_str_t *newStr;
- struct ee_value *newVal;
- struct ee_field *newField;
-
- newStr = es_newStrFromBuf(value, (es_size_t) lenValue);
-
- newVal = ee_newValue(ctxee);
- ee_setStrValue(newVal, newStr);
-
- newField = ee_newFieldFromNV(ctxee, fieldname, newVal);
-
- return newField;
-}
-#endif
-
-
/* submit received message to the queue engine
* We now parse the message according to expected format so that we
* can also mangle it if necessary.
@@ -803,8 +745,8 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
uchar bufParseTAG[CONF_TAG_MAXSIZE];
struct syslogTime st;
time_t tt;
- rs_ratelimit_state_t *ratelimiter = NULL;
int lenProp;
+ ratelimit_t *ratelimiter = NULL;
uchar propBuf[1024];
uchar msgbuf[8192];
uchar *pmsgbuf;
@@ -813,6 +755,11 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
struct json_object *json = NULL, *jval;
DEFiRet;
+ if(pLstn->bDiscardOwnMsgs && cred != NULL && cred->pid == glblGetOurPid()) {
+ DBGPRINTF("imuxsock: discarding message from our own pid\n");
+ FINALIZE;
+ }
+
/* TODO: handle format errors?? */
/* we need to parse the pri first, because we need the severity for
* rate-limiting as well.
@@ -831,10 +778,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
facil = LOG_FAC(pri);
sever = LOG_PRI(pri);
- if(sever >= pLstn->ratelimitSev) {
- /* note: if cred == NULL, then ratelimiter == NULL as well! */
- findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
- }
+ findRatelimiter(pLstn, cred, &ratelimiter); /* ignore error, better so than others... */
if(ts == NULL) {
datetime.getCurrTime(&st, &tt);
@@ -843,10 +787,12 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
tt = ts->tv_sec;
}
+#if 0 // TODO: think about stats counters (or wait for request...?)
if(ratelimiter != NULL && !withinRatelimit(ratelimiter, tt, cred->pid)) {
STATSCOUNTER_INC(ctrLostRatelimit, mutCtrLostRatelimit);
FINALIZE;
}
+#endif
/* created trusted properties */
if(cred != NULL && pLstn->bAnnotate) {
@@ -981,8 +927,7 @@ SubmitMsg(uchar *pRcv, int lenRcv, lstn_t *pLstn, struct ucred *cred, struct tim
MsgSetRcvFrom(pMsg, pLstn->hostName == NULL ? glbl.GetLocalHostNameProp() : pLstn->hostName);
CHKiRet(MsgSetRcvFromIP(pMsg, pLocalHostIP));
- CHKiRet(submitMsg(pMsg));
-
+ ratelimitAddMsg(ratelimiter, NULL, pMsg);
STATSCOUNTER_INC(ctrSubmit, mutCtrSubmit);
finalize_it:
RETiRet;
@@ -1117,13 +1062,20 @@ activateListeners()
listeners[0].ratelimitInterval = runModConf->ratelimitIntervalSysSock;
listeners[0].ratelimitBurst = runModConf->ratelimitBurstSysSock;
listeners[0].ratelimitSev = runModConf->ratelimitSeveritySysSock;
- listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock || runModConf->bAnnotateSysSock) ? 1 : 0;
+ listeners[0].bUseCreds = (runModConf->bWritePidSysSock || runModConf->ratelimitIntervalSysSock || runModConf->bAnnotateSysSock || runModConf->bDiscardOwnMsgs) ? 1 : 0;
listeners[0].bWritePid = runModConf->bWritePidSysSock;
listeners[0].bAnnotate = runModConf->bAnnotateSysSock;
listeners[0].bParseTrusted = runModConf->bParseTrusted;
+ listeners[0].bDiscardOwnMsgs = runModConf->bDiscardOwnMsgs;
+ listeners[0].bUnlink = runModConf->bUnlink;
listeners[0].bUseSysTimeStamp = runModConf->bUseSysTimeStamp;
listeners[0].flags = runModConf->bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[0].flowCtl = runModConf->bUseFlowCtl ? eFLOWCTL_LIGHT_DELAY : eFLOWCTL_NO_DELAY;
+ CHKiRet(ratelimitNew(&listeners[0].dflt_ratelimiter, "imuxsock", NULL));
+ ratelimitSetLinuxLike(listeners[0].dflt_ratelimiter,
+ listeners[0].ratelimitInterval,
+ listeners[0].ratelimitBurst);
+ ratelimitSetSeverity(listeners[0].dflt_ratelimiter,listeners[0].ratelimitSev);
sd_fds = sd_listen_fds(0);
if(sd_fds < 0) {
@@ -1165,6 +1117,8 @@ CODESTARTbeginCnfLoad
pModConf->bWritePidSysSock = 0;
pModConf->bAnnotateSysSock = 0;
pModConf->bParseTrusted = 0;
+ pModConf->bDiscardOwnMsgs = 1;
+ pModConf->bUnlink = 1;
pModConf->ratelimitIntervalSysSock = DFLT_ratelimitInterval;
pModConf->ratelimitBurstSysSock = DFLT_ratelimitBurst;
pModConf->ratelimitSeveritySysSock = DFLT_ratelimitSeverity;
@@ -1199,6 +1153,10 @@ CODESTARTsetModCnf
loadModConf->pLogSockName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(modpblk.descr[i].name, "syssock.ignoretimestamp")) {
loadModConf->bIgnoreTimestamp = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "syssock.ignoreownmessages")) {
+ loadModConf->bDiscardOwnMsgs = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "syssock.unlink")) {
+ loadModConf->bUnlink = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "syssock.flowcontrol")) {
loadModConf->bUseFlowCtl = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "syssock.usesystimestamp")) {
@@ -1261,6 +1219,10 @@ CODESTARTnewInpInst
inst->bCreatePath = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "parsetrusted")) {
inst->bParseTrusted = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "ignoreownmessages")) {
+ inst->bDiscardOwnMsgs = (int) pvals[i].val.d.n;
+ } else if(!strcmp(modpblk.descr[i].name, "unlink")) {
+ inst->bUnlink = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "hostname")) {
inst->pLogHostName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(modpblk.descr[i].name, "ignoretimestamp")) {
@@ -1440,8 +1402,10 @@ CODESTARTafterRun
listeners[i].fd < SD_LISTEN_FDS_START + sd_fds)
continue;
- DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
- unlink((char*) listeners[i].sockName);
+ if(listeners[i].bUnlink) {
+ DBGPRINTF("imuxsock: unlinking unix socket file[%d] %s\n", i, listeners[i].sockName);
+ unlink((char*) listeners[i].sockName);
+ }
}
discardLogSockets();
@@ -1553,8 +1517,17 @@ CODEmodInit_QueryRegCFSLineHdlr
listeners[0].bUseCreds = 0;
listeners[0].bAnnotate = 0;
listeners[0].bParseTrusted = 0;
+ listeners[0].bDiscardOwnMsgs = 1;
+ listeners[0].bUnlink = 1;
listeners[0].bCreatePath = 0;
listeners[0].bUseSysTimeStamp = 1;
+ if((listeners[0].ht = create_hashtable(100, hash_from_key_fn, key_equals_fn,
+ (void(*)(void*))ratelimitDestruct)) == NULL) {
+ /* in this case, we simply turn off rate-limiting */
+ DBGPRINTF("imuxsock: turning off rate limiting for system socket "
+ "because we could not create hash table\n");
+ listeners[0].ratelimitInterval = 0;
+ }
/* initialize socket names */
for(i = 1 ; i < MAXFUNIX ; ++i) {
diff --git a/plugins/mmanon/Makefile.am b/plugins/mmanon/Makefile.am
new file mode 100644
index 00000000..98f0da24
--- /dev/null
+++ b/plugins/mmanon/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = mmanon.la
+
+mmanon_la_SOURCES = mmanon.c
+mmanon_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+mmanon_la_LDFLAGS = -module -avoid-version
+mmanon_la_LIBADD =
+
+EXTRA_DIST =
diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c
new file mode 100644
index 00000000..a1c99d09
--- /dev/null
+++ b/plugins/mmanon/mmanon.c
@@ -0,0 +1,401 @@
+/* mmanon.c
+ * anonnymize IP addresses inside the syslog message part
+ *
+ * Copyright 2013 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include <stdint.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("mmanon")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+/* precomputed table of IPv4 anonymization masks */
+static const uint32_t ipv4masks[33] = {
+ 0xffffffff, 0xfffffffe, 0xfffffffc, 0xfffffff8,
+ 0xfffffff0, 0xffffffe0, 0xffffffc0, 0xffffff80,
+ 0xffffff00, 0xfffffe00, 0xfffffc00, 0xfffff800,
+ 0xfffff000, 0xffffe000, 0xffffc000, 0xffff8000,
+ 0xffff0000, 0xfffe0000, 0xfffc0000, 0xfff80000,
+ 0xfff00000, 0xffe00000, 0xffc00000, 0xff800000,
+ 0xff000000, 0xfe000000, 0xfc000000, 0xf8000000,
+ 0xf0000000, 0xe0000000, 0xc0000000, 0x80000000,
+ 0x00000000
+ };
+
+/* define operation modes we have */
+#define SIMPLE_MODE 0 /* just overwrite */
+#define REWRITE_MODE 1 /* rewrite IP address, canoninized */
+typedef struct _instanceData {
+ char replChar;
+ int8_t mode;
+ struct {
+ int8_t bits;
+ } ipv4;
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "mode", eCmdHdlrGetWord, 0 },
+ { "replacementchar", eCmdHdlrGetChar, 0 },
+ { "ipv4.bits", eCmdHdlrInt, 0 },
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->mode = REWRITE_MODE;
+ pData->replChar = 'x';
+ pData->ipv4.bits = 16;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+ sbool bHadBitsErr;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmanon)\n");
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "mode")) {
+ if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"simple",
+ sizeof("simple")-1)) {
+ pData->mode = SIMPLE_MODE;
+ } else if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"rewrite",
+ sizeof("rewrite")-1)) {
+ pData->mode = REWRITE_MODE;
+ } else {
+ char *cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ errmsg.LogError(0, RS_RET_INVLD_MODE,
+ "mmanon: invalid anonymization mode '%s' - ignored",
+ cstr);
+ free(cstr);
+ }
+ pData->replChar = es_getBufAddr(pvals[i].val.d.estr)[0];
+ } else if(!strcmp(actpblk.descr[i].name, "replacementchar")) {
+ pData->replChar = es_getBufAddr(pvals[i].val.d.estr)[0];
+ } else if(!strcmp(actpblk.descr[i].name, "ipv4.bits")) {
+ pData->ipv4.bits = (int8_t) pvals[i].val.d.n;
+ } else {
+ dbgprintf("mmanon: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ if(pData->mode == SIMPLE_MODE) {
+ bHadBitsErr = 0;
+ if(pData->ipv4.bits < 8) {
+ pData->ipv4.bits = 8;
+ bHadBitsErr = 1;
+ } else if(pData->ipv4.bits < 16) {
+ pData->ipv4.bits = 16;
+ bHadBitsErr = 1;
+ } else if(pData->ipv4.bits < 24) {
+ pData->ipv4.bits = 24;
+ bHadBitsErr = 1;
+ } else if(pData->ipv4.bits != 32) {
+ pData->ipv4.bits = 32;
+ bHadBitsErr = 1;
+ }
+ if(bHadBitsErr)
+ errmsg.LogError(0, RS_RET_INVLD_ANON_BITS,
+ "mmanon: invalid number of ipv4 bits "
+ "in simple mode, corrected to %d",
+ pData->ipv4.bits);
+ } else { /* REWRITE_MODE */
+ if(pData->ipv4.bits < 1 || pData->ipv4.bits > 32) {
+ pData->ipv4.bits = 32;
+ errmsg.LogError(0, RS_RET_INVLD_ANON_BITS,
+ "mmanon: invalid number of ipv4 bits "
+ "in rewrite mode, corrected to %d",
+ pData->ipv4.bits);
+ }
+ if(pData->replChar != 'x') {
+ errmsg.LogError(0, RS_RET_REPLCHAR_IGNORED,
+ "mmanon: replacementChar parameter is ignored "
+ "in rewrite mode");
+ }
+ }
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+
+static int
+getnum(uchar *msg, int lenMsg, int *idx)
+{
+ int num = 0;
+ int i = *idx;
+
+ while(i < lenMsg && msg[i] >= '0' && msg[i] <= '9') {
+ num = num * 10 + msg[i] - '0';
+ ++i;
+ }
+
+ *idx = i;
+ return num;
+}
+
+
+/* write an IP address octet to the output position */
+static int
+writeOctet(uchar *msg, int idx, int *nxtidx, uint8_t octet)
+{
+ if(octet > 99) {
+ msg[idx++] = '0' + octet / 100;
+ octet = octet % 100;
+ }
+ if(octet > 9) {
+ msg[idx++] = '0' + octet / 10;
+ octet = octet % 10;
+ }
+ msg[idx++] = '0' + octet;
+
+ if(nxtidx != NULL) {
+ if(idx + 1 != *nxtidx) {
+ /* we got shorter, fix it! */
+ msg[idx] = '.';
+ *nxtidx = idx + 1;
+ }
+ }
+ return idx;
+}
+
+/* currently works for IPv4 only! */
+void
+anonip(instanceData *pData, uchar *msg, int *pLenMsg, int *idx)
+{
+ int i = *idx;
+ int octet;
+ uint32_t ipv4addr;
+ int ipstart[4];
+ int j;
+ int endpos;
+ int lenMsg = *pLenMsg;
+
+ while(i < lenMsg && (msg[i] <= '0' || msg[i] >= '9')) {
+ ++i; /* skip to first number */
+ }
+ if(i >= lenMsg)
+ goto done;
+
+ /* got digit, let's see if ip */
+ ipstart[0] = i;
+ octet = getnum(msg, lenMsg, &i);
+ if(octet > 255 || msg[i] != '.') goto done;
+ ipv4addr = octet << 24;
+ ++i;
+ ipstart[1] = i;
+ octet = getnum(msg, lenMsg, &i);
+ if(octet > 255 || msg[i] != '.') goto done;
+ ipv4addr |= octet << 16;
+ ++i;
+ ipstart[2] = i;
+ octet = getnum(msg, lenMsg, &i);
+ if(octet > 255 || msg[i] != '.') goto done;
+ ipv4addr |= octet << 8;
+ ++i;
+ ipstart[3] = i;
+ octet = getnum(msg, lenMsg, &i);
+ if(octet > 255 || !(msg[i] == ' ' || msg[i] == ':')) goto done;
+ ipv4addr |= octet;
+
+ /* OK, we now found an ip address */
+ if(pData->mode == SIMPLE_MODE) {
+ if(pData->ipv4.bits == 8)
+ j = ipstart[3];
+ else if(pData->ipv4.bits == 16)
+ j = ipstart[2];
+ else if(pData->ipv4.bits == 24)
+ j = ipstart[1];
+ else /* due to our checks, this *must* be 32 */
+ j = ipstart[0];
+ while(j < i) {
+ if(msg[j] != '.')
+ msg[j] = pData->replChar;
+ ++j;
+ }
+ } else { /* REWRITE_MODE */
+ ipv4addr &= ipv4masks[pData->ipv4.bits];
+ if(pData->ipv4.bits > 24)
+ writeOctet(msg, ipstart[0], &(ipstart[1]), ipv4addr >> 24);
+ if(pData->ipv4.bits > 16)
+ writeOctet(msg, ipstart[1], &(ipstart[2]), (ipv4addr >> 16) & 0xff);
+ if(pData->ipv4.bits > 8)
+ writeOctet(msg, ipstart[2], &(ipstart[3]), (ipv4addr >> 8) & 0xff);
+ endpos = writeOctet(msg, ipstart[3], NULL, ipv4addr & 0xff);
+ /* if we had truncation, we need to shrink the msg */
+ dbgprintf("existing i %d, endpos %d\n", i, endpos);
+ if(i - endpos > 0) {
+ *pLenMsg = lenMsg - (i - endpos);
+ memmove(msg+endpos, msg+i, lenMsg - i + 1);
+ }
+ }
+
+done: *idx = i;
+ return;
+}
+
+
+BEGINdoAction
+ msg_t *pMsg;
+ uchar *msg;
+ int lenMsg;
+ int i;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ lenMsg = getMSGLen(pMsg);
+ msg = getMSG(pMsg);
+ for(i = 0 ; i < lenMsg ; ++i) {
+ anonip(pData, msg, &lenMsg, &i);
+ }
+ if(lenMsg != getMSGLen(pMsg))
+ setMSGLen(pMsg, lenMsg);
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":mmanon:", sizeof(":mmanon:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "mmanon supports only v6+ config format, use: "
+ "action(type=\"mmanon\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("mmanon: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c
index a13962ab..c47aceb6 100644
--- a/plugins/mmjsonparse/mmjsonparse.c
+++ b/plugins/mmjsonparse/mmjsonparse.c
@@ -62,15 +62,46 @@ typedef struct _instanceData {
struct json_tokener *tokener;
} instanceData;
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
-BEGINinitConfVars /* (re)set config variables to default values */
-CODESTARTinitConfVars
- resetConfigVariables(NULL, NULL);
-ENDinitConfVars
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->tokener = json_tokener_new();
+ if(pData->tokener == NULL) {
+ errmsg.LogError(0, RS_RET_ERR, "error: could not create json "
+ "tokener, cannot activate action");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+finalize_it:
ENDcreateInstance
@@ -88,7 +119,7 @@ ENDfreeInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
- dbgprintf("mmjsonparse\n");
+ DBGPRINTF("mmjsonparse\n");
ENDdbgPrintInstInfo
@@ -104,7 +135,8 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
const char *errMsg;
DEFiRet;
- dbgprintf("mmjsonparse: toParse: '%s'\n", buf);
+ assert(pData->tokener != NULL);
+ DBGPRINTF("mmjsonparse: toParse: '%s'\n", buf);
json_tokener_reset(pData->tokener);
json = json_tokener_parse_ex(pData->tokener, buf, lenBuf);
@@ -123,7 +155,7 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
else if(!json_object_is_type(json, json_type_object))
errMsg = "JSON value is not an object";
if(errMsg != NULL) {
- dbgprintf("mmjsonparse: Error parsing JSON '%s': %s\n",
+ DBGPRINTF("mmjsonparse: Error parsing JSON '%s': %s\n",
buf, errMsg);
}
}
@@ -177,6 +209,22 @@ finalize_it:
MsgSetParseSuccess(pMsg, bSuccess);
ENDdoAction
+BEGINnewActInst
+CODESTARTnewActInst
+ /* Note: we currently do not have any parameters, so we do not need
+ * the lst ptr. However, we will most probably need params in the
+ * future.
+ */
+ DBGPRINTF("newActInst (mmjsonparse)\n");
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ /*setInstParamDefaults(pData);*/
+
+CODE_STD_FINALIZERnewActInst
+/* cnfparamvalsDestruct(pvals, &actpblk);*/
+ENDnewActInst
BEGINparseSelectorAct
CODESTARTparseSelectorAct
@@ -197,14 +245,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
* the format specified (if any) is always ignored.
*/
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat"));
-
- /* finally build the instance */
- pData->tokener = json_tokener_new();
- if(pData->tokener == NULL) {
- errmsg.LogError(0, RS_RET_ERR, "error: could not create json "
- "tokener, cannot activate action");
- ABORT_FINALIZE(RS_RET_ERR);
- }
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -218,7 +258,8 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
-CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
@@ -238,7 +279,6 @@ BEGINmodInit()
unsigned long opts;
int bMsgPassingSupported;
CODESTARTmodInit
-INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION;
/* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c
index c366d5fe..fcadc328 100644
--- a/plugins/mmnormalize/mmnormalize.c
+++ b/plugins/mmnormalize/mmnormalize.c
@@ -65,6 +65,7 @@ DEF_OMOD_STATIC_DATA
typedef struct _instanceData {
sbool bUseRawMsg; /**< use %rawmsg% instead of %msg% */
+ uchar *rulebase; /**< name of rulebase to use */
ln_ctx ctxln; /**< context to be used for liblognorm */
ee_ctx ctxee; /**< context to be used for libee */
} instanceData;
@@ -75,6 +76,58 @@ typedef struct configSettings_s {
} configSettings_t;
static configSettings_t cs;
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "rulebase", eCmdHdlrGetWord, 1 },
+ { "userawmsg", eCmdHdlrBinary, 0 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+
+/* to be called to build the libee part of the instance ONCE ALL PARAMETERS ARE CORRECT
+ * (and set within pData!).
+ */
+static rsRetVal
+buildInstance(instanceData *pData)
+{
+ DEFiRet;
+ if((pData->ctxee = ee_initCtx()) == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_LIBEE_INIT, "error: could not initialize libee "
+ "ctx, cannot activate action");
+ ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT);
+ }
+
+ if((pData->ctxln = ln_initCtx()) == NULL) {
+ errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM_INIT, "error: could not initialize "
+ "liblognorm ctx, cannot activate action");
+ ee_exitCtx(pData->ctxee);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_INIT);
+ }
+ ln_setEECtx(pData->ctxln, pData->ctxee);
+ if(ln_loadSamples(pData->ctxln, (char*) pData->rulebase) != 0) {
+ errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
+ "could not be loaded cannot activate action", cs.rulebase);
+ ee_exitCtx(pData->ctxee);
+ ln_exitCtx(pData->ctxln);
+ ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
+ }
+finalize_it:
+ RETiRet;
+}
+
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
resetConfigVariables(NULL, NULL);
@@ -86,6 +139,35 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading */
+ /* free legacy config vars */
+ free(cs.rulebase);
+ cs.rulebase = NULL;
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
@@ -93,6 +175,7 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
+ free(pData->rulebase);
ee_exitCtx(pData->ctxee);
ln_exitCtx(pData->ctxln);
ENDfreeInstance
@@ -141,7 +224,7 @@ CODESTARTdoAction
es_deleteStr(str);
/* reformat to our json data struct */
- // TODO: this is all extremly ineffcient!
+ /* TODO: this is all extremly ineffcient! */
ee_fmtEventToJSON(event, &str);
cstrJSON = es_str2cstr(str, NULL);
dbgprintf("mmnormalize generated: %s\n", cstrJSON);
@@ -156,6 +239,59 @@ CODESTARTdoAction
ENDdoAction
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->rulebase = NULL;
+ pData->bUseRawMsg = 0;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+ int bDestructPValsOnExit;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (mmnormalize)\n");
+
+ bDestructPValsOnExit = 0;
+ pvals = nvlstGetParams(lst, &actpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmnormalize: error reading "
+ "config parameters");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+ bDestructPValsOnExit = 1;
+
+ if(Debug) {
+ dbgprintf("action param blk in mmnormalize:\n");
+ cnfparamsPrint(&actpblk, pvals);
+ }
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "rulebase")) {
+ pData->rulebase = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "userawmsg")) {
+ pData->bUseRawMsg = (int) pvals[i].val.d.n;
+ } else {
+ DBGPRINTF("mmnormalize: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+
+ iRet = buildInstance(pData);
+CODE_STD_FINALIZERnewActInst
+ if(bDestructPValsOnExit)
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
@@ -165,15 +301,21 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
}
if(cs.rulebase == NULL) {
- errmsg.LogError(0, RS_RET_NO_RULESET, "error: no normalization rulebase was specified, use "
+ errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: no normalization rulebase was specified, use "
"$MMNormalizeSampleDB directive first!");
- ABORT_FINALIZE(RS_RET_NO_RULESET);
+ ABORT_FINALIZE(RS_RET_NO_RULEBASE);
}
/* ok, if we reach this point, we have something for us */
p += sizeof(":mmnormalize:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
CHKiRet(createInstance(&pData));
+ pData->rulebase = cs.rulebase;
+ pData->bUseRawMsg = cs.bUseRawMsg;
+ /* all config vars auto-reset! */
+ cs.bUseRawMsg = 0;
+ cs.rulebase = NULL; /* we used it up! */
+
/* check if a non-standard template is to be applied */
if(*(p-1) == ';')
--p;
@@ -181,34 +323,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
* the format specified (if any) is always ignored.
*/
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat"));
-
- /* finally build the instance */
- if((pData->ctxee = ee_initCtx()) == NULL) {
- errmsg.LogError(0, RS_RET_NO_RULESET, "error: could not initialize libee ctx, cannot "
- "activate action");
- ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT);
- }
-
- if((pData->ctxln = ln_initCtx()) == NULL) {
- errmsg.LogError(0, RS_RET_NO_RULESET, "error: could not initialize liblognorm ctx, cannot "
- "activate action");
- ee_exitCtx(pData->ctxee);
- ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_INIT);
- }
- ln_setEECtx(pData->ctxln, pData->ctxee);
- if(ln_loadSamples(pData->ctxln, (char*) cs.rulebase) != 0) {
- errmsg.LogError(0, RS_RET_NO_RULESET, "error: normalization rulebase '%s' could not be loaded "
- "cannot activate action", cs.rulebase);
- ee_exitCtx(pData->ctxee);
- ln_exitCtx(pData->ctxln);
- ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
- }
- pData->bUseRawMsg = cs.bUseRawMsg;
-
- /* all config vars auto-reset! */
- cs.bUseRawMsg = 0;
- free(cs.rulebase);
- cs.rulebase = NULL;
+ CHKiRet(buildInstance(pData));
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -222,7 +337,8 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
-CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omelasticsearch/Makefile.am b/plugins/omelasticsearch/Makefile.am
index 2fadb74d..ba85a896 100644
--- a/plugins/omelasticsearch/Makefile.am
+++ b/plugins/omelasticsearch/Makefile.am
@@ -1,6 +1,7 @@
pkglib_LTLIBRARIES = omelasticsearch.la
-omelasticsearch_la_SOURCES = omelasticsearch.c
+# TODO: replace cJSON
+omelasticsearch_la_SOURCES = omelasticsearch.c cJSON/cjson.c cJSON/cjson.h
omelasticsearch_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
omelasticsearch_la_LDFLAGS = -module -avoid-version
omelasticsearch_la_LIBADD = $(CURL_LIBS) $(LIBM)
diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README
new file mode 100644
index 00000000..9021bc0e
--- /dev/null
+++ b/plugins/omelasticsearch/README
@@ -0,0 +1,17 @@
+How to produce an error:
+========================
+It's quite easy to get 400, if you put a wrong mapping to your
+index. That would be easy to reproduce in "normal" omelasticsearch usage
+conditions, by only altering the ES configuration:
+
+1. Make your index first. Let's call it "testindex":
+$ curl -XPUT localhost:9200/testindex/
+
+2. Put your mapping for a search type called "mytype", where you specify
+that date property should be an integer:
+$ curl -XPUT localhost:9200/testindex/mytype/_mapping -d '{"mytype":{"properties": {"timegenerated":{"type":"integer"}}}}'
+
+3. Now try to insert something where date is not an integer:
+$ curl -XPOST localhost:9200/testindex/mytype/ -d '{"timegenerated":"bla"}'
+{"error":"MapperParsingException[Failed to parse [date]]; nested: NumberFormatException[For input string: \"bla\"]; ","status":400}
+
diff --git a/plugins/omelasticsearch/cJSON/README b/plugins/omelasticsearch/cJSON/README
new file mode 100644
index 00000000..7531c049
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/README
@@ -0,0 +1,247 @@
+/*
+ Copyright (c) 2009 Dave Gamble
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+Welcome to cJSON.
+
+cJSON aims to be the dumbest possible parser that you can get your job done with.
+It's a single file of C, and a single header file.
+
+JSON is described best here: http://www.json.org/
+It's like XML, but fat-free. You use it to move data around, store things, or just
+generally represent your program's state.
+
+
+First up, how do I build?
+Add cJSON.c to your project, and put cJSON.h somewhere in the header search path.
+For example, to build the test app:
+
+gcc cJSON.c test.c -o test -lm
+./test
+
+
+As a library, cJSON exists to take away as much legwork as it can, but not get in your way.
+As a point of pragmatism (i.e. ignoring the truth), I'm going to say that you can use it
+in one of two modes: Auto and Manual. Let's have a quick run-through.
+
+
+I lifted some JSON from this page: http://www.json.org/fatfree.html
+That page inspired me to write cJSON, which is a parser that tries to share the same
+philosophy as JSON itself. Simple, dumb, out of the way.
+
+Some JSON:
+{
+ "name": "Jack (\"Bee\") Nimble",
+ "format": {
+ "type": "rect",
+ "width": 1920,
+ "height": 1080,
+ "interlace": false,
+ "frame rate": 24
+ }
+}
+
+Assume that you got this from a file, a webserver, or magic JSON elves, whatever,
+you have a char * to it. Everything is a cJSON struct.
+Get it parsed:
+ cJSON *root = cJSON_Parse(my_json_string);
+
+This is an object. We're in C. We don't have objects. But we do have structs.
+What's the framerate?
+
+ cJSON *format = cJSON_GetObjectItem(root,"format");
+ int framerate = cJSON_GetObjectItem(format,"frame rate")->valueint;
+
+
+Want to change the framerate?
+ cJSON_GetObjectItem(format,"frame rate")->valueint=25;
+
+Back to disk?
+ char *rendered=cJSON_Print(root);
+
+Finished? Delete the root (this takes care of everything else).
+ cJSON_Delete(root);
+
+That's AUTO mode. If you're going to use Auto mode, you really ought to check pointers
+before you dereference them. If you want to see how you'd build this struct in code?
+ cJSON *root,*fmt;
+ root=cJSON_CreateObject();
+ cJSON_AddItemToObject(root, "name", cJSON_CreateString("Jack (\"Bee\") Nimble"));
+ cJSON_AddItemToObject(root, "format", fmt=cJSON_CreateObject());
+ cJSON_AddStringToObject(fmt,"type", "rect");
+ cJSON_AddNumberToObject(fmt,"width", 1920);
+ cJSON_AddNumberToObject(fmt,"height", 1080);
+ cJSON_AddFalseToObject (fmt,"interlace");
+ cJSON_AddNumberToObject(fmt,"frame rate", 24);
+
+Hopefully we can agree that's not a lot of code? There's no overhead, no unnecessary setup.
+Look at test.c for a bunch of nice examples, mostly all ripped off the json.org site, and
+a few from elsewhere.
+
+What about manual mode? First up you need some detail.
+Let's cover how the cJSON objects represent the JSON data.
+cJSON doesn't distinguish arrays from objects in handling; just type.
+Each cJSON has, potentially, a child, siblings, value, a name.
+
+The root object has: Object Type and a Child
+The Child has name "name", with value "Jack ("Bee") Nimble", and a sibling:
+Sibling has type Object, name "format", and a child.
+That child has type String, name "type", value "rect", and a sibling:
+Sibling has type Number, name "width", value 1920, and a sibling:
+Sibling has type Number, name "height", value 1080, and a sibling:
+Sibling hs type False, name "interlace", and a sibling:
+Sibling has type Number, name "frame rate", value 24
+
+Here's the structure:
+typedef struct cJSON {
+ struct cJSON *next,*prev;
+ struct cJSON *child;
+
+ int type;
+
+ char *valuestring;
+ int valueint;
+ double valuedouble;
+
+ char *string;
+} cJSON;
+
+By default all values are 0 unless set by virtue of being meaningful.
+
+next/prev is a doubly linked list of siblings. next takes you to your sibling,
+prev takes you back from your sibling to you.
+Only objects and arrays have a "child", and it's the head of the doubly linked list.
+A "child" entry will have prev==0, but next potentially points on. The last sibling has next=0.
+The type expresses Null/True/False/Number/String/Array/Object, all of which are #defined in
+cJSON.h
+
+A Number has valueint and valuedouble. If you're expecting an int, read valueint, if not read
+valuedouble.
+
+Any entry which is in the linked list which is the child of an object will have a "string"
+which is the "name" of the entry. When I said "name" in the above example, that's "string".
+"string" is the JSON name for the 'variable name' if you will.
+
+Now you can trivially walk the lists, recursively, and parse as you please.
+You can invoke cJSON_Parse to get cJSON to parse for you, and then you can take
+the root object, and traverse the structure (which is, formally, an N-tree),
+and tokenise as you please. If you wanted to build a callback style parser, this is how
+you'd do it (just an example, since these things are very specific):
+
+void parse_and_callback(cJSON *item,const char *prefix)
+{
+ while (item)
+ {
+ char *newprefix=malloc(strlen(prefix)+strlen(item->name)+2);
+ sprintf(newprefix,"%s/%s",prefix,item->name);
+ int dorecurse=callback(newprefix, item->type, item);
+ if (item->child && dorecurse) parse_and_callback(item->child,newprefix);
+ item=item->next;
+ free(newprefix);
+ }
+}
+
+The prefix process will build you a separated list, to simplify your callback handling.
+The 'dorecurse' flag would let the callback decide to handle sub-arrays on it's own, or
+let you invoke it per-item. For the item above, your callback might look like this:
+
+int callback(const char *name,int type,cJSON *item)
+{
+ if (!strcmp(name,"name")) { /* populate name */ }
+ else if (!strcmp(name,"format/type") { /* handle "rect" */ }
+ else if (!strcmp(name,"format/width") { /* 800 */ }
+ else if (!strcmp(name,"format/height") { /* 600 */ }
+ else if (!strcmp(name,"format/interlace") { /* false */ }
+ else if (!strcmp(name,"format/frame rate") { /* 24 */ }
+ return 1;
+}
+
+Alternatively, you might like to parse iteratively.
+You'd use:
+
+void parse_object(cJSON *item)
+{
+ int i; for (i=0;i<cJSON_GetArraySize(item);i++)
+ {
+ cJSON *subitem=cJSON_GetArrayItem(item,i);
+ // handle subitem.
+ }
+}
+
+Or, for PROPER manual mode:
+
+void parse_object(cJSON *item)
+{
+ cJSON *subitem=item->child;
+ while (subitem)
+ {
+ // handle subitem
+ if (subitem->child) parse_object(subitem->child);
+
+ subitem=subitem->next;
+ }
+}
+
+Of course, this should look familiar, since this is just a stripped-down version
+of the callback-parser.
+
+This should cover most uses you'll find for parsing. The rest should be possible
+to infer.. and if in doubt, read the source! There's not a lot of it! ;)
+
+
+In terms of constructing JSON data, the example code above is the right way to do it.
+You can, of course, hand your sub-objects to other functions to populate.
+Also, if you find a use for it, you can manually build the objects.
+For instance, suppose you wanted to build an array of objects?
+
+cJSON *objects[24];
+
+cJSON *Create_array_of_anything(cJSON **items,int num)
+{
+ int i;cJSON *prev, *root=cJSON_CreateArray();
+ for (i=0;i<24;i++)
+ {
+ if (!i) root->child=objects[i];
+ else prev->next=objects[i], objects[i]->prev=prev;
+ prev=objects[i];
+ }
+ return root;
+}
+
+and simply: Create_array_of_anything(objects,24);
+
+cJSON doesn't make any assumptions about what order you create things in.
+You can attach the objects, as above, and later add children to each
+of those objects.
+
+As soon as you call cJSON_Print, it renders the structure to text.
+
+
+
+The test.c code shows how to handle a bunch of typical cases. If you uncomment
+the code, it'll load, parse and print a bunch of test files, also from json.org,
+which are more complex than I'd care to try and stash into a const char array[].
+
+
+Enjoy cJSON!
+
+
+- Dave Gamble, Aug 2009
diff --git a/plugins/omelasticsearch/cJSON/cjson.c b/plugins/omelasticsearch/cJSON/cjson.c
new file mode 100644
index 00000000..99a831e9
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/cjson.c
@@ -0,0 +1,514 @@
+/*
+ Copyright (c) 2009 Dave Gamble
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+/* cJSON */
+/* JSON parser in C. */
+
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+#include <stdlib.h>
+#include <float.h>
+#include <limits.h>
+#include <ctype.h>
+#include "cjson.h"
+
+static const char *ep;
+
+const char *cJSON_GetErrorPtr() {return ep;}
+
+static int cJSON_strcasecmp(const char *s1,const char *s2)
+{
+ if (!s1) return (s1==s2)?0:1;if (!s2) return 1;
+ for(; tolower(*s1) == tolower(*s2); ++s1, ++s2) if(*s1 == 0) return 0;
+ return tolower(*(const unsigned char *)s1) - tolower(*(const unsigned char *)s2);
+}
+
+static void *(*cJSON_malloc)(size_t sz) = malloc;
+static void (*cJSON_free)(void *ptr) = free;
+
+static char* cJSON_strdup(const char* str)
+{
+ size_t len;
+ char* copy;
+
+ len = strlen(str) + 1;
+ if (!(copy = (char*)cJSON_malloc(len))) return 0;
+ memcpy(copy,str,len);
+ return copy;
+}
+
+void cJSON_InitHooks(cJSON_Hooks* hooks)
+{
+ if (!hooks) { /* Reset hooks */
+ cJSON_malloc = malloc;
+ cJSON_free = free;
+ return;
+ }
+
+ cJSON_malloc = (hooks->malloc_fn)?hooks->malloc_fn:malloc;
+ cJSON_free = (hooks->free_fn)?hooks->free_fn:free;
+}
+
+/* Internal constructor. */
+static cJSON *cJSON_New_Item()
+{
+ cJSON* node = (cJSON*)cJSON_malloc(sizeof(cJSON));
+ if (node) memset(node,0,sizeof(cJSON));
+ return node;
+}
+
+/* Delete a cJSON structure. */
+void cJSON_Delete(cJSON *c)
+{
+ cJSON *next;
+ while (c)
+ {
+ next=c->next;
+ if (!(c->type&cJSON_IsReference) && c->child) cJSON_Delete(c->child);
+ if (!(c->type&cJSON_IsReference) && c->valuestring) cJSON_free(c->valuestring);
+ if (c->string) cJSON_free(c->string);
+ cJSON_free(c);
+ c=next;
+ }
+}
+
+/* Parse the input text to generate a number, and populate the result into item. */
+static const char *parse_number(cJSON *item,const char *num)
+{
+ double n=0,sign=1,scale=0;int subscale=0,signsubscale=1;
+
+ /* Could use sscanf for this? */
+ if (*num=='-') sign=-1,num++; /* Has sign? */
+ if (*num=='0') num++; /* is zero */
+ if (*num>='1' && *num<='9') do n=(n*10.0)+(*num++ -'0'); while (*num>='0' && *num<='9'); /* Number? */
+ if (*num=='.' && num[1]>='0' && num[1]<='9') {num++; do n=(n*10.0)+(*num++ -'0'),scale--; while (*num>='0' && *num<='9');} /* Fractional part? */
+ if (*num=='e' || *num=='E') /* Exponent? */
+ { num++;if (*num=='+') num++; else if (*num=='-') signsubscale=-1,num++; /* With sign? */
+ while (*num>='0' && *num<='9') subscale=(subscale*10)+(*num++ - '0'); /* Number? */
+ }
+
+ n=sign*n*pow(10.0,(scale+subscale*signsubscale)); /* number = +/- number.fraction * 10^+/- exponent */
+
+ item->valuedouble=n;
+ item->valueint=(int)n;
+ item->type=cJSON_Number;
+ return num;
+}
+
+/* Render the number nicely from the given item into a string. */
+char *cJSON_print_number(cJSON *item)
+{
+ char *str;
+ double d=item->valuedouble;
+ if (fabs(((double)item->valueint)-d)<=DBL_EPSILON && d<=INT_MAX && d>=INT_MIN)
+ {
+ str=(char*)cJSON_malloc(21); /* 2^64+1 can be represented in 21 chars. */
+ if (str) sprintf(str,"%d",item->valueint);
+ }
+ else
+ {
+ str=(char*)cJSON_malloc(64); /* This is a nice tradeoff. */
+ if (str)
+ {
+ if (fabs(floor(d)-d)<=DBL_EPSILON) sprintf(str,"%.0f",d);
+ else if (fabs(d)<1.0e-6 || fabs(d)>1.0e9) sprintf(str,"%e",d);
+ else sprintf(str,"%f",d);
+ }
+ }
+ return str;
+}
+
+/* Parse the input text into an unescaped cstring, and populate item. */
+static const unsigned char firstByteMark[7] = { 0x00, 0x00, 0xC0, 0xE0, 0xF0, 0xF8, 0xFC };
+static const char *parse_string(cJSON *item,const char *str)
+{
+ const char *ptr=str+1;char *ptr2;char *out;int len=0;unsigned uc,uc2;
+ if (*str!='\"') {ep=str;return 0;} /* not a string! */
+
+ while (*ptr!='\"' && *ptr && ++len) if (*ptr++ == '\\') ptr++; /* Skip escaped quotes. */
+
+ out=(char*)cJSON_malloc(len+1); /* This is how long we need for the string, roughly. */
+ if (!out) return 0;
+
+ ptr=str+1;ptr2=out;
+ while (*ptr!='\"' && *ptr)
+ {
+ if (*ptr!='\\') *ptr2++=*ptr++;
+ else
+ {
+ ptr++;
+ switch (*ptr)
+ {
+ case 'b': *ptr2++='\b'; break;
+ case 'f': *ptr2++='\f'; break;
+ case 'n': *ptr2++='\n'; break;
+ case 'r': *ptr2++='\r'; break;
+ case 't': *ptr2++='\t'; break;
+ case 'u': /* transcode utf16 to utf8. */
+ sscanf(ptr+1,"%4x",&uc);ptr+=4; /* get the unicode char. */
+
+ if ((uc>=0xDC00 && uc<=0xDFFF) || uc==0) break; // check for invalid.
+
+ if (uc>=0xD800 && uc<=0xDBFF) // UTF16 surrogate pairs.
+ {
+ if (ptr[1]!='\\' || ptr[2]!='u') break; // missing second-half of surrogate.
+ sscanf(ptr+3,"%4x",&uc2);ptr+=6;
+ if (uc2<0xDC00 || uc2>0xDFFF) break; // invalid second-half of surrogate.
+ uc=0x10000 | ((uc&0x3FF)<<10) | (uc2&0x3FF);
+ }
+
+ len=4;if (uc<0x80) len=1;else if (uc<0x800) len=2;else if (uc<0x10000) len=3; ptr2+=len;
+
+ switch (len) {
+ case 4: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6;
+ case 3: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6;
+ case 2: *--ptr2 =((uc | 0x80) & 0xBF); uc >>= 6;
+ case 1: *--ptr2 =(uc | firstByteMark[len]);
+ }
+ ptr2+=len;
+ break;
+ default: *ptr2++=*ptr; break;
+ }
+ ptr++;
+ }
+ }
+ *ptr2=0;
+ if (*ptr=='\"') ptr++;
+ item->valuestring=out;
+ item->type=cJSON_String;
+ return ptr;
+}
+
+/* Render the cstring provided to an escaped version that can be printed. */
+static char *print_string_ptr(const char *str)
+{
+ const char *ptr;char *ptr2,*out;int len=0;unsigned char token;
+
+ if (!str) return cJSON_strdup("");
+ ptr=str;while ((token=*ptr) && ++len) {if (strchr("\"\\\b\f\n\r\t",token)) len++; else if (token<32) len+=5;ptr++;}
+
+ out=(char*)cJSON_malloc(len+3);
+ if (!out) return 0;
+
+ ptr2=out;ptr=str;
+ *ptr2++='\"';
+ while (*ptr)
+ {
+ if ((unsigned char)*ptr>31 && *ptr!='\"' && *ptr!='\\') *ptr2++=*ptr++;
+ else
+ {
+ *ptr2++='\\';
+ switch (token=*ptr++)
+ {
+ case '\\': *ptr2++='\\'; break;
+ case '\"': *ptr2++='\"'; break;
+ case '\b': *ptr2++='b'; break;
+ case '\f': *ptr2++='f'; break;
+ case '\n': *ptr2++='n'; break;
+ case '\r': *ptr2++='r'; break;
+ case '\t': *ptr2++='t'; break;
+ default: sprintf(ptr2,"u%04x",token);ptr2+=5; break; /* escape and print */
+ }
+ }
+ }
+ *ptr2++='\"';*ptr2++=0;
+ return out;
+}
+/* Invote print_string_ptr (which is useful) on an item. */
+static char *print_string(cJSON *item) {return print_string_ptr(item->valuestring);}
+
+/* Predeclare these prototypes. */
+static const char *parse_value(cJSON *item,const char *value);
+static char *print_value(cJSON *item,int depth,int fmt);
+static const char *parse_array(cJSON *item,const char *value);
+static char *print_array(cJSON *item,int depth,int fmt);
+static const char *parse_object(cJSON *item,const char *value);
+static char *print_object(cJSON *item,int depth,int fmt);
+
+/* Utility to jump whitespace and cr/lf */
+static const char *skip(const char *in) {while (in && *in && (unsigned char)*in<=32) in++; return in;}
+
+/* Parse an object - create a new root, and populate. */
+cJSON *cJSON_Parse(const char *value)
+{
+ cJSON *c=cJSON_New_Item();
+ ep=0;
+ if (!c) return 0; /* memory fail */
+
+ if (!parse_value(c,skip(value))) {cJSON_Delete(c);return 0;}
+ return c;
+}
+
+/* Render a cJSON item/entity/structure to text. */
+char *cJSON_Print(cJSON *item) {return print_value(item,0,1);}
+char *cJSON_PrintUnformatted(cJSON *item) {return print_value(item,0,0);}
+
+/* Parser core - when encountering text, process appropriately. */
+static const char *parse_value(cJSON *item,const char *value)
+{
+ if (!value) return 0; /* Fail on null. */
+ if (!strncmp(value,"null",4)) { item->type=cJSON_NULL; return value+4; }
+ if (!strncmp(value,"false",5)) { item->type=cJSON_False; return value+5; }
+ if (!strncmp(value,"true",4)) { item->type=cJSON_True; item->valueint=1; return value+4; }
+ if (*value=='\"') { return parse_string(item,value); }
+ if (*value=='-' || (*value>='0' && *value<='9')) { return parse_number(item,value); }
+ if (*value=='[') { return parse_array(item,value); }
+ if (*value=='{') { return parse_object(item,value); }
+
+ ep=value;return 0; /* failure. */
+}
+
+/* Render a value to text. */
+static char *print_value(cJSON *item,int depth,int fmt)
+{
+ char *out=0;
+ if (!item) return 0;
+ switch ((item->type)&255)
+ {
+ case cJSON_NULL: out=cJSON_strdup("null"); break;
+ case cJSON_False: out=cJSON_strdup("false");break;
+ case cJSON_True: out=cJSON_strdup("true"); break;
+ case cJSON_Number: out=cJSON_print_number(item);break;
+ case cJSON_String: out=print_string(item);break;
+ case cJSON_Array: out=print_array(item,depth,fmt);break;
+ case cJSON_Object: out=print_object(item,depth,fmt);break;
+ }
+ return out;
+}
+
+/* Build an array from input text. */
+static const char *parse_array(cJSON *item,const char *value)
+{
+ cJSON *child;
+ if (*value!='[') {ep=value;return 0;} /* not an array! */
+
+ item->type=cJSON_Array;
+ value=skip(value+1);
+ if (*value==']') return value+1; /* empty array. */
+
+ item->child=child=cJSON_New_Item();
+ if (!item->child) return 0; /* memory fail */
+ value=skip(parse_value(child,skip(value))); /* skip any spacing, get the value. */
+ if (!value) return 0;
+
+ while (*value==',')
+ {
+ cJSON *new_item;
+ if (!(new_item=cJSON_New_Item())) return 0; /* memory fail */
+ child->next=new_item;new_item->prev=child;child=new_item;
+ value=skip(parse_value(child,skip(value+1)));
+ if (!value) return 0; /* memory fail */
+ }
+
+ if (*value==']') return value+1; /* end of array */
+ ep=value;return 0; /* malformed. */
+}
+
+/* Render an array to text */
+static char *print_array(cJSON *item,int depth,int fmt)
+{
+ char **entries;
+ char *out=0,*ptr,*ret;int len=5;
+ cJSON *child=item->child;
+ int numentries=0,i=0,fail=0;
+
+ /* How many entries in the array? */
+ while (child) numentries++,child=child->next;
+ /* Allocate an array to hold the values for each */
+ entries=(char**)cJSON_malloc(numentries*sizeof(char*));
+ if (!entries) return 0;
+ memset(entries,0,numentries*sizeof(char*));
+ /* Retrieve all the results: */
+ child=item->child;
+ while (child && !fail)
+ {
+ ret=print_value(child,depth+1,fmt);
+ entries[i++]=ret;
+ if (ret) len+=strlen(ret)+2+(fmt?1:0); else fail=1;
+ child=child->next;
+ }
+
+ /* If we didn't fail, try to malloc the output string */
+ if (!fail) out=(char*)cJSON_malloc(len);
+ /* If that fails, we fail. */
+ if (!out) fail=1;
+
+ /* Handle failure. */
+ if (fail)
+ {
+ for (i=0;i<numentries;i++) if (entries[i]) cJSON_free(entries[i]);
+ cJSON_free(entries);
+ return 0;
+ }
+
+ /* Compose the output array. */
+ *out='[';
+ ptr=out+1;*ptr=0;
+ for (i=0;i<numentries;i++)
+ {
+ strcpy(ptr,entries[i]);ptr+=strlen(entries[i]);
+ if (i!=numentries-1) {*ptr++=',';if(fmt)*ptr++=' ';*ptr=0;}
+ cJSON_free(entries[i]);
+ }
+ cJSON_free(entries);
+ *ptr++=']';*ptr++=0;
+ return out;
+}
+
+/* Build an object from the text. */
+static const char *parse_object(cJSON *item,const char *value)
+{
+ cJSON *child;
+ if (*value!='{') {ep=value;return 0;} /* not an object! */
+
+ item->type=cJSON_Object;
+ value=skip(value+1);
+ if (*value=='}') return value+1; /* empty array. */
+
+ item->child=child=cJSON_New_Item();
+ if (!item->child) return 0;
+ value=skip(parse_string(child,skip(value)));
+ if (!value) return 0;
+ child->string=child->valuestring;child->valuestring=0;
+ if (*value!=':') {ep=value;return 0;} /* fail! */
+ value=skip(parse_value(child,skip(value+1))); /* skip any spacing, get the value. */
+ if (!value) return 0;
+
+ while (*value==',')
+ {
+ cJSON *new_item;
+ if (!(new_item=cJSON_New_Item())) return 0; /* memory fail */
+ child->next=new_item;new_item->prev=child;child=new_item;
+ value=skip(parse_string(child,skip(value+1)));
+ if (!value) return 0;
+ child->string=child->valuestring;child->valuestring=0;
+ if (*value!=':') {ep=value;return 0;} /* fail! */
+ value=skip(parse_value(child,skip(value+1))); /* skip any spacing, get the value. */
+ if (!value) return 0;
+ }
+
+ if (*value=='}') return value+1; /* end of array */
+ ep=value;return 0; /* malformed. */
+}
+
+/* Render an object to text. */
+static char *print_object(cJSON *item,int depth,int fmt)
+{
+ char **entries=0,**names=0;
+ char *out=0,*ptr,*ret,*str;int len=7,i=0,j;
+ cJSON *child=item->child;
+ int numentries=0,fail=0;
+ /* Count the number of entries. */
+ while (child) numentries++,child=child->next;
+ /* Allocate space for the names and the objects */
+ entries=(char**)cJSON_malloc(numentries*sizeof(char*));
+ if (!entries) return 0;
+ names=(char**)cJSON_malloc(numentries*sizeof(char*));
+ if (!names) {cJSON_free(entries);return 0;}
+ memset(entries,0,sizeof(char*)*numentries);
+ memset(names,0,sizeof(char*)*numentries);
+
+ /* Collect all the results into our arrays: */
+ child=item->child;depth++;if (fmt) len+=depth;
+ while (child)
+ {
+ names[i]=str=print_string_ptr(child->string);
+ entries[i++]=ret=print_value(child,depth,fmt);
+ if (str && ret) len+=strlen(ret)+strlen(str)+2+(fmt?2+depth:0); else fail=1;
+ child=child->next;
+ }
+
+ /* Try to allocate the output string */
+ if (!fail) out=(char*)cJSON_malloc(len);
+ if (!out) fail=1;
+
+ /* Handle failure */
+ if (fail)
+ {
+ for (i=0;i<numentries;i++) {if (names[i]) cJSON_free(names[i]);if (entries[i]) cJSON_free(entries[i]);}
+ cJSON_free(names);cJSON_free(entries);
+ return 0;
+ }
+
+ /* Compose the output: */
+ *out='{';ptr=out+1;if (fmt)*ptr++='\n';*ptr=0;
+ for (i=0;i<numentries;i++)
+ {
+ if (fmt) for (j=0;j<depth;j++) *ptr++='\t';
+ strcpy(ptr,names[i]);ptr+=strlen(names[i]);
+ *ptr++=':';if (fmt) *ptr++='\t';
+ strcpy(ptr,entries[i]);ptr+=strlen(entries[i]);
+ if (i!=numentries-1) *ptr++=',';
+ if (fmt) *ptr++='\n';*ptr=0;
+ cJSON_free(names[i]);cJSON_free(entries[i]);
+ }
+
+ cJSON_free(names);cJSON_free(entries);
+ if (fmt) for (i=0;i<depth-1;i++) *ptr++='\t';
+ *ptr++='}';*ptr++=0;
+ return out;
+}
+
+/* Get Array size/item / object item. */
+int cJSON_GetArraySize(cJSON *array) {cJSON *c=array->child;int i=0;while(c)i++,c=c->next;return i;}
+cJSON *cJSON_GetArrayItem(cJSON *array,int item) {cJSON *c=array->child; while (c && item>0) item--,c=c->next; return c;}
+cJSON *cJSON_GetObjectItem(cJSON *object,const char *string) {cJSON *c=object->child; while (c && cJSON_strcasecmp(c->string,string)) c=c->next; return c;}
+
+/* Utility for array list handling. */
+static void suffix_object(cJSON *prev,cJSON *item) {prev->next=item;item->prev=prev;}
+/* Utility for handling references. */
+static cJSON *create_reference(cJSON *item) {cJSON *ref=cJSON_New_Item();if (!ref) return 0;memcpy(ref,item,sizeof(cJSON));ref->string=0;ref->type|=cJSON_IsReference;ref->next=ref->prev=0;return ref;}
+
+/* Add item to array/object. */
+void cJSON_AddItemToArray(cJSON *array, cJSON *item) {cJSON *c=array->child;if (!item) return; if (!c) {array->child=item;} else {while (c && c->next) c=c->next; suffix_object(c,item);}}
+void cJSON_AddItemToObject(cJSON *object,const char *string,cJSON *item) {if (!item) return; if (item->string) cJSON_free(item->string);item->string=cJSON_strdup(string);cJSON_AddItemToArray(object,item);}
+void cJSON_AddItemReferenceToArray(cJSON *array, cJSON *item) {cJSON_AddItemToArray(array,create_reference(item));}
+void cJSON_AddItemReferenceToObject(cJSON *object,const char *string,cJSON *item) {cJSON_AddItemToObject(object,string,create_reference(item));}
+
+cJSON *cJSON_DetachItemFromArray(cJSON *array,int which) {cJSON *c=array->child;while (c && which>0) c=c->next,which--;if (!c) return 0;
+ if (c->prev) c->prev->next=c->next;if (c->next) c->next->prev=c->prev;if (c==array->child) array->child=c->next;c->prev=c->next=0;return c;}
+void cJSON_DeleteItemFromArray(cJSON *array,int which) {cJSON_Delete(cJSON_DetachItemFromArray(array,which));}
+cJSON *cJSON_DetachItemFromObject(cJSON *object,const char *string) {int i=0;cJSON *c=object->child;while (c && cJSON_strcasecmp(c->string,string)) i++,c=c->next;if (c) return cJSON_DetachItemFromArray(object,i);return 0;}
+void cJSON_DeleteItemFromObject(cJSON *object,const char *string) {cJSON_Delete(cJSON_DetachItemFromObject(object,string));}
+
+/* Replace array/object items with new ones. */
+void cJSON_ReplaceItemInArray(cJSON *array,int which,cJSON *newitem) {cJSON *c=array->child;while (c && which>0) c=c->next,which--;if (!c) return;
+ newitem->next=c->next;newitem->prev=c->prev;if (newitem->next) newitem->next->prev=newitem;
+ if (c==array->child) array->child=newitem; else newitem->prev->next=newitem;c->next=c->prev=0;cJSON_Delete(c);}
+void cJSON_ReplaceItemInObject(cJSON *object,const char *string,cJSON *newitem){int i=0;cJSON *c=object->child;while(c && cJSON_strcasecmp(c->string,string))i++,c=c->next;if(c){newitem->string=cJSON_strdup(string);cJSON_ReplaceItemInArray(object,i,newitem);}}
+
+/* Create basic types: */
+cJSON *cJSON_CreateNull() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_NULL;return item;}
+cJSON *cJSON_CreateTrue() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_True;return item;}
+cJSON *cJSON_CreateFalse() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_False;return item;}
+cJSON *cJSON_CreateBool(int b) {cJSON *item=cJSON_New_Item();if(item)item->type=b?cJSON_True:cJSON_False;return item;}
+cJSON *cJSON_CreateNumber(double num) {cJSON *item=cJSON_New_Item();if(item){item->type=cJSON_Number;item->valuedouble=num;item->valueint=(int)num;}return item;}
+cJSON *cJSON_CreateString(const char *string) {cJSON *item=cJSON_New_Item();if(item){item->type=cJSON_String;item->valuestring=cJSON_strdup(string);}return item;}
+cJSON *cJSON_CreateArray() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_Array;return item;}
+cJSON *cJSON_CreateObject() {cJSON *item=cJSON_New_Item();if(item)item->type=cJSON_Object;return item;}
+
+/* Create Arrays: */
+cJSON *cJSON_CreateIntArray(int *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && i<count;i++){n=cJSON_CreateNumber(numbers[i]);if(!i)a->child=n;else suffix_object(p,n);p=n;}return a;}
+cJSON *cJSON_CreateFloatArray(float *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && i<count;i++){n=cJSON_CreateNumber(numbers[i]);if(!i)a->child=n;else suffix_object(p,n);p=n;}return a;}
+cJSON *cJSON_CreateDoubleArray(double *numbers,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && i<count;i++){n=cJSON_CreateNumber(numbers[i]);if(!i)a->child=n;else suffix_object(p,n);p=n;}return a;}
+cJSON *cJSON_CreateStringArray(const char **strings,int count) {int i;cJSON *n=0,*p=0,*a=cJSON_CreateArray();for(i=0;a && i<count;i++){n=cJSON_CreateString(strings[i]);if(!i)a->child=n;else suffix_object(p,n);p=n;}return a;}
diff --git a/plugins/omelasticsearch/cJSON/cjson.h b/plugins/omelasticsearch/cJSON/cjson.h
new file mode 100644
index 00000000..a621720c
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/cjson.h
@@ -0,0 +1,130 @@
+/*
+ Copyright (c) 2009 Dave Gamble
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+#ifndef cJSON__h
+#define cJSON__h
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+/* cJSON Types: */
+#define cJSON_False 0
+#define cJSON_True 1
+#define cJSON_NULL 2
+#define cJSON_Number 3
+#define cJSON_String 4
+#define cJSON_Array 5
+#define cJSON_Object 6
+
+#define cJSON_IsReference 256
+
+/* The cJSON structure: */
+typedef struct cJSON {
+ struct cJSON *next,*prev; /* next/prev allow you to walk array/object chains. Alternatively, use GetArraySize/GetArrayItem/GetObjectItem */
+ struct cJSON *child; /* An array or object item will have a child pointer pointing to a chain of the items in the array/object. */
+
+ int type; /* The type of the item, as above. */
+
+ char *valuestring; /* The item's string, if type==cJSON_String */
+ int valueint; /* The item's number, if type==cJSON_Number */
+ double valuedouble; /* The item's number, if type==cJSON_Number */
+
+ char *string; /* The item's name string, if this item is the child of, or is in the list of subitems of an object. */
+} cJSON;
+
+typedef struct cJSON_Hooks {
+ void *(*malloc_fn)(size_t sz);
+ void (*free_fn)(void *ptr);
+} cJSON_Hooks;
+
+/* Supply malloc, realloc and free functions to cJSON */
+extern void cJSON_InitHooks(cJSON_Hooks* hooks);
+
+
+/* Supply a block of JSON, and this returns a cJSON object you can interrogate. Call cJSON_Delete when finished. */
+extern cJSON *cJSON_Parse(const char *value);
+/* Render a cJSON entity to text for transfer/storage. Free the char* when finished. */
+extern char *cJSON_Print(cJSON *item);
+/* Render a cJSON entity to text for transfer/storage without any formatting. Free the char* when finished. */
+extern char *cJSON_PrintUnformatted(cJSON *item);
+/* Delete a cJSON entity and all subentities. */
+extern void cJSON_Delete(cJSON *c);
+
+/* Returns the number of items in an array (or object). */
+extern int cJSON_GetArraySize(cJSON *array);
+/* Retrieve item number "item" from array "array". Returns NULL if unsuccessful. */
+extern cJSON *cJSON_GetArrayItem(cJSON *array,int item);
+/* Get item "string" from object. Case insensitive. */
+extern cJSON *cJSON_GetObjectItem(cJSON *object,const char *string);
+
+/* For analysing failed parses. This returns a pointer to the parse error. You'll probably need to look a few chars back to make sense of it. Defined when cJSON_Parse() returns 0. 0 when cJSON_Parse() succeeds. */
+extern const char *cJSON_GetErrorPtr();
+
+/* These calls create a cJSON item of the appropriate type. */
+extern cJSON *cJSON_CreateNull();
+extern cJSON *cJSON_CreateTrue();
+extern cJSON *cJSON_CreateFalse();
+extern cJSON *cJSON_CreateBool(int b);
+extern cJSON *cJSON_CreateNumber(double num);
+extern cJSON *cJSON_CreateString(const char *string);
+extern cJSON *cJSON_CreateArray();
+extern cJSON *cJSON_CreateObject();
+
+/* These utilities create an Array of count items. */
+extern cJSON *cJSON_CreateIntArray(int *numbers,int count);
+extern cJSON *cJSON_CreateFloatArray(float *numbers,int count);
+extern cJSON *cJSON_CreateDoubleArray(double *numbers,int count);
+extern cJSON *cJSON_CreateStringArray(const char **strings,int count);
+
+/* Append item to the specified array/object. */
+extern void cJSON_AddItemToArray(cJSON *array, cJSON *item);
+extern void cJSON_AddItemToObject(cJSON *object,const char *string,cJSON *item);
+/* Append reference to item to the specified array/object. Use this when you want to add an existing cJSON to a new cJSON, but don't want to corrupt your existing cJSON. */
+extern void cJSON_AddItemReferenceToArray(cJSON *array, cJSON *item);
+extern void cJSON_AddItemReferenceToObject(cJSON *object,const char *string,cJSON *item);
+
+/* Remove/Detatch items from Arrays/Objects. */
+extern cJSON *cJSON_DetachItemFromArray(cJSON *array,int which);
+extern void cJSON_DeleteItemFromArray(cJSON *array,int which);
+extern cJSON *cJSON_DetachItemFromObject(cJSON *object,const char *string);
+extern void cJSON_DeleteItemFromObject(cJSON *object,const char *string);
+
+/* Update array items. */
+extern void cJSON_ReplaceItemInArray(cJSON *array,int which,cJSON *newitem);
+extern void cJSON_ReplaceItemInObject(cJSON *object,const char *string,cJSON *newitem);
+
+/* rger: added helpers */
+
+char *cJSON_print_number(cJSON *item);
+#define cJSON_AddNullToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateNull())
+#define cJSON_AddTrueToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateTrue())
+#define cJSON_AddFalseToObject(object,name) cJSON_AddItemToObject(object, name, cJSON_CreateFalse())
+#define cJSON_AddNumberToObject(object,name,n) cJSON_AddItemToObject(object, name, cJSON_CreateNumber(n))
+#define cJSON_AddStringToObject(object,name,s) cJSON_AddItemToObject(object, name, cJSON_CreateString(s))
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/plugins/omelasticsearch/cJSON/test.c b/plugins/omelasticsearch/cJSON/test.c
new file mode 100644
index 00000000..2cab632a
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/test.c
@@ -0,0 +1,156 @@
+/*
+ Copyright (c) 2009 Dave Gamble
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include "cJSON.h"
+
+/* Parse text to JSON, then render back to text, and print! */
+void doit(char *text)
+{
+ char *out;cJSON *json;
+
+ json=cJSON_Parse(text);
+ if (!json) {printf("Error before: [%s]\n",cJSON_GetErrorPtr());}
+ else
+ {
+ out=cJSON_Print(json);
+ cJSON_Delete(json);
+ printf("%s\n",out);
+ free(out);
+ }
+}
+
+/* Read a file, parse, render back, etc. */
+void dofile(char *filename)
+{
+ FILE *f=fopen(filename,"rb");fseek(f,0,SEEK_END);long len=ftell(f);fseek(f,0,SEEK_SET);
+ char *data=malloc(len+1);fread(data,1,len,f);fclose(f);
+ doit(data);
+ free(data);
+}
+
+/* Used by some code below as an example datatype. */
+struct record {const char *precision;double lat,lon;const char *address,*city,*state,*zip,*country; };
+
+/* Create a bunch of objects as demonstration. */
+void create_objects()
+{
+ cJSON *root,*fmt,*img,*thm,*fld;char *out;int i; /* declare a few. */
+
+ /* Here we construct some JSON standards, from the JSON site. */
+
+ /* Our "Video" datatype: */
+ root=cJSON_CreateObject();
+ cJSON_AddItemToObject(root, "name", cJSON_CreateString("Jack (\"Bee\") Nimble"));
+ cJSON_AddItemToObject(root, "format", fmt=cJSON_CreateObject());
+ cJSON_AddStringToObject(fmt,"type", "rect");
+ cJSON_AddNumberToObject(fmt,"width", 1920);
+ cJSON_AddNumberToObject(fmt,"height", 1080);
+ cJSON_AddFalseToObject (fmt,"interlace");
+ cJSON_AddNumberToObject(fmt,"frame rate", 24);
+
+ out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out); /* Print to text, Delete the cJSON, print it, release the string.
+
+ /* Our "days of the week" array: */
+ const char *strings[7]={"Sunday","Monday","Tuesday","Wednesday","Thursday","Friday","Saturday"};
+ root=cJSON_CreateStringArray(strings,7);
+
+ out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out);
+
+ /* Our matrix: */
+ int numbers[3][3]={{0,-1,0},{1,0,0},{0,0,1}};
+ root=cJSON_CreateArray();
+ for (i=0;i<3;i++) cJSON_AddItemToArray(root,cJSON_CreateIntArray(numbers[i],3));
+
+/* cJSON_ReplaceItemInArray(root,1,cJSON_CreateString("Replacement")); */
+
+ out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out);
+
+
+ /* Our "gallery" item: */
+ int ids[4]={116,943,234,38793};
+ root=cJSON_CreateObject();
+ cJSON_AddItemToObject(root, "Image", img=cJSON_CreateObject());
+ cJSON_AddNumberToObject(img,"Width",800);
+ cJSON_AddNumberToObject(img,"Height",600);
+ cJSON_AddStringToObject(img,"Title","View from 15th Floor");
+ cJSON_AddItemToObject(img, "Thumbnail", thm=cJSON_CreateObject());
+ cJSON_AddStringToObject(thm, "Url", "http:/*www.example.com/image/481989943");
+ cJSON_AddNumberToObject(thm,"Height",125);
+ cJSON_AddStringToObject(thm,"Width","100");
+ cJSON_AddItemToObject(img,"IDs", cJSON_CreateIntArray(ids,4));
+
+ out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out);
+
+ /* Our array of "records": */
+ struct record fields[2]={
+ {"zip",37.7668,-1.223959e+2,"","SAN FRANCISCO","CA","94107","US"},
+ {"zip",37.371991,-1.22026e+2,"","SUNNYVALE","CA","94085","US"}};
+
+ root=cJSON_CreateArray();
+ for (i=0;i<2;i++)
+ {
+ cJSON_AddItemToArray(root,fld=cJSON_CreateObject());
+ cJSON_AddStringToObject(fld, "precision", fields[i].precision);
+ cJSON_AddNumberToObject(fld, "Latitude", fields[i].lat);
+ cJSON_AddNumberToObject(fld, "Longitude", fields[i].lon);
+ cJSON_AddStringToObject(fld, "Address", fields[i].address);
+ cJSON_AddStringToObject(fld, "City", fields[i].city);
+ cJSON_AddStringToObject(fld, "State", fields[i].state);
+ cJSON_AddStringToObject(fld, "Zip", fields[i].zip);
+ cJSON_AddStringToObject(fld, "Country", fields[i].country);
+ }
+
+/* cJSON_ReplaceItemInObject(cJSON_GetArrayItem(root,1),"City",cJSON_CreateIntArray(ids,4)); */
+
+ out=cJSON_Print(root); cJSON_Delete(root); printf("%s\n",out); free(out);
+
+}
+
+int main (int argc, const char * argv[]) {
+ /* a bunch of json: */
+ char text1[]="{\n\"name\": \"Jack (\\\"Bee\\\") Nimble\", \n\"format\": {\"type\": \"rect\", \n\"width\": 1920, \n\"height\": 1080, \n\"interlace\": false,\"frame rate\": 24\n}\n}";
+ char text2[]="[\"Sunday\", \"Monday\", \"Tuesday\", \"Wednesday\", \"Thursday\", \"Friday\", \"Saturday\"]";
+ char text3[]="[\n [0, -1, 0],\n [1, 0, 0],\n [0, 0, 1]\n ]\n";
+ char text4[]="{\n \"Image\": {\n \"Width\": 800,\n \"Height\": 600,\n \"Title\": \"View from 15th Floor\",\n \"Thumbnail\": {\n \"Url\": \"http:/*www.example.com/image/481989943\",\n \"Height\": 125,\n \"Width\": \"100\"\n },\n \"IDs\": [116, 943, 234, 38793]\n }\n }";
+ char text5[]="[\n {\n \"precision\": \"zip\",\n \"Latitude\": 37.7668,\n \"Longitude\": -122.3959,\n \"Address\": \"\",\n \"City\": \"SAN FRANCISCO\",\n \"State\": \"CA\",\n \"Zip\": \"94107\",\n \"Country\": \"US\"\n },\n {\n \"precision\": \"zip\",\n \"Latitude\": 37.371991,\n \"Longitude\": -122.026020,\n \"Address\": \"\",\n \"City\": \"SUNNYVALE\",\n \"State\": \"CA\",\n \"Zip\": \"94085\",\n \"Country\": \"US\"\n }\n ]";
+
+ /* Process each json textblock by parsing, then rebuilding: */
+ doit(text1);
+ doit(text2);
+ doit(text3);
+ doit(text4);
+ doit(text5);
+
+ /* Parse standard testfiles:
+/* dofile("../../tests/test1"); */
+/* dofile("../../tests/test2"); */
+/* dofile("../../tests/test3"); */
+/* dofile("../../tests/test4"); */
+/* dofile("../../tests/test5"); */
+
+ /* Now some samplecode for building objects concisely: */
+ create_objects();
+
+ return 0;
+}
diff --git a/plugins/omelasticsearch/cJSON/tests/test1 b/plugins/omelasticsearch/cJSON/tests/test1
new file mode 100644
index 00000000..eacfbf5e
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/tests/test1
@@ -0,0 +1,22 @@
+{
+ "glossary": {
+ "title": "example glossary",
+ "GlossDiv": {
+ "title": "S",
+ "GlossList": {
+ "GlossEntry": {
+ "ID": "SGML",
+ "SortAs": "SGML",
+ "GlossTerm": "Standard Generalized Markup Language",
+ "Acronym": "SGML",
+ "Abbrev": "ISO 8879:1986",
+ "GlossDef": {
+ "para": "A meta-markup language, used to create markup languages such as DocBook.",
+ "GlossSeeAlso": ["GML", "XML"]
+ },
+ "GlossSee": "markup"
+ }
+ }
+ }
+ }
+}
diff --git a/plugins/omelasticsearch/cJSON/tests/test2 b/plugins/omelasticsearch/cJSON/tests/test2
new file mode 100644
index 00000000..5600991a
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/tests/test2
@@ -0,0 +1,11 @@
+{"menu": {
+ "id": "file",
+ "value": "File",
+ "popup": {
+ "menuitem": [
+ {"value": "New", "onclick": "CreateNewDoc()"},
+ {"value": "Open", "onclick": "OpenDoc()"},
+ {"value": "Close", "onclick": "CloseDoc()"}
+ ]
+ }
+}}
diff --git a/plugins/omelasticsearch/cJSON/tests/test3 b/plugins/omelasticsearch/cJSON/tests/test3
new file mode 100644
index 00000000..5662b377
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/tests/test3
@@ -0,0 +1,26 @@
+{"widget": {
+ "debug": "on",
+ "window": {
+ "title": "Sample Konfabulator Widget",
+ "name": "main_window",
+ "width": 500,
+ "height": 500
+ },
+ "image": {
+ "src": "Images/Sun.png",
+ "name": "sun1",
+ "hOffset": 250,
+ "vOffset": 250,
+ "alignment": "center"
+ },
+ "text": {
+ "data": "Click Here",
+ "size": 36,
+ "style": "bold",
+ "name": "text1",
+ "hOffset": 250,
+ "vOffset": 100,
+ "alignment": "center",
+ "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;"
+ }
+}} \ No newline at end of file
diff --git a/plugins/omelasticsearch/cJSON/tests/test4 b/plugins/omelasticsearch/cJSON/tests/test4
new file mode 100644
index 00000000..d540b57f
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/tests/test4
@@ -0,0 +1,88 @@
+{"web-app": {
+ "servlet": [
+ {
+ "servlet-name": "cofaxCDS",
+ "servlet-class": "org.cofax.cds.CDSServlet",
+ "init-param": {
+ "configGlossary:installationAt": "Philadelphia, PA",
+ "configGlossary:adminEmail": "ksm@pobox.com",
+ "configGlossary:poweredBy": "Cofax",
+ "configGlossary:poweredByIcon": "/images/cofax.gif",
+ "configGlossary:staticPath": "/content/static",
+ "templateProcessorClass": "org.cofax.WysiwygTemplate",
+ "templateLoaderClass": "org.cofax.FilesTemplateLoader",
+ "templatePath": "templates",
+ "templateOverridePath": "",
+ "defaultListTemplate": "listTemplate.htm",
+ "defaultFileTemplate": "articleTemplate.htm",
+ "useJSP": false,
+ "jspListTemplate": "listTemplate.jsp",
+ "jspFileTemplate": "articleTemplate.jsp",
+ "cachePackageTagsTrack": 200,
+ "cachePackageTagsStore": 200,
+ "cachePackageTagsRefresh": 60,
+ "cacheTemplatesTrack": 100,
+ "cacheTemplatesStore": 50,
+ "cacheTemplatesRefresh": 15,
+ "cachePagesTrack": 200,
+ "cachePagesStore": 100,
+ "cachePagesRefresh": 10,
+ "cachePagesDirtyRead": 10,
+ "searchEngineListTemplate": "forSearchEnginesList.htm",
+ "searchEngineFileTemplate": "forSearchEngines.htm",
+ "searchEngineRobotsDb": "WEB-INF/robots.db",
+ "useDataStore": true,
+ "dataStoreClass": "org.cofax.SqlDataStore",
+ "redirectionClass": "org.cofax.SqlRedirection",
+ "dataStoreName": "cofax",
+ "dataStoreDriver": "com.microsoft.jdbc.sqlserver.SQLServerDriver",
+ "dataStoreUrl": "jdbc:microsoft:sqlserver://LOCALHOST:1433;DatabaseName=goon",
+ "dataStoreUser": "sa",
+ "dataStorePassword": "dataStoreTestQuery",
+ "dataStoreTestQuery": "SET NOCOUNT ON;select test='test';",
+ "dataStoreLogFile": "/usr/local/tomcat/logs/datastore.log",
+ "dataStoreInitConns": 10,
+ "dataStoreMaxConns": 100,
+ "dataStoreConnUsageLimit": 100,
+ "dataStoreLogLevel": "debug",
+ "maxUrlLength": 500}},
+ {
+ "servlet-name": "cofaxEmail",
+ "servlet-class": "org.cofax.cds.EmailServlet",
+ "init-param": {
+ "mailHost": "mail1",
+ "mailHostOverride": "mail2"}},
+ {
+ "servlet-name": "cofaxAdmin",
+ "servlet-class": "org.cofax.cds.AdminServlet"},
+
+ {
+ "servlet-name": "fileServlet",
+ "servlet-class": "org.cofax.cds.FileServlet"},
+ {
+ "servlet-name": "cofaxTools",
+ "servlet-class": "org.cofax.cms.CofaxToolsServlet",
+ "init-param": {
+ "templatePath": "toolstemplates/",
+ "log": 1,
+ "logLocation": "/usr/local/tomcat/logs/CofaxTools.log",
+ "logMaxSize": "",
+ "dataLog": 1,
+ "dataLogLocation": "/usr/local/tomcat/logs/dataLog.log",
+ "dataLogMaxSize": "",
+ "removePageCache": "/content/admin/remove?cache=pages&id=",
+ "removeTemplateCache": "/content/admin/remove?cache=templates&id=",
+ "fileTransferFolder": "/usr/local/tomcat/webapps/content/fileTransferFolder",
+ "lookInContext": 1,
+ "adminGroupID": 4,
+ "betaServer": true}}],
+ "servlet-mapping": {
+ "cofaxCDS": "/",
+ "cofaxEmail": "/cofaxutil/aemail/*",
+ "cofaxAdmin": "/admin/*",
+ "fileServlet": "/static/*",
+ "cofaxTools": "/tools/*"},
+
+ "taglib": {
+ "taglib-uri": "cofax.tld",
+ "taglib-location": "/WEB-INF/tlds/cofax.tld"}}} \ No newline at end of file
diff --git a/plugins/omelasticsearch/cJSON/tests/test5 b/plugins/omelasticsearch/cJSON/tests/test5
new file mode 100644
index 00000000..49980ca2
--- /dev/null
+++ b/plugins/omelasticsearch/cJSON/tests/test5
@@ -0,0 +1,27 @@
+{"menu": {
+ "header": "SVG Viewer",
+ "items": [
+ {"id": "Open"},
+ {"id": "OpenNew", "label": "Open New"},
+ null,
+ {"id": "ZoomIn", "label": "Zoom In"},
+ {"id": "ZoomOut", "label": "Zoom Out"},
+ {"id": "OriginalView", "label": "Original View"},
+ null,
+ {"id": "Quality"},
+ {"id": "Pause"},
+ {"id": "Mute"},
+ null,
+ {"id": "Find", "label": "Find..."},
+ {"id": "FindAgain", "label": "Find Again"},
+ {"id": "Copy"},
+ {"id": "CopyAgain", "label": "Copy Again"},
+ {"id": "CopySVG", "label": "Copy SVG"},
+ {"id": "ViewSVG", "label": "View SVG"},
+ {"id": "ViewSource", "label": "View Source"},
+ {"id": "SaveAs", "label": "Save As"},
+ null,
+ {"id": "Help"},
+ {"id": "About", "label": "About Adobe CVG Viewer..."}
+ ]
+}}
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index 00e4dbac..f27fe62b 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -34,6 +34,10 @@
#include <signal.h>
#include <errno.h>
#include <time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include "cJSON/cjson.h"
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -64,8 +68,10 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
*/
typedef struct curl_slist HEADER;
typedef struct _instanceData {
- uchar *server;
int port;
+ int replyLen;
+ int fdErrFile; /* error file fd or -1 if not open */
+ uchar *server;
uchar *uid;
uchar *pwd;
uchar *searchIndex;
@@ -73,6 +79,9 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ uchar *restURL; /* last used URL for error reporting */
+ uchar *errorFile;
+ char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
@@ -104,6 +113,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
+ { "errorfile", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
static struct cnfparamblk actpblk =
@@ -114,6 +124,8 @@ static struct cnfparamblk actpblk =
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->restURL = NULL;
+ pData->fdErrFile = -1;
ENDcreateInstance
BEGINisCompatibleWithFeature
@@ -132,6 +144,8 @@ CODESTARTfreeInstance
curl_easy_cleanup(pData->curlHandle);
pData->curlHandle = NULL;
}
+ if(pData->fdErrFile != -1)
+ close(pData->fdErrFile);
free(pData->server);
free(pData->uid);
free(pData->pwd);
@@ -139,6 +153,9 @@ CODESTARTfreeInstance
free(pData->searchType);
free(pData->parent);
free(pData->tplName);
+ free(pData->timeout);
+ free(pData->restURL);
+ free(pData->errorFile);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -158,6 +175,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
dbgprintf("\tasync replication=%d\n", pData->asyncRepl);
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
+ dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
+ (uchar*)"(not configured)" : pData->errorFile);
ENDdbgPrintInstInfo
@@ -202,12 +221,16 @@ checkConn(instanceData *pData)
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
+ pData->reply = NULL;
+ pData->replyLen = 0;
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ free(pData->reply);
DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
@@ -271,7 +294,6 @@ static rsRetVal
setCurlURL(instanceData *pData, uchar **tpls)
{
char authBuf[1024];
- char *restURL;
uchar *searchIndex;
uchar *searchType;
uchar *parent;
@@ -305,11 +327,12 @@ setCurlURL(instanceData *pData, uchar **tpls)
if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
- restURL = es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, restURL);
+
+ free(pData->restURL);
+ pData->restURL = (uchar*)es_str2cstr(url, NULL);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
es_deleteStr(url);
- DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", restURL);
- free(restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
if(pData->uid != NULL) {
rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@@ -347,9 +370,6 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
# define META_END "\"}}\n"
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
-dbgprintf("AAA: searchIndex: '%s'\n", searchIndex);
-dbgprintf("AAA: searchType: '%s'\n", searchType);
-dbgprintf("AAA: parent: '%s'\n", parent);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@@ -373,22 +393,168 @@ finalize_it:
RETiRet;
}
+
+/* write data error request/replies to separate error file
+ * Note: we open the file but never close it before exit. If it
+ * needs to be closed, HUP must be sent.
+ */
+static inline rsRetVal
+writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
+{
+ char *rendered = NULL;
+ cJSON *errRoot;
+ cJSON *req;
+ cJSON *replyRoot = *pReplyRoot;
+ size_t toWrite;
+ ssize_t wrRet;
+ char errStr[1024];
+ DEFiRet;
+
+ if(pData->errorFile == NULL) {
+ DBGPRINTF("omelasticsearch: no local error logger defined - "
+ "ignoring ES error information\n");
+ FINALIZE;
+ }
+
+ if(pData->fdErrFile == -1) {
+ pData->fdErrFile = open((char*)pData->errorFile,
+ O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
+ S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
+ if(pData->fdErrFile == -1) {
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ DBGPRINTF("omelasticsearch: error opening error file: %s\n", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+ if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL));
+ cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg));
+
+ if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+ cJSON_AddItemToObject(errRoot, "request", req);
+ cJSON_AddItemToObject(errRoot, "reply", replyRoot);
+ rendered = cJSON_Print(errRoot);
+ /* we do not do real error-handling on the err file, as this finally complicates
+ * things way to much.
+ */
+ DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered);
+ toWrite = strlen(rendered);
+ wrRet = write(pData->fdErrFile, rendered, toWrite);
+ if(wrRet != (ssize_t) toWrite) {
+ DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n",
+ errno, (long long) wrRet);
+ }
+ free(rendered);
+ cJSON_Delete(errRoot);
+ *pReplyRoot = NULL; /* tell caller not to delete once again! */
+
+finalize_it:
+ if(rendered != NULL)
+ free(rendered);
+ RETiRet;
+}
+
+
+static inline rsRetVal
+checkResultBulkmode(instanceData *pData, cJSON *root)
+{
+ int i;
+ int numitems;
+ cJSON *items;
+ cJSON *item;
+ cJSON *create;
+ cJSON *ok;
+ DEFiRet;
+
+ items = cJSON_GetObjectItem(root, "items");
+ if(items == NULL || items->type != cJSON_Array) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "bulkmode insert does not return array, reply is: %s\n",
+ pData->reply);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ numitems = cJSON_GetArraySize(items);
+DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
+ for(i = 0 ; i < numitems ; ++i) {
+ item = cJSON_GetArrayItem(items, i);
+ if(item == NULL) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "cannot obtain reply array item %d\n", i);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ create = cJSON_GetObjectItem(item, "create");
+ if(create == NULL || create->type != cJSON_Object) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "cannot obtain 'create' item for #%d\n", i);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ ok = cJSON_GetObjectItem(create, "ok");
+ if(ok == NULL || ok->type != cJSON_True) {
+ DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
+ "item %d, prop ok (%p) not ok\n", i, ok);
+ ABORT_FINALIZE(RS_RET_DATAFAIL);
+ }
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+static inline rsRetVal
+checkResult(instanceData *pData, uchar *reqmsg)
+{
+ cJSON *root;
+ cJSON *ok;
+ DEFiRet;
+
+ root = cJSON_Parse(pData->reply);
+ if(root == NULL) {
+ DBGPRINTF("omelasticsearch: could not parse JSON result \n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if(pData->bulkmode) {
+ iRet = checkResultBulkmode(pData, root);
+ } else {
+ ok = cJSON_GetObjectItem(root, "ok");
+ if(ok == NULL || ok->type != cJSON_True) {
+ iRet = RS_RET_DATAFAIL;
+ }
+ }
+
+ /* Note: we ignore errors writing the error file, as we cannot handle
+ * these in any case.
+ */
+ if(iRet == RS_RET_DATAFAIL) {
+ writeDataError(pData, &root, reqmsg);
+ iRet = RS_RET_OK; /* we have handled the problem! */
+ }
+
+finalize_it:
+ if(root != NULL)
+ cJSON_Delete(root);
+ RETiRet;
+}
+
+
static rsRetVal
-curlPost(instanceData *instance, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
{
CURLcode code;
- CURL *curl = instance->curlHandle;
+ CURL *curl = pData->curlHandle;
DEFiRet;
- if(instance->dynSrchIdx || instance->dynSrchType || instance->dynParent)
- CHKiRet(setCurlURL(instance, tpls));
+ pData->reply = NULL;
+ pData->replyLen = 0;
+
+ if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent)
+ CHKiRet(setCurlURL(pData, tpls));
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
-dbgprintf("omelasticsearch: do curl_easy_perform()\n");
code = curl_easy_perform(curl);
-DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) code);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_COULDNT_RESOLVE_PROXY:
@@ -398,12 +564,18 @@ DBGPRINTF("omelasticsearch: curl_easy_perform() returned %lld\n", (long long) co
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
- return RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
default:
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
- return RS_RET_OK;
+ break;
}
+
+ pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */
+ DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply);
+
+ CHKiRet(checkResult(pData, message));
finalize_it:
+ free(pData->reply);
RETiRet;
}
@@ -424,7 +596,6 @@ CODESTARTdoAction
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
-dbgprintf("omelasticsearch: doAction calling curlPost\n");
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
ppString));
}
@@ -449,35 +620,20 @@ ENDendTransaction
size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
- unsigned int i;
char *p = (char *)ptr;
- char *jsonData = (char *)userdata;
- static char ok[] = "{\"ok\":true,";
-
- ASSERT(size == 1);
-DBGPRINTF("omelasticsearch request: %s\n", jsonData);
-DBGPRINTF("omelasticsearch result: ");
-for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
-DBGPRINTF("\n");
-
- if (size == 1 &&
- nmemb > sizeof(ok)-1 &&
- strncmp(p, ok, sizeof(ok)-1) == 0) {
- STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
-dbgprintf("omelasticsearch ok\n");
- } else {
-dbgprintf("omelasticsearch fail\n");
- STATSCOUNTER_INC(indexFailed, mutIndexFailed);
- if (Debug) {
- DBGPRINTF("omelasticsearch (fail) request: %s\n", jsonData);
- DBGPRINTF("omelasticsearch (fail) result: ");
- for (i = 0; i < nmemb; i++)
- DBGPRINTF("%c", p[i]);
- DBGPRINTF("\n");
- }
+ instanceData *pData = (instanceData*) userdata;
+ char *buf;
+ size_t newlen;
+
+ newlen = pData->replyLen + size*nmemb;
+ if((buf = realloc(pData->reply, newlen + 1)) == NULL) {
+ DBGPRINTF("omelasticsearch: realloc failed in curlResult\n");
+ return 0; /* abort due to failure */
}
- return size * nmemb;
+ memcpy(buf+pData->replyLen, p, size*nmemb);
+ pData->replyLen = newlen;
+ pData->reply = buf;
+ return size*nmemb;
}
@@ -533,6 +689,7 @@ setInstParamDefaults(instanceData *pData)
pData->asyncRepl = 0;
pData->bulkmode = 0;
pData->tplName = NULL;
+ pData->errorFile = NULL;
}
BEGINnewActInst
@@ -552,6 +709,8 @@ CODESTARTnewActInst
continue;
if(!strcmp(actpblk.descr[i].name, "server")) {
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
+ pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->port = (int) pvals[i].val.d.n, NULL;
} else if(!strcmp(actpblk.descr[i].name, "uid")) {
@@ -693,6 +852,14 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+BEGINdoHUP
+CODESTARTdoHUP
+ if(pData->fdErrFile != -1) {
+ close(pData->fdErrFile);
+ pData->fdErrFile = -1;
+ }
+ENDdoHUP
+
BEGINmodExit
CODESTARTmodExit
@@ -707,6 +874,7 @@ CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_doHUP
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
diff --git a/plugins/omjournal/Makefile.am b/plugins/omjournal/Makefile.am
new file mode 100644
index 00000000..4cfbbd96
--- /dev/null
+++ b/plugins/omjournal/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = omjournal.la
+
+omjournal_la_SOURCES = omjournal.c
+omjournal_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBSYSTEMD_JOURNAL_CFLAGS)
+omjournal_la_LDFLAGS = -module -avoid-version
+omjournal_la_LIBADD = $(LIBSYSTEMD_JOURNAL_LIBS)
+
+EXTRA_DIST =
diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c
new file mode 100644
index 00000000..c340287f
--- /dev/null
+++ b/plugins/omjournal/omjournal.c
@@ -0,0 +1,185 @@
+/* omjournal.c
+ * send messages to the Linux Journal. This is meant to be used
+ * in cases where journal serves as the whole system log database.
+ * Note that we may get into a loop if journald re-injects messages
+ * into the syslog stream and we read that via imuxsock. Thus there
+ * is an option in imuxsock to ignore messages from ourselves
+ * (actually from our pid). So there are some module-interdependencies.
+ *
+ * Copyright 2013 Adiscon GmbH.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <unistd.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include <systemd/sd-journal.h>
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("omjournal")
+
+
+DEFobjCurrIf(errmsg);
+DEF_OMOD_STATIC_DATA
+
+/* config variables */
+
+
+typedef struct _instanceData {
+} instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+};
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ENDfreeInstance
+
+
+BEGINnewActInst
+CODESTARTnewActInst
+ /* Note: we currently do not have any parameters, so we do not need
+ * the lst ptr. However, we will most probably need params in the
+ * future.
+ */
+ DBGPRINTF("newActInst (mmjournal)\n");
+ CODE_STD_STRING_REQUESTnewActInst(1)
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+ CHKiRet(createInstance(&pData));
+ /*setInstParamDefaults(pData);*/
+CODE_STD_FINALIZERnewActInst
+/* cnfparamvalsDestruct(pvals, &actpblk);*/
+ENDnewActInst
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+BEGINdoAction
+ msg_t *pMsg;
+ uchar *tag;
+ int lenTag;
+ int sev;
+ int r;
+CODESTARTdoAction
+ pMsg = (msg_t*) ppString[0];
+ MsgGetSeverity(pMsg, &sev);
+ getTAG(pMsg, &tag, &lenTag);
+ /* we can use more properties here, but let's see if there
+ * is some real user interest. We can always add later...
+ */
+ r = sd_journal_send("MESSAGE=%s", getMSG(pMsg),
+ "PRIORITY=%d", sev,
+ "SYSLOG_FACILITY=%d", pMsg->iFacility,
+ "SYSLOG_IDENTIFIER=%s", tag,
+ NULL);
+ /* FIXME: think about what to do with errors ;) */
+ENDdoAction
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(strncmp((char*) p, ":omjournal:", sizeof(":omjournal:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omjournal supports only v6+ config format, use: "
+ "action(type=\"omjournal\" ...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ DBGPRINTF("omjournal: module compiled with rsyslog version %s.\n", VERSION);
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c
index 99bcaf9d..6e27ad22 100644
--- a/plugins/omlibdbi/omlibdbi.c
+++ b/plugins/omlibdbi/omlibdbi.c
@@ -70,6 +70,7 @@ typedef struct _instanceData {
uchar *dbName; /* database to use */
unsigned uLastDBErrno; /* last errno returned by libdbi or 0 if all is well */
uchar *tplName; /* format template to use */
+ int txSupport; /* transaction support */
} instanceData;
typedef struct configSettings_s {
@@ -81,15 +82,36 @@ typedef struct configSettings_s {
uchar *dbName; /* database to use */
} configSettings_t;
static configSettings_t cs;
+uchar *pszFileDfltTplName; /* name of the default template to use */
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ uchar *dbiDrvrDir; /* where do the dbi drivers reside? */
+ uchar *tplName; /* default template */
+};
+
+static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
/* tables for interfacing with the v6 config system */
+/* module-global parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "template", eCmdHdlrGetWord, 0 },
+ { "driverdirectory", eCmdHdlrGetWord, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 1 },
{ "db", eCmdHdlrGetWord, 1 },
{ "uid", eCmdHdlrGetWord, 1 },
{ "pwd", eCmdHdlrGetWord, 1 },
- { "driverdirectory", eCmdHdlrGetWord, 0 },
{ "driver", eCmdHdlrGetWord, 1 },
{ "template", eCmdHdlrGetWord, 0 }
};
@@ -99,6 +121,20 @@ static struct cnfparamblk actpblk =
actpdescr
};
+/* this function gets the default template. It coordinates action between
+ * old-style and new-style configuration parts.
+ */
+static inline uchar*
+getDfltTpl(void)
+{
+ if(loadModConf != NULL && loadModConf->tplName != NULL)
+ return loadModConf->tplName;
+ else if(pszFileDfltTplName == NULL)
+ return (uchar*)" StdDBFmt";
+ else
+ return pszFileDfltTplName;
+}
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
@@ -144,7 +180,6 @@ static void closeConn(instanceData *pData)
BEGINfreeInstance
CODESTARTfreeInstance
closeConn(pData);
- free(pData->dbiDrvrDir);
free(pData->drvrName);
free(pData->host);
free(pData->usrName);
@@ -227,7 +262,7 @@ static rsRetVal initConn(instanceData *pData, int bSilent)
# endif
if(pData->conn == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize libdbi connection");
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
} else { /* we could get the handle, now on with work... */
/* Connect to database */
dbi_conn_set_option(pData->conn, "host", (char*) pData->host);
@@ -238,8 +273,9 @@ static rsRetVal initConn(instanceData *pData, int bSilent)
if(dbi_conn_connect(pData->conn) < 0) {
reportDBError(pData, bSilent);
closeConn(pData); /* ignore any error we may get */
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ pData->txSupport = dbi_conn_cap_get(pData->conn, "transaction_support");
}
finalize_it:
@@ -295,12 +331,119 @@ CODESTARTtryResume
}
ENDtryResume
+/* transaction support 2013-03 */
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+ if(pData->conn == NULL) {
+ CHKiRet(initConn(pData, 0));
+ }
+# if HAVE_DBI_TXSUPP
+ if (pData->txSupport == 1) {
+ if (dbi_conn_transaction_begin(pData->conn) != 0) {
+ dbgprintf("libdbi server error: begin transaction not successful\n");
+ iRet = RS_RET_SUSPENDED;
+ }
+ }
+# endif
+finalize_it:
+ENDbeginTransaction
+/* end transaction */
+
BEGINdoAction
CODESTARTdoAction
- dbgprintf("\n");
- iRet = writeDB(ppString[0], pData);
+ CHKiRet(writeDB(ppString[0], pData));
+# if HAVE_DBI_TXSUPP
+ if (pData->txSupport == 1) {
+ iRet = RS_RET_DEFER_COMMIT;
+ }
+# endif
+finalize_it:
ENDdoAction
+/* transaction support 2013-03 */
+BEGINendTransaction
+CODESTARTendTransaction
+# if HAVE_DBI_TXSUPP
+ if (dbi_conn_transaction_commit(pData->conn) != 0) {
+ dbgprintf("libdbi server error: transaction not committed\n");
+ iRet = RS_RET_SUSPENDED;
+ }
+# endif
+ENDendTransaction
+/* end transaction */
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ pModConf->tplName = NULL;
+ bLegacyCnfModGlobalsPermitted = 1;
+ENDbeginCnfLoad
+
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "omlibdbi: error processing "
+ "module config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("module (global) param blk for omlibdbi:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for(i = 0 ; i < modpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(modpblk.descr[i].name, "template")) {
+ loadModConf->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ if(pszFileDfltTplName != NULL) {
+ errmsg.LogError(0, RS_RET_DUP_PARAM, "omlibdbi: warning: default template "
+ "was already set via legacy directive - may lead to inconsistent "
+ "results.");
+ }
+ } else if(!strcmp(modpblk.descr[i].name, "driverdirectory")) {
+ loadModConf->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("omlibdbi: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+ bLegacyCnfModGlobalsPermitted = 0;
+finalize_it:
+ if(pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading */
+ /* free legacy config vars */
+ free(pszFileDfltTplName);
+ pszFileDfltTplName = NULL;
+ENDendCnfLoad
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ runModConf = pModConf;
+ENDactivateCnf
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ free(pModConf->tplName);
+ free(pModConf->dbiDrvrDir);
+ENDfreeCnf
+
+
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -311,6 +454,7 @@ setInstParamDefaults(instanceData *pData)
BEGINnewActInst
struct cnfparamvals *pvals;
+ uchar *tplToUse;
int i;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
@@ -319,7 +463,6 @@ CODESTARTnewActInst
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
-
CODE_STD_STRING_REQUESTnewActInst(1)
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
@@ -332,28 +475,19 @@ CODESTARTnewActInst
pData->usrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
- } else if(!strcmp(actpblk.descr[i].name, "driverdirectory")) {
- pData->dbiDrvrDir = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "driver")) {
pData->drvrName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
- dbgprintf("ommysql: program error, non-handled "
+ dbgprintf("omlibdbi: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
- if(pData->tplName == NULL) {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*) strdup(" StdDBFmt"),
- OMSR_RQD_TPL_OPT_SQL));
- } else {
- CHKiRet(OMSRsetEntry(*ppOMSR, 0,
- (uchar*) strdup((char*) pData->tplName),
- OMSR_RQD_TPL_OPT_SQL));
- }
+ tplToUse = (pData->tplName == NULL) ? (uchar*)strdup((char*)getDfltTpl()) : pData->tplName;
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_RQD_TPL_OPT_SQL));
CODE_STD_FINALIZERnewActInst
-dbgprintf("XXXX: added param, iRet %d\n", iRet);
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -369,7 +503,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* ok, if we reach this point, we have something for us */
CHKiRet(createInstance(&pData));
-
/* no create the instance based on what we currently have */
if(cs.drvrName == NULL) {
errmsg.LogError(0, RS_RET_NO_DRIVERNAME, "omlibdbi: no db driver name given - action can not be created");
@@ -380,19 +513,17 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* NULL values are supported because drivers have different needs.
* They will err out on connect. -- rgerhards, 2008-02-15
*/
- if(cs.host != NULL)
- if((pData->host = (uchar*) strdup((char*)cs.host)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ if(cs.host != NULL)
+ CHKmalloc(pData->host = (uchar*) strdup((char*)cs.host));
if(cs.usrName != NULL)
- if((pData->usrName = (uchar*) strdup((char*)cs.usrName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.dbName != NULL)
- if((pData->dbName = (uchar*) strdup((char*)cs.dbName)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
- if(cs.pwd != NULL)
- if((pData->pwd = (uchar*) strdup((char*)cs.pwd)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ CHKmalloc(pData->usrName = (uchar*) strdup((char*)cs.usrName));
+ if(cs.dbName != NULL)
+ CHKmalloc(pData->dbName = (uchar*) strdup((char*)cs.dbName));
+ if(cs.pwd != NULL)
+ CHKmalloc(pData->pwd = (uchar*) strdup((char*)cs.pwd));
if(cs.dbiDrvrDir != NULL)
- if((pData->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir)) == NULL) ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
-
- CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, (uchar*) " StdDBFmt"));
-
+ CHKmalloc(loadModConf->dbiDrvrDir = (uchar*) strdup((char*)cs.dbiDrvrDir));
+ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_RQD_TPL_OPT_SQL, getDfltTpl()));
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -413,7 +544,10 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -443,8 +577,12 @@ CODESTARTmodInit
INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
+# ifndef HAVE_DBI_TXSUPP
+ DBGPRINTF("omlibdbi: no transaction support in libdbi\n");
+# warning libdbi too old - transactions are not enabled (use 0.9 or later)
+# endif
CHKiRet(objUse(errmsg, CORE_COMPONENT));
- CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID));
+ CHKiRet(regCfSysLineHdlr2((uchar *)"actionlibdbidriverdirectory", 0, eCmdHdlrGetWord, NULL, &cs.dbiDrvrDir, STD_LOADABLE_MODULE_ID, &bLegacyCnfModGlobalsPermitted));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbidriver", 0, eCmdHdlrGetWord, NULL, &cs.drvrName, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbihost", 0, eCmdHdlrGetWord, NULL, &cs.host, STD_LOADABLE_MODULE_ID));
CHKiRet(omsdRegCFSLineHdlr((uchar *)"actionlibdbiusername", 0, eCmdHdlrGetWord, NULL, &cs.usrName, STD_LOADABLE_MODULE_ID));
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
index ed77f824..dd997410 100644
--- a/plugins/ommongodb/ommongodb.c
+++ b/plugins/ommongodb/ommongodb.c
@@ -236,11 +236,11 @@ getDefaultBSON(msg_t *pMsg)
gint64 ts_gen, ts_rcv; /* timestamps: generated, received */
int secfrac;
- procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free);
- tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free);
- pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free);
- sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free);
- msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free);
+ procid = MsgGetProp(pMsg, NULL, PROP_PROGRAMNAME, NULL, &procid_len, &procid_free, NULL);
+ tag = MsgGetProp(pMsg, NULL, PROP_SYSLOGTAG, NULL, &tag_len, &tag_free, NULL);
+ pid = MsgGetProp(pMsg, NULL, PROP_PROCID, NULL, &pid_len, &pid_free, NULL);
+ sys = MsgGetProp(pMsg, NULL, PROP_HOSTNAME, NULL, &sys_len, &sys_free, NULL);
+ msg = MsgGetProp(pMsg, NULL, PROP_MSG, NULL, &msg_len, &msg_free, NULL);
// TODO: move to datetime? Refactor in any case! rgerhards, 2012-03-30
ts_gen = (gint64) datetime.syslogTime2time_t(&pMsg->tTIMESTAMP) * 1000; /* ms! */
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index 69ffb9ac..2dfa29de 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -189,7 +189,6 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
ASSERT(pData != NULL);
ASSERT(pData->f_hmysql == NULL);
-
pData->f_hmysql = mysql_init(NULL);
if(pData->f_hmysql == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle");
@@ -219,10 +218,12 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
pData->f_dbpwd, pData->f_dbname, pData->f_dbsrvPort, NULL, 0) == NULL) {
reportDBError(pData, bSilent);
closeMySQL(pData); /* ignore any error we may get */
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ mysql_autocommit(pData->f_hmysql, 0);
}
+finalize_it:
RETiRet;
}
@@ -241,6 +242,7 @@ rsRetVal writeMySQL(uchar *psz, instanceData *pData)
/* see if we are ready to proceed */
if(pData->f_hmysql == NULL) {
CHKiRet(initMySQL(pData, 0));
+
}
/* try insert */
@@ -272,12 +274,28 @@ CODESTARTtryResume
}
ENDtryResume
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+ CHKiRet(writeMySQL((uchar*)"START TRANSACTION", pData));
+finalize_it:
+ENDbeginTransaction
+
BEGINdoAction
CODESTARTdoAction
dbgprintf("\n");
- iRet = writeMySQL(ppString[0], pData);
+ CHKiRet(writeMySQL(ppString[0], pData));
+ iRet = RS_RET_DEFER_COMMIT;
+finalize_it:
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+ if (mysql_commit(pData->f_hmysql) != 0) {
+ dbgprintf("mysql server error: transaction not committed\n");
+ iRet = RS_RET_SUSPENDED;
+ }
+ENDendTransaction
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -305,7 +323,7 @@ CODESTARTnewActInst
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
- CODE_STD_STRING_REQUESTnewActInst(1)
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
@@ -437,6 +455,7 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -459,6 +478,11 @@ INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+ if(!bCoreSupportsBatching) {
+ errmsg.LogError(0, NO_ERRCODE, "ommysql: rsyslog core too old");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
/* we need to init the MySQL library. If that fails, we cannot run */
if(
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index e55836c5..c9e32444 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -7,7 +7,7 @@
*
* File begun on 2008-03-13 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -43,6 +43,7 @@
#include "glbl.h"
#include "errmsg.h"
#include "debug.h"
+#include "unicode-helper.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
@@ -57,12 +58,14 @@ DEFobjCurrIf(glbl)
static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
- char *f_hname;
+ uchar *target;
int compressionLevel; /* 0 - no compression, else level for zlib */
- char *port;
+ uchar *port;
int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
- relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned timeout;
+ relpClt_t *pRelpClt; /* relp client for this instance */
+ uchar *tplName;
} instanceData;
typedef struct configSettings_s {
@@ -70,30 +73,114 @@ typedef struct configSettings_s {
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "target", eCmdHdlrGetWord, 1 },
+ { "port", eCmdHdlrGetWord, 0 },
+ { "timeout", eCmdHdlrInt, 0 },
+ { "template", eCmdHdlrGetWord, 1 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
ENDinitConfVars
-/* get the syslog forward port from selector_t. The passed in
- * struct must be one that is setup for forwarding.
- * rgerhards, 2007-06-28
- * We may change the implementation to try to lookup the port
- * if it is unspecified. So far, we use the IANA default auf 514.
+/* We may change the implementation to try to lookup the port
+ * if it is unspecified. So far, we use 514 as default (what probably
+ * is not a really bright idea, but kept for backward compatibility).
*/
-static char *getRelpPt(instanceData *pData)
+static uchar *getRelpPt(instanceData *pData)
{
assert(pData != NULL);
if(pData->port == NULL)
- return("514");
+ return((uchar*)"514");
else
return(pData->port);
}
+static inline rsRetVal
+doCreateRelpClient(instanceData *pData)
+{
+ DEFiRet;
+ if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+finalize_it:
+ RETiRet;
+}
+
+
BEGINcreateInstance
CODESTARTcreateInstance
pData->bInitialConnect = 1;
ENDcreateInstance
+BEGINfreeInstance
+CODESTARTfreeInstance
+ if(pData->pRelpClt != NULL)
+ relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
+ free(pData->target);
+ free(pData->port);
+ free(pData->tplName);
+ENDfreeInstance
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->target = NULL;
+ pData->port = NULL;
+ pData->tplName = NULL;
+ pData->timeout = 90;
+}
+
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "target")) {
+ pData->target = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "port")) {
+ pData->port = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "timeout")) {
+ pData->timeout = (unsigned) pvals[i].val.d.n;
+ } else {
+ dbgprintf("omrelp: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ CODE_STD_STRING_REQUESTnewActInst(1)
+
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
+ "RSYSLOG_ForwardFormat" : (char*)pData->tplName),
+ OMSR_NO_RQD_TPL_OPTS));
+
+ CHKiRet(doCreateRelpClient(pData));
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -101,25 +188,16 @@ CODESTARTisCompatibleWithFeature
iRet = RS_RET_OK;
ENDisCompatibleWithFeature
-
-BEGINfreeInstance
-CODESTARTfreeInstance
- if(pData->port != NULL)
- free(pData->port);
-
- /* final cleanup */
- if(pData->pRelpClt != NULL)
- relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
-
- if(pData->f_hname != NULL)
- free(pData->f_hname);
-
-ENDfreeInstance
+BEGINSetShutdownImmdtPtr
+CODESTARTSetShutdownImmdtPtr
+ relpEngineSetShutdownImmdtPtr(pRelpEngine, pPtr);
+ DBGPRINTF("omrelp: shutdownImmediate ptr now is %p\n", pPtr);
+ENDSetShutdownImmdtPtr
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
- printf("RELP/%s", pData->f_hname);
+ dbgprintf("RELP/%s", pData->target);
ENDdbgPrintInstInfo
@@ -131,7 +209,7 @@ static rsRetVal doConnect(instanceData *pData)
DEFiRet;
if(pData->bInitialConnect) {
- iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), (uchar*) pData->port, (uchar*) pData->f_hname);
+ iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
if(iRet == RELP_RET_OK)
pData->bInitialConnect = 0;
} else {
@@ -160,7 +238,7 @@ BEGINdoAction
size_t lenMsg;
relpRetVal ret;
CODESTARTdoAction
- dbgprintf(" %s:%s/RELP\n", pData->f_hname, getRelpPt(pData));
+ dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
if(!pData->bIsConnected) {
CHKiRet(doConnect(pData));
@@ -309,21 +387,17 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* TODO: make this if go away! */
if(*p == ';') {
*p = '\0'; /* trick to obtain hostname (later)! */
- CHKmalloc(pData->f_hname = strdup((char*) q));
+ CHKmalloc(pData->target = ustrdup(q));
*p = ';';
} else {
- CHKmalloc(pData->f_hname = strdup((char*) q));
+ CHKmalloc(pData->target = ustrdup(q));
}
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
- /* create our relp client */
- CHKiRet(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt)); /* we use CHKiRet as librelp has a similar return value range */
+ CHKiRet(doCreateRelpClient(pData));
- /* TODO: do we need to call freeInstance if we failed - this is a general question for
- * all output modules. I'll address it later as the interface evolves. rgerhards, 2007-07-25
- */
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -342,6 +416,8 @@ BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_SetShutdownImmdtPtr
ENDqueryEtryPt
diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c
index 4c7e25d2..11765507 100644
--- a/plugins/omruleset/omruleset.c
+++ b/plugins/omruleset/omruleset.c
@@ -120,7 +120,11 @@ CODESTARTdoAction
(char*) pData->pszRulesetName, pData->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
MsgSetRuleset(pMsg, pData->pRuleset);
- submitMsg(pMsg);
+ /* Note: we intentionally use submitMsg2() here, as we process messages
+ * that were already run through the rate-limiter. So it is (at least)
+ * questionable if they were rate-limited again.
+ */
+ submitMsg2(pMsg);
finalize_it:
ENDdoAction
@@ -165,12 +169,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
p += sizeof(":omruleset:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
CHKiRet(createInstance(&pData));
- /* re-enable in v7.3: requires action list to support
- * action-like statements, something that is too late to
- * do in 7.1.
errmsg.LogError(0, RS_RET_DEPRECATED, "warning: omruleset is deprecated, consider "
"using the 'call' statement instead");
- */
/* check if a non-standard template is to be applied */
if(*(p-1) == ';')
diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c
index 59f9c8bb..a84a7593 100644
--- a/plugins/omstdout/omstdout.c
+++ b/plugins/omstdout/omstdout.c
@@ -105,6 +105,7 @@ BEGINdoAction
int iBuf;
char szBuf[65564];
size_t len;
+ int r;
CODESTARTdoAction
if(pData->bUseArrayInterface) {
/* if we use array passing, we need to put together a string
@@ -140,9 +141,15 @@ CODESTARTdoAction
* actually intends to use this module in production (why???), this code
* needs to be more solid. -- rgerhards, 2012-11-28
*/
- if(write(1, toWrite, len)) {}; /* 1 is stdout! */
+ if((r = write(1, toWrite, len)) != (int) len) { /* 1 is stdout! */
+ DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n",
+ r, len, toWrite);
+ }
if(pData->bEnsureLFEnding && toWrite[len-1] != '\n') {
- if(write(1, "\n", 1)) {}; /* write missing LF */
+ if((r = write(1, "\n", 1)) != 1) { /* write missing LF */
+ DBGPRINTF("omstdout: error %d writing \\n to stdout\n",
+ r);
+ }
}
ENDdoAction
diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c
index a45d49fa..9c4c80ba 100644
--- a/plugins/omudpspoof/omudpspoof.c
+++ b/plugins/omudpspoof/omudpspoof.c
@@ -5,7 +5,7 @@
* This file builds on UDP spoofing code contributed by
* David Lang <david@lang.hm>. I then created a "real" rsyslog module
* out of that code and omfwd. I decided to make it a separate module because
- * omfwd already mixes up too many things (TCP & UDP & a differnt modes,
+ * omfwd already mixes up too many things (TCP & UDP & a different modes,
* this has historic reasons), it would not be a good idea to also add
* spoofing to it. And, looking at the requirements, there is little in
* common between omfwd and this module.
@@ -93,14 +93,19 @@ DEFobjCurrIf(glbl)
DEFobjCurrIf(net)
typedef struct _instanceData {
+ uchar *tplName; /* name of assigned template */
uchar *host;
uchar *port;
+ uchar *sourceTpl;
+ int mtu;
int *pSockArray; /* sockets to use for UDP */
- int compressionLevel; /* 0 - no compression, else level for zlib */
struct addrinfo *f_addr;
u_short sourcePort;
u_short sourcePortStart; /* for sorce port iteration */
u_short sourcePortEnd;
+ int bReportLibnetInitErr; /* help prevent multiple error messages on init err */
+ libnet_t *libnet_handle;
+ char errbuf[LIBNET_ERRBUF_SIZE];
} instanceData;
#define DFLT_SOURCE_PORT_START 32000
@@ -116,6 +121,22 @@ typedef struct configSettings_s {
} configSettings_t;
static configSettings_t cs;
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "target", eCmdHdlrGetWord, 1 },
+ { "port", eCmdHdlrGetWord, 0 },
+ { "sourcetemplate", eCmdHdlrGetWord, 0 },
+ { "sourceport.start", eCmdHdlrInt, 0 },
+ { "sourceport.end", eCmdHdlrInt, 0 },
+ { "mtu", eCmdHdlrInt, 0 },
+ { "template", eCmdHdlrGetWord, 0 }
+};
+static struct cnfparamblk actpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
/* module-global parameters */
static struct cnfparamdescr modpdescr[] = {
{ "template", eCmdHdlrGetWord, 0 },
@@ -148,8 +169,6 @@ ENDinitConfVars
/* add some variables needed for libnet */
-libnet_t *libnet_handle;
-char errbuf[LIBNET_ERRBUF_SIZE];
pthread_mutex_t mutLibnet;
/* forward definitions */
@@ -291,6 +310,9 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
+ pData->libnet_handle = NULL;
+ pData->mtu = 1500;
+ pData->bReportLibnetInitErr = 1;
ENDcreateInstance
@@ -305,8 +327,12 @@ BEGINfreeInstance
CODESTARTfreeInstance
/* final cleanup */
closeUDPSockets(pData);
+ free(pData->tplName);
free(pData->port);
free(pData->host);
+ free(pData->sourceTpl);
+ if(pData->libnet_handle != NULL)
+ libnet_destroy(pData->libnet_handle);
ENDfreeInstance
@@ -331,16 +357,20 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
libnet_ptag_t ip, ipo;
libnet_ptag_t udp;
sbool bNeedUnlock = 0;
+ /* hdrOffs = fragmentation flags + offset (in bytes)
+ * divided by 8 */
+ unsigned msgOffs, hdrOffs;
+ unsigned maxPktLen, pktLen;
DEFiRet;
if(pData->pSockArray == NULL) {
CHKiRet(doTryResume(pData));
}
- if(len > 1472) {
- DBGPRINTF("omudpspoof: msg with length %d truncated to 1472 bytes: '%.768s'\n",
+ if(len > 65528) {
+ DBGPRINTF("omudpspoof: msg with length %d truncated to 64k: '%.768s'\n",
len, msg);
- len = 1472;
+ len = 65528;
}
ip = ipo = udp = 0;
@@ -355,26 +385,41 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
bNeedUnlock = 1;
for (r = pData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) {
tempaddr = (struct sockaddr_in *)r->ai_addr;
- libnet_clear_packet(libnet_handle);
+ /* Getting max payload size (must be multiple of 8) */
+ maxPktLen = (pData->mtu - LIBNET_IPV4_H) & ~0x07;
+ msgOffs = 0;
+ /* We're doing (payload size - UDP header size) and not
+ * checking if it's a multiple of 8 because we know the
+ * header is 8 bytes long */
+ if(len > (maxPktLen - LIBNET_UDP_H) ) {
+ hdrOffs = IP_MF;
+ pktLen = maxPktLen - LIBNET_UDP_H;
+ } else {
+ hdrOffs = 0;
+ pktLen = len;
+ }
+ DBGPRINTF("omudpspoof: stage 1: MF:%d, hdrOffs %d, pktLen %d\n",
+ (hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen);
+ libnet_clear_packet(pData->libnet_handle);
/* note: libnet does need ports in host order NOT in network byte order! -- rgerhards, 2009-11-12 */
udp = libnet_build_udp(
ntohs(pData->sourcePort),/* source port */
ntohs(tempaddr->sin_port),/* destination port */
- LIBNET_UDP_H + len, /* packet length */
+ pktLen+LIBNET_UDP_H, /* packet length */
0, /* checksum */
(u_char*)msg, /* payload */
- len, /* payload size */
- libnet_handle, /* libnet handle */
+ pktLen, /* payload size */
+ pData->libnet_handle, /* libnet handle */
udp); /* libnet id */
if (udp == -1) {
- DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(libnet_handle));
+ DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pData->libnet_handle));
}
ip = libnet_build_ipv4(
- LIBNET_IPV4_H + len + LIBNET_UDP_H, /* length */
+ LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, /* length */
0, /* TOS */
242, /* IP ID */
- 0, /* IP Frag */
+ hdrOffs, /* IP Frag */
64, /* TTL */
IPPROTO_UDP, /* protocol */
0, /* checksum */
@@ -382,31 +427,87 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
tempaddr->sin_addr.s_addr,
NULL, /* payload */
0, /* payload size */
- libnet_handle, /* libnet handle */
+ pData->libnet_handle, /* libnet handle */
ip); /* libnet id */
if (ip == -1) {
- DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(libnet_handle));
+ DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pData->libnet_handle));
}
/* Write it to the wire. */
- lsent = libnet_write(libnet_handle);
- if(lsent != LIBNET_IPV4_H+LIBNET_UDP_H+len) {
- DBGPRINTF("omudpspoof: write error len %d, sent %d: %s\n",
- LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(libnet_handle));
+ lsent = libnet_write(pData->libnet_handle);
+ dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d), fd %d\n", lsent,
+ (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen), pData->libnet_handle->fd);
+ if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) {
+ /* note: access to fd is a libnet internal. If a newer version of libnet does
+ * not expose that member, we should simply remove it. However, while it is there
+ * it is useful for consolidating with strace output.
+ */
+ DBGPRINTF("omudpspoof: write error (total len %d): pktLen %d, sent %d, fd %d: %s\n",
+ len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pData->libnet_handle->fd,
+ libnet_geterror(pData->libnet_handle));
if(lsent != -1) {
bSendSuccess = RSTRUE;
}
} else {
bSendSuccess = RSTRUE;
}
- }
- /* finished looping */
- if(bSendSuccess == RSFALSE) {
- DBGPRINTF("omudpspoof: error sending message, suspending\n");
- iRet = RS_RET_SUSPENDED;
+ msgOffs += pktLen;
+
+ /* We need to get rid of the UDP header to build the other fragments */
+ libnet_clear_packet(pData->libnet_handle);
+ ip = LIBNET_PTAG_INITIALIZER;
+ while(len > msgOffs ) { /* loop until all payload is sent */
+ /* check if there will be more fragments */
+ if((len - msgOffs) > maxPktLen) {
+ /* In IP's eyes, the UDP header in the first packet
+ * needs to be in the offset, so we add its size to
+ * the payload offset here */
+ hdrOffs = IP_MF + (msgOffs + LIBNET_UDP_H)/8;
+ pktLen = maxPktLen;
+ } else {
+ /* See above */
+ hdrOffs = (msgOffs + LIBNET_UDP_H)/8;
+ pktLen = len - msgOffs;
+ }
+ DBGPRINTF("omudpspoof: stage 2: MF:%d, hdrOffs %d, pktLen %d\n",
+ (hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen);
+ ip = libnet_build_ipv4(
+ LIBNET_IPV4_H + pktLen, /* length */
+ 0, /* TOS */
+ 242, /* IP ID */
+ hdrOffs, /* IP Frag */
+ 64, /* TTL */
+ IPPROTO_UDP, /* protocol */
+ 0, /* checksum */
+ source_ip.sin_addr.s_addr,
+ tempaddr->sin_addr.s_addr,
+ (uint8_t*)(msg+msgOffs), /* payload */
+ pktLen, /* payload size */
+ pData->libnet_handle, /* libnet handle */
+ ip); /* libnet id */
+ if (ip == -1) {
+ DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pData->libnet_handle));
+ }
+ /* Write it to the wire. */
+ lsent = libnet_write(pData->libnet_handle);
+ dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d)\n", lsent, (int) (LIBNET_IPV4_H+pktLen));
+ if(lsent != (int) (LIBNET_IPV4_H+pktLen)) {
+ DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n",
+ LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle));
+ bSendSuccess = RSFALSE;
+ continue;
+ }
+ msgOffs += pktLen;
+ }
}
finalize_it:
+ if(iRet != RS_RET_OK) {
+ if(pData->libnet_handle != NULL) {
+ libnet_destroy(pData->libnet_handle);
+ pData->libnet_handle = NULL;
+ }
+ }
if(bNeedUnlock) {
d_pthread_mutex_unlock(&mutLibnet);
}
@@ -427,8 +528,32 @@ static rsRetVal doTryResume(instanceData *pData)
if(pData->pSockArray != NULL)
FINALIZE;
+ if(pData->host == NULL)
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
+
+ if(pData->libnet_handle == NULL) {
+ /* Initialize the libnet library. Root priviledges are required.
+ * this initializes a IPv4 socket to use for forging UDP packets.
+ */
+ pData->libnet_handle = libnet_init(
+ LIBNET_RAW4, /* injection type */
+ NULL, /* network interface */
+ pData->errbuf); /* errbuf */
+
+ if(pData->libnet_handle == NULL) {
+ if(pData->bReportLibnetInitErr) {
+ errmsg.LogError(0, RS_RET_ERR_LIBNET_INIT, "omudpsoof: error "
+ "initializing libnet - are you running as root?");
+ pData->bReportLibnetInitErr = 0;
+ }
+ ABORT_FINALIZE(RS_RET_ERR_LIBNET_INIT);
+ }
+ }
+ DBGPRINTF("omudpspoof: libnit_init() ok\n");
+ pData->bReportLibnetInitErr = 1;
+
/* The remote address is not yet known and needs to be obtained */
- DBGPRINTF(" %s\n", pData->host);
+ DBGPRINTF("omudpspoof trying resume for '%s'\n", pData->host);
memset(&hints, 0, sizeof(hints));
/* port must be numeric, because config file syntax requires this */
hints.ai_flags = AI_NUMERICSERV;
@@ -449,7 +574,8 @@ finalize_it:
freeaddrinfo(pData->f_addr);
pData->f_addr = NULL;
}
- iRet = RS_RET_SUSPENDED;
+ if(iRet != RS_RET_DISABLE_ACTION)
+ iRet = RS_RET_SUSPENDED;
}
RETiRet;
@@ -463,69 +589,95 @@ ENDtryResume
BEGINdoAction
char *psz; /* temporary buffering */
- register unsigned l;
+ unsigned l;
int iMaxLine;
CODESTARTdoAction
CHKiRet(doTryResume(pData));
- iMaxLine = glbl.GetMaxLine();
-
- //TODO: enable THIS one! DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host,
- DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%s'\n", pData->host,
+ DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host,
getFwdPt(pData), ppString[1], ppString[0]);
+ iMaxLine = glbl.GetMaxLine();
psz = (char*) ppString[0];
l = strlen((char*) psz);
if((int) l > iMaxLine)
l = iMaxLine;
-# ifdef USE_NETZIP
- /* Check if we should compress and, if so, do it. We also
- * check if the message is large enough to justify compression.
- * The smaller the message, the less likely is a gain in compression.
- * To save CPU cycles, we do not try to compress very small messages.
- * What "very small" means needs to be configured. Currently, it is
- * hard-coded but this may be changed to a config parameter.
- * rgerhards, 2006-11-30
- */
- if(pData->compressionLevel && (l > CONF_MIN_SIZE_FOR_COMPRESS)) {
- Bytef *out;
- uLongf destLen = iMaxLine + iMaxLine/100 +12; /* recommended value from zlib doc */
- uLong srcLen = l;
- int ret;
- /* TODO: optimize malloc sequence? -- rgerhards, 2008-09-02 */
- CHKmalloc(out = (Bytef*) MALLOC(destLen));
- out[0] = 'z';
- out[1] = '\0';
- ret = compress2((Bytef*) out+1, &destLen, (Bytef*) psz,
- srcLen, pData->compressionLevel);
- DBGPRINTF("Compressing message, length was %d now %d, return state %d.\n",
- l, (int) destLen, ret);
- if(ret != Z_OK) {
- /* if we fail, we complain, but only in debug mode
- * Otherwise, we are silent. In any case, we ignore the
- * failed compression and just sent the uncompressed
- * data, which is still valid. So this is probably the
- * best course of action.
- * rgerhards, 2006-11-30
- */
- DBGPRINTF("Compression failed, sending uncompressed message\n");
- } else if(destLen+1 < l) {
- /* only use compression if there is a gain in using it! */
- DBGPRINTF("there is gain in compression, so we do it\n");
- psz = (char*) out;
- l = destLen + 1; /* take care for the "z" at message start! */
- }
- ++destLen;
- }
-# endif
-
CHKiRet(UDPSend(pData, ppString[1], psz, l));
finalize_it:
ENDdoAction
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->tplName = NULL;
+ pData->sourcePortStart = DFLT_SOURCE_PORT_START;
+ pData->sourcePortEnd = DFLT_SOURCE_PORT_END;
+ pData->host = NULL;
+ pData->port = NULL;
+ pData->sourceTpl = (uchar*) strdup("RSYSLOG_omudpspoofDfltSourceTpl");
+ pData->mtu = 1500;
+}
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ uchar *tplToUse;
+ int i;
+CODESTARTnewActInst
+ DBGPRINTF("newActInst (omudpspoof)\n");
+
+ pvals = nvlstGetParams(lst, &actpblk, NULL);
+ if(pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "omudpspoof: mandatory "
+ "parameters missing");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if(Debug) {
+ dbgprintf("action param blk in omudpspoof:\n");
+ cnfparamsPrint(&actpblk, pvals);
+ }
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if(!pvals[i].bUsed)
+ continue;
+ if(!strcmp(actpblk.descr[i].name, "target")) {
+ pData->host = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "port")) {
+ pData->port = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "sourcetemplate")) {
+ free(pData->sourceTpl);
+ pData->sourceTpl = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "sourceport.start")) {
+ pData->sourcePortStart = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "sourceport.end")) {
+ pData->sourcePortEnd = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "mtu")) {
+ pData->mtu = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ DBGPRINTF("omudpspoof: program error, non-handled "
+ "param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+ CODE_STD_STRING_REQUESTnewActInst(2)
+ pData->sourcePort = pData->sourcePortStart;
+
+ tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName);
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS));
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->sourceTpl), OMSR_NO_RQD_TPL_OPTS));
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
BEGINparseSelectorAct
uchar *sourceTpl;
CODESTARTparseSelectorAct
@@ -583,7 +735,6 @@ freeConfigVars(void)
BEGINmodExit
CODESTARTmodExit
/* destroy the libnet state needed for forged UDP sources */
- libnet_destroy(libnet_handle);
pthread_mutex_destroy(&mutLibnet);
/* release what we no longer need */
objRelease(errmsg, CORE_COMPONENT);
@@ -596,6 +747,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
ENDqueryEtryPt
@@ -623,18 +775,6 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(net,LM_NET_FILENAME));
- /* Initialize the libnet library. Root priviledges are required.
- * this initializes a IPv4 socket to use for forging UDP packets.
- */
- libnet_handle = libnet_init(
- LIBNET_RAW4, /* injection type */
- NULL, /* network interface */
- errbuf); /* errbuf */
-
- if(libnet_handle == NULL) {
- errmsg.LogError(0, NO_ERRCODE, "Error initializing libnet, can not continue ");
- ABORT_FINALIZE(RS_RET_ERR_LIBNET_INIT);
- }
pthread_mutex_init(&mutLibnet, NULL);
CHKiRet(regCfSysLineHdlr((uchar *)"actionomudpspoofdefaulttemplate", 0, eCmdHdlrGetWord, setLegacyDfltTpl, NULL, NULL));