diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imkmsg/kmsg.c | 46 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 122 | ||||
-rw-r--r-- | plugins/omrabbitmq/Makefile.am | 8 | ||||
-rw-r--r-- | plugins/omrabbitmq/README.md | 56 | ||||
-rw-r--r-- | plugins/omrabbitmq/omrabbitmq.c | 466 | ||||
-rw-r--r-- | plugins/omudpspoof/omudpspoof.c | 3 |
6 files changed, 658 insertions, 43 deletions
diff --git a/plugins/imkmsg/kmsg.c b/plugins/imkmsg/kmsg.c index f1815f25..822d3dbd 100644 --- a/plugins/imkmsg/kmsg.c +++ b/plugins/imkmsg/kmsg.c @@ -32,9 +32,8 @@ #include <errno.h> #include <string.h> #include <ctype.h> -#ifdef OS_LINUX #include <sys/klog.h> -#endif +#include <sys/sysinfo.h> #include <json/json.h> #include "rsyslog.h" @@ -58,9 +57,8 @@ submitSyslog(uchar *buf) { long offs = 0; struct timeval tv; - long int timestamp = 0; - struct timespec monotonic; - struct timespec realtime; + struct sysinfo info; + unsigned long int timestamp = 0; char name[1024]; char value[1024]; char msg[1024]; @@ -87,12 +85,12 @@ submitSyslog(uchar *buf) /* get timestamp */ for (; isdigit(*buf); buf++) { - timestamp += (timestamp * 10) + (*buf - '0'); + timestamp = (timestamp * 10) + (*buf - '0'); } while (*buf != ';') { buf++; /* skip everything till the first ; */ - } + } buf++; /* skip ; */ /* get message */ @@ -131,10 +129,24 @@ submitSyslog(uchar *buf) } /* calculate timestamp */ - clock_gettime(CLOCK_MONOTONIC, &monotonic); - clock_gettime(CLOCK_REALTIME, &realtime); - tv.tv_sec = realtime.tv_sec + ((timestamp / 1000000l) - monotonic.tv_sec); - tv.tv_usec = (realtime.tv_nsec + ((timestamp / 1000000000l) - monotonic.tv_nsec)) / 1000; + sysinfo(&info); + gettimeofday(&tv, NULL); + + /* get boot time */ + tv.tv_sec -= info.uptime; + + tv.tv_sec += timestamp / 1000000; + tv.tv_usec += timestamp % 1000000; + + while (tv.tv_usec < 0) { + tv.tv_sec--; + tv.tv_usec += 1000000; + } + + while (tv.tv_usec >= 1000000) { + tv.tv_sec++; + tv.tv_usec -= 1000000; + } Syslog(priority, (uchar *)msg, &tv, json); } @@ -146,7 +158,6 @@ rsRetVal klogWillRun(modConfData_t *pModConf) { char errmsg[2048]; - int r; DEFiRet; fklog = open(_PATH_KLOG, O_RDONLY, 0); @@ -156,17 +167,6 @@ klogWillRun(modConfData_t *pModConf) ABORT_FINALIZE(RS_RET_ERR_OPEN_KLOG); } - /* Set level of kernel console messaging.. */ - if(pModConf->console_log_level != -1) { - r = klogctl(8, NULL, pModConf->console_log_level); - if(r != 0) { - imkmsgLogIntMsg(LOG_WARNING, "imkmsg: cannot set console log level: %s", - rs_strerror_r(errno, errmsg, sizeof(errmsg))); - /* make sure we do not try to re-set! */ - pModConf->console_log_level = -1; - } - } - finalize_it: RETiRet; } diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index f27fe62b..33e58c1a 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -11,11 +11,11 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * -or- * see COPYING.ASL20 in the source distribution - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -79,12 +79,14 @@ typedef struct _instanceData { uchar *parent; uchar *tplName; uchar *timeout; + uchar *bulkId; uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; + sbool dynBulkId; sbool bulkmode; sbool asyncRepl; struct { @@ -114,7 +116,9 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 } + { "template", eCmdHdlrGetWord, 1 }, + { "dynbulkid", eCmdHdlrBinary, 0 }, + { "bulkid", eCmdHdlrGetWord, 0 }, }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -156,6 +160,7 @@ CODESTARTfreeInstance free(pData->timeout); free(pData->restURL); free(pData->errorFile); + free(pData->bulkId); ENDfreeInstance BEGINdbgPrintInstInfo @@ -177,6 +182,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); + dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId); + dbgprintf("\tbulkid='%s'\n", pData->bulkId); ENDdbgPrintInstInfo @@ -220,7 +227,7 @@ checkConn(instanceData *pData) cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - + pData->reply = NULL; pData->replyLen = 0; curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); @@ -250,7 +257,8 @@ ENDtryResume /* get the current index and type for this message */ static inline void getIndexTypeAndParent(instanceData *pData, uchar **tpls, - uchar **srchIndex, uchar **srchType, uchar **parent) + uchar **srchIndex, uchar **srchType, uchar **parent, + uchar **bulkId) { if(pData->dynSrchIdx) { *srchIndex = tpls[1]; @@ -258,15 +266,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[2]; if(pData->dynParent) { *parent = tpls[3]; + if(pData->dynBulkId) { + *bulkId = tpls[4]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } } else { @@ -275,15 +295,27 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, *srchType = tpls[1]; if(pData->dynParent) { *parent = tpls[2]; + if(pData->dynBulkId) { + *bulkId = tpls[3]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } } else { *srchType = pData->searchType; if(pData->dynParent) { *parent = tpls[1]; + if(pData->dynBulkId) { + *bulkId = tpls[2]; + } } else { *parent = pData->parent; + if(pData->dynBulkId) { + *bulkId = tpls[1]; + } } } } @@ -297,6 +329,7 @@ setCurlURL(instanceData *pData, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId; es_str_t *url; int rLocal; int r; @@ -308,7 +341,7 @@ setCurlURL(instanceData *pData, uchar **tpls) r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1); parent = NULL; } else { - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addChar(&url, '/'); if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType)); @@ -330,7 +363,7 @@ setCurlURL(instanceData *pData, uchar **tpls) free(pData->restURL); pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); es_deleteStr(url); DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); @@ -343,7 +376,7 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: @@ -363,13 +396,15 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) uchar *searchIndex; uchar *searchType; uchar *parent; + uchar *bulkId = NULL; DEFiRet; # define META_STRT "{\"index\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" +# define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent); + getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); @@ -380,6 +415,10 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); } + if(bulkId != NULL) { + if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + } if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); @@ -409,7 +448,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) ssize_t wrRet; char errStr[1024]; DEFiRet; - + if(pData->errorFile == NULL) { DBGPRINTF("omelasticsearch: no local error logger defined - " "ignoring ES error information\n"); @@ -524,7 +563,7 @@ checkResult(instanceData *pData, uchar *reqmsg) } /* Note: we ignore errors writing the error file, as we cannot handle - * these in any case. + * these in any case. */ if(iRet == RS_RET_DATAFAIL) { writeDataError(pData, &root, reqmsg); @@ -552,8 +591,8 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) CHKiRet(setCurlURL(pData, tpls)); curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); switch (code) { case CURLE_COULDNT_RESOLVE_HOST: @@ -649,10 +688,10 @@ curlSetup(instanceData *pData) } header = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8"); - curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); + curl_easy_setopt(handle, CURLOPT_HTTPHEADER, header); curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); - curl_easy_setopt(handle, CURLOPT_POST, 1); + curl_easy_setopt(handle, CURLOPT_POST, 1); pData->curlHandle = handle; pData->postHeader = header; @@ -690,6 +729,8 @@ setInstParamDefaults(instanceData *pData) pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; + pData->dynBulkId= 0; + pData->bulkId = NULL; } BEGINnewActInst @@ -737,12 +778,16 @@ CODESTARTnewActInst pData->asyncRepl = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { + pData->dynBulkId = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "bulkid")) { + pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); } } - + if(pData->pwd != NULL && pData->uid == NULL) { errmsg.LogError(0, RS_RET_UID_MISSING, "omelasticsearch: password is provided, but no uid " @@ -767,6 +812,12 @@ CODESTARTnewActInst "name for parent template given - action definition invalid"); ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } + if(pData->dynBulkId && pData->bulkId == NULL) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: requested dynamic bulkid, but no " + "name for bulkid template given - action definition invalid"); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } if(pData->bulkmode) { pData->batch.currTpl1 = NULL; @@ -782,6 +833,7 @@ CODESTARTnewActInst if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; if(pData->dynParent) ++iNumTpls; + if(pData->dynBulkId) ++iNumTpls; DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls); CODE_STD_STRING_REQUESTnewActInst(iNumTpls) @@ -803,11 +855,29 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } } else { @@ -817,12 +887,30 @@ CODESTARTnewActInst if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } } } else { if(pData->dynParent) { CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent), OMSR_NO_RQD_TPL_OPTS)); - } + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } else { + if(pData->dynBulkId) { + CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId), + OMSR_NO_RQD_TPL_OPTS)); + } + } } } diff --git a/plugins/omrabbitmq/Makefile.am b/plugins/omrabbitmq/Makefile.am new file mode 100644 index 00000000..de374081 --- /dev/null +++ b/plugins/omrabbitmq/Makefile.am @@ -0,0 +1,8 @@ +pkglib_LTLIBRARIES = omrabbitmq.la + +omrabbitmq_la_SOURCES = omrabbitmq.c +omrabbitmq_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) +omrabbitmq_la_LDFLAGS = -module -avoid-version +omrabbitmq_la_LIBADD = $(RABBITMQ_LIBS) + +EXTRA_DIST = diff --git a/plugins/omrabbitmq/README.md b/plugins/omrabbitmq/README.md new file mode 100644 index 00000000..7aa60206 --- /dev/null +++ b/plugins/omrabbitmq/README.md @@ -0,0 +1,56 @@ + +# rsyslog output module for RabbitMQ + +This module sends syslog messages into RabbitMQ server. + +Only v6 configuration syntax is supported. + +**omrabbitmq is tested only with 6.6.0 version of rsyslog.** + + +## Compile +To successfully compile omrabbitmq module you need [rabbitmq-c](https://github.com/alanxz/rabbitmq-c) library. + + ./configure --enable-omrabbitmq ... + + +---- +## Configure + +omrabbitmq output module supports only v6 configuration syntax. + +Parameters: + +* host=<hostname> – server +* virtual_host=<virtual\_host> – virtual message broker +* user=<user> – user name +* password=<password> – password +* exchange=<name> – exchange name +* routing_key=<name> – name of routing key + + +Example: + + $ModLoad omrabbitmq + + *.* action(type="omrabbitmq" + host="localhost" + virtual_host="/" + user="guest" + password="guest" + exchange="syslog" + routing_key="syslog.all" + template="RSYSLOG_ForwardFormat" + queue.type="linkedlist" + queue.timeoutenqueue="0" + queue.filename="rabbitmq" + queue.highwatermark="500000" + queue.lowwatermark="400000" + queue.discardmark="5000000" + queue.timeoutenqueue="0" + queue.maxdiskspace="5g" + queue.size="2000000" + queue.saveonshutdown="on" + action.resumeretrycount="-1") + + diff --git a/plugins/omrabbitmq/omrabbitmq.c b/plugins/omrabbitmq/omrabbitmq.c new file mode 100644 index 00000000..7ea7793d --- /dev/null +++ b/plugins/omrabbitmq/omrabbitmq.c @@ -0,0 +1,466 @@ +/* omrabbitmq.c + * + * This output plugin enables rsyslog to send messages to the RabbitMQ. + * + * Copyright 2012-2013 Vaclav Tomec + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/>. + * + * Author: Vaclav Tomec + * <vaclav.tomec@gmail.com> + */ +#include "config.h" +#include "rsyslog.h" +#include <stdio.h> +#include <stdarg.h> +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <signal.h> +#include <errno.h> +#include <time.h> +#include "conf.h" +#include "syslogd-types.h" +#include "srUtils.h" +#include "template.h" +#include "module-template.h" +#include "errmsg.h" +#include "cfsysline.h" + +#include <amqp.h> + +MODULE_TYPE_OUTPUT +MODULE_TYPE_NOKEEP +MODULE_CNFNAME("omrabbitmq") + + +/* + * internal structures + */ +DEF_OMOD_STATIC_DATA +DEFobjCurrIf(errmsg) + + +typedef struct _instanceData { + /* here you need to define all action-specific data. A record of type + * instanceData will be handed over to each instance of the action. Keep + * in mind that there may be several invocations of the same type of action + * inside rsyslog.conf, and this is what keeps them apart. Do NOT use + * static data for this! + */ + amqp_connection_state_t conn; + amqp_basic_properties_t props; + uchar *host; + int port; + uchar *vhost; + uchar *user; + uchar *password; + uchar *exchange; + uchar *routing_key; + uchar *tplName; +} instanceData; + + +/* tables for interfacing with the v6 config system */ +/* action (instance) parameters */ +static struct cnfparamdescr actpdescr[] = { + { "host", eCmdHdlrGetWord, 0 }, + { "port", eCmdHdlrInt, 0 }, + { "virtual_host", eCmdHdlrGetWord, 0 }, + { "user", eCmdHdlrGetWord, 0 }, + { "password", eCmdHdlrGetWord, 0 }, + { "exchange", eCmdHdlrGetWord, 0 }, + { "routing_key", eCmdHdlrGetWord, 0 }, + { "template", eCmdHdlrGetWord, 0 } +}; +static struct cnfparamblk actpblk = + { + CNFPARAMBLK_VERSION, + sizeof(actpdescr)/sizeof(struct cnfparamdescr), + actpdescr + }; + + +/* + * Report general error + */ +static int +die_on_error(int x, char const *context) +{ + int retVal = 0; // false + + if (x < 0) { + char *errstr = amqp_error_string(-x); + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, errstr); + free(errstr); + retVal = 1; // true + } + + return retVal; +} + + +/* + * Report AMQP specific error + */ +static int +die_on_amqp_error(amqp_rpc_reply_t x, char const *context) +{ + int retVal = 1; // true + + switch (x.reply_type) { + case AMQP_RESPONSE_NORMAL: + retVal = 0; // false + break; + + case AMQP_RESPONSE_NONE: + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: missing RPC reply type!", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, amqp_error_string(x.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server connection error %d, message: %.*s", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server channel error %d, message: %.*s", + context, + m->reply_code, + (int) m->reply_text.len, (char *) m->reply_text.bytes); + break; + } + default: + errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: unknown server error, method id 0x%08X\n", context, x.reply.id); + break; + } + break; + + } + + return retVal; +} + + +static amqp_bytes_t +cstring_bytes(const char *str) +{ + return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; +} + + +static void +closeAMQPConnection(instanceData *pData) +{ + if (pData->conn != NULL) { + die_on_amqp_error(amqp_channel_close(pData->conn, 1, AMQP_REPLY_SUCCESS), "amqp_channel_close"); + die_on_amqp_error(amqp_connection_close(pData->conn, AMQP_REPLY_SUCCESS), "amqp_connection_close"); + die_on_error(amqp_destroy_connection(pData->conn), "amqp_destroy_connection"); + + pData->conn = NULL; + } +} + + +/* + * Initialize RabbitMQ connection + */ +static rsRetVal +initRabbitMQ(instanceData *pData) +{ + int sockfd; + DEFiRet; + + DBGPRINTF("omrabbitmq: trying connect to '%s' at port %d\n", pData->host, pData->port); + + pData->conn = amqp_new_connection(); + + if (die_on_error(sockfd = amqp_open_socket((char*) pData->host, pData->port), "Opening socket")) { + pData->conn = NULL; + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + amqp_set_sockfd(pData->conn, sockfd); + + if (die_on_amqp_error(amqp_login(pData->conn, (char*) pData->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, pData->user, pData->password), + "Logging in")) { + pData->conn = NULL; + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + + amqp_channel_open(pData->conn, 1); + + if (die_on_amqp_error(amqp_get_rpc_reply(pData->conn), "Opening channel")) { + pData->conn = NULL; + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + RETiRet; +} + + +BEGINcreateInstance +CODESTARTcreateInstance +ENDcreateInstance + + +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + /* use this to specify if select features are supported by this + * plugin. If not, the framework will handle that. Currently, only + * RepeatedMsgReduction ("last message repeated n times") is optional. + */ + if(eFeat == sFEATURERepeatedMsgReduction) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + +BEGINfreeInstance +CODESTARTfreeInstance + /* this is a cleanup callback. All dynamically-allocated resources + * in instance data must be cleaned up here. Prime examples are + * malloc()ed memory, file & database handles and the like. + */ + closeAMQPConnection(pData); + free(pData->host); + free(pData->vhost); + free(pData->user); + free(pData->password); + free(pData->exchange); + free(pData->routing_key); + free(pData->tplName); +ENDfreeInstance + + +BEGINdbgPrintInstInfo +CODESTARTdbgPrintInstInfo + /* permits to spit out some debug info */ + dbgprintf("omrabbitmq\n"); + dbgprintf("\thost='%s'\n", pData->host); + dbgprintf("\tport=%d\n", pData->port); + dbgprintf("\tvirtual_host='%s'\n", pData->vhost); + dbgprintf("\tuser='%s'\n", pData->user == NULL ? (uchar*)"(not configured)" : pData->user); + dbgprintf("\tpassword=(%sconfigured)\n", pData->password == NULL ? "not " : ""); + dbgprintf("\texchange='%s'\n", pData->exchange); + dbgprintf("\trouting_key='%s'\n", pData->routing_key); + dbgprintf("\ttemplate='%s'\n", pData->tplName); +ENDdbgPrintInstInfo + + +BEGINtryResume +CODESTARTtryResume + /* this is called when an action has been suspended and the + * rsyslog core tries to resume it. The action must then + * retry (if possible) and report RS_RET_OK if it succeeded + * or RS_RET_SUSPENDED otherwise. + * Note that no data can be written in this callback, as it is + * not present. Prime examples of what can be retried are + * reconnects to remote hosts, reconnects to database, + * opening of files and the like. + * If there is no retry-type of operation, the action may + * return RS_RET_OK, so that it will get called on its doAction + * entry point (where it receives data), retries there, and + * immediately returns RS_RET_SUSPENDED if that does not work + * out. This disables some optimizations in the core's retry logic, + * but is a valid and expected behaviour. Note that it is also OK + * for the retry entry point to return OK but the immediately following + * doAction call to fail. In real life, for example, a buggy com line + * may cause such behaviour. + * Note that there is no guarantee that the core will very quickly + * call doAction after the retry succeeded. Today, it does, but that may + * not always be the case. + */ + + if (pData->conn == NULL) { + iRet = initRabbitMQ(pData); + } + +ENDtryResume + + +BEGINdoAction +CODESTARTdoAction + /* this is where you receive the message and need to carry out the + * action. Data is provided in ppString[i] where 0 <= i <= num of strings + * requested. + * Return RS_RET_OK if all goes well, RS_RET_SUSPENDED if the action can + * currently not complete, or an error code or RS_RET_DISABLED. The later + * two should only be returned if there is no hope that the action can be + * restored unless an rsyslog restart (prime example is an invalid config). + * Error code or RS_RET_DISABLED permanently disables the action, up to + * the next restart. + */ + + amqp_bytes_t body_bytes; + + if (pData->conn == NULL) { + CHKiRet(initRabbitMQ(pData)); + } + + body_bytes = amqp_cstring_bytes((char *)ppString[0]); + + if (die_on_error(amqp_basic_publish(pData->conn, 1, + cstring_bytes((char *) pData->exchange), + cstring_bytes((char *) pData->routing_key), + 0, 0, &pData->props, body_bytes), "amqp_basic_publish")) { + closeAMQPConnection(pData); + ABORT_FINALIZE(RS_RET_SUSPENDED); + } + +finalize_it: + +ENDdoAction + + +static inline void +setInstParamDefaults(instanceData *pData) +{ + pData->host = NULL; + pData->port = 5672; + pData->vhost = NULL; + pData->user = NULL; + pData->password = NULL; + pData->exchange = NULL; + pData->routing_key = NULL; + pData->tplName = NULL; +} + + +BEGINnewActInst + struct cnfparamvals *pvals; + int i; +CODESTARTnewActInst + + if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + CHKiRet(createInstance(&pData)); + setInstParamDefaults(pData); + + CODE_STD_STRING_REQUESTparseSelectorAct(1) + + for(i = 0 ; i < actpblk.nParams ; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(actpblk.descr[i].name, "host")) { + pData->host = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "port")) { + pData->port = (int) pvals[i].val.d.n; + } else if (!strcmp(actpblk.descr[i].name, "virtual_host")) { + pData->vhost = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "user")) { + pData->user = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "password")) { + pData->password = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "exchange")) { + pData->exchange = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "routing_key")) { + pData->routing_key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(actpblk.descr[i].name, "template")) { + pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("omrabbitmq: program error, non-handled param '%s'\n", actpblk.descr[i].name); + } + } + + if (pData->host == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter host must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + if (pData->vhost == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter virtual_host must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + if (pData->user == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter user must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + if (pData->password == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter password must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + if (pData->exchange == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter exchange must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + if (pData->routing_key == NULL) { + errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter routing_key must be specified"); + ABORT_FINALIZE(RS_RET_INVALID_PARAMS); + } + + // RabbitMQ properties initialization + memset(&pData->props, 0, sizeof pData->props); + pData->props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; + pData->props.delivery_mode = 2; /* persistent delivery mode */ + pData->props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + pData->props.content_type = amqp_cstring_bytes("application/json"); + + CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? + " StdJSONFmt" : (char*)pData->tplName), + OMSR_NO_RQD_TPL_OPTS)); + +CODE_STD_FINALIZERnewActInst + cnfparamvalsDestruct(pvals, &actpblk); +ENDnewActInst + + +BEGINparseSelectorAct +CODESTARTparseSelectorAct + CODE_STD_STRING_REQUESTparseSelectorAct(1) + if(!strncmp((char*) p, ":omrabbitmq:", sizeof(":omrabbitmq:") - 1)) { + errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, + "omrabbitmq supports only v6 config format, use: " + "action(type=\"omrabbitmq\" host=...)"); + } + ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); +CODE_STD_FINALIZERparseSelectorAct +ENDparseSelectorAct + + +BEGINmodExit +CODESTARTmodExit + objRelease(errmsg, CORE_COMPONENT); +ENDmodExit + + +BEGINqueryEtryPt +CODESTARTqueryEtryPt + CODEqueryEtryPt_STD_OMOD_QUERIES + CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +ENDqueryEtryPt + + +BEGINmodInit() +CODESTARTmodInit + *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ +CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(errmsg, CORE_COMPONENT)); +ENDmodInit diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index 9c4c80ba..c80f0e57 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -435,8 +435,6 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) /* Write it to the wire. */ lsent = libnet_write(pData->libnet_handle); - dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d), fd %d\n", lsent, - (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen), pData->libnet_handle->fd); if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) { /* note: access to fd is a libnet internal. If a newer version of libnet does * not expose that member, we should simply remove it. However, while it is there @@ -490,7 +488,6 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } /* Write it to the wire. */ lsent = libnet_write(pData->libnet_handle); - dbgprintf("DDDD: omudpspoof stage 1 return state %d (expected %d)\n", lsent, (int) (LIBNET_IPV4_H+pktLen)); if(lsent != (int) (LIBNET_IPV4_H+pktLen)) { DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n", LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle)); |