diff options
Diffstat (limited to 'tools/omfile.c')
-rw-r--r-- | tools/omfile.c | 84 |
1 files changed, 77 insertions, 7 deletions
diff --git a/tools/omfile.c b/tools/omfile.c index 715b218c..1c65fc59 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -68,6 +68,7 @@ #include "stream.h" #include "unicode-helper.h" #include "atomic.h" +#include "statsobj.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -81,6 +82,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(strm) +DEFobjCurrIf(statsobj) /* for our current LRU mechanism, we need a monotonically increasing counters. We use * it much like a "Lamport logical clock": we do not need the actual time, we just need @@ -156,6 +158,13 @@ typedef struct _instanceData { int iFlushInterval; /* how fast flush buffer on inactivity? */ sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */ sbool bUseAsyncWriter; /* use async stream writer? */ + sbool bVeryRobustZip; + statsobj_t *stats; /* dynafile, primarily cache stats */ + STATSCOUNTER_DEF(ctrRequests, mutCtrRequests); + STATSCOUNTER_DEF(ctrLevel0, mutCtrLevel0); + STATSCOUNTER_DEF(ctrEvict, mutCtrEvict); + STATSCOUNTER_DEF(ctrMiss, mutCtrMiss); + STATSCOUNTER_DEF(ctrMax, mutCtrMax); } instanceData; @@ -205,6 +214,7 @@ static struct cnfparamdescr actpdescr[] = { { "ziplevel", eCmdHdlrInt, 0 }, /* legacy: omfileziplevel */ { "flushinterval", eCmdHdlrInt, 0 }, /* legacy: omfileflushinterval */ { "asyncwriting", eCmdHdlrBinary, 0 }, /* legacy: omfileasyncwriting */ + { "veryrobustzip", eCmdHdlrBinary, 0 }, { "flushontxend", eCmdHdlrBinary, 0 }, /* legacy: omfileflushontxend */ { "iobuffersize", eCmdHdlrSize, 0 }, /* legacy: omfileiobuffersize */ { "dirowner", eCmdHdlrUID, 0 }, /* legacy: dirowner */ @@ -269,7 +279,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tflush on TX end=%d\n", pData->bFlushOnTXEnd); dbgprintf("\tflush interval=%d\n", pData->iFlushInterval); dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize); - dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no"); + dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "on" : "off"); + dbgprintf("\tvery robust zip: %s\n", pData->bCreateDirs ? "on" : "off"); dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID); dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID); dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n", @@ -292,7 +303,7 @@ setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal) if(loadModConf != NULL && loadModConf->tplName != NULL) { free(newVal); - errmsg.LogError(0, RS_RET_ERR, "omfile default template already set via module " + errmsg.LogError(0, RS_RET_ERR, "omfile: default template already set via module " "global parameter - can no longer be changed"); ABORT_FINALIZE(RS_RET_ERR); } @@ -536,6 +547,7 @@ prepareFile(instanceData *pData, uchar *newFileName) CHKiRet(strm.SetFName(pData->pStrm, szBaseName, ustrlen(szBaseName))); CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName))); CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel)); + CHKiRet(strm.SetbVeryReliableZip(pData->pStrm, pData->bVeryRobustZip)); CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize)); CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND)); CHKiRet(strm.SettOpenMode(pData->pStrm, cs.fCreateMode)); @@ -593,7 +605,8 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts) && !ustrcmp(newFileName, pCache[pData->iCurrElt]->pName)) { /* great, we are all set */ pCache[pData->iCurrElt]->clkTickAccessed = getClockFileAccess(); - // LRU needs only a strictly monotonically increasing counter, so such a one could do + STATSCOUNTER_INC(pData->ctrLevel0, pData->mutCtrLevel0); + /* LRU needs only a strictly monotonically increasing counter, so such a one could do */ FINALIZE; } @@ -625,6 +638,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts) } /* we have not found an entry */ + STATSCOUNTER_INC(pData->ctrMiss, pData->mutCtrMiss); /* invalidate iCurrElt as we may error-exit out of this function when the currrent * iCurrElt has been freed or otherwise become unusable. This is a precaution, and @@ -642,6 +656,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts) if(iFirstFree == -1 && (pData->iCurrCacheSize < pData->iDynaFileCacheSize)) { /* there is space left, so set it to that index */ iFirstFree = pData->iCurrCacheSize++; + STATSCOUNTER_SETMAX_NOMUT(pData->ctrMax, (unsigned) pData->iCurrCacheSize); } /* Note that the following code sequence does not work with the cache entry itself, @@ -650,6 +665,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts) */ if(iFirstFree == -1) { dynaFileDelCacheEntry(pCache, iOldest, 0); + STATSCOUNTER_INC(pData->ctrEvict, pData->mutCtrEvict); iFirstFree = iOldest; /* this one *is* now free ;) */ } else { /* we need to allocate memory for the cache structure */ @@ -843,7 +859,12 @@ BEGINendTransaction CODESTARTendTransaction /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { - CHKiRet(strm.Flush(pData->pStrm)); + /* if we have an async writer, it controls the flush via + * a timeout. However, without it, we actually need to flush, + * else incomplete records are written. + */ + if(!pData->bUseAsyncWriter) + CHKiRet(strm.Flush(pData->pStrm)); } finalize_it: ENDendTransaction @@ -851,7 +872,10 @@ ENDendTransaction BEGINdoAction CODESTARTdoAction - DBGPRINTF("file to log to: %s\n", pData->f_fname); + DBGPRINTF("file to log to: %s\n", + (pData->bDynamicName) ? ppString[1] : pData->f_fname); + DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]); + STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); CHKiRet(writeFile(ppString, iMsgOpts, pData)); if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { CHKiRet(strm.Flush(pData->pStrm)); @@ -878,14 +902,53 @@ setInstParamDefaults(instanceData *pData) pData->bCreateDirs = 1; pData->bSyncFile = 0; pData->iZipLevel = 0; + pData->bVeryRobustZip = 0; pData->bFlushOnTXEnd = FLUSHONTX_DFLT; pData->iIOBufSize = IOBUF_DFLT_SIZE; pData->iFlushInterval = FLUSH_INTRVL_DFLT; pData->bUseAsyncWriter = USE_ASYNCWRITER_DFLT; } + +static rsRetVal +setupInstStatsCtrs(instanceData *pData) +{ + uchar ctrName[512]; + DEFiRet; + + if(!pData->bDynamicName) { + FINALIZE; + } + + /* support statistics gathering */ + snprintf((char*)ctrName, sizeof(ctrName), "dynafile cache %s", pData->f_fname); + ctrName[sizeof(ctrName)-1] = '\0'; /* be on the save side */ + CHKiRet(statsobj.Construct(&(pData->stats))); + CHKiRet(statsobj.SetName(pData->stats, ctrName)); + STATSCOUNTER_INIT(pData->ctrRequests, pData->mutCtrRequests); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("requests"), + ctrType_IntCtr, &(pData->ctrRequests))); + STATSCOUNTER_INIT(pData->ctrLevel0, pData->mutCtrLevel0); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("level0"), + ctrType_IntCtr, &(pData->ctrLevel0))); + STATSCOUNTER_INIT(pData->ctrMiss, pData->mutCtrMiss); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("missed"), + ctrType_IntCtr, &(pData->ctrMiss))); + STATSCOUNTER_INIT(pData->ctrEvict, pData->mutCtrEvict); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("evicted"), + ctrType_IntCtr, &(pData->ctrEvict))); + STATSCOUNTER_INIT(pData->ctrMax, pData->mutCtrMax); + CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("maxused"), + ctrType_IntCtr, &(pData->ctrMax))); + CHKiRet(statsobj.ConstructFinalize(pData->stats)); + +finalize_it: + RETiRet; +} + BEGINnewActInst struct cnfparamvals *pvals; + uchar *tplToUse; int i; CODESTARTnewActInst DBGPRINTF("newActInst (omfile)\n"); @@ -914,6 +977,8 @@ CODESTARTnewActInst pData->iZipLevel = (int) pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "flushinterval")) { pData->iFlushInterval = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "veryrobustzip")) { + pData->bVeryRobustZip = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "asyncwriting")) { pData->bUseAsyncWriter = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "flushontxend")) { @@ -960,7 +1025,8 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } - CHKiRet(OMSRsetEntry(*ppOMSR, 0, ustrdup(getDfltTpl()), OMSR_NO_RQD_TPL_OPTS)); + tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName); + CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS)); if(pData->bDynamicName) { /* "filename" is actually a template name, we need this as string 1. So let's add it @@ -974,6 +1040,7 @@ CODESTARTnewActInst pData->iCurrElt = -1; /* no current element */ } // TODO: add pData->iSizeLimit = 0; /* default value, use outchannels to configure! */ + setupInstStatsCtrs(pData); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); @@ -1062,6 +1129,8 @@ CODESTARTparseSelectorAct pData->iIOBufSize = (int) cs.iIOBufSize; pData->iFlushInterval = cs.iFlushInterval; pData->bUseAsyncWriter = cs.bUseAsyncWriter; + pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */ + setupInstStatsCtrs(pData); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -1088,7 +1157,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a cs.bUseAsyncWriter = USE_ASYNCWRITER_DFLT; free(pszFileDfltTplName); pszFileDfltTplName = NULL; - return RS_RET_OK; } @@ -1110,6 +1178,7 @@ BEGINmodExit CODESTARTmodExit objRelease(errmsg, CORE_COMPONENT); objRelease(strm, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); DESTROY_ATOMIC_HELPER_MUT(mutClock); ENDmodExit @@ -1132,6 +1201,7 @@ CODEmodInit_QueryRegCFSLineHdlr INITLegCnfVars CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(strm, CORE_COMPONENT)); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); INIT_ATOMIC_HELPER_MUT(mutClock); |