diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/imjournal/imjournal.c | 138 |
1 files changed, 132 insertions, 6 deletions
diff --git a/plugins/imjournal/imjournal.c b/plugins/imjournal/imjournal.c index 8e1066dc..88b71c54 100644 --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -58,7 +58,25 @@ DEFobjCurrIf(prop) DEFobjCurrIf(net) DEFobjCurrIf(errmsg) -static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ +static struct configSettings_s { + char *stateFile; + int iPersistStateInterval; +} cs; + +/* module-gloval parameters */ +static struct cnfparamdescr modpdescr[] = { + { "statefile", eCmdHdlrGetWord, 0 }, + { "persiststateinterval", eCmdHdlrInt, 0 } +}; +static struct cnfparamblk modpblk = + { CNFPARAMBLK_VERSION, + sizeof(modpdescr)/sizeof(struct cnfparamdescr), + modpdescr + }; + +#define DFLT_persiststateinterval 10 + +static int bLegacyCnfModGlobalsPermitted = 0;/* are legacy module-global config parameters permitted? */ static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this module */ static prop_t *pLocalHostIP = NULL; /* a pseudo-constant propterty for 127.0.0.1 */ @@ -100,7 +118,7 @@ enqMsg(uchar *msg, uchar *pszTag, int iFacility, int iSeverity, struct timeval * msgAddJSON(pMsg, (uchar*)"!", json); } - CHKiRet(submitMsg(pMsg)); + CHKiRet(submitMsg2(pMsg)); finalize_it: RETiRet; @@ -118,7 +136,6 @@ readjournal() { uint64_t timestamp; struct json_object *json = NULL; - json = json_object_new_object(); /* Information from messages */ char *message; @@ -204,6 +221,7 @@ readjournal() { } free (sys_iden); + json = json_object_new_object(); SD_JOURNAL_FOREACH_DATA(j, get, l) { /* locate equal sign, this is always present */ @@ -286,15 +304,77 @@ ret: } +/* This function gets journal cursor and saves it into state file + */ +static rsRetVal +persistJournalState () { + DEFiRet; + FILE *sf; /* state file */ + char *cursor; + int ret = 0; + + if ((ret = sd_journal_get_cursor(j, &cursor)) > 0) { + if ((sf = fopen(cs.stateFile, "wb")) != NULL) { + if (fprintf(sf, "%s", cursor) < 0) { + iRet = RS_RET_IO_ERROR; + } + fclose(sf); + free(cursor); + } else { + iRet = RS_RET_FOPEN_FAILURE; + } + } else { + iRet = RS_RET_ERR; + } + RETiRet; +} + + BEGINrunInput CODESTARTrunInput /* this is an endless loop - it is terminated when the thread is * signalled to do so. This, however, is handled by the framework, * right into the sleep below. */ - while(!pThrd->bShallStop) { - CHKiRet(readjournal()); + int count = 0; + + char readCursor[128 + 1]; + FILE *r_sf; + + /* if state file exists, set cursor to appropriate position */ + if (access(cs.stateFile, F_OK|R_OK) != -1) { + if ((r_sf = fopen(cs.stateFile, "rb")) != NULL) { + if (fscanf(r_sf, "%128s\n", readCursor) != EOF) { + if (sd_journal_seek_cursor(j, readCursor) != 0) { + errmsg.LogError(0, RS_RET_ERR, "imjournal: " + "couldn't seek to cursor `%s'\n", readCursor); + iRet = RS_RET_ERR; + goto finalize_it; + } + sd_journal_next(j); + } else { + errmsg.LogError(0, RS_RET_IO_ERROR, "imjournal: " + "fscanf on state file `%s' failed\n", cs.stateFile); + iRet = RS_RET_IO_ERROR; + goto finalize_it; + } + fclose(r_sf); + } else { + errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "imjournal: " + "open on state file `%s' failed\n", cs.stateFile); + } + } + + while (glbl.GetGlobalInputTermState() == 0) { + CHKiRet(readjournal()); + count++; + if (count == cs.iPersistStateInterval) { + count = 0; + persistJournalState(); + } } + persistJournalState(); + finalize_it: ENDrunInput @@ -302,6 +382,9 @@ ENDrunInput BEGINbeginCnfLoad CODESTARTbeginCnfLoad bLegacyCnfModGlobalsPermitted = 1; + + cs.iPersistStateInterval = DFLT_persiststateinterval; + cs.stateFile = NULL; ENDbeginCnfLoad @@ -357,13 +440,52 @@ CODESTARTmodExit ENDmodExit +BEGINsetModCnf + struct cnfparamvals *pvals = NULL; + int i; +CODESTARTsetModCnf + pvals = nvlstGetParams(lst, &modpblk, NULL); + if (pvals == NULL) { + errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "error processing module " + "config parameters [module(...)]"); + ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); + } + + if (Debug) { + dbgprintf("module (global) param blk for imjournal:\n"); + cnfparamsPrint(&modpblk, pvals); + } + + for (i = 0 ; i < modpblk.nParams ; ++i) { + if (!pvals[i].bUsed) + continue; + if (!strcmp(modpblk.descr[i].name, "persiststateinterval")) { + cs.iPersistStateInterval = (int) pvals[i].val.d.n; + } else if (!strcmp(modpblk.descr[i].name, "statefile")) { + cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else { + dbgprintf("imjournal: program error, non-handled " + "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); + } + } + + +finalize_it: + if (pvals != NULL) + cnfparamvalsDestruct(pvals, &modpblk); +ENDsetModCnf + + BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_IMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES +CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt + + BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ @@ -378,7 +500,11 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(prop.CreateStringProp(&pInputName, UCHAR_CONSTANT("imjournal"), sizeof("imjournal") - 1)); CHKiRet(prop.CreateStringProp(&pLocalHostIP, UCHAR_CONSTANT("127.0.0.1"), sizeof("127.0.0.1") - 1)); - /* init legacy config settings */ + CHKiRet(omsdRegCFSLineHdlr((uchar *)"persiststateinterval", 0, eCmdHdlrInt, + NULL, &cs.iPersistStateInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"statefile", 0, eCmdHdlrGetWord, + NULL, &cs.stateFile, STD_LOADABLE_MODULE_ID)); + ENDmodInit /* vim:set ai: |