summaryrefslogtreecommitdiffstats
path: root/tools
diff options
context:
space:
mode:
Diffstat (limited to 'tools')
-rw-r--r--tools/omdiscard.c17
-rw-r--r--tools/omfile.c111
-rw-r--r--tools/omfwd.c24
-rw-r--r--tools/ompipe.c25
-rw-r--r--tools/omshell.c25
-rw-r--r--tools/omusrmsg.c19
-rw-r--r--tools/syslogd.c4
7 files changed, 193 insertions, 32 deletions
diff --git a/tools/omdiscard.c b/tools/omdiscard.c
index 15c6ea82..a76bcc33 100644
--- a/tools/omdiscard.c
+++ b/tools/omdiscard.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-07-24 by RGerhards
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -49,6 +49,10 @@ typedef struct _instanceData {
EMPTY_STRUCT
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* we do not need a createInstance()!
BEGINcreateInstance
CODESTARTcreateInstance
@@ -56,6 +60,11 @@ ENDcreateInstance
*/
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* do nothing */
@@ -87,6 +96,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(0)
@@ -114,6 +128,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/tools/omfile.c b/tools/omfile.c
index fdcf355a..90efe71a 100644
--- a/tools/omfile.c
+++ b/tools/omfile.c
@@ -133,6 +133,7 @@ typedef struct s_dynaFileCacheEntry dynaFileCacheEntry;
typedef struct _instanceData {
+ pthread_mutex_t mutWrite; /* guard against multiple instances writing to single file */
uchar *f_fname; /* file or template name (display only) */
uchar *tplName; /* name of assigned template */
strm_t *pStrm; /* our output stream */
@@ -181,6 +182,20 @@ typedef struct _instanceData {
STATSCOUNTER_DEF(ctrMax, mutCtrMax);
} instanceData;
+/* to build a linked list for temporary storage of lines while we cannot commit */
+typedef struct linebuf {
+ uchar *filename; /* for dynafiles, make go away */
+ uchar *ln;
+ unsigned iMsgOpts;
+ struct linebuf *pNext;
+} linebuf_t;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ linebuf_t *pRoot;
+ linebuf_t *pLast;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int iDynaFileCacheSize; /* max cache for dynamic files */
@@ -786,7 +801,7 @@ finalize_it:
/* rgerhards 2004-11-11: write to a file output. */
static rsRetVal
-writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
+writeFile(instanceData *pData, linebuf_t *linebuf)
{
DEFiRet;
@@ -796,7 +811,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
* check if it still is ok or a new file needs to be created
*/
if(pData->bDynamicName) {
- CHKiRet(prepareDynFile(pData, ppString[1], iMsgOpts));
+ CHKiRet(prepareDynFile(pData, linebuf->filename, linebuf->iMsgOpts));
} else { /* "regular", non-dynafile */
if(pData->pStrm == NULL) {
CHKiRet(prepareFile(pData, pData->f_fname));
@@ -806,7 +821,7 @@ writeFile(uchar **ppString, unsigned iMsgOpts, instanceData *pData)
}
}
- CHKiRet(doWrite(pData, ppString[0], strlen(CHAR_CONVERT(ppString[0]))));
+ CHKiRet(doWrite(pData, linebuf->ln, ustrlen(linebuf->ln)));
finalize_it:
RETiRet;
@@ -888,9 +903,15 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
pData->pStrm = NULL;
+ pthread_mutex_init(&pData->mutWrite, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->tplName);
@@ -913,9 +934,15 @@ CODESTARTfreeInstance
free(pData->cryprovName);
free(pData->cryprovNameFull);
}
+ pthread_mutex_destroy(&pData->mutWrite);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
ENDtryResume
@@ -926,8 +953,68 @@ CODESTARTbeginTransaction
ENDbeginTransaction
+static rsRetVal
+bufferLine(wrkrInstanceData_t *pWrkrData, uchar *filename, uchar *line)
+{
+ linebuf_t *lb;
+ DEFiRet;
+
+ CHKmalloc(lb = (linebuf_t*) malloc(sizeof(linebuf_t)));
+ CHKmalloc(lb->filename = ustrdup(filename));
+ CHKmalloc(lb->ln = ustrdup(line));
+ lb->pNext = NULL;
+ if(pWrkrData->pRoot == NULL) {
+ pWrkrData->pRoot = pWrkrData->pLast = lb;
+ } else {
+ pWrkrData->pLast->pNext = lb;
+ pWrkrData->pLast = lb;
+ }
+finalize_it:
+ RETiRet;
+}
+
+static void
+submitCachedLines(wrkrInstanceData_t *pWrkrData, instanceData *pData)
+{
+ linebuf_t *curr, *todel;
+
+dbgprintf("omfile: waiting on write lock (pWrkrData %p)\n", pWrkrData);
+ pthread_mutex_lock(&pData->mutWrite);
+dbgprintf("omfile: aquired write lock (pWrkrData %p)\n", pWrkrData);
+
+ for(curr = pWrkrData->pRoot ; curr != NULL ; ) {
+ DBGPRINTF("omfile: file to log to: %s\n", curr->filename);
+ DBGPRINTF("omfile: start of data: '%.128s'\n", curr->ln);
+ STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
+ writeFile(pData, curr);
+
+ todel = curr;
+ curr = curr->pNext;
+ free(todel->filename);
+ free(todel->ln);
+ free(todel);
+ }
+ pthread_mutex_unlock(&pData->mutWrite);
+dbgprintf("omfile: free write lock (pWrkrData %p)\n", pWrkrData);
+ pWrkrData->pRoot = NULL;
+}
+
+
+BEGINdoAction
+ instanceData *pData;
+CODESTARTdoAction
+ pData = pWrkrData->pData;
+ iRet = bufferLine(pWrkrData, (pData->bDynamicName) ? ppString[1] : pData->f_fname,
+ ppString[0]);
+ if(iRet == RS_RET_OK)
+ iRet = RS_RET_DEFER_COMMIT;
+ENDdoAction
+
BEGINendTransaction
+ instanceData *pData;
CODESTARTendTransaction
+ pData = pWrkrData->pData;
+ submitCachedLines(pWrkrData, pData);
/* Note: pStrm may be NULL if there was an error opening the stream */
if(pData->bFlushOnTXEnd && pData->pStrm != NULL) {
/* if we have an async writer, it controls the flush via
@@ -941,21 +1028,6 @@ finalize_it:
ENDendTransaction
-BEGINdoAction
-CODESTARTdoAction
- DBGPRINTF("file to log to: %s\n",
- (pData->bDynamicName) ? ppString[1] : pData->f_fname);
- DBGPRINTF("omfile: start of data: '%.128s'\n", ppString[0]);
- STATSCOUNTER_INC(pData->ctrRequests, pData->mutCtrRequests);
- CHKiRet(writeFile(ppString, iMsgOpts, pData));
- if(!bCoreSupportsBatching && pData->bFlushOnTXEnd) {
- CHKiRet(strm.Flush(pData->pStrm));
- }
-finalize_it:
- if(iRet == RS_RET_OK)
- iRet = RS_RET_DEFER_COMMIT;
-ENDdoAction
-
static inline void
setInstParamDefaults(instanceData *pData)
@@ -1336,6 +1408,7 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
BEGINdoHUP
CODESTARTdoHUP
+ pthread_mutex_lock(&pData->mutWrite);
if(pData->bDynamicName) {
dynaFileFreeCacheEntries(pData);
} else {
@@ -1343,6 +1416,7 @@ CODESTARTdoHUP
closeFile(pData);
}
}
+ pthread_mutex_unlock(&pData->mutWrite);
ENDdoHUP
@@ -1358,6 +1432,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/tools/omfwd.c b/tools/omfwd.c
index 6e5cf809..ed0898c9 100644
--- a/tools/omfwd.c
+++ b/tools/omfwd.c
@@ -112,6 +112,10 @@ typedef struct _instanceData {
int errsToReport; /* (remaining) number of errors to report */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* config data */
typedef struct configSettings_s {
uchar *pszTplName; /* name of the default template to use */
@@ -335,6 +339,12 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ dbgprintf("DDDD: createWrkrInstance: pWrkrData %p\n", pWrkrData);
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -360,6 +370,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("%s", pData->target);
@@ -720,7 +735,8 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ dbgprintf("DDDD: tryResume: pWrkrData %p\n", pWrkrData);
+ iRet = doTryResume(pWrkrData->pData);
ENDtryResume
@@ -737,7 +753,10 @@ BEGINdoAction
# ifdef USE_NETZIP
Bytef *out = NULL; /* for compression */
# endif
+ instanceData *pData;
CODESTARTdoAction
+ dbgprintf("DDDD: doAction: pWrkrData %p\n", pWrkrData);
+ pData = pWrkrData->pData;
CHKiRet(doTryResume(pData));
iMaxLine = glbl.GetMaxLine();
@@ -813,7 +832,9 @@ ENDdoAction
BEGINendTransaction
+ instanceData *pData;
CODESTARTendTransaction
+ pData = pWrkrData->pData;
dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf);
if(pData->offsSndBuf != 0) {
iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH);
@@ -1251,6 +1272,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/tools/ompipe.c b/tools/ompipe.c
index df8066b1..79f3ae84 100644
--- a/tools/ompipe.c
+++ b/tools/ompipe.c
@@ -12,7 +12,7 @@
* NOTE: read comments in module-template.h to understand how this pipe
* works!
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -69,9 +69,14 @@ typedef struct _instanceData {
uchar *pipe; /* pipe or template name (display only) */
uchar *tplName; /* format template to use */
short fd; /* pipe descriptor for (current) pipe */
+ pthread_mutex_t mutWrite; /* guard against multiple instances writing to same pipe */
sbool bHadError; /* did we already have/report an error on this pipe? */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
@@ -276,25 +281,42 @@ CODESTARTcreateInstance
pData->pipe = NULL;
pData->fd = -1;
pData->bHadError = 0;
+ pthread_mutex_init(&pData->mutWrite, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
+ pthread_mutex_destroy(&pData->mutWrite);
free(pData->pipe);
if(pData->fd != -1)
close(pData->fd);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINtryResume
CODESTARTtryResume
ENDtryResume
BEGINdoAction
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
DBGPRINTF(" (%s)\n", pData->pipe);
+ /* this module is single-threaded by nature */
+ pthread_mutex_lock(&pData->mutWrite);
iRet = writePipe(ppString, pData);
+ pthread_mutex_unlock(&pData->mutWrite);
ENDdoAction
@@ -390,6 +412,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_doHUP
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
diff --git a/tools/omshell.c b/tools/omshell.c
index ac62fa62..ad6e979f 100644
--- a/tools/omshell.c
+++ b/tools/omshell.c
@@ -19,7 +19,7 @@
* of the "old" message code without any modifications. However, it
* helps to have things at the right place one we go to the meat of it.
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -63,11 +63,20 @@ typedef struct _instanceData {
uchar progName[MAXFNAME]; /* program to execute */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -80,6 +89,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("%s", pData->progName);
@@ -92,13 +106,9 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
- /* TODO: using pData->progName is not clean from the point of
- * modularization. We'll change that as we go ahead with modularization.
- * rgerhards, 2007-07-20
- */
dbgprintf("\n");
- if(execProg((uchar*) pData->progName, 1, ppString[0]) == 0)
- errmsg.LogError(0, NO_ERRCODE, "Executing program '%s' failed", (char*)pData->progName);
+ if(execProg((uchar*) pWrkrData->pData->progName, 1, ppString[0]) == 0)
+ errmsg.LogError(0, NO_ERRCODE, "Executing program '%s' failed", (char*)pWrkrData->pData->progName);
ENDdoAction
@@ -139,6 +149,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/tools/omusrmsg.c b/tools/omusrmsg.c
index f4cc4094..5d0b088f 100644
--- a/tools/omusrmsg.c
+++ b/tools/omusrmsg.c
@@ -8,7 +8,7 @@
* File begun on 2007-07-20 by RGerhards (extracted from syslogd.c, which at the
* time of the fork from sysklogd was under BSD license)
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -87,6 +87,10 @@ typedef struct _instanceData {
uchar *tplName;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
@@ -115,6 +119,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -128,6 +137,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
register int i;
CODESTARTdbgPrintInstInfo
@@ -276,7 +290,7 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
dbgprintf("\n");
- iRet = wallmsg(ppString[0], pData);
+ iRet = wallmsg(ppString[0], pWrkrData->pData);
ENDdoAction
@@ -435,6 +449,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/tools/syslogd.c b/tools/syslogd.c
index aaeb9866..7597b05d 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -560,13 +560,13 @@ finalize_it:
* for the main queue.
*/
static rsRetVal
-msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShutdownImmediate)
+msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, wti_t *pWti, int *pbShutdownImmediate)
{
DEFiRet;
assert(pBatch != NULL);
pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
preprocessBatch(pBatch);
- ruleset.ProcessBatch(pBatch);
+ ruleset.ProcessBatch(pBatch, pWti);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10
int i;