diff options
Diffstat (limited to 'plugins/imfile/imfile.c')
-rw-r--r-- | plugins/imfile/imfile.c | 115 |
1 files changed, 67 insertions, 48 deletions
diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c index b0211bf6..91493bb1 100644 --- a/plugins/imfile/imfile.c +++ b/plugins/imfile/imfile.c @@ -45,6 +45,8 @@ #include "errmsg.h" #include "glbl.h" #include "datetime.h" +#include "unicode-helper.h" +#include "prop.h" MODULE_TYPE_INPUT /* must be present for input modules, do not remove */ @@ -55,22 +57,31 @@ DEF_IMOD_STATIC_DATA /* must be present, starts static data */ DEFobjCurrIf(errmsg) DEFobjCurrIf(glbl) DEFobjCurrIf(datetime) +DEFobjCurrIf(strm) +DEFobjCurrIf(prop) typedef struct fileInfo_s { uchar *pszFileName; uchar *pszTag; + size_t lenTag; uchar *pszStateFile; /* file in which state between runs is to be stored */ int iFacility; int iSeverity; + int nRecords; /**< How many records did we process before persisting the stream? */ + int iPersistStateInterval; /**< how often should state be persisted? (0=on close only) */ strm_t *pStrm; /* its stream (NULL if not assigned) */ } fileInfo_t; +/* forward definitions */ +static rsRetVal persistStrmState(fileInfo_t *pInfo); + /* config variables */ static uchar *pszFileName = NULL; static uchar *pszFileTag = NULL; static uchar *pszStateFile = NULL; static int iPollInterval = 10; /* number of seconds to sleep when there was no file activity */ +static int iPersistStateInterval = 0; /* how often if state file to be persisted? (default 0->never) */ static int iFacility = 128; /* local0 */ static int iSeverity = 5; /* notice, as of rfc 3164 */ @@ -78,6 +89,7 @@ static int iFilPtr = 0; /* number of files to be monitored; pointer to next fre #define MAX_INPUT_FILES 100 static fileInfo_t files[MAX_INPUT_FILES]; +static prop_t *pInputName = NULL; /* there is only one global inputName for all messages generated by this input */ /* enqueue the read file line as a message. The provided string is * not freed - thuis must be done by the caller. @@ -94,12 +106,11 @@ static rsRetVal enqLine(fileInfo_t *pInfo, cstr_t *cstrLine) CHKiRet(msgConstruct(&pMsg)); MsgSetFlowControlType(pMsg, eFLOWCTL_FULL_DELAY); - MsgSetInputName(pMsg, "imfile"); - MsgSetUxTradMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine)); - MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine)); - MsgSetMSG(pMsg, (char*)rsCStrGetSzStr(cstrLine)); - MsgSetHOSTNAME(pMsg, (char*)glbl.GetLocalHostName()); - MsgSetTAG(pMsg, (char*)pInfo->pszTag); + MsgSetInputName(pMsg, pInputName); + MsgSetRawMsg(pMsg, (char*)rsCStrGetSzStr(cstrLine), cstrLen(cstrLine)); + MsgSetMSGoffs(pMsg, 0); /* we do not have a header... */ + MsgSetHOSTNAME(pMsg, glbl.GetLocalHostName(), ustrlen(glbl.GetLocalHostName())); + MsgSetTAG(pMsg, pInfo->pszTag, pInfo->lenTag); pMsg->iFacility = LOG_FAC(pInfo->iFacility); pMsg->iSeverity = LOG_PRI(pInfo->iSeverity); pMsg->bParseHOSTNAME = 0; @@ -138,32 +149,31 @@ openFile(fileInfo_t *pThis) /* If we reach this point, we have a .si file */ - CHKiRet(strmConstruct(&psSF)); - CHKiRet(strmSettOperationsMode(psSF, STREAMMODE_READ)); - CHKiRet(strmSetsType(psSF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psSF, pszSFNam, lenSFNam)); - CHKiRet(strmConstructFinalize(psSF)); + CHKiRet(strm.Construct(&psSF)); + CHKiRet(strm.SettOperationsMode(psSF, STREAMMODE_READ)); + CHKiRet(strm.SetsType(psSF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psSF, pszSFNam, lenSFNam)); + CHKiRet(strm.ConstructFinalize(psSF)); /* read back in the object */ CHKiRet(obj.Deserialize(&pThis->pStrm, (uchar*) "strm", psSF, NULL, pThis)); - CHKiRet(strmSeekCurrOffs(pThis->pStrm)); + CHKiRet(strm.SeekCurrOffs(pThis->pStrm)); - /* OK, we could successfully read the file, so we now can request that it be deleted. - * If we need it again, it will be written on the next shutdown. + /* note: we do not delete the state file, so that the last position remains + * known even in the case that rsyslogd aborts for some reason (like powerfail) */ - psSF->bDeleteOnClose = 1; finalize_it: if(psSF != NULL) - strmDestruct(&psSF); + strm.Destruct(&psSF); if(iRet != RS_RET_OK) { - CHKiRet(strmConstruct(&pThis->pStrm)); - CHKiRet(strmSettOperationsMode(pThis->pStrm, STREAMMODE_READ)); - CHKiRet(strmSetsType(pThis->pStrm, STREAMTYPE_FILE_MONITOR)); - CHKiRet(strmSetFName(pThis->pStrm, pThis->pszFileName, strlen((char*) pThis->pszFileName))); - CHKiRet(strmConstructFinalize(pThis->pStrm)); + CHKiRet(strm.Construct(&pThis->pStrm)); + CHKiRet(strm.SettOperationsMode(pThis->pStrm, STREAMMODE_READ)); + CHKiRet(strm.SetsType(pThis->pStrm, STREAMTYPE_FILE_MONITOR)); + CHKiRet(strm.SetFName(pThis->pStrm, pThis->pszFileName, strlen((char*) pThis->pszFileName))); + CHKiRet(strm.ConstructFinalize(pThis->pStrm)); } RETiRet; @@ -202,10 +212,14 @@ static rsRetVal pollFile(fileInfo_t *pThis, int *pbHadFileData) /* loop below will be exited when strmReadLine() returns EOF */ while(1) { - CHKiRet(strmReadLine(pThis->pStrm, &pCStr)); + CHKiRet(strm.ReadLine(pThis->pStrm, &pCStr)); *pbHadFileData = 1; /* this is just a flag, so set it and forget it */ CHKiRet(enqLine(pThis, pCStr)); /* process line */ rsCStrDestruct(&pCStr); /* discard string (must be done by us!) */ + if(pThis->iPersistStateInterval > 0 && pThis->nRecords++ >= pThis->iPersistStateInterval) { + persistStrmState(pThis); + pThis->nRecords = 0; + } } finalize_it: @@ -239,27 +253,12 @@ finalize_it: * IMPORTANT: the calling interface of this function can NOT be modified. It actually is * called by pthreads. The provided argument is currently not being used. */ -/* ------------------------------------------------------------------------------------------ * - * DO NOT TOUCH the following code - it will soon be part of the module generation macros! */ static void inputModuleCleanup(void __attribute__((unused)) *arg) { BEGINfunc -/* END no-touch zone * - * ------------------------------------------------------------------------------------------ */ - - - - /* so far not needed */ - - - -/* ------------------------------------------------------------------------------------------ * - * DO NOT TOUCH the following code - it will soon be part of the module generation macros! */ ENDfunc } -/* END no-touch zone * - * ------------------------------------------------------------------------------------------ */ /* This function is called by the framework to gather the input. The module stays @@ -334,6 +333,11 @@ CODESTARTwillRun ABORT_FINALIZE(RS_RET_NO_RUN); } + /* we need to create the inputName property (only once during our lifetime) */ + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("imfile"), sizeof("imfile") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); + finalize_it: ENDwillRun @@ -349,25 +353,27 @@ persistStrmState(fileInfo_t *pInfo) { DEFiRet; strm_t *psSF = NULL; /* state file (stream) */ + size_t lenDir; ASSERT(pInfo != NULL); /* TODO: create a function persistObj in obj.c? */ - CHKiRet(strmConstruct(&psSF)); - CHKiRet(strmSetDir(psSF, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); - CHKiRet(strmSettOperationsMode(psSF, STREAMMODE_WRITE)); - CHKiRet(strmSetiAddtlOpenFlags(psSF, O_TRUNC)); - CHKiRet(strmSetsType(psSF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strmSetFName(psSF, pInfo->pszStateFile, strlen((char*) pInfo->pszStateFile))); - CHKiRet(strmConstructFinalize(psSF)); + CHKiRet(strm.Construct(&psSF)); + lenDir = strlen((char*)glbl.GetWorkDir()); + if(lenDir > 0) + CHKiRet(strm.SetDir(psSF, glbl.GetWorkDir(), lenDir)); + CHKiRet(strm.SettOperationsMode(psSF, STREAMMODE_WRITE_TRUNC)); + CHKiRet(strm.SetsType(psSF, STREAMTYPE_FILE_SINGLE)); + CHKiRet(strm.SetFName(psSF, pInfo->pszStateFile, strlen((char*) pInfo->pszStateFile))); + CHKiRet(strm.ConstructFinalize(psSF)); - CHKiRet(strmSerialize(pInfo->pStrm, psSF)); + CHKiRet(strm.Serialize(pInfo->pStrm, psSF)); - CHKiRet(strmDestruct(&psSF)); + CHKiRet(strm.Destruct(&psSF)); finalize_it: if(psSF != NULL) - strmDestruct(&psSF); + strm.Destruct(&psSF); RETiRet; } @@ -387,9 +393,12 @@ CODESTARTafterRun for(i = 0 ; i < iFilPtr ; ++i) { if(files[i].pStrm != NULL) { /* stream open? */ persistStrmState(&files[i]); - strmDestruct(&(files[i].pStrm)); + strm.Destruct(&(files[i].pStrm)); } } + + if(pInputName != NULL) + prop.Destruct(&pInputName); ENDafterRun @@ -400,9 +409,11 @@ ENDafterRun BEGINmodExit CODESTARTmodExit /* release objects we used */ + objRelease(strm, CORE_COMPONENT); objRelease(datetime, CORE_COMPONENT); objRelease(glbl, CORE_COMPONENT); objRelease(errmsg, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); ENDmodExit @@ -470,6 +481,7 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } else { pThis->pszTag = (uchar*) strdup((char*) pszFileTag); + pThis->lenTag = ustrlen(pThis->pszTag); } if(pszStateFile == NULL) { @@ -481,6 +493,9 @@ static rsRetVal addMonitor(void __attribute__((unused)) *pVal, uchar *pNewVal) pThis->iSeverity = iSeverity; pThis->iFacility = iFacility; + pThis->iPersistStateInterval = iPersistStateInterval; + pThis->nRecords = 0; + iPersistStateInterval = 0; } else { errmsg.LogError(0, RS_RET_OUT_OF_DESRIPTORS, "Too many file monitors configured - ignoring this one"); ABORT_FINALIZE(RS_RET_OUT_OF_DESRIPTORS); @@ -511,6 +526,8 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(glbl, CORE_COMPONENT)); CHKiRet(objUse(datetime, CORE_COMPONENT)); + CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilename", 0, eCmdHdlrGetWord, NULL, &pszFileName, STD_LOADABLE_MODULE_ID)); @@ -524,6 +541,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &iFacility, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepollinterval", 0, eCmdHdlrInt, NULL, &iPollInterval, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputfilepersiststateinterval", 0, eCmdHdlrInt, + NULL, &iPersistStateInterval, STD_LOADABLE_MODULE_ID)); /* that command ads a new file! */ CHKiRet(omsdRegCFSLineHdlr((uchar *)"inputrunfilemonitor", 0, eCmdHdlrGetWord, addMonitor, NULL, STD_LOADABLE_MODULE_ID)); |