diff options
-rw-r--r-- | ChangeLog | 4 | ||||
-rw-r--r-- | action.c | 7 | ||||
-rw-r--r-- | grammar/parserif.h | 1 | ||||
-rw-r--r-- | plugins/mmanon/mmanon.c | 3 | ||||
-rw-r--r-- | plugins/mmfields/mmfields.c | 15 | ||||
-rw-r--r-- | plugins/mmjsonparse/mmjsonparse.c | 45 | ||||
-rw-r--r-- | plugins/mmnormalize/mmnormalize.c | 21 | ||||
-rw-r--r-- | plugins/mmpstrucdata/mmpstrucdata.c | 15 | ||||
-rw-r--r-- | plugins/mmsequence/mmsequence.c | 34 | ||||
-rw-r--r-- | plugins/mmutf8fix/mmutf8fix.c | 21 | ||||
-rw-r--r-- | plugins/omhiredis/omhiredis.c | 5 | ||||
-rw-r--r-- | plugins/omlibdbi/omlibdbi.c | 29 | ||||
-rw-r--r-- | plugins/ommail/ommail.c | 191 | ||||
-rw-r--r-- | plugins/ommongodb/ommongodb.c | 25 | ||||
-rw-r--r-- | plugins/ompgsql/ompgsql.c | 35 | ||||
-rw-r--r-- | plugins/omprog/omprog.c | 48 | ||||
-rw-r--r-- | plugins/omrelp/omrelp.c | 135 | ||||
-rw-r--r-- | plugins/omsnmp/omsnmp.c | 81 | ||||
-rw-r--r-- | plugins/omudpspoof/omudpspoof.c | 130 | ||||
-rw-r--r-- | plugins/omuxsock/omuxsock.c | 29 | ||||
-rw-r--r-- | runtime/rsconf.c | 15 | ||||
-rw-r--r-- | runtime/rsyslog.h | 1 | ||||
-rw-r--r-- | template.c | 2 | ||||
-rw-r--r-- | tools/omfwd.c | 314 |
24 files changed, 750 insertions, 456 deletions
@@ -9,6 +9,8 @@ Version 7.5.7 [devel] 2013-11-?? - bugfix: ommysql lost configfile/section parameters after first close This means that when a connection was broken, it was probably re-instantiated with different parameters than configured. +- bugfix: regression in template processing with subtrees in templates + Thanks to Pavel Levshin for the fix - bugfix: regular worker threads are not properly (re)started if DA mode is active. This occurs only under rare conditions, but definitely is a bug that @@ -22,6 +24,8 @@ Version 7.5.7 [devel] 2013-11-?? previous implementation. This will also improve performance and/or lower system overhead on busy systems. Thanks to Pavel Levshin for the enhancement. +- now emit warning message if om with msg passing mode uses action queue + These can modify the message, and this causes races. - improved checking of queue config parameters on startup - bugfix: call to ruleset with async queue did not use the queue closes: http://bugzilla.adiscon.com/show_bug.cgi?id=443 @@ -108,6 +108,7 @@ #include "unicode-helper.h" #include "atomic.h" #include "ruleset.h" +#include "parserif.h" #include "statsobj.h" #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ @@ -467,6 +468,12 @@ actionConstructFinalize(action_t *pThis, struct nvlst *lst) qqueueDbgPrint(pThis->pQueue); DBGPRINTF("Action %p: queue %p created\n", pThis, pThis->pQueue); + + if(pThis->eParamPassing == ACT_MSG_PASSING && pThis->pQueue->qType != QUEUETYPE_DIRECT) { + parser_warnmsg("module %s with message passing mode uses " + "non-direct queue. This most probably leads to undesired " + "results", (char*)modGetName(pThis->pMod)); + } /* and now reset the queue params (see comment in its function header!) */ actionResetQueueParams(); diff --git a/grammar/parserif.h b/grammar/parserif.h index 6c2c1365..21da0950 100644 --- a/grammar/parserif.h +++ b/grammar/parserif.h @@ -6,6 +6,7 @@ int yyparse(); char *cnfcurrfn; void dbgprintf(char *fmt, ...) __attribute__((format(printf, 1, 2))); void parser_errmsg(char *fmt, ...) __attribute__((format(printf, 1, 2))); +void parser_warnmsg(char *fmt, ...) __attribute__((format(printf, 1, 2))); void tellLexEndParsing(void); extern int yydebug; extern int yylineno; diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c index c714706a..28797807 100644 --- a/plugins/mmanon/mmanon.c +++ b/plugins/mmanon/mmanon.c @@ -367,12 +367,11 @@ BEGINdoAction int lenMsg; int i; CODESTARTdoAction - pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); for(i = 0 ; i < lenMsg ; ++i) { - anonip(pData, msg, &lenMsg, &i); + anonip(pWrkrData->pData, msg, &lenMsg, &i); } if(lenMsg != getMSGLen(pMsg)) setMSGLen(pMsg, lenMsg); diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c index fa7fa100..c408a6c9 100644 --- a/plugins/mmfields/mmfields.c +++ b/plugins/mmfields/mmfields.c @@ -56,6 +56,10 @@ typedef struct _instanceData { uchar *jsonRoot; /**< container where to store fields */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -103,6 +107,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -114,6 +122,10 @@ CODESTARTfreeInstance free(pData->jsonRoot); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -232,7 +244,7 @@ CODESTARTdoAction pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); - CHKiRet(parse_fields(pData, pMsg, msg, lenMsg)); + CHKiRet(parse_fields(pWrkrData->pData, pMsg, msg, lenMsg)); finalize_it: ENDdoAction @@ -259,6 +271,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c index b16aef0e..9c0ab88f 100644 --- a/plugins/mmjsonparse/mmjsonparse.c +++ b/plugins/mmjsonparse/mmjsonparse.c @@ -58,9 +58,15 @@ DEFobjCurrIf(errmsg); DEF_OMOD_STATIC_DATA typedef struct _instanceData { - struct json_tokener *tokener; + int dummy; /* not needed, but some compilers do not support empty structs */ + /* REMOVE dummy when real data items are to be added! */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + struct json_tokener *tokener; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -94,14 +100,18 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance - pData->tokener = json_tokener_new(); - if(pData->tokener == NULL) { +ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->tokener = json_tokener_new(); + if(pWrkrData->tokener == NULL) { errmsg.LogError(0, RS_RET_ERR, "error: could not create json " - "tokener, cannot activate action"); + "tokener, cannot activate instance"); ABORT_FINALIZE(RS_RET_ERR); } finalize_it: -ENDcreateInstance +ENDcreateWrkrInstance BEGINisCompatibleWithFeature @@ -111,10 +121,14 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if(pData->tokener != NULL) - json_tokener_free(pData->tokener); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->tokener != NULL) + json_tokener_free(pWrkrData->tokener); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -128,28 +142,28 @@ ENDtryResume static rsRetVal -processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf) +processJSON(wrkrInstanceData_t *pWrkrData, msg_t *pMsg, char *buf, size_t lenBuf) { struct json_object *json; const char *errMsg; DEFiRet; - assert(pData->tokener != NULL); + assert(pWrkrData->tokener != NULL); DBGPRINTF("mmjsonparse: toParse: '%s'\n", buf); - json_tokener_reset(pData->tokener); + json_tokener_reset(pWrkrData->tokener); - json = json_tokener_parse_ex(pData->tokener, buf, lenBuf); + json = json_tokener_parse_ex(pWrkrData->tokener, buf, lenBuf); if(Debug) { errMsg = NULL; if(json == NULL) { enum json_tokener_error err; - err = pData->tokener->err; + err = pWrkrData->tokener->err; if(err != json_tokener_continue) errMsg = json_tokener_errors[err]; else errMsg = "Unterminated input"; - } else if((size_t)pData->tokener->char_offset < lenBuf) + } else if((size_t)pWrkrData->tokener->char_offset < lenBuf) errMsg = "Extra characters after JSON object"; else if(!json_object_is_type(json, json_type_object)) errMsg = "JSON value is not an object"; @@ -159,7 +173,7 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf) } } if(json == NULL - || ((size_t)pData->tokener->char_offset < lenBuf) + || ((size_t)pWrkrData->tokener->char_offset < lenBuf) || (!json_object_is_type(json, json_type_object))) { ABORT_FINALIZE(RS_RET_NO_CEE_MSG); } @@ -194,7 +208,7 @@ CODESTARTdoAction ABORT_FINALIZE(RS_RET_NO_CEE_MSG); } buf += LEN_COOKIE; - CHKiRet(processJSON(pData, pMsg, (char*) buf, strlen((char*)buf))); + CHKiRet(processJSON(pWrkrData, pMsg, (char*) buf, strlen((char*)buf))); bSuccess = 1; finalize_it: if(iRet == RS_RET_NO_CEE_MSG) { @@ -257,6 +271,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c index 7e25824a..f82836b1 100644 --- a/plugins/mmnormalize/mmnormalize.c +++ b/plugins/mmnormalize/mmnormalize.c @@ -9,7 +9,7 @@ * * File begun on 2010-01-01 by RGerhards * - * Copyright 2010-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2010-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,6 +70,10 @@ typedef struct _instanceData { ee_ctx ctxee; /**< context to be used for libee */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *rulebase; /**< name of normalization rulebase to use */ int bUseRawMsg; /**< use %rawmsg% instead of %msg% */ @@ -139,6 +143,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; @@ -181,6 +190,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("mmnormalize\n"); @@ -207,14 +221,14 @@ CODESTARTdoAction * requires changes to the libraries. For now, we accept message * duplication. -- rgerhards, 2010-12-01 */ - if(pData->bUseRawMsg) { + if(pWrkrData->pData->bUseRawMsg) { getRawMsg(pMsg, &buf, &len); } else { buf = getMSG(pMsg); len = getMSGLen(pMsg); } str = es_newStrFromCStr((char*)buf, len); - r = ln_normalize(pData->ctxln, str, &event); + r = ln_normalize(pWrkrData->pData->ctxln, str, &event); if(r != 0) { DBGPRINTF("error %d during ln_normalize\n", r); MsgSetParseSuccess(pMsg, 0); @@ -338,6 +352,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c index 4b2a985b..e9e36b27 100644 --- a/plugins/mmpstrucdata/mmpstrucdata.c +++ b/plugins/mmpstrucdata/mmpstrucdata.c @@ -52,6 +52,10 @@ typedef struct _instanceData { uchar *jsonRoot; /**< container where to store fields */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -98,6 +102,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -109,6 +117,10 @@ CODESTARTfreeInstance free(pData->jsonRoot); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -357,7 +369,7 @@ dbgprintf("DDDD: parse mmpstrucdata\n"); /* don't check return code - we never want rsyslog to retry * or suspend this action! */ - parse_sd(pData, pMsg); + parse_sd(pWrkrData->pData, pMsg); dbgprintf("DDDD: done parse mmpstrucdata\n"); finalize_it: ENDdoAction @@ -385,6 +397,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmsequence/mmsequence.c b/plugins/mmsequence/mmsequence.c index 20a85370..609338c2 100644 --- a/plugins/mmsequence/mmsequence.c +++ b/plugins/mmsequence/mmsequence.c @@ -74,6 +74,10 @@ typedef struct _instanceData { char *pszVar; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -100,6 +104,8 @@ static struct cnfparamblk actpblk = /* table for key-counter pairs */ static struct hashtable *ght; static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER; + +static pthread_mutex_t inst_mutex = PTHREAD_MUTEX_INITIALIZER; BEGINbeginCnfLoad CODESTARTbeginCnfLoad @@ -129,6 +135,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -139,6 +149,10 @@ BEGINfreeInstance CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -243,7 +257,7 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_ERR); } } - pthread_mutex_unlock(&ght_mutex); + pthread_mutex_unlock(&ght_mutex); break; default: errmsg.LogError(0, RS_RET_INVLD_MODE, @@ -303,7 +317,9 @@ BEGINdoAction struct json_object *json; int val = 0; int *pCounter; + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; switch(pData->mode) { @@ -312,12 +328,19 @@ CODESTARTdoAction (pData->valueTo - pData->valueFrom)); break; case mmSequencePerInstance: - if (pData->value >= pData->valueTo - pData->step) { - pData->value = pData->valueFrom; + if (!pthread_mutex_lock(&inst_mutex)) { + pCounter = getCounter(ght, pData->pszKey, pData->valueTo); + if (pData->value >= pData->valueTo - pData->step) { + pData->value = pData->valueFrom; + } else { + pData->value += pData->step; + } + val = pData->value; + pthread_mutex_unlock(&inst_mutex); } else { - pData->value += pData->step; + errmsg.LogError(0, RS_RET_ERR, + "mmsequence: mutex lock has failed!"); } - val = pData->value; break; case mmSequencePerKey: if (!pthread_mutex_lock(&ght_mutex)) { @@ -381,6 +404,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmutf8fix/mmutf8fix.c b/plugins/mmutf8fix/mmutf8fix.c index e2077950..351bb129 100644 --- a/plugins/mmutf8fix/mmutf8fix.c +++ b/plugins/mmutf8fix/mmutf8fix.c @@ -60,6 +60,10 @@ typedef struct _instanceData { uint8_t mode; /* operations mode */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -108,6 +112,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature @@ -118,6 +127,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -274,10 +288,10 @@ CODESTARTdoAction pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); - if(pData->mode == MODE_CC) { - doCC(pData, msg, lenMsg); + if(pWrkrData->pData->mode == MODE_CC) { + doCC(pWrkrData->pData, msg, lenMsg); } else { - doUTF8(pData, msg, lenMsg); + doUTF8(pWrkrData->pData, msg, lenMsg); } ENDdoAction @@ -304,6 +318,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c index 7a35bac2..757d5eb2 100644 --- a/plugins/omhiredis/omhiredis.c +++ b/plugins/omhiredis/omhiredis.c @@ -97,7 +97,6 @@ BEGINfreeInstance CODESTARTfreeInstance closeHiredis(pData); free(pData->server); - free(pData->tplName); ENDfreeInstance @@ -196,9 +195,9 @@ CODESTARTendTransaction for ( i = 0; i < pData->count; i++ ) { redisGetReply ( pData->conn, (void *)&pData->replies[i] ); /* TODO: add error checking here! */ - free ( pData->replies[i] ); + freeReplyObject ( pData->replies[i] ); } - freeReplyObject ( pData->replies ); + free ( pData->replies ); pData->count = 0; ENDendTransaction diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c index 3beba4f0..c203b4aa 100644 --- a/plugins/omlibdbi/omlibdbi.c +++ b/plugins/omlibdbi/omlibdbi.c @@ -50,6 +50,9 @@ #include "errmsg.h" #include "conf.h" +#undef HAVE_DBI_TXSUPP +#warning transaction support disabled in v8 -- TODO: reenable + MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("omlibdbi") @@ -73,6 +76,10 @@ typedef struct _instanceData { int txSupport; /* transaction support */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *dbiDrvrDir; /* global: where do the dbi drivers reside? */ uchar *drvrName; /* driver to use */ @@ -94,6 +101,8 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + /* tables for interfacing with the v6 config system */ /* module-global parameters */ @@ -157,6 +166,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -187,6 +200,9 @@ CODESTARTfreeInstance free(pData->dbName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -326,16 +342,16 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) { - iRet = initConn(pData, 1); + if(pWrkrData->pData->conn == NULL) { + iRet = initConn(pWrkrData->pData, 1); } ENDtryResume /* transaction support 2013-03 */ BEGINbeginTransaction CODESTARTbeginTransaction - if(pData->conn == NULL) { - CHKiRet(initConn(pData, 0)); + if(pWrkrData->pData->conn == NULL) { + CHKiRet(initConn(pWrkrData->pData, 0)); } # if HAVE_DBI_TXSUPP if (pData->txSupport == 1) { @@ -355,13 +371,15 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(writeDB(ppString[0], pData)); + pthread_mutex_lock(&mutDoAct); + CHKiRet(writeDB(ppString[0], pWrkrData->pData)); # if HAVE_DBI_TXSUPP if (pData->txSupport == 1) { iRet = RS_RET_DEFER_COMMIT; } # endif finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction /* transaction support 2013-03 */ @@ -552,6 +570,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/plugins/ommail/ommail.c b/plugins/ommail/ommail.c index 0a781e10..910b371a 100644 --- a/plugins/ommail/ommail.c +++ b/plugins/ommail/ommail.c @@ -13,7 +13,7 @@ * * File begun on 2008-04-04 by RGerhards * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -82,13 +82,21 @@ typedef struct _instanceData { uchar *pszSrvPort; uchar *pszFrom; toRcpt_t *lstRcpt; + } smtp; + } md; /* mode-specific data */ +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + union { + struct { char RcvBuf[1024]; /* buffer for receiving server responses */ size_t lenRcvBuf; size_t iRcvBuf; /* current index into the rcvBuf (buf empty if iRcvBuf == lenRcvBuf) */ int sock; /* socket to this server (most important when we do multiple msgs per mail) */ } smtp; } md; /* mode-specific data */ -} instanceData; +} wrkrInstanceData_t; typedef struct configSettings_s { toRcpt_t *lstRcpt; @@ -112,7 +120,7 @@ ENDinitConfVars /* forward definitions (as few as possible) */ static rsRetVal Send(int sock, char *msg, size_t len); -static rsRetVal readResponse(instanceData *pData, int *piState, int iExpected); +static rsRetVal readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected); /* helpers for handling the recipient lists */ @@ -163,24 +171,22 @@ finalize_it: * iStatusToCheck < 0 means no checking should happen */ static rsRetVal -WriteRcpts(instanceData *pData, uchar *pszOp, size_t lenOp, int iStatusToCheck) +WriteRcpts(wrkrInstanceData_t *pWrkrData, uchar *pszOp, size_t lenOp, int iStatusToCheck) { toRcpt_t *pRcpt; int iState; DEFiRet; - assert(pData != NULL); - assert(pszOp != NULL); assert(lenOp != 0); - for(pRcpt = pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) { + for(pRcpt = pWrkrData->pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) { dbgprintf("Sending '%s: <%s>'\n", pszOp, pRcpt->pszTo); - CHKiRet(Send(pData->md.smtp.sock, (char*)pszOp, lenOp)); - CHKiRet(Send(pData->md.smtp.sock, ":<", sizeof(":<") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pszOp, lenOp)); + CHKiRet(Send(pWrkrData->md.smtp.sock, ":<", sizeof(":<") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); if(iStatusToCheck >= 0) - CHKiRet(readResponse(pData, &iState, iStatusToCheck)); + CHKiRet(readResponse(pWrkrData, &iState, iStatusToCheck)); } finalize_it: @@ -193,6 +199,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -203,17 +214,19 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance if(pData->iMode == 0) { - if(pData->md.smtp.pszSrv != NULL) - free(pData->md.smtp.pszSrv); - if(pData->md.smtp.pszSrvPort != NULL) - free(pData->md.smtp.pszSrvPort); - if(pData->md.smtp.pszFrom != NULL) - free(pData->md.smtp.pszFrom); + free(pData->md.smtp.pszSrv); + free(pData->md.smtp.pszSrvPort); + free(pData->md.smtp.pszFrom); lstRcptDestruct(pData->md.smtp.lstRcpt); } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("mail"); /* TODO: extend! */ @@ -229,16 +242,16 @@ ENDdbgPrintInstInfo * rgerhards, 2008-04-04 */ static rsRetVal -getRcvChar(instanceData *pData, char *pC) +getRcvChar(wrkrInstanceData_t *pWrkrData, char *pC) { DEFiRet; ssize_t lenBuf; - assert(pData != NULL); - if(pData->md.smtp.iRcvBuf == pData->md.smtp.lenRcvBuf) { /* buffer empty? */ + if(pWrkrData->md.smtp.iRcvBuf == pWrkrData->md.smtp.lenRcvBuf) { /* buffer empty? */ /* yes, we need to read the next server response */ do { - lenBuf = recv(pData->md.smtp.sock, pData->md.smtp.RcvBuf, sizeof(pData->md.smtp.RcvBuf), 0); + lenBuf = recv(pWrkrData->md.smtp.sock, pWrkrData->md.smtp.RcvBuf, + sizeof(pWrkrData->md.smtp.RcvBuf), 0); if(lenBuf == 0) { ABORT_FINALIZE(RS_RET_NO_MORE_DATA); } else if(lenBuf < 0) { @@ -247,15 +260,15 @@ getRcvChar(instanceData *pData, char *pC) } } else { /* good read */ - pData->md.smtp.iRcvBuf = 0; - pData->md.smtp.lenRcvBuf = lenBuf; + pWrkrData->md.smtp.iRcvBuf = 0; + pWrkrData->md.smtp.lenRcvBuf = lenBuf; } } while(lenBuf < 1); } /* when we reach this point, we have a non-empty buffer */ - *pC = pData->md.smtp.RcvBuf[pData->md.smtp.iRcvBuf++]; + *pC = pWrkrData->md.smtp.RcvBuf[pWrkrData->md.smtp.iRcvBuf++]; finalize_it: RETiRet; @@ -266,14 +279,14 @@ finalize_it: * rgerhards, 2008-04-08 */ static rsRetVal -serverDisconnect(instanceData *pData) +serverDisconnect(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); + assert(pWrkrData != NULL); - if(pData->md.smtp.sock != -1) { - close(pData->md.smtp.sock); - pData->md.smtp.sock = -1; + if(pWrkrData->md.smtp.sock != -1) { + close(pWrkrData->md.smtp.sock); + pWrkrData->md.smtp.sock = -1; } RETiRet; @@ -284,16 +297,17 @@ serverDisconnect(instanceData *pData) * rgerhards, 2008-04-04 */ static rsRetVal -serverConnect(instanceData *pData) +serverConnect(wrkrInstanceData_t *pWrkrData) { struct addrinfo *res = NULL; struct addrinfo hints; char *smtpPort; char *smtpSrv; char errStr[1024]; - + instanceData *pData; DEFiRet; - assert(pData != NULL); + + pData = pWrkrData->pData; if(pData->md.smtp.pszSrv == NULL) smtpSrv = "127.0.0.1"; @@ -313,12 +327,12 @@ serverConnect(instanceData *pData) ABORT_FINALIZE(RS_RET_IO_ERROR); } - if((pData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) { + if((pWrkrData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) { dbgprintf("couldn't create send socket, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr))); ABORT_FINALIZE(RS_RET_IO_ERROR); } - if(connect(pData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) { + if(connect(pWrkrData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) { dbgprintf("create tcp connection failed, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr))); ABORT_FINALIZE(RS_RET_IO_ERROR); } @@ -328,9 +342,9 @@ finalize_it: freeaddrinfo(res); if(iRet != RS_RET_OK) { - if(pData->md.smtp.sock != -1) { - close(pData->md.smtp.sock); - pData->md.smtp.sock = -1; + if(pWrkrData->md.smtp.sock != -1) { + close(pWrkrData->md.smtp.sock); + pWrkrData->md.smtp.sock = -1; } } @@ -374,7 +388,7 @@ finalize_it: * The body is special in that we must escape a leading dot inside a line */ static rsRetVal -bodySend(instanceData *pData, char *msg, size_t len) +bodySend(wrkrInstanceData_t *pWrkrData, char *msg, size_t len) { DEFiRet; char szBuf[2048]; @@ -383,12 +397,12 @@ bodySend(instanceData *pData, char *msg, size_t len) int bHadCR = 0; int bInStartOfLine = 1; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(msg != NULL); for(iSrc = 0 ; iSrc < len ; ++iSrc) { if(iBuf >= sizeof(szBuf) - 1) { /* one is reserved for our extra dot */ - CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf)); + CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf)); iBuf = 0; } szBuf[iBuf++] = msg[iSrc]; @@ -413,7 +427,7 @@ bodySend(instanceData *pData, char *msg, size_t len) } if(iBuf > 0) { /* incomplete buffer to send (the *usual* case)? */ - CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf)); + CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf)); } finalize_it: @@ -424,17 +438,17 @@ finalize_it: /* read response line from server */ static rsRetVal -readResponseLn(instanceData *pData, char *pLn, size_t lenLn) +readResponseLn(wrkrInstanceData_t *pWrkrData, char *pLn, size_t lenLn) { DEFiRet; size_t i = 0; char c; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(pLn != NULL); do { - CHKiRet(getRcvChar(pData, &c)); + CHKiRet(getRcvChar(pWrkrData, &c)); if(c == '\n') break; if(i < (lenLn - 1)) /* if line is too long, we simply discard the rest */ @@ -453,18 +467,18 @@ finalize_it: * rgerhards, 2008-04-07 */ static rsRetVal -readResponse(instanceData *pData, int *piState, int iExpected) +readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected) { DEFiRet; int bCont; char buf[128]; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(piState != NULL); bCont = 1; do { - CHKiRet(readResponseLn(pData, buf, sizeof(buf))); + CHKiRet(readResponseLn(pWrkrData, buf, sizeof(buf))); /* note: the code below is not 100% clean as we may have received less than 4 characters. * However, as we have a fixed size this will not create a vulnerability. An error will * also most likely be generated, so it is quite acceptable IMHO -- rgerhards, 2008-04-08 @@ -506,64 +520,65 @@ mkSMTPTimestamp(uchar *pszBuf, size_t lenBuf) * rgerhards, 2008-04-04 */ static rsRetVal -sendSMTP(instanceData *pData, uchar *body, uchar *subject) +sendSMTP(wrkrInstanceData_t *pWrkrData, uchar *body, uchar *subject) { DEFiRet; int iState; /* SMTP state */ + instanceData *pData; uchar szDateBuf[64]; - assert(pData != NULL); + pData = pWrkrData->pData; - CHKiRet(serverConnect(pData)); - CHKiRet(readResponse(pData, &iState, 220)); + CHKiRet(serverConnect(pWrkrData)); + CHKiRet(readResponse(pWrkrData, &iState, 220)); - CHKiRet(Send(pData->md.smtp.sock, "HELO ", 5)); - CHKiRet(Send(pData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName()))); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "HELO ", 5)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName()))); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(Send(pData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(WriteRcpts(pData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250)); + CHKiRet(WriteRcpts(pWrkrData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250)); - CHKiRet(Send(pData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 354)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 354)); /* now come the data part */ /* header */ mkSMTPTimestamp(szDateBuf, sizeof(szDateBuf)); - CHKiRet(Send(pData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf))); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf))); - CHKiRet(Send(pData->md.smtp.sock, "From: <", sizeof("From: <") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "From: <", sizeof("From: <") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); - CHKiRet(WriteRcpts(pData, (uchar*)"To", sizeof("To") - 1, -1)); + CHKiRet(WriteRcpts(pWrkrData, (uchar*)"To", sizeof("To") - 1, -1)); - CHKiRet(Send(pData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)subject, strlen((char*)subject))); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)subject, strlen((char*)subject))); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); - CHKiRet(Send(pData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1)); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */ + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */ /* body */ if(pData->bEnableBody) - CHKiRet(bodySend(pData, (char*)body, strlen((char*) body))); + CHKiRet(bodySend(pWrkrData, (char*)body, strlen((char*) body))); /* end of data, back to envelope transaction */ - CHKiRet(Send(pData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(Send(pData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 221)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 221)); /* we are finished, a new connection is created for each request, so let's close it now */ - CHKiRet(serverDisconnect(pData)); + CHKiRet(serverDisconnect(pWrkrData)); finalize_it: RETiRet; @@ -583,8 +598,8 @@ finalize_it: */ BEGINtryResume CODESTARTtryResume - CHKiRet(serverConnect(pData)); - CHKiRet(serverDisconnect(pData)); /* if we fail, we will never reach this line */ + CHKiRet(serverConnect(pWrkrData)); + CHKiRet(serverDisconnect(pWrkrData)); /* if we fail, we will never reach this line */ finalize_it: if(iRet == RS_RET_IO_ERROR) iRet = RS_RET_SUSPENDED; @@ -593,17 +608,14 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - dbgprintf(" Mail\n"); + DBGPRINTF(" Mail\n"); - /* forward */ - if(pData->bHaveSubject) - iRet = sendSMTP(pData, ppString[0], ppString[1]); - else - iRet = sendSMTP(pData, ppString[0], (uchar*)"message from rsyslog"); + iRet = sendSMTP(pWrkrData, ppString[0], + (pWrkrData->pData->bHaveSubject) ? + ppString[1] : (uchar*)"message from rsyslog"); if(iRet != RS_RET_OK) { - /* error! */ - dbgprintf("error sending mail, suspending\n"); + DBGPRINTF("error sending mail, suspending\n"); iRet = RS_RET_SUSPENDED; } ENDdoAction @@ -689,6 +701,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index af1f5a37..09f19768 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -4,7 +4,7 @@ * mongodb C interface is crap. Obtain the library here: * https://github.com/algernon/libmongo-client * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -71,6 +71,10 @@ typedef struct _instanceData { int bErrMsgPermitted; /* only one errmsg permitted per connection */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ @@ -89,10 +93,16 @@ static struct cnfparamblk actpblk = actpdescr }; +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature /* use this to specify if select features are supported by this @@ -126,6 +136,10 @@ CODESTARTfreeInstance free(pData->tplName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -422,14 +436,17 @@ error: BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) { - iRet = initMongoDB(pData, 1); + if(pWrkrData->pData->conn == NULL) { + iRet = initMongoDB(pWrkrData->pData, 1); } ENDtryResume BEGINdoAction bson *doc = NULL; + instanceData *pData; CODESTARTdoAction + pthread_mutex_lock(&mutDoAct); + pData = pWrkrData->pData; /* see if we are ready to proceed */ if(pData->conn == NULL) { CHKiRet(initMongoDB(pData, 0)); @@ -454,6 +471,7 @@ CODESTARTdoAction } finalize_it: + pthread_mutex_unlock(&mutDoAct); if(doc != NULL) bson_free(doc); ENDdoAction @@ -560,6 +578,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c index 11f346f6..87599484 100644 --- a/plugins/ompgsql/ompgsql.c +++ b/plugins/ompgsql/ompgsql.c @@ -6,7 +6,7 @@ * * File begun on 2007-10-18 by sur5r (converted from ommysql.c) * - * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007, 2013 Rainer Gerhards and Adiscon GmbH. * * The following link my be useful for the not-so-postgres literate * when setting up a test environment (on Fedora): @@ -66,11 +66,17 @@ typedef struct _instanceData { ConnStatusType eLastPgSQLStatus; /* last status from postgres */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; static configSettings_t __attribute__((unused)) cs; +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars ENDinitConfVars @@ -82,6 +88,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -108,6 +118,9 @@ CODESTARTfreeInstance closePgSQL(pData); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -244,8 +257,8 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->f_hpgsql == NULL) { - iRet = initPgSQL(pData, 1); + if(pWrkrData->pData->f_hpgsql == NULL) { + iRet = initPgSQL(pWrkrData->pData, 1); if(iRet == RS_RET_OK) { /* the code above seems not to actually connect to the database. As such, we do a * dummy statement (a pointless select...) to verify the connection and return @@ -253,7 +266,7 @@ CODESTARTtryResume * PostgreSQL expert, so any patch that does the desired result in a more * intelligent way is highly welcome. -- rgerhards, 2009-12-16 */ - iRet = writePgSQL((uchar*)"select 'a' as a", pData); + iRet = writePgSQL((uchar*)"select 'a' as a", pWrkrData->pData); } } @@ -263,23 +276,25 @@ ENDtryResume BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("ompgsql: beginTransaction\n"); - iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */ + iRet = writePgSQL((uchar*) "begin", pWrkrData->pData); /* TODO: make user-configurable */ ENDbeginTransaction BEGINdoAction CODESTARTdoAction + pthread_mutex_lock(&mutDoAct); dbgprintf("\n"); - CHKiRet(writePgSQL(ppString[0], pData)); + CHKiRet(writePgSQL(ppString[0], pWrkrData->pData)); if(bCoreSupportsBatching) iRet = RS_RET_DEFER_COMMIT; finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction BEGINendTransaction CODESTARTendTransaction - iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */ + iRet = writePgSQL((uchar*) "commit;", pWrkrData->pData); /* TODO: make user-configurable */ dbgprintf("ompgsql: endTransaction\n"); ENDendTransaction @@ -361,6 +376,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -372,6 +388,11 @@ INITLegCnfVars CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + +# warning: transaction support missing for v8 + bCoreSupportsBatching= 0; + DBGPRINTF("ompgsql: transactions are not yet supported on v8\n"); + DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION); DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); ENDmodInit diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c index cd07dcfb..1060b5c3 100644 --- a/plugins/omprog/omprog.c +++ b/plugins/omprog/omprog.c @@ -6,7 +6,7 @@ * * File begun on 2009-04-01 by RGerhards * - * Copyright 2009-2012 Adiscon GmbH. + * Copyright 2009-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -35,6 +35,7 @@ #include <errno.h> #include <unistd.h> #include <wait.h> +#include <pthread.h> #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -60,8 +61,13 @@ typedef struct _instanceData { int fdPipe; /* file descriptor to write to */ int bIsRunning; /* is binary currently running? 0-no, 1-yes */ int iParams; /* Holds the count of parameters if set*/ + pthread_mutex_t mut; /* make sure only one instance is active */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *szBinary; /* name of binary to call */ } configSettings_t; @@ -89,8 +95,13 @@ ENDinitConfVars BEGINcreateInstance CODESTARTcreateInstance + pthread_mutex_init(&pData->mut, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -102,6 +113,7 @@ ENDisCompatibleWithFeature BEGINfreeInstance int i; CODESTARTfreeInstance + pthread_mutex_destroy(&pData->mut); if(pData->szBinary != NULL) free(pData->szBinary); if(pData->aParams != NULL) { @@ -112,6 +124,10 @@ CODESTARTfreeInstance } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -287,22 +303,21 @@ writePipe(instanceData *pData, uchar *szMsg) lenWrite = strlen((char*)szMsg); writeOffset = 0; - do - { + do { lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite); if(lenWritten == -1) { switch(errno) { - case EPIPE: - DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n", - pData->szBinary); - CHKiRet(cleanup(pData)); - CHKiRet(tryRestart(pData)); - break; - default: - DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, - rs_strerror_r(errno, errStr, sizeof(errStr))); - ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); - break; + case EPIPE: + DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n", + pData->szBinary); + CHKiRet(cleanup(pData)); + CHKiRet(tryRestart(pData)); + break; + default: + DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, + rs_strerror_r(errno, errStr, sizeof(errStr))); + ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); + break; } } else { writeOffset += lenWritten; @@ -316,7 +331,10 @@ finalize_it: BEGINdoAction + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; + pthread_mutex_lock(&pData->mut); if(pData->bIsRunning == 0) { openPipe(pData); } @@ -325,6 +343,7 @@ CODESTARTdoAction if(iRet != RS_RET_OK) iRet = RS_RET_SUSPENDED; + pthread_mutex_unlock(&pData->mut); ENDdoAction @@ -496,6 +515,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c index 34511e46..3b1584e2 100644 --- a/plugins/omrelp/omrelp.c +++ b/plugins/omrelp/omrelp.c @@ -2,8 +2,17 @@ * * This is the implementation of the RELP output module. * - * NOTE: read comments in module-template.h to understand how this file - * works! + * Note that when multiple action workers are activated, we currently + * also create multiple actions. This may be the source of some mild + * message loss (!) if the worker instance is shut down while the + * connection to the remote system is in retry state. + * TODO: think if we should implement a mode where we do NOT + * support multiple action worker instances. This would be + * slower, but not have this loss opportunity. But it should + * definitely be optional and by default off due to the + * performance implications (and given the fact that message + * loss is pretty unlikely in usual cases). + * * * File begun on 2008-03-13 by RGerhards * @@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine; /* our relp engine */ typedef struct _instanceData { uchar *target; uchar *port; - int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ - int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ int sizeWindow; /**< the RELP window size - 0=use default */ unsigned timeout; unsigned rebindInterval; - unsigned nSent; - relpClt_t *pRelpClt; /* relp client for this instance */ sbool bEnableTLS; sbool bEnableTLSZip; sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */ @@ -85,11 +90,20 @@ typedef struct _instanceData { } permittedPeers; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ + int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ + relpClt_t *pRelpClt; /* relp client for this instance */ + unsigned nSent; /* number msgs sent - for rebind support */ +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; static configSettings_t __attribute__((unused)) cs; +static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData); /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ @@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData) static void onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode) { - instanceData *pData = (instanceData*) pUsr; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr; errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object " " '%s' - action may not work as intended", - pData->target, pData->port, errmesg, objinfo); + pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo); } static void @@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er static void onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode) { - instanceData *pData = (instanceData*) pUsr; + instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData; errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer " "is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo); pData->bHadAuthFail = 1; } -static inline rsRetVal -doCreateRelpClient(instanceData *pData) +static rsRetVal +doCreateRelpClient(wrkrInstanceData_t *pWrkrData) { int i; + instanceData *pData; DEFiRet; - if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK) + + pData = pWrkrData->pData; + if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK) + if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK) + if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK) + if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); if(pData->bEnableTLS) { - if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK) + if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); if(pData->bEnableTLSZip) { - if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK) + if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); } - if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK) + if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) { + if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) { errmsg.LogError(0, RS_RET_RELP_ERR, "omrelp: invalid auth mode '%s'\n", pData->authmode); ABORT_FINALIZE(RS_RET_RELP_ERR); } - if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK) + if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK) + if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK) + if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) { - relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]); + relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]); } } if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */ - if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK) + if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); } - pData->bInitialConnect = 1; - pData->nSent = 0; + pWrkrData->bInitialConnect = 1; + pWrkrData->nSent = 0; finalize_it: RETiRet; } @@ -221,11 +238,15 @@ CODESTARTcreateInstance pData->permittedPeers.nmemb = 0; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->pRelpClt = NULL; + iRet = doCreateRelpClient(pWrkrData); +ENDcreateWrkrInstance + BEGINfreeInstance int i; CODESTARTfreeInstance - if(pData->pRelpClt != NULL) - relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt); free(pData->target); free(pData->port); free(pData->tplName); @@ -239,6 +260,12 @@ CODESTARTfreeInstance } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->pRelpClt != NULL) + relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt); +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) { @@ -318,8 +345,6 @@ CODESTARTnewActInst "RSYSLOG_ForwardFormat" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); - CHKiRet(doCreateRelpClient(pData)); - CODE_STD_FINALIZERnewActInst if(pvals != NULL) cnfparamvalsDestruct(pvals, &actpblk); @@ -347,22 +372,23 @@ ENDdbgPrintInstInfo /* try to connect to server * rgerhards, 2008-03-21 */ -static rsRetVal doConnect(instanceData *pData) +static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData) { DEFiRet; - if(pData->bInitialConnect) { - iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target); + if(pWrkrData->bInitialConnect) { + iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(), + pWrkrData->pData->port, pWrkrData->pData->target); if(iRet == RELP_RET_OK) - pData->bInitialConnect = 0; + pWrkrData->bInitialConnect = 0; } else { - iRet = relpCltReconnect(pData->pRelpClt); + iRet = relpCltReconnect(pWrkrData->pRelpClt); } if(iRet == RELP_RET_OK) { - pData->bIsConnected = 1; + pWrkrData->bIsConnected = 1; } else { - pData->bIsConnected = 0; + pWrkrData->bIsConnected = 0; iRet = RS_RET_SUSPENDED; } @@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData) BEGINtryResume CODESTARTtryResume - if(pData->bHadAuthFail) { + if(pWrkrData->pData->bHadAuthFail) { ABORT_FINALIZE(RS_RET_DISABLE_ACTION); } - iRet = doConnect(pData); + iRet = doConnect(pWrkrData); finalize_it: ENDtryResume static inline rsRetVal -doRebind(instanceData *pData) +doRebind(wrkrInstanceData_t *pWrkrData) { DEFiRet; DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n"); - CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt)); - pData->bIsConnected = 0; - CHKiRet(doCreateRelpClient(pData)); + CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt)); + pWrkrData->bIsConnected = 0; + CHKiRet(doCreateRelpClient(pWrkrData)); finalize_it: RETiRet; } @@ -394,10 +420,10 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omrelp: beginTransaction\n"); - if(!pData->bIsConnected) { - CHKiRet(doConnect(pData)); + if(!pWrkrData->bIsConnected) { + CHKiRet(doConnect(pWrkrData)); } - relpCltHintBurstBegin(pData->pRelpClt); + relpCltHintBurstBegin(pWrkrData->pRelpClt); finalize_it: ENDbeginTransaction @@ -405,11 +431,13 @@ BEGINdoAction uchar *pMsg; /* temporary buffering */ size_t lenMsg; relpRetVal ret; + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData)); - if(!pData->bIsConnected) { - CHKiRet(doConnect(pData)); + if(!pWrkrData->bIsConnected) { + CHKiRet(doConnect(pWrkrData)); } pMsg = ppString[0]; @@ -420,7 +448,7 @@ CODESTARTdoAction lenMsg = glbl.GetMaxLine(); /* forward */ - ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg); + ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg); if(ret != RELP_RET_OK) { /* error! */ dbgprintf("error forwarding via relp, suspending\n"); @@ -428,8 +456,8 @@ CODESTARTdoAction } if(pData->rebindInterval != 0 && - (++pData->nSent >= pData->rebindInterval)) { - doRebind(pData); + (++pWrkrData->nSent >= pData->rebindInterval)) { + doRebind(pWrkrData); } finalize_it: if(pData->bHadAuthFail) @@ -448,7 +476,7 @@ ENDdoAction BEGINendTransaction CODESTARTendTransaction dbgprintf("omrelp: endTransaction\n"); - relpCltHintBurstEnd(pData->pRelpClt); + relpCltHintBurstEnd(pWrkrData->pRelpClt); ENDendTransaction BEGINparseSelectorAct @@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat")); - CHKiRet(doCreateRelpClient(pData)); - CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -546,6 +572,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES diff --git a/plugins/omsnmp/omsnmp.c b/plugins/omsnmp/omsnmp.c index 42d1de6b..02862c94 100644 --- a/plugins/omsnmp/omsnmp.c +++ b/plugins/omsnmp/omsnmp.c @@ -2,7 +2,7 @@ * * This module sends an snmp trap. * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -74,15 +74,19 @@ typedef struct _instanceData { * http://www.adiscon.org/download/ADISCON-MONITORWARE-MIB.txt * http://www.adiscon.org/download/ADISCON-MIB.txt */ - int iPort; /* Target Port */ - int iSNMPVersion; /* SNMP Version to use */ - int iTrapType; /* Snmp TrapType or GenericType */ - int iSpecificType; /* Snmp Specific Type */ + int iPort; /* Target Port */ + int iSNMPVersion; /* SNMP Version to use */ + int iTrapType; /* Snmp TrapType or GenericType */ + int iSpecificType; /* Snmp Specific Type */ - netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */ - uchar *tplName; /* format template to use */ + uchar *tplName; /* format template to use */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */ +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar* pszTransport; /* default transport */ uchar* pszTarget; @@ -147,6 +151,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->snmpsession = NULL; +ENDcreateWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -171,14 +179,16 @@ ENDisCompatibleWithFeature /* Exit SNMP Session * alorbach, 2008-02-12 */ -static rsRetVal omsnmp_exitSession(instanceData *pData) +static rsRetVal +omsnmp_exitSession(wrkrInstanceData_t *pWrkrData) { DEFiRet; - if(pData->snmpsession != NULL) { - dbgprintf( "omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", pData->szTarget, pData->iPort); - snmp_close(pData->snmpsession); - pData->snmpsession = NULL; + if(pWrkrData->snmpsession != NULL) { + DBGPRINTF("omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", + pWrkrData->pData->szTarget, pWrkrData->pData->iPort); + snmp_close(pWrkrData->snmpsession); + pWrkrData->snmpsession = NULL; } RETiRet; @@ -187,15 +197,19 @@ static rsRetVal omsnmp_exitSession(instanceData *pData) /* Init SNMP Session * alorbach, 2008-02-12 */ -static rsRetVal omsnmp_initSession(instanceData *pData) +static rsRetVal +omsnmp_initSession(wrkrInstanceData_t *pWrkrData) { netsnmp_session session; + instanceData *pData; char szTargetAndPort[MAXHOSTNAMELEN+128]; /* work buffer for specifying a full target and port string */ DEFiRet; /* should not happen, but if session is not cleared yet - we do it now! */ - if (pData->snmpsession != NULL) - omsnmp_exitSession(pData); + if (pWrkrData->snmpsession != NULL) + omsnmp_exitSession(pWrkrData); + + pData = pWrkrData->pData; snprintf((char*)szTargetAndPort, sizeof(szTargetAndPort), "%s:%s:%d", (pData->szTransport == NULL) ? "udp" : (char*)pData->szTransport, @@ -217,8 +231,8 @@ static rsRetVal omsnmp_initSession(instanceData *pData) session.community_len = strlen((char*) session.community); } - pData->snmpsession = snmp_open(&session); - if (pData->snmpsession == NULL) { + pWrkrData->snmpsession = snmp_open(&session); + if (pWrkrData->snmpsession == NULL) { errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_initSession: snmp_open to host '%s' on Port '%d' failed\n", pData->szTarget, pData->iPort); /* Stay suspended */ iRet = RS_RET_SUSPENDED; @@ -227,7 +241,7 @@ static rsRetVal omsnmp_initSession(instanceData *pData) RETiRet; } -static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) +static rsRetVal omsnmp_sendsnmp(wrkrInstanceData_t *pWrkrData, uchar *psz) { DEFiRet; @@ -239,10 +253,12 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) int status; char *trap = NULL; const char *strErr = NULL; + instanceData *pData; + pData = pWrkrData->pData; /* Init SNMP Session if necessary */ - if (pData->snmpsession == NULL) { - CHKiRet(omsnmp_initSession(pData)); + if (pWrkrData->snmpsession == NULL) { + CHKiRet(omsnmp_initSession(pWrkrData)); } /* String should not be NULL */ @@ -250,7 +266,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) dbgprintf( "omsnmp_sendsnmp: ENTER - Syslogmessage = '%s'\n", (char*)psz); /* If SNMP Version1 is configured !*/ - if(pData->snmpsession->version == SNMP_VERSION_1) { + if(pWrkrData->snmpsession->version == SNMP_VERSION_1) { pdu = snmp_pdu_create(SNMP_MSG_TRAP); /* Set enterprise */ @@ -275,7 +291,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) pdu->time = get_uptime(); } /* If SNMP Version2c is configured !*/ - else if (pData->snmpsession->version == SNMP_VERSION_2c) + else if (pWrkrData->snmpsession->version == SNMP_VERSION_2c) { long sysuptime; char csysuptime[20]; @@ -320,15 +336,15 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) } /* Send the TRAP */ - status = snmp_send(pData->snmpsession, pdu) == 0; + status = snmp_send(pWrkrData->snmpsession, pdu) == 0; if (status) { /* Debug Output! */ - int iErrorCode = pData->snmpsession->s_snmp_errno; + int iErrorCode = pWrkrData->snmpsession->s_snmp_errno; errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_sendsnmp: snmp_send failed error '%d', Description='%s'\n", iErrorCode*(-1), api_errors[iErrorCode*(-1)]); /* Clear Session */ - omsnmp_exitSession(pData); + omsnmp_exitSession(pWrkrData); ABORT_FINALIZE(RS_RET_SUSPENDED); } @@ -347,7 +363,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = omsnmp_initSession(pData); + iRet = omsnmp_initSession(pWrkrData); ENDtryResume BEGINdoAction @@ -358,19 +374,20 @@ CODESTARTdoAction } /* This will generate and send the SNMP Trap */ - iRet = omsnmp_sendsnmp(pData, ppString[0]); + iRet = omsnmp_sendsnmp(pWrkrData, ppString[0]); finalize_it: ENDdoAction BEGINfreeInstance CODESTARTfreeInstance - /* free snmp Session here */ - omsnmp_exitSession(pData); - free(pData->tplName); free(pData->szTarget); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + omsnmp_exitSession(pWrkrData); +ENDfreeWrkrInstance static inline void setInstParamDefaults(instanceData *pData) @@ -499,9 +516,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* Set some defaults in the NetSNMP library */ netsnmp_ds_set_int(NETSNMP_DS_LIBRARY_ID, NETSNMP_DS_LIB_DEFAULT_PORT, pData->iPort ); - - /* Init Session Pointer */ - pData->snmpsession = NULL; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -545,6 +559,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index cb907bba..ad3508c8 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -98,15 +98,19 @@ typedef struct _instanceData { uchar *port; uchar *sourceTpl; int mtu; - int *pSockArray; /* sockets to use for UDP */ - struct addrinfo *f_addr; - u_short sourcePort; u_short sourcePortStart; /* for sorce port iteration */ u_short sourcePortEnd; int bReportLibnetInitErr; /* help prevent multiple error messages on init err */ +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; libnet_t *libnet_handle; + u_short sourcePort; + int *pSockArray; /* sockets to use for UDP */ + struct addrinfo *f_addr; char errbuf[LIBNET_ERRBUF_SIZE]; -} instanceData; +} wrkrInstanceData_t; #define DFLT_SOURCE_PORT_START 32000 #define DFLT_SOURCE_PORT_END 42000 @@ -172,7 +176,7 @@ ENDinitConfVars pthread_mutex_t mutLibnet; /* forward definitions */ -static rsRetVal doTryResume(instanceData *pData); +static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData); /* this function gets the default template. It coordinates action between @@ -217,15 +221,14 @@ finalize_it: * rgerhards, 2009-05-29 */ static rsRetVal -closeUDPSockets(instanceData *pData) +closeUDPSockets(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); - if(pData->pSockArray != NULL) { - net.closeUDPListenSockets(pData->pSockArray); - pData->pSockArray = NULL; - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->pSockArray != NULL) { + net.closeUDPListenSockets(pWrkrData->pSockArray); + pWrkrData->pSockArray = NULL; + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } RETiRet; } @@ -310,12 +313,17 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance - pData->libnet_handle = NULL; pData->mtu = 1500; pData->bReportLibnetInitErr = 1; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->libnet_handle = NULL; + pWrkrData->sourcePort = pData->sourcePortStart; +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -326,15 +334,19 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance /* final cleanup */ - closeUDPSockets(pData); free(pData->tplName); free(pData->port); free(pData->host); free(pData->sourceTpl); - if(pData->libnet_handle != NULL) - libnet_destroy(pData->libnet_handle); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeUDPSockets(pWrkrData); + if(pWrkrData->libnet_handle != NULL) + libnet_destroy(pWrkrData->libnet_handle); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -348,11 +360,12 @@ ENDdbgPrintInstInfo * rgehards, 2007-12-20 */ static inline rsRetVal -UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) +UDPSend(wrkrInstanceData_t *pWrkrData, uchar *pszSourcename, char *msg, size_t len) { struct addrinfo *r; int lsent = 0; int bSendSuccess; + instanceData *pData; struct sockaddr_in *tempaddr,source_ip; libnet_ptag_t ip, ipo; libnet_ptag_t udp; @@ -363,9 +376,10 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) unsigned maxPktLen, pktLen; DEFiRet; - if(pData->pSockArray == NULL) { - CHKiRet(doTryResume(pData)); + if(pWrkrData->pSockArray == NULL) { + CHKiRet(doTryResume(pWrkrData)); } + pData = pWrkrData->pData; if(len > 65528) { DBGPRINTF("omudpspoof: msg with length %d truncated to 64k: '%.768s'\n", @@ -374,8 +388,8 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } ip = ipo = udp = 0; - if(pData->sourcePort++ >= pData->sourcePortEnd){ - pData->sourcePort = pData->sourcePortStart; + if(pWrkrData->sourcePort++ >= pData->sourcePortEnd){ + pWrkrData->sourcePort = pData->sourcePortStart; } inet_pton(AF_INET, (char*)pszSourcename, &(source_ip.sin_addr)); @@ -383,7 +397,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) bSendSuccess = RSFALSE; d_pthread_mutex_lock(&mutLibnet); bNeedUnlock = 1; - for (r = pData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) { + for (r = pWrkrData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) { tempaddr = (struct sockaddr_in *)r->ai_addr; /* Getting max payload size (must be multiple of 8) */ maxPktLen = (pData->mtu - LIBNET_IPV4_H) & ~0x07; @@ -400,19 +414,19 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } DBGPRINTF("omudpspoof: stage 1: MF:%d, hdrOffs %d, pktLen %d\n", (hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen); - libnet_clear_packet(pData->libnet_handle); + libnet_clear_packet(pWrkrData->libnet_handle); /* note: libnet does need ports in host order NOT in network byte order! -- rgerhards, 2009-11-12 */ udp = libnet_build_udp( - ntohs(pData->sourcePort),/* source port */ + ntohs(pWrkrData->sourcePort),/* source port */ ntohs(tempaddr->sin_port),/* destination port */ pktLen+LIBNET_UDP_H, /* packet length */ 0, /* checksum */ (u_char*)msg, /* payload */ pktLen, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ udp); /* libnet id */ if (udp == -1) { - DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } ip = libnet_build_ipv4( @@ -427,22 +441,22 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) tempaddr->sin_addr.s_addr, NULL, /* payload */ 0, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ ip); /* libnet id */ if (ip == -1) { - DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } /* Write it to the wire. */ - lsent = libnet_write(pData->libnet_handle); + lsent = libnet_write(pWrkrData->libnet_handle); if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) { /* note: access to fd is a libnet internal. If a newer version of libnet does * not expose that member, we should simply remove it. However, while it is there * it is useful for consolidating with strace output. */ DBGPRINTF("omudpspoof: write error (total len %d): pktLen %d, sent %d, fd %d: %s\n", - len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pData->libnet_handle->fd, - libnet_geterror(pData->libnet_handle)); + len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pWrkrData->libnet_handle->fd, + libnet_geterror(pWrkrData->libnet_handle)); if(lsent != -1) { bSendSuccess = RSTRUE; } @@ -452,7 +466,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) msgOffs += pktLen; /* We need to get rid of the UDP header to build the other fragments */ - libnet_clear_packet(pData->libnet_handle); + libnet_clear_packet(pWrkrData->libnet_handle); ip = LIBNET_PTAG_INITIALIZER; while(len > msgOffs ) { /* loop until all payload is sent */ /* check if there will be more fragments */ @@ -481,16 +495,16 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) tempaddr->sin_addr.s_addr, (uint8_t*)(msg+msgOffs), /* payload */ pktLen, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ ip); /* libnet id */ if (ip == -1) { - DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } /* Write it to the wire. */ - lsent = libnet_write(pData->libnet_handle); + lsent = libnet_write(pWrkrData->libnet_handle); if(lsent != (int) (LIBNET_IPV4_H+pktLen)) { DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n", - LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle)); + LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pWrkrData->libnet_handle)); bSendSuccess = RSFALSE; continue; } @@ -500,9 +514,9 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) finalize_it: if(iRet != RS_RET_OK) { - if(pData->libnet_handle != NULL) { - libnet_destroy(pData->libnet_handle); - pData->libnet_handle = NULL; + if(pWrkrData->libnet_handle != NULL) { + libnet_destroy(pWrkrData->libnet_handle); + pWrkrData->libnet_handle = NULL; } } if(bNeedUnlock) { @@ -515,26 +529,28 @@ finalize_it: /* try to resume connection if it is not ready * rgerhards, 2007-08-02 */ -static rsRetVal doTryResume(instanceData *pData) +static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData) { int iErr; struct addrinfo *res; struct addrinfo hints; + instanceData *pData; DEFiRet; - if(pData->pSockArray != NULL) + if(pWrkrData->pSockArray != NULL) FINALIZE; + pData = pWrkrData->pData; - if(pData->libnet_handle == NULL) { + if(pWrkrData->libnet_handle == NULL) { /* Initialize the libnet library. Root priviledges are required. * this initializes a IPv4 socket to use for forging UDP packets. */ - pData->libnet_handle = libnet_init( + pWrkrData->libnet_handle = libnet_init( LIBNET_RAW4, /* injection type */ NULL, /* network interface */ - pData->errbuf); /* errbuf */ + pWrkrData->errbuf); /* errbuf */ - if(pData->libnet_handle == NULL) { + if(pWrkrData->libnet_handle == NULL) { if(pData->bReportLibnetInitErr) { errmsg.LogError(0, RS_RET_ERR_LIBNET_INIT, "omudpsoof: error " "initializing libnet - are you running as root?"); @@ -559,14 +575,14 @@ static rsRetVal doTryResume(instanceData *pData) ABORT_FINALIZE(RS_RET_SUSPENDED); } DBGPRINTF("%s found, resuming.\n", pData->host); - pData->f_addr = res; - pData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0); + pWrkrData->f_addr = res; + pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0); finalize_it: if(iRet != RS_RET_OK) { - if(pData->f_addr != NULL) { - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->f_addr != NULL) { + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } iRet = RS_RET_SUSPENDED; } @@ -577,7 +593,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + iRet = doTryResume(pWrkrData); ENDtryResume BEGINdoAction @@ -585,10 +601,10 @@ BEGINdoAction unsigned l; int iMaxLine; CODESTARTdoAction - CHKiRet(doTryResume(pData)); + CHKiRet(doTryResume(pWrkrData)); - DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host, - getFwdPt(pData), ppString[1], ppString[0]); + DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pWrkrData->pData->host, + getFwdPt(pWrkrData->pData), ppString[1], ppString[0]); iMaxLine = glbl.GetMaxLine(); psz = (char*) ppString[0]; @@ -596,7 +612,7 @@ CODESTARTdoAction if((int) l > iMaxLine) l = iMaxLine; - CHKiRet(UDPSend(pData, ppString[1], psz, l)); + CHKiRet(UDPSend(pWrkrData, ppString[1], psz, l)); finalize_it: ENDdoAction @@ -660,7 +676,6 @@ CODESTARTnewActInst } } CODE_STD_STRING_REQUESTnewActInst(2) - pData->sourcePort = pData->sourcePortStart; tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName); CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS)); @@ -699,7 +714,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2) else CHKmalloc(pData->port = ustrdup(cs.pszTargetPort)); CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS)); - pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart; + pData->sourcePortStart = cs.iSourcePortStart; pData->sourcePortEnd = cs.iSourcePortEnd; /* process template */ @@ -740,6 +755,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c index 583b9f94..da4e8e94 100644 --- a/plugins/omuxsock/omuxsock.c +++ b/plugins/omuxsock/omuxsock.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2010-2012 Adiscon GmbH. + * Copyright 2010-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -60,10 +60,14 @@ typedef struct _instanceData { permittedPeers_t *pPermPeers; uchar *sockName; int sock; - int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */ struct sockaddr_un addr; } instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* config data */ typedef struct configSettings_s { uchar *tplName; /* name of the default template to use */ @@ -90,6 +94,7 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars @@ -147,7 +152,6 @@ closeSocket(instanceData *pData) close(pData->sock); pData->sock = INVLD_SOCK; } -pData->bIsConnected = 0; // TODO: remove this variable altogether RETiRet; } @@ -224,6 +228,10 @@ CODESTARTcreateInstance pData->sock = INVLD_SOCK; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -239,6 +247,10 @@ CODESTARTfreeInstance free(pData->sockName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -332,7 +344,7 @@ static rsRetVal doTryResume(instanceData *pData) BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + iRet = doTryResume(pWrkrData->pData); ENDtryResume BEGINdoAction @@ -340,20 +352,22 @@ BEGINdoAction register unsigned l; int iMaxLine; CODESTARTdoAction - CHKiRet(doTryResume(pData)); + pthread_mutex_lock(&mutDoAct); + CHKiRet(doTryResume(pWrkrData->pData)); iMaxLine = glbl.GetMaxLine(); - DBGPRINTF(" omuxsock:%s\n", pData->sockName); + DBGPRINTF(" omuxsock:%s\n", pWrkrData->pData->sockName); psz = (char*) ppString[0]; l = strlen((char*) psz); if((int) l > iMaxLine) l = iMaxLine; - CHKiRet(sendMsg(pData, psz, l)); + CHKiRet(sendMsg(pWrkrData->pData, psz, l)); finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -413,6 +427,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt diff --git a/runtime/rsconf.c b/runtime/rsconf.c index abce53b8..c855779e 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -292,6 +292,21 @@ finalize_it: extern int yylineno; void +parser_warnmsg(char *fmt, ...) +{ + va_list ap; + char errBuf[1024]; + + va_start(ap, fmt); + if(vsnprintf(errBuf, sizeof(errBuf), fmt, ap) == sizeof(errBuf)) + errBuf[sizeof(errBuf)-1] = '\0'; + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, + "warning during parsing file %s, on or before line %d: %s", + cnfcurrfn, yylineno, errBuf); + va_end(ap); +} + +void parser_errmsg(char *fmt, ...) { va_list ap; diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index ae56f08f..e0331da7 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -382,6 +382,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_ERR_UDPSEND = -2354,/**< sending msg via UDP failed */ RS_RET_LAST_ERRREPORT = -2355,/**< module does not emit more error messages as limit is reached */ RS_RET_READ_ERR = -2356,/**< read error occured (file i/o) */ + RS_RET_CONF_PARSE_WARNING = -2357,/**< warning parsing config file */ /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ @@ -1794,7 +1794,7 @@ tplProcessCnf(struct cnfobj *o) } } - if(bHaveSubtree) { + if(!bHaveSubtree) { if(tplType == T_SUBTREE) { errmsg.LogError(0, RS_RET_ERR, "template '%s' of type subtree needs " "subtree parameter", name); diff --git a/tools/omfwd.c b/tools/omfwd.c index ed0898c9..8cf306ed 100644 --- a/tools/omfwd.c +++ b/tools/omfwd.c @@ -72,48 +72,50 @@ DEFobjCurrIf(netstrms) DEFobjCurrIf(netstrm) DEFobjCurrIf(tcpclt) + /* some local constants (just) for better readybility */ #define IS_FLUSH 1 #define NO_FLUSH 0 typedef struct _instanceData { uchar *tplName; /* name of assigned template */ - netstrms_t *pNS; /* netstream subsystem */ - netstrm_t *pNetstrm; /* our output netstream */ uchar *pszStrmDrvr; uchar *pszStrmDrvrAuthMode; permittedPeers_t *pPermPeers; int iStrmDrvrMode; char *target; - int *pSockArray; /* sockets to use for UDP */ - int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */ - struct addrinfo *f_addr; int compressionLevel; /* 0 - no compression, else level for zlib */ char *port; int protocol; int iRebindInterval; /* rebind interval */ - int nXmit; /* number of transmissions since last (re-)bind */ # define FORW_UDP 0 # define FORW_TCP 1 /* following fields for TCP-based delivery */ TCPFRAMINGMODE tcp_framing; int bResendLastOnRecon; /* should the last message be re-sent on a successful reconnect? */ - tcpclt_t *pTCPClt; /* our tcpclt object */ # define COMPRESS_NEVER 0 # define COMPRESS_SINGLE_MSG 1 /* old, single-message compression */ /* all other settings are for stream-compression */ # define COMPRESS_STREAM_ALWAYS 2 uint8_t compressionMode; + int errsToReport; /* max number of errors to report (per instance) */ sbool strmCompFlushOnTxEnd; /* flush stream compression on transaction end? */ +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + netstrms_t *pNS; /* netstream subsystem */ + netstrm_t *pNetstrm; /* our output netstream */ + struct addrinfo *f_addr; + int *pSockArray; /* sockets to use for UDP */ + int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */ + int nXmit; /* number of transmissions since last (re-)bind */ + tcpclt_t *pTCPClt; /* our tcpclt object */ sbool bzInitDone; /* did we do an init of zstrm already? */ z_stream zstrm; /* zip stream to use for tcp compression */ uchar sndBuf[16*1024]; /* this is intensionally fixed -- see no good reason to make configurable */ unsigned offsSndBuf; /* next free spot in send buffer */ int errsToReport; /* (remaining) number of errors to report */ -} instanceData; - -typedef struct wrkrInstanceData { - instanceData *pData; } wrkrInstanceData_t; /* config data */ @@ -173,6 +175,9 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ +static rsRetVal initTCP(wrkrInstanceData_t *pWrkrData); + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars cs.pszTplName = NULL; /* name of the default template to use */ @@ -186,8 +191,8 @@ CODESTARTinitConfVars ENDinitConfVars -static rsRetVal doTryResume(instanceData *pData); -static rsRetVal doZipFinish(instanceData *pData); +static rsRetVal doTryResume(wrkrInstanceData_t *); +static rsRetVal doZipFinish(wrkrInstanceData_t *); /* this function gets the default template. It coordinates action between * old-style and new-style configuration parts. @@ -231,17 +236,16 @@ finalize_it: * rgerhards, 2009-05-29 */ static rsRetVal -closeUDPSockets(instanceData *pData) +closeUDPSockets(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); - if(pData->pSockArray != NULL) { - net.closeUDPListenSockets(pData->pSockArray); - pData->pSockArray = NULL; - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->pSockArray != NULL) { + net.closeUDPListenSockets(pWrkrData->pSockArray); + pWrkrData->pSockArray = NULL; + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } -pData->bIsConnected = 0; // TODO: remove this variable altogether +pWrkrData->bIsConnected = 0; // TODO: remove this variable altogether RETiRet; } @@ -252,18 +256,17 @@ pData->bIsConnected = 0; // TODO: remove this variable altogether * rgerhards, 2008-06-04 * Note that we DO NOT discard the current buffer contents * (if any). This permits us to save data between sessions. In - * the wort case, some duplication occurs, but we do not + * the worst case, some duplication occurs, but we do not * loose data. */ static inline void -DestructTCPInstanceData(instanceData *pData) +DestructTCPInstanceData(wrkrInstanceData_t *pWrkrData) { - assert(pData != NULL); - doZipFinish(pData); - if(pData->pNetstrm != NULL) - netstrm.Destruct(&pData->pNetstrm); - if(pData->pNS != NULL) - netstrms.Destruct(&pData->pNS); + doZipFinish(pWrkrData); + if(pWrkrData->pNetstrm != NULL) + netstrm.Destruct(&pWrkrData->pNetstrm); + if(pWrkrData->pNS != NULL) + netstrms.Destruct(&pWrkrData->pNS); } @@ -334,14 +337,22 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance - pData->offsSndBuf = 0; pData->errsToReport = 5; + if(cs.pszStrmDrvr != NULL) + CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)cs.pszStrmDrvr)); + if(cs.pszStrmDrvrAuthMode != NULL) + CHKmalloc(pData->pszStrmDrvrAuthMode = + (uchar*)strdup((char*)cs.pszStrmDrvrAuthMode)); +finalize_it: ENDcreateInstance BEGINcreateWrkrInstance CODESTARTcreateWrkrInstance dbgprintf("DDDD: createWrkrInstance: pWrkrData %p\n", pWrkrData); + pWrkrData->offsSndBuf = 0; + pWrkrData->errsToReport = pData->errsToReport; + iRet = initTCP(pWrkrData); ENDcreateWrkrInstance @@ -354,24 +365,22 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - /* final cleanup */ - DestructTCPInstanceData(pData); - closeUDPSockets(pData); - - if(pData->protocol == FORW_TCP) { - tcpclt.Destruct(&pData->pTCPClt); - } - - free(pData->port); - free(pData->target); free(pData->pszStrmDrvr); free(pData->pszStrmDrvrAuthMode); + free(pData->port); + free(pData->target); net.DestructPermittedPeers(&pData->pPermPeers); ENDfreeInstance BEGINfreeWrkrInstance CODESTARTfreeWrkrInstance + DestructTCPInstanceData(pWrkrData); + closeUDPSockets(pWrkrData); + + if(pWrkrData->pData->protocol == FORW_TCP) { + tcpclt.Destruct(&pWrkrData->pTCPClt); + } ENDfreeWrkrInstance @@ -384,7 +393,7 @@ ENDdbgPrintInstInfo /* Send a message via UDP * rgehards, 2007-12-20 */ -static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len) +static rsRetVal UDPSend(wrkrInstanceData_t *pWrkrData, char *msg, size_t len) { DEFiRet; struct addrinfo *r; @@ -394,17 +403,17 @@ static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len) int lasterrno; char errStr[1024]; - if(pData->iRebindInterval && (pData->nXmit++ % pData->iRebindInterval == 0)) { + if(pWrkrData->pData->iRebindInterval && (pWrkrData->nXmit++ % pWrkrData->pData->iRebindInterval == 0)) { dbgprintf("omfwd dropping UDP 'connection' (as configured)\n"); - pData->nXmit = 1; /* else we have an addtl wrap at 2^31-1 */ - CHKiRet(closeUDPSockets(pData)); + pWrkrData->nXmit = 1; /* else we have an addtl wrap at 2^31-1 */ + CHKiRet(closeUDPSockets(pWrkrData)); } - if(pData->pSockArray == NULL) { - CHKiRet(doTryResume(pData)); + if(pWrkrData->pSockArray == NULL) { + CHKiRet(doTryResume(pWrkrData)); } - if(pData->pSockArray != NULL) { + if(pWrkrData->pSockArray != NULL) { /* we need to track if we have success sending to the remote * peer. Success is indicated by at least one sendto() call * succeeding. We track this be bSendSuccess. We can not simply @@ -413,9 +422,9 @@ static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len) * the sendto() succeeded. -- rgerhards, 2007-06-22 */ bSendSuccess = RSFALSE; - for (r = pData->f_addr; r; r = r->ai_next) { - for (i = 0; i < *pData->pSockArray; i++) { - lsent = sendto(pData->pSockArray[i+1], msg, len, 0, r->ai_addr, r->ai_addrlen); + for (r = pWrkrData->f_addr; r; r = r->ai_next) { + for (i = 0; i < *pWrkrData->pSockArray; i++) { + lsent = sendto(pWrkrData->pSockArray[i+1], msg, len, 0, r->ai_addr, r->ai_addrlen); if (lsent == len) { bSendSuccess = RSTRUE; break; @@ -432,17 +441,17 @@ static rsRetVal UDPSend(instanceData *pData, char *msg, size_t len) /* finished looping */ if(bSendSuccess == RSFALSE) { dbgprintf("error forwarding via udp, suspending\n"); - if(pData->errsToReport > 0) { + if(pWrkrData->errsToReport > 0) { rs_strerror_r(lasterrno, errStr, sizeof(errStr)); errmsg.LogError(0, RS_RET_ERR_UDPSEND, "omfwd: error sending " "via udp: %s", errStr); - if(pData->errsToReport == 1) { + if(pWrkrData->errsToReport == 1) { errmsg.LogError(0, RS_RET_LAST_ERRREPORT, "omfwd: " "max number of error message emitted " "- further messages will be " "suppressed"); } - --pData->errsToReport; + --pWrkrData->errsToReport; } iRet = RS_RET_SUSPENDED; } @@ -470,18 +479,18 @@ finalize_it: /* CODE FOR SENDING TCP MESSAGES */ static rsRetVal -TCPSendBufUncompressed(instanceData *pData, uchar *buf, unsigned len) +TCPSendBufUncompressed(wrkrInstanceData_t *pWrkrData, uchar *buf, unsigned len) { DEFiRet; unsigned alreadySent; ssize_t lenSend; alreadySent = 0; - CHKiRet(netstrm.CheckConnection(pData->pNetstrm)); /* hack for plain tcp syslog - see ptcp driver for details */ + CHKiRet(netstrm.CheckConnection(pWrkrData->pNetstrm)); /* hack for plain tcp syslog - see ptcp driver for details */ while(alreadySent != len) { lenSend = len - alreadySent; - CHKiRet(netstrm.Send(pData->pNetstrm, buf+alreadySent, &lenSend)); + CHKiRet(netstrm.Send(pWrkrData->pNetstrm, buf+alreadySent, &lenSend)); DBGPRINTF("omfwd: TCP sent %ld bytes, requested %u\n", (long) lenSend, len - alreadySent); alreadySent += lenSend; } @@ -490,14 +499,14 @@ finalize_it: if(iRet != RS_RET_OK) { /* error! */ dbgprintf("TCPSendBuf error %d, destruct TCP Connection!\n", iRet); - DestructTCPInstanceData(pData); + DestructTCPInstanceData(pWrkrData); iRet = RS_RET_SUSPENDED; } RETiRet; } static rsRetVal -TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) +TCPSendBufCompressed(wrkrInstanceData_t *pWrkrData, uchar *buf, unsigned len, sbool bIsFlush) { int zRet; /* zlib return state */ unsigned outavail; @@ -505,52 +514,52 @@ TCPSendBufCompressed(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlu int op; DEFiRet; - if(!pData->bzInitDone) { + if(!pWrkrData->bzInitDone) { /* allocate deflate state */ - pData->zstrm.zalloc = Z_NULL; - pData->zstrm.zfree = Z_NULL; - pData->zstrm.opaque = Z_NULL; + pWrkrData->zstrm.zalloc = Z_NULL; + pWrkrData->zstrm.zfree = Z_NULL; + pWrkrData->zstrm.opaque = Z_NULL; /* see note in file header for the params we use with deflateInit2() */ - zRet = deflateInit(&pData->zstrm, 9); + zRet = deflateInit(&pWrkrData->zstrm, 9); if(zRet != Z_OK) { DBGPRINTF("error %d returned from zlib/deflateInit()\n", zRet); ABORT_FINALIZE(RS_RET_ZLIB_ERR); } - pData->bzInitDone = RSTRUE; + pWrkrData->bzInitDone = RSTRUE; } /* now doing the compression */ - pData->zstrm.next_in = (Bytef*) buf; - pData->zstrm.avail_in = len; - if(pData->strmCompFlushOnTxEnd && bIsFlush) + pWrkrData->zstrm.next_in = (Bytef*) buf; + pWrkrData->zstrm.avail_in = len; + if(pWrkrData->pData->strmCompFlushOnTxEnd && bIsFlush) op = Z_SYNC_FLUSH; else op = Z_NO_FLUSH; /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pData->zstrm.avail_in, pData->zstrm.total_in, bIsFlush); - pData->zstrm.avail_out = sizeof(zipBuf); - pData->zstrm.next_out = zipBuf; - zRet = deflate(&pData->zstrm, op); /* no bad return value */ - DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out); - outavail = sizeof(zipBuf) - pData->zstrm.avail_out; + DBGPRINTF("omfwd: in deflate() loop, avail_in %d, total_in %ld, isFlush %d\n", pWrkrData->zstrm.avail_in, pWrkrData->zstrm.total_in, bIsFlush); + pWrkrData->zstrm.avail_out = sizeof(zipBuf); + pWrkrData->zstrm.next_out = zipBuf; + zRet = deflate(&pWrkrData->zstrm, op); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pWrkrData->zstrm.avail_out); + outavail = sizeof(zipBuf) - pWrkrData->zstrm.avail_out; if(outavail != 0) { - CHKiRet(TCPSendBufUncompressed(pData, zipBuf, outavail)); + CHKiRet(TCPSendBufUncompressed(pWrkrData, zipBuf, outavail)); } - } while (pData->zstrm.avail_out == 0); + } while (pWrkrData->zstrm.avail_out == 0); finalize_it: RETiRet; } static rsRetVal -TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) +TCPSendBuf(wrkrInstanceData_t *pWrkrData, uchar *buf, unsigned len, sbool bIsFlush) { DEFiRet; - if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS) - iRet = TCPSendBufCompressed(pData, buf, len, bIsFlush); + if(pWrkrData->pData->compressionMode >= COMPRESS_STREAM_ALWAYS) + iRet = TCPSendBufCompressed(pWrkrData, buf, len, bIsFlush); else - iRet = TCPSendBufUncompressed(pData, buf, len); + iRet = TCPSendBufUncompressed(pWrkrData, buf, len); RETiRet; } @@ -558,39 +567,39 @@ TCPSendBuf(instanceData *pData, uchar *buf, unsigned len, sbool bIsFlush) * running in stream mode). */ static rsRetVal -doZipFinish(instanceData *pData) +doZipFinish(wrkrInstanceData_t *pWrkrData) { int zRet; /* zlib return state */ DEFiRet; unsigned outavail; uchar zipBuf[32*1024]; - if(!pData->bzInitDone) + if(!pWrkrData->bzInitDone) goto done; // TODO: can we get this into a single common function? dbgprintf("DDDD: in doZipFinish()\n"); - pData->zstrm.avail_in = 0; + pWrkrData->zstrm.avail_in = 0; /* run deflate() on buffer until everything has been compressed */ do { - DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pData->zstrm.avail_in, pData->zstrm.total_in); - pData->zstrm.avail_out = sizeof(zipBuf); - pData->zstrm.next_out = zipBuf; - zRet = deflate(&pData->zstrm, Z_FINISH); /* no bad return value */ - DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pData->zstrm.avail_out); - outavail = sizeof(zipBuf) - pData->zstrm.avail_out; + DBGPRINTF("in deflate() loop, avail_in %d, total_in %ld\n", pWrkrData->zstrm.avail_in, pWrkrData->zstrm.total_in); + pWrkrData->zstrm.avail_out = sizeof(zipBuf); + pWrkrData->zstrm.next_out = zipBuf; + zRet = deflate(&pWrkrData->zstrm, Z_FINISH); /* no bad return value */ + DBGPRINTF("after deflate, ret %d, avail_out %d\n", zRet, pWrkrData->zstrm.avail_out); + outavail = sizeof(zipBuf) - pWrkrData->zstrm.avail_out; if(outavail != 0) { - CHKiRet(TCPSendBufUncompressed(pData, zipBuf, outavail)); + CHKiRet(TCPSendBufUncompressed(pWrkrData, zipBuf, outavail)); } - } while (pData->zstrm.avail_out == 0); + } while (pWrkrData->zstrm.avail_out == 0); finalize_it: - zRet = deflateEnd(&pData->zstrm); + zRet = deflateEnd(&pWrkrData->zstrm); if(zRet != Z_OK) { DBGPRINTF("error %d returned from zlib/deflateEnd()\n", zRet); } - pData->bzInitDone = 0; + pWrkrData->bzInitDone = 0; done: RETiRet; } @@ -600,26 +609,26 @@ done: RETiRet; static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len) { DEFiRet; - instanceData *pData = (instanceData *) pvData; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) pvData; DBGPRINTF("omfwd: add %u bytes to send buffer (curr offs %u)\n", - (unsigned) len, pData->offsSndBuf); - if(pData->offsSndBuf != 0 && pData->offsSndBuf + len >= sizeof(pData->sndBuf)) { + (unsigned) len, pWrkrData->offsSndBuf); + if(pWrkrData->offsSndBuf != 0 && pWrkrData->offsSndBuf + len >= sizeof(pWrkrData->sndBuf)) { /* no buffer space left, need to commit previous records */ - CHKiRet(TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, NO_FLUSH)); - pData->offsSndBuf = 0; + CHKiRet(TCPSendBuf(pWrkrData, pWrkrData->sndBuf, pWrkrData->offsSndBuf, NO_FLUSH)); + pWrkrData->offsSndBuf = 0; iRet = RS_RET_PREVIOUS_COMMITTED; } /* check if the message is too large to fit into buffer */ - if(len > sizeof(pData->sndBuf)) { - CHKiRet(TCPSendBuf(pData, (uchar*)msg, len, NO_FLUSH)); + if(len > sizeof(pWrkrData->sndBuf)) { + CHKiRet(TCPSendBuf(pWrkrData, (uchar*)msg, len, NO_FLUSH)); ABORT_FINALIZE(RS_RET_OK); /* committed everything so far */ } /* we now know the buffer has enough free space */ - memcpy(pData->sndBuf + pData->offsSndBuf, msg, len); - pData->offsSndBuf += len; + memcpy(pWrkrData->sndBuf + pWrkrData->offsSndBuf, msg, len); + pWrkrData->offsSndBuf += len; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -634,10 +643,10 @@ finalize_it: static rsRetVal TCPSendPrepRetry(void *pvData) { DEFiRet; - instanceData *pData = (instanceData *) pvData; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) pvData; - assert(pData != NULL); - DestructTCPInstanceData(pData); + assert(pWrkrData != NULL); + DestructTCPInstanceData(pWrkrData); RETiRet; } @@ -648,36 +657,39 @@ static rsRetVal TCPSendPrepRetry(void *pvData) static rsRetVal TCPSendInit(void *pvData) { DEFiRet; - instanceData *pData = (instanceData *) pvData; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) pvData; + instanceData *pData; + + assert(pWrkrData != NULL); + pData = pWrkrData->pData; - assert(pData != NULL); - if(pData->pNetstrm == NULL) { + if(pWrkrData->pNetstrm == NULL) { dbgprintf("TCPSendInit CREATE\n"); - CHKiRet(netstrms.Construct(&pData->pNS)); + CHKiRet(netstrms.Construct(&pWrkrData->pNS)); /* the stream driver must be set before the object is finalized! */ - CHKiRet(netstrms.SetDrvrName(pData->pNS, pData->pszStrmDrvr)); - CHKiRet(netstrms.ConstructFinalize(pData->pNS)); + CHKiRet(netstrms.SetDrvrName(pWrkrData->pNS, pData->pszStrmDrvr)); + CHKiRet(netstrms.ConstructFinalize(pWrkrData->pNS)); /* now create the actual stream and connect to the server */ - CHKiRet(netstrms.CreateStrm(pData->pNS, &pData->pNetstrm)); - CHKiRet(netstrm.ConstructFinalize(pData->pNetstrm)); - CHKiRet(netstrm.SetDrvrMode(pData->pNetstrm, pData->iStrmDrvrMode)); + CHKiRet(netstrms.CreateStrm(pWrkrData->pNS, &pWrkrData->pNetstrm)); + CHKiRet(netstrm.ConstructFinalize(pWrkrData->pNetstrm)); + CHKiRet(netstrm.SetDrvrMode(pWrkrData->pNetstrm, pData->iStrmDrvrMode)); /* now set optional params, but only if they were actually configured */ if(pData->pszStrmDrvrAuthMode != NULL) { - CHKiRet(netstrm.SetDrvrAuthMode(pData->pNetstrm, pData->pszStrmDrvrAuthMode)); + CHKiRet(netstrm.SetDrvrAuthMode(pWrkrData->pNetstrm, pData->pszStrmDrvrAuthMode)); } if(pData->pPermPeers != NULL) { - CHKiRet(netstrm.SetDrvrPermPeers(pData->pNetstrm, pData->pPermPeers)); + CHKiRet(netstrm.SetDrvrPermPeers(pWrkrData->pNetstrm, pData->pPermPeers)); } /* params set, now connect */ - CHKiRet(netstrm.Connect(pData->pNetstrm, glbl.GetDefPFFamily(), + CHKiRet(netstrm.Connect(pWrkrData->pNetstrm, glbl.GetDefPFFamily(), (uchar*)pData->port, (uchar*)pData->target)); } finalize_it: if(iRet != RS_RET_OK) { dbgprintf("TCPSendInit FAILED with %d.\n", iRet); - DestructTCPInstanceData(pData); + DestructTCPInstanceData(pWrkrData); } RETiRet; @@ -687,15 +699,17 @@ finalize_it: /* try to resume connection if it is not ready * rgerhards, 2007-08-02 */ -static rsRetVal doTryResume(instanceData *pData) +static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData) { int iErr; struct addrinfo *res; struct addrinfo hints; + instanceData *pData; DEFiRet; - if(pData->bIsConnected) + if(pWrkrData->bIsConnected) FINALIZE; + pData = pWrkrData->pData; /* The remote address is not yet known and needs to be obtained */ dbgprintf(" %s\n", pData->target); @@ -711,20 +725,20 @@ static rsRetVal doTryResume(instanceData *pData) ABORT_FINALIZE(RS_RET_SUSPENDED); } dbgprintf("%s found, resuming.\n", pData->target); - pData->f_addr = res; - pData->bIsConnected = 1; - if(pData->pSockArray == NULL) { - pData->pSockArray = net.create_udp_socket((uchar*)pData->target, NULL, 0, 0); + pWrkrData->f_addr = res; + pWrkrData->bIsConnected = 1; + if(pWrkrData->pSockArray == NULL) { + pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->target, NULL, 0, 0); } } else { - CHKiRet(TCPSendInit((void*)pData)); + CHKiRet(TCPSendInit((void*)pWrkrData)); } finalize_it: if(iRet != RS_RET_OK) { - if(pData->f_addr != NULL) { - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->f_addr != NULL) { + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } iRet = RS_RET_SUSPENDED; } @@ -736,7 +750,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume dbgprintf("DDDD: tryResume: pWrkrData %p\n", pWrkrData); - iRet = doTryResume(pWrkrData->pData); + iRet = doTryResume(pWrkrData); ENDtryResume @@ -757,7 +771,7 @@ BEGINdoAction CODESTARTdoAction dbgprintf("DDDD: doAction: pWrkrData %p\n", pWrkrData); pData = pWrkrData->pData; - CHKiRet(doTryResume(pData)); + CHKiRet(doTryResume(pWrkrData)); iMaxLine = glbl.GetMaxLine(); @@ -810,14 +824,14 @@ CODESTARTdoAction if(pData->protocol == FORW_UDP) { /* forward via UDP */ - CHKiRet(UDPSend(pData, psz, l)); + CHKiRet(UDPSend(pWrkrData, psz, l)); } else { /* forward via TCP */ - iRet = tcpclt.Send(pData->pTCPClt, pData, psz, l); + iRet = tcpclt.Send(pWrkrData->pTCPClt, pWrkrData, psz, l); if(iRet != RS_RET_OK && iRet != RS_RET_DEFER_COMMIT && iRet != RS_RET_PREVIOUS_COMMITTED) { /* error! */ dbgprintf("error forwarding via tcp, suspending\n"); - DestructTCPInstanceData(pData); + DestructTCPInstanceData(pWrkrData); iRet = RS_RET_SUSPENDED; } if(pData->compressionMode >= COMPRESS_STREAM_ALWAYS && pData->strmCompFlushOnTxEnd) @@ -832,13 +846,11 @@ ENDdoAction BEGINendTransaction - instanceData *pData; CODESTARTendTransaction - pData = pWrkrData->pData; -dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pData->offsSndBuf); - if(pData->offsSndBuf != 0) { - iRet = TCPSendBuf(pData, pData->sndBuf, pData->offsSndBuf, IS_FLUSH); - pData->offsSndBuf = 0; +dbgprintf("omfwd: endTransaction, offsSndBuf %u\n", pWrkrData->offsSndBuf); + if(pWrkrData->offsSndBuf != 0) { + iRet = TCPSendBuf(pWrkrData, pWrkrData->sndBuf, pWrkrData->offsSndBuf, IS_FLUSH); + pWrkrData->offsSndBuf = 0; } ENDendTransaction @@ -865,24 +877,22 @@ finalize_it: * created. */ static rsRetVal -initTCP(instanceData *pData) +initTCP(wrkrInstanceData_t *pWrkrData) { + instanceData *pData; DEFiRet; + + pData = pWrkrData->pData; if(pData->protocol == FORW_TCP) { /* create our tcpclt */ - CHKiRet(tcpclt.Construct(&pData->pTCPClt)); - CHKiRet(tcpclt.SetResendLastOnRecon(pData->pTCPClt, pData->bResendLastOnRecon)); + CHKiRet(tcpclt.Construct(&pWrkrData->pTCPClt)); + CHKiRet(tcpclt.SetResendLastOnRecon(pWrkrData->pTCPClt, pData->bResendLastOnRecon)); /* and set callbacks */ - CHKiRet(tcpclt.SetSendInit(pData->pTCPClt, TCPSendInit)); - CHKiRet(tcpclt.SetSendFrame(pData->pTCPClt, TCPSendFrame)); - CHKiRet(tcpclt.SetSendPrepRetry(pData->pTCPClt, TCPSendPrepRetry)); - CHKiRet(tcpclt.SetFraming(pData->pTCPClt, pData->tcp_framing)); - CHKiRet(tcpclt.SetRebindInterval(pData->pTCPClt, pData->iRebindInterval)); - if(cs.pszStrmDrvr != NULL) - CHKmalloc(pData->pszStrmDrvr = (uchar*)strdup((char*)cs.pszStrmDrvr)); - if(cs.pszStrmDrvrAuthMode != NULL) - CHKmalloc(pData->pszStrmDrvrAuthMode = - (uchar*)strdup((char*)cs.pszStrmDrvrAuthMode)); + CHKiRet(tcpclt.SetSendInit(pWrkrData->pTCPClt, TCPSendInit)); + CHKiRet(tcpclt.SetSendFrame(pWrkrData->pTCPClt, TCPSendFrame)); + CHKiRet(tcpclt.SetSendPrepRetry(pWrkrData->pTCPClt, TCPSendPrepRetry)); + CHKiRet(tcpclt.SetFraming(pWrkrData->pTCPClt, pData->tcp_framing)); + CHKiRet(tcpclt.SetRebindInterval(pWrkrData->pTCPClt, pData->iRebindInterval)); } finalize_it: RETiRet; @@ -1067,7 +1077,6 @@ CODESTARTnewActInst tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName); CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS)); - CHKiRet(initTCP(pData)); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -1236,7 +1245,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) cs.pPermPeers = NULL; } } - CHKiRet(initTCP(pData)); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct |