1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
<html>
<head>
<title>rsyslog queue object</title>
</head>
<body>
<h1>The rsyslog queue object</h1>
<p>This page reflects the status as of 2008-01-17. The documentation is still incomplete.
Target audience is developers and users who would like to get an in-depth understanding of
queues as used in <a href="http://www.rsyslog.com/">rsyslog</a>.</p>
<p><b>Please note that this document is outdated and does not longer reflect the
specifics of the queue object. However, I have decided to leave it in the doc
set, as the overall picture provided still is quite OK. I intend to update this
document somewhat later when I have reached the "store-and-forward" milestone.</b></p>
<h1>Some definitions</h1>
<p>A queue is DA-enabled if it is configured to use disk-assisted mode when
there is need to. A queue is in DA mode (or DA run mode), when it actually runs
disk assisted.</p>
<h1>Implementation Details</h1>
<h2>Disk-Assisted Mode</h2>
<p>Memory-Type queues may utilize disk-assisted (DA) mode. DA mode is enabled
whenever a queue file name prefix is provided. This is called DA-enabled mode.
If DA-enabled, the queue operates as a regular memory queue until a high water
mark is reached. If that happens, the queue activates disk assistance (called
"runs disk assisted" or "runs DA" - you can find that often in source file
comments). To do so, it creates a helper queue instance (the DA queue). At that
point, there are two queues running - the primary queue's consumer changes to a
shuffle-to-DA-queue consumer and the original primary consumer is assigned to
the DA queue. Existing and new messages are spooled to the disk queue, where the
DA worker takes them from and passes them for execution to the actual consumer.
In essence, the primary queue has now become a memory buffer for the DA queue.
The primary queue will be drained until a low water mark is reached. At that
point, processing is held. New messages enqueued to the primary queue will not
be processed but kept in memory. Processing resumes when either the high water
mark is reached again or the DA queue indicates it is empty. If the DA queue is
empty, it is shut down and processing of the primary queue continues as a
regular in-memory queue (aka "DA mode is shut down"). The whole thing iterates
once the high water mark is hit again.</p>
<p>There is one special case: if the primary queue is shut down and could not
finish processing all messages within the configured timeout periods, the DA
queue is instantiated to take up the remaining messages. These will be preserved
and be processed during the next run. During that period, the DA queue runs in
"enqueue-only" mode and does not execute any consumer. Draining the primary
queue is typically very fast. If that behaviour is not desired, it can be turned
of via parameters. In that case, any remaining in-memory messages are lost.</p>
<p>Due to the fact that when running DA two queues work closely together and
worker threads (including the DA worker) may shut down at any time (due to
timeout), processing synchronization and startup and shutdown is somewhat
complex. I'll outline the exact conditions and steps down here. I also do this
so that I know clearly what to develop to, so please be patient if the
information is a bit too in-depth ;)</p>
<h2>DA Run Mode Initialization</h2>
<p>Three cases:</p>
<ol>
<li>any time during queueEnqObj() when the high water mark is hit</li>
<li>at queue startup if there is an on-disk queue present (presence of QI
file indicates presence of queue data)</li>
<li>at queue shutdown if remaining in-memory data needs to be persisted to
disk</li>
</ol>
<p>In <b>case 1</b>, the worker pool is running. When switching to DA mode, all
regular workers are sent termination commands. The DA worker is initiated.
Regular workers may run in parallel to the DA worker until they terminate.
Regular workers shall terminate as soon as their current consumer has completed.
They shall not execute the DA consumer.</p>
<p>In <b>case 2</b>, the worker pool is not yet running and is NOT started. The
DA worker is initiated.</p>
<p>In <b>case 3</b>, the worker pool is already shut down. The DA worker is
initiated. The DA queue runs in enqueue-only mode.</p>
<p>In all cases, the DA worker starts up and checks if DA mode is already fully
initialized. If not, it initializes it, what most importantly means construction
of the queue.</p>
<p>Then, regular worker processing is carried out. That is, the queue worker
will wait on empty queue and terminate after an timeout. However, If any message
is received, the DA consumer is executed. That consumer checks the low water
mark. If the low water mark is reached, it stops processing until either the
high water mark is reached again or the DA queue indicates it is empty (there is
a pthread_cond_t for this synchronization).</p>
<p>In theory, a <b>case-2</b> startup could lead to the worker becoming inactive
and terminating while waiting on the primary queue to fill. In practice, this is
highly unlikely (but only for the main message queue) because rsyslog issues a
startup message. HOWEVER, we can not rely on that, it would introduce a race. If
the primary rsyslog thread (the one that issues the message) is scheduled very
late and there is a low inactivty timeout for queue workers, the queue worker
may terminate before the startup message is issued. And if the on-disk queue
holds only a few messages, it may become empty before the DA worker is
re-initiated again. So it is possible that the DA run mode termination criteria
occurs while no DA worker is running on the primary queue.</p>
<p>In cases 1 and 3, the DA worker can never become inactive without hitting the
DA shutdown criteria. In <b>case 1</b>, it either shuffles messages from the
primary to the DA queue or it waits because it has the hit low water mark. </p>
<p>In <b>case 3</b>, it always shuffles messages between the queues (because,
that's the sole purpose of that run). In order for this to happen, the high
water mark has been set to the value of 1 when DA run mode has been initialized.
This ensures that the regular logic can be applied to drain the primary queue.
To prevent a hold due to reaching the low water mark, that mark must be changed
to 0 before the DA worker starts.</p>
<h2>DA Run Mode Shutdown</h2>
<p>In essence, DA run mode is terminated when the DA queue is empty and the
primary worker queue size is below the high water mark. It is also terminated
when the primary queue is shut down. The decision to switch back to regular
(non-DA) run mode is typically made by the DA worker. If it switches, the DA
queue is destructed and the regular worker pool is restarted. In some cases, the
queue shutdown process may initiate the "switch" (in this case more or less a
clean shutdown of the DA queue).</p>
<p>One might think that it would be more natural for the DA queue to detect
being idle and shut down itself. However, there are some issues associated with
that. Most importantly, all queue worker threads need to be shut down during
queue destruction. Only after that has happend, final destruction steps can
happen (else we would have a myriad of races). However, it is the DA queues
worker thread that detects it is empty (empty queue detection always happens at
the consumer side and must so). That would lead to the DA queue worker thread to
initiate DA queue destruction which in turn would lead to that very same thread
being canceled (because workers must shut down before the queue can be
destructed). Obviously, this does not work out (and I didn't even mention the
other issues - so let's forget about it). As such, the thread that enqueues
messages must destruct the queue - and that is the primary queue's DA worker
thread.</p>
<p>There are some subleties due to thread synchronization and the fact that the
DA consumer may not be running (in a <b>case-2 startup</b>). So it is not
trivial to reliably change the queue back from DA run mode to regular run mode.
The priority is a clean switch. We accept the fact that there may be situations
where we cleanly shut down DA run mode, just to re-enable it with the very next
message being enqueued. While unlikely, this will happen from time to time and
is considered perfectly legal. We can't predict the future and it would
introduce too great complexity to try to do something against that (that would
most probably even lead to worse performance under regular conditions).</p>
<p>The primary queue's DA worker thread may wait at two different places:</p>
<ol>
<li>after reaching the low water mark and waiting for either high water or
DA queue empty</li>
<li>at the regular pthread_cond_wait() on an empty primary queue</li>
</ol>
<p>Case 2 is unlikely, but may happen (see info above on a case 2 startup).</p>
<p><b>The DA worker may also not wait at all,</b> because it is actively
executing and shuffeling messages between the queues. In that case, however, the
program flow passes both of the two wait conditions but simply does not wait.</p>
<p><b>Finally, the DA worker may be inactive </b>(again, with a case-2 startup).
In that case no work(er) at all is executed. Most importantly, without the DA
worker being active, nobody will ever detect the need to change back to regular
mode. If we have this situation, the very next message enqueued will cause the
switch, because then the DA run mode shutdown criteria is met. However, it may
take close to eternal for this message to arrive. During that time, disk and
memory resources for the DA queue remain allocated. This also leaves processing
in a sub-optimal state and it may take longer than necessary to switch back to
regular queue mode when a message burst happens. In extreme cases, this could
even lead to shutdown of DA run mode, which takes so long that the high water
mark is passed and DA run mode is immediately re-initialized - while with an
immediate switch, the message burst may have been able to be processed by the
in-memory queue without DA support.</p>
<p>So in short, it is desirable switch to regular run mode as soon as possible.
To do this, we need an active DA worker. The easy solution is to initiate DA
worker startup from the DA queue's worker once it detects empty condition. To do
so, the DA queue's worker must call into a "<i>DA worker startup initiation</i>"
routine inside the main queue. As a reminder, the DA worker will most probably
not receive the "DA queue empty" signal in that case, because it will be long
sent (in most cases) before the DA worker even waits for it. So <b>it is vital
that DA run mode termination checks be done in the DA worker before it goes into
any wait condition</b>.</p>
<p>Please note that the "<i>DA worker startup initiation</i>" routine may be
called concurrently from multiple initiators. <b>To prevent a race, it must be
guarded by the queue mutex </b>and return without any action (and no error
code!) if the DA worker is already initiated.</p>
<p>All other cases can be handled by checking the termination criteria
immediately at the start of the worker and then once again for each run. The
logic follows this simplified flow diagram:</p>
<p align="center"><a href="queueWorkerLogic.jpg">
<img border="0" src="queueWorkerLogic_small.jpg" width="431" height="605"></a></p>
<p>Some of the more subtle aspects of worker processing (e.g. enqueue thread
signaling and other fine things) have been left out in order to get the big
picture. What is called "check DA mode switchback..." right after "worker init"
is actually a check for the worker's termination criteria. Typically, <b>the
worker termination criteria is a shutdown request</b>. However, <b>for a DA
worker, termination is also requested if the queue size is below the high water
mark AND the DA queue is empty</b>. There is also a third termination criteria
and it is not even on the chart: that is the inactivity timeout, which exists in
all modes. Note that while the inactivity timeout shuts down a thread, it
logically does not terminate the worker pool (or DA worker): workers are
restarted on an as-needed basis. However, inactivity timeouts are very important
because they require us to restart workers in some situations where we may
expect a running one. So always keep them on your mind.</p>
<h2>Queue Destruction</h2>
<p>Now let's consider <b>the case of destruction of the primary queue. </b>During
destruction, our focus is on loosing as few messages as possible. If the
queue is not DA-enabled, there is nothing but the configured timeouts to handle
that situation. However, with a DA-enabled queue there are more options.</p>
<p>If the queue is DA-enabled, it may be <i>configured to persist messages to
disk before it is terminated</i>. In that case, loss of messages never occurs
(at the price of a potentially lengthy shutdown). Even if that setting is not
applied, the queue should drain as many messages as possible to the disk. For
that reason, it makes no sense to wait on a low water mark. Also, if the queue
is already in DA run mode, it does not make any sense to switch back to regular
run mode during termination and then try to process some messages via the
regular consumer. It is much more appropriate the try completely drain the queue
during the remaining timeout period. For the same reason, it is preferred that
no new consumers be activated (via the DA queue's worker), as they only cost
valuable CPU cycles and, more importantly, would potentially be long(er)-running
and possibly be needed to be cancelled. To prevent all of that, <b>queue
parameters are changed for DA-enabled queues:</b> the high water mark is to 1
and the low water mark to 0 on the primary queue. The DA queue is commanded to
run in enqueue-only mode. If the primary queue is <i>configured to persist
messages to disk before it is terminated</i>, its SHUTDOWN timeout is changed to
to eternal. These parameters will cause the queue to drain as much as possible
to disk (and they may cause a case 3 DA run mode initiation). Please note that
once the primary queue has been drained, the DA queue's worker will
automatically switch back to regular (non-DA) run mode. <b>It must be ensured
that no worker cancellation occurs during that switchback</b>. Please note that
the queue may not switch back to regular run mode if it is not <i>configured to
persist messages to disk before it is terminated</i>. In order to apply the new
parameters, <b>worker threads must be awakened.</b> Remember we may not be in DA
run mode at this stage. In that case, the regular workers must be awakened, which
then will switch to DA run mode. No worker may be active, in that case one must
be initiated. If in DA run mode and the DA worker is inactive, the "<i>DA
worker startup initiation</i>" must be called to activate it. That routine
ensures only one DA worker is started even with multiple concurrent callers -
this may be the case here. The DA queue's worker may have requested DA worker
startup in order to terminate on empty queue (which will probably not be honored
as we have changed the low water mark).</p>
<p>After all this is done, the queue destructor requests termination of the
queue's worker threads. It will use the normal timeouts and potentially cancel
too-long running worker threads. <b>The shutdown process must ensure that all
workers reach running state before they are commanded to terminate</b>.
Otherwise it may run into a race condition that could lead to a false shutdown
with workers running asynchronously. As a few workers may have just been started
to initialize (to apply new parameter settings), the probability for this race
condition is extremely high, especially on single-CPU systems.</p>
<p>After all workers have been shut down (or cancelled), the queue may still be
in DA run mode. If so, this must be terminated, which now can simply be done by
destructing the DA queue object. This is not a real switchback to regular run
mode, but that doesn't matter because the queue object will soon be gone away.</p>
<p>Finally, the queue is mostly shut down and ready to be actually destructed.
As a last try, the queuePersists() entry point is called. It is used to persists
a non-DA-enabled queue in whatever way is possible for that queue. There may be
no implementation for the specific queue type. Please note that this is not just
a theoretical construct. This is an extremely important code path when the DA
queue itself is destructed. Remember that it is a queue object in its own right.
The DA queue is obviously not DA-enabled, so it calls into queuePersists()
during its destruction - this is what enables us to persist the disk queue!</p>
<p>After that point, left over queue resources (mutexes, dynamic memory, ...)
are freed and the queue object is actually destructed.</p>
<h2>Copyright</h2>
<p>Copyright (c) 2008 <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a>
and <a href="http://www.adiscon.com/en/">Adiscon</a>.</p>
<p>Permission is granted to copy, distribute and/or modify this document under
the terms of the GNU Free Documentation License, Version 1.2 or any later
version published by the Free Software Foundation; with no Invariant Sections,
no Front-Cover Texts, and no Back-Cover Texts. A copy of the license can be
viewed at <a href="http://www.gnu.org/copyleft/fdl.html">
http://www.gnu.org/copyleft/fdl.html</a>.</p>
</body>
</html>
|