diff options
Diffstat (limited to 'action.c')
-rw-r--r-- | action.c | 19 |
1 files changed, 11 insertions, 8 deletions
@@ -50,7 +50,7 @@ #define NO_TIME_PROVIDED 0 /* indicate we do not provide any cached time */ /* forward definitions */ -static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch); +static rsRetVal processBatchMain(action_t *pAction, batch_t *pBatch, int*); /* object static data (once for all instances) */ /* TODO: make this an object! DEFobjStaticHelpers -- rgerhards, 2008-03-05 */ @@ -291,7 +291,7 @@ actionConstructFinalize(action_t *pThis) * spec. -- rgerhards, 2008-01-30 */ CHKiRet(qqueueConstruct(&pThis->pQueue, ActionQueType, 1, iActionQueueSize, - (rsRetVal (*)(void*, batch_t*))processBatchMain)); + (rsRetVal (*)(void*, batch_t*, int*))processBatchMain)); obj.SetName((obj_t*) pThis->pQueue, pszQName); /* ... set some properties ... */ @@ -821,12 +821,12 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) DBGPRINTF("action call returned %d\n", localRet); if(localRet == RS_RET_OK) { /* mark messages as committed */ - while(iCommittedUpTo < i) { + while(iCommittedUpTo <= i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } } else if(localRet == RS_RET_PREVIOUS_COMMITTED) { /* mark messages as committed */ - while(iCommittedUpTo < i - 1) { + while(iCommittedUpTo < i) { pBatch->pElem[iCommittedUpTo++].state = BATCH_STATE_COMM; } pBatch->pElem[i].state = BATCH_STATE_SUB; @@ -838,6 +838,7 @@ tryDoAction(action_t *pAction, batch_t *pBatch, int *pnElem) iRet = localRet; FINALIZE; } +dbgprintf("XXX: submitBatch set element %d state to %d\n", i, pBatch->pElem[i].state); } ++i; ++iElemProcessed; @@ -871,6 +872,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) bDone = 0; do { +dbgprintf("XXX: submitBatch in loop, batch size %d\n", nElem); localRet = tryDoAction(pAction, pBatch, &nElem); if( localRet == RS_RET_OK || localRet == RS_RET_PREVIOUS_COMMITTED @@ -917,7 +919,7 @@ submitBatch(action_t *pAction, batch_t *pBatch, int nElem) * rgerhards, 2009-05-12 */ static rsRetVal -processAction(action_t *pAction, batch_t *pBatch) +processAction(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { int i; msg_t *pMsg; @@ -934,7 +936,8 @@ processAction(action_t *pAction, batch_t *pBatch) CHKiRet(localRet); /* this must be moved away - up into the dequeue part of the queue, I guess, but that's for another day */ - for(i = 0 ; i < pBatch->nElem ; i++) { + //for(i = 0 ; i < pBatch->nElem && !*pbShutdownImmediate ; i++) { + for(i = 0 ; i < pBatch->nElem ; i++) { pMsg = (msg_t*) pBatch->pElem[i].pUsrp; } iRet = finishBatch(pAction); @@ -950,7 +953,7 @@ finalize_it: * rgerhards, 2009-04-22 */ static rsRetVal -processBatchMain(action_t *pAction, batch_t *pBatch) +processBatchMain(action_t *pAction, batch_t *pBatch, int *pbShutdownImmediate) { DEFiRet; @@ -964,7 +967,7 @@ processBatchMain(action_t *pAction, batch_t *pBatch) d_pthread_mutex_lock(&pAction->mutActExec); pthread_cleanup_push(mutexCancelCleanup, &pAction->mutActExec); - iRet = processAction(pAction, pBatch); + iRet = processAction(pAction, pBatch, pbShutdownImmediate); pthread_cleanup_pop(1); /* unlock mutex */ |