summaryrefslogtreecommitdiffstats
path: root/runtime/wtp.h
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/wtp.h')
-rw-r--r--runtime/wtp.h23
1 files changed, 11 insertions, 12 deletions
diff --git a/runtime/wtp.h b/runtime/wtp.h
index 1ce171cc..6b4054b8 100644
--- a/runtime/wtp.h
+++ b/runtime/wtp.h
@@ -30,14 +30,12 @@
/* commands and states for worker threads. */
typedef enum {
eWRKTHRD_STOPPED = 0, /* worker thread is not running (either actually never ran or was shut down) */
- eWRKTHRD_TERMINATING = 1,/* worker thread has shut down, but some finalzing is still needed */
/* ALL active states MUST be numerically higher than eWRKTHRD_TERMINATED and NONE must be lower! */
- eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but not yet begun initialization (prob. not yet scheduled) */
- eWRKTHRD_RUN_INIT = 3, /* worker thread is initializing, but not yet fully running */
+ eWRKTHRD_RUN_CREATED = 2,/* worker thread has been created, but is not fully running (prob. not yet scheduled) */
eWRKTHRD_RUNNING = 4, /* worker thread is up and running and shall continue to do so */
eWRKTHRD_SHUTDOWN = 5, /* worker thread is running but shall terminate when wtp is empty */
eWRKTHRD_SHUTDOWN_IMMEDIATE = 6/* worker thread is running but shall terminate even if wtp is full */
- /* SHUTDOWN_IMMEDIATE MUST alsways be the numerically highest state! */
+ /* SHUTDOWN_IMMEDIATE MUST always be the numerically highest state! */
} qWrkCmd_t;
@@ -50,7 +48,7 @@ typedef enum {
/* the worker thread pool (wtp) object */
-typedef struct wtp_s {
+struct wtp_s {
BEGINobjInstance;
wtpState_t wtpState;
int iNumWorkerThreads;/* number of worker threads to use */
@@ -60,18 +58,18 @@ typedef struct wtp_s {
bool bInactivityGuard;/* prevents inactivity due to race condition */
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dewtpd messages */
/* synchronization variables */
- pthread_mutex_t mutThrdShutdwn; /* mutex to guard thread shutdown processing */
pthread_mutex_t mut; /* mutex for the wtp's thread management */
pthread_cond_t condThrdTrm;/* signalled when threads terminate */
- int bThrdStateChanged; /* at least one thread state has changed if 1 */
/* end sync variables */
/* user objects */
- void *pUsr; /* pointer to user object */
+ void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
pthread_mutex_t *pmutUsr;
pthread_cond_t *pcondBusy; /* condition the user will signal "busy again, keep runing" on (awakes worker) */
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
+ rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
+ rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
rsRetVal (*pfRateLimiter)(void *pUsr);
- rsRetVal (*pfIsIdle)(void *pUsr, int);
+ rsRetVal (*pfIsIdle)(void *pUsr, wtp_t *pWtp);
rsRetVal (*pfDoWork)(void *pUsr, void *pWti, int);
rsRetVal (*pfOnIdle)(void *pUsr, int);
rsRetVal (*pfOnWorkerCancel)(void *pUsr, void*pWti);
@@ -79,7 +77,7 @@ typedef struct wtp_s {
rsRetVal (*pfOnWorkerShutdown)(void *pUsr);
/* end user objects */
uchar *pszDbgHdr; /* header string for debug messages */
-} wtp_t;
+};
/* some symbolic constants for easier reference */
@@ -97,14 +95,15 @@ rsRetVal wtpWakeupWrkr(wtp_t *pThis);
rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
rsRetVal wtpCancelAll(wtp_t *pThis);
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
-rsRetVal wtpSignalWrkrTermination(wtp_t *pWtp);
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
int wtpGetCurNumWrkr(wtp_t *pThis, int bLockMutex);
PROTOTYPEObjClassInit(wtp);
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
-PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
+PROTOTYPEpropSetMethFP(wtp, pfIsIdle, rsRetVal(*pVal)(void*, wtp_t*));
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*, int));
+PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
PROTOTYPEpropSetMethFP(wtp, pfOnIdle, rsRetVal(*pVal)(void*, int));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerCancel, rsRetVal(*pVal)(void*,void*));
PROTOTYPEpropSetMethFP(wtp, pfOnWorkerStartup, rsRetVal(*pVal)(void*));