summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/imkmsg/kmsg.c46
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c122
-rw-r--r--plugins/omrabbitmq/Makefile.am8
-rw-r--r--plugins/omrabbitmq/README.md56
-rw-r--r--plugins/omrabbitmq/omrabbitmq.c466
-rw-r--r--plugins/omudpspoof/omudpspoof.c3
6 files changed, 658 insertions, 43 deletions
diff --git a/plugins/imkmsg/kmsg.c b/plugins/imkmsg/kmsg.c
index f1815f25..822d3dbd 100644
--- a/plugins/imkmsg/kmsg.c
+++ b/plugins/imkmsg/kmsg.c
@@ -32,9 +32,8 @@
#include <errno.h>
#include <string.h>
#include <ctype.h>
-#ifdef OS_LINUX
#include <sys/klog.h>
-#endif
+#include <sys/sysinfo.h>
#include <json/json.h>
#include "rsyslog.h"
@@ -58,9 +57,8 @@ submitSyslog(uchar *buf)
{
long offs = 0;
struct timeval tv;
- long int timestamp = 0;
- struct timespec monotonic;
- struct timespec realtime;
+ struct sysinfo info;
+ unsigned long int timestamp = 0;
char name[1024];
char value[1024];
char msg[1024];
@@ -87,12 +85,12 @@ submitSyslog(uchar *buf)
/* get timestamp */
for (; isdigit(*buf); buf++) {
- timestamp += (timestamp * 10) + (*buf - '0');
+ timestamp = (timestamp * 10) + (*buf - '0');
}
while (*buf != ';') {
buf++; /* skip everything till the first ; */
- }
+ }
buf++; /* skip ; */
/* get message */
@@ -131,10 +129,24 @@ submitSyslog(uchar *buf)
}
/* calculate timestamp */
- clock_gettime(CLOCK_MONOTONIC, &monotonic);
- clock_gettime(CLOCK_REALTIME, &realtime);
- tv.tv_sec = realtime.tv_sec + ((timestamp / 1000000l) - monotonic.tv_sec);
- tv.tv_usec = (realtime.tv_nsec + ((timestamp / 1000000000l) - monotonic.tv_nsec)) / 1000;
+ sysinfo(&info);
+ gettimeofday(&tv, NULL);
+
+ /* get boot time */
+ tv.tv_sec -= info.uptime;
+
+ tv.tv_sec += timestamp / 1000000;
+ tv.tv_usec += timestamp % 1000000;
+
+ while (tv.tv_usec < 0) {
+ tv.tv_sec--;
+ tv.tv_usec += 1000000;
+ }
+
+ while (tv.tv_usec >= 1000000) {
+ tv.tv_sec++;
+ tv.tv_usec -= 1000000;
+ }
Syslog(priority, (uchar *)msg, &tv, json);
}
@@ -146,7 +158,6 @@ rsRetVal
klogWillRun(modConfData_t *pModConf)
{
char errmsg[2048];
- int r;
DEFiRet;
fklog = open(_PATH_KLOG, O_RDONLY, 0);
@@ -156,17 +167,6 @@ klogWillRun(modConfData_t *pModConf)
ABORT_FINALIZE(RS_RET_ERR_OPEN_KLOG);
}
- /* Set level of kernel console messaging.. */
- if(pModConf->console_log_level != -1) {
- r = klogctl(8, NULL, pModConf->console_log_level);
- if(r != 0) {
- imkmsgLogIntMsg(LOG_WARNING, "imkmsg: cannot set console log level: %s",
- rs_strerror_r(errno, errmsg, sizeof(errmsg)));
- /* make sure we do not try to re-set! */
- pModConf->console_log_level = -1;
- }
- }
-
finalize_it:
RETiRet;
}
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index f27fe62b..33e58c1a 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -11,11 +11,11 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -79,12 +79,14 @@ typedef struct _instanceData {
uchar *parent;
uchar *tplName;
uchar *timeout;
+ uchar *bulkId;
uchar *restURL; /* last used URL for error reporting */
uchar *errorFile;
char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
+ sbool dynBulkId;
sbool bulkmode;
sbool asyncRepl;
struct {
@@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = {
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
- { "template", eCmdHdlrGetWord, 1 }
+ { "template", eCmdHdlrGetWord, 1 },
+ { "dynbulkid", eCmdHdlrBinary, 0 },
+ { "bulkid", eCmdHdlrGetWord, 0 },
};
static struct cnfparamblk actpblk =
{ CNFPARAMBLK_VERSION,
@@ -156,6 +160,7 @@ CODESTARTfreeInstance
free(pData->timeout);
free(pData->restURL);
free(pData->errorFile);
+ free(pData->bulkId);
ENDfreeInstance
BEGINdbgPrintInstInfo
@@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
(uchar*)"(not configured)" : pData->errorFile);
+ dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId);
+ dbgprintf("\tbulkid='%s'\n", pData->bulkId);
ENDdbgPrintInstInfo
@@ -220,7 +227,7 @@ checkConn(instanceData *pData)
cstr = es_str2cstr(url, NULL);
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
-
+
pData->reply = NULL;
pData->replyLen = 0;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
@@ -250,7 +257,8 @@ ENDtryResume
/* get the current index and type for this message */
static inline void
getIndexTypeAndParent(instanceData *pData, uchar **tpls,
- uchar **srchIndex, uchar **srchType, uchar **parent)
+ uchar **srchIndex, uchar **srchType, uchar **parent,
+ uchar **bulkId)
{
if(pData->dynSrchIdx) {
*srchIndex = tpls[1];
@@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
*srchType = tpls[2];
if(pData->dynParent) {
*parent = tpls[3];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[4];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[2];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
}
}
} else {
@@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
*srchType = tpls[1];
if(pData->dynParent) {
*parent = tpls[2];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[3];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[1];
+ if(pData->dynBulkId) {
+ *bulkId = tpls[2];
+ }
} else {
*parent = pData->parent;
+ if(pData->dynBulkId) {
+ *bulkId = tpls[1];
+ }
}
}
}
@@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
uchar *searchIndex;
uchar *searchType;
uchar *parent;
+ uchar *bulkId;
es_str_t *url;
int rLocal;
int r;
@@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
parent = NULL;
} else {
- getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
if(r == 0) r = es_addChar(&url, '/');
if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
@@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
free(pData->restURL);
pData->restURL = (uchar*)es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
es_deleteStr(url);
DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
@@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls)
rLocal);
ABORT_FINALIZE(RS_RET_ERR);
}
- curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
+ curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
}
finalize_it:
@@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
uchar *searchIndex;
uchar *searchType;
uchar *parent;
+ uchar *bulkId = NULL;
DEFiRet;
# define META_STRT "{\"index\":{\"_index\": \""
# define META_TYPE "\",\"_type\":\""
# define META_PARENT "\",\"_parent\":\""
+# define META_ID "\", \"_id\":\""
# define META_END "\"}}\n"
- getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent);
+ getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
}
+ if(bulkId != NULL) {
+ if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1);
+ if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId));
+ }
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
@@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
ssize_t wrRet;
char errStr[1024];
DEFiRet;
-
+
if(pData->errorFile == NULL) {
DBGPRINTF("omelasticsearch: no local error logger defined - "
"ignoring ES error information\n");
@@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg)
}
/* Note: we ignore errors writing the error file, as we cannot handle
- * these in any case.
+ * these in any case.
*/
if(iRet == RS_RET_DATAFAIL) {
writeDataError(pData, &root, reqmsg);
@@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
CHKiRet(setCurlURL(pData, tpls));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
- curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
switch (code) {
case CURLE_COULDNT_RESOLVE_HOST:
@@ -649,10 +688,10 @@ curlSetup(instanceData *pData)
}
header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
- curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
+ curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
- curl_easy_setopt(handle, CURLOPT_POST, 1);
+ curl_easy_setopt(handle, CURLOPT_POST, 1);
pData->curlHandle = handle;
pData->postHeader = header;
@@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData)
pData->bulkmode = 0;
pData->tplName = NULL;
pData->errorFile = NULL;
+ pData->dynBulkId= 0;
+ pData->bulkId = NULL;
}
BEGINnewActInst
@@ -737,12 +778,16 @@ CODESTARTnewActInst
pData->asyncRepl = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) {
+ pData->dynBulkId = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "bulkid")) {
+ pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
dbgprintf("omelasticsearch: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
}
-
+
if(pData->pwd != NULL && pData->uid == NULL) {
errmsg.LogError(0, RS_RET_UID_MISSING,
"omelasticsearch: password is provided, but no uid "
@@ -767,6 +812,12 @@ CODESTARTnewActInst
"name for parent template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
+ if(pData->dynBulkId && pData->bulkId == NULL) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: requested dynamic bulkid, but no "
+ "name for bulkid template given - action definition invalid");
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
if(pData->bulkmode) {
pData->batch.currTpl1 = NULL;
@@ -782,6 +833,7 @@ CODESTARTnewActInst
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
if(pData->dynParent) ++iNumTpls;
+ if(pData->dynBulkId) ++iNumTpls;
DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
@@ -803,11 +855,29 @@ CODESTARTnewActInst
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
}
} else {
@@ -817,12 +887,30 @@ CODESTARTnewActInst
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
- }
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ } else {
+ if(pData->dynBulkId) {
+ CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId),
+ OMSR_NO_RQD_TPL_OPTS));
+ }
+ }
}
}
diff --git a/plugins/omrabbitmq/Makefile.am b/plugins/omrabbitmq/Makefile.am
new file mode 100644
index 00000000..de374081
--- /dev/null
+++ b/plugins/omrabbitmq/Makefile.am
@@ -0,0 +1,8 @@
+pkglib_LTLIBRARIES = omrabbitmq.la
+
+omrabbitmq_la_SOURCES = omrabbitmq.c
+omrabbitmq_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
+omrabbitmq_la_LDFLAGS = -module -avoid-version
+omrabbitmq_la_LIBADD = $(RABBITMQ_LIBS)
+
+EXTRA_DIST =
diff --git a/plugins/omrabbitmq/README.md b/plugins/omrabbitmq/README.md
new file mode 100644
index 00000000..7aa60206
--- /dev/null
+++ b/plugins/omrabbitmq/README.md
@@ -0,0 +1,56 @@
+
+# rsyslog output module for RabbitMQ
+
+This module sends syslog messages into RabbitMQ server.
+
+Only v6 configuration syntax is supported.
+
+**omrabbitmq is tested only with 6.6.0 version of rsyslog.**
+
+
+## Compile
+To successfully compile omrabbitmq module you need [rabbitmq-c](https://github.com/alanxz/rabbitmq-c) library.
+
+ ./configure --enable-omrabbitmq ...
+
+
+----
+## Configure
+
+omrabbitmq output module supports only v6 configuration syntax.
+
+Parameters:
+
+* host=&lt;hostname&gt; &#8211; server
+* virtual_host=&lt;virtual\_host&gt; &#8211; virtual message broker
+* user=&lt;user&gt; &#8211; user name
+* password=&lt;password&gt; &#8211; password
+* exchange=&lt;name&gt; &#8211; exchange name
+* routing_key=&lt;name&gt; &#8211; name of routing key
+
+
+Example:
+
+ $ModLoad omrabbitmq
+
+ *.* action(type="omrabbitmq"
+ host="localhost"
+ virtual_host="/"
+ user="guest"
+ password="guest"
+ exchange="syslog"
+ routing_key="syslog.all"
+ template="RSYSLOG_ForwardFormat"
+ queue.type="linkedlist"
+ queue.timeoutenqueue="0"
+ queue.filename="rabbitmq"
+ queue.highwatermark="500000"
+ queue.lowwatermark="400000"
+ queue.discardmark="5000000"
+ queue.timeoutenqueue="0"
+ queue.maxdiskspace="5g"
+ queue.size="2000000"
+ queue.saveonshutdown="on"
+ action.resumeretrycount="-1")
+
+
diff --git a/plugins/omrabbitmq/omrabbitmq.c b/plugins/omrabbitmq/omrabbitmq.c
new file mode 100644
index 00000000..7ea7793d
--- /dev/null
+++ b/plugins/omrabbitmq/omrabbitmq.c
@@ -0,0 +1,466 @@
+/* omrabbitmq.c
+ *
+ * This output plugin enables rsyslog to send messages to the RabbitMQ.
+ *
+ * Copyright 2012-2013 Vaclav Tomec
+ *
+ * This program is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Author: Vaclav Tomec
+ * <vaclav.tomec@gmail.com>
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <signal.h>
+#include <errno.h>
+#include <time.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "srUtils.h"
+#include "template.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "cfsysline.h"
+
+#include <amqp.h>
+
+MODULE_TYPE_OUTPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("omrabbitmq")
+
+
+/*
+ * internal structures
+ */
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+
+
+typedef struct _instanceData {
+ /* here you need to define all action-specific data. A record of type
+ * instanceData will be handed over to each instance of the action. Keep
+ * in mind that there may be several invocations of the same type of action
+ * inside rsyslog.conf, and this is what keeps them apart. Do NOT use
+ * static data for this!
+ */
+ amqp_connection_state_t conn;
+ amqp_basic_properties_t props;
+ uchar *host;
+ int port;
+ uchar *vhost;
+ uchar *user;
+ uchar *password;
+ uchar *exchange;
+ uchar *routing_key;
+ uchar *tplName;
+} instanceData;
+
+
+/* tables for interfacing with the v6 config system */
+/* action (instance) parameters */
+static struct cnfparamdescr actpdescr[] = {
+ { "host", eCmdHdlrGetWord, 0 },
+ { "port", eCmdHdlrInt, 0 },
+ { "virtual_host", eCmdHdlrGetWord, 0 },
+ { "user", eCmdHdlrGetWord, 0 },
+ { "password", eCmdHdlrGetWord, 0 },
+ { "exchange", eCmdHdlrGetWord, 0 },
+ { "routing_key", eCmdHdlrGetWord, 0 },
+ { "template", eCmdHdlrGetWord, 0 }
+};
+static struct cnfparamblk actpblk =
+ {
+ CNFPARAMBLK_VERSION,
+ sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+ actpdescr
+ };
+
+
+/*
+ * Report general error
+ */
+static int
+die_on_error(int x, char const *context)
+{
+ int retVal = 0; // false
+
+ if (x < 0) {
+ char *errstr = amqp_error_string(-x);
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, errstr);
+ free(errstr);
+ retVal = 1; // true
+ }
+
+ return retVal;
+}
+
+
+/*
+ * Report AMQP specific error
+ */
+static int
+die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
+{
+ int retVal = 1; // true
+
+ switch (x.reply_type) {
+ case AMQP_RESPONSE_NORMAL:
+ retVal = 0; // false
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: missing RPC reply type!", context);
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, amqp_error_string(x.library_error));
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ switch (x.reply.id) {
+ case AMQP_CONNECTION_CLOSE_METHOD: {
+ amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server connection error %d, message: %.*s",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
+ break;
+ }
+ case AMQP_CHANNEL_CLOSE_METHOD: {
+ amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server channel error %d, message: %.*s",
+ context,
+ m->reply_code,
+ (int) m->reply_text.len, (char *) m->reply_text.bytes);
+ break;
+ }
+ default:
+ errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: unknown server error, method id 0x%08X\n", context, x.reply.id);
+ break;
+ }
+ break;
+
+ }
+
+ return retVal;
+}
+
+
+static amqp_bytes_t
+cstring_bytes(const char *str)
+{
+ return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
+}
+
+
+static void
+closeAMQPConnection(instanceData *pData)
+{
+ if (pData->conn != NULL) {
+ die_on_amqp_error(amqp_channel_close(pData->conn, 1, AMQP_REPLY_SUCCESS), "amqp_channel_close");
+ die_on_amqp_error(amqp_connection_close(pData->conn, AMQP_REPLY_SUCCESS), "amqp_connection_close");
+ die_on_error(amqp_destroy_connection(pData->conn), "amqp_destroy_connection");
+
+ pData->conn = NULL;
+ }
+}
+
+
+/*
+ * Initialize RabbitMQ connection
+ */
+static rsRetVal
+initRabbitMQ(instanceData *pData)
+{
+ int sockfd;
+ DEFiRet;
+
+ DBGPRINTF("omrabbitmq: trying connect to '%s' at port %d\n", pData->host, pData->port);
+
+ pData->conn = amqp_new_connection();
+
+ if (die_on_error(sockfd = amqp_open_socket((char*) pData->host, pData->port), "Opening socket")) {
+ pData->conn = NULL;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ amqp_set_sockfd(pData->conn, sockfd);
+
+ if (die_on_amqp_error(amqp_login(pData->conn, (char*) pData->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, pData->user, pData->password),
+ "Logging in")) {
+ pData->conn = NULL;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+ amqp_channel_open(pData->conn, 1);
+
+ if (die_on_amqp_error(amqp_get_rpc_reply(pData->conn), "Opening channel")) {
+ pData->conn = NULL;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ /* use this to specify if select features are supported by this
+ * plugin. If not, the framework will handle that. Currently, only
+ * RepeatedMsgReduction ("last message repeated n times") is optional.
+ */
+ if(eFeat == sFEATURERepeatedMsgReduction)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+ /* this is a cleanup callback. All dynamically-allocated resources
+ * in instance data must be cleaned up here. Prime examples are
+ * malloc()ed memory, file & database handles and the like.
+ */
+ closeAMQPConnection(pData);
+ free(pData->host);
+ free(pData->vhost);
+ free(pData->user);
+ free(pData->password);
+ free(pData->exchange);
+ free(pData->routing_key);
+ free(pData->tplName);
+ENDfreeInstance
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+ /* permits to spit out some debug info */
+ dbgprintf("omrabbitmq\n");
+ dbgprintf("\thost='%s'\n", pData->host);
+ dbgprintf("\tport=%d\n", pData->port);
+ dbgprintf("\tvirtual_host='%s'\n", pData->vhost);
+ dbgprintf("\tuser='%s'\n", pData->user == NULL ? (uchar*)"(not configured)" : pData->user);
+ dbgprintf("\tpassword=(%sconfigured)\n", pData->password == NULL ? "not " : "");
+ dbgprintf("\texchange='%s'\n", pData->exchange);
+ dbgprintf("\trouting_key='%s'\n", pData->routing_key);
+ dbgprintf("\ttemplate='%s'\n", pData->tplName);
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ /* this is called when an action has been suspended and the
+ * rsyslog core tries to resume it. The action must then
+ * retry (if possible) and report RS_RET_OK if it succeeded
+ * or RS_RET_SUSPENDED otherwise.
+ * Note that no data can be written in this callback, as it is
+ * not present. Prime examples of what can be retried are
+ * reconnects to remote hosts, reconnects to database,
+ * opening of files and the like.
+ * If there is no retry-type of operation, the action may
+ * return RS_RET_OK, so that it will get called on its doAction
+ * entry point (where it receives data), retries there, and
+ * immediately returns RS_RET_SUSPENDED if that does not work
+ * out. This disables some optimizations in the core's retry logic,
+ * but is a valid and expected behaviour. Note that it is also OK
+ * for the retry entry point to return OK but the immediately following
+ * doAction call to fail. In real life, for example, a buggy com line
+ * may cause such behaviour.
+ * Note that there is no guarantee that the core will very quickly
+ * call doAction after the retry succeeded. Today, it does, but that may
+ * not always be the case.
+ */
+
+ if (pData->conn == NULL) {
+ iRet = initRabbitMQ(pData);
+ }
+
+ENDtryResume
+
+
+BEGINdoAction
+CODESTARTdoAction
+ /* this is where you receive the message and need to carry out the
+ * action. Data is provided in ppString[i] where 0 <= i <= num of strings
+ * requested.
+ * Return RS_RET_OK if all goes well, RS_RET_SUSPENDED if the action can
+ * currently not complete, or an error code or RS_RET_DISABLED. The later
+ * two should only be returned if there is no hope that the action can be
+ * restored unless an rsyslog restart (prime example is an invalid config).
+ * Error code or RS_RET_DISABLED permanently disables the action, up to
+ * the next restart.
+ */
+
+ amqp_bytes_t body_bytes;
+
+ if (pData->conn == NULL) {
+ CHKiRet(initRabbitMQ(pData));
+ }
+
+ body_bytes = amqp_cstring_bytes((char *)ppString[0]);
+
+ if (die_on_error(amqp_basic_publish(pData->conn, 1,
+ cstring_bytes((char *) pData->exchange),
+ cstring_bytes((char *) pData->routing_key),
+ 0, 0, &pData->props, body_bytes), "amqp_basic_publish")) {
+ closeAMQPConnection(pData);
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
+ }
+
+finalize_it:
+
+ENDdoAction
+
+
+static inline void
+setInstParamDefaults(instanceData *pData)
+{
+ pData->host = NULL;
+ pData->port = 5672;
+ pData->vhost = NULL;
+ pData->user = NULL;
+ pData->password = NULL;
+ pData->exchange = NULL;
+ pData->routing_key = NULL;
+ pData->tplName = NULL;
+}
+
+
+BEGINnewActInst
+ struct cnfparamvals *pvals;
+ int i;
+CODESTARTnewActInst
+
+ if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ CHKiRet(createInstance(&pData));
+ setInstParamDefaults(pData);
+
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+
+ for(i = 0 ; i < actpblk.nParams ; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(actpblk.descr[i].name, "host")) {
+ pData->host = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "port")) {
+ pData->port = (int) pvals[i].val.d.n;
+ } else if (!strcmp(actpblk.descr[i].name, "virtual_host")) {
+ pData->vhost = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "user")) {
+ pData->user = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "password")) {
+ pData->password = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "exchange")) {
+ pData->exchange = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "routing_key")) {
+ pData->routing_key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if (!strcmp(actpblk.descr[i].name, "template")) {
+ pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("omrabbitmq: program error, non-handled param '%s'\n", actpblk.descr[i].name);
+ }
+ }
+
+ if (pData->host == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter host must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ if (pData->vhost == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter virtual_host must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ if (pData->user == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter user must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ if (pData->password == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter password must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ if (pData->exchange == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter exchange must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ if (pData->routing_key == NULL) {
+ errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter routing_key must be specified");
+ ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
+ }
+
+ // RabbitMQ properties initialization
+ memset(&pData->props, 0, sizeof pData->props);
+ pData->props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
+ pData->props.delivery_mode = 2; /* persistent delivery mode */
+ pData->props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
+ pData->props.content_type = amqp_cstring_bytes("application/json");
+
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
+ " StdJSONFmt" : (char*)pData->tplName),
+ OMSR_NO_RQD_TPL_OPTS));
+
+CODE_STD_FINALIZERnewActInst
+ cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+ CODE_STD_STRING_REQUESTparseSelectorAct(1)
+ if(!strncmp((char*) p, ":omrabbitmq:", sizeof(":omrabbitmq:") - 1)) {
+ errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+ "omrabbitmq supports only v6 config format, use: "
+ "action(type=\"omrabbitmq\" host=...)");
+ }
+ ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINmodExit
+CODESTARTmodExit
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+ CODEqueryEtryPt_STD_OMOD_QUERIES
+ CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+ENDmodInit
diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c
index 9c4c80ba..c80f0e57 100644
--- a/plugins/omudpspoof/omudpspoof.c
+++ b/plugins/omudpspoof/omudpspoof.c
@@ -435,8 +435,6 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
/* Write it to the wire. */
lsent = libnet_write(pData->libnet_handle);
- dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d), fd %d\n", lsent,
- (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen), pData->libnet_handle->fd);
if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) {
/* note: access to fd is a libnet internal. If a newer version of libnet does
* not expose that member, we should simply remove it. However, while it is there
@@ -490,7 +488,6 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
}
/* Write it to the wire. */
lsent = libnet_write(pData->libnet_handle);
- dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d)\n", lsent, (int) (LIBNET_IPV4_H+pktLen));
if(lsent != (int) (LIBNET_IPV4_H+pktLen)) {
DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n",
LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle));