summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--runtime/obj.c3
-rw-r--r--runtime/queue.c96
-rw-r--r--runtime/queue.h7
-rw-r--r--runtime/stream.c61
-rw-r--r--runtime/stream.h5
5 files changed, 105 insertions, 67 deletions
diff --git a/runtime/obj.c b/runtime/obj.c
index 7f4800fd..03d25cdc 100644
--- a/runtime/obj.c
+++ b/runtime/obj.c
@@ -899,7 +899,8 @@ objDeserializeWithMethods(void *ppObj, uchar *pszTypeExpected, int lenTypeExpect
do {
iRetLocal = objDeserializeHeader((uchar*) "Obj", &pstrID, &oVers, pStrm);
if(iRetLocal != RS_RET_OK) {
- dbgprintf("objDeserialize error %d during header processing - trying to recover\n", iRetLocal);
+ dbgprintf("objDeserialize error %d during header processing - "
+ "trying to recover\n", iRetLocal);
CHKiRet(objDeserializeTryRecover(pStrm));
}
} while(iRetLocal != RS_RET_OK);
diff --git a/runtime/queue.c b/runtime/queue.c
index 802a9340..1ee34335 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -777,9 +777,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
- /* create a duplicate for the read "pointer".
- */
-
+ /* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
@@ -798,7 +796,7 @@ finalize_it:
strm.Destruct(&psQIF);
if(iRet != RS_RET_OK) {
- DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n",
+ DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n",
iRet);
}
@@ -927,50 +925,6 @@ static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg)
}
-/* the following function is a dummy to be used for qDelDisk, which (currently) must
- * provide constructors & others even though it does not really need the object. So
- * instead of using the real ones, we use the dummy here. That at least saves some time.
- * Of course, in the longer term the whole process should be refactored.
- * rgerhards, 2012-11-03
- */
-static rsRetVal
-qDelDiskCallbackDummy(void)
-{
- return RS_RET_OK;
-}
-static rsRetVal qDelDisk(qqueue_t *pThis)
-{
- msg_t *pDummyObj; /* another dummy, nothing is created */
- DEFiRet;
-
- int64 offsIn;
- int64 offsOut;
-
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn));
- CHKiRet(objDeserializeWithMethods(&pDummyObj, (uchar*) "msg", 3, pThis->tVars.disk.pReadDel,
- NULL, NULL, qDelDiskCallbackDummy, qDelDiskCallbackDummy, objDeserializeDummy));
- CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut));
-
- /* This time it is a bit tricky: we free disk space only upon file deletion. So we need
- * to keep track of what we have read until we get an out-offset that is lower than the
- * in-offset (which indicates file change). Then, we can subtract the whole thing from
- * the on-disk size. -- rgerhards, 2008-01-30
- */
- if(offsIn < offsOut) {
- pThis->tVars.disk.bytesRead += offsOut - offsIn;
- } else {
- pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead;
- pThis->tVars.disk.bytesRead = offsOut;
- DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk);
- /* awake possibly waiting enq process */
- pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
- }
-
-finalize_it:
- RETiRet;
-}
-
-
/* -------------------- direct (no queueing) -------------------- */
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
{
@@ -1188,11 +1142,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis)
rsRetVal iRetLocal;
DEFiRet;
-RUNLOG_STR("trying to shutdown workers within Action Timeout");
ISOBJ_TYPE_assert(pThis, qqueue);
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
/* instruct workers to finish ASAP, even if still work exists */
+ DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout");
DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n");
pThis->bEnqOnly = 1;
pThis->bShutdownImmediate = 1;
@@ -1466,13 +1420,32 @@ static inline rsRetVal
DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem)
{
int i;
+ off64_t bytesDel;
DEFiRet;
ISOBJ_TYPE_assert(pThis, qqueue);
/* now send delete request to storage driver */
- for(i = 0 ; i < nElem ; ++i) {
- pThis->qDel(pThis);
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut,
+ pThis->tVars.disk.deqOffs, &bytesDel);
+ /* We need to correct the on-disk file size. This time it is a bit tricky:
+ * we free disk space only upon file deletion. So we need to keep track of what we
+ * have read until we get an out-offset that is lower than the in-offset (which
+ * indicates file change). Then, we can subtract the whole thing from the on-disk
+ * size. -- rgerhards, 2008-01-30
+ */
+ if(bytesDel != 0) {
+ pThis->tVars.disk.sizeOnDisk -= bytesDel;
+ DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk "
+ "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk);
+ /* awake possibly waiting enq process */
+ pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */
+ }
+ } else { /* memory queue */
+ for(i = 0 ; i < nElem ; ++i) {
+ pThis->qDel(pThis);
+ }
}
/* iQueueSize is not decremented by qDel(), so we need to do it ourselves */
@@ -1591,6 +1564,9 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
DeleteProcessedBatch(pThis, &pWti->batch);
nDequeued = nDiscarded = 0;
+ if(pThis->qType == QUEUETYPE_DISK) {
+ pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
CHKiRet(qqueueDeq(pThis, &pMsg));
@@ -1609,6 +1585,11 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
++nDequeued;
}
+ if(pThis->qType == QUEUETYPE_DISK) {
+ strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs);
+ pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq);
+ }
+
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
@@ -1616,7 +1597,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);
*piRemainingQueueSize = iQueueSize;
-
finalize_it:
RETiRet;
}
@@ -1652,7 +1632,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk);
}
- // TODO: MULTI: check physical queue size?
pthread_cond_signal(&pThis->notFull);
/* WE ARE NO LONGER PROTECTED BY THE MUTEX */
@@ -1904,10 +1883,6 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti)
/* iterate over returned results and enqueue them in DA queue */
for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) {
- /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
- * the message. So far, we simply assume we always have msg_t, what currently is always the case.
- * rgerhards, 2009-05-28
- */
CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY,
MsgAddRef(pWti->batch.pElem[i].pMsg)));
pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */
@@ -2022,7 +1997,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->qDestruct = qDestructDisk;
pThis->qAdd = qAddDisk;
pThis->qDeq = qDeqDisk;
- pThis->qDel = qDelDisk;
+ pThis->qDel = NULL; /* delete for disk handled via special code! */
pThis->MultiEnq = qqueueMultiEnqObjNonDirect;
/* special handling */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
@@ -2158,7 +2133,7 @@ finalize_it:
}
-/* persist the queue to disk. If we have something to persist, we first
+/* persist the queue to disk (write the .qi file). If we have something to persist, we first
* save the information on the queue properties itself and then we call
* the queue-type specific drivers.
* Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint,
@@ -2213,7 +2188,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
objSerializeSCALAR(psQIF, iQueueSize, INT);
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
- objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
CHKiRet(obj.EndSerialize(psQIF));
/* now persist the stream info */
@@ -2793,8 +2767,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
pThis->iQueueSize = pProp->val.num;
} else if(isProp("tVars.disk.sizeOnDisk")) {
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
- } else if(isProp("tVars.disk.bytesRead")) {
- pThis->tVars.disk.bytesRead = pProp->val.num;
} else if(isProp("qType")) {
if(pThis->qType != pProp->val.num)
ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH);
diff --git a/runtime/queue.h b/runtime/queue.h
index e6ccdcdb..7db2d90d 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -147,7 +147,8 @@ struct queue_s {
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
/* now follow queueing mode specific data elements */
- union { /* different data elements based on queue type (qType) */
+ //union { /* different data elements based on queue type (qType) */
+ struct { /* different data elements based on queue type (qType) */
struct {
long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
@@ -159,7 +160,9 @@ struct queue_s {
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
- int64 bytesRead; /* number of bytes read from current (undeleted!) file */
+ int64 deqOffs; /* offset after dequeue batch - used for file deleter */
+ int deqFileNumIn; /* same for the circular file numbers, mainly for */
+ int deqFileNumOut;/* deleting finished files */
strm_t *pWrite; /* current file to be written */
strm_t *pReadDeq; /* current file for dequeueing */
strm_t *pReadDel; /* current file for deleting */
diff --git a/runtime/stream.c b/runtime/stream.c
index 26abeb39..23e6c943 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -365,7 +365,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
pThis->fdDir = -1;
}
- if(pThis->bDeleteOnClose && pThis->pszCurrFName != NULL) {
+ if(pThis->bDeleteOnClose) {
+ if(pThis->pszCurrFName == NULL) {
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ }
+ DBGPRINTF("strmCloseFile: deleting '%s'\n", pThis->pszCurrFName);
if(unlink((char*) pThis->pszCurrFName) == -1) {
char errStr[1024];
int err = errno;
@@ -373,12 +379,13 @@ static rsRetVal strmCloseFile(strm_t *pThis)
DBGPRINTF("error %d unlinking '%s' - ignored: %s\n",
errno, pThis->pszCurrFName, errStr);
}
- free(pThis->pszCurrFName); /* no longer needed in any case (just for open) */
+ free(pThis->pszCurrFName);
pThis->pszCurrFName = NULL;
}
pThis->iCurrOffs = 0; /* we are back at begin of file */
+finalize_it:
RETiRet;
}
@@ -1330,6 +1337,56 @@ finalize_it:
RETiRet;
}
+/* multi-file seek, seeks to file number & offset within file. This
+ * is a support function for the queue, in circular mode. DO NOT USE
+ * IT FOR OTHER NEEDS - it may not work as expected. It will
+ * seek to the new position and delete interim files, as it skips them.
+ * Note: this code can be removed when the queue gets a new disk store
+ * handler (if and when it does ;)).
+ * The output parameter bytesDel receives the number of bytes that have
+ * been deleted (if a file is deleted) or 0 if nothing was deleted.
+ * rgerhards, 2012-11-07
+ */
+rsRetVal
+strmMultiFileSeek(strm_t *pThis, int FNum, off64_t offs, off64_t *bytesDel)
+{
+ struct stat statBuf;
+ DEFiRet;
+
+ ISOBJ_TYPE_assert(pThis, strm);
+
+ if(FNum == 0 && offs == 0) { /* happens during queue init */
+ *bytesDel = 0;
+ FINALIZE;
+ }
+
+ if(pThis->iCurrFNum != FNum) {
+ /* Note: we assume that no more than one file is skipped - an
+ * assumption that is being used also by the whole rest of the
+ * code and most notably the queue subsystem.
+ */
+ CHKiRet(genFileName(&pThis->pszCurrFName, pThis->pszDir, pThis->lenDir,
+ pThis->pszFName, pThis->lenFName, pThis->iCurrFNum,
+ pThis->iFileNumDigits));
+ stat((char*)pThis->pszCurrFName, &statBuf);
+ *bytesDel = statBuf.st_size;
+ DBGPRINTF("strmMultiFileSeek: detected new filenum, was %d, new %d, "
+ "deleting '%s' (%lld bytes)\n", pThis->iCurrFNum, FNum,
+ pThis->pszCurrFName, (long long) *bytesDel);
+ unlink((char*)pThis->pszCurrFName);
+ free(pThis->pszCurrFName);
+ pThis->pszCurrFName = NULL;
+ pThis->iCurrFNum = FNum;
+ } else {
+ *bytesDel = 0;
+ }
+ pThis->iCurrOffs = offs;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* seek to current offset. This is primarily a helper to readjust the OS file
* pointer after a strm object has been deserialized.
diff --git a/runtime/stream.h b/runtime/stream.h
index fdfefaa3..91f0e4f4 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -189,8 +189,13 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
ENDinterface(strm)
#define strmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
+static inline int
+strmGetCurrFileNum(strm_t *pStrm) {
+ return pStrm->iCurrFNum;
+}
/* prototypes */
PROTOTYPEObjClassInit(strm);
+rsRetVal strmMultiFileSeek(strm_t *pThis, int fileNum, off64_t offs, off64_t *bytesDel);
#endif /* #ifndef STREAM_H_INCLUDED */