summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/imjournal/imjournal.c138
1 files changed, 132 insertions, 6 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c
index 8e1066dc..88b71c54 100644
--- a/plugins/imjournal/imjournal.c
+++ b/plugins/imjournal/imjournal.c
@@ -58,7 +58,25 @@ DEFobjCurrIf(prop)
DEFobjCurrIf(net)
DEFobjCurrIf(errmsg)
-static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+static struct configSettings_s {
+ char *stateFile;
+ int iPersistStateInterval;
+} cs;
+
+/* module-gloval parameters */
+static struct cnfparamdescr modpdescr[] = {
+ { "statefile", eCmdHdlrGetWord, 0 },
+ { "persiststateinterval", eCmdHdlrInt, 0 }
+};
+static struct cnfparamblk modpblk =
+ { CNFPARAMBLK_VERSION,
+ sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+ modpdescr
+ };
+
+#define DFLT_persiststateinterval 10
+
+static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config parameters permitted? */
static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */
static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */
@@ -100,7 +118,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval *
msgAddJSON(pMsg, (uchar*)"!", json);
}
- CHKiRet(submitMsg(pMsg));
+ CHKiRet(submitMsg2(pMsg));
finalize_it:
RETiRet;
@@ -118,7 +136,6 @@ readjournal() {
uint64_t timestamp;
struct json_object *json = NULL;
- json = json_object_new_object();
/* Information from messages */
char *message;
@@ -204,6 +221,7 @@ readjournal() {
}
free (sys_iden);
+ json = json_object_new_object();
SD_JOURNAL_FOREACH_DATA(j, get, l) {
/* locate equal sign, this is always present */
@@ -286,15 +304,77 @@ ret:
}
+/* This function gets journal cursor and saves it into state file
+ */
+static rsRetVal
+persistJournalState () {
+ DEFiRet;
+ FILE *sf; /* state file */
+ char *cursor;
+ int ret = 0;
+
+ if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) {
+ if ((sf = fopen(cs.stateFile, "wb")) != NULL) {
+ if (fprintf(sf, "%s", cursor) < 0) {
+ iRet = RS_RET_IO_ERROR;
+ }
+ fclose(sf);
+ free(cursor);
+ } else {
+ iRet = RS_RET_FOPEN_FAILURE;
+ }
+ } else {
+ iRet = RS_RET_ERR;
+ }
+ RETiRet;
+}
+
+
BEGINrunInput
CODESTARTrunInput
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
- while(!pThrd->bShallStop) {
- CHKiRet(readjournal());
+ int count = 0;
+
+ char readCursor[128 + 1];
+ FILE *r_sf;
+
+ /* if state file exists, set cursor to appropriate position */
+ if (access(cs.stateFile, F_OK|R_OK) != -1) {
+ if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) {
+ if (fscanf(r_sf, "%128s\n", readCursor) != EOF) {
+ if (sd_journal_seek_cursor(j, readCursor) != 0) {
+ errmsg.LogError(0, RS_RET_ERR, "imjournal: "
+ "couldn't seek to cursor `%s'\n", readCursor);
+ iRet = RS_RET_ERR;
+ goto finalize_it;
+ }
+ sd_journal_next(j);
+ } else {
+ errmsg.LogError(0, RS_RET_IO_ERROR, "imjournal: "
+ "fscanf on state file `%s' failed\n", cs.stateFile);
+ iRet = RS_RET_IO_ERROR;
+ goto finalize_it;
+ }
+ fclose(r_sf);
+ } else {
+ errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: "
+ "open on state file `%s' failed\n", cs.stateFile);
+ }
+ }
+
+ while (glbl.GetGlobalInputTermState() == 0) {
+ CHKiRet(readjournal());
+ count++;
+ if (count == cs.iPersistStateInterval) {
+ count = 0;
+ persistJournalState();
+ }
}
+ persistJournalState();
+
finalize_it:
ENDrunInput
@@ -302,6 +382,9 @@ ENDrunInput
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
bLegacyCnfModGlobalsPermitted = 1;
+
+ cs.iPersistStateInterval = DFLT_persiststateinterval;
+ cs.stateFile = NULL;
ENDbeginCnfLoad
@@ -357,13 +440,52 @@ CODESTARTmodExit
ENDmodExit
+BEGINsetModCnf
+ struct cnfparamvals *pvals = NULL;
+ int i;
+CODESTARTsetModCnf
+ pvals = nvlstGetParams(lst, &modpblk, NULL);
+ if (pvals == NULL) {
+ errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module "
+ "config parameters [module(...)]");
+ ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+ }
+
+ if (Debug) {
+ dbgprintf("module (global) param blk for imjournal:\n");
+ cnfparamsPrint(&modpblk, pvals);
+ }
+
+ for (i = 0 ; i < modpblk.nParams ; ++i) {
+ if (!pvals[i].bUsed)
+ continue;
+ if (!strcmp(modpblk.descr[i].name, "persiststateinterval")) {
+ cs.iPersistStateInterval = (int) pvals[i].val.d.n;
+ } else if (!strcmp(modpblk.descr[i].name, "statefile")) {
+ cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else {
+ dbgprintf("imjournal: program error, non-handled "
+ "param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
+ }
+ }
+
+
+finalize_it:
+ if (pvals != NULL)
+ cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_IMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
ENDqueryEtryPt
+
+
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
@@ -378,7 +500,11 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imjournal"), sizeof("imjournal") - 1));
CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1));
- /* init legacy config settings */
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"persiststateinterval", 0, eCmdHdlrInt,
+ NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID));
+ CHKiRet(omsdRegCFSLineHdlr((uchar *)"statefile", 0, eCmdHdlrGetWord,
+ NULL, &cs.stateFile, STD_LOADABLE_MODULE_ID));
+
ENDmodInit
/* vim:set ai: