summaryrefslogtreecommitdiffstats
path: root/runtime/batch.h
blob: 68f48d8b94303a6e074826375902563e92809d92 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/* Definition of the batch_t data structure.
 * I am not sure yet if this will become a full-blown object. For now, this header just
 * includes the object definition and is not accompanied by code.
 *
 * Copyright 2009 by Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * The rsyslog runtime library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the rsyslog runtime library.  If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
 */

#ifndef BATCH_H_INCLUDED
#define BATCH_H_INCLUDED

#include <string.h>
#include "msg.h"

/* enum for batch states. Actually, we violate a layer here, in that we assume that a batch is used
 * for action processing. So far, this seems acceptable, the status is simply ignored inside the
 * main message queue. But over time, it could potentially be useful to split the two.
 * rgerhad, 2009-05-12
 */
typedef enum {
	BATCH_STATE_RDY  = 0,	/* object ready for processing */
	BATCH_STATE_BAD  = 1,	/* unrecoverable failure while processing, do NOT resubmit to same action */
	BATCH_STATE_SUB  = 2,	/* message submitted for processing, outcome yet unknown */
	BATCH_STATE_COMM = 3,	/* message successfully commited */
	BATCH_STATE_DISC = 4, 	/* discarded - processed OK, but do not submit to any other action */
} batch_state_t;


/* an object inside a batch, including any information (state!) needed for it to "life".
 */
struct batch_obj_s {
	obj_t *pUsrp;		/* pointer to user object (most often message) */
	batch_state_t state;	/* associated state */
	/* work variables for action processing; these are reused for each action (or block of
	 * actions)
	 */
	sbool bFilterOK;	/* work area for filter processing (per action, reused!) */
	sbool bPrevWasSuspended;
	void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
				/* a cache to save malloc(), if not absolutely necessary */
	size_t staticLenParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
				/* and the same for the message length (if used) */
	/* end action work variables */
};

/* the batch
 * This object is used to dequeue multiple user pointers which are than handed over
 * to processing. The size of elements is fixed after queue creation, but may be 
 * modified by config variables (better said: queue properties).
 * Note that a "user pointer" in rsyslog context so far always is a message 
 * object. We stick to the more generic term because queues may potentially hold
 * other types of objects, too.
 * rgerhards, 2009-05-12
 * Note that nElem is not necessarily equal to nElemDeq. This is the case when we
 * discard some elements (because of configuration) during dequeue processing. As
 * all Elements are only deleted when the batch is processed, we can not immediately
 * delete them. So we need to keep their number that we can delete them when the batch
 * is completed (else, the whole process does not work correctly).
 */
struct batch_s {
	int maxElem;		/* maximum number of elements that this batch supports */
	int nElem;		/* actual number of element in this entry */
	int nElemDeq;		/* actual number of elements dequeued (and thus to be deleted) - see comment above! */
	int iDoneUpTo;		/* all messages below this index have state other than RDY */
	qDeqID	deqID;		/* ID of dequeue operation that generated this batch */
	int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
	sbool bSingleRuleset;	/* do all msgs of this batch use a single ruleset? */
	batch_obj_t *pElem;	/* batch elements */
};


/* some inline functions (we may move this off to an object .. or not) */
static inline void
batchSetSingleRuleset(batch_t *pBatch, sbool val) {
	pBatch->bSingleRuleset = val;
}

/* get the batches ruleset (if we have a single ruleset) */
static inline ruleset_t*
batchGetRuleset(batch_t *pBatch) {
	return (pBatch->nElem > 0) ? ((msg_t*) pBatch->pElem[0].pUsrp)->pRuleset : NULL;
}

/* get the ruleset of a specifc element of the batch (index not verified!) */
static inline ruleset_t*
batchElemGetRuleset(batch_t *pBatch, int i) {
	return ((msg_t*) pBatch->pElem[i].pUsrp)->pRuleset;
}

/* get number of msgs for this batch */
static inline int
batchNumMsgs(batch_t *pBatch) {
	return pBatch->nElem;
}


/* set the status of the i-th batch element. Note that once the status is
 * DISC, it will never be reset. So this function can NOT be used to initialize
 * the state table. -- rgerhards, 2010-06-10
 */
static inline void
batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) {
	if(pBatch->pElem[i].state != BATCH_STATE_DISC)
		pBatch->pElem[i].state = newState;
}


/* check if an element is a valid entry. We do NOT verify if the
 * element index is valid. -- rgerhards, 2010-06-10
 */
static inline int
batchIsValidElem(batch_t *pBatch, int i) {
	return(pBatch->pElem[i].bFilterOK && pBatch->pElem[i].state != BATCH_STATE_DISC);
}


/* copy one batch element to another.
 * This creates a complete duplicate in those cases where
 * it is needed. Use duplication only when absolutely necessary!
 * rgerhards, 2010-06-10
 */
static inline void
batchCopyElem(batch_obj_t *pDest, batch_obj_t *pSrc) {
	memcpy(pDest, pSrc, sizeof(batch_obj_t));
}


/* free members of a batch "object". Note that we can not do the usual
 * destruction as the object typically is allocated on the stack and so the
 * object itself cannot be freed! -- rgerhards, 2010-06-15
 */
static inline void
batchFree(batch_t *pBatch) {
	int i;
	int j;
	for(i = 0 ; i < pBatch->maxElem ; ++i) {
		for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++j) {
			free(pBatch->pElem[i].staticActParams[j]);
		}
	}
	free(pBatch->pElem);
}


/* initialiaze a batch "object". The record must already exist,
 * we "just" initialize it. The max number of elements must be
 * provided. -- rgerhards, 2010-06-15
 */
static inline rsRetVal
batchInit(batch_t *pBatch, int maxElem) {
	DEFiRet;
	pBatch->maxElem = maxElem;
	CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t)));
	// TODO: replace calloc by inidividual writes?
finalize_it:
	RETiRet;
}


/* primarily a helper for debug purposes, get human-readble name of state */
static inline char *
batchState2String(batch_state_t state) {
	switch(state) {
	case BATCH_STATE_RDY:
		return "BATCH_STATE_RDY";
	case BATCH_STATE_BAD:
		return "BATCH_STATE_BAD";
	case BATCH_STATE_SUB:
		return "BATCH_STATE_SUB";
	case BATCH_STATE_COMM:
		return "BATCH_STATE_COMM";
	case BATCH_STATE_DISC:
		return "BATCH_STATE_DISC";
	}
	return "ERROR, batch state not known!";
}
#endif /* #ifndef BATCH_H_INCLUDED */