summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/impstats/impstats.c2
-rw-r--r--plugins/imuxsock/imuxsock.c2
-rw-r--r--plugins/mmanon/mmanon.c16
-rw-r--r--plugins/mmaudit/mmaudit.c16
-rw-r--r--plugins/mmfields/mmfields.c15
-rw-r--r--plugins/mmjsonparse/mmjsonparse.c45
-rw-r--r--plugins/mmnormalize/Makefile.am4
-rw-r--r--plugins/mmnormalize/mmnormalize.c95
-rw-r--r--plugins/mmpstrucdata/mmpstrucdata.c15
-rw-r--r--plugins/mmsequence/mmsequence.c33
-rw-r--r--plugins/mmutf8fix/mmutf8fix.c21
-rw-r--r--plugins/omelasticsearch/README4
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c228
-rw-r--r--plugins/omhiredis/omhiredis.c120
-rw-r--r--plugins/omjournal/omjournal.c15
-rw-r--r--plugins/omlibdbi/omlibdbi.c29
-rw-r--r--plugins/ommail/ommail.c191
-rw-r--r--plugins/ommongodb/ommongodb.c25
-rw-r--r--plugins/ommysql/ommysql.c136
-rw-r--r--plugins/ompgsql/ompgsql.c35
-rw-r--r--plugins/omprog/omprog.c48
-rw-r--r--plugins/omrelp/omrelp.c135
-rw-r--r--plugins/omruleset/omruleset.c21
-rw-r--r--plugins/omsnmp/omsnmp.c81
-rw-r--r--plugins/omstdout/omstdout.c21
-rw-r--r--plugins/omtesting/omtesting.c24
-rw-r--r--plugins/omudpspoof/omudpspoof.c130
-rw-r--r--plugins/omuxsock/omuxsock.c29
28 files changed, 960 insertions, 576 deletions
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c
index 694c07c4..a883ef1b 100644
--- a/plugins/impstats/impstats.c
+++ b/plugins/impstats/impstats.c
@@ -377,7 +377,6 @@ checkRuleset(modConfData_t *modConf)
DEFiRet;
modConf->pBindRuleset = NULL; /* assume default ruleset */
-dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset);
if(modConf->pszBindRuleset == NULL)
FINALIZE;
@@ -390,7 +389,6 @@ dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset);
CHKiRet(localRet);
modConf->pBindRuleset = pRuleset;
finalize_it:
-dbgprintf("DDDD: impstats ruleset ptr %p\n", modConf->pBindRuleset);
RETiRet;
}
diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c
index 4bce0c1c..a53ce159 100644
--- a/plugins/imuxsock/imuxsock.c
+++ b/plugins/imuxsock/imuxsock.c
@@ -398,7 +398,7 @@ addListner(instanceConf_t *inst)
listeners[nfd].flags = inst->bIgnoreTimestamp ? IGNDATE : NOFLAG;
listeners[nfd].bCreatePath = inst->bCreatePath;
listeners[nfd].sockName = ustrdup(inst->sockName);
- listeners[nfd].bUseCreds = (inst->bDiscardOwnMsgs || inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0;
+ listeners[nfd].bUseCreds = (inst->bDiscardOwnMsgs || inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate || inst->bUseSysTimeStamp) ? 1 : 0;
listeners[nfd].bAnnotate = inst->bAnnotate;
listeners[nfd].bParseTrusted = inst->bParseTrusted;
listeners[nfd].bDiscardOwnMsgs = inst->bDiscardOwnMsgs;
diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c
index 16a4f34b..28797807 100644
--- a/plugins/mmanon/mmanon.c
+++ b/plugins/mmanon/mmanon.c
@@ -71,6 +71,10 @@ typedef struct _instanceData {
} ipv4;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -119,6 +123,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -130,6 +138,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -358,7 +371,7 @@ CODESTARTdoAction
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
for(i = 0 ; i < lenMsg ; ++i) {
- anonip(pData, msg, &lenMsg, &i);
+ anonip(pWrkrData->pData, msg, &lenMsg, &i);
}
if(lenMsg != getMSGLen(pMsg))
setMSGLen(pMsg, lenMsg);
@@ -387,6 +400,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmaudit/mmaudit.c b/plugins/mmaudit/mmaudit.c
index c7cff2cb..75f8dd4b 100644
--- a/plugins/mmaudit/mmaudit.c
+++ b/plugins/mmaudit/mmaudit.c
@@ -14,7 +14,7 @@
*
* File begun on 2012-02-23 by RGerhards
*
- * Copyright 2012 Adiscon GmbH.
+ * Copyright 2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -69,6 +69,11 @@ typedef struct _instanceData {
int dummy; /* remove when the first real parameter is needed */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
resetConfigVariables(NULL, NULL);
@@ -79,6 +84,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -89,6 +98,10 @@ BEGINfreeInstance
CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -302,6 +315,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c
index fa7fa100..c408a6c9 100644
--- a/plugins/mmfields/mmfields.c
+++ b/plugins/mmfields/mmfields.c
@@ -56,6 +56,10 @@ typedef struct _instanceData {
uchar *jsonRoot; /**< container where to store fields */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -103,6 +107,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -114,6 +122,10 @@ CODESTARTfreeInstance
free(pData->jsonRoot);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -232,7 +244,7 @@ CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
- CHKiRet(parse_fields(pData, pMsg, msg, lenMsg));
+ CHKiRet(parse_fields(pWrkrData->pData, pMsg, msg, lenMsg));
finalize_it:
ENDdoAction
@@ -259,6 +271,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c
index b16aef0e..9c0ab88f 100644
--- a/plugins/mmjsonparse/mmjsonparse.c
+++ b/plugins/mmjsonparse/mmjsonparse.c
@@ -58,9 +58,15 @@ DEFobjCurrIf(errmsg);
DEF_OMOD_STATIC_DATA
typedef struct _instanceData {
- struct json_tokener *tokener;
+ int dummy; /* not needed, but some compilers do not support empty structs */
+ /* REMOVE dummy when real data items are to be added! */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ struct json_tokener *tokener;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -94,14 +100,18 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
- pData->tokener = json_tokener_new();
- if(pData->tokener == NULL) {
+ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->tokener = json_tokener_new();
+ if(pWrkrData->tokener == NULL) {
errmsg.LogError(0, RS_RET_ERR, "error: could not create json "
- "tokener, cannot activate action");
+ "tokener, cannot activate instance");
ABORT_FINALIZE(RS_RET_ERR);
}
finalize_it:
-ENDcreateInstance
+ENDcreateWrkrInstance
BEGINisCompatibleWithFeature
@@ -111,10 +121,14 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
- if(pData->tokener != NULL)
- json_tokener_free(pData->tokener);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->tokener != NULL)
+ json_tokener_free(pWrkrData->tokener);
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -128,28 +142,28 @@ ENDtryResume
static rsRetVal
-processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
+processJSON(wrkrInstanceData_t *pWrkrData, msg_t *pMsg, char *buf, size_t lenBuf)
{
struct json_object *json;
const char *errMsg;
DEFiRet;
- assert(pData->tokener != NULL);
+ assert(pWrkrData->tokener != NULL);
DBGPRINTF("mmjsonparse: toParse: '%s'\n", buf);
- json_tokener_reset(pData->tokener);
+ json_tokener_reset(pWrkrData->tokener);
- json = json_tokener_parse_ex(pData->tokener, buf, lenBuf);
+ json = json_tokener_parse_ex(pWrkrData->tokener, buf, lenBuf);
if(Debug) {
errMsg = NULL;
if(json == NULL) {
enum json_tokener_error err;
- err = pData->tokener->err;
+ err = pWrkrData->tokener->err;
if(err != json_tokener_continue)
errMsg = json_tokener_errors[err];
else
errMsg = "Unterminated input";
- } else if((size_t)pData->tokener->char_offset < lenBuf)
+ } else if((size_t)pWrkrData->tokener->char_offset < lenBuf)
errMsg = "Extra characters after JSON object";
else if(!json_object_is_type(json, json_type_object))
errMsg = "JSON value is not an object";
@@ -159,7 +173,7 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
}
}
if(json == NULL
- || ((size_t)pData->tokener->char_offset < lenBuf)
+ || ((size_t)pWrkrData->tokener->char_offset < lenBuf)
|| (!json_object_is_type(json, json_type_object))) {
ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
}
@@ -194,7 +208,7 @@ CODESTARTdoAction
ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
}
buf += LEN_COOKIE;
- CHKiRet(processJSON(pData, pMsg, (char*) buf, strlen((char*)buf)));
+ CHKiRet(processJSON(pWrkrData, pMsg, (char*) buf, strlen((char*)buf)));
bSuccess = 1;
finalize_it:
if(iRet == RS_RET_NO_CEE_MSG) {
@@ -257,6 +271,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmnormalize/Makefile.am b/plugins/mmnormalize/Makefile.am
index 0a3b5ba5..6a50264d 100644
--- a/plugins/mmnormalize/Makefile.am
+++ b/plugins/mmnormalize/Makefile.am
@@ -1,8 +1,8 @@
pkglib_LTLIBRARIES = mmnormalize.la
mmnormalize_la_SOURCES = mmnormalize.c
-mmnormalize_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS)
-mmnormalize_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS)
+mmnormalize_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS)
+mmnormalize_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS)
mmnormalize_la_LIBADD =
EXTRA_DIST =
diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c
index 7e25824a..ba2e730d 100644
--- a/plugins/mmnormalize/mmnormalize.c
+++ b/plugins/mmnormalize/mmnormalize.c
@@ -1,15 +1,12 @@
/* mmnormalize.c
* This is a message modification module. It normalizes the input message with
- * the help of liblognorm. The messages EE event structure is updated.
+ * the help of liblognorm. The message's JSON variables are updated.
*
* NOTE: read comments in module-template.h for details on the calling interface!
*
- * TODO: check if we can replace libee via JSON system - currently that part
- * is pretty inefficient... rgerhards, 2012-08-27
- *
* File begun on 2010-01-01 by RGerhards
*
- * Copyright 2010-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2010-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -39,7 +36,6 @@
#include <errno.h>
#include <unistd.h>
#include <libestr.h>
-#include <libee/libee.h>
#include <json.h>
#include <liblognorm.h>
#include "conf.h"
@@ -67,9 +63,13 @@ typedef struct _instanceData {
sbool bUseRawMsg; /**< use %rawmsg% instead of %msg% */
uchar *rulebase; /**< name of rulebase to use */
ln_ctx ctxln; /**< context to be used for liblognorm */
- ee_ctx ctxee; /**< context to be used for libee */
+ char *pszPath; /**< path of normalized data */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *rulebase; /**< name of normalization rulebase to use */
int bUseRawMsg; /**< use %rawmsg% instead of %msg% */
@@ -80,6 +80,7 @@ static configSettings_t cs;
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "rulebase", eCmdHdlrGetWord, 1 },
+ { "path", eCmdHdlrGetWord, 0 },
{ "userawmsg", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk actpblk =
@@ -96,30 +97,21 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
-/* to be called to build the libee part of the instance ONCE ALL PARAMETERS ARE CORRECT
+/* to be called to build the liblognorm part of the instance ONCE ALL PARAMETERS ARE CORRECT
* (and set within pData!).
*/
static rsRetVal
buildInstance(instanceData *pData)
{
DEFiRet;
- if((pData->ctxee = ee_initCtx()) == NULL) {
- errmsg.LogError(0, RS_RET_ERR_LIBEE_INIT, "error: could not initialize libee "
- "ctx, cannot activate action");
- ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT);
- }
-
if((pData->ctxln = ln_initCtx()) == NULL) {
errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM_INIT, "error: could not initialize "
"liblognorm ctx, cannot activate action");
- ee_exitCtx(pData->ctxee);
ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_INIT);
}
- ln_setEECtx(pData->ctxln, pData->ctxee);
if(ln_loadSamples(pData->ctxln, (char*) pData->rulebase) != 0) {
errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
- "could not be loaded cannot activate action", cs.rulebase);
- ee_exitCtx(pData->ctxee);
+ "could not be loaded cannot activate action", pData->rulebase);
ln_exitCtx(pData->ctxln);
ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
}
@@ -139,6 +131,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
@@ -176,11 +173,16 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->rulebase);
- ee_exitCtx(pData->ctxee);
ln_exitCtx(pData->ctxln);
+ free(pData->pszPath);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("mmnormalize\n");
@@ -193,50 +195,28 @@ ENDtryResume
BEGINdoAction
msg_t *pMsg;
- es_str_t *str;
uchar *buf;
- char *cstrJSON;
int len;
int r;
- struct ee_event *event = NULL;
- struct json_tokener *tokener;
- struct json_object *json;
+ struct json_object *json = NULL;
CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
- /* note that we can performance-optimize the interface, but this also
- * requires changes to the libraries. For now, we accept message
- * duplication. -- rgerhards, 2010-12-01
- */
- if(pData->bUseRawMsg) {
+ if(pWrkrData->pData->bUseRawMsg) {
getRawMsg(pMsg, &buf, &len);
} else {
buf = getMSG(pMsg);
len = getMSGLen(pMsg);
}
- str = es_newStrFromCStr((char*)buf, len);
- r = ln_normalize(pData->ctxln, str, &event);
+ r = ln_normalize(pWrkrData->pData->ctxln, (char*)buf, len, &json);
if(r != 0) {
DBGPRINTF("error %d during ln_normalize\n", r);
MsgSetParseSuccess(pMsg, 0);
} else {
MsgSetParseSuccess(pMsg, 1);
}
- es_deleteStr(str);
-
- /* reformat to our json data struct */
- /* TODO: this is all extremly ineffcient! */
- ee_fmtEventToJSON(event, &str);
- cstrJSON = es_str2cstr(str, NULL);
- ee_deleteEvent(event);
- dbgprintf("mmnormalize generated: %s\n", cstrJSON);
-
- tokener = json_tokener_new();
- json = json_tokener_parse_ex(tokener, cstrJSON, strlen((char*)cstrJSON));
- json_tokener_free(tokener);
- msgAddJSON(pMsg, (uchar*)"!", json);
-
- free(cstrJSON);
- es_deleteStr(str);
+
+ msgAddJSON(pMsg, (uchar*)pWrkrData->pData->pszPath + 1, json);
+
ENDdoAction
@@ -245,12 +225,14 @@ setInstParamDefaults(instanceData *pData)
{
pData->rulebase = NULL;
pData->bUseRawMsg = 0;
+ pData->pszPath = strdup("$!");
}
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
int bDestructPValsOnExit;
+ char *cstr;
CODESTARTnewActInst
DBGPRINTF("newActInst (mmnormalize)\n");
@@ -278,6 +260,23 @@ CODESTARTnewActInst
pData->rulebase = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "userawmsg")) {
pData->bUseRawMsg = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "path")) {
+ cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if (strlen(cstr) < 2) {
+ errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
+ "mmnormalize: valid path name should be at least "
+ "2 symbols long, got %s", cstr);
+ free(cstr);
+ } else if (cstr[0] != '$') {
+ errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
+ "mmnormalize: valid path name should start with $,"
+ "got %s", cstr);
+ free(cstr);
+ } else {
+ free(pData->pszPath);
+ pData->pszPath = cstr;
+ }
+ continue;
} else {
DBGPRINTF("mmnormalize: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -313,6 +312,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
pData->rulebase = cs.rulebase;
pData->bUseRawMsg = cs.bUseRawMsg;
+ pData->pszPath = strdup("$!"); /* old interface does not support this feature */
/* all config vars auto-reset! */
cs.bUseRawMsg = 0;
cs.rulebase = NULL; /* we used it up! */
@@ -338,6 +338,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c
index 123363bc..680ba92b 100644
--- a/plugins/mmpstrucdata/mmpstrucdata.c
+++ b/plugins/mmpstrucdata/mmpstrucdata.c
@@ -53,6 +53,10 @@ typedef struct _instanceData {
uchar *jsonRoot; /**< container where to store fields */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -99,6 +103,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -110,6 +118,10 @@ CODESTARTfreeInstance
free(pData->jsonRoot);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -359,7 +371,7 @@ dbgprintf("DDDD: parse mmpstrucdata\n");
/* don't check return code - we never want rsyslog to retry
* or suspend this action!
*/
- parse_sd(pData, pMsg);
+ parse_sd(pWrkrData->pData, pMsg);
dbgprintf("DDDD: done parse mmpstrucdata\n");
finalize_it:
ENDdoAction
@@ -387,6 +399,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmsequence/mmsequence.c b/plugins/mmsequence/mmsequence.c
index 20a85370..d1ea85b6 100644
--- a/plugins/mmsequence/mmsequence.c
+++ b/plugins/mmsequence/mmsequence.c
@@ -74,6 +74,10 @@ typedef struct _instanceData {
char *pszVar;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -100,6 +104,8 @@ static struct cnfparamblk actpblk =
/* table for key-counter pairs */
static struct hashtable *ght;
static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static pthread_mutex_t inst_mutex = PTHREAD_MUTEX_INITIALIZER;
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
@@ -129,6 +135,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -139,6 +149,10 @@ BEGINfreeInstance
CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -243,7 +257,7 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_ERR);
}
}
- pthread_mutex_unlock(&ght_mutex);
+ pthread_mutex_unlock(&ght_mutex);
break;
default:
errmsg.LogError(0, RS_RET_INVLD_MODE,
@@ -303,7 +317,9 @@ BEGINdoAction
struct json_object *json;
int val = 0;
int *pCounter;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
pMsg = (msg_t*) ppString[0];
switch(pData->mode) {
@@ -312,12 +328,18 @@ CODESTARTdoAction
(pData->valueTo - pData->valueFrom));
break;
case mmSequencePerInstance:
- if (pData->value >= pData->valueTo - pData->step) {
- pData->value = pData->valueFrom;
+ if (!pthread_mutex_lock(&inst_mutex)) {
+ if (pData->value >= pData->valueTo - pData->step) {
+ pData->value = pData->valueFrom;
+ } else {
+ pData->value += pData->step;
+ }
+ val = pData->value;
+ pthread_mutex_unlock(&inst_mutex);
} else {
- pData->value += pData->step;
+ errmsg.LogError(0, RS_RET_ERR,
+ "mmsequence: mutex lock has failed!");
}
- val = pData->value;
break;
case mmSequencePerKey:
if (!pthread_mutex_lock(&ght_mutex)) {
@@ -381,6 +403,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmutf8fix/mmutf8fix.c b/plugins/mmutf8fix/mmutf8fix.c
index e2077950..351bb129 100644
--- a/plugins/mmutf8fix/mmutf8fix.c
+++ b/plugins/mmutf8fix/mmutf8fix.c
@@ -60,6 +60,10 @@ typedef struct _instanceData {
uint8_t mode; /* operations mode */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -108,6 +112,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
@@ -118,6 +127,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -274,10 +288,10 @@ CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
- if(pData->mode == MODE_CC) {
- doCC(pData, msg, lenMsg);
+ if(pWrkrData->pData->mode == MODE_CC) {
+ doCC(pWrkrData->pData, msg, lenMsg);
} else {
- doUTF8(pData, msg, lenMsg);
+ doUTF8(pWrkrData->pData, msg, lenMsg);
}
ENDdoAction
@@ -304,6 +318,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README
index 9021bc0e..b8bf4151 100644
--- a/plugins/omelasticsearch/README
+++ b/plugins/omelasticsearch/README
@@ -1,3 +1,7 @@
+How to access ElasticSearch on local machine (for testing):
+===========================================================
+see: https://github.com/mobz/elasticsearch-head
+
How to produce an error:
========================
It's quite easy to get 400, if you put a wrong mapping to your
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index eb82c35f..b878050d 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
typedef struct curl_slist HEADER;
typedef struct _instanceData {
int port;
- int replyLen;
int fdErrFile; /* error file fd or -1 if not open */
+ pthread_mutex_t mutErrFile;
uchar *server;
uchar *uid;
uchar *pwd;
@@ -80,24 +80,29 @@ typedef struct _instanceData {
uchar *tplName;
uchar *timeout;
uchar *bulkId;
- uchar *restURL; /* last used URL for error reporting */
uchar *errorFile;
- char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
sbool dynBulkId;
sbool bulkmode;
sbool asyncRepl;
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ int replyLen;
+ char *reply;
+ CURL *curlHandle; /* libcurl session handle */
+ HEADER *postHeader; /* json POST request info */
+ uchar *restURL; /* last used URL for error reporting */
struct {
es_str_t *data;
int nmemb; /* number of messages in batch (for statistics counting) */
uchar *currTpl1;
uchar *currTpl2;
} batch;
- CURL *curlHandle; /* libcurl session handle */
- HEADER *postHeader; /* json POST request info */
-} instanceData;
+} wrkrInstanceData_t;
/* tables for interfacing with the v6 config system */
@@ -117,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "asyncrepl", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
- { "template", eCmdHdlrGetWord, 1 },
+ { "template", eCmdHdlrGetWord, 0 },
{ "dynbulkid", eCmdHdlrBinary, 0 },
{ "bulkid", eCmdHdlrGetWord, 0 },
};
@@ -127,12 +132,32 @@ static struct cnfparamblk actpblk =
actpdescr
};
+static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData);
+
BEGINcreateInstance
CODESTARTcreateInstance
- pData->restURL = NULL;
pData->fdErrFile = -1;
+ pthread_mutex_init(&pData->mutErrFile, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+dbgprintf("omelasticsearch: createWrkrInstance\n");
+ pWrkrData->restURL = NULL;
+ if(pData->bulkmode) {
+ pWrkrData->batch.currTpl1 = NULL;
+ pWrkrData->batch.currTpl2 = NULL;
+ if((pWrkrData->batch.data = es_newStr(1024)) == NULL) {
+ DBGPRINTF("omelasticsearch: error creating batch string "
+ "turned off bulk mode\n");
+ pData->bulkmode = 0; /* at least it works */
+ }
+ }
+ CHKiRet(curlSetup(pWrkrData, pWrkrData->pData));
+finalize_it:
+dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData);
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -141,16 +166,9 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
- if (pData->postHeader) {
- curl_slist_free_all(pData->postHeader);
- pData->postHeader = NULL;
- }
- if (pData->curlHandle) {
- curl_easy_cleanup(pData->curlHandle);
- pData->curlHandle = NULL;
- }
if(pData->fdErrFile != -1)
close(pData->fdErrFile);
+ pthread_mutex_destroy(&pData->mutErrFile);
free(pData->server);
free(pData->uid);
free(pData->pwd);
@@ -159,11 +177,23 @@ CODESTARTfreeInstance
free(pData->parent);
free(pData->tplName);
free(pData->timeout);
- free(pData->restURL);
free(pData->errorFile);
free(pData->bulkId);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->postHeader) {
+ curl_slist_free_all(pWrkrData->postHeader);
+ pWrkrData->postHeader = NULL;
+ }
+ if(pWrkrData->curlHandle) {
+ curl_easy_cleanup(pWrkrData->curlHandle);
+ pWrkrData->curlHandle = NULL;
+ }
+ free(pWrkrData->restURL);
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("omelasticsearch\n");
@@ -211,7 +241,7 @@ setBaseURL(instanceData *pData, es_str_t **url)
static inline rsRetVal
-checkConn(instanceData *pData)
+checkConn(wrkrInstanceData_t *pWrkrData)
{
es_str_t *url;
CURL *curl = NULL;
@@ -219,7 +249,7 @@ checkConn(instanceData *pData)
char *cstr;
DEFiRet;
- setBaseURL(pData, &url);
+ setBaseURL(pWrkrData->pData, &url);
curl = curl_easy_init();
if(curl == NULL) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n");
@@ -235,16 +265,16 @@ checkConn(instanceData *pData)
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
- pData->reply = NULL;
- pData->replyLen = 0;
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
+ pWrkrData->reply = NULL;
+ pWrkrData->replyLen = 0;
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
- free(pData->reply);
+ free(pWrkrData->reply);
DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
@@ -257,7 +287,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
DBGPRINTF("omelasticsearch: tryResume called\n");
- iRet = checkConn(pData);
+ iRet = checkConn(pWrkrData);
ENDtryResume
@@ -330,7 +360,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
static rsRetVal
-setCurlURL(instanceData *pData, uchar **tpls)
+setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls)
{
char authBuf[1024];
uchar *searchIndex;
@@ -368,11 +398,11 @@ setCurlURL(instanceData *pData, uchar **tpls)
if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
- free(pData->restURL);
- pData->restURL = (uchar*)es_str2cstr(url, NULL);
- curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
+ free(pWrkrData->restURL);
+ pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL);
+ curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL);
es_deleteStr(url);
- DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
+ DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL);
if(pData->uid != NULL) {
rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@@ -383,8 +413,8 @@ setCurlURL(instanceData *pData, uchar **tpls)
rLocal);
ABORT_FINALIZE(RS_RET_ERR);
}
- curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
- curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
+ curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf);
+ curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
}
finalize_it:
RETiRet;
@@ -396,7 +426,7 @@ finalize_it:
* index changes.
*/
static rsRetVal
-buildBatch(instanceData *pData, uchar *message, uchar **tpls)
+buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
{
int length = strlen((char *)message);
int r;
@@ -411,29 +441,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
# define META_ID "\", \"_id\":\""
# define META_END "\"}}\n"
- getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
- r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
+ getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
+ r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
- if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType,
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
ustrlen(searchType));
if(parent != NULL) {
- if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
}
if(bulkId != NULL) {
- if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId));
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
}
- if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
- if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
- if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
+ if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
if(r != 0) {
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
}
- ++pData->batch.nmemb;
+ ++pWrkrData->batch.nmemb;
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
@@ -446,7 +476,7 @@ finalize_it:
* needs to be closed, HUP must be sent.
*/
static inline rsRetVal
-writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
+writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
{
char *rendered = NULL;
cJSON *errRoot;
@@ -454,6 +484,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
cJSON *replyRoot = *pReplyRoot;
size_t toWrite;
ssize_t wrRet;
+ sbool bMutLocked = 0;
char errStr[1024];
DEFiRet;
@@ -463,6 +494,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
FINALIZE;
}
+ pthread_mutex_lock(&pData->mutErrFile);
+ bMutLocked = 1;
+
if(pData->fdErrFile == -1) {
pData->fdErrFile = open((char*)pData->errorFile,
O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
@@ -474,7 +508,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
}
}
if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
- cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL));
+ cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL));
cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg));
if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
@@ -495,13 +529,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
*pReplyRoot = NULL; /* tell caller not to delete once again! */
finalize_it:
+ if(bMutLocked)
+ pthread_mutex_unlock(&pData->mutErrFile);
free(rendered);
RETiRet;
}
static inline rsRetVal
-checkResultBulkmode(instanceData *pData, cJSON *root)
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root)
{
int i;
int numitems;
@@ -515,7 +551,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root)
if(items == NULL || items->type != cJSON_Array) {
DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
"bulkmode insert does not return array, reply is: %s\n",
- pData->reply);
+ pWrkrData->reply);
ABORT_FINALIZE(RS_RET_DATAFAIL);
}
numitems = cJSON_GetArraySize(items);
@@ -547,20 +583,20 @@ finalize_it:
static inline rsRetVal
-checkResult(instanceData *pData, uchar *reqmsg)
+checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
{
cJSON *root;
cJSON *ok;
DEFiRet;
- root = cJSON_Parse(pData->reply);
+ root = cJSON_Parse(pWrkrData->reply);
if(root == NULL) {
DBGPRINTF("omelasticsearch: could not parse JSON result \n");
ABORT_FINALIZE(RS_RET_ERR);
}
- if(pData->bulkmode) {
- iRet = checkResultBulkmode(pData, root);
+ if(pWrkrData->pData->bulkmode) {
+ iRet = checkResultBulkmode(pWrkrData, root);
} else {
ok = cJSON_GetObjectItem(root, "ok");
if(ok == NULL || ok->type != cJSON_True) {
@@ -572,7 +608,7 @@ checkResult(instanceData *pData, uchar *reqmsg)
* these in any case.
*/
if(iRet == RS_RET_DATAFAIL) {
- writeDataError(pData, &root, reqmsg);
+ writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg);
iRet = RS_RET_OK; /* we have handled the problem! */
}
@@ -587,19 +623,19 @@ finalize_it:
static rsRetVal
-curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs)
+curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs)
{
CURLcode code;
- CURL *curl = pData->curlHandle;
+ CURL *curl = pWrkrData->curlHandle;
DEFiRet;
- pData->reply = NULL;
- pData->replyLen = 0;
+ pWrkrData->reply = NULL;
+ pWrkrData->replyLen = 0;
- if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent)
- CHKiRet(setCurlURL(pData, tpls));
+ if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent)
+ CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls));
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
@@ -618,27 +654,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg
break;
}
- DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen);
- if (pData->replyLen > 0) {
- pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
+ DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen);
+ if(pWrkrData->replyLen > 0) {
+ pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
}
- DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply);
+ DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply);
- CHKiRet(checkResult(pData, message));
+ CHKiRet(checkResult(pWrkrData, message));
finalize_it:
- free(pData->reply);
+ free(pWrkrData->reply);
RETiRet;
}
BEGINbeginTransaction
CODESTARTbeginTransaction
-dbgprintf("omelasticsearch: beginTransaction\n");
- if(!pData->bulkmode) {
+dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData);
+ if(!pWrkrData->pData->bulkmode) {
FINALIZE;
}
- es_emptyStr(pData->batch.data);
- pData->batch.nmemb = 0;
+ es_emptyStr(pWrkrData->batch.data);
+ pWrkrData->batch.nmemb = 0;
finalize_it:
ENDbeginTransaction
@@ -646,14 +682,14 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
- if(pData->bulkmode) {
- CHKiRet(buildBatch(pData, ppString[0], ppString));
+ if(pWrkrData->pData->bulkmode) {
+ CHKiRet(buildBatch(pWrkrData, ppString[0], ppString));
} else {
- CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
+ CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]),
ppString, 1));
}
finalize_it:
-dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
+dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode);
ENDdoAction
@@ -662,13 +698,13 @@ BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omelasticsearch: endTransaction init\n");
/* End Transaction only if batch data is not empty */
- if (pData->batch.data != NULL ) {
- cstr = es_str2cstr(pData->batch.data, NULL);
+ if (pWrkrData->batch.data != NULL ) {
+ cstr = es_str2cstr(pWrkrData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb));
+ CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb));
}
else
- dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n");
+ dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n");
finalize_it:
free(cstr);
dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
@@ -679,24 +715,24 @@ size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
char *p = (char *)ptr;
- instanceData *pData = (instanceData*) userdata;
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata;
char *buf;
size_t newlen;
- newlen = pData->replyLen + size*nmemb;
- if((buf = realloc(pData->reply, newlen + 1)) == NULL) {
+ newlen = pWrkrData->replyLen + size*nmemb;
+ if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) {
DBGPRINTF("omelasticsearch: realloc failed in curlResult\n");
return 0; /* abort due to failure */
}
- memcpy(buf+pData->replyLen, p, size*nmemb);
- pData->replyLen = newlen;
- pData->reply = buf;
+ memcpy(buf+pWrkrData->replyLen, p, size*nmemb);
+ pWrkrData->replyLen = newlen;
+ pWrkrData->reply = buf;
return size*nmemb;
}
static rsRetVal
-curlSetup(instanceData *pData)
+curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData)
{
HEADER *header;
CURL *handle;
@@ -712,13 +748,13 @@ curlSetup(instanceData *pData)
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
curl_easy_setopt(handle, CURLOPT_POST, 1);
- pData->curlHandle = handle;
- pData->postHeader = header;
+ pWrkrData->curlHandle = handle;
+ pWrkrData->postHeader = header;
if( pData->bulkmode
|| (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) {
/* in this case, we know no tpls are involved in the request-->NULL OK! */
- setCurlURL(pData, NULL);
+ setCurlURL(pWrkrData, pData, NULL);
}
if(Debug) {
@@ -838,16 +874,6 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
- if(pData->bulkmode) {
- pData->batch.currTpl1 = NULL;
- pData->batch.currTpl2 = NULL;
- if((pData->batch.data = es_newStr(1024)) == NULL) {
- DBGPRINTF("omelasticsearch: error creating batch string "
- "turned off bulk mode\n");
- pData->bulkmode = 0; /* at least it works */
- }
- }
-
iNumTpls = 1;
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
@@ -939,9 +965,6 @@ CODESTARTnewActInst
pData->searchIndex = (uchar*) strdup("system");
if(pData->searchType == NULL)
pData->searchType = (uchar*) strdup("events");
-
- CHKiRet(curlSetup(pData));
-
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@@ -979,6 +1002,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c
index 757d5eb2..28a1b9cd 100644
--- a/plugins/omhiredis/omhiredis.c
+++ b/plugins/omhiredis/omhiredis.c
@@ -52,14 +52,17 @@ DEFobjCurrIf(errmsg)
* this will be accessable
* via pData */
typedef struct _instanceData {
- redisContext *conn; /* redis connection */
uchar *server; /* redis server address */
int port; /* redis port */
uchar *tplName; /* template name */
- redisReply **replies; /* array to hold replies from redis */
- int count; /* count of command sent for current batch */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ redisContext *conn; /* redis connection */
+ redisReply **replies; /* array to hold replies from redis */
+ int count; /* count of command sent for current batch */
+} wrkrInstanceData_t;
static struct cnfparamdescr actpdescr[] = {
{ "server", eCmdHdlrGetWord, 0 },
@@ -76,6 +79,11 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->conn = NULL; /* Connect later */
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -83,11 +91,11 @@ CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
/* called when closing */
-static void closeHiredis(instanceData *pData)
+static void closeHiredis(wrkrInstanceData_t *pWrkrData)
{
- if(pData->conn != NULL) {
- redisFree(pData->conn);
- pData->conn = NULL;
+ if(pWrkrData->conn != NULL) {
+ redisFree(pWrkrData->conn);
+ pWrkrData->conn = NULL;
}
}
@@ -95,10 +103,15 @@ static void closeHiredis(instanceData *pData)
* TODO: free **replies */
BEGINfreeInstance
CODESTARTfreeInstance
- closeHiredis(pData);
- free(pData->server);
+ if (pData->server != NULL) {
+ free(pData->server);
+ }
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeHiredis(pWrkrData);
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -106,17 +119,20 @@ CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo
/* establish our connection to redis */
-static rsRetVal initHiredis(instanceData *pData, int bSilent)
+static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
{
char *server;
DEFiRet;
- server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server;
- DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pData->port);
-
+ server = (pWrkrData->pData->server == NULL) ? "127.0.0.1" :
+ (char*) pWrkrData->pData->server;
+ DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server,
+ pWrkrData->pData->port);
+
struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
- pData->conn = redisConnectWithTimeout(server, pData->port, timeout);
- if (pData->conn->err) {
+ pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port,
+ timeout);
+ if (pWrkrData->conn->err) {
if(!bSilent)
errmsg.LogError(0, RS_RET_SUSPENDED,
"can not initialize redis handle");
@@ -126,29 +142,29 @@ finalize_it:
RETiRet;
}
-rsRetVal writeHiredis(uchar *message, instanceData *pData)
+rsRetVal writeHiredis(uchar *message, wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- /* if we do not have a redis connection, call
- * initHiredis and try to establish one */
- if(pData->conn == NULL)
- CHKiRet(initHiredis(pData, 0));
+ /* if we do not have a redis connection, call
+ * initHiredis and try to establish one */
+ if(pWrkrData->conn == NULL)
+ CHKiRet(initHiredis(pWrkrData, 0));
- /* try to append the command to the pipeline.
- * REDIS_ERR reply indicates something bad
- * happened, in which case abort. otherwise
- * increase our current pipeline count
- * by 1 and continue. */
+ /* try to append the command to the pipeline.
+ * REDIS_ERR reply indicates something bad
+ * happened, in which case abort. otherwise
+ * increase our current pipeline count
+ * by 1 and continue. */
int rc;
- rc = redisAppendCommand(pData->conn, (char*)message);
+ rc = redisAppendCommand(pWrkrData->conn, (char*)message);
if (rc == REDIS_ERR) {
- errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr);
- dbgprintf("omhiredis: %s\n", pData->conn->errstr);
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
+ dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
- pData->count++;
- }
+ pWrkrData->count++;
+ }
finalize_it:
RETiRet;
@@ -158,17 +174,18 @@ finalize_it:
* try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL)
- iRet = initHiredis(pData, 0);
+ if(pWrkrData->conn == NULL)
+ iRet = initHiredis(pWrkrData, 0);
ENDtryResume
-/* begin a transaction. for now does nothing.
+/* begin a transaction.
* if I decide to use MULTI ... EXEC in the
- * fture, this block should send the
+ * future, this block should send the
* MULTI command to redis. */
BEGINbeginTransaction
CODESTARTbeginTransaction
- dbgprintf("omhiredis: beginTransaction called\n");
+ dbgprintf("omhiredis: beginTransaction called\n");
+ pWrkrData->count = 0;
ENDbeginTransaction
/* call writeHiredis for this log line,
@@ -176,8 +193,8 @@ ENDbeginTransaction
* current pipeline */
BEGINdoAction
CODESTARTdoAction
- CHKiRet(writeHiredis(ppString[0], pData));
- iRet = RS_RET_DEFER_COMMIT;
+ CHKiRet(writeHiredis(ppString[0], pWrkrData));
+ iRet = RS_RET_DEFER_COMMIT;
finalize_it:
ENDdoAction
@@ -189,16 +206,15 @@ ENDdoAction
* which should be fixed */
BEGINendTransaction
CODESTARTendTransaction
- dbgprintf("omhiredis: endTransaction called\n");
- int i;
- pData->replies = malloc ( sizeof ( redisReply* ) * pData->count );
- for ( i = 0; i < pData->count; i++ ) {
- redisGetReply ( pData->conn, (void *)&pData->replies[i] );
- /* TODO: add error checking here! */
- freeReplyObject ( pData->replies[i] );
- }
- free ( pData->replies );
- pData->count = 0;
+ dbgprintf("omhiredis: endTransaction called\n");
+ int i;
+ pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count );
+ for ( i = 0; i < pWrkrData->count; i++ ) {
+ redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] );
+ /* TODO: add error checking here! */
+ freeReplyObject ( pWrkrData->replies[i] );
+ }
+ free ( pWrkrData->replies );
ENDendTransaction
/* set defaults. note server is set to NULL
@@ -211,7 +227,6 @@ setInstParamDefaults(instanceData *pData)
pData->server = NULL;
pData->port = 6379;
pData->tplName = NULL;
- pData->count = 0;
}
/* here is where the work to set up a new instance
@@ -281,6 +296,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */
ENDqueryEtryPt
@@ -292,9 +308,9 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
- if (!bCoreSupportsBatching) {
- errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
- ABORT_FINALIZE(RS_RET_ERR);
- }
+ if (!bCoreSupportsBatching) {
+ errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION);
ENDmodInit
diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c
index 160c369d..82fd7bfb 100644
--- a/plugins/omjournal/omjournal.c
+++ b/plugins/omjournal/omjournal.c
@@ -56,6 +56,10 @@ DEF_OMOD_STATIC_DATA
typedef struct _instanceData {
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -91,6 +95,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
@@ -101,6 +110,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINnewActInst
CODESTARTnewActInst
/* Note: we currently do not have any parameters, so we do not need
@@ -172,6 +186,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c
index 3beba4f0..c203b4aa 100644
--- a/plugins/omlibdbi/omlibdbi.c
+++ b/plugins/omlibdbi/omlibdbi.c
@@ -50,6 +50,9 @@
#include "errmsg.h"
#include "conf.h"
+#undef HAVE_DBI_TXSUPP
+#warning transaction support disabled in v8 -- TODO: reenable
+
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omlibdbi")
@@ -73,6 +76,10 @@ typedef struct _instanceData {
int txSupport; /* transaction support */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *dbiDrvrDir; /* global: where do the dbi drivers reside? */
uchar *drvrName; /* driver to use */
@@ -94,6 +101,8 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
/* tables for interfacing with the v6 config system */
/* module-global parameters */
@@ -157,6 +166,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -187,6 +200,9 @@ CODESTARTfreeInstance
free(pData->dbName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -326,16 +342,16 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL) {
- iRet = initConn(pData, 1);
+ if(pWrkrData->pData->conn == NULL) {
+ iRet = initConn(pWrkrData->pData, 1);
}
ENDtryResume
/* transaction support 2013-03 */
BEGINbeginTransaction
CODESTARTbeginTransaction
- if(pData->conn == NULL) {
- CHKiRet(initConn(pData, 0));
+ if(pWrkrData->pData->conn == NULL) {
+ CHKiRet(initConn(pWrkrData->pData, 0));
}
# if HAVE_DBI_TXSUPP
if (pData->txSupport == 1) {
@@ -355,13 +371,15 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
- CHKiRet(writeDB(ppString[0], pData));
+ pthread_mutex_lock(&mutDoAct);
+ CHKiRet(writeDB(ppString[0], pWrkrData->pData));
# if HAVE_DBI_TXSUPP
if (pData->txSupport == 1) {
iRet = RS_RET_DEFER_COMMIT;
}
# endif
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
/* transaction support 2013-03 */
@@ -552,6 +570,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/plugins/ommail/ommail.c b/plugins/ommail/ommail.c
index 0a781e10..910b371a 100644
--- a/plugins/ommail/ommail.c
+++ b/plugins/ommail/ommail.c
@@ -13,7 +13,7 @@
*
* File begun on 2008-04-04 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -82,13 +82,21 @@ typedef struct _instanceData {
uchar *pszSrvPort;
uchar *pszFrom;
toRcpt_t *lstRcpt;
+ } smtp;
+ } md; /* mode-specific data */
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ union {
+ struct {
char RcvBuf[1024]; /* buffer for receiving server responses */
size_t lenRcvBuf;
size_t iRcvBuf; /* current index into the rcvBuf (buf empty if iRcvBuf == lenRcvBuf) */
int sock; /* socket to this server (most important when we do multiple msgs per mail) */
} smtp;
} md; /* mode-specific data */
-} instanceData;
+} wrkrInstanceData_t;
typedef struct configSettings_s {
toRcpt_t *lstRcpt;
@@ -112,7 +120,7 @@ ENDinitConfVars
/* forward definitions (as few as possible) */
static rsRetVal Send(int sock, char *msg, size_t len);
-static rsRetVal readResponse(instanceData *pData, int *piState, int iExpected);
+static rsRetVal readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected);
/* helpers for handling the recipient lists */
@@ -163,24 +171,22 @@ finalize_it:
* iStatusToCheck < 0 means no checking should happen
*/
static rsRetVal
-WriteRcpts(instanceData *pData, uchar *pszOp, size_t lenOp, int iStatusToCheck)
+WriteRcpts(wrkrInstanceData_t *pWrkrData, uchar *pszOp, size_t lenOp, int iStatusToCheck)
{
toRcpt_t *pRcpt;
int iState;
DEFiRet;
- assert(pData != NULL);
- assert(pszOp != NULL);
assert(lenOp != 0);
- for(pRcpt = pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) {
+ for(pRcpt = pWrkrData->pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) {
dbgprintf("Sending '%s: <%s>'\n", pszOp, pRcpt->pszTo);
- CHKiRet(Send(pData->md.smtp.sock, (char*)pszOp, lenOp));
- CHKiRet(Send(pData->md.smtp.sock, ":<", sizeof(":<") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pszOp, lenOp));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ":<", sizeof(":<") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
if(iStatusToCheck >= 0)
- CHKiRet(readResponse(pData, &iState, iStatusToCheck));
+ CHKiRet(readResponse(pWrkrData, &iState, iStatusToCheck));
}
finalize_it:
@@ -193,6 +199,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -203,17 +214,19 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
if(pData->iMode == 0) {
- if(pData->md.smtp.pszSrv != NULL)
- free(pData->md.smtp.pszSrv);
- if(pData->md.smtp.pszSrvPort != NULL)
- free(pData->md.smtp.pszSrvPort);
- if(pData->md.smtp.pszFrom != NULL)
- free(pData->md.smtp.pszFrom);
+ free(pData->md.smtp.pszSrv);
+ free(pData->md.smtp.pszSrvPort);
+ free(pData->md.smtp.pszFrom);
lstRcptDestruct(pData->md.smtp.lstRcpt);
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("mail"); /* TODO: extend! */
@@ -229,16 +242,16 @@ ENDdbgPrintInstInfo
* rgerhards, 2008-04-04
*/
static rsRetVal
-getRcvChar(instanceData *pData, char *pC)
+getRcvChar(wrkrInstanceData_t *pWrkrData, char *pC)
{
DEFiRet;
ssize_t lenBuf;
- assert(pData != NULL);
- if(pData->md.smtp.iRcvBuf == pData->md.smtp.lenRcvBuf) { /* buffer empty? */
+ if(pWrkrData->md.smtp.iRcvBuf == pWrkrData->md.smtp.lenRcvBuf) { /* buffer empty? */
/* yes, we need to read the next server response */
do {
- lenBuf = recv(pData->md.smtp.sock, pData->md.smtp.RcvBuf, sizeof(pData->md.smtp.RcvBuf), 0);
+ lenBuf = recv(pWrkrData->md.smtp.sock, pWrkrData->md.smtp.RcvBuf,
+ sizeof(pWrkrData->md.smtp.RcvBuf), 0);
if(lenBuf == 0) {
ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
} else if(lenBuf < 0) {
@@ -247,15 +260,15 @@ getRcvChar(instanceData *pData, char *pC)
}
} else {
/* good read */
- pData->md.smtp.iRcvBuf = 0;
- pData->md.smtp.lenRcvBuf = lenBuf;
+ pWrkrData->md.smtp.iRcvBuf = 0;
+ pWrkrData->md.smtp.lenRcvBuf = lenBuf;
}
} while(lenBuf < 1);
}
/* when we reach this point, we have a non-empty buffer */
- *pC = pData->md.smtp.RcvBuf[pData->md.smtp.iRcvBuf++];
+ *pC = pWrkrData->md.smtp.RcvBuf[pWrkrData->md.smtp.iRcvBuf++];
finalize_it:
RETiRet;
@@ -266,14 +279,14 @@ finalize_it:
* rgerhards, 2008-04-08
*/
static rsRetVal
-serverDisconnect(instanceData *pData)
+serverDisconnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
- if(pData->md.smtp.sock != -1) {
- close(pData->md.smtp.sock);
- pData->md.smtp.sock = -1;
+ if(pWrkrData->md.smtp.sock != -1) {
+ close(pWrkrData->md.smtp.sock);
+ pWrkrData->md.smtp.sock = -1;
}
RETiRet;
@@ -284,16 +297,17 @@ serverDisconnect(instanceData *pData)
* rgerhards, 2008-04-04
*/
static rsRetVal
-serverConnect(instanceData *pData)
+serverConnect(wrkrInstanceData_t *pWrkrData)
{
struct addrinfo *res = NULL;
struct addrinfo hints;
char *smtpPort;
char *smtpSrv;
char errStr[1024];
-
+ instanceData *pData;
DEFiRet;
- assert(pData != NULL);
+
+ pData = pWrkrData->pData;
if(pData->md.smtp.pszSrv == NULL)
smtpSrv = "127.0.0.1";
@@ -313,12 +327,12 @@ serverConnect(instanceData *pData)
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
- if((pData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) {
+ if((pWrkrData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) {
dbgprintf("couldn't create send socket, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr)));
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
- if(connect(pData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) {
+ if(connect(pWrkrData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) {
dbgprintf("create tcp connection failed, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr)));
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
@@ -328,9 +342,9 @@ finalize_it:
freeaddrinfo(res);
if(iRet != RS_RET_OK) {
- if(pData->md.smtp.sock != -1) {
- close(pData->md.smtp.sock);
- pData->md.smtp.sock = -1;
+ if(pWrkrData->md.smtp.sock != -1) {
+ close(pWrkrData->md.smtp.sock);
+ pWrkrData->md.smtp.sock = -1;
}
}
@@ -374,7 +388,7 @@ finalize_it:
* The body is special in that we must escape a leading dot inside a line
*/
static rsRetVal
-bodySend(instanceData *pData, char *msg, size_t len)
+bodySend(wrkrInstanceData_t *pWrkrData, char *msg, size_t len)
{
DEFiRet;
char szBuf[2048];
@@ -383,12 +397,12 @@ bodySend(instanceData *pData, char *msg, size_t len)
int bHadCR = 0;
int bInStartOfLine = 1;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(msg != NULL);
for(iSrc = 0 ; iSrc < len ; ++iSrc) {
if(iBuf >= sizeof(szBuf) - 1) { /* one is reserved for our extra dot */
- CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf));
iBuf = 0;
}
szBuf[iBuf++] = msg[iSrc];
@@ -413,7 +427,7 @@ bodySend(instanceData *pData, char *msg, size_t len)
}
if(iBuf > 0) { /* incomplete buffer to send (the *usual* case)? */
- CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf));
}
finalize_it:
@@ -424,17 +438,17 @@ finalize_it:
/* read response line from server
*/
static rsRetVal
-readResponseLn(instanceData *pData, char *pLn, size_t lenLn)
+readResponseLn(wrkrInstanceData_t *pWrkrData, char *pLn, size_t lenLn)
{
DEFiRet;
size_t i = 0;
char c;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(pLn != NULL);
do {
- CHKiRet(getRcvChar(pData, &c));
+ CHKiRet(getRcvChar(pWrkrData, &c));
if(c == '\n')
break;
if(i < (lenLn - 1)) /* if line is too long, we simply discard the rest */
@@ -453,18 +467,18 @@ finalize_it:
* rgerhards, 2008-04-07
*/
static rsRetVal
-readResponse(instanceData *pData, int *piState, int iExpected)
+readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected)
{
DEFiRet;
int bCont;
char buf[128];
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(piState != NULL);
bCont = 1;
do {
- CHKiRet(readResponseLn(pData, buf, sizeof(buf)));
+ CHKiRet(readResponseLn(pWrkrData, buf, sizeof(buf)));
/* note: the code below is not 100% clean as we may have received less than 4 characters.
* However, as we have a fixed size this will not create a vulnerability. An error will
* also most likely be generated, so it is quite acceptable IMHO -- rgerhards, 2008-04-08
@@ -506,64 +520,65 @@ mkSMTPTimestamp(uchar *pszBuf, size_t lenBuf)
* rgerhards, 2008-04-04
*/
static rsRetVal
-sendSMTP(instanceData *pData, uchar *body, uchar *subject)
+sendSMTP(wrkrInstanceData_t *pWrkrData, uchar *body, uchar *subject)
{
DEFiRet;
int iState; /* SMTP state */
+ instanceData *pData;
uchar szDateBuf[64];
- assert(pData != NULL);
+ pData = pWrkrData->pData;
- CHKiRet(serverConnect(pData));
- CHKiRet(readResponse(pData, &iState, 220));
+ CHKiRet(serverConnect(pWrkrData));
+ CHKiRet(readResponse(pWrkrData, &iState, 220));
- CHKiRet(Send(pData->md.smtp.sock, "HELO ", 5));
- CHKiRet(Send(pData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName())));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "HELO ", 5));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName())));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(Send(pData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(WriteRcpts(pData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250));
+ CHKiRet(WriteRcpts(pWrkrData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250));
- CHKiRet(Send(pData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 354));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 354));
/* now come the data part */
/* header */
mkSMTPTimestamp(szDateBuf, sizeof(szDateBuf));
- CHKiRet(Send(pData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf)));
- CHKiRet(Send(pData->md.smtp.sock, "From: <", sizeof("From: <") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "From: <", sizeof("From: <") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
- CHKiRet(WriteRcpts(pData, (uchar*)"To", sizeof("To") - 1, -1));
+ CHKiRet(WriteRcpts(pWrkrData, (uchar*)"To", sizeof("To") - 1, -1));
- CHKiRet(Send(pData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)subject, strlen((char*)subject)));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)subject, strlen((char*)subject)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
- CHKiRet(Send(pData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */
/* body */
if(pData->bEnableBody)
- CHKiRet(bodySend(pData, (char*)body, strlen((char*) body)));
+ CHKiRet(bodySend(pWrkrData, (char*)body, strlen((char*) body)));
/* end of data, back to envelope transaction */
- CHKiRet(Send(pData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(Send(pData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 221));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 221));
/* we are finished, a new connection is created for each request, so let's close it now */
- CHKiRet(serverDisconnect(pData));
+ CHKiRet(serverDisconnect(pWrkrData));
finalize_it:
RETiRet;
@@ -583,8 +598,8 @@ finalize_it:
*/
BEGINtryResume
CODESTARTtryResume
- CHKiRet(serverConnect(pData));
- CHKiRet(serverDisconnect(pData)); /* if we fail, we will never reach this line */
+ CHKiRet(serverConnect(pWrkrData));
+ CHKiRet(serverDisconnect(pWrkrData)); /* if we fail, we will never reach this line */
finalize_it:
if(iRet == RS_RET_IO_ERROR)
iRet = RS_RET_SUSPENDED;
@@ -593,17 +608,14 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
- dbgprintf(" Mail\n");
+ DBGPRINTF(" Mail\n");
- /* forward */
- if(pData->bHaveSubject)
- iRet = sendSMTP(pData, ppString[0], ppString[1]);
- else
- iRet = sendSMTP(pData, ppString[0], (uchar*)"message from rsyslog");
+ iRet = sendSMTP(pWrkrData, ppString[0],
+ (pWrkrData->pData->bHaveSubject) ?
+ ppString[1] : (uchar*)"message from rsyslog");
if(iRet != RS_RET_OK) {
- /* error! */
- dbgprintf("error sending mail, suspending\n");
+ DBGPRINTF("error sending mail, suspending\n");
iRet = RS_RET_SUSPENDED;
}
ENDdoAction
@@ -689,6 +701,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
ENDqueryEtryPt
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
index af1f5a37..09f19768 100644
--- a/plugins/ommongodb/ommongodb.c
+++ b/plugins/ommongodb/ommongodb.c
@@ -4,7 +4,7 @@
* mongodb C interface is crap. Obtain the library here:
* https://github.com/algernon/libmongo-client
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -71,6 +71,10 @@ typedef struct _instanceData {
int bErrMsgPermitted; /* only one errmsg permitted per connection */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -89,10 +93,16 @@ static struct cnfparamblk actpblk =
actpdescr
};
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
/* use this to specify if select features are supported by this
@@ -126,6 +136,10 @@ CODESTARTfreeInstance
free(pData->tplName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -422,14 +436,17 @@ error:
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL) {
- iRet = initMongoDB(pData, 1);
+ if(pWrkrData->pData->conn == NULL) {
+ iRet = initMongoDB(pWrkrData->pData, 1);
}
ENDtryResume
BEGINdoAction
bson *doc = NULL;
+ instanceData *pData;
CODESTARTdoAction
+ pthread_mutex_lock(&mutDoAct);
+ pData = pWrkrData->pData;
/* see if we are ready to proceed */
if(pData->conn == NULL) {
CHKiRet(initMongoDB(pData, 0));
@@ -454,6 +471,7 @@ CODESTARTdoAction
}
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
if(doc != NULL)
bson_free(doc);
ENDdoAction
@@ -560,6 +578,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c
index 49079ab1..c004d1c6 100644
--- a/plugins/ommysql/ommysql.c
+++ b/plugins/ommysql/ommysql.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-07-20 by RGerhards (extracted from syslogd.c)
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -55,18 +55,22 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
typedef struct _instanceData {
- MYSQL *f_hmysql; /* handle to MySQL */
char dbsrv[MAXHOSTNAMELEN+1]; /* IP or hostname of DB server*/
unsigned int dbsrvPort; /* port of MySQL server */
char dbname[_DB_MAXDBLEN+1]; /* DB name */
char dbuid[_DB_MAXUNAMELEN+1]; /* DB user */
char dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */
- unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */
- uchar * f_configfile; /* MySQL Client Configuration File */
- uchar * f_configsection; /* MySQL Client Configuration Section */
- uchar *tplName; /* format template to use */
+ uchar *configfile; /* MySQL Client Configuration File */
+ uchar *configsection; /* MySQL Client Configuration Section */
+ uchar *tplName; /* format template to use */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ MYSQL *hmysql; /* handle to MySQL */
+ unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int iSrvPort; /* database server port */
uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */
@@ -104,6 +108,12 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->hmysql = NULL;
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -115,25 +125,28 @@ ENDisCompatibleWithFeature
* MySQL connection.
* Initially added 2004-10-28
*/
-static void closeMySQL(instanceData *pData)
+static void closeMySQL(wrkrInstanceData_t *pWrkrData)
{
- ASSERT(pData != NULL);
-
- if(pData->f_hmysql != NULL) { /* just to be on the safe side... */
- mysql_close(pData->f_hmysql);
- pData->f_hmysql = NULL;
+ if(pWrkrData->hmysql != NULL) { /* just to be on the safe side... */
+ mysql_close(pWrkrData->hmysql);
+ pWrkrData->hmysql = NULL;
}
}
BEGINfreeInstance
CODESTARTfreeInstance
- free(pData->f_configfile);
- free(pData->f_configsection);
+ free(pData->configfile);
+ free(pData->configsection);
free(pData->tplName);
- closeMySQL(pData);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeMySQL(pWrkrData);
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
/* nothing special here */
@@ -144,25 +157,23 @@ ENDdbgPrintInstInfo
* We check if we have a valid MySQL handle. If not, we simply
* report an error, but can not be specific. RGerhards, 2007-01-30
*/
-static void reportDBError(instanceData *pData, int bSilent)
+static void reportDBError(wrkrInstanceData_t *pWrkrData, int bSilent)
{
char errMsg[512];
unsigned uMySQLErrno;
- ASSERT(pData != NULL);
-
/* output log message */
errno = 0;
- if(pData->f_hmysql == NULL) {
+ if(pWrkrData->hmysql == NULL) {
errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MySQL handle");
} else { /* we can ask mysql for the error description... */
- uMySQLErrno = mysql_errno(pData->f_hmysql);
+ uMySQLErrno = mysql_errno(pWrkrData->hmysql);
snprintf(errMsg, sizeof(errMsg)/sizeof(char), "db error (%d): %s\n", uMySQLErrno,
- mysql_error(pData->f_hmysql));
- if(bSilent || uMySQLErrno == pData->uLastMySQLErrno)
+ mysql_error(pWrkrData->hmysql));
+ if(bSilent || uMySQLErrno == pWrkrData->uLastMySQLErrno)
dbgprintf("mysql, DBError(silent): %s\n", errMsg);
else {
- pData->uLastMySQLErrno = uMySQLErrno;
+ pWrkrData->uLastMySQLErrno = uMySQLErrno;
errmsg.LogError(0, NO_ERRCODE, "%s", errMsg);
}
}
@@ -175,25 +186,26 @@ static void reportDBError(instanceData *pData, int bSilent)
* MySQL connection.
* Initially added 2004-10-28 mmeckelein
*/
-static rsRetVal initMySQL(instanceData *pData, int bSilent)
+static rsRetVal initMySQL(wrkrInstanceData_t *pWrkrData, int bSilent)
{
+ instanceData *pData;
DEFiRet;
- ASSERT(pData != NULL);
- ASSERT(pData->f_hmysql == NULL);
- pData->f_hmysql = mysql_init(NULL);
- if(pData->f_hmysql == NULL) {
+ ASSERT(pWrkrData->hmysql == NULL);
+ pData = pWrkrData->pData;
+ pWrkrData->hmysql = mysql_init(NULL);
+ if(pWrkrData->hmysql == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle");
iRet = RS_RET_SUSPENDED;
} else { /* we could get the handle, now on with work... */
- mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client"));
- if(pData->f_configfile!=NULL){
+ mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->configsection!=NULL)?(char*)pData->configsection:"client"));
+ if(pData->configfile!=NULL){
FILE * fp;
- fp=fopen((char*)pData->f_configfile,"r");
+ fp=fopen((char*)pData->configfile,"r");
int err=errno;
if(fp==NULL){
char msg[512];
- snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile);
+ snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->configfile);
if(bSilent) {
char errStr[512];
rs_strerror_r(err, errStr, sizeof(errStr));
@@ -202,17 +214,17 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent)
errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg);
} else {
fclose(fp);
- mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile);
+ mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_FILE,pData->configfile);
}
}
/* Connect to database */
- if(mysql_real_connect(pData->f_hmysql, pData->dbsrv, pData->dbuid,
+ if(mysql_real_connect(pWrkrData->hmysql, pData->dbsrv, pData->dbuid,
pData->dbpwd, pData->dbname, pData->dbsrvPort, NULL, 0) == NULL) {
- reportDBError(pData, bSilent);
- closeMySQL(pData); /* ignore any error we may get */
+ reportDBError(pWrkrData, bSilent);
+ closeMySQL(pWrkrData); /* ignore any error we may get */
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
- mysql_autocommit(pData->f_hmysql, 0);
+ mysql_autocommit(pWrkrData->hmysql, 0);
}
finalize_it:
@@ -224,35 +236,32 @@ finalize_it:
* to an established MySQL session.
* Initially added 2004-10-28 mmeckelein
*/
-rsRetVal writeMySQL(uchar *psz, instanceData *pData)
+rsRetVal writeMySQL(wrkrInstanceData_t *pWrkrData, uchar *psz)
{
DEFiRet;
- ASSERT(psz != NULL);
- ASSERT(pData != NULL);
-
/* see if we are ready to proceed */
- if(pData->f_hmysql == NULL) {
- CHKiRet(initMySQL(pData, 0));
+ if(pWrkrData->hmysql == NULL) {
+ CHKiRet(initMySQL(pWrkrData, 0));
}
/* try insert */
- if(mysql_query(pData->f_hmysql, (char*)psz)) {
+ if(mysql_query(pWrkrData->hmysql, (char*)psz)) {
/* error occured, try to re-init connection and retry */
- closeMySQL(pData); /* close the current handle */
- CHKiRet(initMySQL(pData, 0)); /* try to re-open */
- if(mysql_query(pData->f_hmysql, (char*)psz)) { /* re-try insert */
+ closeMySQL(pWrkrData); /* close the current handle */
+ CHKiRet(initMySQL(pWrkrData, 0)); /* try to re-open */
+ if(mysql_query(pWrkrData->hmysql, (char*)psz)) { /* re-try insert */
/* we failed, giving up for now */
- reportDBError(pData, 0);
- closeMySQL(pData); /* free ressources */
+ reportDBError(pWrkrData, 0);
+ closeMySQL(pWrkrData); /* free ressources */
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
finalize_it:
if(iRet == RS_RET_OK) {
- pData->uLastMySQLErrno = 0; /* reset error for error supression */
+ pWrkrData->uLastMySQLErrno = 0; /* reset error for error supression */
}
RETiRet;
@@ -261,28 +270,28 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- if(pData->f_hmysql == NULL) {
- iRet = initMySQL(pData, 1);
+ if(pWrkrData->hmysql == NULL) {
+ iRet = initMySQL(pWrkrData, 1);
}
ENDtryResume
BEGINbeginTransaction
CODESTARTbeginTransaction
- CHKiRet(writeMySQL((uchar*)"START TRANSACTION", pData));
+ CHKiRet(writeMySQL(pWrkrData, (uchar*)"START TRANSACTION"));
finalize_it:
ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
dbgprintf("\n");
- CHKiRet(writeMySQL(ppString[0], pData));
+ CHKiRet(writeMySQL(pWrkrData, ppString[0]));
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
- if (mysql_commit(pData->f_hmysql) != 0) {
+ if(mysql_commit(pWrkrData->hmysql) != 0) {
dbgprintf("mysql server error: transaction not committed\n");
iRet = RS_RET_SUSPENDED;
}
@@ -293,10 +302,9 @@ static inline void
setInstParamDefaults(instanceData *pData)
{
pData->dbsrvPort = 0;
- pData->f_configfile = NULL;
- pData->f_configsection = NULL;
+ pData->configfile = NULL;
+ pData->configsection = NULL;
pData->tplName = NULL;
- pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */
}
@@ -338,9 +346,9 @@ CODESTARTnewActInst
strncpy(pData->dbpwd, cstr, sizeof(pData->dbpwd));
free(cstr);
} else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.file")) {
- pData->f_configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ pData->configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.section")) {
- pData->f_configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ pData->configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
@@ -424,9 +432,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
} else {
pData->dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */
- pData->f_configfile = cs.pszMySQLConfigFile;
- pData->f_configsection = cs.pszMySQLConfigSection;
- pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */
+ pData->configfile = cs.pszMySQLConfigFile;
+ pData->configsection = cs.pszMySQLConfigSection;
}
CODE_STD_FINALIZERparseSelectorAct
@@ -446,6 +453,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -484,7 +492,7 @@ CODEmodInit_QueryRegCFSLineHdlr
mysql_server_init(0, NULL, NULL)
# endif
) {
- errmsg.LogError(0, NO_ERRCODE, "ommysql: mysql_server_init() failed, plugin "
+ errmsg.LogError(0, NO_ERRCODE, "ommysql: intializing mysql client failed, plugin "
"can not run");
ABORT_FINALIZE(RS_RET_ERR);
}
diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c
index 11f346f6..87599484 100644
--- a/plugins/ompgsql/ompgsql.c
+++ b/plugins/ompgsql/ompgsql.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-10-18 by sur5r (converted from ommysql.c)
*
- * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2013 Rainer Gerhards and Adiscon GmbH.
*
* The following link my be useful for the not-so-postgres literate
* when setting up a test environment (on Fedora):
@@ -66,11 +66,17 @@ typedef struct _instanceData {
ConnStatusType eLastPgSQLStatus; /* last status from postgres */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
ENDinitConfVars
@@ -82,6 +88,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -108,6 +118,9 @@ CODESTARTfreeInstance
closePgSQL(pData);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -244,8 +257,8 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- if(pData->f_hpgsql == NULL) {
- iRet = initPgSQL(pData, 1);
+ if(pWrkrData->pData->f_hpgsql == NULL) {
+ iRet = initPgSQL(pWrkrData->pData, 1);
if(iRet == RS_RET_OK) {
/* the code above seems not to actually connect to the database. As such, we do a
* dummy statement (a pointless select...) to verify the connection and return
@@ -253,7 +266,7 @@ CODESTARTtryResume
* PostgreSQL expert, so any patch that does the desired result in a more
* intelligent way is highly welcome. -- rgerhards, 2009-12-16
*/
- iRet = writePgSQL((uchar*)"select 'a' as a", pData);
+ iRet = writePgSQL((uchar*)"select 'a' as a", pWrkrData->pData);
}
}
@@ -263,23 +276,25 @@ ENDtryResume
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("ompgsql: beginTransaction\n");
- iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */
+ iRet = writePgSQL((uchar*) "begin", pWrkrData->pData); /* TODO: make user-configurable */
ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
+ pthread_mutex_lock(&mutDoAct);
dbgprintf("\n");
- CHKiRet(writePgSQL(ppString[0], pData));
+ CHKiRet(writePgSQL(ppString[0], pWrkrData->pData));
if(bCoreSupportsBatching)
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
- iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */
+ iRet = writePgSQL((uchar*) "commit;", pWrkrData->pData); /* TODO: make user-configurable */
dbgprintf("ompgsql: endTransaction\n");
ENDendTransaction
@@ -361,6 +376,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -372,6 +388,11 @@ INITLegCnfVars
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+
+# warning: transaction support missing for v8
+ bCoreSupportsBatching= 0;
+ DBGPRINTF("ompgsql: transactions are not yet supported on v8\n");
+
DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION);
DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
ENDmodInit
diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c
index cd07dcfb..1060b5c3 100644
--- a/plugins/omprog/omprog.c
+++ b/plugins/omprog/omprog.c
@@ -6,7 +6,7 @@
*
* File begun on 2009-04-01 by RGerhards
*
- * Copyright 2009-2012 Adiscon GmbH.
+ * Copyright 2009-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -35,6 +35,7 @@
#include <errno.h>
#include <unistd.h>
#include <wait.h>
+#include <pthread.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -60,8 +61,13 @@ typedef struct _instanceData {
int fdPipe; /* file descriptor to write to */
int bIsRunning; /* is binary currently running? 0-no, 1-yes */
int iParams; /* Holds the count of parameters if set*/
+ pthread_mutex_t mut; /* make sure only one instance is active */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *szBinary; /* name of binary to call */
} configSettings_t;
@@ -89,8 +95,13 @@ ENDinitConfVars
BEGINcreateInstance
CODESTARTcreateInstance
+ pthread_mutex_init(&pData->mut, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -102,6 +113,7 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
int i;
CODESTARTfreeInstance
+ pthread_mutex_destroy(&pData->mut);
if(pData->szBinary != NULL)
free(pData->szBinary);
if(pData->aParams != NULL) {
@@ -112,6 +124,10 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -287,22 +303,21 @@ writePipe(instanceData *pData, uchar *szMsg)
lenWrite = strlen((char*)szMsg);
writeOffset = 0;
- do
- {
+ do {
lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite);
if(lenWritten == -1) {
switch(errno) {
- case EPIPE:
- DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
- pData->szBinary);
- CHKiRet(cleanup(pData));
- CHKiRet(tryRestart(pData));
- break;
- default:
- DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
- rs_strerror_r(errno, errStr, sizeof(errStr)));
- ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
- break;
+ case EPIPE:
+ DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
+ pData->szBinary);
+ CHKiRet(cleanup(pData));
+ CHKiRet(tryRestart(pData));
+ break;
+ default:
+ DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
+ rs_strerror_r(errno, errStr, sizeof(errStr)));
+ ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
+ break;
}
} else {
writeOffset += lenWritten;
@@ -316,7 +331,10 @@ finalize_it:
BEGINdoAction
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
+ pthread_mutex_lock(&pData->mut);
if(pData->bIsRunning == 0) {
openPipe(pData);
}
@@ -325,6 +343,7 @@ CODESTARTdoAction
if(iRet != RS_RET_OK)
iRet = RS_RET_SUSPENDED;
+ pthread_mutex_unlock(&pData->mut);
ENDdoAction
@@ -496,6 +515,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 34511e46..3b1584e2 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -2,8 +2,17 @@
*
* This is the implementation of the RELP output module.
*
- * NOTE: read comments in module-template.h to understand how this file
- * works!
+ * Note that when multiple action workers are activated, we currently
+ * also create multiple actions. This may be the source of some mild
+ * message loss (!) if the worker instance is shut down while the
+ * connection to the remote system is in retry state.
+ * TODO: think if we should implement a mode where we do NOT
+ * support multiple action worker instances. This would be
+ * slower, but not have this loss opportunity. But it should
+ * definitely be optional and by default off due to the
+ * performance implications (and given the fact that message
+ * loss is pretty unlikely in usual cases).
+ *
*
* File begun on 2008-03-13 by RGerhards
*
@@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
uchar *target;
uchar *port;
- int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
- int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
int sizeWindow; /**< the RELP window size - 0=use default */
unsigned timeout;
unsigned rebindInterval;
- unsigned nSent;
- relpClt_t *pRelpClt; /* relp client for this instance */
sbool bEnableTLS;
sbool bEnableTLSZip;
sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */
@@ -85,11 +90,20 @@ typedef struct _instanceData {
} permittedPeers;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
+ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned nSent; /* number msgs sent - for rebind support */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData);
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData)
static void
onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
" '%s' - action may not work as intended",
- pData->target, pData->port, errmesg, objinfo);
+ pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo);
}
static void
@@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er
static void
onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
"is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
pData->bHadAuthFail = 1;
}
-static inline rsRetVal
-doCreateRelpClient(instanceData *pData)
+static rsRetVal
+doCreateRelpClient(wrkrInstanceData_t *pWrkrData)
{
int i;
+ instanceData *pData;
DEFiRet;
- if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+
+ pData = pWrkrData->pData;
+ if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+ if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK)
+ if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLS) {
- if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLSZip) {
- if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+ if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
errmsg.LogError(0, RS_RET_RELP_ERR,
"omrelp: invalid auth mode '%s'\n", pData->authmode);
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
- relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+ relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]);
}
}
if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
- if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- pData->bInitialConnect = 1;
- pData->nSent = 0;
+ pWrkrData->bInitialConnect = 1;
+ pWrkrData->nSent = 0;
finalize_it:
RETiRet;
}
@@ -221,11 +238,15 @@ CODESTARTcreateInstance
pData->permittedPeers.nmemb = 0;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->pRelpClt = NULL;
+ iRet = doCreateRelpClient(pWrkrData);
+ENDcreateWrkrInstance
+
BEGINfreeInstance
int i;
CODESTARTfreeInstance
- if(pData->pRelpClt != NULL)
- relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
free(pData->target);
free(pData->port);
free(pData->tplName);
@@ -239,6 +260,12 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->pRelpClt != NULL)
+ relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt);
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -318,8 +345,6 @@ CODESTARTnewActInst
"RSYSLOG_ForwardFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERnewActInst
if(pvals != NULL)
cnfparamvalsDestruct(pvals, &actpblk);
@@ -347,22 +372,23 @@ ENDdbgPrintInstInfo
/* try to connect to server
* rgerhards, 2008-03-21
*/
-static rsRetVal doConnect(instanceData *pData)
+static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->bInitialConnect) {
- iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
+ if(pWrkrData->bInitialConnect) {
+ iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(),
+ pWrkrData->pData->port, pWrkrData->pData->target);
if(iRet == RELP_RET_OK)
- pData->bInitialConnect = 0;
+ pWrkrData->bInitialConnect = 0;
} else {
- iRet = relpCltReconnect(pData->pRelpClt);
+ iRet = relpCltReconnect(pWrkrData->pRelpClt);
}
if(iRet == RELP_RET_OK) {
- pData->bIsConnected = 1;
+ pWrkrData->bIsConnected = 1;
} else {
- pData->bIsConnected = 0;
+ pWrkrData->bIsConnected = 0;
iRet = RS_RET_SUSPENDED;
}
@@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- if(pData->bHadAuthFail) {
+ if(pWrkrData->pData->bHadAuthFail) {
ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
}
- iRet = doConnect(pData);
+ iRet = doConnect(pWrkrData);
finalize_it:
ENDtryResume
static inline rsRetVal
-doRebind(instanceData *pData)
+doRebind(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
- CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
- pData->bIsConnected = 0;
- CHKiRet(doCreateRelpClient(pData));
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt));
+ pWrkrData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pWrkrData));
finalize_it:
RETiRet;
}
@@ -394,10 +420,10 @@ finalize_it:
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omrelp: beginTransaction\n");
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
- relpCltHintBurstBegin(pData->pRelpClt);
+ relpCltHintBurstBegin(pWrkrData->pRelpClt);
finalize_it:
ENDbeginTransaction
@@ -405,11 +431,13 @@ BEGINdoAction
uchar *pMsg; /* temporary buffering */
size_t lenMsg;
relpRetVal ret;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
pMsg = ppString[0];
@@ -420,7 +448,7 @@ CODESTARTdoAction
lenMsg = glbl.GetMaxLine();
/* forward */
- ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg);
+ ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg);
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
@@ -428,8 +456,8 @@ CODESTARTdoAction
}
if(pData->rebindInterval != 0 &&
- (++pData->nSent >= pData->rebindInterval)) {
- doRebind(pData);
+ (++pWrkrData->nSent >= pData->rebindInterval)) {
+ doRebind(pWrkrData);
}
finalize_it:
if(pData->bHadAuthFail)
@@ -448,7 +476,7 @@ ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omrelp: endTransaction\n");
- relpCltHintBurstEnd(pData->pRelpClt);
+ relpCltHintBurstEnd(pWrkrData->pRelpClt);
ENDendTransaction
BEGINparseSelectorAct
@@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -546,6 +572,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES
diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c
index 11765507..73419915 100644
--- a/plugins/omruleset/omruleset.c
+++ b/plugins/omruleset/omruleset.c
@@ -10,7 +10,7 @@
*
* File begun on 2009-11-02 by RGerhards
*
- * Copyright 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -70,6 +70,10 @@ typedef struct _instanceData {
uchar *pszRulesetName; /* primarily for debugging/display purposes */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
ruleset_t *pRuleset; /* ruleset to enqueue message to (NULL = Default, not recommended) */
uchar *pszRulesetName;
@@ -87,11 +91,21 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINfreeInstance
CODESTARTfreeInstance
free(pData->pszRulesetName);
@@ -117,9 +131,9 @@ BEGINdoAction
CODESTARTdoAction
CHKmalloc(pMsg = MsgDup((msg_t*) ppString[0]));
DBGPRINTF(":omruleset: forwarding message %p to ruleset %s[%p]\n", pMsg,
- (char*) pData->pszRulesetName, pData->pRuleset);
+ (char*) pWrkrData->pData->pszRulesetName, pWrkrData->pData->pRuleset);
MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY);
- MsgSetRuleset(pMsg, pData->pRuleset);
+ MsgSetRuleset(pMsg, pWrkrData->pData->pRuleset);
/* Note: we intentionally use submitMsg2() here, as we process messages
* that were already run through the rate-limiter. So it is (at least)
* questionable if they were rate-limited again.
@@ -199,6 +213,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omsnmp/omsnmp.c b/plugins/omsnmp/omsnmp.c
index 42d1de6b..02862c94 100644
--- a/plugins/omsnmp/omsnmp.c
+++ b/plugins/omsnmp/omsnmp.c
@@ -2,7 +2,7 @@
*
* This module sends an snmp trap.
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -74,15 +74,19 @@ typedef struct _instanceData {
* http://www.adiscon.org/download/ADISCON-MONITORWARE-MIB.txt
* http://www.adiscon.org/download/ADISCON-MIB.txt
*/
- int iPort; /* Target Port */
- int iSNMPVersion; /* SNMP Version to use */
- int iTrapType; /* Snmp TrapType or GenericType */
- int iSpecificType; /* Snmp Specific Type */
+ int iPort; /* Target Port */
+ int iSNMPVersion; /* SNMP Version to use */
+ int iTrapType; /* Snmp TrapType or GenericType */
+ int iSpecificType; /* Snmp Specific Type */
- netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */
- uchar *tplName; /* format template to use */
+ uchar *tplName; /* format template to use */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar* pszTransport; /* default transport */
uchar* pszTarget;
@@ -147,6 +151,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->snmpsession = NULL;
+ENDcreateWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -171,14 +179,16 @@ ENDisCompatibleWithFeature
/* Exit SNMP Session
* alorbach, 2008-02-12
*/
-static rsRetVal omsnmp_exitSession(instanceData *pData)
+static rsRetVal
+omsnmp_exitSession(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->snmpsession != NULL) {
- dbgprintf( "omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", pData->szTarget, pData->iPort);
- snmp_close(pData->snmpsession);
- pData->snmpsession = NULL;
+ if(pWrkrData->snmpsession != NULL) {
+ DBGPRINTF("omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n",
+ pWrkrData->pData->szTarget, pWrkrData->pData->iPort);
+ snmp_close(pWrkrData->snmpsession);
+ pWrkrData->snmpsession = NULL;
}
RETiRet;
@@ -187,15 +197,19 @@ static rsRetVal omsnmp_exitSession(instanceData *pData)
/* Init SNMP Session
* alorbach, 2008-02-12
*/
-static rsRetVal omsnmp_initSession(instanceData *pData)
+static rsRetVal
+omsnmp_initSession(wrkrInstanceData_t *pWrkrData)
{
netsnmp_session session;
+ instanceData *pData;
char szTargetAndPort[MAXHOSTNAMELEN+128]; /* work buffer for specifying a full target and port string */
DEFiRet;
/* should not happen, but if session is not cleared yet - we do it now! */
- if (pData->snmpsession != NULL)
- omsnmp_exitSession(pData);
+ if (pWrkrData->snmpsession != NULL)
+ omsnmp_exitSession(pWrkrData);
+
+ pData = pWrkrData->pData;
snprintf((char*)szTargetAndPort, sizeof(szTargetAndPort), "%s:%s:%d",
(pData->szTransport == NULL) ? "udp" : (char*)pData->szTransport,
@@ -217,8 +231,8 @@ static rsRetVal omsnmp_initSession(instanceData *pData)
session.community_len = strlen((char*) session.community);
}
- pData->snmpsession = snmp_open(&session);
- if (pData->snmpsession == NULL) {
+ pWrkrData->snmpsession = snmp_open(&session);
+ if (pWrkrData->snmpsession == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_initSession: snmp_open to host '%s' on Port '%d' failed\n", pData->szTarget, pData->iPort);
/* Stay suspended */
iRet = RS_RET_SUSPENDED;
@@ -227,7 +241,7 @@ static rsRetVal omsnmp_initSession(instanceData *pData)
RETiRet;
}
-static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
+static rsRetVal omsnmp_sendsnmp(wrkrInstanceData_t *pWrkrData, uchar *psz)
{
DEFiRet;
@@ -239,10 +253,12 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
int status;
char *trap = NULL;
const char *strErr = NULL;
+ instanceData *pData;
+ pData = pWrkrData->pData;
/* Init SNMP Session if necessary */
- if (pData->snmpsession == NULL) {
- CHKiRet(omsnmp_initSession(pData));
+ if (pWrkrData->snmpsession == NULL) {
+ CHKiRet(omsnmp_initSession(pWrkrData));
}
/* String should not be NULL */
@@ -250,7 +266,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
dbgprintf( "omsnmp_sendsnmp: ENTER - Syslogmessage = '%s'\n", (char*)psz);
/* If SNMP Version1 is configured !*/
- if(pData->snmpsession->version == SNMP_VERSION_1) {
+ if(pWrkrData->snmpsession->version == SNMP_VERSION_1) {
pdu = snmp_pdu_create(SNMP_MSG_TRAP);
/* Set enterprise */
@@ -275,7 +291,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
pdu->time = get_uptime();
}
/* If SNMP Version2c is configured !*/
- else if (pData->snmpsession->version == SNMP_VERSION_2c)
+ else if (pWrkrData->snmpsession->version == SNMP_VERSION_2c)
{
long sysuptime;
char csysuptime[20];
@@ -320,15 +336,15 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
}
/* Send the TRAP */
- status = snmp_send(pData->snmpsession, pdu) == 0;
+ status = snmp_send(pWrkrData->snmpsession, pdu) == 0;
if (status)
{
/* Debug Output! */
- int iErrorCode = pData->snmpsession->s_snmp_errno;
+ int iErrorCode = pWrkrData->snmpsession->s_snmp_errno;
errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_sendsnmp: snmp_send failed error '%d', Description='%s'\n", iErrorCode*(-1), api_errors[iErrorCode*(-1)]);
/* Clear Session */
- omsnmp_exitSession(pData);
+ omsnmp_exitSession(pWrkrData);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
@@ -347,7 +363,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = omsnmp_initSession(pData);
+ iRet = omsnmp_initSession(pWrkrData);
ENDtryResume
BEGINdoAction
@@ -358,19 +374,20 @@ CODESTARTdoAction
}
/* This will generate and send the SNMP Trap */
- iRet = omsnmp_sendsnmp(pData, ppString[0]);
+ iRet = omsnmp_sendsnmp(pWrkrData, ppString[0]);
finalize_it:
ENDdoAction
BEGINfreeInstance
CODESTARTfreeInstance
- /* free snmp Session here */
- omsnmp_exitSession(pData);
-
free(pData->tplName);
free(pData->szTarget);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ omsnmp_exitSession(pWrkrData);
+ENDfreeWrkrInstance
static inline void
setInstParamDefaults(instanceData *pData)
@@ -499,9 +516,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* Set some defaults in the NetSNMP library */
netsnmp_ds_set_int(NETSNMP_DS_LIBRARY_ID, NETSNMP_DS_LIB_DEFAULT_PORT, pData->iPort );
-
- /* Init Session Pointer */
- pData->snmpsession = NULL;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -545,6 +559,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c
index a84a7593..210b0165 100644
--- a/plugins/omstdout/omstdout.c
+++ b/plugins/omstdout/omstdout.c
@@ -6,7 +6,7 @@
*
* File begun on 2009-03-19 by RGerhards
*
- * Copyright 2009-2012 Adiscon GmbH.
+ * Copyright 2009-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -60,6 +60,10 @@ typedef struct _instanceData {
int bEnsureLFEnding; /* ensure that a linefeed is written at the end of EACH record (test aid for nettester) */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int bUseArrayInterface; /* shall action use array instead of string template interface? */
int bEnsureLFEnding; /* shall action use array instead of string template interface? */
@@ -76,6 +80,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -88,6 +97,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo
@@ -107,7 +121,7 @@ BEGINdoAction
size_t len;
int r;
CODESTARTdoAction
- if(pData->bUseArrayInterface) {
+ if(pWrkrData->pData->bUseArrayInterface) {
/* if we use array passing, we need to put together a string
* ourselves. At this point, please keep in mind that omstdout is
* primarily a testing aid. Other modules may do different processing
@@ -145,7 +159,7 @@ CODESTARTdoAction
DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n",
r, len, toWrite);
}
- if(pData->bEnsureLFEnding && toWrite[len-1] != '\n') {
+ if(pWrkrData->pData->bEnsureLFEnding && toWrite[len-1] != '\n') {
if((r = write(1, "\n", 1)) != 1) { /* write missing LF */
DBGPRINTF("omstdout: error %d writing \\n to stdout\n",
r);
@@ -186,6 +200,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c
index c9f1e06b..2cc1159e 100644
--- a/plugins/omtesting/omtesting.c
+++ b/plugins/omtesting/omtesting.c
@@ -22,7 +22,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -63,7 +63,6 @@ MODULE_CNFNAME("omtesting")
*/
DEF_OMOD_STATIC_DATA
-
typedef struct _instanceData {
enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND }
mode;
@@ -76,6 +75,10 @@ typedef struct _instanceData {
int iCurrRetries;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
int bEchoStdout; /* echo non-failed messages to stdout */
} configSettings_t;
@@ -93,6 +96,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("Action delays rule by %d second(s) and %d millisecond(s)\n",
@@ -170,11 +178,11 @@ static rsRetVal doRandFail(void)
BEGINtryResume
CODESTARTtryResume
dbgprintf("omtesting tryResume() called\n");
- switch(pData->mode) {
+ switch(pWrkrData->pData->mode) {
case MD_SLEEP:
break;
case MD_FAIL:
- iRet = doFailOnResume(pData);
+ iRet = doFailOnResume(pWrkrData->pData);
break;
case MD_RANDFAIL:
iRet = doRandFail();
@@ -187,8 +195,10 @@ ENDtryResume
BEGINdoAction
+ instanceData *pData;
CODESTARTdoAction
dbgprintf("omtesting received msg '%s'\n", ppString[0]);
+ pData = pWrkrData->pData;
switch(pData->mode) {
case MD_SLEEP:
iRet = doSleep(pData);
@@ -220,6 +230,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINparseSelectorAct
int i;
uchar szBuf[1024];
@@ -313,6 +328,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c
index cb907bba..ad3508c8 100644
--- a/plugins/omudpspoof/omudpspoof.c
+++ b/plugins/omudpspoof/omudpspoof.c
@@ -98,15 +98,19 @@ typedef struct _instanceData {
uchar *port;
uchar *sourceTpl;
int mtu;
- int *pSockArray; /* sockets to use for UDP */
- struct addrinfo *f_addr;
- u_short sourcePort;
u_short sourcePortStart; /* for sorce port iteration */
u_short sourcePortEnd;
int bReportLibnetInitErr; /* help prevent multiple error messages on init err */
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
libnet_t *libnet_handle;
+ u_short sourcePort;
+ int *pSockArray; /* sockets to use for UDP */
+ struct addrinfo *f_addr;
char errbuf[LIBNET_ERRBUF_SIZE];
-} instanceData;
+} wrkrInstanceData_t;
#define DFLT_SOURCE_PORT_START 32000
#define DFLT_SOURCE_PORT_END 42000
@@ -172,7 +176,7 @@ ENDinitConfVars
pthread_mutex_t mutLibnet;
/* forward definitions */
-static rsRetVal doTryResume(instanceData *pData);
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData);
/* this function gets the default template. It coordinates action between
@@ -217,15 +221,14 @@ finalize_it:
* rgerhards, 2009-05-29
*/
static rsRetVal
-closeUDPSockets(instanceData *pData)
+closeUDPSockets(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- assert(pData != NULL);
- if(pData->pSockArray != NULL) {
- net.closeUDPListenSockets(pData->pSockArray);
- pData->pSockArray = NULL;
- freeaddrinfo(pData->f_addr);
- pData->f_addr = NULL;
+ if(pWrkrData->pSockArray != NULL) {
+ net.closeUDPListenSockets(pWrkrData->pSockArray);
+ pWrkrData->pSockArray = NULL;
+ freeaddrinfo(pWrkrData->f_addr);
+ pWrkrData->f_addr = NULL;
}
RETiRet;
}
@@ -310,12 +313,17 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
- pData->libnet_handle = NULL;
pData->mtu = 1500;
pData->bReportLibnetInitErr = 1;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->libnet_handle = NULL;
+ pWrkrData->sourcePort = pData->sourcePortStart;
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -326,15 +334,19 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
/* final cleanup */
- closeUDPSockets(pData);
free(pData->tplName);
free(pData->port);
free(pData->host);
free(pData->sourceTpl);
- if(pData->libnet_handle != NULL)
- libnet_destroy(pData->libnet_handle);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeUDPSockets(pWrkrData);
+ if(pWrkrData->libnet_handle != NULL)
+ libnet_destroy(pWrkrData->libnet_handle);
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -348,11 +360,12 @@ ENDdbgPrintInstInfo
* rgehards, 2007-12-20
*/
static inline rsRetVal
-UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
+UDPSend(wrkrInstanceData_t *pWrkrData, uchar *pszSourcename, char *msg, size_t len)
{
struct addrinfo *r;
int lsent = 0;
int bSendSuccess;
+ instanceData *pData;
struct sockaddr_in *tempaddr,source_ip;
libnet_ptag_t ip, ipo;
libnet_ptag_t udp;
@@ -363,9 +376,10 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
unsigned maxPktLen, pktLen;
DEFiRet;
- if(pData->pSockArray == NULL) {
- CHKiRet(doTryResume(pData));
+ if(pWrkrData->pSockArray == NULL) {
+ CHKiRet(doTryResume(pWrkrData));
}
+ pData = pWrkrData->pData;
if(len > 65528) {
DBGPRINTF("omudpspoof: msg with length %d truncated to 64k: '%.768s'\n",
@@ -374,8 +388,8 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
}
ip = ipo = udp = 0;
- if(pData->sourcePort++ >= pData->sourcePortEnd){
- pData->sourcePort = pData->sourcePortStart;
+ if(pWrkrData->sourcePort++ >= pData->sourcePortEnd){
+ pWrkrData->sourcePort = pData->sourcePortStart;
}
inet_pton(AF_INET, (char*)pszSourcename, &(source_ip.sin_addr));
@@ -383,7 +397,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
bSendSuccess = RSFALSE;
d_pthread_mutex_lock(&mutLibnet);
bNeedUnlock = 1;
- for (r = pData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) {
+ for (r = pWrkrData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) {
tempaddr = (struct sockaddr_in *)r->ai_addr;
/* Getting max payload size (must be multiple of 8) */
maxPktLen = (pData->mtu - LIBNET_IPV4_H) & ~0x07;
@@ -400,19 +414,19 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
}
DBGPRINTF("omudpspoof: stage 1: MF:%d, hdrOffs %d, pktLen %d\n",
(hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen);
- libnet_clear_packet(pData->libnet_handle);
+ libnet_clear_packet(pWrkrData->libnet_handle);
/* note: libnet does need ports in host order NOT in network byte order! -- rgerhards, 2009-11-12 */
udp = libnet_build_udp(
- ntohs(pData->sourcePort),/* source port */
+ ntohs(pWrkrData->sourcePort),/* source port */
ntohs(tempaddr->sin_port),/* destination port */
pktLen+LIBNET_UDP_H, /* packet length */
0, /* checksum */
(u_char*)msg, /* payload */
pktLen, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
udp); /* libnet id */
if (udp == -1) {
- DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
ip = libnet_build_ipv4(
@@ -427,22 +441,22 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
tempaddr->sin_addr.s_addr,
NULL, /* payload */
0, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
ip); /* libnet id */
if (ip == -1) {
- DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
/* Write it to the wire. */
- lsent = libnet_write(pData->libnet_handle);
+ lsent = libnet_write(pWrkrData->libnet_handle);
if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) {
/* note: access to fd is a libnet internal. If a newer version of libnet does
* not expose that member, we should simply remove it. However, while it is there
* it is useful for consolidating with strace output.
*/
DBGPRINTF("omudpspoof: write error (total len %d): pktLen %d, sent %d, fd %d: %s\n",
- len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pData->libnet_handle->fd,
- libnet_geterror(pData->libnet_handle));
+ len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pWrkrData->libnet_handle->fd,
+ libnet_geterror(pWrkrData->libnet_handle));
if(lsent != -1) {
bSendSuccess = RSTRUE;
}
@@ -452,7 +466,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
msgOffs += pktLen;
/* We need to get rid of the UDP header to build the other fragments */
- libnet_clear_packet(pData->libnet_handle);
+ libnet_clear_packet(pWrkrData->libnet_handle);
ip = LIBNET_PTAG_INITIALIZER;
while(len > msgOffs ) { /* loop until all payload is sent */
/* check if there will be more fragments */
@@ -481,16 +495,16 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
tempaddr->sin_addr.s_addr,
(uint8_t*)(msg+msgOffs), /* payload */
pktLen, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
ip); /* libnet id */
if (ip == -1) {
- DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
/* Write it to the wire. */
- lsent = libnet_write(pData->libnet_handle);
+ lsent = libnet_write(pWrkrData->libnet_handle);
if(lsent != (int) (LIBNET_IPV4_H+pktLen)) {
DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n",
- LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle));
+ LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pWrkrData->libnet_handle));
bSendSuccess = RSFALSE;
continue;
}
@@ -500,9 +514,9 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
finalize_it:
if(iRet != RS_RET_OK) {
- if(pData->libnet_handle != NULL) {
- libnet_destroy(pData->libnet_handle);
- pData->libnet_handle = NULL;
+ if(pWrkrData->libnet_handle != NULL) {
+ libnet_destroy(pWrkrData->libnet_handle);
+ pWrkrData->libnet_handle = NULL;
}
}
if(bNeedUnlock) {
@@ -515,26 +529,28 @@ finalize_it:
/* try to resume connection if it is not ready
* rgerhards, 2007-08-02
*/
-static rsRetVal doTryResume(instanceData *pData)
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData)
{
int iErr;
struct addrinfo *res;
struct addrinfo hints;
+ instanceData *pData;
DEFiRet;
- if(pData->pSockArray != NULL)
+ if(pWrkrData->pSockArray != NULL)
FINALIZE;
+ pData = pWrkrData->pData;
- if(pData->libnet_handle == NULL) {
+ if(pWrkrData->libnet_handle == NULL) {
/* Initialize the libnet library. Root priviledges are required.
* this initializes a IPv4 socket to use for forging UDP packets.
*/
- pData->libnet_handle = libnet_init(
+ pWrkrData->libnet_handle = libnet_init(
LIBNET_RAW4, /* injection type */
NULL, /* network interface */
- pData->errbuf); /* errbuf */
+ pWrkrData->errbuf); /* errbuf */
- if(pData->libnet_handle == NULL) {
+ if(pWrkrData->libnet_handle == NULL) {
if(pData->bReportLibnetInitErr) {
errmsg.LogError(0, RS_RET_ERR_LIBNET_INIT, "omudpsoof: error "
"initializing libnet - are you running as root?");
@@ -559,14 +575,14 @@ static rsRetVal doTryResume(instanceData *pData)
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
DBGPRINTF("%s found, resuming.\n", pData->host);
- pData->f_addr = res;
- pData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0);
+ pWrkrData->f_addr = res;
+ pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0);
finalize_it:
if(iRet != RS_RET_OK) {
- if(pData->f_addr != NULL) {
- freeaddrinfo(pData->f_addr);
- pData->f_addr = NULL;
+ if(pWrkrData->f_addr != NULL) {
+ freeaddrinfo(pWrkrData->f_addr);
+ pWrkrData->f_addr = NULL;
}
iRet = RS_RET_SUSPENDED;
}
@@ -577,7 +593,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ iRet = doTryResume(pWrkrData);
ENDtryResume
BEGINdoAction
@@ -585,10 +601,10 @@ BEGINdoAction
unsigned l;
int iMaxLine;
CODESTARTdoAction
- CHKiRet(doTryResume(pData));
+ CHKiRet(doTryResume(pWrkrData));
- DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host,
- getFwdPt(pData), ppString[1], ppString[0]);
+ DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pWrkrData->pData->host,
+ getFwdPt(pWrkrData->pData), ppString[1], ppString[0]);
iMaxLine = glbl.GetMaxLine();
psz = (char*) ppString[0];
@@ -596,7 +612,7 @@ CODESTARTdoAction
if((int) l > iMaxLine)
l = iMaxLine;
- CHKiRet(UDPSend(pData, ppString[1], psz, l));
+ CHKiRet(UDPSend(pWrkrData, ppString[1], psz, l));
finalize_it:
ENDdoAction
@@ -660,7 +676,6 @@ CODESTARTnewActInst
}
}
CODE_STD_STRING_REQUESTnewActInst(2)
- pData->sourcePort = pData->sourcePortStart;
tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName);
CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS));
@@ -699,7 +714,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2)
else
CHKmalloc(pData->port = ustrdup(cs.pszTargetPort));
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS));
- pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart;
+ pData->sourcePortStart = cs.iSourcePortStart;
pData->sourcePortEnd = cs.iSourcePortEnd;
/* process template */
@@ -740,6 +755,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c
index 583b9f94..da4e8e94 100644
--- a/plugins/omuxsock/omuxsock.c
+++ b/plugins/omuxsock/omuxsock.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2010-2012 Adiscon GmbH.
+ * Copyright 2010-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -60,10 +60,14 @@ typedef struct _instanceData {
permittedPeers_t *pPermPeers;
uchar *sockName;
int sock;
- int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */
struct sockaddr_un addr;
} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* config data */
typedef struct configSettings_s {
uchar *tplName; /* name of the default template to use */
@@ -90,6 +94,7 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
@@ -147,7 +152,6 @@ closeSocket(instanceData *pData)
close(pData->sock);
pData->sock = INVLD_SOCK;
}
-pData->bIsConnected = 0; // TODO: remove this variable altogether
RETiRet;
}
@@ -224,6 +228,10 @@ CODESTARTcreateInstance
pData->sock = INVLD_SOCK;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -239,6 +247,10 @@ CODESTARTfreeInstance
free(pData->sockName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -332,7 +344,7 @@ static rsRetVal doTryResume(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ iRet = doTryResume(pWrkrData->pData);
ENDtryResume
BEGINdoAction
@@ -340,20 +352,22 @@ BEGINdoAction
register unsigned l;
int iMaxLine;
CODESTARTdoAction
- CHKiRet(doTryResume(pData));
+ pthread_mutex_lock(&mutDoAct);
+ CHKiRet(doTryResume(pWrkrData->pData));
iMaxLine = glbl.GetMaxLine();
- DBGPRINTF(" omuxsock:%s\n", pData->sockName);
+ DBGPRINTF(" omuxsock:%s\n", pWrkrData->pData->sockName);
psz = (char*) ppString[0];
l = strlen((char*) psz);
if((int) l > iMaxLine)
l = iMaxLine;
- CHKiRet(sendMsg(pData, psz, l));
+ CHKiRet(sendMsg(pWrkrData->pData, psz, l));
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -413,6 +427,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
ENDqueryEtryPt