summaryrefslogtreecommitdiffstats
path: root/plugins/omrelp/omrelp.c
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/omrelp/omrelp.c')
-rw-r--r--plugins/omrelp/omrelp.c284
1 files changed, 249 insertions, 35 deletions
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 3e355464..3b1584e2 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -2,8 +2,17 @@
*
* This is the implementation of the RELP output module.
*
- * NOTE: read comments in module-template.h to understand how this file
- * works!
+ * Note that when multiple action workers are activated, we currently
+ * also create multiple actions. This may be the source of some mild
+ * message loss (!) if the worker instance is shut down while the
+ * connection to the remote system is in retry state.
+ * TODO: think if we should implement a mode where we do NOT
+ * support multiple action worker instances. This would be
+ * slower, but not have this loss opportunity. But it should
+ * definitely be optional and by default off due to the
+ * performance implications (and given the fact that message
+ * loss is pretty unlikely in usual cases).
+ *
*
* File begun on 2008-03-13 by RGerhards
*
@@ -55,29 +64,62 @@ DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(glbl)
+#define DFLT_ENABLE_TLS 0
+#define DFLT_ENABLE_TLSZIP 0
+
static relpEngine_t *pRelpEngine; /* our relp engine */
typedef struct _instanceData {
uchar *target;
uchar *port;
- int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
- int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ int sizeWindow; /**< the RELP window size - 0=use default */
unsigned timeout;
- relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned rebindInterval;
+ sbool bEnableTLS;
+ sbool bEnableTLSZip;
+ sbool bHadAuthFail; /**< set on auth failure, will cause retry to disable action */
+ uchar *pristring; /* GnuTLS priority string (NULL if not to be provided) */
+ uchar *authmode;
+ uchar *caCertFile;
+ uchar *myCertFile;
+ uchar *myPrivKeyFile;
uchar *tplName;
+ struct {
+ int nmemb;
+ uchar **name;
+ } permittedPeers;
} instanceData;
+typedef struct wrkrInstanceData {
+ instanceData *pData;
+ int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
+ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
+ relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned nSent; /* number msgs sent - for rebind support */
+} wrkrInstanceData_t;
+
typedef struct configSettings_s {
EMPTY_STRUCT
} configSettings_t;
static configSettings_t __attribute__((unused)) cs;
+static rsRetVal doCreateRelpClient(wrkrInstanceData_t *pWrkrData);
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "target", eCmdHdlrGetWord, 1 },
+ { "tls", eCmdHdlrBinary, 0 },
+ { "tls.compression", eCmdHdlrBinary, 0 },
+ { "tls.prioritystring", eCmdHdlrString, 0 },
+ { "tls.cacert", eCmdHdlrString, 0 },
+ { "tls.mycert", eCmdHdlrString, 0 },
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "tls.authmode", eCmdHdlrString, 0 },
+ { "tls.permittedpeer", eCmdHdlrArray, 0 },
{ "port", eCmdHdlrGetWord, 0 },
+ { "rebindinterval", eCmdHdlrInt, 0 },
+ { "windowsize", eCmdHdlrInt, 0 },
{ "timeout", eCmdHdlrInt, 0 },
{ "template", eCmdHdlrGetWord, 0 }
};
@@ -104,34 +146,126 @@ static uchar *getRelpPt(instanceData *pData)
return(pData->port);
}
-static inline rsRetVal
-doCreateRelpClient(instanceData *pData)
+static void
+onErr(void *pUsr, char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
{
+ wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) pUsr;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: error '%s', object "
+ " '%s' - action may not work as intended",
+ pWrkrData->pData->target, pWrkrData->pData->port, errmesg, objinfo);
+}
+
+static void
+onGenericErr(char *objinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ errmsg.LogError(0, RS_RET_RELP_ERR, "omrelp: librelp error '%s', object "
+ "'%s' - action may not work as intended",
+ errmesg, objinfo);
+}
+
+static void
+onAuthErr(void *pUsr, char *authinfo, char* errmesg, __attribute__((unused)) relpRetVal errcode)
+{
+ instanceData *pData = ((wrkrInstanceData_t*) pUsr)->pData;
+ errmsg.LogError(0, RS_RET_RELP_AUTH_FAIL, "omrelp[%s:%s]: authentication error '%s', peer "
+ "is '%s' - DISABLING action", pData->target, pData->port, errmesg, authinfo);
+ pData->bHadAuthFail = 1;
+}
+
+static rsRetVal
+doCreateRelpClient(wrkrInstanceData_t *pWrkrData)
+{
+ int i;
+ instanceData *pData;
DEFiRet;
- if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
+
+ pData = pWrkrData->pData;
+ if(relpEngineCltConstruct(pRelpEngine, &pWrkrData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
- if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ if(relpCltSetTimeout(pWrkrData->pRelpClt, pData->timeout) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetWindowSize(pWrkrData->pRelpClt, pData->sizeWindow) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetUsrPtr(pWrkrData->pRelpClt, pWrkrData) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLS) {
+ if(relpCltEnableTLS(pWrkrData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(pData->bEnableTLSZip) {
+ if(relpCltEnableTLSZip(pWrkrData->pRelpClt) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpCltSetGnuTLSPriString(pWrkrData->pRelpClt, (char*) pData->pristring) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetAuthMode(pWrkrData->pRelpClt, (char*) pData->authmode) != RELP_RET_OK) {
+ errmsg.LogError(0, RS_RET_RELP_ERR,
+ "omrelp: invalid auth mode '%s'\n", pData->authmode);
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ if(relpCltSetCACert(pWrkrData->pRelpClt, (char*) pData->caCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetOwnCert(pWrkrData->pRelpClt, (char*) pData->myCertFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetPrivKey(pWrkrData->pRelpClt, (char*) pData->myPrivKeyFile) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
+ relpCltAddPermittedPeer(pWrkrData->pRelpClt, (char*)pData->permittedPeers.name[i]);
+ }
+ }
+ if(glbl.GetSourceIPofLocalClient() == NULL) { /* ar Do we have a client IP set? */
+ if(relpCltSetClientIP(pWrkrData->pRelpClt, glbl.GetSourceIPofLocalClient()) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
+ }
+ pWrkrData->bInitialConnect = 1;
+ pWrkrData->nSent = 0;
finalize_it:
RETiRet;
}
-
BEGINcreateInstance
CODESTARTcreateInstance
- pData->bInitialConnect = 1;
+ pData->sizeWindow = 0;
pData->timeout = 90;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->bHadAuthFail = 0;
+ pData->pristring = NULL;
+ pData->authmode = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
+ pData->permittedPeers.nmemb = 0;
ENDcreateInstance
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+ pWrkrData->pRelpClt = NULL;
+ iRet = doCreateRelpClient(pWrkrData);
+ENDcreateWrkrInstance
+
BEGINfreeInstance
+ int i;
CODESTARTfreeInstance
- if(pData->pRelpClt != NULL)
- relpEngineCltDestruct(pRelpEngine, &pData->pRelpClt);
free(pData->target);
free(pData->port);
free(pData->tplName);
+ free(pData->pristring);
+ free(pData->authmode);
+ free(pData->caCertFile);
+ free(pData->myCertFile);
+ free(pData->myPrivKeyFile);
+ for(i = 0 ; i < pData->permittedPeers.nmemb ; ++i) {
+ free(pData->permittedPeers.name[i]);
+ }
ENDfreeInstance
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+ if(pWrkrData->pRelpClt != NULL)
+ relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt);
+ENDfreeWrkrInstance
+
static inline void
setInstParamDefaults(instanceData *pData)
{
@@ -139,12 +273,22 @@ setInstParamDefaults(instanceData *pData)
pData->port = NULL;
pData->tplName = NULL;
pData->timeout = 90;
+ pData->sizeWindow = 0;
+ pData->rebindInterval = 0;
+ pData->bEnableTLS = DFLT_ENABLE_TLS;
+ pData->bEnableTLSZip = DFLT_ENABLE_TLSZIP;
+ pData->pristring = NULL;
+ pData->authmode = NULL;
+ pData->caCertFile = NULL;
+ pData->myCertFile = NULL;
+ pData->myPrivKeyFile = NULL;
+ pData->permittedPeers.nmemb = 0;
}
BEGINnewActInst
struct cnfparamvals *pvals;
- int i;
+ int i,j;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@@ -164,6 +308,31 @@ CODESTARTnewActInst
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
pData->timeout = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
+ pData->rebindInterval = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "windowsize")) {
+ pData->sizeWindow = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls")) {
+ pData->bEnableTLS = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.compression")) {
+ pData->bEnableTLSZip = (unsigned) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "tls.prioritystring")) {
+ pData->pristring = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
+ pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
+ pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
+ pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.authmode")) {
+ pData->authmode = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "tls.permittedpeer")) {
+ pData->permittedPeers.nmemb = pvals[i].val.d.ar->nmemb;
+ CHKmalloc(pData->permittedPeers.name =
+ malloc(sizeof(uchar*) * pData->permittedPeers.nmemb));
+ for(j = 0 ; j < pvals[i].val.d.ar->nmemb ; ++j) {
+ pData->permittedPeers.name[j] = (uchar*)es_str2cstr(pvals[i].val.d.ar->arr[j], NULL);
+ }
} else {
dbgprintf("omrelp: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -176,8 +345,6 @@ CODESTARTnewActInst
"RSYSLOG_ForwardFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERnewActInst
if(pvals != NULL)
cnfparamvalsDestruct(pvals, &actpblk);
@@ -205,22 +372,23 @@ ENDdbgPrintInstInfo
/* try to connect to server
* rgerhards, 2008-03-21
*/
-static rsRetVal doConnect(instanceData *pData)
+static rsRetVal doConnect(wrkrInstanceData_t *pWrkrData)
{
DEFiRet;
- if(pData->bInitialConnect) {
- iRet = relpCltConnect(pData->pRelpClt, glbl.GetDefPFFamily(), pData->port, pData->target);
+ if(pWrkrData->bInitialConnect) {
+ iRet = relpCltConnect(pWrkrData->pRelpClt, glbl.GetDefPFFamily(),
+ pWrkrData->pData->port, pWrkrData->pData->target);
if(iRet == RELP_RET_OK)
- pData->bInitialConnect = 0;
+ pWrkrData->bInitialConnect = 0;
} else {
- iRet = relpCltReconnect(pData->pRelpClt);
+ iRet = relpCltReconnect(pWrkrData->pRelpClt);
}
if(iRet == RELP_RET_OK) {
- pData->bIsConnected = 1;
+ pWrkrData->bIsConnected = 1;
} else {
- pData->bIsConnected = 0;
+ pWrkrData->bIsConnected = 0;
iRet = RS_RET_SUSPENDED;
}
@@ -230,40 +398,87 @@ static rsRetVal doConnect(instanceData *pData)
BEGINtryResume
CODESTARTtryResume
- iRet = doConnect(pData);
+ if(pWrkrData->pData->bHadAuthFail) {
+ ABORT_FINALIZE(RS_RET_DISABLE_ACTION);
+ }
+ iRet = doConnect(pWrkrData);
+finalize_it:
ENDtryResume
+static inline rsRetVal
+doRebind(wrkrInstanceData_t *pWrkrData)
+{
+ DEFiRet;
+ DBGPRINTF("omrelp: destructing relp client due to rebindInterval\n");
+ CHKiRet(relpEngineCltDestruct(pRelpEngine, &pWrkrData->pRelpClt));
+ pWrkrData->bIsConnected = 0;
+ CHKiRet(doCreateRelpClient(pWrkrData));
+finalize_it:
+ RETiRet;
+}
+
+BEGINbeginTransaction
+CODESTARTbeginTransaction
+dbgprintf("omrelp: beginTransaction\n");
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
+ }
+ relpCltHintBurstBegin(pWrkrData->pRelpClt);
+finalize_it:
+ENDbeginTransaction
BEGINdoAction
uchar *pMsg; /* temporary buffering */
size_t lenMsg;
relpRetVal ret;
+ instanceData *pData;
CODESTARTdoAction
+ pData = pWrkrData->pData;
dbgprintf(" %s:%s/RELP\n", pData->target, getRelpPt(pData));
- if(!pData->bIsConnected) {
- CHKiRet(doConnect(pData));
+ if(!pWrkrData->bIsConnected) {
+ CHKiRet(doConnect(pWrkrData));
}
pMsg = ppString[0];
lenMsg = strlen((char*) pMsg); /* TODO: don't we get this? */
- /* TODO: think about handling oversize messages! */
+ /* we need to truncate oversize msgs - no way around that... */
if((int) lenMsg > glbl.GetMaxLine())
lenMsg = glbl.GetMaxLine();
/* forward */
- ret = relpCltSendSyslog(pData->pRelpClt, (uchar*) pMsg, lenMsg);
+ ret = relpCltSendSyslog(pWrkrData->pRelpClt, (uchar*) pMsg, lenMsg);
if(ret != RELP_RET_OK) {
/* error! */
dbgprintf("error forwarding via relp, suspending\n");
- iRet = RS_RET_SUSPENDED;
+ ABORT_FINALIZE(RS_RET_SUSPENDED);
}
+ if(pData->rebindInterval != 0 &&
+ (++pWrkrData->nSent >= pData->rebindInterval)) {
+ doRebind(pWrkrData);
+ }
finalize_it:
+ if(pData->bHadAuthFail)
+ iRet = RS_RET_DISABLE_ACTION;
+ if(iRet == RS_RET_OK) {
+ /* we mimic non-commit, as otherwise our endTransaction handler
+ * will not get called. While this is not 100% correct, the worst
+ * that can happen is some message duplication, something that
+ * rsyslog generally accepts and prefers over message loss.
+ */
+ iRet = RS_RET_PREVIOUS_COMMITTED;
+ }
ENDdoAction
+BEGINendTransaction
+CODESTARTendTransaction
+ dbgprintf("omrelp: endTransaction\n");
+ relpCltHintBurstEnd(pWrkrData->pRelpClt);
+ENDendTransaction
+
BEGINparseSelectorAct
uchar *q;
int i;
@@ -329,7 +544,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
++p;
}
- /* TODO: make this if go away! */
if(*p == ';') {
*p = '\0'; /* trick to obtain hostname (later)! */
CHKmalloc(pData->target = ustrdup(q));
@@ -341,8 +555,6 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
/* process template */
CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_NO_RQD_TPL_OPTS, (uchar*) "RSYSLOG_ForwardFormat"));
- CHKiRet(doCreateRelpClient(pData));
-
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
@@ -360,8 +572,10 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_TXIF_OMOD_QUERIES
CODEqueryEtryPt_SetShutdownImmdtPtr
ENDqueryEtryPt
@@ -374,12 +588,12 @@ CODEmodInit_QueryRegCFSLineHdlr
/* create our relp engine */
CHKiRet(relpEngineConstruct(&pRelpEngine));
CHKiRet(relpEngineSetDbgprint(pRelpEngine, dbgprintf));
+ CHKiRet(relpEngineSetOnAuthErr(pRelpEngine, onAuthErr));
+ CHKiRet(relpEngineSetOnGenericErr(pRelpEngine, onGenericErr));
+ CHKiRet(relpEngineSetOnErr(pRelpEngine, onErr));
CHKiRet(relpEngineSetEnableCmd(pRelpEngine, (uchar*) "syslog", eRelpCmdState_Required));
/* tell which objects we need */
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(glbl, CORE_COMPONENT));
ENDmodInit
-
-/* vim:set ai:
- */