summaryrefslogtreecommitdiffstats
path: root/runtime/wti.h
blob: adc7897c1fb06959b5ace7ec79937a9f02777451 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/* Definition of the worker thread instance (wti) class.
 *
 * Copyright 2008-2013 Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *       http://www.apache.org/licenses/LICENSE-2.0
 *       -or-
 *       see COPYING.ASL20 in the source distribution
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef WTI_H_INCLUDED
#define WTI_H_INCLUDED

#include <pthread.h>
#include "wtp.h"
#include "obj.h"
#include "batch.h"
#include "action.h"


#define ACT_STATE_RDY  0	/* action ready, waiting for new transaction */
#define ACT_STATE_ITX  1	/* transaction active, waiting for new data or commit */
#define ACT_STATE_COMM 2 	/* transaction finished (a transient state) */
#define ACT_STATE_RTRY 3	/* failure occured, trying to restablish ready state */
#define ACT_STATE_SUSP 4	/* suspended due to failure (return fail until timeout expired) */
/* note: 3 bit bit field --> highest value is 7! */

/* The following structure defines immutable parameters which need to
 * be passed as action parameters. Note that the current implementation
 * does NOT focus on performance, but on a simple PoC in order to get
 * things going. TODO: Once it works, revisit this code and think about
 * an array implementation. We also need to support other passing modes
 * as well. -- gerhards, 2013-11-04
 */
typedef struct actWrkrIParams {
	int msgFlags;
	/* following are caches to save allocs if not absolutely necessary */
	uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */
				/* a cache to save malloc(), if not absolutely necessary */
	unsigned staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE];
				/* and the same for the message length (if used) */
	void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
} actWrkrIParams_t;

typedef struct actWrkrInfo {
	action_t *pAction;
	void *actWrkrData;
	uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an immediate failure following */
	int	iNbrResRtry;	/* number of retries since last suspend */
	struct {
		unsigned actState : 3;
	} flags;
	actWrkrIParams_t *iparams;
	int currIParam;
	int maxIParams;	/* current max */
	void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /* for non-strings */
} actWrkrInfo_t;

/* the worker thread instance class */
struct wti_s {
	BEGINobjInstance;
	pthread_t thrdID; 	/* thread ID */
	int bIsRunning;	/* is this thread currently running? (must be int for atomic op!) */
	sbool bAlwaysRunning;	/* should this thread always run? */
	int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
	wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
	batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */
	uchar *pszDbgHdr;	/* header string for debug messages */
	actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions (sized for max nbr of actions in config!) */
	pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
	DEF_ATOMIC_HELPER_MUT(mutIsRunning);
	struct {
		uint8_t bPrevWasSuspended;
		uint8_t bDoAutoCommit; /* do a commit after each message
		                        * this is usually set for batches with 0 element, but may
					* also be added as a user-selectable option (not implemented yet)
					*/
	} execState;	/* state for the execution engine */
};


/* prototypes */
rsRetVal wtiConstruct(wti_t **ppThis);
rsRetVal wtiConstructFinalize(wti_t *pThis);
rsRetVal wtiDestruct(wti_t **ppThis);
rsRetVal wtiWorker(wti_t *pThis);
rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg);
rsRetVal wtiCancelThrd(wti_t *pThis);
rsRetVal wtiSetAlwaysRunning(wti_t *pThis);
rsRetVal wtiSetState(wti_t *pThis, sbool bNew);
rsRetVal wtiWakeupThrd(wti_t *pThis);
sbool wtiGetState(wti_t *pThis);
wti_t *wtiGetDummy(void);
PROTOTYPEObjClassInit(wti);
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);

static inline uint8_t
getActionStateByNbr(wti_t *pWti, int iActNbr)
{
	return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState);
}

static inline uint8_t
getActionState(wti_t *pWti, action_t *pAction)
{
	return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState);
}

static inline void
setActionState(wti_t *pWti, action_t *pAction, uint8_t newState)
{
	pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState;
}

static inline uint16_t
getActionResumeInRow(wti_t *pWti, action_t *pAction)
{
	return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow);
}

static inline void
setActionResumeInRow(wti_t *pWti, action_t *pAction, uint16_t val)
{
	pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val;
}

static inline void
incActionResumeInRow(wti_t *pWti, action_t *pAction)
{
	pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++;
}

static inline int
getActionNbrResRtry(wti_t *pWti, action_t *pAction)
{
	return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry);
}

static inline void
setActionNbrResRtry(wti_t *pWti, action_t *pAction, uint16_t val)
{
	pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val;
}

static inline void
incActionNbrResRtry(wti_t *pWti, action_t *pAction)
{
	pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++;
}

static inline rsRetVal
wtiNewIParam(wti_t *pWti, action_t *pAction, actWrkrIParams_t **piparams)
{
	actWrkrInfo_t *wrkrInfo;
	actWrkrIParams_t *iparams;
	int newMax;
	DEFiRet;

	wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
	if(wrkrInfo->currIParam == wrkrInfo->maxIParams) {
		/* we need to extend */
		dbgprintf("DDDD: extending iparams, max %d\n", wrkrInfo->maxIParams);
		newMax = (wrkrInfo->maxIParams == 0) ? CONF_IPARAMS_BUFSIZE : 2 * wrkrInfo->maxIParams;
		CHKmalloc(iparams = realloc(wrkrInfo->iparams, sizeof(actWrkrIParams_t) * newMax));
		wrkrInfo->iparams = iparams;
		wrkrInfo->maxIParams = newMax;
	}
dbgprintf("DDDD: adding param  %d for action %d\n", wrkrInfo->currIParam, pAction->iActionNbr);
	iparams = wrkrInfo->iparams + wrkrInfo->currIParam;
	memset(iparams, 0, sizeof(actWrkrIParams_t));
	*piparams = iparams;
	++wrkrInfo->currIParam;

finalize_it:
	RETiRet;
}

static inline void
wtiResetExecState(wti_t *pWti, batch_t *pBatch)
{
	pWti->execState.bPrevWasSuspended = 0;
	pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
}
#endif /* #ifndef WTI_H_INCLUDED */