summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rw-r--r--tools/omdiscard.c17
-rw-r--r--tools/omfile.c109
-rw-r--r--tools/omfwd.c17
-rw-r--r--tools/ompipe.c17
-rw-r--r--tools/omshell.c15
-rw-r--r--tools/omusrmsg.c17
-rw-r--r--tools/syslogd.c4
7 files changed, 172 insertions, 24 deletions
diff --git a/tools/omdiscard.c b/tools/omdiscard.c
index 15c6ea82..a76bcc33 100644
--- a/tools/omdiscard.c
+++ b/tools/omdiscard.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-07-24 by RGerhards
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -49,6 +49,10 @@ typedef struct _instanceData {
EMPTY_STRUCT
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* we do not need a createInstance()!
BEGINcreateInstance
CODESTARTcreateInstance
@@ -56,6 +60,11 @@ ENDcreateInstance
*/
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* do nothing */
@@ -87,6 +96,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(0)
@@ -114,6 +128,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/tools/omfile.c b/tools/omfile.c
index fdcf355a..e8cd24d3 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry;
typedef struct _instanceData {
+ pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */
uchar *f_fname; /* file or template name (display only) */
uchar *tplName; /* name of assigned template */
strm_t *pStrm; /* our output stream */
@@ -181,6 +182,20 @@ typedef struct _instanceData {
STATSCOUNTER_DEF(ctrMax, mutCtrMax);
} instanceData;
+/* to build a linked list for temporary storage of lines while we cannot commit */
+typedef struct linebuf {
+ uchar *filename; /* for dynafiles, make go away */
+ uchar *ln;
+ unsigned iMsgOpts;
+ struct linebuf *pNext;
+} linebuf_t;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ linebuf_t *pRoot;
+ linebuf_t *pLast;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int iDynaFileCacheSize; /* max cache for dynamic files */
@@ -786,7 +801,7 @@ finalize_it:
/* rgerhards 2004-11-11: write to a file output. */
static rsRetVal
-writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
+writeFile(instanceData *pData, linebuf_t *linebuf)
{
DEFiRet;
@@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
* check if it still is ok or a new file needs to be created
*/
if(pData->bDynamicName) {
- CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts));
+ CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts));
} else { /* "regular", non-dynafile */
if(pData->pStrm == NULL) {
CHKiRet(prepareFile(pData, pData->f_fname));
@@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
}
}
- CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0]))));
+ CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln)));
finalize_it:
RETiRet;
@@ -888,9 +903,15 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
pData->pStrm = NULL;
+ pthread_mutex_init(&pData->mutWrite, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->tplName);
@@ -913,9 +934,15 @@ CODESTARTfreeInstance
free(pData->cryprovName);
free(pData->cryprovNameFull);
}
+ pthread_mutex_destroy(&pData->mutWrite);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
ENDtryResume
@@ -926,8 +953,66 @@ CODESTARTbeginTransaction
ENDbeginTransaction
+static rsRetVal
+bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line)
+{
+ linebuf_t *lb;
+ DEFiRet;
+
+ CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t)));
+ CHKmalloc(lb->filename = ustrdup(filename));
+ CHKmalloc(lb->ln = ustrdup(line));
+ lb->pNext = NULL;
+ if(pWrkrData->pRoot == NULL) {
+ pWrkrData->pRoot = pWrkrData->pLast = lb;
+ } else {
+ pWrkrData->pLast->pNext = lb;
+ pWrkrData->pLast = lb;
+ }
+finalize_it:
+ RETiRet;
+}
+
+static void
+submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData)
+{
+ linebuf_t *curr, *todel;
+
+dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData);
+ pthread_mutex_lock(&pData->mutWrite);
+dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData);
+
+ for(curr = pWrkrData->pRoot ; curr != NULL ; ) {
+ DBGPRINTF("omfile: file to log to: %s\n", curr->filename);
+ DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln);
+ STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
+ writeFile(pData, curr);
+
+ todel = curr;
+ curr = curr->pNext;
+ free(todel->filename);
+ free(todel->ln);
+ free(todel);
+ }
+ pthread_mutex_unlock(&pData->mutWrite);
+dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData);
+ pWrkrData->pRoot = NULL;
+}
+
+
+BEGINdoAction
+CODESTARTdoAction
+ pData = pWrkrData->pData;
+ iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname,
+ ppString[0]);
+ if(iRet == RS_RET_OK)
+ iRet = RS_RET_DEFER_COMMIT;
+ENDdoAction
+
BEGINendTransaction
CODESTARTendTransaction
+ pData = pWrkrData->pData;
+ submitCachedLines(pWrkrData, pData);
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
/* if we have an async writer, it controls the flush via
@@ -941,21 +1026,6 @@ finalize_it:
ENDendTransaction
-BEGINdoAction
-CODESTARTdoAction
- DBGPRINTF("file to log to: %s\n",
- (pData->bDynamicName) ? ppString[1] : pData->f_fname);
- DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]);
- STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
- CHKiRet(writeFile(ppString, iMsgOpts, pData));
- if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
- CHKiRet(strm.Flush(pData->pStrm));
- }
-finalize_it:
- if(iRet == RS_RET_OK)
- iRet = RS_RET_DEFER_COMMIT;
-ENDdoAction
-
static inline void
setInstParamDefaults(instanceData *pData)
@@ -1336,6 +1406,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINdoHUP
CODESTARTdoHUP
+ pthread_mutex_lock(&pData->mutWrite);
if(pData->bDynamicName) {
dynaFileFreeCacheEntries(pData);
} else {
@@ -1343,6 +1414,7 @@ CODESTARTdoHUP
closeFile(pData);
}
}
+ pthread_mutex_unlock(&pData->mutWrite);
ENDdoHUP
@@ -1358,6 +1430,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 6e5cf809..a7ba4d01 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -112,6 +112,10 @@ typedef struct _instanceData {
int errsToReport; /* (remaining) number of errors to report */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* config data */
typedef struct configSettings_s {
uchar *pszTplName; /* name of the default template to use */
@@ -335,6 +339,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -360,6 +369,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("%s", pData->target);
@@ -720,7 +734,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ iRet = doTryResume(pWrkrData->pData);
ENDtryResume
@@ -1251,6 +1265,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/tools/ompipe.c b/tools/ompipe.c
index df8066b1..eab7f694 100644
--- a/tools/ompipe.c
+++ b/tools/ompipe.c
@@ -12,7 +12,7 @@
* NOTE: read comments in module-template.h to understand how this pipe
* works!
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -72,6 +72,10 @@ typedef struct _instanceData {
sbool bHadError; /* did we already have/report an error on this pipe? */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
@@ -279,6 +283,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->pipe);
@@ -287,6 +296,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
ENDtryResume
@@ -390,6 +404,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_doHUP
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
diff --git a/tools/omshell.c b/tools/omshell.c
index ac62fa62..6680faab 100644
--- a/tools/omshell.c
+++ b/tools/omshell.c
@@ -63,11 +63,20 @@ typedef struct _instanceData {
uchar progName[MAXFNAME]; /* program to execute */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -80,6 +89,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("%s", pData->progName);
@@ -139,6 +153,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/tools/omusrmsg.c b/tools/omusrmsg.c
index f4cc4094..cab2a4fe 100644
--- a/tools/omusrmsg.c
+++ b/tools/omusrmsg.c
@@ -8,7 +8,7 @@
* File begun on 2007-07-20 by RGerhards (extracted from syslogd.c, which at the
* time of the fork from sysklogd was under BSD license)
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -87,6 +87,10 @@ typedef struct _instanceData {
uchar *tplName;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
@@ -115,6 +119,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -128,6 +137,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
register int i;
CODESTARTdbgPrintInstInfo
@@ -435,6 +449,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/tools/syslogd.c b/tools/syslogd.c
index aaeb9866..7597b05d 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -560,13 +560,13 @@ finalize_it:
* for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
+msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
assert(pBatch != NULL);
pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
preprocessBatch(pBatch);
- ruleset.ProcessBatch(pBatch);
+ ruleset.ProcessBatch(pBatch, pWti);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10
int i;