diff options
Diffstat (limited to 'plugins')
28 files changed, 960 insertions, 576 deletions
diff --git a/plugins/impstats/impstats.c b/plugins/impstats/impstats.c index 694c07c4..a883ef1b 100644 --- a/plugins/impstats/impstats.c +++ b/plugins/impstats/impstats.c @@ -377,7 +377,6 @@ checkRuleset(modConfData_t *modConf) DEFiRet; modConf->pBindRuleset = NULL; /* assume default ruleset */ -dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); if(modConf->pszBindRuleset == NULL) FINALIZE; @@ -390,7 +389,6 @@ dbgprintf("DDDD: impstats ruleset %s\n", modConf->pszBindRuleset); CHKiRet(localRet); modConf->pBindRuleset = pRuleset; finalize_it: -dbgprintf("DDDD: impstats ruleset ptr %p\n", modConf->pBindRuleset); RETiRet; } diff --git a/plugins/imuxsock/imuxsock.c b/plugins/imuxsock/imuxsock.c index 4bce0c1c..a53ce159 100644 --- a/plugins/imuxsock/imuxsock.c +++ b/plugins/imuxsock/imuxsock.c @@ -398,7 +398,7 @@ addListner(instanceConf_t *inst) listeners[nfd].flags = inst->bIgnoreTimestamp ? IGNDATE : NOFLAG; listeners[nfd].bCreatePath = inst->bCreatePath; listeners[nfd].sockName = ustrdup(inst->sockName); - listeners[nfd].bUseCreds = (inst->bDiscardOwnMsgs || inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate) ? 1 : 0; + listeners[nfd].bUseCreds = (inst->bDiscardOwnMsgs || inst->bWritePid || inst->ratelimitInterval || inst->bAnnotate || inst->bUseSysTimeStamp) ? 1 : 0; listeners[nfd].bAnnotate = inst->bAnnotate; listeners[nfd].bParseTrusted = inst->bParseTrusted; listeners[nfd].bDiscardOwnMsgs = inst->bDiscardOwnMsgs; diff --git a/plugins/mmanon/mmanon.c b/plugins/mmanon/mmanon.c index 16a4f34b..28797807 100644 --- a/plugins/mmanon/mmanon.c +++ b/plugins/mmanon/mmanon.c @@ -71,6 +71,10 @@ typedef struct _instanceData { } ipv4; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -119,6 +123,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -130,6 +138,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -358,7 +371,7 @@ CODESTARTdoAction lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); for(i = 0 ; i < lenMsg ; ++i) { - anonip(pData, msg, &lenMsg, &i); + anonip(pWrkrData->pData, msg, &lenMsg, &i); } if(lenMsg != getMSGLen(pMsg)) setMSGLen(pMsg, lenMsg); @@ -387,6 +400,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmaudit/mmaudit.c b/plugins/mmaudit/mmaudit.c index c7cff2cb..75f8dd4b 100644 --- a/plugins/mmaudit/mmaudit.c +++ b/plugins/mmaudit/mmaudit.c @@ -14,7 +14,7 @@ * * File begun on 2012-02-23 by RGerhards * - * Copyright 2012 Adiscon GmbH. + * Copyright 2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -69,6 +69,11 @@ typedef struct _instanceData { int dummy; /* remove when the first real parameter is needed */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars resetConfigVariables(NULL, NULL); @@ -79,6 +84,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -89,6 +98,10 @@ BEGINfreeInstance CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -302,6 +315,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt diff --git a/plugins/mmfields/mmfields.c b/plugins/mmfields/mmfields.c index fa7fa100..c408a6c9 100644 --- a/plugins/mmfields/mmfields.c +++ b/plugins/mmfields/mmfields.c @@ -56,6 +56,10 @@ typedef struct _instanceData { uchar *jsonRoot; /**< container where to store fields */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -103,6 +107,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -114,6 +122,10 @@ CODESTARTfreeInstance free(pData->jsonRoot); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -232,7 +244,7 @@ CODESTARTdoAction pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); - CHKiRet(parse_fields(pData, pMsg, msg, lenMsg)); + CHKiRet(parse_fields(pWrkrData->pData, pMsg, msg, lenMsg)); finalize_it: ENDdoAction @@ -259,6 +271,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmjsonparse/mmjsonparse.c b/plugins/mmjsonparse/mmjsonparse.c index b16aef0e..9c0ab88f 100644 --- a/plugins/mmjsonparse/mmjsonparse.c +++ b/plugins/mmjsonparse/mmjsonparse.c @@ -58,9 +58,15 @@ DEFobjCurrIf(errmsg); DEF_OMOD_STATIC_DATA typedef struct _instanceData { - struct json_tokener *tokener; + int dummy; /* not needed, but some compilers do not support empty structs */ + /* REMOVE dummy when real data items are to be added! */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + struct json_tokener *tokener; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -94,14 +100,18 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance - pData->tokener = json_tokener_new(); - if(pData->tokener == NULL) { +ENDcreateInstance + +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->tokener = json_tokener_new(); + if(pWrkrData->tokener == NULL) { errmsg.LogError(0, RS_RET_ERR, "error: could not create json " - "tokener, cannot activate action"); + "tokener, cannot activate instance"); ABORT_FINALIZE(RS_RET_ERR); } finalize_it: -ENDcreateInstance +ENDcreateWrkrInstance BEGINisCompatibleWithFeature @@ -111,10 +121,14 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if(pData->tokener != NULL) - json_tokener_free(pData->tokener); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->tokener != NULL) + json_tokener_free(pWrkrData->tokener); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -128,28 +142,28 @@ ENDtryResume static rsRetVal -processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf) +processJSON(wrkrInstanceData_t *pWrkrData, msg_t *pMsg, char *buf, size_t lenBuf) { struct json_object *json; const char *errMsg; DEFiRet; - assert(pData->tokener != NULL); + assert(pWrkrData->tokener != NULL); DBGPRINTF("mmjsonparse: toParse: '%s'\n", buf); - json_tokener_reset(pData->tokener); + json_tokener_reset(pWrkrData->tokener); - json = json_tokener_parse_ex(pData->tokener, buf, lenBuf); + json = json_tokener_parse_ex(pWrkrData->tokener, buf, lenBuf); if(Debug) { errMsg = NULL; if(json == NULL) { enum json_tokener_error err; - err = pData->tokener->err; + err = pWrkrData->tokener->err; if(err != json_tokener_continue) errMsg = json_tokener_errors[err]; else errMsg = "Unterminated input"; - } else if((size_t)pData->tokener->char_offset < lenBuf) + } else if((size_t)pWrkrData->tokener->char_offset < lenBuf) errMsg = "Extra characters after JSON object"; else if(!json_object_is_type(json, json_type_object)) errMsg = "JSON value is not an object"; @@ -159,7 +173,7 @@ processJSON(instanceData *pData, msg_t *pMsg, char *buf, size_t lenBuf) } } if(json == NULL - || ((size_t)pData->tokener->char_offset < lenBuf) + || ((size_t)pWrkrData->tokener->char_offset < lenBuf) || (!json_object_is_type(json, json_type_object))) { ABORT_FINALIZE(RS_RET_NO_CEE_MSG); } @@ -194,7 +208,7 @@ CODESTARTdoAction ABORT_FINALIZE(RS_RET_NO_CEE_MSG); } buf += LEN_COOKIE; - CHKiRet(processJSON(pData, pMsg, (char*) buf, strlen((char*)buf))); + CHKiRet(processJSON(pWrkrData, pMsg, (char*) buf, strlen((char*)buf))); bSuccess = 1; finalize_it: if(iRet == RS_RET_NO_CEE_MSG) { @@ -257,6 +271,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmnormalize/Makefile.am b/plugins/mmnormalize/Makefile.am index 0a3b5ba5..6a50264d 100644 --- a/plugins/mmnormalize/Makefile.am +++ b/plugins/mmnormalize/Makefile.am @@ -1,8 +1,8 @@ pkglib_LTLIBRARIES = mmnormalize.la mmnormalize_la_SOURCES = mmnormalize.c -mmnormalize_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) $(LIBEE_CFLAGS) -mmnormalize_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) $(LIBEE_LIBS) +mmnormalize_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(LIBLOGNORM_CFLAGS) +mmnormalize_la_LDFLAGS = -module -avoid-version $(LIBLOGNORM_LIBS) mmnormalize_la_LIBADD = EXTRA_DIST = diff --git a/plugins/mmnormalize/mmnormalize.c b/plugins/mmnormalize/mmnormalize.c index 7e25824a..ba2e730d 100644 --- a/plugins/mmnormalize/mmnormalize.c +++ b/plugins/mmnormalize/mmnormalize.c @@ -1,15 +1,12 @@ /* mmnormalize.c * This is a message modification module. It normalizes the input message with - * the help of liblognorm. The messages EE event structure is updated. + * the help of liblognorm. The message's JSON variables are updated. * * NOTE: read comments in module-template.h for details on the calling interface! * - * TODO: check if we can replace libee via JSON system - currently that part - * is pretty inefficient... rgerhards, 2012-08-27 - * * File begun on 2010-01-01 by RGerhards * - * Copyright 2010-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2010-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -39,7 +36,6 @@ #include <errno.h> #include <unistd.h> #include <libestr.h> -#include <libee/libee.h> #include <json.h> #include <liblognorm.h> #include "conf.h" @@ -67,9 +63,13 @@ typedef struct _instanceData { sbool bUseRawMsg; /**< use %rawmsg% instead of %msg% */ uchar *rulebase; /**< name of rulebase to use */ ln_ctx ctxln; /**< context to be used for liblognorm */ - ee_ctx ctxee; /**< context to be used for libee */ + char *pszPath; /**< path of normalized data */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *rulebase; /**< name of normalization rulebase to use */ int bUseRawMsg; /**< use %rawmsg% instead of %msg% */ @@ -80,6 +80,7 @@ static configSettings_t cs; /* action (instance) parameters */ static struct cnfparamdescr actpdescr[] = { { "rulebase", eCmdHdlrGetWord, 1 }, + { "path", eCmdHdlrGetWord, 0 }, { "userawmsg", eCmdHdlrBinary, 0 } }; static struct cnfparamblk actpblk = @@ -96,30 +97,21 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ -/* to be called to build the libee part of the instance ONCE ALL PARAMETERS ARE CORRECT +/* to be called to build the liblognorm part of the instance ONCE ALL PARAMETERS ARE CORRECT * (and set within pData!). */ static rsRetVal buildInstance(instanceData *pData) { DEFiRet; - if((pData->ctxee = ee_initCtx()) == NULL) { - errmsg.LogError(0, RS_RET_ERR_LIBEE_INIT, "error: could not initialize libee " - "ctx, cannot activate action"); - ABORT_FINALIZE(RS_RET_ERR_LIBEE_INIT); - } - if((pData->ctxln = ln_initCtx()) == NULL) { errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM_INIT, "error: could not initialize " "liblognorm ctx, cannot activate action"); - ee_exitCtx(pData->ctxee); ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_INIT); } - ln_setEECtx(pData->ctxln, pData->ctxee); if(ln_loadSamples(pData->ctxln, (char*) pData->rulebase) != 0) { errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' " - "could not be loaded cannot activate action", cs.rulebase); - ee_exitCtx(pData->ctxee); + "could not be loaded cannot activate action", pData->rulebase); ln_exitCtx(pData->ctxln); ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD); } @@ -139,6 +131,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINbeginCnfLoad CODESTARTbeginCnfLoad loadModConf = pModConf; @@ -176,11 +173,16 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance free(pData->rulebase); - ee_exitCtx(pData->ctxee); ln_exitCtx(pData->ctxln); + free(pData->pszPath); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("mmnormalize\n"); @@ -193,50 +195,28 @@ ENDtryResume BEGINdoAction msg_t *pMsg; - es_str_t *str; uchar *buf; - char *cstrJSON; int len; int r; - struct ee_event *event = NULL; - struct json_tokener *tokener; - struct json_object *json; + struct json_object *json = NULL; CODESTARTdoAction pMsg = (msg_t*) ppString[0]; - /* note that we can performance-optimize the interface, but this also - * requires changes to the libraries. For now, we accept message - * duplication. -- rgerhards, 2010-12-01 - */ - if(pData->bUseRawMsg) { + if(pWrkrData->pData->bUseRawMsg) { getRawMsg(pMsg, &buf, &len); } else { buf = getMSG(pMsg); len = getMSGLen(pMsg); } - str = es_newStrFromCStr((char*)buf, len); - r = ln_normalize(pData->ctxln, str, &event); + r = ln_normalize(pWrkrData->pData->ctxln, (char*)buf, len, &json); if(r != 0) { DBGPRINTF("error %d during ln_normalize\n", r); MsgSetParseSuccess(pMsg, 0); } else { MsgSetParseSuccess(pMsg, 1); } - es_deleteStr(str); - - /* reformat to our json data struct */ - /* TODO: this is all extremly ineffcient! */ - ee_fmtEventToJSON(event, &str); - cstrJSON = es_str2cstr(str, NULL); - ee_deleteEvent(event); - dbgprintf("mmnormalize generated: %s\n", cstrJSON); - - tokener = json_tokener_new(); - json = json_tokener_parse_ex(tokener, cstrJSON, strlen((char*)cstrJSON)); - json_tokener_free(tokener); - msgAddJSON(pMsg, (uchar*)"!", json); - - free(cstrJSON); - es_deleteStr(str); + + msgAddJSON(pMsg, (uchar*)pWrkrData->pData->pszPath + 1, json); + ENDdoAction @@ -245,12 +225,14 @@ setInstParamDefaults(instanceData *pData) { pData->rulebase = NULL; pData->bUseRawMsg = 0; + pData->pszPath = strdup("$!"); } BEGINnewActInst struct cnfparamvals *pvals; int i; int bDestructPValsOnExit; + char *cstr; CODESTARTnewActInst DBGPRINTF("newActInst (mmnormalize)\n"); @@ -278,6 +260,23 @@ CODESTARTnewActInst pData->rulebase = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "userawmsg")) { pData->bUseRawMsg = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "path")) { + cstr = es_str2cstr(pvals[i].val.d.estr, NULL); + if (strlen(cstr) < 2) { + errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED, + "mmnormalize: valid path name should be at least " + "2 symbols long, got %s", cstr); + free(cstr); + } else if (cstr[0] != '$') { + errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED, + "mmnormalize: valid path name should start with $," + "got %s", cstr); + free(cstr); + } else { + free(pData->pszPath); + pData->pszPath = cstr; + } + continue; } else { DBGPRINTF("mmnormalize: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); @@ -313,6 +312,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) pData->rulebase = cs.rulebase; pData->bUseRawMsg = cs.bUseRawMsg; + pData->pszPath = strdup("$!"); /* old interface does not support this feature */ /* all config vars auto-reset! */ cs.bUseRawMsg = 0; cs.rulebase = NULL; /* we used it up! */ @@ -338,6 +338,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/mmpstrucdata/mmpstrucdata.c b/plugins/mmpstrucdata/mmpstrucdata.c index 123363bc..680ba92b 100644 --- a/plugins/mmpstrucdata/mmpstrucdata.c +++ b/plugins/mmpstrucdata/mmpstrucdata.c @@ -53,6 +53,10 @@ typedef struct _instanceData { uchar *jsonRoot; /**< container where to store fields */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -99,6 +103,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -110,6 +118,10 @@ CODESTARTfreeInstance free(pData->jsonRoot); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -359,7 +371,7 @@ dbgprintf("DDDD: parse mmpstrucdata\n"); /* don't check return code - we never want rsyslog to retry * or suspend this action! */ - parse_sd(pData, pMsg); + parse_sd(pWrkrData->pData, pMsg); dbgprintf("DDDD: done parse mmpstrucdata\n"); finalize_it: ENDdoAction @@ -387,6 +399,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmsequence/mmsequence.c b/plugins/mmsequence/mmsequence.c index 20a85370..d1ea85b6 100644 --- a/plugins/mmsequence/mmsequence.c +++ b/plugins/mmsequence/mmsequence.c @@ -74,6 +74,10 @@ typedef struct _instanceData { char *pszVar; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -100,6 +104,8 @@ static struct cnfparamblk actpblk = /* table for key-counter pairs */ static struct hashtable *ght; static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER; + +static pthread_mutex_t inst_mutex = PTHREAD_MUTEX_INITIALIZER; BEGINbeginCnfLoad CODESTARTbeginCnfLoad @@ -129,6 +135,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -139,6 +149,10 @@ BEGINfreeInstance CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) @@ -243,7 +257,7 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_ERR); } } - pthread_mutex_unlock(&ght_mutex); + pthread_mutex_unlock(&ght_mutex); break; default: errmsg.LogError(0, RS_RET_INVLD_MODE, @@ -303,7 +317,9 @@ BEGINdoAction struct json_object *json; int val = 0; int *pCounter; + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; pMsg = (msg_t*) ppString[0]; switch(pData->mode) { @@ -312,12 +328,18 @@ CODESTARTdoAction (pData->valueTo - pData->valueFrom)); break; case mmSequencePerInstance: - if (pData->value >= pData->valueTo - pData->step) { - pData->value = pData->valueFrom; + if (!pthread_mutex_lock(&inst_mutex)) { + if (pData->value >= pData->valueTo - pData->step) { + pData->value = pData->valueFrom; + } else { + pData->value += pData->step; + } + val = pData->value; + pthread_mutex_unlock(&inst_mutex); } else { - pData->value += pData->step; + errmsg.LogError(0, RS_RET_ERR, + "mmsequence: mutex lock has failed!"); } - val = pData->value; break; case mmSequencePerKey: if (!pthread_mutex_lock(&ght_mutex)) { @@ -381,6 +403,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/mmutf8fix/mmutf8fix.c b/plugins/mmutf8fix/mmutf8fix.c index e2077950..351bb129 100644 --- a/plugins/mmutf8fix/mmutf8fix.c +++ b/plugins/mmutf8fix/mmutf8fix.c @@ -60,6 +60,10 @@ typedef struct _instanceData { uint8_t mode; /* operations mode */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -108,6 +112,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature @@ -118,6 +127,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + static inline void setInstParamDefaults(instanceData *pData) { @@ -274,10 +288,10 @@ CODESTARTdoAction pMsg = (msg_t*) ppString[0]; lenMsg = getMSGLen(pMsg); msg = getMSG(pMsg); - if(pData->mode == MODE_CC) { - doCC(pData, msg, lenMsg); + if(pWrkrData->pData->mode == MODE_CC) { + doCC(pWrkrData->pData, msg, lenMsg); } else { - doUTF8(pData, msg, lenMsg); + doUTF8(pWrkrData->pData, msg, lenMsg); } ENDdoAction @@ -304,6 +318,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README index 9021bc0e..b8bf4151 100644 --- a/plugins/omelasticsearch/README +++ b/plugins/omelasticsearch/README @@ -1,3 +1,7 @@ +How to access ElasticSearch on local machine (for testing): +=========================================================== +see: https://github.com/mobz/elasticsearch-head + How to produce an error: ======================== It's quite easy to get 400, if you put a wrong mapping to your diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index eb82c35f..b878050d 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail) typedef struct curl_slist HEADER; typedef struct _instanceData { int port; - int replyLen; int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; uchar *server; uchar *uid; uchar *pwd; @@ -80,24 +80,29 @@ typedef struct _instanceData { uchar *tplName; uchar *timeout; uchar *bulkId; - uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; - char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; sbool dynBulkId; sbool bulkmode; sbool asyncRepl; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + int replyLen; + char *reply; + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ struct { es_str_t *data; int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; - CURL *curlHandle; /* libcurl session handle */ - HEADER *postHeader; /* json POST request info */ -} instanceData; +} wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ @@ -117,7 +122,7 @@ static struct cnfparamdescr actpdescr[] = { { "asyncrepl", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -127,12 +132,32 @@ static struct cnfparamblk actpblk = actpdescr }; +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData); + BEGINcreateInstance CODESTARTcreateInstance - pData->restURL = NULL; pData->fdErrFile = -1; + pthread_mutex_init(&pData->mutErrFile, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); +finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -141,16 +166,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if (pData->postHeader) { - curl_slist_free_all(pData->postHeader); - pData->postHeader = NULL; - } - if (pData->curlHandle) { - curl_easy_cleanup(pData->curlHandle); - pData->curlHandle = NULL; - } if(pData->fdErrFile != -1) close(pData->fdErrFile); + pthread_mutex_destroy(&pData->mutErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -159,11 +177,23 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); - free(pData->restURL); free(pData->errorFile); free(pData->bulkId); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->postHeader) { + curl_slist_free_all(pWrkrData->postHeader); + pWrkrData->postHeader = NULL; + } + if(pWrkrData->curlHandle) { + curl_easy_cleanup(pWrkrData->curlHandle); + pWrkrData->curlHandle = NULL; + } + free(pWrkrData->restURL); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("omelasticsearch\n"); @@ -211,7 +241,7 @@ setBaseURL(instanceData *pData, es_str_t **url) static inline rsRetVal -checkConn(instanceData *pData) +checkConn(wrkrInstanceData_t *pWrkrData) { es_str_t *url; CURL *curl = NULL; @@ -219,7 +249,7 @@ checkConn(instanceData *pData) char *cstr; DEFiRet; - setBaseURL(pData, &url); + setBaseURL(pWrkrData->pData, &url); curl = curl_easy_init(); if(curl == NULL) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); @@ -235,16 +265,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - pData->reply = NULL; - pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } - free(pData->reply); + free(pWrkrData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -257,7 +287,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); - iRet = checkConn(pData); + iRet = checkConn(pWrkrData); ENDtryResume @@ -330,7 +360,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, static rsRetVal -setCurlURL(instanceData *pData, uchar **tpls) +setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls) { char authBuf[1024]; uchar *searchIndex; @@ -368,11 +398,11 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - free(pData->restURL); - pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + free(pWrkrData->restURL); + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -383,8 +413,8 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); - curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: RETiRet; @@ -396,7 +426,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar **tpls) +buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -411,29 +441,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); if(parent != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(bulkId != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } - ++pData->batch.nmemb; + ++pWrkrData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -446,7 +476,7 @@ finalize_it: * needs to be closed, HUP must be sent. */ static inline rsRetVal -writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) { char *rendered = NULL; cJSON *errRoot; @@ -454,6 +484,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *replyRoot = *pReplyRoot; size_t toWrite; ssize_t wrRet; + sbool bMutLocked = 0; char errStr[1024]; DEFiRet; @@ -463,6 +494,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) FINALIZE; } + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, @@ -474,7 +508,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) } } if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); - cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL)); cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); @@ -495,13 +529,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); free(rendered); RETiRet; } static inline rsRetVal -checkResultBulkmode(instanceData *pData, cJSON *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root) { int i; int numitems; @@ -515,7 +551,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root) if(items == NULL || items->type != cJSON_Array) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " "bulkmode insert does not return array, reply is: %s\n", - pData->reply); + pWrkrData->reply); ABORT_FINALIZE(RS_RET_DATAFAIL); } numitems = cJSON_GetArraySize(items); @@ -547,20 +583,20 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData, uchar *reqmsg) +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) { cJSON *root; cJSON *ok; DEFiRet; - root = cJSON_Parse(pData->reply); + root = cJSON_Parse(pWrkrData->reply); if(root == NULL) { DBGPRINTF("omelasticsearch: could not parse JSON result \n"); ABORT_FINALIZE(RS_RET_ERR); } - if(pData->bulkmode) { - iRet = checkResultBulkmode(pData, root); + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { @@ -572,7 +608,7 @@ checkResult(instanceData *pData, uchar *reqmsg) * these in any case. */ if(iRet == RS_RET_DATAFAIL) { - writeDataError(pData, &root, reqmsg); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); iRet = RS_RET_OK; /* we have handled the problem! */ } @@ -587,19 +623,19 @@ finalize_it: static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; - CURL *curl = pData->curlHandle; + CURL *curl = pWrkrData->curlHandle; DEFiRet; - pData->reply = NULL; - pData->replyLen = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; - if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) - CHKiRet(setCurlURL(pData, tpls)); + if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent) + CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); @@ -618,27 +654,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg break; } - DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); - if (pData->replyLen > 0) { - pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ } - DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); - CHKiRet(checkResult(pData, message)); + CHKiRet(checkResult(pWrkrData, message)); finalize_it: - free(pData->reply); + free(pWrkrData->reply); RETiRet; } BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); - if(!pData->bulkmode) { +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); + if(!pWrkrData->pData->bulkmode) { FINALIZE; } - es_emptyStr(pData->batch.data); - pData->batch.nmemb = 0; + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; finalize_it: ENDbeginTransaction @@ -646,14 +682,14 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString)); + if(pWrkrData->pData->bulkmode) { + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { - CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction @@ -662,13 +698,13 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); /* End Transaction only if batch data is not empty */ - if (pData->batch.data != NULL ) { - cstr = es_str2cstr(pData->batch.data, NULL); + if (pWrkrData->batch.data != NULL ) { + cstr = es_str2cstr(pWrkrData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb)); + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); } else - dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -679,24 +715,24 @@ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { char *p = (char *)ptr; - instanceData *pData = (instanceData*) userdata; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata; char *buf; size_t newlen; - newlen = pData->replyLen + size*nmemb; - if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + newlen = pWrkrData->replyLen + size*nmemb; + if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) { DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); return 0; /* abort due to failure */ } - memcpy(buf+pData->replyLen, p, size*nmemb); - pData->replyLen = newlen; - pData->reply = buf; + memcpy(buf+pWrkrData->replyLen, p, size*nmemb); + pWrkrData->replyLen = newlen; + pWrkrData->reply = buf; return size*nmemb; } static rsRetVal -curlSetup(instanceData *pData) +curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData) { HEADER *header; CURL *handle; @@ -712,13 +748,13 @@ curlSetup(instanceData *pData) curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); curl_easy_setopt(handle, CURLOPT_POST, 1); - pData->curlHandle = handle; - pData->postHeader = header; + pWrkrData->curlHandle = handle; + pWrkrData->postHeader = header; if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL); + setCurlURL(pWrkrData, pData, NULL); } if(Debug) { @@ -838,16 +874,6 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - if(pData->bulkmode) { - pData->batch.currTpl1 = NULL; - pData->batch.currTpl2 = NULL; - if((pData->batch.data = es_newStr(1024)) == NULL) { - DBGPRINTF("omelasticsearch: error creating batch string " - "turned off bulk mode\n"); - pData->bulkmode = 0; /* at least it works */ - } - } - iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; @@ -939,9 +965,6 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); - - CHKiRet(curlSetup(pData)); - CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -979,6 +1002,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP diff --git a/plugins/omhiredis/omhiredis.c b/plugins/omhiredis/omhiredis.c index 757d5eb2..28a1b9cd 100644 --- a/plugins/omhiredis/omhiredis.c +++ b/plugins/omhiredis/omhiredis.c @@ -52,14 +52,17 @@ DEFobjCurrIf(errmsg) * this will be accessable * via pData */ typedef struct _instanceData { - redisContext *conn; /* redis connection */ uchar *server; /* redis server address */ int port; /* redis port */ uchar *tplName; /* template name */ - redisReply **replies; /* array to hold replies from redis */ - int count; /* count of command sent for current batch */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + redisContext *conn; /* redis connection */ + redisReply **replies; /* array to hold replies from redis */ + int count; /* count of command sent for current batch */ +} wrkrInstanceData_t; static struct cnfparamdescr actpdescr[] = { { "server", eCmdHdlrGetWord, 0 }, @@ -76,6 +79,11 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->conn = NULL; /* Connect later */ +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -83,11 +91,11 @@ CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature /* called when closing */ -static void closeHiredis(instanceData *pData) +static void closeHiredis(wrkrInstanceData_t *pWrkrData) { - if(pData->conn != NULL) { - redisFree(pData->conn); - pData->conn = NULL; + if(pWrkrData->conn != NULL) { + redisFree(pWrkrData->conn); + pWrkrData->conn = NULL; } } @@ -95,10 +103,15 @@ static void closeHiredis(instanceData *pData) * TODO: free **replies */ BEGINfreeInstance CODESTARTfreeInstance - closeHiredis(pData); - free(pData->server); + if (pData->server != NULL) { + free(pData->server); + } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeHiredis(pWrkrData); +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -106,17 +119,20 @@ CODESTARTdbgPrintInstInfo ENDdbgPrintInstInfo /* establish our connection to redis */ -static rsRetVal initHiredis(instanceData *pData, int bSilent) +static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent) { char *server; DEFiRet; - server = (pData->server == NULL) ? "127.0.0.1" : (char*) pData->server; - DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, pData->port); - + server = (pWrkrData->pData->server == NULL) ? "127.0.0.1" : + (char*) pWrkrData->pData->server; + DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server, + pWrkrData->pData->port); + struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */ - pData->conn = redisConnectWithTimeout(server, pData->port, timeout); - if (pData->conn->err) { + pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port, + timeout); + if (pWrkrData->conn->err) { if(!bSilent) errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize redis handle"); @@ -126,29 +142,29 @@ finalize_it: RETiRet; } -rsRetVal writeHiredis(uchar *message, instanceData *pData) +rsRetVal writeHiredis(uchar *message, wrkrInstanceData_t *pWrkrData) { DEFiRet; - /* if we do not have a redis connection, call - * initHiredis and try to establish one */ - if(pData->conn == NULL) - CHKiRet(initHiredis(pData, 0)); + /* if we do not have a redis connection, call + * initHiredis and try to establish one */ + if(pWrkrData->conn == NULL) + CHKiRet(initHiredis(pWrkrData, 0)); - /* try to append the command to the pipeline. - * REDIS_ERR reply indicates something bad - * happened, in which case abort. otherwise - * increase our current pipeline count - * by 1 and continue. */ + /* try to append the command to the pipeline. + * REDIS_ERR reply indicates something bad + * happened, in which case abort. otherwise + * increase our current pipeline count + * by 1 and continue. */ int rc; - rc = redisAppendCommand(pData->conn, (char*)message); + rc = redisAppendCommand(pWrkrData->conn, (char*)message); if (rc == REDIS_ERR) { - errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pData->conn->errstr); - dbgprintf("omhiredis: %s\n", pData->conn->errstr); + errmsg.LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr); + dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr); ABORT_FINALIZE(RS_RET_ERR); } else { - pData->count++; - } + pWrkrData->count++; + } finalize_it: RETiRet; @@ -158,17 +174,18 @@ finalize_it: * try to restablish our connection to redis */ BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) - iRet = initHiredis(pData, 0); + if(pWrkrData->conn == NULL) + iRet = initHiredis(pWrkrData, 0); ENDtryResume -/* begin a transaction. for now does nothing. +/* begin a transaction. * if I decide to use MULTI ... EXEC in the - * fture, this block should send the + * future, this block should send the * MULTI command to redis. */ BEGINbeginTransaction CODESTARTbeginTransaction - dbgprintf("omhiredis: beginTransaction called\n"); + dbgprintf("omhiredis: beginTransaction called\n"); + pWrkrData->count = 0; ENDbeginTransaction /* call writeHiredis for this log line, @@ -176,8 +193,8 @@ ENDbeginTransaction * current pipeline */ BEGINdoAction CODESTARTdoAction - CHKiRet(writeHiredis(ppString[0], pData)); - iRet = RS_RET_DEFER_COMMIT; + CHKiRet(writeHiredis(ppString[0], pWrkrData)); + iRet = RS_RET_DEFER_COMMIT; finalize_it: ENDdoAction @@ -189,16 +206,15 @@ ENDdoAction * which should be fixed */ BEGINendTransaction CODESTARTendTransaction - dbgprintf("omhiredis: endTransaction called\n"); - int i; - pData->replies = malloc ( sizeof ( redisReply* ) * pData->count ); - for ( i = 0; i < pData->count; i++ ) { - redisGetReply ( pData->conn, (void *)&pData->replies[i] ); - /* TODO: add error checking here! */ - freeReplyObject ( pData->replies[i] ); - } - free ( pData->replies ); - pData->count = 0; + dbgprintf("omhiredis: endTransaction called\n"); + int i; + pWrkrData->replies = malloc ( sizeof ( redisReply* ) * pWrkrData->count ); + for ( i = 0; i < pWrkrData->count; i++ ) { + redisGetReply ( pWrkrData->conn, (void *)&pWrkrData->replies[i] ); + /* TODO: add error checking here! */ + freeReplyObject ( pWrkrData->replies[i] ); + } + free ( pWrkrData->replies ); ENDendTransaction /* set defaults. note server is set to NULL @@ -211,7 +227,6 @@ setInstParamDefaults(instanceData *pData) pData->server = NULL; pData->port = 6379; pData->tplName = NULL; - pData->count = 0; } /* here is where the work to set up a new instance @@ -281,6 +296,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* supports transaction interface */ ENDqueryEtryPt @@ -292,9 +308,9 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); - if (!bCoreSupportsBatching) { - errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); - ABORT_FINALIZE(RS_RET_ERR); - } + if (!bCoreSupportsBatching) { + errmsg.LogError(0, NO_ERRCODE, "omhiredis: rsyslog core does not support batching - abort"); + ABORT_FINALIZE(RS_RET_ERR); + } DBGPRINTF("omhiredis: module compiled with rsyslog version %s.\n", VERSION); ENDmodInit diff --git a/plugins/omjournal/omjournal.c b/plugins/omjournal/omjournal.c index 160c369d..82fd7bfb 100644 --- a/plugins/omjournal/omjournal.c +++ b/plugins/omjournal/omjournal.c @@ -56,6 +56,10 @@ DEF_OMOD_STATIC_DATA typedef struct _instanceData { } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + struct modConfData_s { rsconf_t *pConf; /* our overall config object */ }; @@ -91,6 +95,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature @@ -101,6 +110,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINnewActInst CODESTARTnewActInst /* Note: we currently do not have any parameters, so we do not need @@ -172,6 +186,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt diff --git a/plugins/omlibdbi/omlibdbi.c b/plugins/omlibdbi/omlibdbi.c index 3beba4f0..c203b4aa 100644 --- a/plugins/omlibdbi/omlibdbi.c +++ b/plugins/omlibdbi/omlibdbi.c @@ -50,6 +50,9 @@ #include "errmsg.h" #include "conf.h" +#undef HAVE_DBI_TXSUPP +#warning transaction support disabled in v8 -- TODO: reenable + MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("omlibdbi") @@ -73,6 +76,10 @@ typedef struct _instanceData { int txSupport; /* transaction support */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *dbiDrvrDir; /* global: where do the dbi drivers reside? */ uchar *drvrName; /* driver to use */ @@ -94,6 +101,8 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ static int bLegacyCnfModGlobalsPermitted;/* are legacy module-global config parameters permitted? */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + /* tables for interfacing with the v6 config system */ /* module-global parameters */ @@ -157,6 +166,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -187,6 +200,9 @@ CODESTARTfreeInstance free(pData->dbName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -326,16 +342,16 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) { - iRet = initConn(pData, 1); + if(pWrkrData->pData->conn == NULL) { + iRet = initConn(pWrkrData->pData, 1); } ENDtryResume /* transaction support 2013-03 */ BEGINbeginTransaction CODESTARTbeginTransaction - if(pData->conn == NULL) { - CHKiRet(initConn(pData, 0)); + if(pWrkrData->pData->conn == NULL) { + CHKiRet(initConn(pWrkrData->pData, 0)); } # if HAVE_DBI_TXSUPP if (pData->txSupport == 1) { @@ -355,13 +371,15 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction - CHKiRet(writeDB(ppString[0], pData)); + pthread_mutex_lock(&mutDoAct); + CHKiRet(writeDB(ppString[0], pWrkrData->pData)); # if HAVE_DBI_TXSUPP if (pData->txSupport == 1) { iRet = RS_RET_DEFER_COMMIT; } # endif finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction /* transaction support 2013-03 */ @@ -552,6 +570,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES diff --git a/plugins/ommail/ommail.c b/plugins/ommail/ommail.c index 0a781e10..910b371a 100644 --- a/plugins/ommail/ommail.c +++ b/plugins/ommail/ommail.c @@ -13,7 +13,7 @@ * * File begun on 2008-04-04 by RGerhards * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -82,13 +82,21 @@ typedef struct _instanceData { uchar *pszSrvPort; uchar *pszFrom; toRcpt_t *lstRcpt; + } smtp; + } md; /* mode-specific data */ +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + union { + struct { char RcvBuf[1024]; /* buffer for receiving server responses */ size_t lenRcvBuf; size_t iRcvBuf; /* current index into the rcvBuf (buf empty if iRcvBuf == lenRcvBuf) */ int sock; /* socket to this server (most important when we do multiple msgs per mail) */ } smtp; } md; /* mode-specific data */ -} instanceData; +} wrkrInstanceData_t; typedef struct configSettings_s { toRcpt_t *lstRcpt; @@ -112,7 +120,7 @@ ENDinitConfVars /* forward definitions (as few as possible) */ static rsRetVal Send(int sock, char *msg, size_t len); -static rsRetVal readResponse(instanceData *pData, int *piState, int iExpected); +static rsRetVal readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected); /* helpers for handling the recipient lists */ @@ -163,24 +171,22 @@ finalize_it: * iStatusToCheck < 0 means no checking should happen */ static rsRetVal -WriteRcpts(instanceData *pData, uchar *pszOp, size_t lenOp, int iStatusToCheck) +WriteRcpts(wrkrInstanceData_t *pWrkrData, uchar *pszOp, size_t lenOp, int iStatusToCheck) { toRcpt_t *pRcpt; int iState; DEFiRet; - assert(pData != NULL); - assert(pszOp != NULL); assert(lenOp != 0); - for(pRcpt = pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) { + for(pRcpt = pWrkrData->pData->md.smtp.lstRcpt ; pRcpt != NULL ; pRcpt = pRcpt->pNext) { dbgprintf("Sending '%s: <%s>'\n", pszOp, pRcpt->pszTo); - CHKiRet(Send(pData->md.smtp.sock, (char*)pszOp, lenOp)); - CHKiRet(Send(pData->md.smtp.sock, ":<", sizeof(":<") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pszOp, lenOp)); + CHKiRet(Send(pWrkrData->md.smtp.sock, ":<", sizeof(":<") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pRcpt->pszTo, strlen((char*)pRcpt->pszTo))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); if(iStatusToCheck >= 0) - CHKiRet(readResponse(pData, &iState, iStatusToCheck)); + CHKiRet(readResponse(pWrkrData, &iState, iStatusToCheck)); } finalize_it: @@ -193,6 +199,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -203,17 +214,19 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance if(pData->iMode == 0) { - if(pData->md.smtp.pszSrv != NULL) - free(pData->md.smtp.pszSrv); - if(pData->md.smtp.pszSrvPort != NULL) - free(pData->md.smtp.pszSrvPort); - if(pData->md.smtp.pszFrom != NULL) - free(pData->md.smtp.pszFrom); + free(pData->md.smtp.pszSrv); + free(pData->md.smtp.pszSrvPort); + free(pData->md.smtp.pszFrom); lstRcptDestruct(pData->md.smtp.lstRcpt); } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo printf("mail"); /* TODO: extend! */ @@ -229,16 +242,16 @@ ENDdbgPrintInstInfo * rgerhards, 2008-04-04 */ static rsRetVal -getRcvChar(instanceData *pData, char *pC) +getRcvChar(wrkrInstanceData_t *pWrkrData, char *pC) { DEFiRet; ssize_t lenBuf; - assert(pData != NULL); - if(pData->md.smtp.iRcvBuf == pData->md.smtp.lenRcvBuf) { /* buffer empty? */ + if(pWrkrData->md.smtp.iRcvBuf == pWrkrData->md.smtp.lenRcvBuf) { /* buffer empty? */ /* yes, we need to read the next server response */ do { - lenBuf = recv(pData->md.smtp.sock, pData->md.smtp.RcvBuf, sizeof(pData->md.smtp.RcvBuf), 0); + lenBuf = recv(pWrkrData->md.smtp.sock, pWrkrData->md.smtp.RcvBuf, + sizeof(pWrkrData->md.smtp.RcvBuf), 0); if(lenBuf == 0) { ABORT_FINALIZE(RS_RET_NO_MORE_DATA); } else if(lenBuf < 0) { @@ -247,15 +260,15 @@ getRcvChar(instanceData *pData, char *pC) } } else { /* good read */ - pData->md.smtp.iRcvBuf = 0; - pData->md.smtp.lenRcvBuf = lenBuf; + pWrkrData->md.smtp.iRcvBuf = 0; + pWrkrData->md.smtp.lenRcvBuf = lenBuf; } } while(lenBuf < 1); } /* when we reach this point, we have a non-empty buffer */ - *pC = pData->md.smtp.RcvBuf[pData->md.smtp.iRcvBuf++]; + *pC = pWrkrData->md.smtp.RcvBuf[pWrkrData->md.smtp.iRcvBuf++]; finalize_it: RETiRet; @@ -266,14 +279,14 @@ finalize_it: * rgerhards, 2008-04-08 */ static rsRetVal -serverDisconnect(instanceData *pData) +serverDisconnect(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); + assert(pWrkrData != NULL); - if(pData->md.smtp.sock != -1) { - close(pData->md.smtp.sock); - pData->md.smtp.sock = -1; + if(pWrkrData->md.smtp.sock != -1) { + close(pWrkrData->md.smtp.sock); + pWrkrData->md.smtp.sock = -1; } RETiRet; @@ -284,16 +297,17 @@ serverDisconnect(instanceData *pData) * rgerhards, 2008-04-04 */ static rsRetVal -serverConnect(instanceData *pData) +serverConnect(wrkrInstanceData_t *pWrkrData) { struct addrinfo *res = NULL; struct addrinfo hints; char *smtpPort; char *smtpSrv; char errStr[1024]; - + instanceData *pData; DEFiRet; - assert(pData != NULL); + + pData = pWrkrData->pData; if(pData->md.smtp.pszSrv == NULL) smtpSrv = "127.0.0.1"; @@ -313,12 +327,12 @@ serverConnect(instanceData *pData) ABORT_FINALIZE(RS_RET_IO_ERROR); } - if((pData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) { + if((pWrkrData->md.smtp.sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) == -1) { dbgprintf("couldn't create send socket, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr))); ABORT_FINALIZE(RS_RET_IO_ERROR); } - if(connect(pData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) { + if(connect(pWrkrData->md.smtp.sock, res->ai_addr, res->ai_addrlen) != 0) { dbgprintf("create tcp connection failed, reason %s", rs_strerror_r(errno, errStr, sizeof(errStr))); ABORT_FINALIZE(RS_RET_IO_ERROR); } @@ -328,9 +342,9 @@ finalize_it: freeaddrinfo(res); if(iRet != RS_RET_OK) { - if(pData->md.smtp.sock != -1) { - close(pData->md.smtp.sock); - pData->md.smtp.sock = -1; + if(pWrkrData->md.smtp.sock != -1) { + close(pWrkrData->md.smtp.sock); + pWrkrData->md.smtp.sock = -1; } } @@ -374,7 +388,7 @@ finalize_it: * The body is special in that we must escape a leading dot inside a line */ static rsRetVal -bodySend(instanceData *pData, char *msg, size_t len) +bodySend(wrkrInstanceData_t *pWrkrData, char *msg, size_t len) { DEFiRet; char szBuf[2048]; @@ -383,12 +397,12 @@ bodySend(instanceData *pData, char *msg, size_t len) int bHadCR = 0; int bInStartOfLine = 1; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(msg != NULL); for(iSrc = 0 ; iSrc < len ; ++iSrc) { if(iBuf >= sizeof(szBuf) - 1) { /* one is reserved for our extra dot */ - CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf)); + CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf)); iBuf = 0; } szBuf[iBuf++] = msg[iSrc]; @@ -413,7 +427,7 @@ bodySend(instanceData *pData, char *msg, size_t len) } if(iBuf > 0) { /* incomplete buffer to send (the *usual* case)? */ - CHKiRet(Send(pData->md.smtp.sock, szBuf, iBuf)); + CHKiRet(Send(pWrkrData->md.smtp.sock, szBuf, iBuf)); } finalize_it: @@ -424,17 +438,17 @@ finalize_it: /* read response line from server */ static rsRetVal -readResponseLn(instanceData *pData, char *pLn, size_t lenLn) +readResponseLn(wrkrInstanceData_t *pWrkrData, char *pLn, size_t lenLn) { DEFiRet; size_t i = 0; char c; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(pLn != NULL); do { - CHKiRet(getRcvChar(pData, &c)); + CHKiRet(getRcvChar(pWrkrData, &c)); if(c == '\n') break; if(i < (lenLn - 1)) /* if line is too long, we simply discard the rest */ @@ -453,18 +467,18 @@ finalize_it: * rgerhards, 2008-04-07 */ static rsRetVal -readResponse(instanceData *pData, int *piState, int iExpected) +readResponse(wrkrInstanceData_t *pWrkrData, int *piState, int iExpected) { DEFiRet; int bCont; char buf[128]; - assert(pData != NULL); + assert(pWrkrData != NULL); assert(piState != NULL); bCont = 1; do { - CHKiRet(readResponseLn(pData, buf, sizeof(buf))); + CHKiRet(readResponseLn(pWrkrData, buf, sizeof(buf))); /* note: the code below is not 100% clean as we may have received less than 4 characters. * However, as we have a fixed size this will not create a vulnerability. An error will * also most likely be generated, so it is quite acceptable IMHO -- rgerhards, 2008-04-08 @@ -506,64 +520,65 @@ mkSMTPTimestamp(uchar *pszBuf, size_t lenBuf) * rgerhards, 2008-04-04 */ static rsRetVal -sendSMTP(instanceData *pData, uchar *body, uchar *subject) +sendSMTP(wrkrInstanceData_t *pWrkrData, uchar *body, uchar *subject) { DEFiRet; int iState; /* SMTP state */ + instanceData *pData; uchar szDateBuf[64]; - assert(pData != NULL); + pData = pWrkrData->pData; - CHKiRet(serverConnect(pData)); - CHKiRet(readResponse(pData, &iState, 220)); + CHKiRet(serverConnect(pWrkrData)); + CHKiRet(readResponse(pWrkrData, &iState, 220)); - CHKiRet(Send(pData->md.smtp.sock, "HELO ", 5)); - CHKiRet(Send(pData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName()))); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "HELO ", 5)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)glbl.GetLocalHostName(), strlen((char*)glbl.GetLocalHostName()))); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(Send(pData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "MAIL FROM:<", sizeof("MAIL FROM:<") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(WriteRcpts(pData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250)); + CHKiRet(WriteRcpts(pWrkrData, (uchar*)"RCPT TO", sizeof("RCPT TO") - 1, 250)); - CHKiRet(Send(pData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 354)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "DATA\r\n", sizeof("DATA\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 354)); /* now come the data part */ /* header */ mkSMTPTimestamp(szDateBuf, sizeof(szDateBuf)); - CHKiRet(Send(pData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf))); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)szDateBuf, strlen((char*)szDateBuf))); - CHKiRet(Send(pData->md.smtp.sock, "From: <", sizeof("From: <") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); - CHKiRet(Send(pData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "From: <", sizeof("From: <") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)pData->md.smtp.pszFrom, strlen((char*)pData->md.smtp.pszFrom))); + CHKiRet(Send(pWrkrData->md.smtp.sock, ">\r\n", sizeof(">\r\n") - 1)); - CHKiRet(WriteRcpts(pData, (uchar*)"To", sizeof("To") - 1, -1)); + CHKiRet(WriteRcpts(pWrkrData, (uchar*)"To", sizeof("To") - 1, -1)); - CHKiRet(Send(pData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1)); - CHKiRet(Send(pData->md.smtp.sock, (char*)subject, strlen((char*)subject))); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "Subject: ", sizeof("Subject: ") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, (char*)subject, strlen((char*)subject))); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); - CHKiRet(Send(pData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "X-Mailer: rsyslog-immail\r\n", sizeof("x-mailer: rsyslog-immail\r\n") - 1)); - CHKiRet(Send(pData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */ + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n", sizeof("\r\n") - 1)); /* indicate end of header */ /* body */ if(pData->bEnableBody) - CHKiRet(bodySend(pData, (char*)body, strlen((char*) body))); + CHKiRet(bodySend(pWrkrData, (char*)body, strlen((char*) body))); /* end of data, back to envelope transaction */ - CHKiRet(Send(pData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 250)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "\r\n.\r\n", sizeof("\r\n.\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 250)); - CHKiRet(Send(pData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1)); - CHKiRet(readResponse(pData, &iState, 221)); + CHKiRet(Send(pWrkrData->md.smtp.sock, "QUIT\r\n", sizeof("QUIT\r\n") - 1)); + CHKiRet(readResponse(pWrkrData, &iState, 221)); /* we are finished, a new connection is created for each request, so let's close it now */ - CHKiRet(serverDisconnect(pData)); + CHKiRet(serverDisconnect(pWrkrData)); finalize_it: RETiRet; @@ -583,8 +598,8 @@ finalize_it: */ BEGINtryResume CODESTARTtryResume - CHKiRet(serverConnect(pData)); - CHKiRet(serverDisconnect(pData)); /* if we fail, we will never reach this line */ + CHKiRet(serverConnect(pWrkrData)); + CHKiRet(serverDisconnect(pWrkrData)); /* if we fail, we will never reach this line */ finalize_it: if(iRet == RS_RET_IO_ERROR) iRet = RS_RET_SUSPENDED; @@ -593,17 +608,14 @@ ENDtryResume BEGINdoAction CODESTARTdoAction - dbgprintf(" Mail\n"); + DBGPRINTF(" Mail\n"); - /* forward */ - if(pData->bHaveSubject) - iRet = sendSMTP(pData, ppString[0], ppString[1]); - else - iRet = sendSMTP(pData, ppString[0], (uchar*)"message from rsyslog"); + iRet = sendSMTP(pWrkrData, ppString[0], + (pWrkrData->pData->bHaveSubject) ? + ppString[1] : (uchar*)"message from rsyslog"); if(iRet != RS_RET_OK) { - /* error! */ - dbgprintf("error sending mail, suspending\n"); + DBGPRINTF("error sending mail, suspending\n"); iRet = RS_RET_SUSPENDED; } ENDdoAction @@ -689,6 +701,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/ommongodb/ommongodb.c b/plugins/ommongodb/ommongodb.c index af1f5a37..09f19768 100644 --- a/plugins/ommongodb/ommongodb.c +++ b/plugins/ommongodb/ommongodb.c @@ -4,7 +4,7 @@ * mongodb C interface is crap. Obtain the library here: * https://github.com/algernon/libmongo-client * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -71,6 +71,10 @@ typedef struct _instanceData { int bErrMsgPermitted; /* only one errmsg permitted per connection */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ @@ -89,10 +93,16 @@ static struct cnfparamblk actpblk = actpdescr }; +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature /* use this to specify if select features are supported by this @@ -126,6 +136,10 @@ CODESTARTfreeInstance free(pData->tplName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -422,14 +436,17 @@ error: BEGINtryResume CODESTARTtryResume - if(pData->conn == NULL) { - iRet = initMongoDB(pData, 1); + if(pWrkrData->pData->conn == NULL) { + iRet = initMongoDB(pWrkrData->pData, 1); } ENDtryResume BEGINdoAction bson *doc = NULL; + instanceData *pData; CODESTARTdoAction + pthread_mutex_lock(&mutDoAct); + pData = pWrkrData->pData; /* see if we are ready to proceed */ if(pData->conn == NULL) { CHKiRet(initMongoDB(pData, 0)); @@ -454,6 +471,7 @@ CODESTARTdoAction } finalize_it: + pthread_mutex_unlock(&mutDoAct); if(doc != NULL) bson_free(doc); ENDdoAction @@ -560,6 +578,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/ommysql/ommysql.c b/plugins/ommysql/ommysql.c index 49079ab1..c004d1c6 100644 --- a/plugins/ommysql/ommysql.c +++ b/plugins/ommysql/ommysql.c @@ -6,7 +6,7 @@ * * File begun on 2007-07-20 by RGerhards (extracted from syslogd.c) * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * Licensed under the Apache License, Version 2.0 (the "License"); @@ -55,18 +55,22 @@ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) typedef struct _instanceData { - MYSQL *f_hmysql; /* handle to MySQL */ char dbsrv[MAXHOSTNAMELEN+1]; /* IP or hostname of DB server*/ unsigned int dbsrvPort; /* port of MySQL server */ char dbname[_DB_MAXDBLEN+1]; /* DB name */ char dbuid[_DB_MAXUNAMELEN+1]; /* DB user */ char dbpwd[_DB_MAXPWDLEN+1]; /* DB user's password */ - unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */ - uchar * f_configfile; /* MySQL Client Configuration File */ - uchar * f_configsection; /* MySQL Client Configuration Section */ - uchar *tplName; /* format template to use */ + uchar *configfile; /* MySQL Client Configuration File */ + uchar *configsection; /* MySQL Client Configuration Section */ + uchar *tplName; /* format template to use */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + MYSQL *hmysql; /* handle to MySQL */ + unsigned uLastMySQLErrno; /* last errno returned by MySQL or 0 if all is well */ +} wrkrInstanceData_t; + typedef struct configSettings_s { int iSrvPort; /* database server port */ uchar *pszMySQLConfigFile; /* MySQL Client Configuration File */ @@ -104,6 +108,12 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->hmysql = NULL; +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -115,25 +125,28 @@ ENDisCompatibleWithFeature * MySQL connection. * Initially added 2004-10-28 */ -static void closeMySQL(instanceData *pData) +static void closeMySQL(wrkrInstanceData_t *pWrkrData) { - ASSERT(pData != NULL); - - if(pData->f_hmysql != NULL) { /* just to be on the safe side... */ - mysql_close(pData->f_hmysql); - pData->f_hmysql = NULL; + if(pWrkrData->hmysql != NULL) { /* just to be on the safe side... */ + mysql_close(pWrkrData->hmysql); + pWrkrData->hmysql = NULL; } } BEGINfreeInstance CODESTARTfreeInstance - free(pData->f_configfile); - free(pData->f_configsection); + free(pData->configfile); + free(pData->configsection); free(pData->tplName); - closeMySQL(pData); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeMySQL(pWrkrData); +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* nothing special here */ @@ -144,25 +157,23 @@ ENDdbgPrintInstInfo * We check if we have a valid MySQL handle. If not, we simply * report an error, but can not be specific. RGerhards, 2007-01-30 */ -static void reportDBError(instanceData *pData, int bSilent) +static void reportDBError(wrkrInstanceData_t *pWrkrData, int bSilent) { char errMsg[512]; unsigned uMySQLErrno; - ASSERT(pData != NULL); - /* output log message */ errno = 0; - if(pData->f_hmysql == NULL) { + if(pWrkrData->hmysql == NULL) { errmsg.LogError(0, NO_ERRCODE, "unknown DB error occured - could not obtain MySQL handle"); } else { /* we can ask mysql for the error description... */ - uMySQLErrno = mysql_errno(pData->f_hmysql); + uMySQLErrno = mysql_errno(pWrkrData->hmysql); snprintf(errMsg, sizeof(errMsg)/sizeof(char), "db error (%d): %s\n", uMySQLErrno, - mysql_error(pData->f_hmysql)); - if(bSilent || uMySQLErrno == pData->uLastMySQLErrno) + mysql_error(pWrkrData->hmysql)); + if(bSilent || uMySQLErrno == pWrkrData->uLastMySQLErrno) dbgprintf("mysql, DBError(silent): %s\n", errMsg); else { - pData->uLastMySQLErrno = uMySQLErrno; + pWrkrData->uLastMySQLErrno = uMySQLErrno; errmsg.LogError(0, NO_ERRCODE, "%s", errMsg); } } @@ -175,25 +186,26 @@ static void reportDBError(instanceData *pData, int bSilent) * MySQL connection. * Initially added 2004-10-28 mmeckelein */ -static rsRetVal initMySQL(instanceData *pData, int bSilent) +static rsRetVal initMySQL(wrkrInstanceData_t *pWrkrData, int bSilent) { + instanceData *pData; DEFiRet; - ASSERT(pData != NULL); - ASSERT(pData->f_hmysql == NULL); - pData->f_hmysql = mysql_init(NULL); - if(pData->f_hmysql == NULL) { + ASSERT(pWrkrData->hmysql == NULL); + pData = pWrkrData->pData; + pWrkrData->hmysql = mysql_init(NULL); + if(pWrkrData->hmysql == NULL) { errmsg.LogError(0, RS_RET_SUSPENDED, "can not initialize MySQL handle"); iRet = RS_RET_SUSPENDED; } else { /* we could get the handle, now on with work... */ - mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->f_configsection!=NULL)?(char*)pData->f_configsection:"client")); - if(pData->f_configfile!=NULL){ + mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_GROUP,((pData->configsection!=NULL)?(char*)pData->configsection:"client")); + if(pData->configfile!=NULL){ FILE * fp; - fp=fopen((char*)pData->f_configfile,"r"); + fp=fopen((char*)pData->configfile,"r"); int err=errno; if(fp==NULL){ char msg[512]; - snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->f_configfile); + snprintf(msg,sizeof(msg)/sizeof(char),"Could not open '%s' for reading",pData->configfile); if(bSilent) { char errStr[512]; rs_strerror_r(err, errStr, sizeof(errStr)); @@ -202,17 +214,17 @@ static rsRetVal initMySQL(instanceData *pData, int bSilent) errmsg.LogError(err,NO_ERRCODE,"mysql configuration error: %s\n",msg); } else { fclose(fp); - mysql_options(pData->f_hmysql,MYSQL_READ_DEFAULT_FILE,pData->f_configfile); + mysql_options(pWrkrData->hmysql,MYSQL_READ_DEFAULT_FILE,pData->configfile); } } /* Connect to database */ - if(mysql_real_connect(pData->f_hmysql, pData->dbsrv, pData->dbuid, + if(mysql_real_connect(pWrkrData->hmysql, pData->dbsrv, pData->dbuid, pData->dbpwd, pData->dbname, pData->dbsrvPort, NULL, 0) == NULL) { - reportDBError(pData, bSilent); - closeMySQL(pData); /* ignore any error we may get */ + reportDBError(pWrkrData, bSilent); + closeMySQL(pWrkrData); /* ignore any error we may get */ ABORT_FINALIZE(RS_RET_SUSPENDED); } - mysql_autocommit(pData->f_hmysql, 0); + mysql_autocommit(pWrkrData->hmysql, 0); } finalize_it: @@ -224,35 +236,32 @@ finalize_it: * to an established MySQL session. * Initially added 2004-10-28 mmeckelein */ -rsRetVal writeMySQL(uchar *psz, instanceData *pData) +rsRetVal writeMySQL(wrkrInstanceData_t *pWrkrData, uchar *psz) { DEFiRet; - ASSERT(psz != NULL); - ASSERT(pData != NULL); - /* see if we are ready to proceed */ - if(pData->f_hmysql == NULL) { - CHKiRet(initMySQL(pData, 0)); + if(pWrkrData->hmysql == NULL) { + CHKiRet(initMySQL(pWrkrData, 0)); } /* try insert */ - if(mysql_query(pData->f_hmysql, (char*)psz)) { + if(mysql_query(pWrkrData->hmysql, (char*)psz)) { /* error occured, try to re-init connection and retry */ - closeMySQL(pData); /* close the current handle */ - CHKiRet(initMySQL(pData, 0)); /* try to re-open */ - if(mysql_query(pData->f_hmysql, (char*)psz)) { /* re-try insert */ + closeMySQL(pWrkrData); /* close the current handle */ + CHKiRet(initMySQL(pWrkrData, 0)); /* try to re-open */ + if(mysql_query(pWrkrData->hmysql, (char*)psz)) { /* re-try insert */ /* we failed, giving up for now */ - reportDBError(pData, 0); - closeMySQL(pData); /* free ressources */ + reportDBError(pWrkrData, 0); + closeMySQL(pWrkrData); /* free ressources */ ABORT_FINALIZE(RS_RET_SUSPENDED); } } finalize_it: if(iRet == RS_RET_OK) { - pData->uLastMySQLErrno = 0; /* reset error for error supression */ + pWrkrData->uLastMySQLErrno = 0; /* reset error for error supression */ } RETiRet; @@ -261,28 +270,28 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->f_hmysql == NULL) { - iRet = initMySQL(pData, 1); + if(pWrkrData->hmysql == NULL) { + iRet = initMySQL(pWrkrData, 1); } ENDtryResume BEGINbeginTransaction CODESTARTbeginTransaction - CHKiRet(writeMySQL((uchar*)"START TRANSACTION", pData)); + CHKiRet(writeMySQL(pWrkrData, (uchar*)"START TRANSACTION")); finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction dbgprintf("\n"); - CHKiRet(writeMySQL(ppString[0], pData)); + CHKiRet(writeMySQL(pWrkrData, ppString[0])); iRet = RS_RET_DEFER_COMMIT; finalize_it: ENDdoAction BEGINendTransaction CODESTARTendTransaction - if (mysql_commit(pData->f_hmysql) != 0) { + if(mysql_commit(pWrkrData->hmysql) != 0) { dbgprintf("mysql server error: transaction not committed\n"); iRet = RS_RET_SUSPENDED; } @@ -293,10 +302,9 @@ static inline void setInstParamDefaults(instanceData *pData) { pData->dbsrvPort = 0; - pData->f_configfile = NULL; - pData->f_configsection = NULL; + pData->configfile = NULL; + pData->configsection = NULL; pData->tplName = NULL; - pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ } @@ -338,9 +346,9 @@ CODESTARTnewActInst strncpy(pData->dbpwd, cstr, sizeof(pData->dbpwd)); free(cstr); } else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.file")) { - pData->f_configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + pData->configfile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "mysqlconfig.section")) { - pData->f_configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + pData->configsection = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { @@ -424,9 +432,8 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } else { pData->dbsrvPort = (unsigned) cs.iSrvPort; /* set configured port */ - pData->f_configfile = cs.pszMySQLConfigFile; - pData->f_configsection = cs.pszMySQLConfigSection; - pData->f_hmysql = NULL; /* initialize, but connect only on first message (important for queued mode!) */ + pData->configfile = cs.pszMySQLConfigFile; + pData->configsection = cs.pszMySQLConfigSection; } CODE_STD_FINALIZERparseSelectorAct @@ -446,6 +453,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -484,7 +492,7 @@ CODEmodInit_QueryRegCFSLineHdlr mysql_server_init(0, NULL, NULL) # endif ) { - errmsg.LogError(0, NO_ERRCODE, "ommysql: mysql_server_init() failed, plugin " + errmsg.LogError(0, NO_ERRCODE, "ommysql: intializing mysql client failed, plugin " "can not run"); ABORT_FINALIZE(RS_RET_ERR); } diff --git a/plugins/ompgsql/ompgsql.c b/plugins/ompgsql/ompgsql.c index 11f346f6..87599484 100644 --- a/plugins/ompgsql/ompgsql.c +++ b/plugins/ompgsql/ompgsql.c @@ -6,7 +6,7 @@ * * File begun on 2007-10-18 by sur5r (converted from ommysql.c) * - * Copyright 2007, 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007, 2013 Rainer Gerhards and Adiscon GmbH. * * The following link my be useful for the not-so-postgres literate * when setting up a test environment (on Fedora): @@ -66,11 +66,17 @@ typedef struct _instanceData { ConnStatusType eLastPgSQLStatus; /* last status from postgres */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; static configSettings_t __attribute__((unused)) cs; +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; + BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars ENDinitConfVars @@ -82,6 +88,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -108,6 +118,9 @@ CODESTARTfreeInstance closePgSQL(pData); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -244,8 +257,8 @@ finalize_it: BEGINtryResume CODESTARTtryResume - if(pData->f_hpgsql == NULL) { - iRet = initPgSQL(pData, 1); + if(pWrkrData->pData->f_hpgsql == NULL) { + iRet = initPgSQL(pWrkrData->pData, 1); if(iRet == RS_RET_OK) { /* the code above seems not to actually connect to the database. As such, we do a * dummy statement (a pointless select...) to verify the connection and return @@ -253,7 +266,7 @@ CODESTARTtryResume * PostgreSQL expert, so any patch that does the desired result in a more * intelligent way is highly welcome. -- rgerhards, 2009-12-16 */ - iRet = writePgSQL((uchar*)"select 'a' as a", pData); + iRet = writePgSQL((uchar*)"select 'a' as a", pWrkrData->pData); } } @@ -263,23 +276,25 @@ ENDtryResume BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("ompgsql: beginTransaction\n"); - iRet = writePgSQL((uchar*) "begin", pData); /* TODO: make user-configurable */ + iRet = writePgSQL((uchar*) "begin", pWrkrData->pData); /* TODO: make user-configurable */ ENDbeginTransaction BEGINdoAction CODESTARTdoAction + pthread_mutex_lock(&mutDoAct); dbgprintf("\n"); - CHKiRet(writePgSQL(ppString[0], pData)); + CHKiRet(writePgSQL(ppString[0], pWrkrData->pData)); if(bCoreSupportsBatching) iRet = RS_RET_DEFER_COMMIT; finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction BEGINendTransaction CODESTARTendTransaction - iRet = writePgSQL((uchar*) "commit;", pData); /* TODO: make user-configurable */ + iRet = writePgSQL((uchar*) "commit;", pWrkrData->pData); /* TODO: make user-configurable */ dbgprintf("ompgsql: endTransaction\n"); ENDendTransaction @@ -361,6 +376,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ ENDqueryEtryPt @@ -372,6 +388,11 @@ INITLegCnfVars CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING); + +# warning: transaction support missing for v8 + bCoreSupportsBatching= 0; + DBGPRINTF("ompgsql: transactions are not yet supported on v8\n"); + DBGPRINTF("ompgsql: module compiled with rsyslog version %s.\n", VERSION); DBGPRINTF("ompgsql: %susing transactional output interface.\n", bCoreSupportsBatching ? "" : "not "); ENDmodInit diff --git a/plugins/omprog/omprog.c b/plugins/omprog/omprog.c index cd07dcfb..1060b5c3 100644 --- a/plugins/omprog/omprog.c +++ b/plugins/omprog/omprog.c @@ -6,7 +6,7 @@ * * File begun on 2009-04-01 by RGerhards * - * Copyright 2009-2012 Adiscon GmbH. + * Copyright 2009-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -35,6 +35,7 @@ #include <errno.h> #include <unistd.h> #include <wait.h> +#include <pthread.h> #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" @@ -60,8 +61,13 @@ typedef struct _instanceData { int fdPipe; /* file descriptor to write to */ int bIsRunning; /* is binary currently running? 0-no, 1-yes */ int iParams; /* Holds the count of parameters if set*/ + pthread_mutex_t mut; /* make sure only one instance is active */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar *szBinary; /* name of binary to call */ } configSettings_t; @@ -89,8 +95,13 @@ ENDinitConfVars BEGINcreateInstance CODESTARTcreateInstance + pthread_mutex_init(&pData->mut, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -102,6 +113,7 @@ ENDisCompatibleWithFeature BEGINfreeInstance int i; CODESTARTfreeInstance + pthread_mutex_destroy(&pData->mut); if(pData->szBinary != NULL) free(pData->szBinary); if(pData->aParams != NULL) { @@ -112,6 +124,10 @@ CODESTARTfreeInstance } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -287,22 +303,21 @@ writePipe(instanceData *pData, uchar *szMsg) lenWrite = strlen((char*)szMsg); writeOffset = 0; - do - { + do { lenWritten = write(pData->fdPipe, ((char*)szMsg)+writeOffset, lenWrite); if(lenWritten == -1) { switch(errno) { - case EPIPE: - DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n", - pData->szBinary); - CHKiRet(cleanup(pData)); - CHKiRet(tryRestart(pData)); - break; - default: - DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, - rs_strerror_r(errno, errStr, sizeof(errStr))); - ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); - break; + case EPIPE: + DBGPRINTF("omprog: Program '%s' terminated, trying to restart\n", + pData->szBinary); + CHKiRet(cleanup(pData)); + CHKiRet(tryRestart(pData)); + break; + default: + DBGPRINTF("omprog: error %d writing to pipe: %s\n", errno, + rs_strerror_r(errno, errStr, sizeof(errStr))); + ABORT_FINALIZE(RS_RET_ERR_WRITE_PIPE); + break; } } else { writeOffset += lenWritten; @@ -316,7 +331,10 @@ finalize_it: BEGINdoAction + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; + pthread_mutex_lock(&pData->mut); if(pData->bIsRunning == 0) { openPipe(pData); } @@ -325,6 +343,7 @@ CODESTARTdoAction if(iRet != RS_RET_OK) iRet = RS_RET_SUSPENDED; + pthread_mutex_unlock(&pData->mut); ENDdoAction @@ -496,6 +515,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c index 34511e46..3b1584e2 100644 --- a/plugins/omrelp/omrelp.c +++ b/plugins/omrelp/omrelp.c @@ -2,8 +2,17 @@ * * This is the implementation of the RELP output module. * - * NOTE: read comments in module-template.h to understand how this file - * works! + * Note that when multiple action workers are activated, we currently + * also create multiple actions. This may be the source of some mild + * message loss (!) if the worker instance is shut down while the + * connection to the remote system is in retry state. + * TODO: think if we should implement a mode where we do NOT + * support multiple action worker instances. This would be + * slower, but not have this loss opportunity. But it should + * definitely be optional and by default off due to the + * performance implications (and given the fact that message + * loss is pretty unlikely in usual cases). + * * * File begun on 2008-03-13 by RGerhards * @@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine; /* our relp engine */ typedef struct _instanceData { uchar *target; uchar *port; - int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ - int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ int sizeWindow; /**< the RELP window size - 0=use default */ unsigned timeout; unsigned rebindInterval; - unsigned nSent; - relpClt_t *pRelpClt; /* relp client for this instance */ sbool bEnableTLS; sbool bEnableTLSZip; sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */ @@ -85,11 +90,20 @@ typedef struct _instanceData { } permittedPeers; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ + int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ + relpClt_t *pRelpClt; /* relp client for this instance */ + unsigned nSent; /* number msgs sent - for rebind support */ +} wrkrInstanceData_t; + typedef struct configSettings_s { EMPTY_STRUCT } configSettings_t; static configSettings_t __attribute__((unused)) cs; +static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData); /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ @@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData) static void onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode) { - instanceData *pData = (instanceData*) pUsr; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr; errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object " " '%s' - action may not work as intended", - pData->target, pData->port, errmesg, objinfo); + pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo); } static void @@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er static void onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode) { - instanceData *pData = (instanceData*) pUsr; + instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData; errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer " "is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo); pData->bHadAuthFail = 1; } -static inline rsRetVal -doCreateRelpClient(instanceData *pData) +static rsRetVal +doCreateRelpClient(wrkrInstanceData_t *pWrkrData) { int i; + instanceData *pData; DEFiRet; - if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK) + + pData = pWrkrData->pData; + if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK) + if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK) + if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK) + if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); if(pData->bEnableTLS) { - if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK) + if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); if(pData->bEnableTLSZip) { - if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK) + if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); } - if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK) + if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) { + if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) { errmsg.LogError(0, RS_RET_RELP_ERR, "omrelp: invalid auth mode '%s'\n", pData->authmode); ABORT_FINALIZE(RS_RET_RELP_ERR); } - if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK) + if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK) + if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); - if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK) + if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) { - relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]); + relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]); } } if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */ - if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK) + if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); } - pData->bInitialConnect = 1; - pData->nSent = 0; + pWrkrData->bInitialConnect = 1; + pWrkrData->nSent = 0; finalize_it: RETiRet; } @@ -221,11 +238,15 @@ CODESTARTcreateInstance pData->permittedPeers.nmemb = 0; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->pRelpClt = NULL; + iRet = doCreateRelpClient(pWrkrData); +ENDcreateWrkrInstance + BEGINfreeInstance int i; CODESTARTfreeInstance - if(pData->pRelpClt != NULL) - relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt); free(pData->target); free(pData->port); free(pData->tplName); @@ -239,6 +260,12 @@ CODESTARTfreeInstance } ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->pRelpClt != NULL) + relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt); +ENDfreeWrkrInstance + static inline void setInstParamDefaults(instanceData *pData) { @@ -318,8 +345,6 @@ CODESTARTnewActInst "RSYSLOG_ForwardFormat" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); - CHKiRet(doCreateRelpClient(pData)); - CODE_STD_FINALIZERnewActInst if(pvals != NULL) cnfparamvalsDestruct(pvals, &actpblk); @@ -347,22 +372,23 @@ ENDdbgPrintInstInfo /* try to connect to server * rgerhards, 2008-03-21 */ -static rsRetVal doConnect(instanceData *pData) +static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData) { DEFiRet; - if(pData->bInitialConnect) { - iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target); + if(pWrkrData->bInitialConnect) { + iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(), + pWrkrData->pData->port, pWrkrData->pData->target); if(iRet == RELP_RET_OK) - pData->bInitialConnect = 0; + pWrkrData->bInitialConnect = 0; } else { - iRet = relpCltReconnect(pData->pRelpClt); + iRet = relpCltReconnect(pWrkrData->pRelpClt); } if(iRet == RELP_RET_OK) { - pData->bIsConnected = 1; + pWrkrData->bIsConnected = 1; } else { - pData->bIsConnected = 0; + pWrkrData->bIsConnected = 0; iRet = RS_RET_SUSPENDED; } @@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData) BEGINtryResume CODESTARTtryResume - if(pData->bHadAuthFail) { + if(pWrkrData->pData->bHadAuthFail) { ABORT_FINALIZE(RS_RET_DISABLE_ACTION); } - iRet = doConnect(pData); + iRet = doConnect(pWrkrData); finalize_it: ENDtryResume static inline rsRetVal -doRebind(instanceData *pData) +doRebind(wrkrInstanceData_t *pWrkrData) { DEFiRet; DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n"); - CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt)); - pData->bIsConnected = 0; - CHKiRet(doCreateRelpClient(pData)); + CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt)); + pWrkrData->bIsConnected = 0; + CHKiRet(doCreateRelpClient(pWrkrData)); finalize_it: RETiRet; } @@ -394,10 +420,10 @@ finalize_it: BEGINbeginTransaction CODESTARTbeginTransaction dbgprintf("omrelp: beginTransaction\n"); - if(!pData->bIsConnected) { - CHKiRet(doConnect(pData)); + if(!pWrkrData->bIsConnected) { + CHKiRet(doConnect(pWrkrData)); } - relpCltHintBurstBegin(pData->pRelpClt); + relpCltHintBurstBegin(pWrkrData->pRelpClt); finalize_it: ENDbeginTransaction @@ -405,11 +431,13 @@ BEGINdoAction uchar *pMsg; /* temporary buffering */ size_t lenMsg; relpRetVal ret; + instanceData *pData; CODESTARTdoAction + pData = pWrkrData->pData; dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData)); - if(!pData->bIsConnected) { - CHKiRet(doConnect(pData)); + if(!pWrkrData->bIsConnected) { + CHKiRet(doConnect(pWrkrData)); } pMsg = ppString[0]; @@ -420,7 +448,7 @@ CODESTARTdoAction lenMsg = glbl.GetMaxLine(); /* forward */ - ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg); + ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg); if(ret != RELP_RET_OK) { /* error! */ dbgprintf("error forwarding via relp, suspending\n"); @@ -428,8 +456,8 @@ CODESTARTdoAction } if(pData->rebindInterval != 0 && - (++pData->nSent >= pData->rebindInterval)) { - doRebind(pData); + (++pWrkrData->nSent >= pData->rebindInterval)) { + doRebind(pWrkrData); } finalize_it: if(pData->bHadAuthFail) @@ -448,7 +476,7 @@ ENDdoAction BEGINendTransaction CODESTARTendTransaction dbgprintf("omrelp: endTransaction\n"); - relpCltHintBurstEnd(pData->pRelpClt); + relpCltHintBurstEnd(pWrkrData->pRelpClt); ENDendTransaction BEGINparseSelectorAct @@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* process template */ CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat")); - CHKiRet(doCreateRelpClient(pData)); - CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -546,6 +572,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_TXIF_OMOD_QUERIES diff --git a/plugins/omruleset/omruleset.c b/plugins/omruleset/omruleset.c index 11765507..73419915 100644 --- a/plugins/omruleset/omruleset.c +++ b/plugins/omruleset/omruleset.c @@ -10,7 +10,7 @@ * * File begun on 2009-11-02 by RGerhards * - * Copyright 2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -70,6 +70,10 @@ typedef struct _instanceData { uchar *pszRulesetName; /* primarily for debugging/display purposes */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { ruleset_t *pRuleset; /* ruleset to enqueue message to (NULL = Default, not recommended) */ uchar *pszRulesetName; @@ -87,11 +91,21 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature ENDisCompatibleWithFeature +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINfreeInstance CODESTARTfreeInstance free(pData->pszRulesetName); @@ -117,9 +131,9 @@ BEGINdoAction CODESTARTdoAction CHKmalloc(pMsg = MsgDup((msg_t*) ppString[0])); DBGPRINTF(":omruleset: forwarding message %p to ruleset %s[%p]\n", pMsg, - (char*) pData->pszRulesetName, pData->pRuleset); + (char*) pWrkrData->pData->pszRulesetName, pWrkrData->pData->pRuleset); MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(pMsg, pData->pRuleset); + MsgSetRuleset(pMsg, pWrkrData->pData->pRuleset); /* Note: we intentionally use submitMsg2() here, as we process messages * that were already run through the rate-limiter. So it is (at least) * questionable if they were rate-limited again. @@ -199,6 +213,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/omsnmp/omsnmp.c b/plugins/omsnmp/omsnmp.c index 42d1de6b..02862c94 100644 --- a/plugins/omsnmp/omsnmp.c +++ b/plugins/omsnmp/omsnmp.c @@ -2,7 +2,7 @@ * * This module sends an snmp trap. * - * Copyright 2007-2012 Adiscon GmbH. + * Copyright 2007-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -74,15 +74,19 @@ typedef struct _instanceData { * http://www.adiscon.org/download/ADISCON-MONITORWARE-MIB.txt * http://www.adiscon.org/download/ADISCON-MIB.txt */ - int iPort; /* Target Port */ - int iSNMPVersion; /* SNMP Version to use */ - int iTrapType; /* Snmp TrapType or GenericType */ - int iSpecificType; /* Snmp Specific Type */ + int iPort; /* Target Port */ + int iSNMPVersion; /* SNMP Version to use */ + int iTrapType; /* Snmp TrapType or GenericType */ + int iSpecificType; /* Snmp Specific Type */ - netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */ - uchar *tplName; /* format template to use */ + uchar *tplName; /* format template to use */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; + netsnmp_session *snmpsession; /* Holds to SNMP Session, NULL if not initialized */ +} wrkrInstanceData_t; + typedef struct configSettings_s { uchar* pszTransport; /* default transport */ uchar* pszTarget; @@ -147,6 +151,10 @@ BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->snmpsession = NULL; +ENDcreateWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -171,14 +179,16 @@ ENDisCompatibleWithFeature /* Exit SNMP Session * alorbach, 2008-02-12 */ -static rsRetVal omsnmp_exitSession(instanceData *pData) +static rsRetVal +omsnmp_exitSession(wrkrInstanceData_t *pWrkrData) { DEFiRet; - if(pData->snmpsession != NULL) { - dbgprintf( "omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", pData->szTarget, pData->iPort); - snmp_close(pData->snmpsession); - pData->snmpsession = NULL; + if(pWrkrData->snmpsession != NULL) { + DBGPRINTF("omsnmp_exitSession: Clearing Session to '%s' on Port = '%d'\n", + pWrkrData->pData->szTarget, pWrkrData->pData->iPort); + snmp_close(pWrkrData->snmpsession); + pWrkrData->snmpsession = NULL; } RETiRet; @@ -187,15 +197,19 @@ static rsRetVal omsnmp_exitSession(instanceData *pData) /* Init SNMP Session * alorbach, 2008-02-12 */ -static rsRetVal omsnmp_initSession(instanceData *pData) +static rsRetVal +omsnmp_initSession(wrkrInstanceData_t *pWrkrData) { netsnmp_session session; + instanceData *pData; char szTargetAndPort[MAXHOSTNAMELEN+128]; /* work buffer for specifying a full target and port string */ DEFiRet; /* should not happen, but if session is not cleared yet - we do it now! */ - if (pData->snmpsession != NULL) - omsnmp_exitSession(pData); + if (pWrkrData->snmpsession != NULL) + omsnmp_exitSession(pWrkrData); + + pData = pWrkrData->pData; snprintf((char*)szTargetAndPort, sizeof(szTargetAndPort), "%s:%s:%d", (pData->szTransport == NULL) ? "udp" : (char*)pData->szTransport, @@ -217,8 +231,8 @@ static rsRetVal omsnmp_initSession(instanceData *pData) session.community_len = strlen((char*) session.community); } - pData->snmpsession = snmp_open(&session); - if (pData->snmpsession == NULL) { + pWrkrData->snmpsession = snmp_open(&session); + if (pWrkrData->snmpsession == NULL) { errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_initSession: snmp_open to host '%s' on Port '%d' failed\n", pData->szTarget, pData->iPort); /* Stay suspended */ iRet = RS_RET_SUSPENDED; @@ -227,7 +241,7 @@ static rsRetVal omsnmp_initSession(instanceData *pData) RETiRet; } -static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) +static rsRetVal omsnmp_sendsnmp(wrkrInstanceData_t *pWrkrData, uchar *psz) { DEFiRet; @@ -239,10 +253,12 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) int status; char *trap = NULL; const char *strErr = NULL; + instanceData *pData; + pData = pWrkrData->pData; /* Init SNMP Session if necessary */ - if (pData->snmpsession == NULL) { - CHKiRet(omsnmp_initSession(pData)); + if (pWrkrData->snmpsession == NULL) { + CHKiRet(omsnmp_initSession(pWrkrData)); } /* String should not be NULL */ @@ -250,7 +266,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) dbgprintf( "omsnmp_sendsnmp: ENTER - Syslogmessage = '%s'\n", (char*)psz); /* If SNMP Version1 is configured !*/ - if(pData->snmpsession->version == SNMP_VERSION_1) { + if(pWrkrData->snmpsession->version == SNMP_VERSION_1) { pdu = snmp_pdu_create(SNMP_MSG_TRAP); /* Set enterprise */ @@ -275,7 +291,7 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) pdu->time = get_uptime(); } /* If SNMP Version2c is configured !*/ - else if (pData->snmpsession->version == SNMP_VERSION_2c) + else if (pWrkrData->snmpsession->version == SNMP_VERSION_2c) { long sysuptime; char csysuptime[20]; @@ -320,15 +336,15 @@ static rsRetVal omsnmp_sendsnmp(instanceData *pData, uchar *psz) } /* Send the TRAP */ - status = snmp_send(pData->snmpsession, pdu) == 0; + status = snmp_send(pWrkrData->snmpsession, pdu) == 0; if (status) { /* Debug Output! */ - int iErrorCode = pData->snmpsession->s_snmp_errno; + int iErrorCode = pWrkrData->snmpsession->s_snmp_errno; errmsg.LogError(0, RS_RET_SUSPENDED, "omsnmp_sendsnmp: snmp_send failed error '%d', Description='%s'\n", iErrorCode*(-1), api_errors[iErrorCode*(-1)]); /* Clear Session */ - omsnmp_exitSession(pData); + omsnmp_exitSession(pWrkrData); ABORT_FINALIZE(RS_RET_SUSPENDED); } @@ -347,7 +363,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = omsnmp_initSession(pData); + iRet = omsnmp_initSession(pWrkrData); ENDtryResume BEGINdoAction @@ -358,19 +374,20 @@ CODESTARTdoAction } /* This will generate and send the SNMP Trap */ - iRet = omsnmp_sendsnmp(pData, ppString[0]); + iRet = omsnmp_sendsnmp(pWrkrData, ppString[0]); finalize_it: ENDdoAction BEGINfreeInstance CODESTARTfreeInstance - /* free snmp Session here */ - omsnmp_exitSession(pData); - free(pData->tplName); free(pData->szTarget); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + omsnmp_exitSession(pWrkrData); +ENDfreeWrkrInstance static inline void setInstParamDefaults(instanceData *pData) @@ -499,9 +516,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) /* Set some defaults in the NetSNMP library */ netsnmp_ds_set_int(NETSNMP_DS_LIBRARY_ID, NETSNMP_DS_LIB_DEFAULT_PORT, pData->iPort ); - - /* Init Session Pointer */ - pData->snmpsession = NULL; CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct @@ -545,6 +559,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES ENDqueryEtryPt diff --git a/plugins/omstdout/omstdout.c b/plugins/omstdout/omstdout.c index a84a7593..210b0165 100644 --- a/plugins/omstdout/omstdout.c +++ b/plugins/omstdout/omstdout.c @@ -6,7 +6,7 @@ * * File begun on 2009-03-19 by RGerhards * - * Copyright 2009-2012 Adiscon GmbH. + * Copyright 2009-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -60,6 +60,10 @@ typedef struct _instanceData { int bEnsureLFEnding; /* ensure that a linefeed is written at the end of EACH record (test aid for nettester) */ } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { int bUseArrayInterface; /* shall action use array instead of string template interface? */ int bEnsureLFEnding; /* shall action use array instead of string template interface? */ @@ -76,6 +80,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -88,6 +97,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo ENDdbgPrintInstInfo @@ -107,7 +121,7 @@ BEGINdoAction size_t len; int r; CODESTARTdoAction - if(pData->bUseArrayInterface) { + if(pWrkrData->pData->bUseArrayInterface) { /* if we use array passing, we need to put together a string * ourselves. At this point, please keep in mind that omstdout is * primarily a testing aid. Other modules may do different processing @@ -145,7 +159,7 @@ CODESTARTdoAction DBGPRINTF("omstdout: error %d writing to stdout[%d]: %s\n", r, len, toWrite); } - if(pData->bEnsureLFEnding && toWrite[len-1] != '\n') { + if(pWrkrData->pData->bEnsureLFEnding && toWrite[len-1] != '\n') { if((r = write(1, "\n", 1)) != 1) { /* write missing LF */ DBGPRINTF("omstdout: error %d writing \\n to stdout\n", r); @@ -186,6 +200,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/omtesting/omtesting.c b/plugins/omtesting/omtesting.c index c9f1e06b..2cc1159e 100644 --- a/plugins/omtesting/omtesting.c +++ b/plugins/omtesting/omtesting.c @@ -22,7 +22,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2007-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2007-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -63,7 +63,6 @@ MODULE_CNFNAME("omtesting") */ DEF_OMOD_STATIC_DATA - typedef struct _instanceData { enum { MD_SLEEP, MD_FAIL, MD_RANDFAIL, MD_ALWAYS_SUSPEND } mode; @@ -76,6 +75,10 @@ typedef struct _instanceData { int iCurrRetries; } instanceData; +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + typedef struct configSettings_s { int bEchoStdout; /* echo non-failed messages to stdout */ } configSettings_t; @@ -93,6 +96,11 @@ CODESTARTcreateInstance ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("Action delays rule by %d second(s) and %d millisecond(s)\n", @@ -170,11 +178,11 @@ static rsRetVal doRandFail(void) BEGINtryResume CODESTARTtryResume dbgprintf("omtesting tryResume() called\n"); - switch(pData->mode) { + switch(pWrkrData->pData->mode) { case MD_SLEEP: break; case MD_FAIL: - iRet = doFailOnResume(pData); + iRet = doFailOnResume(pWrkrData->pData); break; case MD_RANDFAIL: iRet = doRandFail(); @@ -187,8 +195,10 @@ ENDtryResume BEGINdoAction + instanceData *pData; CODESTARTdoAction dbgprintf("omtesting received msg '%s'\n", ppString[0]); + pData = pWrkrData->pData; switch(pData->mode) { case MD_SLEEP: iRet = doSleep(pData); @@ -220,6 +230,11 @@ CODESTARTfreeInstance ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + + BEGINparseSelectorAct int i; uchar szBuf[1024]; @@ -313,6 +328,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES ENDqueryEtryPt diff --git a/plugins/omudpspoof/omudpspoof.c b/plugins/omudpspoof/omudpspoof.c index cb907bba..ad3508c8 100644 --- a/plugins/omudpspoof/omudpspoof.c +++ b/plugins/omudpspoof/omudpspoof.c @@ -98,15 +98,19 @@ typedef struct _instanceData { uchar *port; uchar *sourceTpl; int mtu; - int *pSockArray; /* sockets to use for UDP */ - struct addrinfo *f_addr; - u_short sourcePort; u_short sourcePortStart; /* for sorce port iteration */ u_short sourcePortEnd; int bReportLibnetInitErr; /* help prevent multiple error messages on init err */ +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; libnet_t *libnet_handle; + u_short sourcePort; + int *pSockArray; /* sockets to use for UDP */ + struct addrinfo *f_addr; char errbuf[LIBNET_ERRBUF_SIZE]; -} instanceData; +} wrkrInstanceData_t; #define DFLT_SOURCE_PORT_START 32000 #define DFLT_SOURCE_PORT_END 42000 @@ -172,7 +176,7 @@ ENDinitConfVars pthread_mutex_t mutLibnet; /* forward definitions */ -static rsRetVal doTryResume(instanceData *pData); +static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData); /* this function gets the default template. It coordinates action between @@ -217,15 +221,14 @@ finalize_it: * rgerhards, 2009-05-29 */ static rsRetVal -closeUDPSockets(instanceData *pData) +closeUDPSockets(wrkrInstanceData_t *pWrkrData) { DEFiRet; - assert(pData != NULL); - if(pData->pSockArray != NULL) { - net.closeUDPListenSockets(pData->pSockArray); - pData->pSockArray = NULL; - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->pSockArray != NULL) { + net.closeUDPListenSockets(pWrkrData->pSockArray); + pWrkrData->pSockArray = NULL; + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } RETiRet; } @@ -310,12 +313,17 @@ ENDfreeCnf BEGINcreateInstance CODESTARTcreateInstance - pData->libnet_handle = NULL; pData->mtu = 1500; pData->bReportLibnetInitErr = 1; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance + pWrkrData->libnet_handle = NULL; + pWrkrData->sourcePort = pData->sourcePortStart; +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -326,15 +334,19 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance /* final cleanup */ - closeUDPSockets(pData); free(pData->tplName); free(pData->port); free(pData->host); free(pData->sourceTpl); - if(pData->libnet_handle != NULL) - libnet_destroy(pData->libnet_handle); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + closeUDPSockets(pWrkrData); + if(pWrkrData->libnet_handle != NULL) + libnet_destroy(pWrkrData->libnet_handle); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -348,11 +360,12 @@ ENDdbgPrintInstInfo * rgehards, 2007-12-20 */ static inline rsRetVal -UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) +UDPSend(wrkrInstanceData_t *pWrkrData, uchar *pszSourcename, char *msg, size_t len) { struct addrinfo *r; int lsent = 0; int bSendSuccess; + instanceData *pData; struct sockaddr_in *tempaddr,source_ip; libnet_ptag_t ip, ipo; libnet_ptag_t udp; @@ -363,9 +376,10 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) unsigned maxPktLen, pktLen; DEFiRet; - if(pData->pSockArray == NULL) { - CHKiRet(doTryResume(pData)); + if(pWrkrData->pSockArray == NULL) { + CHKiRet(doTryResume(pWrkrData)); } + pData = pWrkrData->pData; if(len > 65528) { DBGPRINTF("omudpspoof: msg with length %d truncated to 64k: '%.768s'\n", @@ -374,8 +388,8 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } ip = ipo = udp = 0; - if(pData->sourcePort++ >= pData->sourcePortEnd){ - pData->sourcePort = pData->sourcePortStart; + if(pWrkrData->sourcePort++ >= pData->sourcePortEnd){ + pWrkrData->sourcePort = pData->sourcePortStart; } inet_pton(AF_INET, (char*)pszSourcename, &(source_ip.sin_addr)); @@ -383,7 +397,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) bSendSuccess = RSFALSE; d_pthread_mutex_lock(&mutLibnet); bNeedUnlock = 1; - for (r = pData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) { + for (r = pWrkrData->f_addr; r && bSendSuccess == RSFALSE ; r = r->ai_next) { tempaddr = (struct sockaddr_in *)r->ai_addr; /* Getting max payload size (must be multiple of 8) */ maxPktLen = (pData->mtu - LIBNET_IPV4_H) & ~0x07; @@ -400,19 +414,19 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) } DBGPRINTF("omudpspoof: stage 1: MF:%d, hdrOffs %d, pktLen %d\n", (hdrOffs & IP_MF) >> 13, (hdrOffs & 0x1FFF) << 3, pktLen); - libnet_clear_packet(pData->libnet_handle); + libnet_clear_packet(pWrkrData->libnet_handle); /* note: libnet does need ports in host order NOT in network byte order! -- rgerhards, 2009-11-12 */ udp = libnet_build_udp( - ntohs(pData->sourcePort),/* source port */ + ntohs(pWrkrData->sourcePort),/* source port */ ntohs(tempaddr->sin_port),/* destination port */ pktLen+LIBNET_UDP_H, /* packet length */ 0, /* checksum */ (u_char*)msg, /* payload */ pktLen, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ udp); /* libnet id */ if (udp == -1) { - DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build UDP header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } ip = libnet_build_ipv4( @@ -427,22 +441,22 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) tempaddr->sin_addr.s_addr, NULL, /* payload */ 0, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ ip); /* libnet id */ if (ip == -1) { - DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build IP header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } /* Write it to the wire. */ - lsent = libnet_write(pData->libnet_handle); + lsent = libnet_write(pWrkrData->libnet_handle); if(lsent != (int) (LIBNET_IPV4_H+LIBNET_UDP_H+pktLen)) { /* note: access to fd is a libnet internal. If a newer version of libnet does * not expose that member, we should simply remove it. However, while it is there * it is useful for consolidating with strace output. */ DBGPRINTF("omudpspoof: write error (total len %d): pktLen %d, sent %d, fd %d: %s\n", - len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pData->libnet_handle->fd, - libnet_geterror(pData->libnet_handle)); + len, LIBNET_IPV4_H+LIBNET_UDP_H+pktLen, lsent, pWrkrData->libnet_handle->fd, + libnet_geterror(pWrkrData->libnet_handle)); if(lsent != -1) { bSendSuccess = RSTRUE; } @@ -452,7 +466,7 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) msgOffs += pktLen; /* We need to get rid of the UDP header to build the other fragments */ - libnet_clear_packet(pData->libnet_handle); + libnet_clear_packet(pWrkrData->libnet_handle); ip = LIBNET_PTAG_INITIALIZER; while(len > msgOffs ) { /* loop until all payload is sent */ /* check if there will be more fragments */ @@ -481,16 +495,16 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) tempaddr->sin_addr.s_addr, (uint8_t*)(msg+msgOffs), /* payload */ pktLen, /* payload size */ - pData->libnet_handle, /* libnet handle */ + pWrkrData->libnet_handle, /* libnet handle */ ip); /* libnet id */ if (ip == -1) { - DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pData->libnet_handle)); + DBGPRINTF("omudpspoof: can't build IP fragment header: %s\n", libnet_geterror(pWrkrData->libnet_handle)); } /* Write it to the wire. */ - lsent = libnet_write(pData->libnet_handle); + lsent = libnet_write(pWrkrData->libnet_handle); if(lsent != (int) (LIBNET_IPV4_H+pktLen)) { DBGPRINTF("omudpspoof: fragment write error len %d, sent %d: %s\n", - LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pData->libnet_handle)); + LIBNET_IPV4_H+LIBNET_UDP_H+len, lsent, libnet_geterror(pWrkrData->libnet_handle)); bSendSuccess = RSFALSE; continue; } @@ -500,9 +514,9 @@ UDPSend(instanceData *pData, uchar *pszSourcename, char *msg, size_t len) finalize_it: if(iRet != RS_RET_OK) { - if(pData->libnet_handle != NULL) { - libnet_destroy(pData->libnet_handle); - pData->libnet_handle = NULL; + if(pWrkrData->libnet_handle != NULL) { + libnet_destroy(pWrkrData->libnet_handle); + pWrkrData->libnet_handle = NULL; } } if(bNeedUnlock) { @@ -515,26 +529,28 @@ finalize_it: /* try to resume connection if it is not ready * rgerhards, 2007-08-02 */ -static rsRetVal doTryResume(instanceData *pData) +static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData) { int iErr; struct addrinfo *res; struct addrinfo hints; + instanceData *pData; DEFiRet; - if(pData->pSockArray != NULL) + if(pWrkrData->pSockArray != NULL) FINALIZE; + pData = pWrkrData->pData; - if(pData->libnet_handle == NULL) { + if(pWrkrData->libnet_handle == NULL) { /* Initialize the libnet library. Root priviledges are required. * this initializes a IPv4 socket to use for forging UDP packets. */ - pData->libnet_handle = libnet_init( + pWrkrData->libnet_handle = libnet_init( LIBNET_RAW4, /* injection type */ NULL, /* network interface */ - pData->errbuf); /* errbuf */ + pWrkrData->errbuf); /* errbuf */ - if(pData->libnet_handle == NULL) { + if(pWrkrData->libnet_handle == NULL) { if(pData->bReportLibnetInitErr) { errmsg.LogError(0, RS_RET_ERR_LIBNET_INIT, "omudpsoof: error " "initializing libnet - are you running as root?"); @@ -559,14 +575,14 @@ static rsRetVal doTryResume(instanceData *pData) ABORT_FINALIZE(RS_RET_SUSPENDED); } DBGPRINTF("%s found, resuming.\n", pData->host); - pData->f_addr = res; - pData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0); + pWrkrData->f_addr = res; + pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0); finalize_it: if(iRet != RS_RET_OK) { - if(pData->f_addr != NULL) { - freeaddrinfo(pData->f_addr); - pData->f_addr = NULL; + if(pWrkrData->f_addr != NULL) { + freeaddrinfo(pWrkrData->f_addr); + pWrkrData->f_addr = NULL; } iRet = RS_RET_SUSPENDED; } @@ -577,7 +593,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + iRet = doTryResume(pWrkrData); ENDtryResume BEGINdoAction @@ -585,10 +601,10 @@ BEGINdoAction unsigned l; int iMaxLine; CODESTARTdoAction - CHKiRet(doTryResume(pData)); + CHKiRet(doTryResume(pWrkrData)); - DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pData->host, - getFwdPt(pData), ppString[1], ppString[0]); + DBGPRINTF(" %s:%s/omudpspoof, src '%s', msg strt '%.256s'\n", pWrkrData->pData->host, + getFwdPt(pWrkrData->pData), ppString[1], ppString[0]); iMaxLine = glbl.GetMaxLine(); psz = (char*) ppString[0]; @@ -596,7 +612,7 @@ CODESTARTdoAction if((int) l > iMaxLine) l = iMaxLine; - CHKiRet(UDPSend(pData, ppString[1], psz, l)); + CHKiRet(UDPSend(pWrkrData, ppString[1], psz, l)); finalize_it: ENDdoAction @@ -660,7 +676,6 @@ CODESTARTnewActInst } } CODE_STD_STRING_REQUESTnewActInst(2) - pData->sourcePort = pData->sourcePortStart; tplToUse = ustrdup((pData->tplName == NULL) ? getDfltTpl() : pData->tplName); CHKiRet(OMSRsetEntry(*ppOMSR, 0, tplToUse, OMSR_NO_RQD_TPL_OPTS)); @@ -699,7 +714,7 @@ CODE_STD_STRING_REQUESTparseSelectorAct(2) else CHKmalloc(pData->port = ustrdup(cs.pszTargetPort)); CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(sourceTpl), OMSR_NO_RQD_TPL_OPTS)); - pData->sourcePort = pData->sourcePortStart = cs.iSourcePortStart; + pData->sourcePortStart = cs.iSourcePortStart; pData->sourcePortEnd = cs.iSourcePortEnd; /* process template */ @@ -740,6 +755,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES diff --git a/plugins/omuxsock/omuxsock.c b/plugins/omuxsock/omuxsock.c index 583b9f94..da4e8e94 100644 --- a/plugins/omuxsock/omuxsock.c +++ b/plugins/omuxsock/omuxsock.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h to understand how this file * works! * - * Copyright 2010-2012 Adiscon GmbH. + * Copyright 2010-2013 Adiscon GmbH. * * This file is part of rsyslog. * @@ -60,10 +60,14 @@ typedef struct _instanceData { permittedPeers_t *pPermPeers; uchar *sockName; int sock; - int bIsConnected; /* are we connected to remote host? 0 - no, 1 - yes, UDP means addr resolved */ struct sockaddr_un addr; } instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; +} wrkrInstanceData_t; + /* config data */ typedef struct configSettings_s { uchar *tplName; /* name of the default template to use */ @@ -90,6 +94,7 @@ static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current l static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */ +static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; BEGINinitConfVars /* (re)set config variables to default values */ CODESTARTinitConfVars @@ -147,7 +152,6 @@ closeSocket(instanceData *pData) close(pData->sock); pData->sock = INVLD_SOCK; } -pData->bIsConnected = 0; // TODO: remove this variable altogether RETiRet; } @@ -224,6 +228,10 @@ CODESTARTcreateInstance pData->sock = INVLD_SOCK; ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature @@ -239,6 +247,10 @@ CODESTARTfreeInstance free(pData->sockName); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -332,7 +344,7 @@ static rsRetVal doTryResume(instanceData *pData) BEGINtryResume CODESTARTtryResume - iRet = doTryResume(pData); + iRet = doTryResume(pWrkrData->pData); ENDtryResume BEGINdoAction @@ -340,20 +352,22 @@ BEGINdoAction register unsigned l; int iMaxLine; CODESTARTdoAction - CHKiRet(doTryResume(pData)); + pthread_mutex_lock(&mutDoAct); + CHKiRet(doTryResume(pWrkrData->pData)); iMaxLine = glbl.GetMaxLine(); - DBGPRINTF(" omuxsock:%s\n", pData->sockName); + DBGPRINTF(" omuxsock:%s\n", pWrkrData->pData->sockName); psz = (char*) ppString[0]; l = strlen((char*) psz); if((int) l > iMaxLine) l = iMaxLine; - CHKiRet(sendMsg(pData, psz, l)); + CHKiRet(sendMsg(pWrkrData->pData, psz, l)); finalize_it: + pthread_mutex_unlock(&mutDoAct); ENDdoAction @@ -413,6 +427,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_STD_CONF2_QUERIES CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES ENDqueryEtryPt |