summaryrefslogtreecommitdiffstats
path: root/runtime/queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/queue.h')
-rw-r--r--runtime/queue.h34
1 files changed, 22 insertions, 12 deletions
diff --git a/runtime/queue.h b/runtime/queue.h
index 91c100ed..844523ad 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -30,6 +30,7 @@
#include "batch.h"
#include "stream.h"
#include "statsobj.h"
+#include "cryprov.h"
/* support for the toDelete list */
typedef struct toDeleteLst_s toDeleteLst_t;
@@ -51,7 +52,7 @@ typedef enum {
/* list member definition for linked list types of queues: */
typedef struct qLinkedList_S {
struct qLinkedList_S *pNext;
- void *pUsr;
+ msg_t *pMsg;
} qLinkedList_t;
@@ -71,7 +72,7 @@ struct queue_s {
int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */
wtp_t *pWtpDA;
wtp_t *pWtpReg;
- void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
+ action_t *pAction; /* for action queues, ptr to action object; for main queues unused */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
sbool bSyncQueueFiles;/* if working with files, sync them after each write? */
@@ -111,8 +112,8 @@ struct queue_s {
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
- rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
- rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qAdd)(struct queue_s *pThis, msg_t *pMsg);
+ rsRetVal (*qDeq)(struct queue_s *pThis, msg_t **ppMsg);
rsRetVal (*qDel)(struct queue_s *pThis);
/* end type-specific handler */
/* public entry points (set during construction, permit to set best algorithm for params selected) */
@@ -135,6 +136,8 @@ struct queue_s {
size_t lenSpoolDir;
uchar *pszFilePrefix;
size_t lenFilePrefix;
+ uchar *pszQIFNam; /* full .qi file name, based on parts above */
+ size_t lenQIFNam;
int iNumberFiles; /* how many files make up the queue? */
int64 iMaxFileSize; /* max size for a single queue file */
int64 sizeOnDiskMax; /* maximum size on disk allowed */
@@ -145,7 +148,8 @@ struct queue_s {
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
/* now follow queueing mode specific data elements */
- union { /* different data elements based on queue type (qType) */
+ //union { /* different data elements based on queue type (qType) */
+ struct { /* different data elements based on queue type (qType) */
struct {
long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
@@ -157,12 +161,19 @@ struct queue_s {
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
- int64 bytesRead; /* number of bytes read from current (undeleted!) file */
+ int64 deqOffs; /* offset after dequeue batch - used for file deleter */
+ int deqFileNumIn; /* same for the circular file numbers, mainly for */
+ int deqFileNumOut;/* deleting finished files */
strm_t *pWrite; /* current file to be written */
strm_t *pReadDeq; /* current file for dequeueing */
strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
+ sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */
+ uchar *cryprovName; /* crypto provider to use */
+ cryprov_if_t cryprov; /* ptr to crypto provider interface */
+ void *cryprovData; /* opaque data ptr for provider use */
+ uchar *cryprovNameFull;/* full internal crypto provider name */
DEF_ATOMIC_HELPER_MUT(mutQueueSize);
DEF_ATOMIC_HELPER_MUT(mutLogDeq);
/* for statistics subsystem */
@@ -184,17 +195,16 @@ struct queue_s {
/* prototypes */
rsRetVal qqueueDestruct(qqueue_t **ppThis);
-rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr);
-rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg);
+rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg);
rsRetVal qqueueStart(qqueue_t *pThis);
rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*));
rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch);
-rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals);
-int queueCnfParamsSet(struct cnfparamvals *pvals);
-rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals);
+int queueCnfParamsSet(struct nvlst *lst);
+rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst);
void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis);
void qqueueSetDefaultsActionQueue(qqueue_t *pThis);
void qqueueDbgPrint(qqueue_t *pThis);
@@ -215,7 +225,7 @@ PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
-PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
+PROTOTYPEpropSetMeth(qqueue, pAction, action_t*);
PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);