diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/queue.c | 122 | ||||
-rw-r--r-- | runtime/wti.c | 30 |
2 files changed, 121 insertions, 31 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 5770eae9..ff1b30f4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,6 +59,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" +#include "parserif.h" #ifdef OS_SOLARIS # include <sched.h> @@ -86,6 +87,7 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); +rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); /* some constants for queuePersist () */ #define QUEUE_CHECKPOINT 1 @@ -94,6 +96,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis); /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfpdescr[] = { { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.spooldirectory", eCmdHdlrGetWord, 0 }, { "queue.size", eCmdHdlrSize, 0 }, { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, { "queue.maxdiskspace", eCmdHdlrSize, 0 }, @@ -417,6 +420,7 @@ StartDA(qqueue_t *pThis) CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); CHKiRet(qqueueSetFilePrefix(pThis->pqDA, pThis->pszFilePrefix, pThis->lenFilePrefix)); + CHKiRet(qqueueSetSpoolDir(pThis->pqDA, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(qqueueSetiPersistUpdCnt(pThis->pqDA, pThis->iPersistUpdCnt)); CHKiRet(qqueueSetbSyncQueueFiles(pThis->pqDA, pThis->bSyncQueueFiles)); CHKiRet(qqueueSettoActShutdown(pThis->pqDA, pThis->toActShutdown)); @@ -733,7 +737,7 @@ qqueueLoadPersStrmInfoFixup(strm_t *pStrm, qqueue_t __attribute__((unused)) *pTh DEFiRet; ISOBJ_TYPE_assert(pStrm, strm); ISOBJ_TYPE_assert(pThis, qqueue); - CHKiRet(strm.SetDir(pStrm, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pStrm, pThis->pszSpoolDir, pThis->lenSpoolDir)); finalize_it: RETiRet; } @@ -840,7 +844,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) } else { CHKiRet(strm.Construct(&pThis->tVars.disk.pWrite)); CHKiRet(strm.SetbSync(pThis->tVars.disk.pWrite, pThis->bSyncQueueFiles)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pWrite, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); @@ -852,7 +856,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDeq, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); @@ -865,7 +869,7 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel)); CHKiRet(strm.SetbSync(pThis->tVars.disk.pReadDel, pThis->bSyncQueueFiles)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDel, 1)); - CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, glbl.GetWorkDir(), strlen((char*)glbl.GetWorkDir()))); + CHKiRet(strm.SetDir(pThis->tVars.disk.pReadDel, pThis->pszSpoolDir, pThis->lenSpoolDir)); CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); @@ -1359,7 +1363,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->iDeqBatchSize = 128; /* default batch size */ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 980; /* begin to discard messages */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 1024*1024; @@ -1389,7 +1393,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->iDeqBatchSize = 1024; /* default batch size */ pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 16*1024*1024; @@ -1680,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1765,8 +1772,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; @@ -2076,7 +2085,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ /* pre-construct file name for .qi file */ pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), - "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + "%s/%s.qi", (char*) pThis->pszSpoolDir, (char*)pThis->pszFilePrefix); pThis->pszQIFNam = ustrdup(pszQIFNam); DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, (int) pThis->lenQIFNam); @@ -2124,21 +2133,68 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ } } - if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) + if(pThis->iDiscardMrk > pThis->iMaxQueueSize) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.discardMark %d is set larger than queue.size", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk); + } + + goodval = (pThis->iMaxQueueSize / 100) * 80; + if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": queue.discardMark " + "is set quite low at %d. You should only set it below " + "80%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval); + } + + + /* now come parameter corrections and defaults */ + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) { pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */ + pThis->iHighWtrMrk = pThis->iMaxQueueSize; + } + } if( pThis->iLowWtrMrk < 2 || pThis->iLowWtrMrk > pThis->iMaxQueueSize - || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) { pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLowWtrMrk == 0) { + pThis->iLowWtrMrk = 1; + } + } + if( pThis->iMinMsgsPerWrkr < 1 - || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) { pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; - if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) + } + + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; - if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) + if(pThis->iFullDlyMrk == 0) { + pThis->iFullDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) { pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; - if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) + if(pThis->iLightDlyMrk == 0) { + pThis->iLightDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + + if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) { + pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98; + if(pThis->iDiscardMrk == 0) { + /* for very small queues, we disable this by default */ + pThis->iDiscardMrk = pThis->iMaxQueueSize; + } + } + + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) { pThis->iDeqBatchSize = pThis->iMaxQueueSize; + } /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () @@ -2170,14 +2226,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iFullDlyMrk = wrk; } - DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d, " - "max wrkr %d, min msgs f. wrkr %d\n", - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize, + DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, " + "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, " + "light delay %d, deq batch size %d, high wtrmrk %d, low wtrmrk %d, " + "discardmrk %d, max wrkr %d, min msgs f. wrkr %d\n", + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, + pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk, - pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr); + pThis->iDiscardMrk, pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr); pThis->bQueueStarted = 1; if(pThis->qType == QUEUETYPE_DIRECT) @@ -2484,6 +2542,24 @@ CODESTARTobjDestruct(qqueue) ENDobjDestruct(qqueue) +/* set the queue's spool directory. The directory MUST NOT be NULL. + * The passed-in string is duplicated. So if the caller does not need + * it any longer, it must free it. + */ +rsRetVal +qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir) +{ + DEFiRet; + + free(pThis->pszSpoolDir); + CHKmalloc(pThis->pszSpoolDir = ustrdup(pszSpoolDir)); + pThis->lenSpoolDir = lenSpoolDir; + +finalize_it: + RETiRet; +} + + /* set the queue's file prefix * The passed-in string is duplicated. So if the caller does not need * it any longer, it must free it. @@ -2834,6 +2910,16 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) { pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) { + free(pThis->pszSpoolDir); + pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr); + if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') { + pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0'; + --pThis->lenSpoolDir; + parser_errmsg("queue.spooldirectory must not end with '/', " + "corrected to '%s'", pThis->pszSpoolDir); + } } else if(!strcmp(pblk.descr[i].name, "queue.size")) { pThis->iMaxQueueSize = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { diff --git a/runtime/wti.c b/runtime/wti.c index bbefc537..c02d0573 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -303,15 +303,26 @@ wtiWorker(wti_t *pThis) pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); dbgprintf("DDDD: wti %p: worker starting\n", pThis); - /* now we have our identity, on to real processing */ - while(1) { /* loop will be broken below - need to do mutex locks */ + + /* note: in this loop, the mutex is "never" unlocked. Of course, + * this is not true: it actually is unlocked when the actual processing + * is done, as part of pWtp->pfDoWork() processing. Note that this + * function is required to re-lock it when done. We cannot do the + * lock/unlock here ourselfs, as pfDoWork() needs to access queue + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 + */ + d_pthread_mutex_lock(pWtp->pmutUsr); + while(1) { /* loop will be broken below */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } - d_pthread_mutex_lock(pWtp->pmutUsr); - /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { @@ -319,36 +330,29 @@ dbgprintf("DDDD: wti %p: worker starting\n", pThis); localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", localRet); - d_pthread_mutex_unlock(pWtp->pmutUsr); break; } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. The returned - * information on idle state must be processed before releasing the mutex again. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { - d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { - d_pthread_mutex_unlock(pWtp->pmutUsr); DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } - d_pthread_mutex_unlock(pWtp->pmutUsr); - bInactivityTOOccured = 0; /* reset for next run */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + DBGPRINTF("DDDD: wti %p: worker cleanup up action instances\n", pThis); for(i = 0 ; i < iActionNbr ; ++i) { dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, pThis->actWrkrInfo[i].actWrkrData); |