summaryrefslogtreecommitdiffstats
path: root/runtime/queue.h
diff options
context:
space:
mode:
authorRainer Gerhards <rgerhards@adiscon.com>2009-07-09 09:42:32 +0200
committerRainer Gerhards <rgerhards@adiscon.com>2009-07-09 09:42:32 +0200
commit3e4aa167eed33a37163446feb2f28efe3e4582d5 (patch)
treeb309d9fe6ab1c33f2e9df8512cf0216e2ce86271 /runtime/queue.h
parente2c9493b0d832078e181a6bce3373850df27d204 (diff)
parent9f286c0c4c21128c66305166ae379d3f7b07f673 (diff)
downloadrsyslog-3e4aa167eed33a37163446feb2f28efe3e4582d5.tar.gz
rsyslog-3e4aa167eed33a37163446feb2f28efe3e4582d5.tar.bz2
rsyslog-3e4aa167eed33a37163446feb2f28efe3e4582d5.zip
Merge branch 'master' into udpspoof
Conflicts: tools/omfwd.c
Diffstat (limited to 'runtime/queue.h')
-rw-r--r--runtime/queue.h111
1 files changed, 64 insertions, 47 deletions
diff --git a/runtime/queue.h b/runtime/queue.h
index 9e75b31b..e873c456 100644
--- a/runtime/queue.h
+++ b/runtime/queue.h
@@ -27,8 +27,18 @@
#include <pthread.h>
#include "obj.h"
#include "wtp.h"
+#include "batch.h"
#include "stream.h"
+/* support for the toDelete list */
+typedef struct toDeleteLst_s toDeleteLst_t;
+struct toDeleteLst_s {
+ qDeqID deqID;
+ int nElemDeq; /* numbe of elements that were dequeued and as such must now be discarded */
+ struct toDeleteLst_s *pNext;
+};
+
+
/* queue types */
typedef enum {
QUEUETYPE_FIXED_ARRAY = 0,/* a simple queue made out of a fixed (initially malloced) array fast but memoryhog */
@@ -54,14 +64,16 @@ typedef struct qWrkThrd_s {
pthread_mutex_t mut;
} qWrkThrd_t; /* type for queue worker threads */
+
/* the queue object */
typedef struct queue_s {
BEGINobjInstance;
queueType_t qType;
- int bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
- int bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
- int bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
- int bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
+ int nLogDeq; /* number of elements currently logically dequeued */
+ bool bEnqOnly; /* does queue run in enqueue-only mode (1) or not (0)? */
+ bool bSaveOnShutdown;/* persists everthing on shutdown (if DA!)? 1-yes, 0-no */
+ bool bQueueStarted; /* has queueStart() been called on this queue? 1-yes, 0-no */
+ bool bQueueInDestruction;/* 1 if queue is in destruction process, 0 otherwise */
int iQueueSize; /* Current number of elements in the queue */
int iMaxQueueSize; /* how large can the queue grow? */
int iNumWorkerThreads;/* number of worker threads to use */
@@ -72,17 +84,20 @@ typedef struct queue_s {
void *pUsr; /* a global, user-supplied pointer. Is passed back to consumer. */
int iUpdsSincePersist;/* nbr of queue updates since the last persist call */
int iPersistUpdCnt; /* persits queue info after this nbr of updates - 0 -> persist only on shutdown */
+ bool bSyncQueueFiles;/* if working with files, sync them after each write? */
int iHighWtrMrk; /* high water mark for disk-assisted memory queues */
int iLowWtrMrk; /* low water mark for disk-assisted memory queues */
int iDiscardMrk; /* if the queue is above this mark, low-severity messages are discarded */
int iFullDlyMrk; /* if the queue is above this mark, FULL_DELAYable message are put on hold */
int iLightDlyMrk; /* if the queue is above this mark, LIGHT_DELAYable message are put on hold */
int iDiscardSeverity;/* messages of this severity above are discarded on too-full queue */
- int bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
+ bool bNeedDelQIF; /* does the QIF file need to be deleted when queue becomes empty? */
int toQShutdown; /* timeout for regular queue shutdown in ms */
int toActShutdown; /* timeout for long-running action shutdown in ms */
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
+ toDeleteLst_t *toDeleteLst;/* this queue's to-delete list */
int toEnq; /* enqueue timeout */
+ int iDeqBatchSize; /* max number of elements that shall be dequeued at once */
/* rate limiting settings (will be expanded) */
int iDeqSlowdown; /* slow down dequeue by specified nbr of microseconds */
/* end rate limiting */
@@ -96,18 +111,19 @@ typedef struct queue_s {
* applied to detect user configuration errors (and tell me how should we detect what
* the user really wanted...). -- rgerhards, 2008-04-02
*/
- /* ane dequeue time window */
- rsRetVal (*pConsumer)(void *,void*); /* user-supplied consumer function for dequeued messages */
+ /* end dequeue time window */
+ rsRetVal (*pConsumer)(void *,batch_t*); /* user-supplied consumer function for dequeued messages */
/* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the
- * user pointer that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 is pointer
- * to message)
- * rgerhards, 2008-01-28
+ * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2
+ * is pointer to an array of message message pointers)
*/
/* type-specific handlers (set during construction) */
rsRetVal (*qConstruct)(struct queue_s *pThis);
rsRetVal (*qDestruct)(struct queue_s *pThis);
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
- rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
+ rsRetVal (*qDel)(struct queue_s *pThis);
+ rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
@@ -131,35 +147,33 @@ typedef struct queue_s {
int iNumberFiles; /* how many files make up the queue? */
int64 iMaxFileSize; /* max size for a single queue file */
int64 sizeOnDiskMax; /* maximum size on disk allowed */
+ qDeqID deqIDAdd; /* next dequeue ID to use during add to queue store */
+ qDeqID deqIDDel; /* queue store delete position */
int bIsDA; /* is this queue disk assisted? */
int bRunsDA; /* is this queue actually *running* disk assisted? */
struct queue_s *pqDA; /* queue for disk-assisted modes */
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
int bDAEnqOnly; /* EnqOnly setting for DA queue */
- /* some data elements for the queueUngetObj() functionality. This list should always be short
- * and is always kept in memory
- */
- qLinkedList_t *pUngetRoot;
- qLinkedList_t *pUngetLast;
- int iUngottenObjs; /* number of objects currently in the "ungotten" list */
/* now follow queueing mode specific data elements */
union { /* different data elements based on queue type (qType) */
struct {
- long head, tail;
+ long deqhead, head, tail;
void** pBuf; /* the queued user data structure */
} farray;
struct {
- qLinkedList_t *pRoot;
+ qLinkedList_t *pDeqRoot;
+ qLinkedList_t *pDelRoot;
qLinkedList_t *pLast;
} linklist;
struct {
int64 sizeOnDisk; /* current amount of disk space used */
int64 bytesRead; /* number of bytes read from current (undeleted!) file */
- strm_t *pWrite; /* current file to be written */
- strm_t *pRead; /* current file to be read */
+ strm_t *pWrite; /* current file to be written */
+ strm_t *pReadDeq; /* current file for dequeueing */
+ strm_t *pReadDel; /* current file for deleting */
} disk;
} tVars;
-} queue_t;
+} qqueue_t;
/* some symbolic constants for easier reference */
#define QUEUE_MODE_ENQDEQ 0
@@ -176,30 +190,33 @@ typedef struct queue_s {
#define QUEUE_TIMEOUT_ETERNAL 24 * 60 * 60 * 1000
/* prototypes */
-rsRetVal queueDestruct(queue_t **ppThis);
-rsRetVal queueEnqObj(queue_t *pThis, flowControl_t flwCtlType, void *pUsr);
-rsRetVal queueStart(queue_t *pThis);
-rsRetVal queueSetMaxFileSize(queue_t *pThis, size_t iMaxFileSize);
-rsRetVal queueSetFilePrefix(queue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
-rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iWorkerThreads,
- int iMaxQueueSize, rsRetVal (*pConsumer)(void*,void*));
-PROTOTYPEObjClassInit(queue);
-PROTOTYPEpropSetMeth(queue, iPersistUpdCnt, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinFromHr, int);
-PROTOTYPEpropSetMeth(queue, iDeqtWinToHr, int);
-PROTOTYPEpropSetMeth(queue, toQShutdown, long);
-PROTOTYPEpropSetMeth(queue, toActShutdown, long);
-PROTOTYPEpropSetMeth(queue, toWrkShutdown, long);
-PROTOTYPEpropSetMeth(queue, toEnq, long);
-PROTOTYPEpropSetMeth(queue, iHighWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iLowWtrMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardMrk, int);
-PROTOTYPEpropSetMeth(queue, iDiscardSeverity, int);
-PROTOTYPEpropSetMeth(queue, iMinMsgsPerWrkr, int);
-PROTOTYPEpropSetMeth(queue, bSaveOnShutdown, int);
-PROTOTYPEpropSetMeth(queue, pUsr, void*);
-PROTOTYPEpropSetMeth(queue, iDeqSlowdown, int);
-PROTOTYPEpropSetMeth(queue, sizeOnDiskMax, int64);
-#define queueGetID(pThis) ((unsigned long) pThis)
+rsRetVal qqueueDestruct(qqueue_t **ppThis);
+rsRetVal qqueueMultiEnqObj(qqueue_t *pThis, multi_submit_t *pMultiSub);
+rsRetVal qqueueEnqObj(qqueue_t *pThis, flowControl_t flwCtlType, void *pUsr);
+rsRetVal qqueueStart(qqueue_t *pThis);
+rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize);
+rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
+rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads,
+ int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*));
+PROTOTYPEObjClassInit(qqueue);
+PROTOTYPEpropSetMeth(qqueue, iPersistUpdCnt, int);
+PROTOTYPEpropSetMeth(qqueue, bSyncQueueFiles, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinFromHr, int);
+PROTOTYPEpropSetMeth(qqueue, iDeqtWinToHr, int);
+PROTOTYPEpropSetMeth(qqueue, toQShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toActShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toWrkShutdown, long);
+PROTOTYPEpropSetMeth(qqueue, toEnq, long);
+PROTOTYPEpropSetMeth(qqueue, iHighWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iLowWtrMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardMrk, int);
+PROTOTYPEpropSetMeth(qqueue, iDiscardSeverity, int);
+PROTOTYPEpropSetMeth(qqueue, iMinMsgsPerWrkr, int);
+PROTOTYPEpropSetMeth(qqueue, bSaveOnShutdown, int);
+PROTOTYPEpropSetMeth(qqueue, pUsr, void*);
+PROTOTYPEpropSetMeth(qqueue, iDeqSlowdown, int);
+PROTOTYPEpropSetMeth(qqueue, sizeOnDiskMax, int64);
+PROTOTYPEpropSetMeth(qqueue, iDeqBatchSize, int);
+#define qqueueGetID(pThis) ((unsigned long) pThis)
#endif /* #ifndef QUEUE_H_INCLUDED */