summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c122
1 files changed, 104 insertions, 18 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 5770eae9..ff1b30f4 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -59,6 +59,7 @@
#include "datetime.h"
#include "unicode-helper.h"
#include "statsobj.h"
+#include "parserif.h"
#ifdef OS_SOLARIS
# include <sched.h>
@@ -86,6 +87,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
+rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir);
/* some constants for queuePersist () */
#define QUEUE_CHECKPOINT 1
@@ -94,6 +96,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis);
/* tables for interfacing with the v6 config system */
static struct cnfparamdescr cnfpdescr[] = {
{ "queue.filename", eCmdHdlrGetWord, 0 },
+ { "queue.spooldirectory", eCmdHdlrGetWord, 0 },
{ "queue.size", eCmdHdlrSize, 0 },
{ "queue.dequeuebatchsize", eCmdHdlrInt, 0 },
{ "queue.maxdiskspace", eCmdHdlrSize, 0 },
@@ -417,6 +420,7 @@ StartDA(qqueue_t *pThis)
CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown));
CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize));
CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix));
+ CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt));
CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles));
CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown));
@@ -733,7 +737,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh
DEFiRet;
ISOBJ_TYPE_assert(pStrm, strm);
ISOBJ_TYPE_assert(pThis, qqueue);
- CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir));
finalize_it:
RETiRet;
}
@@ -840,7 +844,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
} else {
CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
@@ -852,7 +856,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
@@ -865,7 +869,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1));
- CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir())));
+ CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir));
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
@@ -1359,7 +1363,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis)
pThis->iDeqBatchSize = 128; /* default batch size */
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
- pThis->iDiscardMrk = 980; /* begin to discard messages */
+ pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 1024*1024;
@@ -1389,7 +1393,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis)
pThis->iDeqBatchSize = 1024; /* default batch size */
pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */
pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */
- pThis->iDiscardMrk = 49500; /* begin to discard messages */
+ pThis->iDiscardMrk = -1; /* begin to discard messages */
pThis->iDiscardSeverity = 8; /* turn off */
pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */
pThis->iMaxFileSize = 16*1024*1024;
@@ -1680,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti)
/* The rate limiter
*
+ * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when
+ * it actually delays processing. Otherwise inputs are stalled.
+ *
* Here we may wait if a dequeue time window is defined or if we are
* rate-limited. TODO: If we do so, we should also look into the
* way new worker threads are spawned. Obviously, it doesn't make much
@@ -1765,8 +1772,10 @@ RateLimiter(qqueue_t *pThis)
}
if(iDelay > 0) {
+ pthread_mutex_unlock(pThis->mut);
DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay);
srSleep(iDelay, 0);
+ pthread_mutex_lock(pThis->mut);
}
RETiRet;
@@ -2076,7 +2085,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
/* pre-construct file name for .qi file */
pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar),
- "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix);
+ "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix);
pThis->pszQIFNam = ustrdup(pszQIFNam);
DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam,
(int) pThis->lenQIFNam);
@@ -2124,21 +2133,68 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
}
- if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize)
+ if(pThis->iDiscardMrk > pThis->iMaxQueueSize) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": "
+ "queue.discardMark %d is set larger than queue.size",
+ obj.GetName((obj_t*) pThis), pThis->iDiscardMrk);
+ }
+
+ goodval = (pThis->iMaxQueueSize / 100) * 80;
+ if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) {
+ errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": queue.discardMark "
+ "is set quite low at %d. You should only set it below "
+ "80%% (%d) if you have a good reason for this.",
+ obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval);
+ }
+
+
+ /* now come parameter corrections and defaults */
+ if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) {
pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90;
+ if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */
+ pThis->iHighWtrMrk = pThis->iMaxQueueSize;
+ }
+ }
if( pThis->iLowWtrMrk < 2
|| pThis->iLowWtrMrk > pThis->iMaxQueueSize
- || pThis->iLowWtrMrk > pThis->iHighWtrMrk )
+ || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) {
pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70;
+ if(pThis->iLowWtrMrk == 0) {
+ pThis->iLowWtrMrk = 1;
+ }
+ }
+
if( pThis->iMinMsgsPerWrkr < 1
- || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize )
+ || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) {
pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads;
- if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize)
+ }
+
+ if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) {
pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97;
- if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize)
+ if(pThis->iFullDlyMrk == 0) {
+ pThis->iFullDlyMrk =
+ (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
+ }
+ }
+ if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) {
pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70;
- if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize)
+ if(pThis->iLightDlyMrk == 0) {
+ pThis->iLightDlyMrk =
+ (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1;
+ }
+ }
+
+ if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) {
+ pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98;
+ if(pThis->iDiscardMrk == 0) {
+ /* for very small queues, we disable this by default */
+ pThis->iDiscardMrk = pThis->iMaxQueueSize;
+ }
+ }
+
+ if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) {
pThis->iDeqBatchSize = pThis->iMaxQueueSize;
+ }
/* finalize some initializations that could not yet be done because it is
* influenced by properties which might have been set after queueConstruct ()
@@ -2170,14 +2226,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pThis->iFullDlyMrk = wrk;
}
- DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, "
- "max wrkr %d, min msgs f. wrkr %d\n",
- pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
+ DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, "
+ "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, "
+ "light delay %d, deq batch size %d, high wtrmrk %d, low wtrmrk %d, "
+ "discardmrk %d, max wrkr %d, min msgs f. wrkr %d\n",
+ pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir,
+ pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk,
- pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr);
+ pThis->iDiscardMrk, pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr);
pThis->bQueueStarted = 1;
if(pThis->qType == QUEUETYPE_DIRECT)
@@ -2484,6 +2542,24 @@ CODESTARTobjDestruct(qqueue)
ENDobjDestruct(qqueue)
+/* set the queue's spool directory. The directory MUST NOT be NULL.
+ * The passed-in string is duplicated. So if the caller does not need
+ * it any longer, it must free it.
+ */
+rsRetVal
+qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir)
+{
+ DEFiRet;
+
+ free(pThis->pszSpoolDir);
+ CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir));
+ pThis->lenSpoolDir = lenSpoolDir;
+
+finalize_it:
+ RETiRet;
+}
+
+
/* set the queue's file prefix
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it.
@@ -2834,6 +2910,16 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst)
pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
} else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) {
pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) {
+ free(pThis->pszSpoolDir);
+ pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
+ pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr);
+ if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') {
+ pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0';
+ --pThis->lenSpoolDir;
+ parser_errmsg("queue.spooldirectory must not end with '/', "
+ "corrected to '%s'", pThis->pszSpoolDir);
+ }
} else if(!strcmp(pblk.descr[i].name, "queue.size")) {
pThis->iMaxQueueSize = pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {