summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c20
1 files changed, 19 insertions, 1 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 77197a95..c2077a51 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -44,6 +44,7 @@
#include "wti.h"
#include "obj.h"
#include "glbl.h"
+#include "action.h"
#include "atomic.h"
/* static data */
@@ -171,6 +172,7 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE
CODESTARTobjDestruct(wti)
/* actual destruction */
batchFree(&pThis->batch);
+ free(pThis->actWrkrInfo);
pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
@@ -197,11 +199,15 @@ wtiConstructFinalize(wti_t *pThis)
ISOBJ_TYPE_assert(pThis, wti);
- DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
+ DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
+ wtiGetDbgHdr(pThis), iActionNbr);
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = RSFALSE;
+ /* must use calloc as we need zero-init */
+ CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t)));
+
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
@@ -279,6 +285,7 @@ wtiWorker(wti_t *pThis)
rsRetVal localRet;
rsRetVal terminateRet;
int iCancelStateSave;
+ int i;
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
@@ -288,6 +295,7 @@ wtiWorker(wti_t *pThis)
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
+dbgprintf("DDDD: wti %p: worker starting\n", pThis);
/* now we have our identity, on to real processing */
while(1) { /* loop will be broken below - need to do mutex locks */
@@ -334,9 +342,19 @@ wtiWorker(wti_t *pThis)
bInactivityTOOccured = 0; /* reset for next run */
}
+ DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis);
+ for(i = 0 ; i < iActionNbr ; ++i) {
+ dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData);
+ if(pThis->actWrkrInfo[i].actWrkrData != NULL) {
+ dbgprintf("DDDD: calling freeWrkrData!\n");
+ pThis->actWrkrInfo[i].pAction->pMod->mod.om.freeWrkrInstance(pThis->actWrkrInfo[i].actWrkrData);
+ }
+ }
+
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
+dbgprintf("DDDD: wti %p: worker exiting\n", pThis);
RETiRet;
}