summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog1
-rw-r--r--action.c12
-rw-r--r--configure.ac2
-rw-r--r--doc/omrelp.html4
-rw-r--r--plugins/omrelp/omrelp.c18
-rw-r--r--runtime/module-template.h22
-rw-r--r--runtime/modules.c4
-rw-r--r--runtime/modules.h1
-rw-r--r--runtime/queue.c7
-rw-r--r--tools/syslogd.c1
10 files changed, 63 insertions, 9 deletions
diff --git a/ChangeLog b/ChangeLog
index a32187a2..3c532059 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,7 @@
Version 7.3.10 [devel] 2013-04-??
- added RainerScript re_extract() function
- omrelp: added support for RainerScript-based configuration
+- omrelp: added ability to specify session timeout
- bugfix: failover/action suspend did not work correctly
This was experienced if the retry action took more than one second
to complete. For suspending, a cached timestamp was used, and if the
diff --git a/action.c b/action.c
index 7731c988..6b5a718c 100644
--- a/action.c
+++ b/action.c
@@ -961,6 +961,8 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd
ISOBJ_TYPE_assert(pMsg, msg);
CHKiRet(actionPrepare(pThis, pbShutdownImmediate));
+ if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL)
+ pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate);
if(pThis->eState == ACT_STATE_ITX)
CHKiRet(actionCallDoAction(pThis, pMsg, actParams));
@@ -1272,8 +1274,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
assert(pBatch != NULL);
- pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
- pBatch->pbShutdownImmediate = pbShutdownImmediate;
+ if(pbShutdownImmediate != NULL) {
+ pbShutdownImmdtSave = pBatch->pbShutdownImmediate;
+ pBatch->pbShutdownImmediate = pbShutdownImmediate;
+dbgprintf("DDDD: processBatchMain ShutdownImmediate is %p, was %p\n", pBatch->pbShutdownImmediate, pbShutdownImmdtSave);
+ }
CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr));
/* We now must guard the output module against execution by multiple threads. The
@@ -1304,7 +1309,8 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate)
}
finalize_it:
- pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
+ if(pbShutdownImmediate != NULL)
+ pBatch->pbShutdownImmediate = pbShutdownImmdtSave;
RETiRet;
}
#pragma GCC diagnostic warning "-Wempty-body"
diff --git a/configure.ac b/configure.ac
index 0c877eea..1344f076 100644
--- a/configure.ac
+++ b/configure.ac
@@ -928,7 +928,7 @@ AC_ARG_ENABLE(relp,
[enable_relp=no]
)
if test "x$enable_relp" = "xyes"; then
- PKG_CHECK_MODULES(RELP, relp >= 1.0.2)
+ PKG_CHECK_MODULES(RELP, relp >= 1.0.3)
fi
AM_CONDITIONAL(ENABLE_RELP, test x$enable_relp = xyes)
diff --git a/doc/omrelp.html b/doc/omrelp.html
index 4b28bdda..8858f884 100644
--- a/doc/omrelp.html
+++ b/doc/omrelp.html
@@ -30,6 +30,10 @@ must be used.
<li><b>template </b>(not mandatory, default "RSYSLOG_ForwardFormat")<br>
Defines the template to be used for the output.
</li>
+ <li><b>timeout </b>(not mandatory, default 90)<br>
+ Timeout for relp sessions. If set too low, valid sessions
+ may be considered dead and tried to recover.
+ </li>
</ul>
<p><b>Sample:</b></p>
<p>The following sample sends all messages to the central server
diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c
index 0c296673..c9e32444 100644
--- a/plugins/omrelp/omrelp.c
+++ b/plugins/omrelp/omrelp.c
@@ -63,7 +63,8 @@ typedef struct _instanceData {
uchar *port;
int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */
int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */
- relpClt_t *pRelpClt; /* relp client for this instance */
+ unsigned timeout;
+ relpClt_t *pRelpClt; /* relp client for this instance */
uchar *tplName;
} instanceData;
@@ -76,8 +77,9 @@ static configSettings_t __attribute__((unused)) cs;
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
- { "target", eCmdHdlrGetWord, 0 },
+ { "target", eCmdHdlrGetWord, 1 },
{ "port", eCmdHdlrGetWord, 0 },
+ { "timeout", eCmdHdlrInt, 0 },
{ "template", eCmdHdlrGetWord, 1 }
};
static struct cnfparamblk actpblk =
@@ -109,6 +111,8 @@ doCreateRelpClient(instanceData *pData)
DEFiRet;
if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK)
ABORT_FINALIZE(RS_RET_RELP_ERR);
+ if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK)
+ ABORT_FINALIZE(RS_RET_RELP_ERR);
finalize_it:
RETiRet;
}
@@ -134,6 +138,7 @@ setInstParamDefaults(instanceData *pData)
pData->target = NULL;
pData->port = NULL;
pData->tplName = NULL;
+ pData->timeout = 90;
}
@@ -157,6 +162,8 @@ CODESTARTnewActInst
pData->port = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {
pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(actpblk.descr[i].name, "timeout")) {
+ pData->timeout = (unsigned) pvals[i].val.d.n;
} else {
dbgprintf("omrelp: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -181,6 +188,12 @@ CODESTARTisCompatibleWithFeature
iRet = RS_RET_OK;
ENDisCompatibleWithFeature
+BEGINSetShutdownImmdtPtr
+CODESTARTSetShutdownImmdtPtr
+ relpEngineSetShutdownImmdtPtr(pRelpEngine, pPtr);
+ DBGPRINTF("omrelp: shutdownImmediate ptr now is %p\n", pPtr);
+ENDSetShutdownImmdtPtr
+
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
@@ -404,6 +417,7 @@ CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+CODEqueryEtryPt_SetShutdownImmdtPtr
ENDqueryEtryPt
diff --git a/runtime/module-template.h b/runtime/module-template.h
index fe74bac9..8a958f90 100644
--- a/runtime/module-template.h
+++ b/runtime/module-template.h
@@ -938,6 +938,28 @@ static rsRetVal doHUP(instanceData __attribute__((unused)) *pData)\
}
+/* SetShutdownImmdtPtr()
+ * This function is optional. If defined by an output plugin, it is called
+ * each time the action is invoked to set the "ShutdownImmediate" pointer,
+ * which is used during termination to indicate the action should shutdown
+ * as quickly as possible.
+ */
+#define CODEqueryEtryPt_SetShutdownImmdtPtr \
+ else if(!strcmp((char*) name, "SetShutdownImmdtPtr")) {\
+ *pEtryPoint = SetShutdownImmdtPtr;\
+ }
+#define BEGINSetShutdownImmdtPtr \
+static rsRetVal SetShutdownImmdtPtr(instanceData __attribute__((unused)) *pData, int *pPtr)\
+{\
+ DEFiRet;
+
+#define CODESTARTSetShutdownImmdtPtr
+
+#define ENDSetShutdownImmdtPtr \
+ RETiRet;\
+}
+
+
/* parse() - main entry point of parser modules
*/
#define BEGINparse \
diff --git a/runtime/modules.c b/runtime/modules.c
index 9f7ff31c..e9d8d959 100644
--- a/runtime/modules.c
+++ b/runtime/modules.c
@@ -657,6 +657,10 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
ABORT_FINALIZE(localRet);
+ localRet = (*pNew->modQueryEtryPt)((uchar*)"SetShutdownImmdtPtr", &pNew->mod.om.SetShutdownImmdtPtr);
+ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
+ ABORT_FINALIZE(localRet);
+
localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction);
if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND)
pNew->mod.om.beginTransaction = dummyBeginTransaction;
diff --git a/runtime/modules.h b/runtime/modules.h
index e42d19e1..64644be2 100644
--- a/runtime/modules.h
+++ b/runtime/modules.h
@@ -142,6 +142,7 @@ struct modInfo_s {
rsRetVal (*endTransaction)(void*);
rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**);
rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **);
+ rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr);
} om;
struct { /* data for library modules */
char dummy;
diff --git a/runtime/queue.c b/runtime/queue.c
index 4c8d3ac5..74090a4d 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -991,7 +991,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
* We use our knowledge about the batch_t structure below, but without that, we
* pay a too-large performance toll... -- rgerhards, 2009-04-22
*/
- iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate);
+ iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL);
RETiRet;
}
@@ -1150,6 +1150,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
+dbgprintf("DDDD: setting shutdownImmediate mode, ptr %p!\n", &pThis->bShutdownImmediate);
pThis->bShutdownImmediate = 1;
/* now DA queue */
if(pThis->bIsDA) {
@@ -1291,8 +1292,6 @@ finalize_it:
RETiRet;
}
-
-
/* Constructor for the queue object
* This constructs the data structure, but does not yet start the queue. That
* is done by queueStart(). The reason is that we want to give the caller a chance
@@ -1856,6 +1855,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
/* at this spot, we may be cancelled */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
+
+dbgprintf("DDDD: calling consumer with shutdownImmeditate ptr %p\n", &pThis->bShutdownImmediate);
CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
/* we now need to check if we should deliberately delay processing a bit
diff --git a/tools/syslogd.c b/tools/syslogd.c
index e291ba47..03fb2bd9 100644
--- a/tools/syslogd.c
+++ b/tools/syslogd.c
@@ -567,6 +567,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu
assert(pBatch != NULL);
pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */
preprocessBatch(pBatch);
+dbgprintf("DDDD: batches ShutdownImmediate is %p\n", pBatch->pbShutdownImmediate);
ruleset.ProcessBatch(pBatch);
//TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we
//do not have this yet and so we emulate -- 2010-06-10