summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c96
1 files changed, 34 insertions, 62 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 802a9340..1ee34335 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -777,9 +777,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- /* create a duplicate for the read "pointer".
- */
-
+ /* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
@@ -798,7 +796,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -927,50 +925,6 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
}
-/* the following function is a dummy to be used for qDelDisk, which (currently) must
- * provide constructors & others even though it does not really need the object. So
- * instead of using the real ones, we use the dummy here. That at least saves some time.
- * Of course, in the longer term the whole process should be refactored.
- * rgerhards, 2012-11-03
- */
-static rsRetVal
-qDelDiskCallbackDummy(void)
-{
- return RS_RET_OK;
-}
-static rsRetVal qDelDisk(qqueue_t *pThis)
-{
- msg_t *pDummyObj; /* another dummy, nothing is created */
- DEFiRet;
-
- int64 offsIn;
- int64 offsOut;
-
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
- CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel,
- NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, objDeserializeDummy));
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
-
- /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
- * to keep track of what we have read until we get an out-offset that is lower than the
- * in-offset (which indicates file change). Then, we can subtract the whole thing from
- * the on-disk size. -- rgerhards, 2008-01-30
- */
- if(offsIn < offsOut) {
- pThis->tVars.disk.bytesRead += offsOut - offsIn;
- } else {
- pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
- pThis->tVars.disk.bytesRead = offsOut;
- DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
- /* awake possibly waiting enq process */
- pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1188,11 +1142,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
-RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
@@ -1466,13 +1420,32 @@ static inline rsRetVal
DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
{
int i;
+ off64_t bytesDel;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* now send delete request to storage driver */
- for(i = 0 ; i < nElem ; ++i) {
- pThis->qDel(pThis);
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut,
+ pThis->tVars.disk.deqOffs, &bytesDel);
+ /* We need to correct the on-disk file size. This time it is a bit tricky:
+ * we free disk space only upon file deletion. So we need to keep track of what we
+ * have read until we get an out-offset that is lower than the in-offset (which
+ * indicates file change). Then, we can subtract the whole thing from the on-disk
+ * size. -- rgerhards, 2008-01-30
+ */
+ if(bytesDel != 0) {
+ pThis->tVars.disk.sizeOnDisk -= bytesDel;
+ DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+ } else { /* memory queue */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
@@ -1591,6 +1564,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
+ if(pThis->qType == QUEUETYPE_DISK) {
+ pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
CHKiRet(qqueueDeq(pThis, &pMsg));
@@ -1609,6 +1585,11 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
++nDequeued;
}
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs);
+ pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
+
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
@@ -1616,7 +1597,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
-
finalize_it:
RETiRet;
}
@@ -1652,7 +1632,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- // TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
@@ -1904,10 +1883,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
- * the message. So far, we simply assume we always have msg_t, what currently is always the case.
- * rgerhards, 2009-05-28
- */
CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
MsgAddRef(pWti->batch.pElem[i].pMsg)));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@@ -2022,7 +1997,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
+ pThis->qDel = NULL; /* delete for disk handled via special code! */
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
@@ -2158,7 +2133,7 @@ finalize_it:
}
-/* persist the queue to disk. If we have something to persist, we first
+/* persist the queue to disk (write the .qi file). If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
* Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
@@ -2213,7 +2188,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
/* now persist the stream info */
@@ -2793,8 +2767,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
pThis->iQueueSize = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
- } else if(isProp("tVars.disk.bytesRead")) {
- pThis->tVars.disk.bytesRead = pProp->val.num;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.num)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);