summaryrefslogtreecommitdiffstats
path: root/plugins/imjournal/imjournal.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rwxr-xr-x[-rw-r--r--]plugins/imjournal/imjournal.c153
1 files changed, 134 insertions, 19 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index 2af1958e..cce45b9c 100644..100755
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -30,7 +30,9 @@
#include <ctype.h>
#include <stdlib.h>
#include <time.h>
+#include <sys/poll.h>
#include <sys/socket.h>
+#include <errno.h>
#include "dirty.h"
#include "cfsysline.h"
@@ -43,6 +45,7 @@
#include "glbl.h"
#include "prop.h"
#include "errmsg.h"
+#include "srUtils.h"
#include "unicode-helper.h"
#include <systemd/sd-journal.h>
@@ -136,16 +139,20 @@ readjournal() {
uint64_t timestamp;
struct json_object *json = NULL;
+ int r;
/* Information from messages */
char *message;
+ char *sys_pid;
char *sys_iden;
char *sys_iden_help;
const void *get;
+ const void *pidget;
char *parse;
char *get2;
size_t length;
+ size_t pidlength;
const void *equal_sign;
struct json_object *jval;
@@ -158,13 +165,6 @@ readjournal() {
int priority = 0;
int facility = 0;
- /* Get next journal message, if there is none, wait a second */
- if (sd_journal_next(j) == 0) {
- sleep(1);
- iRet = RS_RET_OK;
- goto ret;
- }
-
/* Get message text */
if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) {
logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0);
@@ -203,7 +203,7 @@ readjournal() {
goto free_message;
}
- /* Get message identifier and add ':' */
+ /* Get message identifier, client pid and add ':' */
if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) {
sys_iden = strndup(get+18, length-18);
} else {
@@ -214,19 +214,44 @@ readjournal() {
goto free_message;
}
- asprintf(&sys_iden_help, "%s:", sys_iden);
- if (sys_iden_help == NULL) {
+ if (sd_journal_get_data(j, "SYSLOG_PID", &pidget, &pidlength) >= 0) {
+ sys_pid = strndup(pidget+11, pidlength-11);
+ if (sys_pid == NULL) {
+ iRet = RS_RET_OUT_OF_MEMORY;
+ free (sys_iden);
+ goto free_message;
+ }
+ } else {
+ sys_pid = NULL;
+ }
+
+ if (sys_pid) {
+ r = asprintf(&sys_iden_help, "%s[%s]:", sys_iden, sys_pid);
+ } else {
+ r = asprintf(&sys_iden_help, "%s:", sys_iden);
+ }
+
+ free (sys_iden);
+ free (sys_pid);
+
+ if (-1 == r) {
iRet = RS_RET_OUT_OF_MEMORY;
goto finalize_it;
}
- free (sys_iden);
json = json_object_new_object();
SD_JOURNAL_FOREACH_DATA(j, get, l) {
/* locate equal sign, this is always present */
equal_sign = memchr(get, '=', l);
- assert (equal_sign != NULL);
+
+ /* ... but we know better than to trust the specs */
+ if (equal_sign == NULL) {
+ errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()"
+ " returned a malformed field (has no '='): '%s'",
+ get);
+ continue; /* skip the entry */
+ }
/* get length of journal data prefix */
prefixlen = ((char *)equal_sign - (char *)get);
@@ -337,7 +362,9 @@ persistJournalState () {
char *cursor;
int ret = 0;
- if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) {
+ /* On success, sd_journal_get_cursor() returns 1 in systemd
+ 197 or older and 0 in systemd 198 or newer */
+ if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) {
if ((sf = fopen(cs.stateFile, "wb")) != NULL) {
if (fprintf(sf, "%s", cursor) < 0) {
iRet = RS_RET_IO_ERROR;
@@ -345,29 +372,92 @@ persistJournalState () {
fclose(sf);
free(cursor);
} else {
+ char errStr[256];
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: "
+ "'%s', path: '%s'\n", errStr, cs.stateFile);
iRet = RS_RET_FOPEN_FAILURE;
}
} else {
+ char errStr[256];
+ rs_strerror_r(-(ret), errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr);
iRet = RS_RET_ERR;
}
RETiRet;
}
+/* Polls the journal for new messages. Similar to sd_journal_wait()
+ * except for the special handling of EINTR.
+ */
+static rsRetVal
+pollJournal()
+{
+ DEFiRet;
+ struct pollfd pollfd;
+ int r;
+
+ pollfd.fd = sd_journal_get_fd(j);
+ pollfd.events = sd_journal_get_events(j);
+ r = poll(&pollfd, 1, -1);
+ if (r == -1) {
+ if (errno == EINTR) {
+ /* EINTR is also received during termination
+ * so return now to check the term state.
+ */
+ ABORT_FINALIZE(RS_RET_OK);
+ } else {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "poll() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ }
+
+ assert(r == 1);
+
+ r = sd_journal_process(j);
+ if (r < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_process() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+finalize_it:
+ RETiRet;
+}
+
+
BEGINrunInput
CODESTARTrunInput
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
- int count = 0;
+ if (cs.stateFile[0] != '/') {
+ char *new_stateFile;
- char readCursor[128 + 1];
- FILE *r_sf;
+ if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) {
+ errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n");
+ ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+ }
+ free (cs.stateFile);
+ cs.stateFile = new_stateFile;
+ }
/* if state file exists, set cursor to appropriate position */
if (access(cs.stateFile, F_OK|R_OK) != -1) {
+ FILE *r_sf;
+
if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) {
+ char readCursor[128 + 1];
+
if (fscanf(r_sf, "%128s\n", readCursor) != EOF) {
if (sd_journal_seek_cursor(j, readCursor) != 0) {
errmsg.LogError(0, RS_RET_ERR, "imjournal: "
@@ -390,14 +480,32 @@ CODESTARTrunInput
}
while (glbl.GetGlobalInputTermState() == 0) {
+ int count = 0, r;
+
+ r = sd_journal_next(j);
+ if (r < 0) {
+ char errStr[256];
+
+ rs_strerror_r(errno, errStr, sizeof(errStr));
+ errmsg.LogError(0, RS_RET_ERR,
+ "sd_journal_next() failed: '%s'", errStr);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ if (r == 0) {
+ /* No new messages, wait for activity. */
+ CHKiRet(pollJournal());
+ continue;
+ }
+
CHKiRet(readjournal());
+ /* TODO: This could use some finer metric. */
count++;
if (count == cs.iPersistStateInterval) {
count = 0;
persistJournalState();
}
}
- persistJournalState();
finalize_it:
ENDrunInput
@@ -444,6 +552,7 @@ ENDwillRun
/* close journal */
BEGINafterRun
CODESTARTafterRun
+ persistJournalState();
sd_journal_close(j);
ENDafterRun
@@ -500,16 +609,22 @@ finalize_it:
ENDsetModCnf
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ if(eFeat == sFEATURENonCancelInputTermination)
+ iRet = RS_RET_OK;
+ENDisCompatibleWithFeature
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
ENDqueryEtryPt
-
-
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */