summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c50
1 files changed, 42 insertions, 8 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 0fe95876..db31d9f8 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,6 +59,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
+#include "parserif.h"
#ifdef OS_SOLARIS
# include <sched.h>
@@ -86,6 +87,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
+rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -94,6 +96,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfpdescr[] = {
{ "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.spooldirectory", eCmdHdlrGetWord, 0 },
{ "queue.size", eCmdHdlrSize, 0 },
{ "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
{ "queue.maxdiskspace", eCmdHdlrSize, 0 },
@@ -417,6 +420,7 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
@@ -733,7 +737,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_TYPE_assert(pThis, qqueue);
- CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir));
finalize_it:
RETiRet;
}
@@ -840,7 +844,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
} else {
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
@@ -852,7 +856,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
@@ -865,7 +869,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
@@ -2081,7 +2085,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
/* pre-construct file name for .qi file */
pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar),
- "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix);
pThis->pszQIFNam = ustrdup(pszQIFNam);
DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam,
(int) pThis->lenQIFNam);
@@ -2175,10 +2179,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iFullDlyMrk = wrk;
}
- DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
+ DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, "
+ "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, "
+ "light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
"max wrkr %d, min msgs f. wrkr %d\n",
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir,
+ pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk,
@@ -2489,6 +2495,24 @@ CODESTARTobjDestruct(qqueue)
ENDobjDestruct(qqueue)
+/* set the queue's spool directory. The directory MUST NOT be NULL.
+ * The passed-in string is duplicated. So if the caller does not need
+ * it any longer, it must free it.
+ */
+rsRetVal
+qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir)
+{
+ DEFiRet;
+
+ free(pThis->pszSpoolDir);
+ CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir));
+ pThis->lenSpoolDir = lenSpoolDir;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* set the queue's file prefix
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
@@ -2839,6 +2863,16 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst)
pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
} else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) {
pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) {
+ free(pThis->pszSpoolDir);
+ pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr);
+ if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') {
+ pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0';
+ --pThis->lenSpoolDir;
+ parser_errmsg("queue.spooldirectory must not end with '/', "
+ "corrected to '%s'", pThis->pszSpoolDir);
+ }
} else if(!strcmp(pblk.descr[i].name, "queue.size")) {
pThis->iMaxQueueSize = pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {