diff options
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | action.c | 12 | ||||
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | doc/omrelp.html | 4 | ||||
-rw-r--r-- | plugins/omrelp/omrelp.c | 18 | ||||
-rw-r--r-- | runtime/module-template.h | 22 | ||||
-rw-r--r-- | runtime/modules.c | 4 | ||||
-rw-r--r-- | runtime/modules.h | 1 | ||||
-rw-r--r-- | runtime/queue.c | 7 | ||||
-rw-r--r-- | tools/syslogd.c | 1 |
10 files changed, 63 insertions, 9 deletions
@@ -2,6 +2,7 @@ Version 7.3.10 [devel] 2013-04-?? - added RainerScript re_extract() function - omrelp: added support for RainerScript-based configuration +- omrelp: added ability to specify session timeout - bugfix: failover/action suspend did not work correctly This was experienced if the retry action took more than one second to complete. For suspending, a cached timestamp was used, and if the @@ -961,6 +961,8 @@ actionProcessMessage(action_t *pThis, msg_t *pMsg, void *actParams, int *pbShutd ISOBJ_TYPE_assert(pMsg, msg); CHKiRet(actionPrepare(pThis, pbShutdownImmediate)); + if(pThis->pMod->mod.om.SetShutdownImmdtPtr != NULL) + pThis->pMod->mod.om.SetShutdownImmdtPtr(pThis->pModData, pbShutdownImmediate); if(pThis->eState == ACT_STATE_ITX) CHKiRet(actionCallDoAction(pThis, pMsg, actParams)); @@ -1272,8 +1274,11 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) assert(pBatch != NULL); - pbShutdownImmdtSave = pBatch->pbShutdownImmediate; - pBatch->pbShutdownImmediate = pbShutdownImmediate; + if(pbShutdownImmediate != NULL) { + pbShutdownImmdtSave = pBatch->pbShutdownImmediate; + pBatch->pbShutdownImmediate = pbShutdownImmediate; +dbgprintf("DDDD: processBatchMain ShutdownImmediate is %p, was %p\n", pBatch->pbShutdownImmediate, pbShutdownImmdtSave); + } CHKiRet(prepareBatch(pAction, pBatch, &activeSave, &bMustRestoreActivePtr)); /* We now must guard the output module against execution by multiple threads. The @@ -1304,7 +1309,8 @@ processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) } finalize_it: - pBatch->pbShutdownImmediate = pbShutdownImmdtSave; + if(pbShutdownImmediate != NULL) + pBatch->pbShutdownImmediate = pbShutdownImmdtSave; RETiRet; } #pragma GCC diagnostic warning "-Wempty-body" diff --git a/configure.ac b/configure.ac index 0c877eea..1344f076 100644 --- a/configure.ac +++ b/configure.ac @@ -928,7 +928,7 @@ AC_ARG_ENABLE(relp, [enable_relp=no] ) if test "x$enable_relp" = "xyes"; then - PKG_CHECK_MODULES(RELP, relp >= 1.0.2) + PKG_CHECK_MODULES(RELP, relp >= 1.0.3) fi AM_CONDITIONAL(ENABLE_RELP, test x$enable_relp = xyes) diff --git a/doc/omrelp.html b/doc/omrelp.html index 4b28bdda..8858f884 100644 --- a/doc/omrelp.html +++ b/doc/omrelp.html @@ -30,6 +30,10 @@ must be used. <li><b>template </b>(not mandatory, default "RSYSLOG_ForwardFormat")<br> Defines the template to be used for the output. </li> + <li><b>timeout </b>(not mandatory, default 90)<br> + Timeout for relp sessions. If set too low, valid sessions + may be considered dead and tried to recover. + </li> </ul> <p><b>Sample:</b></p> <p>The following sample sends all messages to the central server diff --git a/plugins/omrelp/omrelp.c b/plugins/omrelp/omrelp.c index 0c296673..c9e32444 100644 --- a/plugins/omrelp/omrelp.c +++ b/plugins/omrelp/omrelp.c @@ -63,7 +63,8 @@ typedef struct _instanceData { uchar *port; int bInitialConnect; /* is this the initial connection request of our module? (0-no, 1-yes) */ int bIsConnected; /* currently connected to server? 0 - no, 1 - yes */ - relpClt_t *pRelpClt; /* relp client for this instance */ + unsigned timeout; + relpClt_t *pRelpClt; /* relp client for this instance */ uchar *tplName; } instanceData; @@ -76,8 +77,9 @@ static configSettings_t __attribute__((unused)) cs; /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ static struct cnfparamdescr actpdescr[] = { - { "target", eCmdHdlrGetWord, 0 }, + { "target", eCmdHdlrGetWord, 1 }, { "port", eCmdHdlrGetWord, 0 }, + { "timeout", eCmdHdlrInt, 0 }, { "template", eCmdHdlrGetWord, 1 } }; static struct cnfparamblk actpblk = @@ -109,6 +111,8 @@ doCreateRelpClient(instanceData *pData) DEFiRet; if(relpEngineCltConstruct(pRelpEngine, &pData->pRelpClt) != RELP_RET_OK) ABORT_FINALIZE(RS_RET_RELP_ERR); + if(relpCltSetTimeout(pData->pRelpClt, pData->timeout) != RELP_RET_OK) + ABORT_FINALIZE(RS_RET_RELP_ERR); finalize_it: RETiRet; } @@ -134,6 +138,7 @@ setInstParamDefaults(instanceData *pData) pData->target = NULL; pData->port = NULL; pData->tplName = NULL; + pData->timeout = 90; } @@ -157,6 +162,8 @@ CODESTARTnewActInst pData->port = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(actpblk.descr[i].name, "timeout")) { + pData->timeout = (unsigned) pvals[i].val.d.n; } else { dbgprintf("omrelp: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); @@ -181,6 +188,12 @@ CODESTARTisCompatibleWithFeature iRet = RS_RET_OK; ENDisCompatibleWithFeature +BEGINSetShutdownImmdtPtr +CODESTARTSetShutdownImmdtPtr + relpEngineSetShutdownImmdtPtr(pRelpEngine, pPtr); + DBGPRINTF("omrelp: shutdownImmediate ptr now is %p\n", pPtr); +ENDSetShutdownImmdtPtr + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo @@ -404,6 +417,7 @@ CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES +CODEqueryEtryPt_SetShutdownImmdtPtr ENDqueryEtryPt diff --git a/runtime/module-template.h b/runtime/module-template.h index fe74bac9..8a958f90 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -938,6 +938,28 @@ static rsRetVal doHUP(instanceData __attribute__((unused)) *pData)\ } +/* SetShutdownImmdtPtr() + * This function is optional. If defined by an output plugin, it is called + * each time the action is invoked to set the "ShutdownImmediate" pointer, + * which is used during termination to indicate the action should shutdown + * as quickly as possible. + */ +#define CODEqueryEtryPt_SetShutdownImmdtPtr \ + else if(!strcmp((char*) name, "SetShutdownImmdtPtr")) {\ + *pEtryPoint = SetShutdownImmdtPtr;\ + } +#define BEGINSetShutdownImmdtPtr \ +static rsRetVal SetShutdownImmdtPtr(instanceData __attribute__((unused)) *pData, int *pPtr)\ +{\ + DEFiRet; + +#define CODESTARTSetShutdownImmdtPtr + +#define ENDSetShutdownImmdtPtr \ + RETiRet;\ +} + + /* parse() - main entry point of parser modules */ #define BEGINparse \ diff --git a/runtime/modules.c b/runtime/modules.c index 9f7ff31c..e9d8d959 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -657,6 +657,10 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) ABORT_FINALIZE(localRet); + localRet = (*pNew->modQueryEtryPt)((uchar*)"SetShutdownImmdtPtr", &pNew->mod.om.SetShutdownImmdtPtr); + if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + ABORT_FINALIZE(localRet); + localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction); if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) pNew->mod.om.beginTransaction = dummyBeginTransaction; diff --git a/runtime/modules.h b/runtime/modules.h index e42d19e1..64644be2 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -142,6 +142,7 @@ struct modInfo_s { rsRetVal (*endTransaction)(void*); rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **); + rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr); } om; struct { /* data for library modules */ char dummy; diff --git a/runtime/queue.c b/runtime/queue.c index 4c8d3ac5..74090a4d 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -991,7 +991,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); RETiRet; } @@ -1150,6 +1150,7 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout"); DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; +dbgprintf("DDDD: setting shutdownImmediate mode, ptr %p!\n", &pThis->bShutdownImmediate); pThis->bShutdownImmediate = 1; /* now DA queue */ if(pThis->bIsDA) { @@ -1291,8 +1292,6 @@ finalize_it: RETiRet; } - - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance @@ -1856,6 +1855,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); + +dbgprintf("DDDD: calling consumer with shutdownImmeditate ptr %p\n", &pThis->bShutdownImmediate); CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit diff --git a/tools/syslogd.c b/tools/syslogd.c index e291ba47..03fb2bd9 100644 --- a/tools/syslogd.c +++ b/tools/syslogd.c @@ -567,6 +567,7 @@ msgConsumer(void __attribute__((unused)) *notNeeded, batch_t *pBatch, int *pbShu assert(pBatch != NULL); pBatch->pbShutdownImmediate = pbShutdownImmediate; /* TODO: move this to batch creation! */ preprocessBatch(pBatch); +dbgprintf("DDDD: batches ShutdownImmediate is %p\n", pBatch->pbShutdownImmediate); ruleset.ProcessBatch(pBatch); //TODO: the BATCH_STATE_COMM must be set somewhere down the road, but we //do not have this yet and so we emulate -- 2010-06-10 |