diff options
Diffstat (limited to 'plugins/imjournal/imjournal.c')
-rwxr-xr-x | plugins/imjournal/imjournal.c | 78 |
1 files changed, 67 insertions, 11 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index aad7194a..ae29154d 100755 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -30,6 +30,7 @@ #include <ctype.h> #include <stdlib.h> #include <time.h> +#include <sys/poll.h> #include <sys/socket.h> #include <errno.h> @@ -164,14 +165,6 @@ readjournal() { int priority = 0; int facility = 0; - /* Get next journal message, if there is none, wait for next */ - while (sd_journal_next(j) == 0) { - if (sd_journal_wait(j, (uint64_t) -1) < 0) { - iRet = RS_RET_ERR; - goto ret; - } - } - /* Get message text */ if (sd_journal_get_data(j, "MESSAGE", &get, &length) < 0) { logmsgInternal(NO_ERRCODE, LOG_SYSLOG|LOG_INFO, (uchar *)"log message from journal doesn't have MESSAGE", 0); @@ -388,14 +381,58 @@ persistJournalState () { } +/* Polls the journal for new messages. Similar to sd_journal_wait() + * except for the special handling of EINTR. + */ +static rsRetVal +pollJournal() +{ + DEFiRet; + struct pollfd pollfd; + int r; + + pollfd.fd = sd_journal_get_fd(j); + pollfd.events = sd_journal_get_events(j); + r = poll(&pollfd, 1, -1); + if (r == -1) { + if (errno == EINTR) { + /* EINTR is also received during termination + * so return now to check the term state. + */ + ABORT_FINALIZE(RS_RET_OK); + } else { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "poll() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + } + + assert(r == 1); + + r = sd_journal_process(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_process() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + +finalize_it: + RETiRet; +} + + BEGINrunInput CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - int count = 0; - if (cs.stateFile[0] != '/') { char *new_stateFile; @@ -436,14 +473,32 @@ CODESTARTrunInput } while (glbl.GetGlobalInputTermState() == 0) { + int count = 0, r; + + r = sd_journal_next(j); + if (r < 0) { + char errStr[256]; + + rs_strerror_r(errno, errStr, sizeof(errStr)); + errmsg.LogError(0, RS_RET_ERR, + "sd_journal_next() failed: '%s'", errStr); + ABORT_FINALIZE(RS_RET_ERR); + } + + if (r == 0) { + /* No new messages, wait for activity. */ + CHKiRet(pollJournal()); + continue; + } + CHKiRet(readjournal()); + /* TODO: This could use some finer metric. */ count++; if (count == cs.iPersistStateInterval) { count = 0; persistJournalState(); } } - persistJournalState(); finalize_it: ENDrunInput @@ -490,6 +545,7 @@ ENDwillRun /* close journal */ BEGINafterRun CODESTARTafterRun + persistJournalState(); sd_journal_close(j); ENDafterRun |