summaryrefslogtreecommitdiffstats
path: root/tools/omfile.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/omfile.c')
-rw-r--r--tools/omfile.c113
1 files changed, 95 insertions, 18 deletions
diff --git a/tools/omfile.c b/tools/omfile.c
index fdcf355a..b1cbbfd9 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry;
typedef struct _instanceData {
+ pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */
uchar *f_fname; /* file or template name (display only) */
uchar *tplName; /* name of assigned template */
strm_t *pStrm; /* our output stream */
@@ -181,6 +182,20 @@ typedef struct _instanceData {
STATSCOUNTER_DEF(ctrMax, mutCtrMax);
} instanceData;
+/* to build a linked list for temporary storage of lines while we cannot commit */
+typedef struct linebuf {
+ uchar *filename; /* for dynafiles, make go away */
+ uchar *ln;
+ unsigned iMsgOpts;
+ struct linebuf *pNext;
+} linebuf_t;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ linebuf_t *pRoot;
+ linebuf_t *pLast;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int iDynaFileCacheSize; /* max cache for dynamic files */
@@ -786,7 +801,7 @@ finalize_it:
/* rgerhards 2004-11-11: write to a file output. */
static rsRetVal
-writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
+writeFile(instanceData *pData, linebuf_t *linebuf)
{
DEFiRet;
@@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
* check if it still is ok or a new file needs to be created
*/
if(pData->bDynamicName) {
- CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts));
+ CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts));
} else { /* "regular", non-dynafile */
if(pData->pStrm == NULL) {
CHKiRet(prepareFile(pData, pData->f_fname));
@@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
}
}
- CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0]))));
+ CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln)));
finalize_it:
RETiRet;
@@ -888,9 +903,15 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
pData->pStrm = NULL;
+ pthread_mutex_init(&pData->mutWrite, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->tplName);
@@ -913,9 +934,15 @@ CODESTARTfreeInstance
free(pData->cryprovName);
free(pData->cryprovNameFull);
}
+ pthread_mutex_destroy(&pData->mutWrite);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
ENDtryResume
@@ -926,8 +953,68 @@ CODESTARTbeginTransaction
ENDbeginTransaction
+static rsRetVal
+bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line)
+{
+ linebuf_t *lb;
+ DEFiRet;
+
+ CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t)));
+dbgprintf("DDDD: filename '%s'\n", filename);
+ CHKmalloc(lb->filename = ustrdup(filename));
+ CHKmalloc(lb->ln = ustrdup(line));
+ lb->pNext = NULL;
+ if(pWrkrData->pRoot == NULL) {
+ pWrkrData->pRoot = pWrkrData->pLast = lb;
+ } else {
+ pWrkrData->pLast->pNext = lb;
+ pWrkrData->pLast = lb;
+ }
+finalize_it:
+ RETiRet;
+}
+
+static void
+submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData)
+{
+ linebuf_t *curr, *todel;
+
+ for(curr = pWrkrData->pRoot ; curr != NULL ; ) {
+ DBGPRINTF("omfile: file to log to: %s\n", curr->filename);
+ DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln);
+ STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
+ writeFile(pData, curr);
+
+ todel = curr;
+ curr = curr->pNext;
+ free(todel->filename);
+ free(todel->ln);
+ free(todel);
+ }
+ pWrkrData->pRoot = NULL;
+}
+
+
+BEGINdoAction
+ instanceData *pData;
+CODESTARTdoAction
+ pData = pWrkrData->pData;
+dbgprintf("DDDD: bDynName %d, filename '%s'\n", pData->bDynamicName, ppString[1]);
+ iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname,
+ ppString[0]);
+ if(iRet == RS_RET_OK)
+ iRet = RS_RET_DEFER_COMMIT;
+ENDdoAction
+
BEGINendTransaction
+ instanceData *pData;
CODESTARTendTransaction
+ pData = pWrkrData->pData;
+dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData);
+ pthread_mutex_lock(&pData->mutWrite);
+dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData);
+
+ submitCachedLines(pWrkrData, pData);
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
/* if we have an async writer, it controls the flush via
@@ -938,24 +1025,11 @@ CODESTARTendTransaction
CHKiRet(strm.Flush(pData->pStrm));
}
finalize_it:
+ pthread_mutex_unlock(&pData->mutWrite);
+dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData);
ENDendTransaction
-BEGINdoAction
-CODESTARTdoAction
- DBGPRINTF("file to log to: %s\n",
- (pData->bDynamicName) ? ppString[1] : pData->f_fname);
- DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]);
- STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
- CHKiRet(writeFile(ppString, iMsgOpts, pData));
- if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
- CHKiRet(strm.Flush(pData->pStrm));
- }
-finalize_it:
- if(iRet == RS_RET_OK)
- iRet = RS_RET_DEFER_COMMIT;
-ENDdoAction
-
static inline void
setInstParamDefaults(instanceData *pData)
@@ -1336,6 +1410,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINdoHUP
CODESTARTdoHUP
+ pthread_mutex_lock(&pData->mutWrite);
if(pData->bDynamicName) {
dynaFileFreeCacheEntries(pData);
} else {
@@ -1343,6 +1418,7 @@ CODESTARTdoHUP
closeFile(pData);
}
}
+ pthread_mutex_unlock(&pData->mutWrite);
ENDdoHUP
@@ -1358,6 +1434,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES