summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2013-10-29 18:16:57 +0100
committerRainer Gerhards <rgerhards@adiscon.com>2013-10-29 18:16:57 +0100
commit4ac27e73af1086147135abc8c17e7627bfce8fb4 (patch)
tree6736d32f60a4d72dc1e2316f3d1025047a983f8e
parent5878ee24c7232d0f4f8655e7af6a1d456c07492f (diff)
downloadrsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.tar.gz
rsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.tar.bz2
rsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.zip
call freeWrkrInstance() on worker thread termination
-rw-r--r--action.c10
-rw-r--r--runtime/wti.c16
-rw-r--r--runtime/wti.h7
3 files changed, 25 insertions, 8 deletions
diff --git a/action.c b/action.c
index ecdfa220..1e2f95d6 100644
--- a/action.c
+++ b/action.c
@@ -913,16 +913,18 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti)
DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n",
getActStateName(pThis), pThis->iActionNbr);
- if(pWti->actWrkrData[pThis->iActionNbr] == NULL) {
+ if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) {
dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr);
DBGPRINTF("we need to create a new action worker instance for "
"action %d\n", pThis->iActionNbr);
- CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrData[pThis->iActionNbr]), pThis->pModData));
+ CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData),
+ pThis->pModData));
+ pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis;
}
pThis->bHadAutoCommit = 0;
iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags,
- pWti->actWrkrData[pThis->iActionNbr]);
+ pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
@@ -998,7 +1000,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti)
CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate));
if(pThis->eState == ACT_STATE_ITX) {
- iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrData[pThis->iActionNbr]);
+ iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData);
switch(iRet) {
case RS_RET_OK:
actionCommitted(pThis);
diff --git a/runtime/wti.c b/runtime/wti.c
index 545d5179..df77bc19 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -172,7 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
- free(pThis->actWrkrData);
+ free(pThis->actWrkrInfo);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
@@ -203,8 +203,8 @@ wtiConstructFinalize(wti_t *pThis)
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = RSFALSE;
- /* must use calloc as we need zeto-init */
- CHKmalloc(pThis->actWrkrData = calloc(iActionNbr, sizeof(void*)));
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
@@ -283,6 +283,7 @@ wtiWorker(wti_t *pThis)
rsRetVal localRet;
rsRetVal terminateRet;
int iCancelStateSave;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -339,6 +340,15 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis);
bInactivityTOOccured = 0; /* reset for next run */
}
+ DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis);
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData);
+ if(pThis->actWrkrInfo[i].actWrkrData != NULL) {
+ dbgprintf("DDDD: calling freeWrkrData!\n");
+ pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData);
+ }
+ }
+
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
diff --git a/runtime/wti.h b/runtime/wti.h
index 297fb999..bb4f56bc 100644
--- a/runtime/wti.h
+++ b/runtime/wti.h
@@ -28,6 +28,11 @@
#include "batch.h"
+typedef struct actWrkrInfo {
+ action_t *pAction;
+ void *actWrkrData;
+} actWrkrInfo_t;
+
/* the worker thread instance class */
struct wti_s {
BEGINobjInstance;
@@ -37,7 +42,7 @@ struct wti_s {
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
uchar *pszDbgHdr; /* header string for debug messages */
- void **actWrkrData; /* *array* of action wrkr data pointers (sized for max nbr of actions in config!) */
+ actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */
DEF_ATOMIC_HELPER_MUT(mutIsRunning);
};