summaryrefslogtreecommitdiffstats
path: root/plugins/omrelp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omrelp')
-rw-r--r--plugins/omrelp/omrelp.c135
1 files changed, 81 insertions, 54 deletions
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 34511e46..3b1584e2 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -2,8 +2,17 @@
*
* This is the implementation of the RELP output module.
*
- * NOTE: read comments in module-template.h to understand how this file
- * works!
+ * Note that when multiple action workers are activated, we currently
+ * also create multiple actions. This may be the source of some mild
+ * message loss (!) if the worker instance is shut down while the
+ * connection to the remote system is in retry state.
+ * TODO: think if we should implement a mode where we do NOT
+ * support multiple action worker instances. This would be
+ * slower, but not have this loss opportunity. But it should
+ * definitely be optional and by default off due to the
+ * performance implications (and given the fact that message
+ * loss is pretty unlikely in usual cases).
+ *
*
* File begun on 2008-03-13 by RGerhards
*
@@ -63,13 +72,9 @@ static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
uchar *target;
uchar *port;
- int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
- int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
int sizeWindow; /**< the RELP window size - 0=use default */
unsigned timeout;
unsigned rebindInterval;
- unsigned nSent;
- relpClt_t *pRelpClt; /* relp client for this instance */
sbool bEnableTLS;
sbool bEnableTLSZip;
sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */
@@ -85,11 +90,20 @@ typedef struct _instanceData {
} permittedPeers;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
+ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned nSent; /* number msgs sent - for rebind support */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData);
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
@@ -135,10 +149,10 @@ static uchar *getRelpPt(instanceData *pData)
static void
onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
" '%s' - action may not work as intended",
- pData->target, pData->port, errmesg, objinfo);
+ pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo);
}
static void
@@ -152,55 +166,58 @@ onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal er
static void
onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
- instanceData *pData = (instanceData*) pUsr;
+ instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData;
errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
"is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
pData->bHadAuthFail = 1;
}
-static inline rsRetVal
-doCreateRelpClient(instanceData *pData)
+static rsRetVal
+doCreateRelpClient(wrkrInstanceData_t *pWrkrData)
{
int i;
+ instanceData *pData;
DEFiRet;
- if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+
+ pData = pWrkrData->pData;
+ if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetWindowSize(pData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+ if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetUsrPtr(pData->pRelpClt, pData) != RELP_RET_OK)
+ if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLS) {
- if(relpCltEnableTLS(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
if(pData->bEnableTLSZip) {
- if(relpCltEnableTLSZip(pData->pRelpClt) != RELP_RET_OK)
+ if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetGnuTLSPriString(pData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetAuthMode(pData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+ if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
errmsg.LogError(0, RS_RET_RELP_ERR,
"omrelp: invalid auth mode '%s'\n", pData->authmode);
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- if(relpCltSetCACert(pData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetOwnCert(pData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetPrivKey(pData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
- relpCltAddPermittedPeer(pData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+ relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]);
}
}
if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
- if(relpCltSetClientIP(pData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
}
- pData->bInitialConnect = 1;
- pData->nSent = 0;
+ pWrkrData->bInitialConnect = 1;
+ pWrkrData->nSent = 0;
finalize_it:
RETiRet;
}
@@ -221,11 +238,15 @@ CODESTARTcreateInstance
pData->permittedPeers.nmemb = 0;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->pRelpClt = NULL;
+ iRet = doCreateRelpClient(pWrkrData);
+ENDcreateWrkrInstance
+
BEGINfreeInstance
int i;
CODESTARTfreeInstance
- if(pData->pRelpClt != NULL)
- relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
free(pData->target);
free(pData->port);
free(pData->tplName);
@@ -239,6 +260,12 @@ CODESTARTfreeInstance
}
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->pRelpClt != NULL)
+ relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt);
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -318,8 +345,6 @@ CODESTARTnewActInst
"RSYSLOG_ForwardFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERnewActInst
if(pvals != NULL)
cnfparamvalsDestruct(pvals, &actpblk);
@@ -347,22 +372,23 @@ ENDdbgPrintInstInfo
/* try to connect to server
* rgerhards, 2008-03-21
*/
-static rsRetVal doConnect(instanceData *pData)
+static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->bInitialConnect) {
- iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
+ if(pWrkrData->bInitialConnect) {
+ iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(),
+ pWrkrData->pData->port, pWrkrData->pData->target);
if(iRet == RELP_RET_OK)
- pData->bInitialConnect = 0;
+ pWrkrData->bInitialConnect = 0;
} else {
- iRet = relpCltReconnect(pData->pRelpClt);
+ iRet = relpCltReconnect(pWrkrData->pRelpClt);
}
if(iRet == RELP_RET_OK) {
- pData->bIsConnected = 1;
+ pWrkrData->bIsConnected = 1;
} else {
- pData->bIsConnected = 0;
+ pWrkrData->bIsConnected = 0;
iRet = RS_RET_SUSPENDED;
}
@@ -372,21 +398,21 @@ static rsRetVal doConnect(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- if(pData->bHadAuthFail) {
+ if(pWrkrData->pData->bHadAuthFail) {
ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
}
- iRet = doConnect(pData);
+ iRet = doConnect(pWrkrData);
finalize_it:
ENDtryResume
static inline rsRetVal
-doRebind(instanceData *pData)
+doRebind(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
- CHKiRet(relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt));
- pData->bIsConnected = 0;
- CHKiRet(doCreateRelpClient(pData));
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt));
+ pWrkrData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pWrkrData));
finalize_it:
RETiRet;
}
@@ -394,10 +420,10 @@ finalize_it:
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omrelp: beginTransaction\n");
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
- relpCltHintBurstBegin(pData->pRelpClt);
+ relpCltHintBurstBegin(pWrkrData->pRelpClt);
finalize_it:
ENDbeginTransaction
@@ -405,11 +431,13 @@ BEGINdoAction
uchar *pMsg; /* temporary buffering */
size_t lenMsg;
relpRetVal ret;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
pMsg = ppString[0];
@@ -420,7 +448,7 @@ CODESTARTdoAction
lenMsg = glbl.GetMaxLine();
/* forward */
- ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg);
+ ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg);
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
@@ -428,8 +456,8 @@ CODESTARTdoAction
}
if(pData->rebindInterval != 0 &&
- (++pData->nSent >= pData->rebindInterval)) {
- doRebind(pData);
+ (++pWrkrData->nSent >= pData->rebindInterval)) {
+ doRebind(pWrkrData);
}
finalize_it:
if(pData->bHadAuthFail)
@@ -448,7 +476,7 @@ ENDdoAction
BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omrelp: endTransaction\n");
- relpCltHintBurstEnd(pData->pRelpClt);
+ relpCltHintBurstEnd(pWrkrData->pRelpClt);
ENDendTransaction
BEGINparseSelectorAct
@@ -527,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -546,6 +572,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_TXIF_OMOD_QUERIES