summaryrefslogtreecommitdiffstats
path: root/runtime/wti.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wti.c')
-rw-r--r--runtime/wti.c16
1 files changed, 11 insertions, 5 deletions
diff --git a/runtime/wti.c b/runtime/wti.c
index 544bffa7..7029dcfd 100644
--- a/runtime/wti.c
+++ b/runtime/wti.c
@@ -201,8 +201,8 @@ CODESTARTobjDestruct(wti)
pthread_cond_destroy(&pThis->condExitDone);
pthread_mutex_destroy(&pThis->mut);
- if(pThis->pszDbgHdr != NULL)
- free(pThis->pszDbgHdr);
+ free(pThis->batch.pElem);
+ free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
@@ -222,15 +222,20 @@ rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
+ int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
dbgprintf("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis));
/* initialize our thread instance descriptor */
- pThis->pUsrp = NULL;
pThis->tCurrCmd = eWRKTHRD_STOPPED;
+ /* we now alloc the array for user pointers. We obtain the max from the queue itself. */
+ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
+ CHKmalloc(pThis->batch.pElem = calloc((size_t)iDeqBatchSize, sizeof(batch_obj_t)));
+
+finalize_it:
RETiRet;
}
@@ -314,7 +319,8 @@ wtiWorkerCancelCleanup(void *arg)
DBGPRINTF("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
/* call user supplied handler (that one e.g. requeues the element) */
- pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->pUsrp);
+// MULTIQUEUE: need to change here!
+ pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr, pThis->batch.pElem[0].pUsrp);
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
d_pthread_mutex_lock(&pWtp->mut);
@@ -366,7 +372,7 @@ wtiWorker(wti_t *pThis)
ISOBJ_TYPE_assert(pWtp, wtp);
dbgSetThrdName(pThis->pszDbgHdr);
- pThis->pUsrp = NULL;
+ pThis->batch.nElem = 0; /* flag no elements present */ // MULTIQUEUE: do we really need this any longer (cnacel handeler)?
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);