From 4226f0dd4813277819406a4f13b460195d798f1a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 15 Apr 2008 16:28:44 +0200 Subject: begin building runtime convenience library (does not build!) --- runtime/queue.h | 205 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 runtime/queue.h (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h new file mode 100644 index 00000000..9e75b31b --- /dev/null +++ b/runtime/queue.h @@ -0,0 +1,205 @@ +/* Definition of the queue support module. + * + * Copyright 2008 Rainer Gerhards and Adiscon GmbH. + * + * This file is part of the rsyslog runtime library. + * + * The rsyslog runtime library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The rsyslog runtime library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the rsyslog runtime library. If not, see . + * + * A copy of the GPL can be found in the file "COPYING" in this distribution. + * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution. + */ + +#ifndef QUEUE_H_INCLUDED +#define QUEUE_H_INCLUDED + +#include +#include "obj.h" +#include "wtp.h" +#include "stream.h" + +/* queue types */ +typedef enum { + QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */ + QUEUETYPE_LINKEDLIST = 1, /* linked list used as buffer, lower fixed memory overhead but slower */ + QUEUETYPE_DISK = 2, /* disk files used as buffer */ + QUEUETYPE_DIRECT = 3 /* no queuing happens, consumer is directly called */ +} queueType_t; + +/* list member definition for linked list types of queues: */ +typedef struct qLinkedList_S { + struct qLinkedList_S *pNext; + void *pUsr; +} qLinkedList_t; + + +typedef struct qWrkThrd_s { + pthread_t thrdID; /* thread ID */ + qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ + obj_t *pUsr; /* current user object being processed (or NULL if none) */ + struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */ + int iThrd; /* my worker thread array index */ + pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ + pthread_mutex_t mut; +} qWrkThrd_t; /* type for queue worker threads */ + +/* the queue object */ +typedef struct queue_s { + BEGINobjInstance; + queueType_t qType; + int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ + int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ + int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ + int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ + int iQueueSize; /* Current number of elements in the queue */ + int iMaxQueueSize; /* how large can the queue grow? */ + int iNumWorkerThreads;/* number of worker threads to use */ + int iCurNumWrkThrd;/* current number of active worker threads */ + int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */ + wtp_t *pWtpDA; + wtp_t *pWtpReg; + void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ + int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ + int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ + int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ + int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ + int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ + int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */ + int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */ + int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ + int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ + int toQShutdown; /* timeout for regular queue shutdown in ms */ + int toActShutdown; /* timeout for long-running action shutdown in ms */ + int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ + int toEnq; /* enqueue timeout */ + /* rate limiting settings (will be expanded) */ + int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ + /* end rate limiting */ + /* dequeue time window settings (may also be expanded) */ + int iDeqtWinFromHr; /* begin of dequeue time window (hour only) */ + int iDeqtWinToHr; /* end of dequeue time window (hour only), set to 25 to disable deq window! */ + /* note that begin and end have specific semantics. It is a big difference if we have + * begin 4, end 22 or begin 22, end 4. In the later case, dequeuing will run from 10p, + * throughout the night and stop at 4 in the morning. In the first case, it will start + * at 4am, run throughout the day, and stop at 10 in the evening! So far, not logic is + * applied to detect user configuration errors (and tell me how should we detect what + * the user really wanted...). -- rgerhards, 2008-04-02 + */ + /* ane dequeue time window */ + rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the + * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer + * to message) + * rgerhards, 2008-01-28 + */ + /* type-specific handlers (set during construction) */ + rsRetVal (*qConstruct)(struct queue_s *pThis); + rsRetVal (*qDestruct)(struct queue_s *pThis); + rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); + rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr); + /* end type-specific handler */ + /* synchronization variables */ + pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ + pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ + pthread_cond_t notFull, notEmpty; + pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ + pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ + pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ + int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ + int bThrdStateChanged; /* at least one thread state has changed if 1 */ + /* end sync variables */ + /* the following variables are always present, because they + * are not only used for the "disk" queueing mode but also for + * any other queueing mode if it is set to "disk assisted". + * rgerhards, 2008-01-09 + */ + uchar *pszSpoolDir; + size_t lenSpoolDir; + uchar *pszFilePrefix; + size_t lenFilePrefix; + int iNumberFiles; /* how many files make up the queue? */ + int64 iMaxFileSize; /* max size for a single queue file */ + int64 sizeOnDiskMax; /* maximum size on disk allowed */ + int bIsDA; /* is this queue disk assisted? */ + int bRunsDA; /* is this queue actually *running* disk assisted? */ + struct queue_s *pqDA; /* queue for disk-assisted modes */ + struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ + int bDAEnqOnly; /* EnqOnly setting for DA queue */ + /* some data elements for the queueUngetObj() functionality. This list should always be short + * and is always kept in memory + */ + qLinkedList_t *pUngetRoot; + qLinkedList_t *pUngetLast; + int iUngottenObjs; /* number of objects currently in the "ungotten" list */ + /* now follow queueing mode specific data elements */ + union { /* different data elements based on queue type (qType) */ + struct { + long head, tail; + void** pBuf; /* the queued user data structure */ + } farray; + struct { + qLinkedList_t *pRoot; + qLinkedList_t *pLast; + } linklist; + struct { + int64 sizeOnDisk; /* current amount of disk space used */ + int64 bytesRead; /* number of bytes read from current (undeleted!) file */ + strm_t *pWrite; /* current file to be written */ + strm_t *pRead; /* current file to be read */ + } disk; + } tVars; +} queue_t; + +/* some symbolic constants for easier reference */ +#define QUEUE_MODE_ENQDEQ 0 +#define QUEUE_MODE_ENQONLY 1 + +#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */ +#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0])) + +/* the define below is an "eternal" timeout for the timeout settings which require a value. + * It is one day, which is not really eternal, but comes close to it if we think about + * rsyslog (e.g.: do you want to wait on shutdown for more than a day? ;)) + * rgerhards, 2008-01-17 + */ +#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000 + +/* prototypes */ +rsRetVal queueDestruct(queue_t **ppThis); +rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal queueStart(queue_t *pThis); +rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); +rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); +PROTOTYPEObjClassInit(queue); +PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int); +PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int); +PROTOTYPEpropSetMeth(queue, toQShutdown, long); +PROTOTYPEpropSetMeth(queue, toActShutdown, long); +PROTOTYPEpropSetMeth(queue, toWrkShutdown, long); +PROTOTYPEpropSetMeth(queue, toEnq, long); +PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int); +PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int); +PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); +PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); +PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); +PROTOTYPEpropSetMeth(queue, pUsr, void*); +PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); +PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64); +#define queueGetID(pThis) ((unsigned long) pThis) + +#endif /* #ifndef QUEUE_H_INCLUDED */ -- cgit v1.2.3 From cf38fc81759b01af5125b1a05e0d6fe8e2e1bc21 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Oct 2008 13:54:40 +0200 Subject: added a setting "$OptimizeForUniprocessor" ...to enable users to turn off pthread_yield calls which are counter-productive on multiprocessor machines (but have been shown to be useful on uniprocessors) --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 9e75b31b..a2dd594f 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -58,6 +58,7 @@ typedef struct qWrkThrd_s { typedef struct queue_s { BEGINobjInstance; queueType_t qType; + int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ -- cgit v1.2.3 From 2e388db9ac91eae35ac836b329c8bcadd319a409 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Mar 2009 11:10:43 +0100 Subject: integrated various patches for solaris Unfortunatley, I do not have the full list of contributors available. The patch set was compiled by Ben Taylor, and I made some further changes to adopt it to the news rsyslog branch. Others provided much of the base work, but I can not find the names of the original authors. If you happen to be one of them, please let me know so that I can give proper credits. --- runtime/queue.h | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index a2dd594f..a267862d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,7 +160,7 @@ typedef struct queue_s { strm_t *pRead; /* current file to be read */ } disk; } tVars; -} queue_t; +} qqueue_t; /* some symbolic constants for easier reference */ #define QUEUE_MODE_ENQDEQ 0 @@ -177,30 +177,30 @@ typedef struct queue_s { #define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000 /* prototypes */ -rsRetVal queueDestruct(queue_t **ppThis); -rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr); -rsRetVal queueStart(queue_t *pThis); -rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize); -rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); -rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads, +rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal qqueueStart(qqueue_t *pThis); +rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); +rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); +rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); -PROTOTYPEObjClassInit(queue); -PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int); -PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int); -PROTOTYPEpropSetMeth(queue, toQShutdown, long); -PROTOTYPEpropSetMeth(queue, toActShutdown, long); -PROTOTYPEpropSetMeth(queue, toWrkShutdown, long); -PROTOTYPEpropSetMeth(queue, toEnq, long); -PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardMrk, int); -PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int); -PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int); -PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int); -PROTOTYPEpropSetMeth(queue, pUsr, void*); -PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int); -PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64); -#define queueGetID(pThis) ((unsigned long) pThis) +PROTOTYPEObjClassInit(qqueue); +PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); +PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); +PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toActShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long); +PROTOTYPEpropSetMeth(qqueue, toEnq, long); +PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); +PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); +PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); +PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); +PROTOTYPEpropSetMeth(qqueue, pUsr, void*); +PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); +PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +#define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ -- cgit v1.2.3 From 7667845bd72b6f92eabc975318a4f288a77f2630 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 15:06:45 +0200 Subject: first attempt at dequeueing multiple batches inside the queue ... but this code has serious problems when terminating the queue, also it is far from being optimal. I will commit a series of patches (hopefully) as I am on the path to the final implementation. --- runtime/queue.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..7ecb9294 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,6 +54,7 @@ typedef struct qWrkThrd_s { pthread_mutex_t mut; } qWrkThrd_t; /* type for queue worker threads */ + /* the queue object */ typedef struct queue_s { BEGINobjInstance; @@ -84,6 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ + int iDeqMaxAtOnce; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -97,7 +99,7 @@ typedef struct queue_s { * applied to detect user configuration errors (and tell me how should we detect what * the user really wanted...). -- rgerhards, 2008-04-02 */ - /* ane dequeue time window */ + /* end dequeue time window */ rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer -- cgit v1.2.3 From e4b3f6d287d74b34d27b4e296c33cb3f1294a58c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 22 Apr 2009 16:39:58 +0200 Subject: now batches are handed down to the actual consumer ... but the action consumer does not do anything really intelligent with them. But the DA consumer is already done, as is the main message queue consumer. --- runtime/queue.h | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 7ecb9294..4fb57d07 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -100,11 +100,10 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the - * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer - * to message) - * rgerhards, 2008-01-28 + * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -185,7 +184,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); -- cgit v1.2.3 From 5c0aeae8ab1f344a022d586dc26c5d78203f7e0b Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 23 Apr 2009 12:50:07 +0200 Subject: added $MainMsgQueueDequeueBatchSize and $ActionQueueDequeueBatchSize configuration directives --- runtime/queue.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 4fb57d07..8a60254b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -85,7 +85,7 @@ typedef struct queue_s { int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ int toEnq; /* enqueue timeout */ - int iDeqMaxAtOnce; /* max number of elements that shall be dequeued at once */ + int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */ /* end rate limiting */ @@ -202,6 +202,7 @@ PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); PROTOTYPEpropSetMeth(qqueue, pUsr, void*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); +PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); #define qqueueGetID(pThis) ((unsigned long) pThis) #endif /* #ifndef QUEUE_H_INCLUDED */ -- cgit v1.2.3 From bb79e96dc300fa5a2182e7c047afb3b15c5dc870 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 12 May 2009 15:27:40 +0200 Subject: moving to a cleaner implementation of batches ... now that we know what we need from a theoretical POV. --- runtime/queue.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 8a60254b..4a5f16a1 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -27,6 +27,7 @@ #include #include "obj.h" #include "wtp.h" +#include "batch.h" #include "stream.h" /* queue types */ @@ -100,7 +101,7 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,aUsrp_t*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 * is pointer to an array of message message pointers) @@ -184,7 +185,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,aUsrp_t*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); -- cgit v1.2.3 From 93f873277bfe5ebb309ff5e92f5dc7244ebd9f1a Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 18 May 2009 17:28:34 +0200 Subject: t-delete list implemented, queue store drivers updated... ... on the way to the ultra-reliable queue modes (redesign doc). This version does not really work, but is a good commit point. Next comes queue size calculation. DA mode does not yet work. --- runtime/queue.h | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 4a5f16a1..00cee419 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -30,6 +30,15 @@ #include "batch.h" #include "stream.h" +/* support for the toDelete list */ +typedef struct toDeleteLst_s toDeleteLst_t; +struct toDeleteLst_s { + qDeqID deqID; + int nElem; + struct toDeleteLst_s *pNext; +}; + + /* queue types */ typedef enum { QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */ @@ -85,6 +94,7 @@ typedef struct queue_s { int toQShutdown; /* timeout for regular queue shutdown in ms */ int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ + toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */ int toEnq; /* enqueue timeout */ int iDeqBatchSize; /* max number of elements that shall be dequeued at once */ /* rate limiting settings (will be expanded) */ @@ -110,7 +120,8 @@ typedef struct queue_s { rsRetVal (*qConstruct)(struct queue_s *pThis); rsRetVal (*qDestruct)(struct queue_s *pThis); rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); - rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr); + rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); + rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ @@ -134,6 +145,8 @@ typedef struct queue_s { int iNumberFiles; /* how many files make up the queue? */ int64 iMaxFileSize; /* max size for a single queue file */ int64 sizeOnDiskMax; /* maximum size on disk allowed */ + qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */ + qDeqID deqIDDel; /* queue store delete position */ int bIsDA; /* is this queue disk assisted? */ int bRunsDA; /* is this queue actually *running* disk assisted? */ struct queue_s *pqDA; /* queue for disk-assisted modes */ @@ -148,18 +161,20 @@ typedef struct queue_s { /* now follow queueing mode specific data elements */ union { /* different data elements based on queue type (qType) */ struct { - long head, tail; + long deqhead, head, tail; void** pBuf; /* the queued user data structure */ } farray; struct { - qLinkedList_t *pRoot; + qLinkedList_t *pDeqRoot; + qLinkedList_t *pDelRoot; qLinkedList_t *pLast; } linklist; struct { int64 sizeOnDisk; /* current amount of disk space used */ int64 bytesRead; /* number of bytes read from current (undeleted!) file */ - strm_t *pWrite; /* current file to be written */ - strm_t *pRead; /* current file to be read */ + strm_t *pWrite; /* current file to be written */ + strm_t *pReadDeq; /* current file for dequeueing */ + strm_t *pReadDel; /* current file for deleting */ } disk; } tVars; } qqueue_t; -- cgit v1.2.3 From fe5bea77ac2533faab3b7b73bc253c4dc7d702bc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 18 May 2009 18:39:52 +0200 Subject: removed queue's UngetObj() call ... which is no longer needed thanks to the new queue design. --- runtime/queue.h | 6 ------ 1 file changed, 6 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 00cee419..e47b8762 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -152,12 +152,6 @@ typedef struct queue_s { struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ - /* some data elements for the queueUngetObj() functionality. This list should always be short - * and is always kept in memory - */ - qLinkedList_t *pUngetRoot; - qLinkedList_t *pUngetLast; - int iUngottenObjs; /* number of objects currently in the "ungotten" list */ /* now follow queueing mode specific data elements */ union { /* different data elements based on queue type (qType) */ struct { -- cgit v1.2.3 From a4dad2009992d436ba23c2d0a4a43b483aac40fc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 11:03:09 +0200 Subject: queue size calculation now based on logical/physical dequeue ... needed to split the old single counter into two. I wouldn't bet that I made some mistakes while doing so, but at least some ad-hoc tests plus the testbench do no longer indicate errors. --- runtime/queue.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index e47b8762..92bf8ae5 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -74,7 +74,8 @@ typedef struct queue_s { int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ - int iQueueSize; /* Current number of elements in the queue */ + int iQueueSize; /* Current number of elements in queue store (some are already logically dequeued!) */ + int nLogDeq; /* number of elements currently logically dequeued */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ int iCurNumWrkThrd;/* current number of active worker threads */ -- cgit v1.2.3 From 0cf8e88a348dc574244e4f5c2be26f47e8bfff08 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 19 May 2009 18:58:33 +0200 Subject: solved the intended-discard-during-dequeue issue --- runtime/queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 92bf8ae5..954a7fd4 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -34,7 +34,7 @@ typedef struct toDeleteLst_s toDeleteLst_t; struct toDeleteLst_s { qDeqID deqID; - int nElem; + int nElemDeq; /* numbe of elements that were dequeued and as such must now be discarded */ struct toDeleteLst_s *pNext; }; -- cgit v1.2.3 From aa9426f683fa6af9280bc63050ee0187ba4c57e1 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 26 May 2009 12:43:43 +0200 Subject: solved design issue with queue termination ... and also improved the test suite. There is a design issue in the v3 queue engine that manifested to some serious problems with the new processing mode. However, in v3 shutdown may take eternally if a queue runs in DA mode, is configured to preserve data AND the action fails and retries immediately. There is no cure available for v3, it would require doing much of the work we have done on the new engine. The window of exposure, as one might guess from the description, is very small. That is probably the reason why we have not seen it in practice. --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 954a7fd4..c1fe597d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -123,6 +123,7 @@ typedef struct queue_s { rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); + rsRetVal (*qUnDeqAll)(struct queue_s *pThis); /* end type-specific handler */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ -- cgit v1.2.3 From 9704f129f72ec9ece11aeccea4bbf0cbccb116cb Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 9 Jun 2009 19:00:18 +0200 Subject: added capability to fsync() queue disk files for enhanced reliability also adds speed, because you do no longer need to run the whole file system in sync mode. New testbench and new config directives: - $MainMsgQueueSyncQueueFiles - $ActionQueueSyncQueueFiles --- runtime/queue.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index a267862d..07f134aa 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -73,6 +73,7 @@ typedef struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ + int bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ @@ -186,6 +187,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); +PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int); PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int); PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); -- cgit v1.2.3 From 56e462610db0dc71cfc2e4af17d1eb27bd67fae7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 17 Jun 2009 12:56:58 +0200 Subject: further optimized message object pri, facility and severity string generation simplified --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 07f134aa..5bc03254 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -179,6 +179,7 @@ typedef struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); -- cgit v1.2.3 From 7a695d171436fe249770e8256ae48cd4ed86fd30 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 19 Jun 2009 12:03:56 +0200 Subject: removed uniprocessor optimization ... as it was not even optimal on uniprocessors any longer ;) I keep the config directive in, maybe we can utilize it again at some later point in time (questionable). --- runtime/queue.h | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 5bc03254..1d82d8d9 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -58,11 +58,10 @@ typedef struct qWrkThrd_s { typedef struct queue_s { BEGINobjInstance; queueType_t qType; - int bOptimizeUniProc; /* cache for the equally-named global setting, pulled at time of queue creation */ - int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ - int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ - int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ - int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ + bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ + bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ + bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ + bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -73,14 +72,14 @@ typedef struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ - int bSyncQueueFiles;/* if working with files, sync them after each write? */ + bool bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */ int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */ int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ - int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ + bool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ int toQShutdown; /* timeout for regular queue shutdown in ms */ int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ -- cgit v1.2.3 From 4c9eded44dbae1701bb3b8f255865892b19e7f72 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 17 Jul 2009 18:40:28 +0200 Subject: further code simplification ... could even remove one mutex by using a better algorithm. I think I also spotted some situation in which a hang could have happened. As I can't fix it in v4 and less without moving to the new engine, I make no effort in testing this out. Hangs occur during shutdown, only (if at all). The code changes should also result in some mild performance improvement. Some bug potential, but overall the bug potential should have been greatly reduced. --- runtime/queue.h | 11 ----------- 1 file changed, 11 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index e873c456..7b10e5dd 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -54,17 +54,6 @@ typedef struct qLinkedList_S { } qLinkedList_t; -typedef struct qWrkThrd_s { - pthread_t thrdID; /* thread ID */ - qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */ - obj_t *pUsr; /* current user object being processed (or NULL if none) */ - struct queue_s *pQueue; /* my queue (important if only the work thread instance is passed! */ - int iThrd; /* my worker thread array index */ - pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */ - pthread_mutex_t mut; -} qWrkThrd_t; /* type for queue worker threads */ - - /* the queue object */ typedef struct queue_s { BEGINobjInstance; -- cgit v1.2.3 From ff6963d6f6d85c6c10e80b17da8432bb983f3e38 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 20 Jul 2009 15:06:32 +0200 Subject: simplified startup of queue DA mode --- runtime/queue.h | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 7b10e5dd..73c62b52 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -121,7 +121,6 @@ typedef struct queue_s { pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ - int bChildIsDone; /* set to 1 when the child DA queue has finished processing, 0 otherwise */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they -- cgit v1.2.3 From 4d70c9b3e5e480d6dfa1c94506270f1f78e8ef32 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 13 Oct 2009 14:38:45 +0200 Subject: added some debug settings plus improved shutdown sequence ... non-working version! --- runtime/queue.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 73c62b52..74bf2d31 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -59,10 +59,10 @@ typedef struct queue_s { BEGINobjInstance; queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ + int bShutdownImmediate; /* should all workers cease processing messages? */ bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ - bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -101,10 +101,11 @@ typedef struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers) + * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero + * during normal operations and one if the consumer must urgently shut down. */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -185,7 +186,7 @@ rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*)); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); -- cgit v1.2.3 From 90e8475260cf8ac54519b3d964d879489af879f6 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Oct 2009 09:41:45 +0200 Subject: bugfix: message processing states were not set correctly in all cases however, this had no negative effect, as the message processing state was not evaluated when a batch was deleted, and that was the only case where the state could be wrong. --- runtime/queue.h | 6 ------ 1 file changed, 6 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 74bf2d31..338f091b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -164,12 +164,6 @@ typedef struct queue_s { } tVars; } qqueue_t; -/* some symbolic constants for easier reference */ -#define QUEUE_MODE_ENQDEQ 0 -#define QUEUE_MODE_ENQONLY 1 - -#define QUEUE_IDX_DA_WORKER 0 /* index for the DA worker (fixed) */ -#define QUEUE_PTR_DA_WORKER(x) (&((pThis)->pWrkThrds[0])) /* the define below is an "eternal" timeout for the timeout settings which require a value. * It is one day, which is not really eternal, but comes close to it if we think about -- cgit v1.2.3 From 553d1880d47b57b2f4e023c2017675f010afd9a0 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:36:53 +0100 Subject: some cleanup --- runtime/queue.h | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 338f091b..3b5d7038 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -113,7 +113,6 @@ typedef struct queue_s { rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); - rsRetVal (*qUnDeqAll)(struct queue_s *pThis); /* end type-specific handler */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ -- cgit v1.2.3 From 386b7cd2f2ae6f9ac8e0b9c8b49934398c159ea4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 10:44:55 +0100 Subject: removed no longer needed flag variable --- runtime/queue.h | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 3b5d7038..26c57a50 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -138,7 +138,6 @@ typedef struct queue_s { qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */ qDeqID deqIDDel; /* queue store delete position */ int bIsDA; /* is this queue disk assisted? */ - int bRunsDA; /* is this queue actually *running* disk assisted? */ struct queue_s *pqDA; /* queue for disk-assisted modes */ struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ -- cgit v1.2.3 From 1ada506e2d90377c2475e103340d8986bf8847f9 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Oct 2009 17:31:27 +0100 Subject: added the capability to have ruleset-specific main message queues This offers considerable additional flexibility AND superior performance (in cases where multiple inputs now can avoid lock contention) --- runtime/queue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 26c57a50..93573dae 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -55,7 +55,7 @@ typedef struct qLinkedList_S { /* the queue object */ -typedef struct queue_s { +struct queue_s { BEGINobjInstance; queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ @@ -160,7 +160,7 @@ typedef struct queue_s { strm_t *pReadDel; /* current file for deleting */ } disk; } tVars; -} qqueue_t; +}; /* the define below is an "eternal" timeout for the timeout settings which require a value. -- cgit v1.2.3 From 38cb3926727c0ad29f3950db43ba12248e867b89 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 2 Feb 2010 15:51:01 +0100 Subject: replaced data type "bool" by "sbool" because this created some portability issues --- runtime/queue.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 93573dae..38c0d491 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -60,9 +60,9 @@ struct queue_s { queueType_t qType; int nLogDeq; /* number of elements currently logically dequeued */ int bShutdownImmediate; /* should all workers cease processing messages? */ - bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ - bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ - bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ + sbool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */ + sbool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */ + sbool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */ int iQueueSize; /* Current number of elements in the queue */ int iMaxQueueSize; /* how large can the queue grow? */ int iNumWorkerThreads;/* number of worker threads to use */ @@ -73,14 +73,14 @@ struct queue_s { void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ - bool bSyncQueueFiles;/* if working with files, sync them after each write? */ + sbool bSyncQueueFiles;/* if working with files, sync them after each write? */ int iHighWtrMrk; /* high water mark for disk-assisted memory queues */ int iLowWtrMrk; /* low water mark for disk-assisted memory queues */ int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */ int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */ int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */ int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */ - bool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ + sbool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */ int toQShutdown; /* timeout for regular queue shutdown in ms */ int toActShutdown; /* timeout for long-running action shutdown in ms */ int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */ -- cgit v1.2.3 From cbe2e3d44496ec7c6418e7e74ce917f2086a2947 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Apr 2010 17:31:28 +0200 Subject: bugfix: problems with atomic operations emulation replaced atomic operation emulation with new code. The previous code seemed to have some issue and also limited concurrency severely. The whole atomic operation emulation has been rewritten. --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 1d82d8d9..aafdaa45 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -160,6 +160,7 @@ typedef struct queue_s { strm_t *pRead; /* current file to be read */ } disk; } tVars; + DEF_ATOMIC_HELPER_MUT(mutQueueSize); } qqueue_t; /* some symbolic constants for easier reference */ -- cgit v1.2.3 From dd76d96d676f305aa2d29131321fe5cac5a676c4 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 27 Apr 2010 18:26:09 +0200 Subject: adapted new atomic instruction emulation to v5 engine code did not compile after merge from v4 --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 45d3a51b..8ede6922 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -161,6 +161,7 @@ struct queue_s { } disk; } tVars; DEF_ATOMIC_HELPER_MUT(mutQueueSize); + DEF_ATOMIC_HELPER_MUT(mutLogDeq); }; -- cgit v1.2.3 From 395660f462c62029f76b99f73bd9a424a8cf73a2 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 9 Jun 2010 14:34:35 +0200 Subject: somewhat improved direct mode queue performance ... but only for batch enqueues. This will not help much with the current code, but will play well with upcoming changes. --- runtime/queue.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 8ede6922..33b21c9a 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -114,6 +114,9 @@ struct queue_s { rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ + /* public entry points (set during construction, permit to set best algorithm for params selected) */ + rsRetVal (*MultiEnq)(qqueue_t *pThis, multi_submit_t *pMultiSub); + /* end public entry points */ /* synchronization variables */ pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */ pthread_mutex_t *mut; /* mutex for enqueing and dequeueing messages */ @@ -174,7 +177,6 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); -- cgit v1.2.3 From 802f6d8a8f39e5ba578e0183e4500bef8e3a198c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 15 Jun 2010 14:02:34 +0200 Subject: milestone(BUGGY): batch now pushed down to action at least in important cases (not for non-direct action queues and some other minor things). This version is definitely buggy, but may be tried with success on a non-production system. I will continue to work on the correctness, but needed to commit now to get a baseline. --- runtime/queue.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 33b21c9a..1c758134 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -177,12 +177,14 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); +rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr); rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); +rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); -- cgit v1.2.3 From e86cb62f1299ef18732f7b3b87d45a840ee38f1e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 13 Sep 2010 15:43:56 +0200 Subject: improved statistics-gathering subsystem ... well, actually this is a first real implementation of this subsystem. I have added a counter registry, a way to access the countres (as readable string) and a way to define and maintem them. Also, module impstats has been updated to utilize the new system. Finally, I added some counters. I hope that this sets the baseline for useful future enhancements. --- runtime/queue.h | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 1c758134..38e248cd 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -29,6 +29,7 @@ #include "wtp.h" #include "batch.h" #include "stream.h" +#include "statsobj.h" /* support for the toDelete list */ typedef struct toDeleteLst_s toDeleteLst_t; @@ -165,6 +166,11 @@ struct queue_s { } tVars; DEF_ATOMIC_HELPER_MUT(mutQueueSize); DEF_ATOMIC_HELPER_MUT(mutLogDeq); + /* for statistics subsystem */ + statsobj_t *statsobj; + STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); + STATSCOUNTER_DEF(ctrFull, mutCtrFull); + int ctrMaxqsize; }; -- cgit v1.2.3 From 6a18d25cbec2676a7910ff038170716293abe89f Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 11 Feb 2011 17:06:20 +0100 Subject: removed no longer needed code --- runtime/queue.h | 1 - 1 file changed, 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 38e248cd..97057180 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -124,7 +124,6 @@ struct queue_s { pthread_cond_t notFull, notEmpty; pthread_cond_t belowFullDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ pthread_cond_t belowLightDlyWtrMrk; /* below eFLOWCTL_FULL_DELAY watermark */ - pthread_cond_t condDAReady;/* signalled when the DA queue is fully initialized and ready for processing */ int bThrdStateChanged; /* at least one thread state has changed if 1 */ /* end sync variables */ /* the following variables are always present, because they -- cgit v1.2.3 From 9757aeb56445eee3aca2b43e6b3efa1f1cb59ba3 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 22 Jul 2011 18:03:43 +0200 Subject: milestone: queue object now has a param handler for new conf interface ... and action queue defs use this new interface (but not yet the main queues) --- runtime/queue.h | 3 +++ 1 file changed, 3 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..c18b9f47 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -190,6 +190,9 @@ rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefi rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); +rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); + PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int); -- cgit v1.2.3 From a789813b14de79c5ecac8b0d7f7c222ae7f47412 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 1 Aug 2011 15:56:34 +0200 Subject: milestone: queue-params are properly initialized for action queues --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index c18b9f47..2b1fcfa8 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -192,6 +192,7 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +void qqueueSetDefaultsActionQueue(qqueue_t *pThis); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); -- cgit v1.2.3 From 7796eeb6960b2e1d1aeac77242a5ad93a64776cc Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Dec 2011 14:42:37 +0100 Subject: new stats counter "discarded" for queue object Tells how many messages have been discarded due to queue full condition. --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..70bb8895 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -169,6 +169,7 @@ struct queue_s { statsobj_t *statsobj; STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); STATSCOUNTER_DEF(ctrFull, mutCtrFull); + STATSCOUNTER_DEF(ctrDscrd, mutCtrDscrd); int ctrMaxqsize; }; -- cgit v1.2.3 From a1b752b32ffb90bfdce4fd8dfdffac3ebbadc79e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 16 Dec 2011 15:12:24 +0100 Subject: more work on queue statistics counter --- runtime/queue.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 70bb8895..06a58229 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -169,7 +169,8 @@ struct queue_s { statsobj_t *statsobj; STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); STATSCOUNTER_DEF(ctrFull, mutCtrFull); - STATSCOUNTER_DEF(ctrDscrd, mutCtrDscrd); + STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd); + STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd); int ctrMaxqsize; }; -- cgit v1.2.3 From 5fe837bf7dbdcc245ee233feb1fbcc6d052a4898 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Dec 2011 11:16:07 +0100 Subject: added instrumentation --- runtime/queue.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..06a58229 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -169,6 +169,8 @@ struct queue_s { statsobj_t *statsobj; STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); STATSCOUNTER_DEF(ctrFull, mutCtrFull); + STATSCOUNTER_DEF(ctrFDscrd, mutCtrFDscrd); + STATSCOUNTER_DEF(ctrNFDscrd, mutCtrNFDscrd); int ctrMaxqsize; }; -- cgit v1.2.3 From f9b6b94b802c653e6c588f42af0997682e75f267 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Tue, 21 Feb 2012 16:52:36 +0100 Subject: added configuration directives to customize queue light delay marks $MainMsgQueueLightDelayMark, $ActionQueueLightDelayMark; both specify number of messages starting at which a delay happens. --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 06a58229..dbd6f249 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -201,6 +201,7 @@ PROTOTYPEpropSetMeth(qqueue, toQShutdown, long); PROTOTYPEpropSetMeth(qqueue, toActShutdown, long); PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long); PROTOTYPEpropSetMeth(qqueue, toEnq, long); +PROTOTYPEpropSetMeth(qqueue, iLightDlyMrk, int); PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int); PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); -- cgit v1.2.3 From e6aaf19689791c668ea444a21e470e4db3244cb5 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 14 Mar 2012 12:50:21 +0100 Subject: changed statsobj interface and added better doc --- runtime/queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 97057180..7ef5673c 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -169,7 +169,7 @@ struct queue_s { statsobj_t *statsobj; STATSCOUNTER_DEF(ctrEnqueued, mutCtrEnqueued); STATSCOUNTER_DEF(ctrFull, mutCtrFull); - int ctrMaxqsize; + int ctrMaxqsize; /* NOT guarded by a mutex */ }; -- cgit v1.2.3 From 3dd44b02d6251c519b04c3147425622603dd4754 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Thu, 5 Jul 2012 17:15:07 +0200 Subject: debug log: emit (some) action queue parameters to debug log --- runtime/queue.h | 1 + 1 file changed, 1 insertion(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 3841615a..edb770c6 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -195,6 +195,7 @@ rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); +void qqueueDbgPrint(qqueue_t *pThis); PROTOTYPEObjClassInit(qqueue); PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int); -- cgit v1.2.3 From 40fffde2b6a36ba12388b89d422104c258a667f7 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 2 Nov 2012 18:55:53 +0100 Subject: generate disk .qi file once at queue construction ... instead of each time a file write happens. In some situations (very frequent sync), this can probably be a big performane win. --- runtime/queue.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index edb770c6..f9d6067f 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -135,6 +135,8 @@ struct queue_s { size_t lenSpoolDir; uchar *pszFilePrefix; size_t lenFilePrefix; + uchar *pszQIFNam; /* full .qi file name, based on parts above */ + size_t lenQIFNam; int iNumberFiles; /* how many files make up the queue? */ int64 iMaxFileSize; /* max size for a single queue file */ int64 sizeOnDiskMax; /* maximum size on disk allowed */ -- cgit v1.2.3 From feddb2cc8fa213725a14e556d0366aef8d3a4339 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 11:40:13 +0100 Subject: queue: change gerenic queue pUsr ptr to be action_t this was always action_t, but the initial design was more generic. We are making it specific now in order to gain better performance (after all, we did not need the generic engine in the past 8 years...) --- runtime/queue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index f9d6067f..a5a020ae 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -71,7 +71,7 @@ struct queue_s { int iMinMsgsPerWrkr;/* minimum nbr of msgs per worker thread, if more, a new worker is started until max wrkrs */ wtp_t *pWtpDA; wtp_t *pWtpReg; - void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */ + action_t *pAction; /* for action queues, ptr to action object; for main queues unused */ int iUpdsSincePersist;/* nbr of queue updates since the last persist call */ int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */ sbool bSyncQueueFiles;/* if working with files, sync them after each write? */ @@ -215,7 +215,7 @@ PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int); PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int); PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int); PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int); -PROTOTYPEpropSetMeth(qqueue, pUsr, void*); +PROTOTYPEpropSetMeth(qqueue, pAction, action_t*); PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int); PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64); PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int); -- cgit v1.2.3 From c55e0a5a06e69106bc346057dd61dcb98688a4aa Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Sat, 3 Nov 2012 12:32:50 +0100 Subject: queue: change generic msg ptr (pUsr) to be of msg_t type --- runtime/queue.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index a5a020ae..e6ccdcdb 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -51,7 +51,7 @@ typedef enum { /* list member definition for linked list types of queues: */ typedef struct qLinkedList_S { struct qLinkedList_S *pNext; - void *pUsr; + msg_t *pMsg; } qLinkedList_t; @@ -111,8 +111,8 @@ struct queue_s { /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); rsRetVal (*qDestruct)(struct queue_s *pThis); - rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr); - rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr); + rsRetVal (*qAdd)(struct queue_s *pThis, msg_t *pMsg); + rsRetVal (*qDeq)(struct queue_s *pThis, msg_t **ppMsg); rsRetVal (*qDel)(struct queue_s *pThis); /* end type-specific handler */ /* public entry points (set during construction, permit to set best algorithm for params selected) */ @@ -186,8 +186,8 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqObjDirect(qqueue_t *pThis, void *pUsr); -rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr); +rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); +rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); -- cgit v1.2.3 From c563914d6f96efc1c4da02a7f49409297b20f656 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 19 Nov 2012 16:46:41 +0100 Subject: queue: file delete stream does no longer do real io This stream is primarily used for state tracking, and has been modified to do just that. This results in considerable less io being done and the respective speedup. --- runtime/queue.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index e6ccdcdb..7db2d90d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -147,7 +147,8 @@ struct queue_s { struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */ int bDAEnqOnly; /* EnqOnly setting for DA queue */ /* now follow queueing mode specific data elements */ - union { /* different data elements based on queue type (qType) */ + //union { /* different data elements based on queue type (qType) */ + struct { /* different data elements based on queue type (qType) */ struct { long deqhead, head, tail; void** pBuf; /* the queued user data structure */ @@ -159,7 +160,9 @@ struct queue_s { } linklist; struct { int64 sizeOnDisk; /* current amount of disk space used */ - int64 bytesRead; /* number of bytes read from current (undeleted!) file */ + int64 deqOffs; /* offset after dequeue batch - used for file deleter */ + int deqFileNumIn; /* same for the circular file numbers, mainly for */ + int deqFileNumOut;/* deleting finished files */ strm_t *pWrite; /* current file to be written */ strm_t *pReadDeq; /* current file for dequeueing */ strm_t *pReadDel; /* current file for deleting */ -- cgit v1.2.3 From 62f6a7d7b4b3c9fee0fffea961a201d24a059b2c Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 30 Nov 2012 17:09:28 +0100 Subject: fix missing functionality: ruleset(){} could not specify ruleset queue The "queue.xxx" parameter set was not supported, and legacy ruleset config statements did not work (by intention). The fix introduces the "queue.xxx" parameter set. It has some regression potential, but only for the new functionality. Note that using that interface it is possible to specify duplicate queue file names, which will cause trouble. This will be solved in v7.3, because there is a too-large regression potential for the v7.2 stable branch. --- runtime/queue.h | 2 ++ 1 file changed, 2 insertions(+) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index edb770c6..91c100ed 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -193,7 +193,9 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); +int queueCnfParamsSet(struct cnfparamvals *pvals); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); void qqueueDbgPrint(qqueue_t *pThis); -- cgit v1.2.3 From 415b26d5a19d8b1fd50d8e0b7b29cc8527537316 Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Fri, 10 May 2013 15:39:42 +0200 Subject: enable shuffling of crypto parameters down through queue definition --- runtime/queue.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 886fac8d..9ed7f87d 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -30,6 +30,7 @@ #include "batch.h" #include "stream.h" #include "statsobj.h" +#include "cryprov.h" /* support for the toDelete list */ typedef struct toDeleteLst_s toDeleteLst_t; @@ -168,6 +169,11 @@ struct queue_s { strm_t *pReadDel; /* current file for deleting */ } disk; } tVars; + sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */ + uchar *cryprovName; /* crypto provider to use */ + cryprov_if_t cryprov; /* ptr to crypto provider interface */ + uchar *cryprovNameFull;/* full internal crypto provider name */ + void *cryprovData; /* opaque data ptr for provider use */ DEF_ATOMIC_HELPER_MUT(mutQueueSize); DEF_ATOMIC_HELPER_MUT(mutLogDeq); /* for statistics subsystem */ @@ -197,9 +203,8 @@ rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefi rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); -rsRetVal qqueueDoCnfParams(struct nvlst *lst, struct cnfparamvals **ppvals); -int queueCnfParamsSet(struct cnfparamvals *pvals); -rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct cnfparamvals *pvals); +int queueCnfParamsSet(struct nvlst *lst); +rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); void qqueueSetDefaultsActionQueue(qqueue_t *pThis); void qqueueDbgPrint(qqueue_t *pThis); -- cgit v1.2.3 From 0d000a8b1096abb26f9e47a4083dc560fed0282d Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Mon, 13 May 2013 08:04:13 +0200 Subject: basic queue file encryption --- runtime/queue.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'runtime/queue.h') diff --git a/runtime/queue.h b/runtime/queue.h index 9ed7f87d..844523ad 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -172,8 +172,8 @@ struct queue_s { sbool useCryprov; /* quicker than checkig ptr (1 vs 8 bytes!) */ uchar *cryprovName; /* crypto provider to use */ cryprov_if_t cryprov; /* ptr to crypto provider interface */ + void *cryprovData; /* opaque data ptr for provider use */ uchar *cryprovNameFull;/* full internal crypto provider name */ - void *cryprovData; /* opaque data ptr for provider use */ DEF_ATOMIC_HELPER_MUT(mutQueueSize); DEF_ATOMIC_HELPER_MUT(mutLogDeq); /* for statistics subsystem */ -- cgit v1.2.3