diff options
Diffstat (limited to 'tools/omfile.c')
-rw-r--r-- | tools/omfile.c | 113 |
1 files changed, 95 insertions, 18 deletions
diff --git a/tools/omfile.c b/tools/omfile.c index fdcf355a..b1cbbfd9 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry; typedef struct _instanceData { + pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */ uchar *f_fname; /* file or template name (display only) */ uchar *tplName; /* name of assigned template */ strm_t *pStrm; /* our output stream */ @@ -181,6 +182,20 @@ typedef struct _instanceData { STATSCOUNTER_DEF(ctrMax, mutCtrMax); } instanceData; +/* to build a linked list for temporary storage of lines while we cannot commit */ +typedef struct linebuf { + uchar *filename; /* for dynafiles, make go away */ + uchar *ln; + unsigned iMsgOpts; + struct linebuf *pNext; +} linebuf_t; + +typedef struct wrkrInstanceData { + instanceData *pData; + linebuf_t *pRoot; + linebuf_t *pLast; +} wrkrInstanceData_t; + typedef struct configSettings_s { int iDynaFileCacheSize; /* max cache for dynamic files */ @@ -786,7 +801,7 @@ finalize_it: /* rgerhards 2004-11-11: write to a file output. */ static rsRetVal -writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +writeFile(instanceData *pData, linebuf_t *linebuf) { DEFiRet; @@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) * check if it still is ok or a new file needs to be created */ if(pData->bDynamicName) { - CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts)); + CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts)); } else { /* "regular", non-dynafile */ if(pData->pStrm == NULL) { CHKiRet(prepareFile(pData, pData->f_fname)); @@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) } } - CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0])))); + CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln))); finalize_it: RETiRet; @@ -888,9 +903,15 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance pData->pStrm = NULL; + pthread_mutex_init(&pData->mutWrite, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance free(pData->tplName); @@ -913,9 +934,15 @@ CODESTARTfreeInstance free(pData->cryprovName); free(pData->cryprovNameFull); } + pthread_mutex_destroy(&pData->mutWrite); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINtryResume CODESTARTtryResume ENDtryResume @@ -926,8 +953,68 @@ CODESTARTbeginTransaction ENDbeginTransaction +static rsRetVal +bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line) +{ + linebuf_t *lb; + DEFiRet; + + CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t))); +dbgprintf("DDDD: filename '%s'\n", filename); + CHKmalloc(lb->filename = ustrdup(filename)); + CHKmalloc(lb->ln = ustrdup(line)); + lb->pNext = NULL; + if(pWrkrData->pRoot == NULL) { + pWrkrData->pRoot = pWrkrData->pLast = lb; + } else { + pWrkrData->pLast->pNext = lb; + pWrkrData->pLast = lb; + } +finalize_it: + RETiRet; +} + +static void +submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) +{ + linebuf_t *curr, *todel; + + for(curr = pWrkrData->pRoot ; curr != NULL ; ) { + DBGPRINTF("omfile: file to log to: %s\n", curr->filename); + DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln); + STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); + writeFile(pData, curr); + + todel = curr; + curr = curr->pNext; + free(todel->filename); + free(todel->ln); + free(todel); + } + pWrkrData->pRoot = NULL; +} + + +BEGINdoAction + instanceData *pData; +CODESTARTdoAction + pData = pWrkrData->pData; +dbgprintf("DDDD: bDynName %d, filename '%s'\n", pData->bDynamicName, ppString[1]); + iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname, + ppString[0]); + if(iRet == RS_RET_OK) + iRet = RS_RET_DEFER_COMMIT; +ENDdoAction + BEGINendTransaction + instanceData *pData; CODESTARTendTransaction + pData = pWrkrData->pData; +dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData); + pthread_mutex_lock(&pData->mutWrite); +dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData); + + submitCachedLines(pWrkrData, pData); /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { /* if we have an async writer, it controls the flush via @@ -938,24 +1025,11 @@ CODESTARTendTransaction CHKiRet(strm.Flush(pData->pStrm)); } finalize_it: + pthread_mutex_unlock(&pData->mutWrite); +dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData); ENDendTransaction -BEGINdoAction -CODESTARTdoAction - DBGPRINTF("file to log to: %s\n", - (pData->bDynamicName) ? ppString[1] : pData->f_fname); - DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]); - STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); - CHKiRet(writeFile(ppString, iMsgOpts, pData)); - if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { - CHKiRet(strm.Flush(pData->pStrm)); - } -finalize_it: - if(iRet == RS_RET_OK) - iRet = RS_RET_DEFER_COMMIT; -ENDdoAction - static inline void setInstParamDefaults(instanceData *pData) @@ -1336,6 +1410,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a BEGINdoHUP CODESTARTdoHUP + pthread_mutex_lock(&pData->mutWrite); if(pData->bDynamicName) { dynaFileFreeCacheEntries(pData); } else { @@ -1343,6 +1418,7 @@ CODESTARTdoHUP closeFile(pData); } } + pthread_mutex_unlock(&pData->mutWrite); ENDdoHUP @@ -1358,6 +1434,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES |