diff options
author | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-29 18:16:57 +0100 |
---|---|---|
committer | Rainer Gerhards <rgerhards@adiscon.com> | 2013-10-29 18:16:57 +0100 |
commit | 4ac27e73af1086147135abc8c17e7627bfce8fb4 (patch) | |
tree | 6736d32f60a4d72dc1e2316f3d1025047a983f8e | |
parent | 5878ee24c7232d0f4f8655e7af6a1d456c07492f (diff) | |
download | rsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.tar.gz rsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.tar.bz2 rsyslog-4ac27e73af1086147135abc8c17e7627bfce8fb4.zip |
call freeWrkrInstance() on worker thread termination
-rw-r--r-- | action.c | 10 | ||||
-rw-r--r-- | runtime/wti.c | 16 | ||||
-rw-r--r-- | runtime/wti.h | 7 |
3 files changed, 25 insertions, 8 deletions
@@ -913,16 +913,18 @@ actionCallDoAction(action_t *pThis, msg_t *pMsg, void *actParams, wti_t *pWti) DBGPRINTF("entering actionCalldoAction(), state: %s, actionNbr %d\n", getActStateName(pThis), pThis->iActionNbr); - if(pWti->actWrkrData[pThis->iActionNbr] == NULL) { + if(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData == NULL) { dbgprintf("DDDD: wti %p create new worker instance for action %d\n", pWti, pThis->iActionNbr); DBGPRINTF("we need to create a new action worker instance for " "action %d\n", pThis->iActionNbr); - CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrData[pThis->iActionNbr]), pThis->pModData)); + CHKiRet(pThis->pMod->mod.om.createWrkrInstance(&(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData), + pThis->pModData)); + pWti->actWrkrInfo[pThis->iActionNbr].pAction = pThis; } pThis->bHadAutoCommit = 0; iRet = pThis->pMod->mod.om.doAction(actParams, pMsg->msgFlags, - pWti->actWrkrData[pThis->iActionNbr]); + pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); @@ -998,7 +1000,7 @@ finishBatch(action_t *pThis, batch_t *pBatch, wti_t *pWti) CHKiRet(actionPrepare(pThis, pBatch->pbShutdownImmediate)); if(pThis->eState == ACT_STATE_ITX) { - iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrData[pThis->iActionNbr]); + iRet = pThis->pMod->mod.om.endTransaction(pWti->actWrkrInfo[pThis->iActionNbr].actWrkrData); switch(iRet) { case RS_RET_OK: actionCommitted(pThis); diff --git a/runtime/wti.c b/runtime/wti.c index 545d5179..df77bc19 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -172,7 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); - free(pThis->actWrkrData); + free(pThis->actWrkrInfo); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); free(pThis->pszDbgHdr); @@ -203,8 +203,8 @@ wtiConstructFinalize(wti_t *pThis) /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; - /* must use calloc as we need zeto-init */ - CHKmalloc(pThis->actWrkrData = calloc(iActionNbr, sizeof(void*))); + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); @@ -283,6 +283,7 @@ wtiWorker(wti_t *pThis) rsRetVal localRet; rsRetVal terminateRet; int iCancelStateSave; + int i; DEFiRet; ISOBJ_TYPE_assert(pThis, wti); @@ -339,6 +340,15 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); bInactivityTOOccured = 0; /* reset for next run */ } + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); + if(pThis->actWrkrInfo[i].actWrkrData != NULL) { + dbgprintf("DDDD: calling freeWrkrData!\n"); + pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData); + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); diff --git a/runtime/wti.h b/runtime/wti.h index 297fb999..bb4f56bc 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -28,6 +28,11 @@ #include "batch.h" +typedef struct actWrkrInfo { + action_t *pAction; + void *actWrkrData; +} actWrkrInfo_t; + /* the worker thread instance class */ struct wti_s { BEGINobjInstance; @@ -37,7 +42,7 @@ struct wti_s { wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ - void **actWrkrData; /* *array* of action wrkr data pointers (sized for max nbr of actions in config!) */ + actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); }; |