diff options
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rwxr-xr-x[-rw-r--r--] | plugins/imjournal/imjournal.c | 153 |
1 files changed, 134 insertions, 19 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index 2af1958e..cce45b9c 100644..100755 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -30,7 +30,9 @@ #include <ctype.h> #include <stdlib.h> #include <time.h> +#include <sys/poll.h> #include <sys/socket.h> +#include <errno.h> #include "dirty.h" #include "cfsysline.h" @@ -43,6 +45,7 @@ #include "glbl.h" #include "prop.h" #include "errmsg.h" +#include "srUtils.h" #include "unicode-helper.h" #include <systemd/sd-journal.h> @@ -136,16 +139,20 @@ readjournal() { uint64_t timestamp; struct json_object *json = NULL; + int r; /* Information from messages */ char *message; + char *sys_pid; char *sys_iden; char *sys_iden_help; const void *get; + const void *pidget; char *parse; char *get2; size_t length; + size_t pidlength; const void *equal_sign; struct json_object *jval; @@ -158,13 +165,6 @@ readjournal() { int priority = 0; int facility = 0; - /* Get next journal message, if there is none, wait a second */ - if (sd_journal_next(j) == 0) { - sleep(1); - iRet = RS_RET_OK; - goto ret; - } - /* Get message text */ if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) { logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0); @@ -203,7 +203,7 @@ readjournal() { goto free_message; } - /* Get message identifier and add ':' */ + /* Get message identifier, client pid and add ':' */ if (sd_journal_get_data(j, "SYSLOG_IDENTIFIER", &get, &length) >= 0) { sys_iden = strndup(get+18, length-18); } else { @@ -214,19 +214,44 @@ readjournal() { goto free_message; } - asprintf(&sys_iden_help, "%s:", sys_iden); - if (sys_iden_help == NULL) { + if (sd_journal_get_data(j, "SYSLOG_PID", &pidget, &pidlength) >= 0) { + sys_pid = strndup(pidget+11, pidlength-11); + if (sys_pid == NULL) { + iRet = RS_RET_OUT_OF_MEMORY; + free (sys_iden); + goto free_message; + } + } else { + sys_pid = NULL; + } + + if (sys_pid) { + r = asprintf(&sys_iden_help, "%s[%s]:", sys_iden, sys_pid); + } else { + r = asprintf(&sys_iden_help, "%s:", sys_iden); + } + + free (sys_iden); + free (sys_pid); + + if (-1 == r) { iRet = RS_RET_OUT_OF_MEMORY; goto finalize_it; } - free (sys_iden); json = json_object_new_object(); SD_JOURNAL_FOREACH_DATA(j, get, l) { /* locate equal sign, this is always present */ equal_sign = memchr(get, '=', l); - assert (equal_sign != NULL); + + /* ... but we know better than to trust the specs */ + if (equal_sign == NULL) { + errmsg.LogError(0, RS_RET_ERR,"SD_JOURNAL_FOREACH_DATA()" + " returned a malformed field (has no '='): '%s'", + get); + continue; /* skip the entry */ + } /* get length of journal data prefix */ prefixlen = ((char *)equal_sign - (char *)get); @@ -337,7 +362,9 @@ persistJournalState () { char *cursor; int ret = 0; - if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) { + /* On success, sd_journal_get_cursor() returns 1 in systemd + 197 or older and 0 in systemd 198 or newer */ + if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) { if ((sf = fopen(cs.stateFile, "wb")) != NULL) { if (fprintf(sf, "%s", cursor) < 0) { iRet = RS_RET_IO_ERROR; @@ -345,29 +372,92 @@ persistJournalState () { fclose(sf); free(cursor); } else { + char errStr[256]; + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: " + "'%s', path: '%s'\n", errStr, cs.stateFile); iRet = RS_RET_FOPEN_FAILURE; } } else { + char errStr[256]; + rs_strerror_r(-(ret), errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr); iRet = RS_RET_ERR; } RETiRet; } +/* Polls the journal for new messages. Similar to sd_journal_wait() + * except for the special handling of EINTR. + */ +static rsRetVal +pollJournal() +{ + DEFiRet; + struct pollfd pollfd; + int r; + + pollfd.fd = sd_journal_get_fd(j); + pollfd.events = sd_journal_get_events(j); + r = poll(&pollfd, 1, -1); + if (r == -1) { + if (errno == EINTR) { + /* EINTR is also received during termination + * so return now to check the term state. + */ + ABORT_FINALIZE(RS_RET_OK); + } else { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "poll() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + assert(r == 1); + + r = sd_journal_process(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_process() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: + RETiRet; +} + + BEGINrunInput CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - int count = 0; + if (cs.stateFile[0] != '/') { + char *new_stateFile; - char readCursor[128 + 1]; - FILE *r_sf; + if (-1 == asprintf(&new_stateFile, "%s/%s", (char *)glbl.GetWorkDir(), cs.stateFile)) { + errmsg.LogError(0, RS_RET_OUT_OF_MEMORY, "imjournal: asprintf failed\n"); + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + free (cs.stateFile); + cs.stateFile = new_stateFile; + } /* if state file exists, set cursor to appropriate position */ if (access(cs.stateFile, F_OK|R_OK) != -1) { + FILE *r_sf; + if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) { + char readCursor[128 + 1]; + if (fscanf(r_sf, "%128s\n", readCursor) != EOF) { if (sd_journal_seek_cursor(j, readCursor) != 0) { errmsg.LogError(0, RS_RET_ERR, "imjournal: " @@ -390,14 +480,32 @@ CODESTARTrunInput } while (glbl.GetGlobalInputTermState() == 0) { + int count = 0, r; + + r = sd_journal_next(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_next() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + + if (r == 0) { + /* No new messages, wait for activity. */ + CHKiRet(pollJournal()); + continue; + } + CHKiRet(readjournal()); + /* TODO: This could use some finer metric. */ count++; if (count == cs.iPersistStateInterval) { count = 0; persistJournalState(); } } - persistJournalState(); finalize_it: ENDrunInput @@ -444,6 +552,7 @@ ENDwillRun /* close journal */ BEGINafterRun CODESTARTafterRun + persistJournalState(); sd_journal_close(j); ENDafterRun @@ -500,16 +609,22 @@ finalize_it: ENDsetModCnf +BEGINisCompatibleWithFeature +CODESTARTisCompatibleWithFeature + if(eFeat == sFEATURENonCancelInputTermination) + iRet = RS_RET_OK; +ENDisCompatibleWithFeature + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES +CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES ENDqueryEtryPt - - BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ |