summaryrefslogtreecommitdiffstats
path: root/plugins/imjournal/imjournal.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rw-r--r--plugins/imjournal/imjournal.c385
1 files changed, 385 insertions, 0 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
new file mode 100644
index 00000000..8e1066dc
--- /dev/null
+++ b/plugins/imjournal/imjournal.c
@@ -0,0 +1,385 @@
+/* The systemd journal import module
+ *
+ * To test under Linux:
+ * emmit log message into systemd journal
+ *
+ * Copyright (C) 2008-2012 Adiscon GmbH
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * -or-
+ * see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <assert.h>
+#include <string.h>
+#include <stdarg.h>
+#include <ctype.h>
+#include <stdlib.h>
+#include <time.h>
+#include <sys/socket.h>
+
+#include "dirty.h"
+#include "cfsysline.h"
+#include "obj.h"
+#include "msg.h"
+#include "module-template.h"
+#include "datetime.h"
+#include "imjournal.h"
+#include "net.h"
+#include "glbl.h"
+#include "prop.h"
+#include "errmsg.h"
+#include "unicode-helper.h"
+#include <systemd/sd-journal.h>
+
+MODULE_TYPE_INPUT
+MODULE_TYPE_NOKEEP
+MODULE_CNFNAME("imjournal")
+
+/* Module static data */
+DEF_IMOD_STATIC_DATA
+DEFobjCurrIf(datetime)
+DEFobjCurrIf(glbl)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(net)
+DEFobjCurrIf(errmsg)
+
+static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+
+static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
+static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
+
+static sd_journal *j;
+
+/* enqueue the the journal message into the message queue.
+ * The provided msg string is not freed - thus must be done
+ * by the caller.
+ */
+static rsRetVal
+enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *tp, struct json_object *json)
+{
+ struct syslogTime st;
+ msg_t *pMsg;
+ DEFiRet;
+
+ assert(msg != NULL);
+ assert(pszTag != NULL);
+
+ if(tp == NULL) {
+ CHKiRet(msgConstruct(&pMsg));
+ } else {
+ datetime.timeval2syslogTime(tp, &st);
+ CHKiRet(msgConstructWithTime(&pMsg, &st, tp->tv_sec));
+ }
+ MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
+ MsgSetInputName(pMsg, pInputName);
+ MsgSetRawMsgWOSize(pMsg, (char*)msg);
+ MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */
+ MsgSetRcvFrom(pMsg, glbl.GetLocalHostNameProp());
+ MsgSetRcvFromIP(pMsg, pLocalHostIP);
+ MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName()));
+ MsgSetTAG(pMsg, pszTag, ustrlen(pszTag));
+ pMsg->iFacility = iFacility;
+ pMsg->iSeverity = iSeverity;
+
+ if(json != NULL) {
+ msgAddJSON(pMsg, (uchar*)"!", json);
+ }
+
+ CHKiRet(submitMsg(pMsg));
+
+finalize_it:
+ RETiRet;
+}
+
+
+/* Read journal log while data are available, each read() reads one
+ * record of printk buffer.
+ */
+static rsRetVal
+readjournal() {
+ DEFiRet;
+
+ struct timeval tv;
+ uint64_t timestamp;
+
+ struct json_object *json = NULL;
+ json = json_object_new_object();
+
+ /* Information from messages */
+ char *message;
+ char *sys_iden;
+ char *sys_iden_help;
+
+ const void *get;
+ uchar *parse;
+ char *get2;
+ size_t length;
+
+ const void *equal_sign;
+ struct json_object *jval;
+ char *data;
+ char *name;
+ size_t l;
+
+ long prefixlen = 0;
+
+ int priority = 0;
+ int facility = 0;
+
+ /* Get next journal message, if there is none, wait a second */
+ if (sd_journal_next(j) == 0) {
+ sleep(1);
+ iRet = RS_RET_OK;
+ goto ret;
+ }
+
+ /* Get message text */
+ if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) {
+ logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0);
+ iRet = RS_RET_OK;
+ goto ret;
+ }
+ message = strndup(get+8, length-8);
+ if (message == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ goto ret;
+ }
+
+ /* Get message priority */
+ if (sd_journal_get_data(j, "PRIORITY", &get, &length) >= 0) {
+ get2 = strndup(get, length);
+ priority = ((char *)get2)[9] - '0';
+ free (get2);
+ }
+
+ /* Get syslog facility */
+ if (sd_journal_get_data(j, "SYSLOG_FACILITY", &get, &length) >= 0) {
+ get2 = strndup(get, length);
+ char f = ((char *)get2)[16];
+ if (f >= '0' && f <= '9') {
+ facility += f - '0';
+ }
+ f = ((char *)get2)[17];
+ if (f >= '0' && f <= '9') {
+ facility *= 10;
+ facility += (f - '0');
+ }
+ free (get2);
+ } else {
+ /* message is missing facility -> internal systemd journal msg, drop */
+ iRet = RS_RET_OK;
+ goto free_message;
+ }
+
+ /* Get message identifier and add ':' */
+ if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) {
+ sys_iden = strndup(get+18, length-18);
+ } else {
+ sys_iden = strdup("journal");
+ }
+ if (sys_iden == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ goto free_message;
+ }
+
+ asprintf(&sys_iden_help, "%s:", sys_iden);
+ if (sys_iden_help == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ goto finalize_it;
+ }
+ free (sys_iden);
+
+
+ SD_JOURNAL_FOREACH_DATA(j, get, l) {
+ /* locate equal sign, this is always present */
+ equal_sign = memchr(get, '=', l);
+ assert (equal_sign != NULL);
+
+ /* get length of journal data prefix */
+ prefixlen = ((char *)equal_sign - (char *)get);
+
+ /* translate name fields to lumberjack names */
+ parse = (uchar *)get;
+
+ switch (*parse)
+ {
+ case '_':
+ ++parse;
+ if (*parse == 'P') {
+ name = strdup("pid");
+ } else if (*parse == 'G') {
+ name = strdup("gid");
+ } else if (*parse == 'U') {
+ name = strdup("uid");
+ } else if (*parse == 'E') {
+ name = strdup("exe");
+ } else if (*parse == 'C') {
+ parse++;
+ if (*parse == 'O') {
+ name = strdup("appname");
+ } else if (*parse == 'M') {
+ name = strdup("cmd");
+ } else {
+ name = strndup(get, prefixlen);
+ }
+ } else {
+ name = strndup(get, prefixlen);
+ }
+ break;
+
+ default:
+ name = strndup(get, prefixlen);
+ break;
+ }
+
+ if (name == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ goto ret;
+ }
+
+ prefixlen++; /* remove '=' */
+
+ data = strndup(get + prefixlen, l - prefixlen);
+ if (data == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ free (name);
+ goto ret;
+ }
+
+ /* and save them to json object */
+ jval = json_object_new_string((char *)data);
+ json_object_object_add(json, name, jval);
+ free (data);
+ free (name);
+ }
+
+ /* calculate timestamp */
+ if (sd_journal_get_realtime_usec(j, &timestamp) >= 0) {
+ tv.tv_sec = timestamp / 1000000;
+ tv.tv_usec = timestamp % 1000000;
+ }
+
+ /* submit message */
+ enqMsg((uchar *)message, (uchar *) sys_iden_help, facility, priority, &tv, json);
+
+finalize_it:
+ free(sys_iden_help);
+free_message:
+ free(message);
+ret:
+ RETiRet;
+}
+
+
+BEGINrunInput
+CODESTARTrunInput
+ /* this is an endless loop - it is terminated when the thread is
+ * signalled to do so. This, however, is handled by the framework,
+ * right into the sleep below.
+ */
+ while(!pThrd->bShallStop) {
+ CHKiRet(readjournal());
+ }
+finalize_it:
+ENDrunInput
+
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ bLegacyCnfModGlobalsPermitted = 1;
+ENDbeginCnfLoad
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ENDactivateCnf
+
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+/* open journal */
+BEGINwillRun
+CODESTARTwillRun
+ int ret;
+ ret = sd_journal_open(&j, SD_JOURNAL_LOCAL_ONLY);
+ if (ret < 0) {
+ iRet = RS_RET_IO_ERROR;
+ }
+ENDwillRun
+
+/* close journal */
+BEGINafterRun
+CODESTARTafterRun
+ sd_journal_close(j);
+ENDafterRun
+
+
+BEGINmodExit
+CODESTARTmodExit
+ if(pInputName != NULL)
+ prop.Destruct(&pInputName);
+ if(pLocalHostIP != NULL)
+ prop.Destruct(&pLocalHostIP);
+
+ /* release objects we used */
+ objRelease(glbl, CORE_COMPONENT);
+ objRelease(net, CORE_COMPONENT);
+ objRelease(datetime, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_IMOD_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+ *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+ CHKiRet(objUse(datetime, CORE_COMPONENT));
+ CHKiRet(objUse(glbl, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(net, CORE_COMPONENT));
+ CHKiRet(objUse(errmsg, CORE_COMPONENT));
+
+ /* we need to create the inputName property (only once during our lifetime) */
+ CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imjournal"), sizeof("imjournal") - 1));
+ CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
+
+ /* init legacy config settings */
+
+ENDmodInit
+/* vim:set ai:
+ */