diff options
Diffstat (limited to 'runtime/queue.c')
-rw-r--r-- | runtime/queue.c | 376 |
1 files changed, 225 insertions, 151 deletions
diff --git a/runtime/queue.c b/runtime/queue.c index 8d8d8e0a..699e2a66 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -12,7 +12,7 @@ * function names - this makes it really hard to read and does not provide much * benefit, at least I (now) think so... * - * Copyright 2008-2011 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -59,7 +59,6 @@ #include "datetime.h" #include "unicode-helper.h" #include "statsobj.h" -#include "msg.h" /* TODO: remove once we remove MsgAddRef() call */ #ifdef OS_SOLARIS # include <sched.h> @@ -74,7 +73,7 @@ DEFobjCurrIf(datetime) DEFobjCurrIf(statsobj) /* forward-definitions */ -static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr); +static inline rsRetVal doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg); static rsRetVal qqueueChkPersist(qqueue_t *pThis, int nUpdates); static rsRetVal RateLimiter(qqueue_t *pThis); static int qqueueChkStopWrkrDA(qqueue_t *pThis); @@ -83,7 +82,7 @@ static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti); static rsRetVal batchProcessed(qqueue_t *pThis, wti_t *pWti); static rsRetVal qqueueMultiEnqObjNonDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub); -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr); +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); @@ -119,6 +118,7 @@ static struct cnfparamdescr cnfpdescr[] = { { "queue.dequeueslowdown", eCmdHdlrInt, 0 }, { "queue.dequeuetimebegin", eCmdHdlrInt, 0 }, { "queue.dequeuetimeend", eCmdHdlrInt, 0 }, + { "queue.cry.provider", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk pblk = { CNFPARAMBLK_VERSION, @@ -131,7 +131,7 @@ static inline void displayBatchState(batch_t *pBatch) { int i; for(i = 0 ; i < pBatch->nElem ; ++i) { - DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->pElem[i].state); + DBGPRINTF("displayBatchState %p[%d]: %d\n", pBatch, i, pBatch->eltState[i]); } } @@ -242,8 +242,8 @@ getQueueTypeName(queueType_t t) case QUEUETYPE_DIRECT: r = "Direct"; break; - default: - r = "unknown queue type"; + default: + r = "invalid/unknown queue mode"; break; } return r; @@ -315,16 +315,16 @@ getLogicalQueueSize(qqueue_t *pThis) */ static inline void queueDrain(qqueue_t *pThis) { - void *pUsr; + msg_t *pMsg; ASSERT(pThis != NULL); BEGINfunc DBGOPRINT((obj_t*) pThis, "queue (type %d) will lose %d messages, destroying...\n", pThis->qType, pThis->iQueueSize); /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ while(ATOMIC_DEC_AND_FETCH(&pThis->iQueueSize, &pThis->mutQueueSize) > 0) { - pThis->qDeq(pThis, &pUsr); - if(pUsr != NULL) { - objDestruct(pUsr); + pThis->qDeq(pThis, &pMsg); + if(pMsg != NULL) { + msgDestruct(&pMsg); } pThis->qDel(pThis); } @@ -414,7 +414,7 @@ StartDA(qqueue_t *pThis) */ pThis->pqDA->pqParent = pThis; - CHKiRet(qqueueSetpUsr(pThis->pqDA, pThis->pUsr)); + CHKiRet(qqueueSetpAction(pThis->pqDA, pThis->pAction)); CHKiRet(qqueueSetsizeOnDiskMax(pThis->pqDA, pThis->sizeOnDiskMax)); CHKiRet(qqueueSetiDeqSlowdown(pThis->pqDA, pThis->iDeqSlowdown)); CHKiRet(qqueueSetMaxFileSize(pThis->pqDA, pThis->iMaxFileSize)); @@ -549,7 +549,7 @@ static rsRetVal qDestructFixedArray(qqueue_t *pThis) } -static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) +static rsRetVal qAddFixedArray(qqueue_t *pThis, msg_t* in) { DEFiRet; @@ -563,7 +563,7 @@ static rsRetVal qAddFixedArray(qqueue_t *pThis, void* in) } -static rsRetVal qDeqFixedArray(qqueue_t *pThis, void **out) +static rsRetVal qDeqFixedArray(qqueue_t *pThis, msg_t **out) { DEFiRet; @@ -624,7 +624,7 @@ static rsRetVal qDestructLinkedList(qqueue_t __attribute__((unused)) *pThis) RETiRet; } -static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddLinkedList(qqueue_t *pThis, msg_t* pMsg) { qLinkedList_t *pEntry; DEFiRet; @@ -632,7 +632,7 @@ static rsRetVal qAddLinkedList(qqueue_t *pThis, void* pUsr) CHKmalloc((pEntry = (qLinkedList_t*) MALLOC(sizeof(qLinkedList_t)))); pEntry->pNext = NULL; - pEntry->pUsr = pUsr; + pEntry->pMsg = pMsg; if(pThis->tVars.linklist.pDelRoot == NULL) { pThis->tVars.linklist.pDelRoot = pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pLast = pEntry; @@ -650,14 +650,13 @@ finalize_it: } -static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr) +static rsRetVal qDeqLinkedList(qqueue_t *pThis, msg_t **ppMsg) { qLinkedList_t *pEntry; DEFiRet; pEntry = pThis->tVars.linklist.pDeqRoot; - ISOBJ_TYPE_assert(pEntry->pUsr, msg); - *ppUsr = pEntry->pUsr; + *ppMsg = pEntry->pMsg; pThis->tVars.linklist.pDeqRoot = pEntry->pNext; RETiRet; @@ -747,18 +746,12 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) { DEFiRet; strm_t *psQIF = NULL; - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; struct stat stat_buf; ISOBJ_TYPE_assert(pThis, qqueue); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - /* check if the file exists */ - if(stat((char*) pszQIFNam, &stat_buf) == -1) { + if(stat((char*) pThis->pszQIFNam, &stat_buf) == -1) { if(errno == ENOENT) { DBGOPRINT((obj_t*) pThis, "clean startup, no .qi file found\n"); ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND); @@ -773,7 +766,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) CHKiRet(strm.Construct(&psQIF)); CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_READ)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, we try to read the property bag for ourselfs */ @@ -784,13 +777,19 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis) (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF, (rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis)); - - /* create a duplicate for the read "pointer". - */ - + /* create a duplicate for the read "pointer". */ CHKiRet(strm.Dup(pThis->tVars.disk.pReadDel, &pThis->tVars.disk.pReadDeq)); CHKiRet(strm.SetbDeleteOnClose(pThis->tVars.disk.pReadDeq, 0)); /* deq must NOT delete the files! */ CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); + /* if we use a crypto provider, we need to amend the objects with it's info */ + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pWrite)); CHKiRet(strm.SeekCurrOffs(pThis->tVars.disk.pReadDel)); @@ -806,7 +805,7 @@ finalize_it: strm.Destruct(&psQIF); if(iRet != RS_RET_OK) { - DBGOPRINT((obj_t*) pThis, "error %d reading .qi file - can not read persisted info (if any)\n", + DBGOPRINT((obj_t*) pThis, "state %d reading .qi file - can not read persisted info (if any)\n", iRet); } @@ -844,6 +843,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pWrite, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pWrite, STREAMMODE_WRITE)); CHKiRet(strm.SetsType(pThis->tVars.disk.pWrite, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pWrite, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pWrite, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pWrite)); CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDeq)); @@ -852,6 +855,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDeq, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDeq, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDeq, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDeq, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDeq, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDeq)); CHKiRet(strm.Construct(&pThis->tVars.disk.pReadDel)); @@ -861,6 +868,10 @@ static rsRetVal qConstructDisk(qqueue_t *pThis) CHKiRet(strm.SetiMaxFiles(pThis->tVars.disk.pReadDel, 10000000)); CHKiRet(strm.SettOperationsMode(pThis->tVars.disk.pReadDel, STREAMMODE_READ)); CHKiRet(strm.SetsType(pThis->tVars.disk.pReadDel, STREAMTYPE_FILE_CIRCULAR)); + if(pThis->useCryprov) { + CHKiRet(strm.Setcryprov(pThis->tVars.disk.pReadDel, &pThis->cryprov)); + CHKiRet(strm.SetcryprovData(pThis->tVars.disk.pReadDel, pThis->cryprovData)); + } CHKiRet(strm.ConstructFinalize(pThis->tVars.disk.pReadDel)); CHKiRet(strm.SetFName(pThis->tVars.disk.pWrite, pThis->pszFilePrefix, pThis->lenFilePrefix)); @@ -898,7 +909,7 @@ static rsRetVal qDestructDisk(qqueue_t *pThis) RETiRet; } -static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDisk(qqueue_t *pThis, msg_t* pMsg) { DEFiRet; number_t nWriteCount; @@ -906,7 +917,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) ASSERT(pThis != NULL); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, &nWriteCount)); - CHKiRet((objSerialize(pUsr))(pUsr, pThis->tVars.disk.pWrite)); + CHKiRet((objSerialize(pMsg))(pMsg, pThis->tVars.disk.pWrite)); CHKiRet(strm.Flush(pThis->tVars.disk.pWrite)); CHKiRet(strm.SetWCntr(pThis->tVars.disk.pWrite, NULL)); /* no more counting for now... */ @@ -916,7 +927,7 @@ static rsRetVal qAddDisk(qqueue_t *pThis, void* pUsr) * the in-memory representation. The instance will be re-created upon * dequeue. -- rgerhards, 2008-07-09 */ - objDestruct(pUsr); + msgDestruct(&pMsg); DBGOPRINT((obj_t*) pThis, "write wrote %lld octets to disk, queue disk size now %lld octets, EnqOnly:%d\n", nWriteCount, pThis->tVars.disk.sizeOnDisk, pThis->bEnqOnly); @@ -926,43 +937,11 @@ finalize_it: } -static rsRetVal qDeqDisk(qqueue_t *pThis, void **ppUsr) +static rsRetVal qDeqDisk(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; - iRet = obj.Deserialize(ppUsr, (uchar*) "msg", pThis->tVars.disk.pReadDeq, NULL, NULL); - RETiRet; -} - - -static rsRetVal qDelDisk(qqueue_t *pThis) -{ - obj_t *pDummyObj; /* we need to deserialize it... */ - DEFiRet; - - int64 offsIn; - int64 offsOut; - - CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsIn)); - CHKiRet(obj.Deserialize(&pDummyObj, (uchar*) "msg", pThis->tVars.disk.pReadDel, NULL, NULL)); - objDestruct(pDummyObj); - CHKiRet(strm.GetCurrOffset(pThis->tVars.disk.pReadDel, &offsOut)); - - /* This time it is a bit tricky: we free disk space only upon file deletion. So we need - * to keep track of what we have read until we get an out-offset that is lower than the - * in-offset (which indicates file change). Then, we can subtract the whole thing from - * the on-disk size. -- rgerhards, 2008-01-30 - */ - if(offsIn < offsOut) { - pThis->tVars.disk.bytesRead += offsOut - offsIn; - } else { - pThis->tVars.disk.sizeOnDisk -= pThis->tVars.disk.bytesRead; - pThis->tVars.disk.bytesRead = offsOut; - DBGOPRINT((obj_t*) pThis, "a file has been deleted, now %lld octets disk space used\n", pThis->tVars.disk.sizeOnDisk); - /* awake possibly waiting enq process */ - pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ - } - -finalize_it: + iRet = objDeserializeWithMethods(ppMsg, (uchar*) "msg", 3, pThis->tVars.disk.pReadDeq, NULL, + NULL, msgConstructForDeserializer, NULL, MsgDeserialize); RETiRet; } @@ -979,10 +958,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) +static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) { batch_t singleBatch; batch_obj_t batchObj; + batch_state_t batchState = BATCH_STATE_RDY; sbool active = 1; int i; DEFiRet; @@ -1000,17 +980,17 @@ static rsRetVal qAddDirect(qqueue_t *pThis, void* pUsr) */ memset(&batchObj, 0, sizeof(batch_obj_t)); memset(&singleBatch, 0, sizeof(batch_t)); - batchObj.state = BATCH_STATE_RDY; - batchObj.pUsrp = (obj_t*) pUsr; + batchObj.pMsg = pMsg; singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; + singleBatch.eltState = &batchState; singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pUsr, &singleBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); /* delete the batch string params: TODO: create its own "class" for this */ for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { free(batchObj.staticActStrings[i]); } - objDestruct(pUsr); + msgDestruct(&pMsg); RETiRet; } @@ -1032,7 +1012,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) * We use our knowledge about the batch_t structure below, but without that, we * pay a too-large performance toll... -- rgerhards, 2009-04-22 */ - iRet = pThis->pConsumer(pThis->pUsr, pBatch, &pThis->bShutdownImmediate); + iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); RETiRet; } @@ -1053,13 +1033,13 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) * things truely different. -- rgerhards, 2008-02-12 */ static rsRetVal -qqueueAdd(qqueue_t *pThis, void *pUsr) +qqueueAdd(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ASSERT(pThis != NULL); - CHKiRet(pThis->qAdd(pThis, pUsr)); + CHKiRet(pThis->qAdd(pThis, pMsg)); if(pThis->qType != QUEUETYPE_DIRECT) { ATOMIC_INC(&pThis->iQueueSize, &pThis->mutQueueSize); @@ -1075,7 +1055,7 @@ finalize_it: /* generic code to dequeue a queue entry */ static rsRetVal -qqueueDeq(qqueue_t *pThis, void **ppUsr) +qqueueDeq(qqueue_t *pThis, msg_t **ppMsg) { DEFiRet; @@ -1086,7 +1066,7 @@ qqueueDeq(qqueue_t *pThis, void **ppUsr) * If we decrement, however, we may lose a message. But that is better than * losing the whole process because it loops... -- rgerhards, 2008-01-03 */ - iRet = pThis->qDeq(pThis, ppUsr); + iRet = pThis->qDeq(pThis, ppMsg); ATOMIC_INC(&pThis->nLogDeq, &pThis->mutLogDeq); // DBGOPRINT((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n", @@ -1184,11 +1164,11 @@ tryShutdownWorkersWithinActionTimeout(qqueue_t *pThis) rsRetVal iRetLocal; DEFiRet; -RUNLOG_STR("trying to shutdown workers within Action Timeout"); ISOBJ_TYPE_assert(pThis, qqueue); ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */ /* instruct workers to finish ASAP, even if still work exists */ + DBGOPRINT((obj_t*) pThis, "trying to shutdown workers within Action Timeout"); DBGOPRINT((obj_t*) pThis, "setting EnqOnly mode\n"); pThis->bEnqOnly = 1; pThis->bShutdownImmediate = 1; @@ -1332,8 +1312,6 @@ finalize_it: RETiRet; } - - /* Constructor for the queue object * This constructs the data structure, but does not yet start the queue. That * is done by queueStart(). The reason is that we want to give the caller a chance @@ -1363,6 +1341,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread pThis->iMaxFileSize = 1024 * 1024; /* default is 1 MiB */ pThis->iQueueSize = 0; pThis->nLogDeq = 0; + pThis->useCryprov = 0; pThis->iMaxQueueSize = iMaxQueueSize; pThis->pConsumer = pConsumer; pThis->iNumWorkerThreads = iWorkerThreads; @@ -1459,22 +1438,21 @@ qqueueSetDefaultsRulesetQueue(qqueue_t *pThis) * the return state! * rgerhards, 2008-01-24 */ -static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, void *pUsr) +static int qqueueChkDiscardMsg(qqueue_t *pThis, int iQueueSize, msg_t *pMsg) { DEFiRet; rsRetVal iRetLocal; int iSeverity; ISOBJ_TYPE_assert(pThis, qqueue); - ISOBJ_assert(pUsr); if(pThis->iDiscardMrk > 0 && iQueueSize >= pThis->iDiscardMrk) { - iRetLocal = objGetSeverity(pUsr, &iSeverity); + iRetLocal = MsgGetSeverity(pMsg, &iSeverity); if(iRetLocal == RS_RET_OK && iSeverity >= pThis->iDiscardSeverity) { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), discarded severity %d message\n", iQueueSize, iSeverity); STATSCOUNTER_INC(pThis->ctrNFDscrd, pThis->mutCtrNFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "queue nearly full (%d entries), but could not drop msg " @@ -1493,13 +1471,32 @@ static inline rsRetVal DoDeleteBatchFromQStore(qqueue_t *pThis, int nElem) { int i; + off64_t bytesDel; DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); /* now send delete request to storage driver */ - for(i = 0 ; i < nElem ; ++i) { - pThis->qDel(pThis); + if(pThis->qType == QUEUETYPE_DISK) { + strmMultiFileSeek(pThis->tVars.disk.pReadDel, pThis->tVars.disk.deqFileNumOut, + pThis->tVars.disk.deqOffs, &bytesDel); + /* We need to correct the on-disk file size. This time it is a bit tricky: + * we free disk space only upon file deletion. So we need to keep track of what we + * have read until we get an out-offset that is lower than the in-offset (which + * indicates file change). Then, we can subtract the whole thing from the on-disk + * size. -- rgerhards, 2008-01-30 + */ + if(bytesDel != 0) { + pThis->tVars.disk.sizeOnDisk -= bytesDel; + DBGOPRINT((obj_t*) pThis, "a %lld octet file has been deleted, now %lld octets disk " + "space used\n", bytesDel, pThis->tVars.disk.sizeOnDisk); + /* awake possibly waiting enq process */ + pthread_cond_signal(&pThis->notFull); /* we hold the mutex while we are in here! */ + } + } else { /* memory queue */ + for(i = 0 ; i < nElem ; ++i) { + pThis->qDel(pThis); + } } /* iQueueSize is not decremented by qDel(), so we need to do it ourselves */ @@ -1560,7 +1557,7 @@ static inline rsRetVal DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) { int i; - void *pUsr; + msg_t *pMsg; int nEnqueued = 0; rsRetVal localRet; DEFiRet; @@ -1569,17 +1566,16 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) assert(pBatch != NULL); for(i = 0 ; i < pBatch->nElem ; ++i) { - pUsr = pBatch->pElem[i].pUsrp; - if( pBatch->pElem[i].state == BATCH_STATE_RDY - || pBatch->pElem[i].state == BATCH_STATE_SUB) { - localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*) pUsr)); + pMsg = pBatch->pElem[i].pMsg; + if( pBatch->eltState[i] == BATCH_STATE_RDY + || pBatch->eltState[i] == BATCH_STATE_SUB) { + localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); ++nEnqueued; if(localRet != RS_RET_OK) { DBGPRINTF("error %d re-enqueuing unprocessed data element - discarded\n", localRet); } } - objDestruct(pUsr); + msgDestruct(&pMsg); } DBGPRINTF("we deleted %d objects and enqueued %d objects\n", i-nEnqueued, nEnqueued); @@ -1611,7 +1607,7 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz int nDiscarded; int nDeleted; int iQueueSize; - void *pUsr; + msg_t *pMsg; rsRetVal localRet; DEFiRet; @@ -1619,11 +1615,14 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz DeleteProcessedBatch(pThis, &pWti->batch); nDequeued = nDiscarded = 0; + if(pThis->qType == QUEUETYPE_DISK) { + pThis->tVars.disk.deqFileNumIn = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) { - CHKiRet(qqueueDeq(pThis, &pUsr)); + CHKiRet(qqueueDeq(pThis, &pMsg)); /* check if we should discard this element */ - localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr); + localRet = qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg); if(localRet == RS_RET_QUEUE_FULL) { ++nDiscarded; continue; @@ -1632,11 +1631,16 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz } /* all well, use this element */ - pWti->batch.pElem[nDequeued].pUsrp = pUsr; - pWti->batch.pElem[nDequeued].state = BATCH_STATE_RDY; + pWti->batch.pElem[nDequeued].pMsg = pMsg; + pWti->batch.eltState[nDequeued] = BATCH_STATE_RDY; ++nDequeued; } + if(pThis->qType == QUEUETYPE_DISK) { + strm.GetCurrOffset(pThis->tVars.disk.pReadDeq, &pThis->tVars.disk.deqOffs); + pThis->tVars.disk.deqFileNumOut = strmGetCurrFileNum(pThis->tVars.disk.pReadDeq); + } + /* it is sufficient to persist only when the bulk of work is done */ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted); @@ -1644,7 +1648,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz pWti->batch.nElemDeq = nDequeued + nDiscarded; pWti->batch.deqID = getNextDeqID(pThis); *piRemainingQueueSize = iQueueSize; - finalize_it: RETiRet; } @@ -1680,7 +1683,6 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) pthread_cond_broadcast(&pThis->belowLightDlyWtrMrk); } - // TODO: MULTI: check physical queue size? pthread_cond_signal(&pThis->notFull); /* WE ARE NO LONGER PROTECTED BY THE MUTEX */ @@ -1874,7 +1876,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) /* at this spot, we may be cancelled */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pUsr, &pWti->batch, &pThis->bShutdownImmediate)); + + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -1932,13 +1935,9 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti) /* iterate over returned results and enqueue them in DA queue */ for(i = 0 ; i < pWti->batch.nElem && !pThis->bShutdownImmediate ; i++) { - /* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs - * the message. So far, we simply assume we always have msg_t, what currently is always the case. - * rgerhards, 2009-05-28 - */ - CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, - (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp)))); - pWti->batch.pElem[i].state = BATCH_STATE_COMM; /* commited to other queue! */ + CHKiRet(qqueueEnqMsg(pThis->pqDA, eFLOWCTL_NO_DELAY, + MsgAddRef(pWti->batch.pElem[i].pMsg))); + pWti->batch.eltState[i] = BATCH_STATE_COMM; /* commited to other queue! */ } /* but now cancellation is no longer permitted */ @@ -2018,6 +2017,7 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ { DEFiRet; uchar pszBuf[64]; + uchar pszQIFNam[MAXFNAME]; int wrk; uchar *qName; size_t lenBuf; @@ -2040,8 +2040,8 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qConstruct = qConstructLinkedList; pThis->qDestruct = qDestructLinkedList; pThis->qAdd = qAddLinkedList; - pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList; - pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList; + pThis->qDeq = qDeqLinkedList; + pThis->qDel = qDelLinkedList; pThis->MultiEnq = qqueueMultiEnqObjNonDirect; break; case QUEUETYPE_DISK: @@ -2049,10 +2049,16 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->qDestruct = qDestructDisk; pThis->qAdd = qAddDisk; pThis->qDeq = qDeqDisk; - pThis->qDel = qDelDisk; + pThis->qDel = NULL; /* delete for disk handled via special code! */ pThis->MultiEnq = qqueueMultiEnqObjNonDirect; /* special handling */ pThis->iNumWorkerThreads = 1; /* we need exactly one worker */ + /* pre-construct file name for .qi file */ + pThis->lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), + "%s/%s.qi", (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); + pThis->pszQIFNam = ustrdup(pszQIFNam); + DBGOPRINT((obj_t*) pThis, ".qi file name is '%s', len %d\n", pThis->pszQIFNam, + (int) pThis->lenQIFNam); break; case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; @@ -2179,7 +2185,7 @@ finalize_it: } -/* persist the queue to disk. If we have something to persist, we first +/* persist the queue to disk (write the .qi file). If we have something to persist, we first * save the information on the queue properties itself and then we call * the queue-type specific drivers. * Variable bIsCheckpoint is set to 1 if the persist is for a checkpoint, @@ -2190,8 +2196,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) { DEFiRet; strm_t *psQIF = NULL; /* Queue Info File */ - uchar pszQIFNam[MAXFNAME]; - size_t lenQIFNam; ASSERT(pThis != NULL); @@ -2209,13 +2213,9 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) DBGOPRINT((obj_t*) pThis, "persisting queue to disk, %d entries...\n", getPhysicalQueueSize(pThis)); - /* Construct file name */ - lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi", - (char*) glbl.GetWorkDir(), (char*)pThis->pszFilePrefix); - if((bIsCheckpoint != QUEUE_CHECKPOINT) && (getPhysicalQueueSize(pThis) == 0)) { if(pThis->bNeedDelQIF) { - unlink((char*)pszQIFNam); + unlink((char*)pThis->pszQIFNam); pThis->bNeedDelQIF = 0; } /* indicate spool file needs to be deleted */ @@ -2228,7 +2228,7 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(strm.SettOperationsMode(psQIF, STREAMMODE_WRITE_TRUNC)); CHKiRet(strm.SetbSync(psQIF, pThis->bSyncQueueFiles)); CHKiRet(strm.SetsType(psQIF, STREAMTYPE_FILE_SINGLE)); - CHKiRet(strm.SetFName(psQIF, pszQIFNam, lenQIFNam)); + CHKiRet(strm.SetFName(psQIF, pThis->pszQIFNam, pThis->lenQIFNam)); CHKiRet(strm.ConstructFinalize(psQIF)); /* first, write the property bag for ourselfs @@ -2240,7 +2240,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint) CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis)); objSerializeSCALAR(psQIF, iQueueSize, INT); objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64); - objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64); CHKiRet(obj.EndSerialize(psQIF)); /* now persist the stream info */ @@ -2412,6 +2411,13 @@ CODESTARTobjDestruct(qqueue) free(pThis->pszFilePrefix); free(pThis->pszSpoolDir); + if(pThis->useCryprov) { + pThis->cryprov.Destruct(&pThis->cryprovData); + obj.ReleaseObj(__FILE__, pThis->cryprovNameFull+2, pThis->cryprovNameFull, + (void*) &pThis->cryprov); + free(pThis->cryprovName); + free(pThis->cryprovNameFull); + } /* some queues do not provide stats and thus have no statsobj! */ if(pThis->statsobj != NULL) @@ -2470,7 +2476,7 @@ finalize_it: * rgerhards, 2009-06-16 */ static inline rsRetVal -doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int err; @@ -2479,7 +2485,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) STATSCOUNTER_INC(pThis->ctrEnqueued, pThis->mutCtrEnqueued); /* first check if we need to discard this message (which will cause CHKiRet() to exit) */ - CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pUsr)); + CHKiRet(qqueueChkDiscardMsg(pThis, pThis->iQueueSize, pMsg)); /* handle flow control * There are two different flow control mechanisms: basic and advanced flow control. @@ -2556,7 +2562,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) if(pThis->toEnq == 0 || pThis->bEnqOnly) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - configured for immediate discarding.\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } else { DBGOPRINT((obj_t*) pThis, "enqueueMsg: queue FULL - waiting %dms to drain.\n", pThis->toEnq); @@ -2568,7 +2574,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) if(pthread_cond_timedwait(&pThis->notFull, pThis->mut, &t) != 0) { DBGOPRINT((obj_t*) pThis, "enqueueMsg: cond timeout, dropping message!\n"); STATSCOUNTER_INC(pThis->ctrFDscrd, pThis->mutCtrFDscrd); - objDestruct(pUsr); + msgDestruct(&pMsg); ABORT_FINALIZE(RS_RET_QUEUE_FULL); } dbgoprint((obj_t*) pThis, "enqueueMsg: wait solved queue full condition, enqueing\n"); @@ -2576,7 +2582,7 @@ doEnqSingleObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) } /* and finally enqueue the message */ - CHKiRet(qqueueAdd(pThis, pUsr)); + CHKiRet(qqueueAdd(pThis, pMsg)); STATSCOUNTER_SETMAX_NOMUT(pThis->ctrMaxqsize, pThis->iQueueSize); finalize_it: @@ -2652,11 +2658,11 @@ finalize_it: * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) +qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) { DEFiRet; ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pUsr); + iRet = qAddDirect(pThis, pMsg); RETiRet; } @@ -2665,7 +2671,7 @@ qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr) * Enqueues the new element and awakes worker thread. */ rsRetVal -qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) +qqueueEnqMsg(qqueue_t *pThis, flowControl_t flowCtlType, msg_t *pMsg) { DEFiRet; int iCancelStateSave; @@ -2677,7 +2683,7 @@ qqueueEnqObj(qqueue_t *pThis, flowControl_t flowCtlType, void *pUsr) d_pthread_mutex_lock(pThis->mut); } - CHKiRet(doEnqSingleObj(pThis, flowCtlType, pUsr)); + CHKiRet(doEnqSingleObj(pThis, flowCtlType, pMsg)); qqueueChkPersist(pThis, 1); @@ -2695,27 +2701,67 @@ finalize_it: } -/* take v6 config list and extract the queue params out of it. Hand the - * param values back to the caller. Caller is responsible for destructing - * them when no longer needed. Caller can use this param block to configure - * all parameters for a newly created queue with one call to qqueueSetParams(). - * rgerhards, 2011-07-22 +/* are any queue params set at all? 1 - yes, 0 - no + * We need to evaluate the param block for this function, which is somewhat + * inefficient. HOWEVER, this is only done during config load, so we really + * don't care... -- rgerhards, 2013-05-10 */ -rsRetVal -qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals) +int +queueCnfParamsSet(struct nvlst *lst) { - *ppvals = nvlstGetParams(lst, &pblk, NULL); - return RS_RET_OK; + int r; + struct cnfparamvals *pvals; + + pvals = nvlstGetParams(lst, &pblk, NULL); + r = cnfparamvalsIsSet(&pblk, pvals); + cnfparamvalsDestruct(pvals, &pblk); + return r; } -/* are any queue params set at all? 1 - yes, 0 - no */ -int -queueCnfParamsSet(struct cnfparamvals *pvals) +static inline rsRetVal +initCryprov(qqueue_t *pThis, struct nvlst *lst) { - return cnfparamvalsIsSet(&pblk, pvals); -} + uchar szDrvrName[1024]; + DEFiRet; + + if(snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmcry_%s", pThis->cryprovName) + == sizeof(szDrvrName)) { + errmsg.LogError(0, RS_RET_ERR, "queue: crypto provider " + "name is too long: '%s' - encryption disabled", + pThis->cryprovName); + ABORT_FINALIZE(RS_RET_ERR); + } + pThis->cryprovNameFull = ustrdup(szDrvrName); + pThis->cryprov.ifVersion = cryprovCURR_IF_VERSION; + /* The pDrvrName+2 below is a hack to obtain the object name. It + * safes us to have yet another variable with the name without "lm" in + * front of it. If we change the module load interface, we may re-think + * about this hack, but for the time being it is efficient and clean enough. + */ + if(obj.UseObj(__FILE__, szDrvrName, szDrvrName, (void*) &pThis->cryprov) + != RS_RET_OK) { + errmsg.LogError(0, RS_RET_LOAD_ERROR, "queue: could not load " + "crypto provider '%s' - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + + if(pThis->cryprov.Construct(&pThis->cryprovData) != RS_RET_OK) { + errmsg.LogError(0, RS_RET_CRYPROV_ERR, "queue: error constructing " + "crypto provider %s dataset - encryption disabled", + szDrvrName); + ABORT_FINALIZE(RS_RET_CRYPROV_ERR); + } + CHKiRet(pThis->cryprov.SetCnfParam(pThis->cryprovData, lst, CRYPROV_PARAMTYPE_DISK)); + + dbgprintf("loaded crypto provider %s, data instance at %p\n", + szDrvrName, pThis->cryprovData); + pThis->useCryprov = 1; +finalize_it: + RETiRet; +} /* apply all params from param block to queue. Must be called before * finalizing. This supports the v6 config system. Defaults were already @@ -2723,15 +2769,24 @@ queueCnfParamsSet(struct cnfparamvals *pvals) * function. */ rsRetVal -qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) +qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst) { int i; + struct cnfparamvals *pvals; + + pvals = nvlstGetParams(lst, &pblk, NULL); + if(Debug) { + dbgprintf("queue param blk:\n"); + cnfparamsPrint(&pblk, pvals); + } for(i = 0 ; i < pblk.nParams ; ++i) { if(!pvals[i].bUsed) continue; if(!strcmp(pblk.descr[i].name, "queue.filename")) { pThis->pszFilePrefix = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); pThis->lenFilePrefix = es_strlen(pvals[i].val.d.estr); + } else if(!strcmp(pblk.descr[i].name, "queue.cry.provider")) { + pThis->cryprovName = (uchar*) es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(pblk.descr[i].name, "queue.size")) { pThis->iMaxQueueSize = pvals[i].val.d.n; } else if(!strcmp(pblk.descr[i].name, "queue.dequeuebatchsize")) { @@ -2783,6 +2838,27 @@ qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals) "param '%s'\n", pblk.descr[i].name); } } + if(pThis->qType == QUEUETYPE_DISK) { + if(pThis->pszFilePrefix == NULL) { + errmsg.LogError(0, RS_RET_QUEUE_DISK_NO_FN, "error on queue '%s', disk mode selected, but " + "no queue file name given; queue type changed to 'linkedList'", + obj.GetName((obj_t*) pThis)); + pThis->qType = QUEUETYPE_LINKEDLIST; + } + } + + if(pThis->pszFilePrefix == NULL && pThis->cryprovName != NULL) { + errmsg.LogError(0, RS_RET_QUEUE_CRY_DISK_ONLY, "error on queue '%s', crypto provider can " + "only be set for disk or disk assisted queue - ignored", + obj.GetName((obj_t*) pThis)); + free(pThis->cryprovName); + pThis->cryprovName = NULL; + } + + if(pThis->cryprovName != NULL) { + initCryprov(pThis, lst); + } + cnfparamvalsDestruct(pvals, &pblk); return RS_RET_OK; } @@ -2806,7 +2882,7 @@ DEFpropSetMeth(qqueue, iLightDlyMrk, int) DEFpropSetMeth(qqueue, bIsDA, int) DEFpropSetMeth(qqueue, iMinMsgsPerWrkr, int) DEFpropSetMeth(qqueue, bSaveOnShutdown, int) -DEFpropSetMeth(qqueue, pUsr, void*) +DEFpropSetMeth(qqueue, pAction, action_t*) DEFpropSetMeth(qqueue, iDeqSlowdown, int) DEFpropSetMeth(qqueue, iDeqBatchSize, int) DEFpropSetMeth(qqueue, sizeOnDiskMax, int64) @@ -2829,8 +2905,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp) pThis->iQueueSize = pProp->val.num; } else if(isProp("tVars.disk.sizeOnDisk")) { pThis->tVars.disk.sizeOnDisk = pProp->val.num; - } else if(isProp("tVars.disk.bytesRead")) { - pThis->tVars.disk.bytesRead = pProp->val.num; } else if(isProp("qType")) { if(pThis->qType != pProp->val.num) ABORT_FINALIZE(RS_RET_QTYPE_MISMATCH); |