summaryrefslogtreecommitdiffstats
path: root/tools/omfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/omfile.c')
-rw-r--r--tools/omfile.c84
1 files changed, 77 insertions, 7 deletions
diff --git a/tools/omfile.c b/tools/omfile.c
index 715b218c..1c65fc59 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -68,6 +68,7 @@
#include "stream.h"
#include "unicode-helper.h"
#include "atomic.h"
+#include "statsobj.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
@@ -81,6 +82,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(strm)
+DEFobjCurrIf(statsobj)
/* for our current LRU mechanism, we need a monotonically increasing counters. We use
* it much like a "Lamport logical clock": we do not need the actual time, we just need
@@ -156,6 +158,13 @@ typedef struct _instanceData {
int iFlushInterval; /* how fast flush buffer on inactivity? */
sbool bFlushOnTXEnd; /* flush write buffers when transaction has ended? */
sbool bUseAsyncWriter; /* use async stream writer? */
+ sbool bVeryRobustZip;
+ statsobj_t *stats; /* dynafile, primarily cache stats */
+ STATSCOUNTER_DEF(ctrRequests, mutCtrRequests);
+ STATSCOUNTER_DEF(ctrLevel0, mutCtrLevel0);
+ STATSCOUNTER_DEF(ctrEvict, mutCtrEvict);
+ STATSCOUNTER_DEF(ctrMiss, mutCtrMiss);
+ STATSCOUNTER_DEF(ctrMax, mutCtrMax);
} instanceData;
@@ -205,6 +214,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "ziplevel", eCmdHdlrInt, 0 }, /* legacy: omfileziplevel */
{ "flushinterval", eCmdHdlrInt, 0 }, /* legacy: omfileflushinterval */
{ "asyncwriting", eCmdHdlrBinary, 0 }, /* legacy: omfileasyncwriting */
+ { "veryrobustzip", eCmdHdlrBinary, 0 },
{ "flushontxend", eCmdHdlrBinary, 0 }, /* legacy: omfileflushontxend */
{ "iobuffersize", eCmdHdlrSize, 0 }, /* legacy: omfileiobuffersize */
{ "dirowner", eCmdHdlrUID, 0 }, /* legacy: dirowner */
@@ -269,7 +279,8 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tflush on TX end=%d\n", pData->bFlushOnTXEnd);
dbgprintf("\tflush interval=%d\n", pData->iFlushInterval);
dbgprintf("\tfile cache size=%d\n", pData->iDynaFileCacheSize);
- dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "yes" : "no");
+ dbgprintf("\tcreate directories: %s\n", pData->bCreateDirs ? "on" : "off");
+ dbgprintf("\tvery robust zip: %s\n", pData->bCreateDirs ? "on" : "off");
dbgprintf("\tfile owner %d, group %d\n", (int) pData->fileUID, (int) pData->fileGID);
dbgprintf("\tdirectory owner %d, group %d\n", (int) pData->dirUID, (int) pData->dirGID);
dbgprintf("\tdir create mode 0%3.3o, file create mode 0%3.3o\n",
@@ -292,7 +303,7 @@ setLegacyDfltTpl(void __attribute__((unused)) *pVal, uchar* newVal)
if(loadModConf != NULL && loadModConf->tplName != NULL) {
free(newVal);
- errmsg.LogError(0, RS_RET_ERR, "omfile default template already set via module "
+ errmsg.LogError(0, RS_RET_ERR, "omfile: default template already set via module "
"global parameter - can no longer be changed");
ABORT_FINALIZE(RS_RET_ERR);
}
@@ -536,6 +547,7 @@ prepareFile(instanceData *pData, uchar *newFileName)
CHKiRet(strm.SetFName(pData->pStrm, szBaseName, ustrlen(szBaseName)));
CHKiRet(strm.SetDir(pData->pStrm, szDirName, ustrlen(szDirName)));
CHKiRet(strm.SetiZipLevel(pData->pStrm, pData->iZipLevel));
+ CHKiRet(strm.SetbVeryReliableZip(pData->pStrm, pData->bVeryRobustZip));
CHKiRet(strm.SetsIOBufSize(pData->pStrm, (size_t) pData->iIOBufSize));
CHKiRet(strm.SettOperationsMode(pData->pStrm, STREAMMODE_WRITE_APPEND));
CHKiRet(strm.SettOpenMode(pData->pStrm, cs.fCreateMode));
@@ -593,7 +605,8 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts)
&& !ustrcmp(newFileName, pCache[pData->iCurrElt]->pName)) {
/* great, we are all set */
pCache[pData->iCurrElt]->clkTickAccessed = getClockFileAccess();
- // LRU needs only a strictly monotonically increasing counter, so such a one could do
+ STATSCOUNTER_INC(pData->ctrLevel0, pData->mutCtrLevel0);
+ /* LRU needs only a strictly monotonically increasing counter, so such a one could do */
FINALIZE;
}
@@ -625,6 +638,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts)
}
/* we have not found an entry */
+ STATSCOUNTER_INC(pData->ctrMiss, pData->mutCtrMiss);
/* invalidate iCurrElt as we may error-exit out of this function when the currrent
* iCurrElt has been freed or otherwise become unusable. This is a precaution, and
@@ -642,6 +656,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts)
if(iFirstFree == -1 && (pData->iCurrCacheSize < pData->iDynaFileCacheSize)) {
/* there is space left, so set it to that index */
iFirstFree = pData->iCurrCacheSize++;
+ STATSCOUNTER_SETMAX_NOMUT(pData->ctrMax, (unsigned) pData->iCurrCacheSize);
}
/* Note that the following code sequence does not work with the cache entry itself,
@@ -650,6 +665,7 @@ prepareDynFile(instanceData *pData, uchar *newFileName, unsigned iMsgOpts)
*/
if(iFirstFree == -1) {
dynaFileDelCacheEntry(pCache, iOldest, 0);
+ STATSCOUNTER_INC(pData->ctrEvict, pData->mutCtrEvict);
iFirstFree = iOldest; /* this one *is* now free ;) */
} else {
/* we need to allocate memory for the cache structure */
@@ -843,7 +859,12 @@ BEGINendTransaction
CODESTARTendTransaction
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
- CHKiRet(strm.Flush(pData->pStrm));
+ /* if we have an async writer, it controls the flush via
+ * a timeout. However, without it, we actually need to flush,
+ * else incomplete records are written.
+ */
+ if(!pData->bUseAsyncWriter)
+ CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
ENDendTransaction
@@ -851,7 +872,10 @@ ENDendTransaction
BEGINdoAction
CODESTARTdoAction
- DBGPRINTF("file to log to: %s\n", pData->f_fname);
+ DBGPRINTF("file to log to: %s\n",
+ (pData->bDynamicName) ? ppString[1] : pData->f_fname);
+ DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]);
+ STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
CHKiRet(writeFile(ppString, iMsgOpts, pData));
if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
CHKiRet(strm.Flush(pData->pStrm));
@@ -878,14 +902,53 @@ setInstParamDefaults(instanceData *pData)
pData->bCreateDirs = 1;
pData->bSyncFile = 0;
pData->iZipLevel = 0;
+ pData->bVeryRobustZip = 0;
pData->bFlushOnTXEnd = FLUSHONTX_DFLT;
pData->iIOBufSize = IOBUF_DFLT_SIZE;
pData->iFlushInterval = FLUSH_INTRVL_DFLT;
pData->bUseAsyncWriter = USE_ASYNCWRITER_DFLT;
}
+
+static rsRetVal
+setupInstStatsCtrs(instanceData *pData)
+{
+ uchar ctrName[512];
+ DEFiRet;
+
+ if(!pData->bDynamicName) {
+ FINALIZE;
+ }
+
+ /* support statistics gathering */
+ snprintf((char*)ctrName, sizeof(ctrName), "dynafile cache %s", pData->f_fname);
+ ctrName[sizeof(ctrName)-1] = '\0'; /* be on the save side */
+ CHKiRet(statsobj.Construct(&(pData->stats)));
+ CHKiRet(statsobj.SetName(pData->stats, ctrName));
+ STATSCOUNTER_INIT(pData->ctrRequests, pData->mutCtrRequests);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("requests"),
+ ctrType_IntCtr, &(pData->ctrRequests)));
+ STATSCOUNTER_INIT(pData->ctrLevel0, pData->mutCtrLevel0);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("level0"),
+ ctrType_IntCtr, &(pData->ctrLevel0)));
+ STATSCOUNTER_INIT(pData->ctrMiss, pData->mutCtrMiss);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("missed"),
+ ctrType_IntCtr, &(pData->ctrMiss)));
+ STATSCOUNTER_INIT(pData->ctrEvict, pData->mutCtrEvict);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("evicted"),
+ ctrType_IntCtr, &(pData->ctrEvict)));
+ STATSCOUNTER_INIT(pData->ctrMax, pData->mutCtrMax);
+ CHKiRet(statsobj.AddCounter(pData->stats, UCHAR_CONSTANT("maxused"),
+ ctrType_IntCtr, &(pData->ctrMax)));
+ CHKiRet(statsobj.ConstructFinalize(pData->stats));
+
+finalize_it:
+ RETiRet;
+}
+
BEGINnewActInst
struct cnfparamvals *pvals;
+ uchar *tplToUse;
int i;
CODESTARTnewActInst
DBGPRINTF("newActInst (omfile)\n");
@@ -914,6 +977,8 @@ CODESTARTnewActInst
pData->iZipLevel = (int) pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushinterval")) {
pData->iFlushInterval = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "veryrobustzip")) {
+ pData->bVeryRobustZip = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "asyncwriting")) {
pData->bUseAsyncWriter = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "flushontxend")) {
@@ -960,7 +1025,8 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
- CHKiRet(OMSRsetEntry(*ppOMSR, 0, ustrdup(getDfltTpl()), OMSR_NO_RQD_TPL_OPTS));
+ tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName);
+ CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS));
if(pData->bDynamicName) {
/* "filename" is actually a template name, we need this as string 1. So let's add it
@@ -974,6 +1040,7 @@ CODESTARTnewActInst
pData->iCurrElt = -1; /* no current element */
}
// TODO: add pData->iSizeLimit = 0; /* default value, use outchannels to configure! */
+ setupInstStatsCtrs(pData);
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
@@ -1062,6 +1129,8 @@ CODESTARTparseSelectorAct
pData->iIOBufSize = (int) cs.iIOBufSize;
pData->iFlushInterval = cs.iFlushInterval;
pData->bUseAsyncWriter = cs.bUseAsyncWriter;
+ pData->bVeryRobustZip = 0; /* cannot be specified via legacy conf */
+ setupInstStatsCtrs(pData);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -1088,7 +1157,6 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
cs.bUseAsyncWriter = USE_ASYNCWRITER_DFLT;
free(pszFileDfltTplName);
pszFileDfltTplName = NULL;
-
return RS_RET_OK;
}
@@ -1110,6 +1178,7 @@ BEGINmodExit
CODESTARTmodExit
objRelease(errmsg, CORE_COMPONENT);
objRelease(strm, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
DESTROY_ATOMIC_HELPER_MUT(mutClock);
ENDmodExit
@@ -1132,6 +1201,7 @@ CODEmodInit_QueryRegCFSLineHdlr
INITLegCnfVars
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(strm, CORE_COMPONENT));
+ CHKiRet(objUse(statsobj, CORE_COMPONENT));
INIT_ATOMIC_HELPER_MUT(mutClock);