summaryrefslogtreecommitdiffstats
path: root/runtime/queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.c')
-rw-r--r--runtime/queue.c255
1 files changed, 160 insertions, 95 deletions
diff --git a/runtime/queue.c b/runtime/queue.c
index 073c6823..8fb734e7 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -12,7 +12,7 @@
* function names - this makes it really hard to read and does not provide much
* benefit, at least I (now) think so...
*
- * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
@@ -85,7 +85,6 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiS
static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg);
static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis);
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis);
static rsRetVal qDestructDisk(qqueue_t *pThis);
/* some constants for queuePersist () */
@@ -118,6 +117,7 @@ static struct cnfparamdescr cnfpdescr[] = {
{ "queue.dequeueslowdown", eCmdHdlrInt, 0 },
{ "queue.dequeuetimebegin", eCmdHdlrInt, 0 },
{ "queue.dequeuetimeend", eCmdHdlrInt, 0 },
+ { "queue.cry.provider", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk pblk =
{ CNFPARAMBLK_VERSION,
@@ -350,16 +350,15 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
DBGOPRINT((obj_t*) pThis, "(re)activating DA worker\n");
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
+ }
+ if(getLogicalQueueSize(pThis) == 0) {
+ iMaxWorkers = 0;
+ } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
+ iMaxWorkers = 1;
} else {
- if(getLogicalQueueSize(pThis) == 0) {
- iMaxWorkers = 0;
- } else if(pThis->qType == QUEUETYPE_DISK || pThis->iMinMsgsPerWrkr == 0) {
- iMaxWorkers = 1;
- } else {
- iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
- }
- wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
+ iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
}
+ wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers);
}
RETiRet;
@@ -483,7 +482,6 @@ InitDA(qqueue_t *pThis, int bLockMutex)
CHKiRet(wtpSetpfDoWork (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerDA));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpDA, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpDA, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpDA, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpDA, 1));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpDA, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpDA, pThis));
@@ -706,9 +704,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError)
pThis->qType = QUEUETYPE_DIRECT;
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
if(pThis->pqParent != NULL) {
DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n");
pThis->pqParent->bIsDA = 0;
@@ -776,11 +778,19 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
-
/* create a duplicate for the read "pointer". */
CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq));
CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
+ /* if we use a crypto provider, we need to amend the objects with it's info */
+ if(pThis->useCryprov) {
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData));
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData));
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData));
+ }
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite));
CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel));
@@ -834,6 +844,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE));
CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR));
+ if(pThis->useCryprov) {
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData));
+ }
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite));
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq));
@@ -842,6 +856,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR));
+ if(pThis->useCryprov) {
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData));
+ }
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq));
CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel));
@@ -851,6 +869,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis)
CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000));
CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ));
CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR));
+ if(pThis->useCryprov) {
+ CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov));
+ CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData));
+ }
CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel));
CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix));
@@ -938,13 +960,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis)
return RS_RET_OK;
}
-static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
+static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti)
{
batch_t singleBatch;
batch_obj_t batchObj;
batch_state_t batchState = BATCH_STATE_RDY;
- sbool active = 1;
- int i;
DEFiRet;
//TODO: init batchObj (states _OK and new fields -- CHECK)
@@ -964,46 +984,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg)
singleBatch.nElem = 1; /* there always is only one in direct mode */
singleBatch.pElem = &batchObj;
singleBatch.eltState = &batchState;
- singleBatch.active = &active;
- iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate);
- /* delete the batch string params: TODO: create its own "class" for this */
- for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) {
- free(batchObj.staticActStrings[i]);
- }
+ iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti);
msgDestruct(&pMsg);
RETiRet;
}
-/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead
- * otherwise incured. -- rgerhards, ~2010-06-23
+/* this is called if we do not have a pWti. This currently only happens
+ * when we are called from a main queue in direct mode. If so, we need
+ * to obtain a dummy pWti.
*/
-rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch)
+static rsRetVal
+qAddDirect(qqueue_t *pThis, msg_t* pMsg)
{
+ wti_t *pWti;
DEFiRet;
- ASSERT(pThis != NULL);
-
- /* calling the consumer is quite different here than it is from a worker thread */
- /* we need to provide the consumer's return value back to the caller because in direct
- * mode the consumer probably has a lot to convey (which get's lost in the other modes
- * because they are asynchronous. But direct mode is deliberately synchronous.
- * rgerhards, 2008-02-12
- * We use our knowledge about the batch_t structure below, but without that, we
- * pay a too-large performance toll... -- rgerhards, 2009-04-22
- */
- iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL);
-
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ iRet = qAddDirectWithWti(pThis, pMsg, pWti);
RETiRet;
}
-static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
-{
- return RS_RET_OK;
-}
-
-
/* --------------- end type-specific handlers -------------------- */
@@ -1298,7 +1301,7 @@ finalize_it:
* to modify some parameters before the queue is actually started.
*/
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*))
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*))
{
DEFiRet;
qqueue_t *pThis;
@@ -1321,6 +1324,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */
pThis->iQueueSize = 0;
pThis->nLogDeq = 0;
+ pThis->useCryprov = 0;
pThis->iMaxQueueSize = iMaxQueueSize;
pThis->pConsumer = pConsumer;
pThis->iNumWorkerThreads = iWorkerThreads;
@@ -1856,7 +1860,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti)
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave);
- CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate));
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
+ CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti));
/* we now need to check if we should deliberately delay processing a bit
* and, if so, do that. -- rgerhards, 2008-01-30
@@ -2078,9 +2083,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
case QUEUETYPE_DIRECT:
pThis->qConstruct = qConstructDirect;
pThis->qDestruct = qDestructDirect;
+ /* these entry points shall not be used in direct mode
+ * To catch program errors, make us abort if that happens!
+ * rgerhards, 2013-11-05
+ */
pThis->qAdd = qAddDirect;
- pThis->qDel = qDelDirect;
pThis->MultiEnq = qqueueMultiEnqObjDirect;
+ pThis->qDel = NULL;
break;
}
@@ -2118,7 +2127,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
pthread_mutex_init(&pThis->mutThrdMgmt, NULL);
pthread_cond_init (&pThis->notFull, NULL);
- pthread_cond_init (&pThis->notEmpty, NULL);
pthread_cond_init (&pThis->belowFullDlyWtrMrk, NULL);
pthread_cond_init (&pThis->belowLightDlyWtrMrk, NULL);
@@ -2136,11 +2144,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
}
DBGOPRINT((obj_t*) pThis, "type %d, enq-only %d, disk assisted %d, maxFileSz %lld, maxQSize %d, lqsize %d, pqsize %d, child %d, "
- "full delay %d, light delay %d, deq batch size %d starting, high wtrrmrk %d, low wtrmrk %d\n",
+ "full delay %d, light delay %d, deq batch size %d starting, high wtrmrk %d, low wtrmrk %d\n"
+ "max wrkr %d, min msgs f. wrkr %d\n",
pThis->qType, pThis->bEnqOnly, pThis->bIsDA, pThis->iMaxFileSize, pThis->iMaxQueueSize,
getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis),
pThis->pqParent == NULL ? 0 : 1, pThis->iFullDlyMrk, pThis->iLightDlyMrk,
- pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk);
+ pThis->iDeqBatchSize, pThis->iHighWtrMrk, pThis->iLowWtrMrk,
+ pThis->iNumWorkerThreads, pThis->iMinMsgsPerWrkr);
pThis->bQueueStarted = 1;
if(pThis->qType == QUEUETYPE_DIRECT)
@@ -2157,7 +2167,6 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
CHKiRet(wtpSetpfDoWork (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, void *pWti)) ConsumerReg));
CHKiRet(wtpSetpfObjProcessed (pThis->pWtpReg, (rsRetVal (*)(void *pUsr, wti_t *pWti)) batchProcessed));
CHKiRet(wtpSetpmutUsr (pThis->pWtpReg, pThis->mut));
- CHKiRet(wtpSetpcondBusy (pThis->pWtpReg, &pThis->notEmpty));
CHKiRet(wtpSetiNumWorkerThreads (pThis->pWtpReg, pThis->iNumWorkerThreads));
CHKiRet(wtpSettoWrkShutdown (pThis->pWtpReg, pThis->toWrkShutdown));
CHKiRet(wtpSetpUsr (pThis->pWtpReg, pThis));
@@ -2181,26 +2190,26 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */
/* we need to save the queue size, as the stats module initializes it to 0! */
/* iQueueSize is a dual-use counter: no init, no mutex! */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("size"),
- ctrType_Int, &pThis->iQueueSize));
+ ctrType_Int, CTR_FLAG_NONE, &pThis->iQueueSize));
STATSCOUNTER_INIT(pThis->ctrEnqueued, pThis->mutCtrEnqueued);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("enqueued"),
- ctrType_IntCtr, &pThis->ctrEnqueued));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrEnqueued));
STATSCOUNTER_INIT(pThis->ctrFull, pThis->mutCtrFull);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("full"),
- ctrType_IntCtr, &pThis->ctrFull));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFull));
STATSCOUNTER_INIT(pThis->ctrFDscrd, pThis->mutCtrFDscrd);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.full"),
- ctrType_IntCtr, &pThis->ctrFDscrd));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrFDscrd));
STATSCOUNTER_INIT(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd);
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("discarded.nf"),
- ctrType_IntCtr, &pThis->ctrNFDscrd));
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &pThis->ctrNFDscrd));
pThis->ctrMaxqsize = 0; /* no mutex needed, thus no init call */
CHKiRet(statsobj.AddCounter(pThis->statsobj, UCHAR_CONSTANT("maxqsize"),
- ctrType_Int, &pThis->ctrMaxqsize));
+ ctrType_Int, CTR_FLAG_NONE, &pThis->ctrMaxqsize));
CHKiRet(statsobj.ConstructFinalize(pThis->statsobj));
@@ -2422,7 +2431,6 @@ CODESTARTobjDestruct(qqueue)
}
pthread_mutex_destroy(&pThis->mutThrdMgmt);
pthread_cond_destroy(&pThis->notFull);
- pthread_cond_destroy(&pThis->notEmpty);
pthread_cond_destroy(&pThis->belowFullDlyWtrMrk);
pthread_cond_destroy(&pThis->belowLightDlyWtrMrk);
@@ -2435,6 +2443,13 @@ CODESTARTobjDestruct(qqueue)
free(pThis->pszFilePrefix);
free(pThis->pszSpoolDir);
+ if(pThis->useCryprov) {
+ pThis->cryprov.Destruct(&pThis->cryprovData);
+ obj.ReleaseObj(__FILE__, pThis->cryprovNameFull+2, pThis->cryprovNameFull,
+ (void*) &pThis->cryprov);
+ free(pThis->cryprovName);
+ free(pThis->cryprovNameFull);
+ }
/* some queues do not provide stats and thus have no statsobj! */
if(pThis->statsobj != NULL)
@@ -2656,13 +2671,14 @@ static rsRetVal
qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub)
{
int i;
+ wti_t *pWti;
DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- assert(pMultiSub != NULL);
+ pWti = wtiGetDummy();
+ pWti->pbShutdownImmediate = &pThis->bShutdownImmediate;
for(i = 0 ; i < pMultiSub->nElem ; ++i) {
- CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i]));
+ CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti));
}
finalize_it:
@@ -2671,22 +2687,7 @@ finalize_it:
/* ------------------------------ END multi-enqueue functions ------------------------------ */
-/* enqueue a new user data element in direct mode
- * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better
- * code later on (like multi submit!) 2010-06-10
- * Enqueues the new element and awakes worker thread.
- */
-rsRetVal
-qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg)
-{
- DEFiRet;
- ISOBJ_TYPE_assert(pThis, qqueue);
- iRet = qAddDirect(pThis, pMsg);
- RETiRet;
-}
-
-
-/* enqueue a new user data element
+/* enqueue a new user data element
* Enqueues the new element and awakes worker thread.
*/
rsRetVal
@@ -2720,27 +2721,67 @@ finalize_it:
}
-/* take v6 config list and extract the queue params out of it. Hand the
- * param values back to the caller. Caller is responsible for destructing
- * them when no longer needed. Caller can use this param block to configure
- * all parameters for a newly created queue with one call to qqueueSetParams().
- * rgerhards, 2011-07-22
+/* are any queue params set at all? 1 - yes, 0 - no
+ * We need to evaluate the param block for this function, which is somewhat
+ * inefficient. HOWEVER, this is only done during config load, so we really
+ * don't care... -- rgerhards, 2013-05-10
*/
-rsRetVal
-qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals)
+int
+queueCnfParamsSet(struct nvlst *lst)
{
- *ppvals = nvlstGetParams(lst, &pblk, NULL);
- return RS_RET_OK;
+ int r;
+ struct cnfparamvals *pvals;
+
+ pvals = nvlstGetParams(lst, &pblk, NULL);
+ r = cnfparamvalsIsSet(&pblk, pvals);
+ cnfparamvalsDestruct(pvals, &pblk);
+ return r;
}
-/* are any queue params set at all? 1 - yes, 0 - no */
-int
-queueCnfParamsSet(struct cnfparamvals *pvals)
+static inline rsRetVal
+initCryprov(qqueue_t *pThis, struct nvlst *lst)
{
- return cnfparamvalsIsSet(&pblk, pvals);
-}
+ uchar szDrvrName[1024];
+ DEFiRet;
+ if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmcry_%s", pThis->cryprovName)
+ == sizeof(szDrvrName)) {
+ errmsg.LogError(0, RS_RET_ERR, "queue: crypto provider "
+ "name is too long: '%s' - encryption disabled",
+ pThis->cryprovName);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ pThis->cryprovNameFull = ustrdup(szDrvrName);
+
+ pThis->cryprov.ifVersion = cryprovCURR_IF_VERSION;
+ /* The pDrvrName+2 below is a hack to obtain the object name. It
+ * safes us to have yet another variable with the name without "lm" in
+ * front of it. If we change the module load interface, we may re-think
+ * about this hack, but for the time being it is efficient and clean enough.
+ */
+ if(obj.UseObj(__FILE__, szDrvrName, szDrvrName, (void*) &pThis->cryprov)
+ != RS_RET_OK) {
+ errmsg.LogError(0, RS_RET_LOAD_ERROR, "queue: could not load "
+ "crypto provider '%s' - encryption disabled",
+ szDrvrName);
+ ABORT_FINALIZE(RS_RET_CRYPROV_ERR);
+ }
+
+ if(pThis->cryprov.Construct(&pThis->cryprovData) != RS_RET_OK) {
+ errmsg.LogError(0, RS_RET_CRYPROV_ERR, "queue: error constructing "
+ "crypto provider %s dataset - encryption disabled",
+ szDrvrName);
+ ABORT_FINALIZE(RS_RET_CRYPROV_ERR);
+ }
+ CHKiRet(pThis->cryprov.SetCnfParam(pThis->cryprovData, lst, CRYPROV_PARAMTYPE_DISK));
+
+ dbgprintf("loaded crypto provider %s, data instance at %p\n",
+ szDrvrName, pThis->cryprovData);
+ pThis->useCryprov = 1;
+finalize_it:
+ RETiRet;
+}
/* apply all params from param block to queue. Must be called before
* finalizing. This supports the v6 config system. Defaults were already
@@ -2748,15 +2789,24 @@ queueCnfParamsSet(struct cnfparamvals *pvals)
* function.
*/
rsRetVal
-qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
+qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst)
{
int i;
+ struct cnfparamvals *pvals;
+
+ pvals = nvlstGetParams(lst, &pblk, NULL);
+ if(Debug) {
+ dbgprintf("queue param blk:\n");
+ cnfparamsPrint(&pblk, pvals);
+ }
for(i = 0 ; i < pblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
if(!strcmp(pblk.descr[i].name, "queue.filename")) {
pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr);
+ } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) {
+ pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(pblk.descr[i].name, "queue.size")) {
pThis->iMaxQueueSize = pvals[i].val.d.n;
} else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) {
@@ -2808,12 +2858,27 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals)
"param '%s'\n", pblk.descr[i].name);
}
}
- if(pThis->qType == QUEUETYPE_DISK && pThis->pszFilePrefix == NULL) {
- errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but "
- "no queue file name given; queue type changed to 'linkedList'",
+ if(pThis->qType == QUEUETYPE_DISK) {
+ if(pThis->pszFilePrefix == NULL) {
+ errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but "
+ "no queue file name given; queue type changed to 'linkedList'",
+ obj.GetName((obj_t*) pThis));
+ pThis->qType = QUEUETYPE_LINKEDLIST;
+ }
+ }
+
+ if(pThis->pszFilePrefix == NULL && pThis->cryprovName != NULL) {
+ errmsg.LogError(0, RS_RET_QUEUE_CRY_DISK_ONLY, "error on queue '%s', crypto provider can "
+ "only be set for disk or disk assisted queue - ignored",
obj.GetName((obj_t*) pThis));
- pThis->qType = QUEUETYPE_LINKEDLIST;
+ free(pThis->cryprovName);
+ pThis->cryprovName = NULL;
+ }
+
+ if(pThis->cryprovName != NULL) {
+ initCryprov(pThis, lst);
}
+
cnfparamvalsDestruct(pvals, &pblk);
return RS_RET_OK;
}