From f4161583cc799acfe5b5613e103418d1baaf450e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 28 Oct 2013 15:04:58 +0100 Subject: milestone: omfile supports buffering mechanism --- runtime/module-template.h | 8 ++++- tools/omfile.c | 88 ++++++++++++++++++++++++++++++----------------- 2 files changed, 63 insertions(+), 33 deletions(-) diff --git a/runtime/module-template.h b/runtime/module-template.h index b9d1844d..f6afc961 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -247,8 +247,9 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINendTransaction \ -static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ + instanceData *pData = NULL; /* deliberately make module abort if it does not support new IF */\ DEFiRet; #define CODESTARTendTransaction /* currently empty, but may be extended */ @@ -506,6 +507,11 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } +/* standard queries for output module interface in rsyslog v8+ */ +#define CODEqueryEtryPt_STD_OMOD8_QUERIES \ + else if(!strcmp((char*) name, "createWrkrInstance")) {\ + *pEtryPoint = createWrkrInstance;\ + } /* the following definition is queryEtryPt block that must be added * if an output module supports the transactional interface. diff --git a/tools/omfile.c b/tools/omfile.c index 2100bfa7..578407d1 100644 --- a/tools/omfile.c +++ b/tools/omfile.c @@ -185,12 +185,14 @@ typedef struct _instanceData { typedef struct linebuf { uchar *filename; /* for dynafiles, make go away */ uchar *ln; - struct linebuf *next; + unsigned iMsgOpts; + struct linebuf *pNext; } linebuf_t; typedef struct wrkrInstanceData { instanceData *pData; linebuf_t *pRoot; + linebuf_t *pLast; } wrkrInstanceData_t; @@ -798,7 +800,7 @@ finalize_it: /* rgerhards 2004-11-11: write to a file output. */ static rsRetVal -writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) +writeFile(instanceData *pData, linebuf_t *linebuf) { DEFiRet; @@ -808,7 +810,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) * check if it still is ok or a new file needs to be created */ if(pData->bDynamicName) { - CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts)); + CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts)); } else { /* "regular", non-dynafile */ if(pData->pStrm == NULL) { CHKiRet(prepareFile(pData, pData->f_fname)); @@ -818,7 +820,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData) } } - CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0])))); + CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln))); finalize_it: RETiRet; @@ -944,8 +946,57 @@ CODESTARTbeginTransaction ENDbeginTransaction +static rsRetVal +bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line) +{ + linebuf_t *lb; + DEFiRet; + + dbgprintf("DDDD: buffering line %s\n", line); + CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t))); + CHKmalloc(lb->filename = ustrdup(filename)); + CHKmalloc(lb->ln = ustrdup(line)); + lb->pNext = NULL; + if(pWrkrData->pRoot == NULL) { + pWrkrData->pRoot = pWrkrData->pLast = lb; + } else { + pWrkrData->pLast->pNext = lb; + pWrkrData->pLast = lb; + } +finalize_it: + RETiRet; +} + +static void +submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData) +{ + linebuf_t *curr, *todel; + + for(curr = pWrkrData->pRoot ; curr != NULL ; ) { + DBGPRINTF("omfile: file to log to: %s\n", curr->filename); + DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln); + STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); + writeFile(pData, curr); + + todel = curr; + curr = curr->pNext; + free(todel->filename); + free(todel->ln); + free(todel); + } + pWrkrData->pRoot = NULL; +} + + +BEGINdoAction +CODESTARTdoAction + iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname, + ppString[0]); +ENDdoAction + BEGINendTransaction CODESTARTendTransaction + submitCachedLines(pWrkrData, pData); /* Note: pStrm may be NULL if there was an error opening the stream */ if(pData->bFlushOnTXEnd && pData->pStrm != NULL) { /* if we have an async writer, it controls the flush via @@ -959,34 +1010,6 @@ finalize_it: ENDendTransaction -#if 0 -static rsRetVal -doRealAction() -{ - DBGPRINTF("file to log to: %s\n", - (pData->bDynamicName) ? ppString[1] : pData->f_fname); - DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]); - STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests); - CHKiRet(writeFile(ppString, iMsgOpts, pData)); - if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) { - CHKiRet(strm.Flush(pData->pStrm)); - } -} -#endif - -static rsRetVal -bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line) -{ - DEFiRet; - RETiRet; -} - -BEGINdoAction -CODESTARTdoAction - iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname, - ppString[0]); -ENDdoAction - static inline void setInstParamDefaults(instanceData *pData) @@ -1389,6 +1412,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES -- cgit v1.2.3