diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 380 |
1 files changed, 269 insertions, 111 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index a152af1e..ff1b30f4 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -59,6 +59,7 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" +#include "parserif.h" #ifdef OS_SOLARIS # include <sched.h> @@ -85,7 +86,6 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiS static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); @@ -96,6 +96,7 @@ rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir) /* tables for interfacing with the v6 config system */ static struct cnfparamdescr cnfpdescr[] = { { "queue.filename", eCmdHdlrGetWord, 0 }, + { "queue.spooldirectory", eCmdHdlrGetWord, 0 }, { "queue.size", eCmdHdlrSize, 0 }, { "queue.dequeuebatchsize", eCmdHdlrInt, 0 }, { "queue.maxdiskspace", eCmdHdlrSize, 0 }, @@ -119,6 +120,7 @@ static struct cnfparamdescr cnfpdescr[] = { { "queue.dequeueslowdown", eCmdHdlrInt, 0 }, { "queue.dequeuetimebegin", eCmdHdlrInt, 0 }, { "queue.dequeuetimeend", eCmdHdlrInt, 0 }, + { "queue.cry.provider", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk pblk = { CNFPARAMBLK_VERSION, @@ -351,16 +353,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis) if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) { DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n"); wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */ + } + if(getLogicalQueueSize(pThis) == 0) { + iMaxWorkers = 0; + } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { + iMaxWorkers = 1; } else { - if(getLogicalQueueSize(pThis) == 0) { - iMaxWorkers = 0; - } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) { - iMaxWorkers = 1; - } else { - iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; - } - wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); + iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1; } + wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); } RETiRet; @@ -485,7 +486,6 @@ InitDA(qqueue_t *pThis, int bLockMutex) CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis)); @@ -708,9 +708,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -778,11 +782,19 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - /* create a duplicate for the read "pointer". */ CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); + /* if we use a crypto provider, we need to amend the objects with it's info */ + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite)); CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel)); @@ -836,6 +848,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq)); @@ -844,6 +860,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel)); @@ -853,6 +873,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel)); CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -940,13 +964,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; - int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -966,46 +988,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; } -/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead - * otherwise incured. -- rgerhards, ~2010-06-23 +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) { + wti_t *pWti; DEFiRet; - ASSERT(pThis != NULL); - - /* calling the consumer is quite different here than it is from a worker thread */ - /* we need to provide the consumer's return value back to the caller because in direct - * mode the consumer probably has a lot to convey (which get's lost in the other modes - * because they are asynchronous. But direct mode is deliberately synchronous. - * rgerhards, 2008-02-12 - * We use our knowledge about the batch_t structure below, but without that, we - * pay a too-large performance toll... -- rgerhards, 2009-04-22 - */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); - + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) -{ - return RS_RET_OK; -} - - /* --------------- end type-specific handlers -------------------- */ @@ -1300,7 +1305,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1323,6 +1328,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->nLogDeq = 0; + pThis->useCryprov = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; @@ -1355,9 +1361,9 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_DIRECT; /* type of the main message queue above */ pThis->iMaxQueueSize = 1000; /* size of the main message queue above */ pThis->iDeqBatchSize = 128; /* default batch size */ - pThis->iHighWtrMrk = 800; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 200; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 980; /* begin to discard messages */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 1024*1024; @@ -1367,7 +1373,7 @@ qqueueSetDefaultsActionQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 100; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -1385,9 +1391,9 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->qType = QUEUETYPE_FIXED_ARRAY; /* type of the main message queue above */ pThis->iMaxQueueSize = 50000; /* size of the main message queue above */ pThis->iDeqBatchSize = 1024; /* default batch size */ - pThis->iHighWtrMrk = 45000; /* high water mark for disk-assisted queues */ - pThis->iLowWtrMrk = 20000; /* low water mark for disk-assisted queues */ - pThis->iDiscardMrk = 49500; /* begin to discard messages */ + pThis->iHighWtrMrk = -1; /* high water mark for disk-assisted queues */ + pThis->iLowWtrMrk = -1; /* low water mark for disk-assisted queues */ + pThis->iDiscardMrk = -1; /* begin to discard messages */ pThis->iDiscardSeverity = 8; /* turn off */ pThis->iNumWorkerThreads = 1; /* number of worker threads for the mm queue above */ pThis->iMaxFileSize = 16*1024*1024; @@ -1397,7 +1403,7 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) pThis->toActShutdown = 1000; /* action shutdown (in phase 2) */ pThis->toEnq = 2000; /* timeout for queue enque */ pThis->toWrkShutdown = 60000; /* timeout for worker thread shutdown */ - pThis->iMinMsgsPerWrkr = 1000; /* minimum messages per worker needed to start a new one */ + pThis->iMinMsgsPerWrkr = -1; /* minimum messages per worker needed to start a new one */ pThis->bSaveOnShutdown = 1; /* save queue on shutdown (when DA enabled)? */ pThis->sizeOnDiskMax = 0; /* unlimited */ pThis->iDeqSlowdown = 0; @@ -1678,6 +1684,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1763,8 +1772,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; @@ -1858,7 +1869,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2036,6 +2048,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ uchar pszBuf[64]; uchar pszQIFNam[MAXFNAME]; int wrk; + int goodval; /* a "good value" to use for comparisons (different objects) */ uchar *qName; size_t lenBuf; @@ -2080,9 +2093,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; break; } @@ -2097,14 +2114,87 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* we need to do a quick check if our water marks are set plausible. If not, * we correct the most important shortcomings. */ - if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) - pThis->iFullDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 3; /* default 97% */ - if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) - pThis->iLightDlyMrk = pThis->iMaxQueueSize - - (pThis->iMaxQueueSize / 100) * 30; /* default 70% */ - if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) + goodval = (pThis->iMaxQueueSize / 100) * 60; + if(pThis->iHighWtrMrk != -1 && pThis->iHighWtrMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": high water mark " + "is set quite low at %d. You should only set it below " + "60%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iHighWtrMrk, goodval); + } + + if(pThis->iNumWorkerThreads > 1) { + goodval = (pThis->iMaxQueueSize / 100) * 10; + if(pThis->iMinMsgsPerWrkr != -1 && pThis->iMinMsgsPerWrkr < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.workerThreadMinimumMessage " + "is set quite low at %d. You should only set it below " + "10%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iMinMsgsPerWrkr, goodval); + } + } + + if(pThis->iDiscardMrk > pThis->iMaxQueueSize) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": " + "queue.discardMark %d is set larger than queue.size", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk); + } + + goodval = (pThis->iMaxQueueSize / 100) * 80; + if(pThis->iDiscardMrk != -1 && pThis->iDiscardMrk < goodval) { + errmsg.LogError(0, RS_RET_CONF_PARSE_WARNING, "queue \"%s\": queue.discardMark " + "is set quite low at %d. You should only set it below " + "80%% (%d) if you have a good reason for this.", + obj.GetName((obj_t*) pThis), pThis->iDiscardMrk, goodval); + } + + + /* now come parameter corrections and defaults */ + if(pThis->iHighWtrMrk < 2 || pThis->iHighWtrMrk > pThis->iMaxQueueSize) { + pThis->iHighWtrMrk = (pThis->iMaxQueueSize / 100) * 90; + if(pThis->iHighWtrMrk == 0) { /* guard against very low max queue sizes! */ + pThis->iHighWtrMrk = pThis->iMaxQueueSize; + } + } + if( pThis->iLowWtrMrk < 2 + || pThis->iLowWtrMrk > pThis->iMaxQueueSize + || pThis->iLowWtrMrk > pThis->iHighWtrMrk ) { + pThis->iLowWtrMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLowWtrMrk == 0) { + pThis->iLowWtrMrk = 1; + } + } + + if( pThis->iMinMsgsPerWrkr < 1 + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) { + pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } + + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { + pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; + if(pThis->iFullDlyMrk == 0) { + pThis->iFullDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + if(pThis->iLightDlyMrk == -1 || pThis->iLightDlyMrk > pThis->iMaxQueueSize) { + pThis->iLightDlyMrk = (pThis->iMaxQueueSize / 100) * 70; + if(pThis->iLightDlyMrk == 0) { + pThis->iLightDlyMrk = + (pThis->iMaxQueueSize == 1) ? 1 : pThis->iMaxQueueSize - 1; + } + } + + if(pThis->iDiscardMrk < 1 || pThis->iDiscardMrk > pThis->iMaxQueueSize) { + pThis->iDiscardMrk = (pThis->iMaxQueueSize / 100) * 98; + if(pThis->iDiscardMrk == 0) { + /* for very small queues, we disable this by default */ + pThis->iDiscardMrk = pThis->iMaxQueueSize; + } + } + + if(pThis->iMaxQueueSize > 0 && pThis->iDeqBatchSize > pThis->iMaxQueueSize) { pThis->iDeqBatchSize = pThis->iMaxQueueSize; + } /* finalize some initializations that could not yet be done because it is * influenced by properties which might have been set after queueConstruct () @@ -2120,7 +2210,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pthread_mutex_init(&pThis->mutThrdMgmt, NULL); pthread_cond_init (&pThis->notFull, NULL); - pthread_cond_init (&pThis->notEmpty, NULL); pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL); pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL); @@ -2137,12 +2226,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iFullDlyMrk = wrk; } - DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, " - "full delay %d, light delay %d, deq batch size %d starting, high wtrrmrk %d, low wtrmrk %d\n", - pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, pThis->iMaxFileSize, pThis->iMaxQueueSize, + DBGOPRINT((obj_t*) pThis, "params: type %d, enq-only %d, disk assisted %d, spoolDir '%s', maxFileSz %lld, " + "maxQSize %d, lqsize %d, pqsize %d, child %d, full delay %d, " + "light delay %d, deq batch size %d, high wtrmrk %d, low wtrmrk %d, " + "discardmrk %d, max wrkr %d, min msgs f. wrkr %d\n", + pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->pszSpoolDir, + pThis->iMaxFileSize, pThis->iMaxQueueSize, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis), pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk, - pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk); + pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk, + pThis->iDiscardMrk, pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr); pThis->bQueueStarted = 1; if(pThis->qType == QUEUETYPE_DIRECT) @@ -2159,7 +2252,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg)); CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed)); CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut)); - CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty)); CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads)); CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown)); CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis)); @@ -2183,26 +2275,26 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ /* we need to save the queue size, as the stats module initializes it to 0! */ /* iQueueSize is a dual-use counter: no init, no mutex! */ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"), - ctrType_Int, &pThis->iQueueSize)); + ctrType_Int, CTR_FLAG_NONE, &pThis->iQueueSize)); STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"), - ctrType_IntCtr, &pThis->ctrEnqueued)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrEnqueued)); STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"), - ctrType_IntCtr, &pThis->ctrFull)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFull)); STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"), - ctrType_IntCtr, &pThis->ctrFDscrd)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFDscrd)); STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"), - ctrType_IntCtr, &pThis->ctrNFDscrd)); + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrNFDscrd)); pThis->ctrMaxqsize = 0; /* no mutex needed, thus no init call */ CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"), - ctrType_Int, &pThis->ctrMaxqsize)); + ctrType_Int, CTR_FLAG_NONE, &pThis->ctrMaxqsize)); CHKiRet(statsobj.ConstructFinalize(pThis->statsobj)); @@ -2424,7 +2516,6 @@ CODESTARTobjDestruct(qqueue) } pthread_mutex_destroy(&pThis->mutThrdMgmt); pthread_cond_destroy(&pThis->notFull); - pthread_cond_destroy(&pThis->notEmpty); pthread_cond_destroy(&pThis->belowFullDlyWtrMrk); pthread_cond_destroy(&pThis->belowLightDlyWtrMrk); @@ -2437,6 +2528,13 @@ CODESTARTobjDestruct(qqueue) free(pThis->pszFilePrefix); free(pThis->pszSpoolDir); + if(pThis->useCryprov) { + pThis->cryprov.Destruct(&pThis->cryprovData); + obj.ReleaseObj(__FILE__, pThis->cryprovNameFull+2, pThis->cryprovNameFull, + (void*) &pThis->cryprov); + free(pThis->cryprovName); + free(pThis->cryprovNameFull); + } /* some queues do not provide stats and thus have no statsobj! */ if(pThis->statsobj != NULL) @@ -2676,13 +2774,14 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2691,22 +2790,7 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ -/* enqueue a new user data element in direct mode - * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better - * code later on (like multi submit!) 2010-06-10 - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) -{ - DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); - RETiRet; -} - - -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal @@ -2740,27 +2824,67 @@ finalize_it: } -/* take v6 config list and extract the queue params out of it. Hand the - * param values back to the caller. Caller is responsible for destructing - * them when no longer needed. Caller can use this param block to configure - * all parameters for a newly created queue with one call to qqueueSetParams(). - * rgerhards, 2011-07-22 +/* are any queue params set at all? 1 - yes, 0 - no + * We need to evaluate the param block for this function, which is somewhat + * inefficient. HOWEVER, this is only done during config load, so we really + * don't care... -- rgerhards, 2013-05-10 */ -rsRetVal -qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) +int +queueCnfParamsSet(struct nvlst *lst) { - *ppvals = nvlstGetParams(lst, &pblk, NULL); - return RS_RET_OK; + int r; + struct cnfparamvals *pvals; + + pvals = nvlstGetParams(lst, &pblk, NULL); + r = cnfparamvalsIsSet(&pblk, pvals); + cnfparamvalsDestruct(pvals, &pblk); + return r; } -/* are any queue params set at all? 1 - yes, 0 - no */ -int -queueCnfParamsSet(struct cnfparamvals *pvals) +static inline rsRetVal +initCryprov(qqueue_t *pThis, struct nvlst *lst) { - return cnfparamvalsIsSet(&pblk, pvals); -} + uchar szDrvrName[1024]; + DEFiRet; + + if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmcry_%s", pThis->cryprovName) + == sizeof(szDrvrName)) { + errmsg.LogError(0, RS_RET_ERR, "queue: crypto provider " + "name is too long: '%s' - encryption disabled", + pThis->cryprovName); + ABORT_FINALIZE(RS_RET_ERR); + } + pThis->cryprovNameFull = ustrdup(szDrvrName); + pThis->cryprov.ifVersion = cryprovCURR_IF_VERSION; + /* The pDrvrName+2 below is a hack to obtain the object name. It + * safes us to have yet another variable with the name without "lm" in + * front of it. If we change the module load interface, we may re-think + * about this hack, but for the time being it is efficient and clean enough. + */ + if(obj.UseObj(__FILE__, szDrvrName, szDrvrName, (void*) &pThis->cryprov) + != RS_RET_OK) { + errmsg.LogError(0, RS_RET_LOAD_ERROR, "queue: could not load " + "crypto provider '%s' - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + + if(pThis->cryprov.Construct(&pThis->cryprovData) != RS_RET_OK) { + errmsg.LogError(0, RS_RET_CRYPROV_ERR, "queue: error constructing " + "crypto provider %s dataset - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + CHKiRet(pThis->cryprov.SetCnfParam(pThis->cryprovData, lst, CRYPROV_PARAMTYPE_DISK)); + + dbgprintf("loaded crypto provider %s, data instance at %p\n", + szDrvrName, pThis->cryprovData); + pThis->useCryprov = 1; +finalize_it: + RETiRet; +} /* apply all params from param block to queue. Must be called before * finalizing. This supports the v6 config system. Defaults were already @@ -2768,15 +2892,34 @@ queueCnfParamsSet(struct cnfparamvals *pvals) * function. */ rsRetVal -qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) +qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) { int i; + struct cnfparamvals *pvals; + + pvals = nvlstGetParams(lst, &pblk, NULL); + if(Debug) { + dbgprintf("queue param blk:\n"); + cnfparamsPrint(&pblk, pvals); + } for(i = 0 ; i < pblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(pblk.descr[i].name, "queue.filename")) { pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); + } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) { + pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + } else if(!strcmp(pblk.descr[i].name, "queue.spooldirectory")) { + free(pThis->pszSpoolDir); + pThis->pszSpoolDir = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); + pThis->lenSpoolDir = es_strlen(pvals[i].val.d.estr); + if(pThis->pszSpoolDir[pThis->lenSpoolDir-1] == '/') { + pThis->pszSpoolDir[pThis->lenSpoolDir-1] = '\0'; + --pThis->lenSpoolDir; + parser_errmsg("queue.spooldirectory must not end with '/', " + "corrected to '%s'", pThis->pszSpoolDir); + } } else if(!strcmp(pblk.descr[i].name, "queue.size")) { pThis->iMaxQueueSize = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { @@ -2828,12 +2971,27 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) "param '%s'\n", pblk.descr[i].name); } } - if(pThis->qType == QUEUETYPE_DISK && pThis->pszFilePrefix == NULL) { - errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but " - "no queue file name given; queue type changed to 'linkedList'", + if(pThis->qType == QUEUETYPE_DISK) { + if(pThis->pszFilePrefix == NULL) { + errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but " + "no queue file name given; queue type changed to 'linkedList'", + obj.GetName((obj_t*) pThis)); + pThis->qType = QUEUETYPE_LINKEDLIST; + } + } + + if(pThis->pszFilePrefix == NULL && pThis->cryprovName != NULL) { + errmsg.LogError(0, RS_RET_QUEUE_CRY_DISK_ONLY, "error on queue '%s', crypto provider can " + "only be set for disk or disk assisted queue - ignored", obj.GetName((obj_t*) pThis)); - pThis->qType = QUEUETYPE_LINKEDLIST; + free(pThis->cryprovName); + pThis->cryprovName = NULL; } + + if(pThis->cryprovName != NULL) { + initCryprov(pThis, lst); + } + cnfparamvalsDestruct(pvals, &pblk); return RS_RET_OK; } |