summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/mmanon/mmanon.c3
-rw-r--r--plugins/mmfields/mmfields.c15
-rw-r--r--plugins/mmjsonparse/mmjsonparse.c45
-rw-r--r--plugins/mmnormalize/mmnormalize.c21
-rw-r--r--plugins/mmpstrucdata/mmpstrucdata.c15
-rw-r--r--plugins/mmsequence/mmsequence.c34
-rw-r--r--plugins/mmutf8fix/mmutf8fix.c21
-rw-r--r--plugins/omhiredis/omhiredis.c5
-rw-r--r--plugins/omlibdbi/omlibdbi.c29
-rw-r--r--plugins/ommail/ommail.c191
-rw-r--r--plugins/ommongodb/ommongodb.c25
-rw-r--r--plugins/ompgsql/ompgsql.c35
-rw-r--r--plugins/omprog/omprog.c48
-rw-r--r--plugins/omrelp/omrelp.c135
-rw-r--r--plugins/omsnmp/omsnmp.c81
-rw-r--r--plugins/omudpspoof/omudpspoof.c130
-rw-r--r--plugins/omuxsock/omuxsock.c29
17 files changed, 560 insertions, 302 deletions
diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c
index c714706a..28797807 100644
--- a/plugins/mmanon/mmanon.c
+++ b/plugins/mmanon/mmanon.c
@@ -367,12 +367,11 @@ BEGINdoAction
int lenMsg;
int i;
CODESTARTdoAction
- pData = pWrkrData->pData;
pMsg = (msg_t*) ppString[0];
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
for(i = 0 ; i < lenMsg ; ++i) {
- anonip(pData, msg, &lenMsg, &i);
+ anonip(pWrkrData->pData, msg, &lenMsg, &i);
}
if(lenMsg != getMSGLen(pMsg))
setMSGLen(pMsg, lenMsg);
diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c
index fa7fa100..c408a6c9 100644
--- a/plugins/mmfields/mmfields.c
+++ b/plugins/mmfields/mmfields.c
@@ -56,6 +56,10 @@ typedef struct _instanceData {
uchar *jsonRoot; /**< container where to store fields */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -103,6 +107,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -114,6 +122,10 @@ CODESTARTfreeInstance
free(pData->jsonRoot);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -232,7 +244,7 @@ CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
- CHKiRet(parse_fields(pData, pMsg, msg, lenMsg));
+ CHKiRet(parse_fields(pWrkrData->pData, pMsg, msg, lenMsg));
finalize_it:
ENDdoAction
@@ -259,6 +271,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c
index b16aef0e..9c0ab88f 100644
--- a/plugins/mmjsonparse/mmjsonparse.c
+++ b/plugins/mmjsonparse/mmjsonparse.c
@@ -58,9 +58,15 @@ DEFobjCurrIf(errmsg);
DEF_OMOD_STATIC_DATA
typedef struct _instanceData {
- struct json_tokener *tokener;
+ int dummy; /* not needed, but some compilers do not support empty structs */
+ /* REMOVE dummy when real data items are to be added! */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ struct json_tokener *tokener;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -94,14 +100,18 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
- pData->tokener = json_tokener_new();
- if(pData->tokener == NULL) {
+ENDcreateInstance
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->tokener = json_tokener_new();
+ if(pWrkrData->tokener == NULL) {
errmsg.LogError(0, RS_RET_ERR, "error: could not create json "
- "tokener, cannot activate action");
+ "tokener, cannot activate instance");
ABORT_FINALIZE(RS_RET_ERR);
}
finalize_it:
-ENDcreateInstance
+ENDcreateWrkrInstance
BEGINisCompatibleWithFeature
@@ -111,10 +121,14 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
- if(pData->tokener != NULL)
- json_tokener_free(pData->tokener);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->tokener != NULL)
+ json_tokener_free(pWrkrData->tokener);
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -128,28 +142,28 @@ ENDtryResume
static rsRetVal
-processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
+processJSON(wrkrInstanceData_t *pWrkrData, msg_t *pMsg, char *buf, size_t lenBuf)
{
struct json_object *json;
const char *errMsg;
DEFiRet;
- assert(pData->tokener != NULL);
+ assert(pWrkrData->tokener != NULL);
DBGPRINTF("mmjsonparse: toParse: '%s'\n", buf);
- json_tokener_reset(pData->tokener);
+ json_tokener_reset(pWrkrData->tokener);
- json = json_tokener_parse_ex(pData->tokener, buf, lenBuf);
+ json = json_tokener_parse_ex(pWrkrData->tokener, buf, lenBuf);
if(Debug) {
errMsg = NULL;
if(json == NULL) {
enum json_tokener_error err;
- err = pData->tokener->err;
+ err = pWrkrData->tokener->err;
if(err != json_tokener_continue)
errMsg = json_tokener_errors[err];
else
errMsg = "Unterminated input";
- } else if((size_t)pData->tokener->char_offset < lenBuf)
+ } else if((size_t)pWrkrData->tokener->char_offset < lenBuf)
errMsg = "Extra characters after JSON object";
else if(!json_object_is_type(json, json_type_object))
errMsg = "JSON value is not an object";
@@ -159,7 +173,7 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf)
}
}
if(json == NULL
- || ((size_t)pData->tokener->char_offset < lenBuf)
+ || ((size_t)pWrkrData->tokener->char_offset < lenBuf)
|| (!json_object_is_type(json, json_type_object))) {
ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
}
@@ -194,7 +208,7 @@ CODESTARTdoAction
ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
}
buf += LEN_COOKIE;
- CHKiRet(processJSON(pData, pMsg, (char*) buf, strlen((char*)buf)));
+ CHKiRet(processJSON(pWrkrData, pMsg, (char*) buf, strlen((char*)buf)));
bSuccess = 1;
finalize_it:
if(iRet == RS_RET_NO_CEE_MSG) {
@@ -257,6 +271,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c
index 7e25824a..f82836b1 100644
--- a/plugins/mmnormalize/mmnormalize.c
+++ b/plugins/mmnormalize/mmnormalize.c
@@ -9,7 +9,7 @@
*
* File begun on 2010-01-01 by RGerhards
*
- * Copyright 2010-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2010-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -70,6 +70,10 @@ typedef struct _instanceData {
ee_ctx ctxee; /**< context to be used for libee */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *rulebase; /**< name of normalization rulebase to use */
int bUseRawMsg; /**< use %rawmsg% instead of %msg% */
@@ -139,6 +143,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
@@ -181,6 +190,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("mmnormalize\n");
@@ -207,14 +221,14 @@ CODESTARTdoAction
* requires changes to the libraries. For now, we accept message
* duplication. -- rgerhards, 2010-12-01
*/
- if(pData->bUseRawMsg) {
+ if(pWrkrData->pData->bUseRawMsg) {
getRawMsg(pMsg, &buf, &len);
} else {
buf = getMSG(pMsg);
len = getMSGLen(pMsg);
}
str = es_newStrFromCStr((char*)buf, len);
- r = ln_normalize(pData->ctxln, str, &event);
+ r = ln_normalize(pWrkrData->pData->ctxln, str, &event);
if(r != 0) {
DBGPRINTF("error %d during ln_normalize\n", r);
MsgSetParseSuccess(pMsg, 0);
@@ -338,6 +352,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c
index 4b2a985b..e9e36b27 100644
--- a/plugins/mmpstrucdata/mmpstrucdata.c
+++ b/plugins/mmpstrucdata/mmpstrucdata.c
@@ -52,6 +52,10 @@ typedef struct _instanceData {
uchar *jsonRoot; /**< container where to store fields */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -98,6 +102,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -109,6 +117,10 @@ CODESTARTfreeInstance
free(pData->jsonRoot);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -357,7 +369,7 @@ dbgprintf("DDDD: parse mmpstrucdata\n");
/* don't check return code - we never want rsyslog to retry
* or suspend this action!
*/
- parse_sd(pData, pMsg);
+ parse_sd(pWrkrData->pData, pMsg);
dbgprintf("DDDD: done parse mmpstrucdata\n");
finalize_it:
ENDdoAction
@@ -385,6 +397,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmsequence/mmsequence.c b/plugins/mmsequence/mmsequence.c
index 20a85370..609338c2 100644
--- a/plugins/mmsequence/mmsequence.c
+++ b/plugins/mmsequence/mmsequence.c
@@ -74,6 +74,10 @@ typedef struct _instanceData {
char *pszVar;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -100,6 +104,8 @@ static struct cnfparamblk actpblk =
/* table for key-counter pairs */
static struct hashtable *ght;
static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static pthread_mutex_t inst_mutex = PTHREAD_MUTEX_INITIALIZER;
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
@@ -129,6 +135,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -139,6 +149,10 @@ BEGINfreeInstance
CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
@@ -243,7 +257,7 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_ERR);
}
}
- pthread_mutex_unlock(&ght_mutex);
+ pthread_mutex_unlock(&ght_mutex);
break;
default:
errmsg.LogError(0, RS_RET_INVLD_MODE,
@@ -303,7 +317,9 @@ BEGINdoAction
struct json_object *json;
int val = 0;
int *pCounter;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
pMsg = (msg_t*) ppString[0];
switch(pData->mode) {
@@ -312,12 +328,19 @@ CODESTARTdoAction
(pData->valueTo - pData->valueFrom));
break;
case mmSequencePerInstance:
- if (pData->value >= pData->valueTo - pData->step) {
- pData->value = pData->valueFrom;
+ if (!pthread_mutex_lock(&inst_mutex)) {
+ pCounter = getCounter(ght, pData->pszKey, pData->valueTo);
+ if (pData->value >= pData->valueTo - pData->step) {
+ pData->value = pData->valueFrom;
+ } else {
+ pData->value += pData->step;
+ }
+ val = pData->value;
+ pthread_mutex_unlock(&inst_mutex);
} else {
- pData->value += pData->step;
+ errmsg.LogError(0, RS_RET_ERR,
+ "mmsequence: mutex lock has failed!");
}
- val = pData->value;
break;
case mmSequencePerKey:
if (!pthread_mutex_lock(&ght_mutex)) {
@@ -381,6 +404,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/mmutf8fix/mmutf8fix.c b/plugins/mmutf8fix/mmutf8fix.c
index e2077950..351bb129 100644
--- a/plugins/mmutf8fix/mmutf8fix.c
+++ b/plugins/mmutf8fix/mmutf8fix.c
@@ -60,6 +60,10 @@ typedef struct _instanceData {
uint8_t mode; /* operations mode */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
@@ -108,6 +112,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
@@ -118,6 +127,11 @@ CODESTARTfreeInstance
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -274,10 +288,10 @@ CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
lenMsg = getMSGLen(pMsg);
msg = getMSG(pMsg);
- if(pData->mode == MODE_CC) {
- doCC(pData, msg, lenMsg);
+ if(pWrkrData->pData->mode == MODE_CC) {
+ doCC(pWrkrData->pData, msg, lenMsg);
} else {
- doUTF8(pData, msg, lenMsg);
+ doUTF8(pWrkrData->pData, msg, lenMsg);
}
ENDdoAction
@@ -304,6 +318,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c
index 7a35bac2..757d5eb2 100644
--- a/plugins/omhiredis/omhiredis.c
+++ b/plugins/omhiredis/omhiredis.c
@@ -97,7 +97,6 @@ BEGINfreeInstance
CODESTARTfreeInstance
closeHiredis(pData);
free(pData->server);
- free(pData->tplName);
ENDfreeInstance
@@ -196,9 +195,9 @@ CODESTARTendTransaction
for ( i = 0; i < pData->count; i++ ) {
redisGetReply ( pData->conn, (void *)&pData->replies[i] );
/* TODO: add error checking here! */
- free ( pData->replies[i] );
+ freeReplyObject ( pData->replies[i] );
}
- freeReplyObject ( pData->replies );
+ free ( pData->replies );
pData->count = 0;
ENDendTransaction
diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c
index 3beba4f0..c203b4aa 100644
--- a/plugins/omlibdbi/omlibdbi.c
+++ b/plugins/omlibdbi/omlibdbi.c
@@ -50,6 +50,9 @@
#include "errmsg.h"
#include "conf.h"
+#undef HAVE_DBI_TXSUPP
+#warning transaction support disabled in v8 -- TODO: reenable
+
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omlibdbi")
@@ -73,6 +76,10 @@ typedef struct _instanceData {
int txSupport; /* transaction support */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *dbiDrvrDir; /* global: where do the dbi drivers reside? */
uchar *drvrName; /* driver to use */
@@ -94,6 +101,8 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
/* tables for interfacing with the v6 config system */
/* module-global parameters */
@@ -157,6 +166,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -187,6 +200,9 @@ CODESTARTfreeInstance
free(pData->dbName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -326,16 +342,16 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL) {
- iRet = initConn(pData, 1);
+ if(pWrkrData->pData->conn == NULL) {
+ iRet = initConn(pWrkrData->pData, 1);
}
ENDtryResume
/* transaction support 2013-03 */
BEGINbeginTransaction
CODESTARTbeginTransaction
- if(pData->conn == NULL) {
- CHKiRet(initConn(pData, 0));
+ if(pWrkrData->pData->conn == NULL) {
+ CHKiRet(initConn(pWrkrData->pData, 0));
}
# if HAVE_DBI_TXSUPP
if (pData->txSupport == 1) {
@@ -355,13 +371,15 @@ ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
- CHKiRet(writeDB(ppString[0], pData));
+ pthread_mutex_lock(&mutDoAct);
+ CHKiRet(writeDB(ppString[0], pWrkrData->pData));
# if HAVE_DBI_TXSUPP
if (pData->txSupport == 1) {
iRet = RS_RET_DEFER_COMMIT;
}
# endif
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
/* transaction support 2013-03 */
@@ -552,6 +570,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
diff --git a/plugins/ommail/ommail.c b/plugins/ommail/ommail.c
index 0a781e10..910b371a 100644
--- a/plugins/ommail/ommail.c
+++ b/plugins/ommail/ommail.c
@@ -13,7 +13,7 @@
*
* File begun on 2008-04-04 by RGerhards
*
- * Copyright 2008-2012 Adiscon GmbH.
+ * Copyright 2008-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -82,13 +82,21 @@ typedef struct _instanceData {
uchar *pszSrvPort;
uchar *pszFrom;
toRcpt_t *lstRcpt;
+ } smtp;
+ } md; /* mode-specific data */
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ union {
+ struct {
char RcvBuf[1024]; /* buffer for receiving server responses */
size_t lenRcvBuf;
size_t iRcvBuf; /* current index into the rcvBuf (buf empty if iRcvBuf == lenRcvBuf) */
int sock; /* socket to this server (most important when we do multiple msgs per mail) */
} smtp;
} md; /* mode-specific data */
-} instanceData;
+} wrkrInstanceData_t;
typedef struct configSettings_s {
toRcpt_t *lstRcpt;
@@ -112,7 +120,7 @@ ENDinitConfVars
/* forward definitions (as few as possible) */
static rsRetVal Send(int sock, char *msg, size_t len);
-static rsRetVal readResponse(instanceData *pData, int *piState, int iExpected);
+static rsRetVal readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected);
/* helpers for handling the recipient lists */
@@ -163,24 +171,22 @@ finalize_it:
* iStatusToCheck < 0 means no checking should happen
*/
static rsRetVal
-WriteRcpts(instanceData *pData, uchar *pszOp, size_t lenOp, int iStatusToCheck)
+WriteRcpts(wrkrInstanceData_t *pWrkrData, uchar *pszOp, size_t lenOp, int iStatusToCheck)
{
toRcpt_t *pRcpt;
int iState;
DEFiRet;
- assert(pData != NULL);
- assert(pszOp != NULL);
assert(lenOp != 0);
- for(pRcpt = pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) {
+ for(pRcpt = pWrkrData->pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) {
dbgprintf("Sending '%s: <%s>'\n", pszOp, pRcpt->pszTo);
- CHKiRet(Send(pData->md.smtp.sock, (char*)pszOp, lenOp));
- CHKiRet(Send(pData->md.smtp.sock, ":<", sizeof(":<") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pszOp, lenOp));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ":<", sizeof(":<") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
if(iStatusToCheck >= 0)
- CHKiRet(readResponse(pData, &iState, iStatusToCheck));
+ CHKiRet(readResponse(pWrkrData, &iState, iStatusToCheck));
}
finalize_it:
@@ -193,6 +199,11 @@ CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -203,17 +214,19 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
if(pData->iMode == 0) {
- if(pData->md.smtp.pszSrv != NULL)
- free(pData->md.smtp.pszSrv);
- if(pData->md.smtp.pszSrvPort != NULL)
- free(pData->md.smtp.pszSrvPort);
- if(pData->md.smtp.pszFrom != NULL)
- free(pData->md.smtp.pszFrom);
+ free(pData->md.smtp.pszSrv);
+ free(pData->md.smtp.pszSrvPort);
+ free(pData->md.smtp.pszFrom);
lstRcptDestruct(pData->md.smtp.lstRcpt);
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
printf("mail"); /* TODO: extend! */
@@ -229,16 +242,16 @@ ENDdbgPrintInstInfo
* rgerhards, 2008-04-04
*/
static rsRetVal
-getRcvChar(instanceData *pData, char *pC)
+getRcvChar(wrkrInstanceData_t *pWrkrData, char *pC)
{
DEFiRet;
ssize_t lenBuf;
- assert(pData != NULL);
- if(pData->md.smtp.iRcvBuf == pData->md.smtp.lenRcvBuf) { /* buffer empty? */
+ if(pWrkrData->md.smtp.iRcvBuf == pWrkrData->md.smtp.lenRcvBuf) { /* buffer empty? */
/* yes, we need to read the next server response */
do {
- lenBuf = recv(pData->md.smtp.sock, pData->md.smtp.RcvBuf, sizeof(pData->md.smtp.RcvBuf), 0);
+ lenBuf = recv(pWrkrData->md.smtp.sock, pWrkrData->md.smtp.RcvBuf,
+ sizeof(pWrkrData->md.smtp.RcvBuf), 0);
if(lenBuf == 0) {
ABORT_FINALIZE(RS_RET_NO_MORE_DATA);
} else if(lenBuf < 0) {
@@ -247,15 +260,15 @@ getRcvChar(instanceData *pData, char *pC)
}
} else {
/* good read */
- pData->md.smtp.iRcvBuf = 0;
- pData->md.smtp.lenRcvBuf = lenBuf;
+ pWrkrData->md.smtp.iRcvBuf = 0;
+ pWrkrData->md.smtp.lenRcvBuf = lenBuf;
}
} while(lenBuf < 1);
}
/* when we reach this point, we have a non-empty buffer */
- *pC = pData->md.smtp.RcvBuf[pData->md.smtp.iRcvBuf++];
+ *pC = pWrkrData->md.smtp.RcvBuf[pWrkrData->md.smtp.iRcvBuf++];
finalize_it:
RETiRet;
@@ -266,14 +279,14 @@ finalize_it:
* rgerhards, 2008-04-08
*/
static rsRetVal
-serverDisconnect(instanceData *pData)
+serverDisconnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
- if(pData->md.smtp.sock != -1) {
- close(pData->md.smtp.sock);
- pData->md.smtp.sock = -1;
+ if(pWrkrData->md.smtp.sock != -1) {
+ close(pWrkrData->md.smtp.sock);
+ pWrkrData->md.smtp.sock = -1;
}
RETiRet;
@@ -284,16 +297,17 @@ serverDisconnect(instanceData *pData)
* rgerhards, 2008-04-04
*/
static rsRetVal
-serverConnect(instanceData *pData)
+serverConnect(wrkrInstanceData_t *pWrkrData)
{
struct addrinfo *res = NULL;
struct addrinfo hints;
char *smtpPort;
char *smtpSrv;
char errStr[1024];
-
+ instanceData *pData;
DEFiRet;
- assert(pData != NULL);
+
+ pData = pWrkrData->pData;
if(pData->md.smtp.pszSrv == NULL)
smtpSrv = "127.0.0.1";
@@ -313,12 +327,12 @@ serverConnect(instanceData *pData)
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
- if((pData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) {
+ if((pWrkrData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) {
dbgprintf("couldn't create send socket, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr)));
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
- if(connect(pData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) {
+ if(connect(pWrkrData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) {
dbgprintf("create tcp connection failed, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr)));
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
@@ -328,9 +342,9 @@ finalize_it:
freeaddrinfo(res);
if(iRet != RS_RET_OK) {
- if(pData->md.smtp.sock != -1) {
- close(pData->md.smtp.sock);
- pData->md.smtp.sock = -1;
+ if(pWrkrData->md.smtp.sock != -1) {
+ close(pWrkrData->md.smtp.sock);
+ pWrkrData->md.smtp.sock = -1;
}
}
@@ -374,7 +388,7 @@ finalize_it:
* The body is special in that we must escape a leading dot inside a line
*/
static rsRetVal
-bodySend(instanceData *pData, char *msg, size_t len)
+bodySend(wrkrInstanceData_t *pWrkrData, char *msg, size_t len)
{
DEFiRet;
char szBuf[2048];
@@ -383,12 +397,12 @@ bodySend(instanceData *pData, char *msg, size_t len)
int bHadCR = 0;
int bInStartOfLine = 1;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(msg != NULL);
for(iSrc = 0 ; iSrc < len ; ++iSrc) {
if(iBuf >= sizeof(szBuf) - 1) { /* one is reserved for our extra dot */
- CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf));
iBuf = 0;
}
szBuf[iBuf++] = msg[iSrc];
@@ -413,7 +427,7 @@ bodySend(instanceData *pData, char *msg, size_t len)
}
if(iBuf > 0) { /* incomplete buffer to send (the *usual* case)? */
- CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf));
}
finalize_it:
@@ -424,17 +438,17 @@ finalize_it:
/* read response line from server
*/
static rsRetVal
-readResponseLn(instanceData *pData, char *pLn, size_t lenLn)
+readResponseLn(wrkrInstanceData_t *pWrkrData, char *pLn, size_t lenLn)
{
DEFiRet;
size_t i = 0;
char c;
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(pLn != NULL);
do {
- CHKiRet(getRcvChar(pData, &c));
+ CHKiRet(getRcvChar(pWrkrData, &c));
if(c == '\n')
break;
if(i < (lenLn - 1)) /* if line is too long, we simply discard the rest */
@@ -453,18 +467,18 @@ finalize_it:
* rgerhards, 2008-04-07
*/
static rsRetVal
-readResponse(instanceData *pData, int *piState, int iExpected)
+readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected)
{
DEFiRet;
int bCont;
char buf[128];
- assert(pData != NULL);
+ assert(pWrkrData != NULL);
assert(piState != NULL);
bCont = 1;
do {
- CHKiRet(readResponseLn(pData, buf, sizeof(buf)));
+ CHKiRet(readResponseLn(pWrkrData, buf, sizeof(buf)));
/* note: the code below is not 100% clean as we may have received less than 4 characters.
* However, as we have a fixed size this will not create a vulnerability. An error will
* also most likely be generated, so it is quite acceptable IMHO -- rgerhards, 2008-04-08
@@ -506,64 +520,65 @@ mkSMTPTimestamp(uchar *pszBuf, size_t lenBuf)
* rgerhards, 2008-04-04
*/
static rsRetVal
-sendSMTP(instanceData *pData, uchar *body, uchar *subject)
+sendSMTP(wrkrInstanceData_t *pWrkrData, uchar *body, uchar *subject)
{
DEFiRet;
int iState; /* SMTP state */
+ instanceData *pData;
uchar szDateBuf[64];
- assert(pData != NULL);
+ pData = pWrkrData->pData;
- CHKiRet(serverConnect(pData));
- CHKiRet(readResponse(pData, &iState, 220));
+ CHKiRet(serverConnect(pWrkrData));
+ CHKiRet(readResponse(pWrkrData, &iState, 220));
- CHKiRet(Send(pData->md.smtp.sock, "HELO ", 5));
- CHKiRet(Send(pData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName())));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "HELO ", 5));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName())));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(Send(pData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(WriteRcpts(pData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250));
+ CHKiRet(WriteRcpts(pWrkrData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250));
- CHKiRet(Send(pData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 354));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 354));
/* now come the data part */
/* header */
mkSMTPTimestamp(szDateBuf, sizeof(szDateBuf));
- CHKiRet(Send(pData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf)));
- CHKiRet(Send(pData->md.smtp.sock, "From: <", sizeof("From: <") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
- CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "From: <", sizeof("From: <") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1));
- CHKiRet(WriteRcpts(pData, (uchar*)"To", sizeof("To") - 1, -1));
+ CHKiRet(WriteRcpts(pWrkrData, (uchar*)"To", sizeof("To") - 1, -1));
- CHKiRet(Send(pData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1));
- CHKiRet(Send(pData->md.smtp.sock, (char*)subject, strlen((char*)subject)));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)subject, strlen((char*)subject)));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1));
- CHKiRet(Send(pData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1));
- CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */
/* body */
if(pData->bEnableBody)
- CHKiRet(bodySend(pData, (char*)body, strlen((char*) body)));
+ CHKiRet(bodySend(pWrkrData, (char*)body, strlen((char*) body)));
/* end of data, back to envelope transaction */
- CHKiRet(Send(pData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 250));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 250));
- CHKiRet(Send(pData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1));
- CHKiRet(readResponse(pData, &iState, 221));
+ CHKiRet(Send(pWrkrData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1));
+ CHKiRet(readResponse(pWrkrData, &iState, 221));
/* we are finished, a new connection is created for each request, so let's close it now */
- CHKiRet(serverDisconnect(pData));
+ CHKiRet(serverDisconnect(pWrkrData));
finalize_it:
RETiRet;
@@ -583,8 +598,8 @@ finalize_it:
*/
BEGINtryResume
CODESTARTtryResume
- CHKiRet(serverConnect(pData));
- CHKiRet(serverDisconnect(pData)); /* if we fail, we will never reach this line */
+ CHKiRet(serverConnect(pWrkrData));
+ CHKiRet(serverDisconnect(pWrkrData)); /* if we fail, we will never reach this line */
finalize_it:
if(iRet == RS_RET_IO_ERROR)
iRet = RS_RET_SUSPENDED;
@@ -593,17 +608,14 @@ ENDtryResume
BEGINdoAction
CODESTARTdoAction
- dbgprintf(" Mail\n");
+ DBGPRINTF(" Mail\n");
- /* forward */
- if(pData->bHaveSubject)
- iRet = sendSMTP(pData, ppString[0], ppString[1]);
- else
- iRet = sendSMTP(pData, ppString[0], (uchar*)"message from rsyslog");
+ iRet = sendSMTP(pWrkrData, ppString[0],
+ (pWrkrData->pData->bHaveSubject) ?
+ ppString[1] : (uchar*)"message from rsyslog");
if(iRet != RS_RET_OK) {
- /* error! */
- dbgprintf("error sending mail, suspending\n");
+ DBGPRINTF("error sending mail, suspending\n");
iRet = RS_RET_SUSPENDED;
}
ENDdoAction
@@ -689,6 +701,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
ENDqueryEtryPt
diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c
index af1f5a37..09f19768 100644
--- a/plugins/ommongodb/ommongodb.c
+++ b/plugins/ommongodb/ommongodb.c
@@ -4,7 +4,7 @@
* mongodb C interface is crap. Obtain the library here:
* https://github.com/algernon/libmongo-client
*
- * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -71,6 +71,10 @@ typedef struct _instanceData {
int bErrMsgPermitted; /* only one errmsg permitted per connection */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -89,10 +93,16 @@ static struct cnfparamblk actpblk =
actpdescr
};
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
/* use this to specify if select features are supported by this
@@ -126,6 +136,10 @@ CODESTARTfreeInstance
free(pData->tplName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -422,14 +436,17 @@ error:
BEGINtryResume
CODESTARTtryResume
- if(pData->conn == NULL) {
- iRet = initMongoDB(pData, 1);
+ if(pWrkrData->pData->conn == NULL) {
+ iRet = initMongoDB(pWrkrData->pData, 1);
}
ENDtryResume
BEGINdoAction
bson *doc = NULL;
+ instanceData *pData;
CODESTARTdoAction
+ pthread_mutex_lock(&mutDoAct);
+ pData = pWrkrData->pData;
/* see if we are ready to proceed */
if(pData->conn == NULL) {
CHKiRet(initMongoDB(pData, 0));
@@ -454,6 +471,7 @@ CODESTARTdoAction
}
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
if(doc != NULL)
bson_free(doc);
ENDdoAction
@@ -560,6 +578,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c
index 11f346f6..87599484 100644
--- a/plugins/ompgsql/ompgsql.c
+++ b/plugins/ompgsql/ompgsql.c
@@ -6,7 +6,7 @@
*
* File begun on 2007-10-18 by sur5r (converted from ommysql.c)
*
- * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2007, 2013 Rainer Gerhards and Adiscon GmbH.
*
* The following link my be useful for the not-so-postgres literate
* when setting up a test environment (on Fedora):
@@ -66,11 +66,17 @@ typedef struct _instanceData {
ConnStatusType eLastPgSQLStatus; /* last status from postgres */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
+
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
ENDinitConfVars
@@ -82,6 +88,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -108,6 +118,9 @@ CODESTARTfreeInstance
closePgSQL(pData);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -244,8 +257,8 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- if(pData->f_hpgsql == NULL) {
- iRet = initPgSQL(pData, 1);
+ if(pWrkrData->pData->f_hpgsql == NULL) {
+ iRet = initPgSQL(pWrkrData->pData, 1);
if(iRet == RS_RET_OK) {
/* the code above seems not to actually connect to the database. As such, we do a
* dummy statement (a pointless select...) to verify the connection and return
@@ -253,7 +266,7 @@ CODESTARTtryResume
* PostgreSQL expert, so any patch that does the desired result in a more
* intelligent way is highly welcome. -- rgerhards, 2009-12-16
*/
- iRet = writePgSQL((uchar*)"select 'a' as a", pData);
+ iRet = writePgSQL((uchar*)"select 'a' as a", pWrkrData->pData);
}
}
@@ -263,23 +276,25 @@ ENDtryResume
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("ompgsql: beginTransaction\n");
- iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */
+ iRet = writePgSQL((uchar*) "begin", pWrkrData->pData); /* TODO: make user-configurable */
ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
+ pthread_mutex_lock(&mutDoAct);
dbgprintf("\n");
- CHKiRet(writePgSQL(ppString[0], pData));
+ CHKiRet(writePgSQL(ppString[0], pWrkrData->pData));
if(bCoreSupportsBatching)
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
- iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */
+ iRet = writePgSQL((uchar*) "commit;", pWrkrData->pData); /* TODO: make user-configurable */
dbgprintf("ompgsql: endTransaction\n");
ENDendTransaction
@@ -361,6 +376,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
ENDqueryEtryPt
@@ -372,6 +388,11 @@ INITLegCnfVars
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
+
+# warning: transaction support missing for v8
+ bCoreSupportsBatching= 0;
+ DBGPRINTF("ompgsql: transactions are not yet supported on v8\n");
+
DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION);
DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not ");
ENDmodInit
diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c
index cd07dcfb..1060b5c3 100644
--- a/plugins/omprog/omprog.c
+++ b/plugins/omprog/omprog.c
@@ -6,7 +6,7 @@
*
* File begun on 2009-04-01 by RGerhards
*
- * Copyright 2009-2012 Adiscon GmbH.
+ * Copyright 2009-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -35,6 +35,7 @@
#include <errno.h>
#include <unistd.h>
#include <wait.h>
+#include <pthread.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
@@ -60,8 +61,13 @@ typedef struct _instanceData {
int fdPipe; /* file descriptor to write to */
int bIsRunning; /* is binary currently running? 0-no, 1-yes */
int iParams; /* Holds the count of parameters if set*/
+ pthread_mutex_t mut; /* make sure only one instance is active */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar *szBinary; /* name of binary to call */
} configSettings_t;
@@ -89,8 +95,13 @@ ENDinitConfVars
BEGINcreateInstance
CODESTARTcreateInstance
+ pthread_mutex_init(&pData->mut, NULL);
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -102,6 +113,7 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
int i;
CODESTARTfreeInstance
+ pthread_mutex_destroy(&pData->mut);
if(pData->szBinary != NULL)
free(pData->szBinary);
if(pData->aParams != NULL) {
@@ -112,6 +124,10 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -287,22 +303,21 @@ writePipe(instanceData *pData, uchar *szMsg)
lenWrite = strlen((char*)szMsg);
writeOffset = 0;
- do
- {
+ do {
lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite);
if(lenWritten == -1) {
switch(errno) {
- case EPIPE:
- DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
- pData->szBinary);
- CHKiRet(cleanup(pData));
- CHKiRet(tryRestart(pData));
- break;
- default:
- DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
- rs_strerror_r(errno, errStr, sizeof(errStr)));
- ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
- break;
+ case EPIPE:
+ DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n",
+ pData->szBinary);
+ CHKiRet(cleanup(pData));
+ CHKiRet(tryRestart(pData));
+ break;
+ default:
+ DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno,
+ rs_strerror_r(errno, errStr, sizeof(errStr)));
+ ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE);
+ break;
}
} else {
writeOffset += lenWritten;
@@ -316,7 +331,10 @@ finalize_it:
BEGINdoAction
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
+ pthread_mutex_lock(&pData->mut);
if(pData->bIsRunning == 0) {
openPipe(pData);
}
@@ -325,6 +343,7 @@ CODESTARTdoAction
if(iRet != RS_RET_OK)
iRet = RS_RET_SUSPENDED;
+ pthread_mutex_unlock(&pData->mut);
ENDdoAction
@@ -496,6 +515,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 34511e46..3b1584e2 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -2,8 +2,17 @@
*
* This is the implementation of the RELP output module.
*
- * NOTE: read comments in module-template.h to understand how this file
- * works!
+ * Note that when multiple action workers are activated, we currently
+ * also create multiple actions. This may be the source of some mild
+ * message loss (!) if the worker instance is shut down while the
+ * connection to the remote system is in retry state.
+ * TODO: think if we should implement a mode where we do NOT
+ * support multiple action worker instances. This would be
+ * slower, but not have this loss opportunity. But it should
+ * definitely be optional and by default off due to the
+ * performance implications (and given the fact that message
+ * loss is pretty unlikely in usual cases).
+ *
*
* File begun on 2008-03-13 by RGerhards
*
@@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
uchar *target;
uchar *port;
- int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
- int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
int sizeWindow; /**< the RELP window size - 0=use default */
unsigned timeout;
unsigned rebindInterval;
- unsigned nSent;
- relpClt_t *pRelpClt; /* relp client for this instance */
sbool bEnableTLS;
sbool bEnableTLSZip;
sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */
@@ -85,11 +90,20 @@ typedef struct _instanceData {
} permittedPeers;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
+ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned nSent; /* number msgs sent - for rebind support */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData);
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData)
static void
onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
" '%s' - action may not work as intended",
- pData->target, pData->port, errmesg, objinfo);
+ pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo);
}
static void
@@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er
static void
onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
"is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
pData->bHadAuthFail = 1;
}
-static inline rsRetVal
-doCreateRelpClient(instanceData *pData)
+static rsRetVal
+doCreateRelpClient(wrkrInstanceData_t *pWrkrData)
{
int i;
+ instanceData *pData;
DEFiRet;
- if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+
+ pData = pWrkrData->pData;
+ if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+ if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK)
+ if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLS) {
- if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLSZip) {
- if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+ if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
errmsg.LogError(0, RS_RET_RELP_ERR,
"omrelp: invalid auth mode '%s'\n", pData->authmode);
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
- relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+ relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]);
}
}
if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
- if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- pData->bInitialConnect = 1;
- pData->nSent = 0;
+ pWrkrData->bInitialConnect = 1;
+ pWrkrData->nSent = 0;
finalize_it:
RETiRet;
}
@@ -221,11 +238,15 @@ CODESTARTcreateInstance
pData->permittedPeers.nmemb = 0;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->pRelpClt = NULL;
+ iRet = doCreateRelpClient(pWrkrData);
+ENDcreateWrkrInstance
+
BEGINfreeInstance
int i;
CODESTARTfreeInstance
- if(pData->pRelpClt != NULL)
- relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
free(pData->target);
free(pData->port);
free(pData->tplName);
@@ -239,6 +260,12 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->pRelpClt != NULL)
+ relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt);
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -318,8 +345,6 @@ CODESTARTnewActInst
"RSYSLOG_ForwardFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERnewActInst
if(pvals != NULL)
cnfparamvalsDestruct(pvals, &actpblk);
@@ -347,22 +372,23 @@ ENDdbgPrintInstInfo
/* try to connect to server
* rgerhards, 2008-03-21
*/
-static rsRetVal doConnect(instanceData *pData)
+static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->bInitialConnect) {
- iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
+ if(pWrkrData->bInitialConnect) {
+ iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(),
+ pWrkrData->pData->port, pWrkrData->pData->target);
if(iRet == RELP_RET_OK)
- pData->bInitialConnect = 0;
+ pWrkrData->bInitialConnect = 0;
} else {
- iRet = relpCltReconnect(pData->pRelpClt);
+ iRet = relpCltReconnect(pWrkrData->pRelpClt);
}
if(iRet == RELP_RET_OK) {
- pData->bIsConnected = 1;
+ pWrkrData->bIsConnected = 1;
} else {
- pData->bIsConnected = 0;
+ pWrkrData->bIsConnected = 0;
iRet = RS_RET_SUSPENDED;
}
@@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- if(pData->bHadAuthFail) {
+ if(pWrkrData->pData->bHadAuthFail) {
ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
}
- iRet = doConnect(pData);
+ iRet = doConnect(pWrkrData);
finalize_it:
ENDtryResume
static inline rsRetVal
-doRebind(instanceData *pData)
+doRebind(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
- CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
- pData->bIsConnected = 0;
- CHKiRet(doCreateRelpClient(pData));
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt));
+ pWrkrData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pWrkrData));
finalize_it:
RETiRet;
}
@@ -394,10 +420,10 @@ finalize_it:
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omrelp: beginTransaction\n");
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
- relpCltHintBurstBegin(pData->pRelpClt);
+ relpCltHintBurstBegin(pWrkrData->pRelpClt);
finalize_it:
ENDbeginTransaction
@@ -405,11 +431,13 @@ BEGINdoAction
uchar *pMsg; /* temporary buffering */
size_t lenMsg;
relpRetVal ret;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
pMsg = ppString[0];
@@ -420,7 +448,7 @@ CODESTARTdoAction
lenMsg = glbl.GetMaxLine();
/* forward */
- ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg);
+ ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg);
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
@@ -428,8 +456,8 @@ CODESTARTdoAction
}
if(pData->rebindInterval != 0 &&
- (++pData->nSent >= pData->rebindInterval)) {
- doRebind(pData);
+ (++pWrkrData->nSent >= pData->rebindInterval)) {
+ doRebind(pWrkrData);
}
finalize_it:
if(pData->bHadAuthFail)
@@ -448,7 +476,7 @@ ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omrelp: endTransaction\n");
- relpCltHintBurstEnd(pData->pRelpClt);
+ relpCltHintBurstEnd(pWrkrData->pRelpClt);
ENDendTransaction
BEGINparseSelectorAct
@@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -546,6 +572,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES
diff --git a/plugins/omsnmp/omsnmp.c b/plugins/omsnmp/omsnmp.c
index 42d1de6b..02862c94 100644
--- a/plugins/omsnmp/omsnmp.c
+++ b/plugins/omsnmp/omsnmp.c
@@ -2,7 +2,7 @@
*
* This module sends an snmp trap.
*
- * Copyright 2007-2012 Adiscon GmbH.
+ * Copyright 2007-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -74,15 +74,19 @@ typedef struct _instanceData {
* http://www.adiscon.org/download/ADISCON-MONITORWARE-MIB.txt
* http://www.adiscon.org/download/ADISCON-MIB.txt
*/
- int iPort; /* Target Port */
- int iSNMPVersion; /* SNMP Version to use */
- int iTrapType; /* Snmp TrapType or GenericType */
- int iSpecificType; /* Snmp Specific Type */
+ int iPort; /* Target Port */
+ int iSNMPVersion; /* SNMP Version to use */
+ int iTrapType; /* Snmp TrapType or GenericType */
+ int iSpecificType; /* Snmp Specific Type */
- netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */
- uchar *tplName; /* format template to use */
+ uchar *tplName; /* format template to use */
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
uchar* pszTransport; /* default transport */
uchar* pszTarget;
@@ -147,6 +151,10 @@ BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->snmpsession = NULL;
+ENDcreateWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -171,14 +179,16 @@ ENDisCompatibleWithFeature
/* Exit SNMP Session
* alorbach, 2008-02-12
*/
-static rsRetVal omsnmp_exitSession(instanceData *pData)
+static rsRetVal
+omsnmp_exitSession(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->snmpsession != NULL) {
- dbgprintf( "omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", pData->szTarget, pData->iPort);
- snmp_close(pData->snmpsession);
- pData->snmpsession = NULL;
+ if(pWrkrData->snmpsession != NULL) {
+ DBGPRINTF("omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n",
+ pWrkrData->pData->szTarget, pWrkrData->pData->iPort);
+ snmp_close(pWrkrData->snmpsession);
+ pWrkrData->snmpsession = NULL;
}
RETiRet;
@@ -187,15 +197,19 @@ static rsRetVal omsnmp_exitSession(instanceData *pData)
/* Init SNMP Session
* alorbach, 2008-02-12
*/
-static rsRetVal omsnmp_initSession(instanceData *pData)
+static rsRetVal
+omsnmp_initSession(wrkrInstanceData_t *pWrkrData)
{
netsnmp_session session;
+ instanceData *pData;
char szTargetAndPort[MAXHOSTNAMELEN+128]; /* work buffer for specifying a full target and port string */
DEFiRet;
/* should not happen, but if session is not cleared yet - we do it now! */
- if (pData->snmpsession != NULL)
- omsnmp_exitSession(pData);
+ if (pWrkrData->snmpsession != NULL)
+ omsnmp_exitSession(pWrkrData);
+
+ pData = pWrkrData->pData;
snprintf((char*)szTargetAndPort, sizeof(szTargetAndPort), "%s:%s:%d",
(pData->szTransport == NULL) ? "udp" : (char*)pData->szTransport,
@@ -217,8 +231,8 @@ static rsRetVal omsnmp_initSession(instanceData *pData)
session.community_len = strlen((char*) session.community);
}
- pData->snmpsession = snmp_open(&session);
- if (pData->snmpsession == NULL) {
+ pWrkrData->snmpsession = snmp_open(&session);
+ if (pWrkrData->snmpsession == NULL) {
errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_initSession: snmp_open to host '%s' on Port '%d' failed\n", pData->szTarget, pData->iPort);
/* Stay suspended */
iRet = RS_RET_SUSPENDED;
@@ -227,7 +241,7 @@ static rsRetVal omsnmp_initSession(instanceData *pData)
RETiRet;
}
-static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
+static rsRetVal omsnmp_sendsnmp(wrkrInstanceData_t *pWrkrData, uchar *psz)
{
DEFiRet;
@@ -239,10 +253,12 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
int status;
char *trap = NULL;
const char *strErr = NULL;
+ instanceData *pData;
+ pData = pWrkrData->pData;
/* Init SNMP Session if necessary */
- if (pData->snmpsession == NULL) {
- CHKiRet(omsnmp_initSession(pData));
+ if (pWrkrData->snmpsession == NULL) {
+ CHKiRet(omsnmp_initSession(pWrkrData));
}
/* String should not be NULL */
@@ -250,7 +266,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
dbgprintf( "omsnmp_sendsnmp: ENTER - Syslogmessage = '%s'\n", (char*)psz);
/* If SNMP Version1 is configured !*/
- if(pData->snmpsession->version == SNMP_VERSION_1) {
+ if(pWrkrData->snmpsession->version == SNMP_VERSION_1) {
pdu = snmp_pdu_create(SNMP_MSG_TRAP);
/* Set enterprise */
@@ -275,7 +291,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
pdu->time = get_uptime();
}
/* If SNMP Version2c is configured !*/
- else if (pData->snmpsession->version == SNMP_VERSION_2c)
+ else if (pWrkrData->snmpsession->version == SNMP_VERSION_2c)
{
long sysuptime;
char csysuptime[20];
@@ -320,15 +336,15 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz)
}
/* Send the TRAP */
- status = snmp_send(pData->snmpsession, pdu) == 0;
+ status = snmp_send(pWrkrData->snmpsession, pdu) == 0;
if (status)
{
/* Debug Output! */
- int iErrorCode = pData->snmpsession->s_snmp_errno;
+ int iErrorCode = pWrkrData->snmpsession->s_snmp_errno;
errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_sendsnmp: snmp_send failed error '%d', Description='%s'\n", iErrorCode*(-1), api_errors[iErrorCode*(-1)]);
/* Clear Session */
- omsnmp_exitSession(pData);
+ omsnmp_exitSession(pWrkrData);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
@@ -347,7 +363,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = omsnmp_initSession(pData);
+ iRet = omsnmp_initSession(pWrkrData);
ENDtryResume
BEGINdoAction
@@ -358,19 +374,20 @@ CODESTARTdoAction
}
/* This will generate and send the SNMP Trap */
- iRet = omsnmp_sendsnmp(pData, ppString[0]);
+ iRet = omsnmp_sendsnmp(pWrkrData, ppString[0]);
finalize_it:
ENDdoAction
BEGINfreeInstance
CODESTARTfreeInstance
- /* free snmp Session here */
- omsnmp_exitSession(pData);
-
free(pData->tplName);
free(pData->szTarget);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ omsnmp_exitSession(pWrkrData);
+ENDfreeWrkrInstance
static inline void
setInstParamDefaults(instanceData *pData)
@@ -499,9 +516,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* Set some defaults in the NetSNMP library */
netsnmp_ds_set_int(NETSNMP_DS_LIBRARY_ID, NETSNMP_DS_LIB_DEFAULT_PORT, pData->iPort );
-
- /* Init Session Pointer */
- pData->snmpsession = NULL;
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -545,6 +559,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt
diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c
index cb907bba..ad3508c8 100644
--- a/plugins/omudpspoof/omudpspoof.c
+++ b/plugins/omudpspoof/omudpspoof.c
@@ -98,15 +98,19 @@ typedef struct _instanceData {
uchar *port;
uchar *sourceTpl;
int mtu;
- int *pSockArray; /* sockets to use for UDP */
- struct addrinfo *f_addr;
- u_short sourcePort;
u_short sourcePortStart; /* for sorce port iteration */
u_short sourcePortEnd;
int bReportLibnetInitErr; /* help prevent multiple error messages on init err */
+} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
libnet_t *libnet_handle;
+ u_short sourcePort;
+ int *pSockArray; /* sockets to use for UDP */
+ struct addrinfo *f_addr;
char errbuf[LIBNET_ERRBUF_SIZE];
-} instanceData;
+} wrkrInstanceData_t;
#define DFLT_SOURCE_PORT_START 32000
#define DFLT_SOURCE_PORT_END 42000
@@ -172,7 +176,7 @@ ENDinitConfVars
pthread_mutex_t mutLibnet;
/* forward definitions */
-static rsRetVal doTryResume(instanceData *pData);
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData);
/* this function gets the default template. It coordinates action between
@@ -217,15 +221,14 @@ finalize_it:
* rgerhards, 2009-05-29
*/
static rsRetVal
-closeUDPSockets(instanceData *pData)
+closeUDPSockets(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- assert(pData != NULL);
- if(pData->pSockArray != NULL) {
- net.closeUDPListenSockets(pData->pSockArray);
- pData->pSockArray = NULL;
- freeaddrinfo(pData->f_addr);
- pData->f_addr = NULL;
+ if(pWrkrData->pSockArray != NULL) {
+ net.closeUDPListenSockets(pWrkrData->pSockArray);
+ pWrkrData->pSockArray = NULL;
+ freeaddrinfo(pWrkrData->f_addr);
+ pWrkrData->f_addr = NULL;
}
RETiRet;
}
@@ -310,12 +313,17 @@ ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
- pData->libnet_handle = NULL;
pData->mtu = 1500;
pData->bReportLibnetInitErr = 1;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->libnet_handle = NULL;
+ pWrkrData->sourcePort = pData->sourcePortStart;
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@@ -326,15 +334,19 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
/* final cleanup */
- closeUDPSockets(pData);
free(pData->tplName);
free(pData->port);
free(pData->host);
free(pData->sourceTpl);
- if(pData->libnet_handle != NULL)
- libnet_destroy(pData->libnet_handle);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ closeUDPSockets(pWrkrData);
+ if(pWrkrData->libnet_handle != NULL)
+ libnet_destroy(pWrkrData->libnet_handle);
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -348,11 +360,12 @@ ENDdbgPrintInstInfo
* rgehards, 2007-12-20
*/
static inline rsRetVal
-UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
+UDPSend(wrkrInstanceData_t *pWrkrData, uchar *pszSourcename, char *msg, size_t len)
{
struct addrinfo *r;
int lsent = 0;
int bSendSuccess;
+ instanceData *pData;
struct sockaddr_in *tempaddr,source_ip;
libnet_ptag_t ip, ipo;
libnet_ptag_t udp;
@@ -363,9 +376,10 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
unsigned maxPktLen, pktLen;
DEFiRet;
- if(pData->pSockArray == NULL) {
- CHKiRet(doTryResume(pData));
+ if(pWrkrData->pSockArray == NULL) {
+ CHKiRet(doTryResume(pWrkrData));
}
+ pData = pWrkrData->pData;
if(len > 65528) {
DBGPRINTF("omudpspoof: msg with length %d truncated to 64k: '%.768s'\n",
@@ -374,8 +388,8 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
}
ip = ipo = udp = 0;
- if(pData->sourcePort++ >= pData->sourcePortEnd){
- pData->sourcePort = pData->sourcePortStart;
+ if(pWrkrData->sourcePort++ >= pData->sourcePortEnd){
+ pWrkrData->sourcePort = pData->sourcePortStart;
}
inet_pton(AF_INET, (char*)pszSourcename, &(source_ip.sin_addr));
@@ -383,7 +397,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
bSendSuccess = RSFALSE;
d_pthread_mutex_lock(&mutLibnet);
bNeedUnlock = 1;
- for (r = pData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) {
+ for (r = pWrkrData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) {
tempaddr = (struct sockaddr_in *)r->ai_addr;
/* Getting max payload size (must be multiple of 8) */
maxPktLen = (pData->mtu - LIBNET_IPV4_H) & ~0x07;
@@ -400,19 +414,19 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
}
DBGPRINTF("omudpspoof: stage 1: MF:%d, hdrOffs %d, pktLen %d\n",
(hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen);
- libnet_clear_packet(pData->libnet_handle);
+ libnet_clear_packet(pWrkrData->libnet_handle);
/* note: libnet does need ports in host order NOT in network byte order! -- rgerhards, 2009-11-12 */
udp = libnet_build_udp(
- ntohs(pData->sourcePort),/* source port */
+ ntohs(pWrkrData->sourcePort),/* source port */
ntohs(tempaddr->sin_port),/* destination port */
pktLen+LIBNET_UDP_H, /* packet length */
0, /* checksum */
(u_char*)msg, /* payload */
pktLen, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
udp); /* libnet id */
if (udp == -1) {
- DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
ip = libnet_build_ipv4(
@@ -427,22 +441,22 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
tempaddr->sin_addr.s_addr,
NULL, /* payload */
0, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
ip); /* libnet id */
if (ip == -1) {
- DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
/* Write it to the wire. */
- lsent = libnet_write(pData->libnet_handle);
+ lsent = libnet_write(pWrkrData->libnet_handle);
if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) {
/* note: access to fd is a libnet internal. If a newer version of libnet does
* not expose that member, we should simply remove it. However, while it is there
* it is useful for consolidating with strace output.
*/
DBGPRINTF("omudpspoof: write error (total len %d): pktLen %d, sent %d, fd %d: %s\n",
- len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pData->libnet_handle->fd,
- libnet_geterror(pData->libnet_handle));
+ len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pWrkrData->libnet_handle->fd,
+ libnet_geterror(pWrkrData->libnet_handle));
if(lsent != -1) {
bSendSuccess = RSTRUE;
}
@@ -452,7 +466,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
msgOffs += pktLen;
/* We need to get rid of the UDP header to build the other fragments */
- libnet_clear_packet(pData->libnet_handle);
+ libnet_clear_packet(pWrkrData->libnet_handle);
ip = LIBNET_PTAG_INITIALIZER;
while(len > msgOffs ) { /* loop until all payload is sent */
/* check if there will be more fragments */
@@ -481,16 +495,16 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
tempaddr->sin_addr.s_addr,
(uint8_t*)(msg+msgOffs), /* payload */
pktLen, /* payload size */
- pData->libnet_handle, /* libnet handle */
+ pWrkrData->libnet_handle, /* libnet handle */
ip); /* libnet id */
if (ip == -1) {
- DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pData->libnet_handle));
+ DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pWrkrData->libnet_handle));
}
/* Write it to the wire. */
- lsent = libnet_write(pData->libnet_handle);
+ lsent = libnet_write(pWrkrData->libnet_handle);
if(lsent != (int) (LIBNET_IPV4_H+pktLen)) {
DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n",
- LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle));
+ LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pWrkrData->libnet_handle));
bSendSuccess = RSFALSE;
continue;
}
@@ -500,9 +514,9 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len)
finalize_it:
if(iRet != RS_RET_OK) {
- if(pData->libnet_handle != NULL) {
- libnet_destroy(pData->libnet_handle);
- pData->libnet_handle = NULL;
+ if(pWrkrData->libnet_handle != NULL) {
+ libnet_destroy(pWrkrData->libnet_handle);
+ pWrkrData->libnet_handle = NULL;
}
}
if(bNeedUnlock) {
@@ -515,26 +529,28 @@ finalize_it:
/* try to resume connection if it is not ready
* rgerhards, 2007-08-02
*/
-static rsRetVal doTryResume(instanceData *pData)
+static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData)
{
int iErr;
struct addrinfo *res;
struct addrinfo hints;
+ instanceData *pData;
DEFiRet;
- if(pData->pSockArray != NULL)
+ if(pWrkrData->pSockArray != NULL)
FINALIZE;
+ pData = pWrkrData->pData;
- if(pData->libnet_handle == NULL) {
+ if(pWrkrData->libnet_handle == NULL) {
/* Initialize the libnet library. Root priviledges are required.
* this initializes a IPv4 socket to use for forging UDP packets.
*/
- pData->libnet_handle = libnet_init(
+ pWrkrData->libnet_handle = libnet_init(
LIBNET_RAW4, /* injection type */
NULL, /* network interface */
- pData->errbuf); /* errbuf */
+ pWrkrData->errbuf); /* errbuf */
- if(pData->libnet_handle == NULL) {
+ if(pWrkrData->libnet_handle == NULL) {
if(pData->bReportLibnetInitErr) {
errmsg.LogError(0, RS_RET_ERR_LIBNET_INIT, "omudpsoof: error "
"initializing libnet - are you running as root?");
@@ -559,14 +575,14 @@ static rsRetVal doTryResume(instanceData *pData)
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
DBGPRINTF("%s found, resuming.\n", pData->host);
- pData->f_addr = res;
- pData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0);
+ pWrkrData->f_addr = res;
+ pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0);
finalize_it:
if(iRet != RS_RET_OK) {
- if(pData->f_addr != NULL) {
- freeaddrinfo(pData->f_addr);
- pData->f_addr = NULL;
+ if(pWrkrData->f_addr != NULL) {
+ freeaddrinfo(pWrkrData->f_addr);
+ pWrkrData->f_addr = NULL;
}
iRet = RS_RET_SUSPENDED;
}
@@ -577,7 +593,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ iRet = doTryResume(pWrkrData);
ENDtryResume
BEGINdoAction
@@ -585,10 +601,10 @@ BEGINdoAction
unsigned l;
int iMaxLine;
CODESTARTdoAction
- CHKiRet(doTryResume(pData));
+ CHKiRet(doTryResume(pWrkrData));
- DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host,
- getFwdPt(pData), ppString[1], ppString[0]);
+ DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pWrkrData->pData->host,
+ getFwdPt(pWrkrData->pData), ppString[1], ppString[0]);
iMaxLine = glbl.GetMaxLine();
psz = (char*) ppString[0];
@@ -596,7 +612,7 @@ CODESTARTdoAction
if((int) l > iMaxLine)
l = iMaxLine;
- CHKiRet(UDPSend(pData, ppString[1], psz, l));
+ CHKiRet(UDPSend(pWrkrData, ppString[1], psz, l));
finalize_it:
ENDdoAction
@@ -660,7 +676,6 @@ CODESTARTnewActInst
}
}
CODE_STD_STRING_REQUESTnewActInst(2)
- pData->sourcePort = pData->sourcePortStart;
tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName);
CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS));
@@ -699,7 +714,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2)
else
CHKmalloc(pData->port = ustrdup(cs.pszTargetPort));
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS));
- pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart;
+ pData->sourcePortStart = cs.iSourcePortStart;
pData->sourcePortEnd = cs.iSourcePortEnd;
/* process template */
@@ -740,6 +755,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c
index 583b9f94..da4e8e94 100644
--- a/plugins/omuxsock/omuxsock.c
+++ b/plugins/omuxsock/omuxsock.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h to understand how this file
* works!
*
- * Copyright 2010-2012 Adiscon GmbH.
+ * Copyright 2010-2013 Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -60,10 +60,14 @@ typedef struct _instanceData {
permittedPeers_t *pPermPeers;
uchar *sockName;
int sock;
- int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */
struct sockaddr_un addr;
} instanceData;
+
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+} wrkrInstanceData_t;
+
/* config data */
typedef struct configSettings_s {
uchar *tplName; /* name of the default template to use */
@@ -90,6 +94,7 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
+static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER;
BEGINinitConfVars /* (re)set config variables to default values */
CODESTARTinitConfVars
@@ -147,7 +152,6 @@ closeSocket(instanceData *pData)
close(pData->sock);
pData->sock = INVLD_SOCK;
}
-pData->bIsConnected = 0; // TODO: remove this variable altogether
RETiRet;
}
@@ -224,6 +228,10 @@ CODESTARTcreateInstance
pData->sock = INVLD_SOCK;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ENDcreateWrkrInstance
+
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
@@ -239,6 +247,10 @@ CODESTARTfreeInstance
free(pData->sockName);
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ENDfreeWrkrInstance
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -332,7 +344,7 @@ static rsRetVal doTryResume(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- iRet = doTryResume(pData);
+ iRet = doTryResume(pWrkrData->pData);
ENDtryResume
BEGINdoAction
@@ -340,20 +352,22 @@ BEGINdoAction
register unsigned l;
int iMaxLine;
CODESTARTdoAction
- CHKiRet(doTryResume(pData));
+ pthread_mutex_lock(&mutDoAct);
+ CHKiRet(doTryResume(pWrkrData->pData));
iMaxLine = glbl.GetMaxLine();
- DBGPRINTF(" omuxsock:%s\n", pData->sockName);
+ DBGPRINTF(" omuxsock:%s\n", pWrkrData->pData->sockName);
psz = (char*) ppString[0];
l = strlen((char*) psz);
if((int) l > iMaxLine)
l = iMaxLine;
- CHKiRet(sendMsg(pData, psz, l));
+ CHKiRet(sendMsg(pWrkrData->pData, psz, l));
finalize_it:
+ pthread_mutex_unlock(&mutDoAct);
ENDdoAction
@@ -413,6 +427,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
ENDqueryEtryPt