summaryrefslogtreecommitdiffstats
path: root/lib/puppet/external/event-loop/event-loop.rb
blob: dc51a55ae012f77ac8e39430a1c0ce18428842ff (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
## event-loop.rb --- high-level IO multiplexer
# Copyright (C) 2005  Daniel Brockman

# This program is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation;
# either version 2 of the License, or (at your option) any
# later version.

# This file is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.

# You should have received a copy of the GNU General Public
# License along with this program; if not, write to the Free
# Software Foundation, 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.

require "puppet/external/event-loop/better-definers"
require "puppet/external/event-loop/signal-system"

require "fcntl"

class EventLoop
  include SignalEmitter

  IO_STATES = [:readable, :writable, :exceptional]

  class << self
    def default ; @default ||= new end
    def default= x ; @default = x end

    def current
      Thread.current["event-loop::current"] || default end
    def current= x
      Thread.current["event-loop::current"] = x end

    def with_current (new)
      if current == new
        yield
      else
        begin
          old = self.current
          self.current = new
          yield
        ensure
          self.current = old
        end
      end
    end

    def method_missing (name, *args, &block)
      if current.respond_to? name
        current.__send__(name, *args, &block)
      else
        super
      end
    end
  end

  define_signals :before_sleep, :after_sleep

  def initialize
    @running = false
    @awake = false
    @wakeup_time = nil
    @timers = []

    @io_arrays = [[], [], []]
    @ios = Hash.new do |h, k| raise ArgumentError,
      "invalid IO event: #{k}", caller(2) end
    IO_STATES.each_with_index { |x, i| @ios[x] = @io_arrays[i] }

    @notify_src, @notify_snk = IO.pipe

    # prevent file descriptor leaks
    @notify_src.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    @notify_snk.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

    @notify_src.will_block = false
    @notify_snk.will_block = false

    # Each time a byte is sent through the notification pipe
    # we need to read it, or IO.select will keep returning.
    monitor_io(@notify_src, :readable)
    @notify_src.extend(Watchable)
    @notify_src.on_readable do
      begin
        @notify_src.sysread(256)
      rescue Errno::EAGAIN
        # The pipe wasn't readable after all.
      end
    end
  end

  define_opposite_accessors \
    :stopped?  => :running?,
    :sleeping? => :awake?

  def run
    if block_given?
      thread = Thread.new { run }
      yield ; quit ; thread.join
    else
      running!
      iterate while running?
    end
  ensure
    quit
  end

  def iterate (user_timeout=nil)
    t1, t2 = user_timeout, max_timeout
    timeout = t1 && t2 ? [t1, t2].min : t1 || t2
    select(timeout).zip(IO_STATES) do |ios, state|
      ios.each { |x| x.signal(state) } if ios
    end
  end

  private

  def select (timeout)
    @wakeup_time = timeout ? Time.now + timeout : nil
    # puts "waiting: #{timeout} seconds"
    signal :before_sleep ; sleeping!
    IO.select(*@io_arrays + [timeout]) || []
  ensure
    awake! ; signal :after_sleep
    @timers.each { |x| x.sound_alarm if x.ready? }
  end

  public

  def quit ; stopped! ; wake_up ; self end

  def monitoring_io? (io, event)
    @ios[event].include? io end
  def monitoring_timer? (timer)
    @timers.include? timer end

  def monitor_io (io, *events)
    for event in events do
      @ios[event] << io ; wake_up unless monitoring_io?(io, event)
    end
  end

  def monitor_timer (timer)
    @timers << timer unless monitoring_timer? timer
  end

  def check_timer (timer)
    wake_up if timer.end_time < @wakeup_time
  end

  def ignore_io (io, *events)
    events = IO_STATES if events.empty?
    for event in events do
      wake_up if @ios[event].delete(io)
    end
  end

  def ignore_timer (timer)
    # Don't need to wake up for this.
    @timers.delete(timer)
  end

  def max_timeout
    return nil if @timers.empty?
    [@timers.collect { |x| x.time_left }.min, 0].max
  end

  def wake_up
    @notify_snk.write('.') if sleeping?
  end
end

class Symbol
  def io_state?
    EventLoop::IO_STATES.include? self
  end
end

module EventLoop::Watchable
  include SignalEmitter

  define_signals :readable, :writable, :exceptional

  def monitor_events (*events)
    EventLoop.monitor_io(self, *events) end
  def ignore_events (*events)
    EventLoop.ignore_io(self, *events) end

  define_soft_aliases \
    :monitor_event => :monitor_events,
    :ignore_event  => :ignore_events

  def close ; super
    ignore_events end
  def close_read ; super
    ignore_event :readable end
  def close_write ; super
    ignore_event :writable end

  module Automatic
    include EventLoop::Watchable

    def add_signal_handler (name, &handler) super
      monitor_event(name) if name.io_state?
    end

    def remove_signal_handler (name, handler) super
      if @signal_handlers[name].empty?
        ignore_event(name) if name.io_state?
      end
    end
  end
end

class IO
  def on_readable &block
    extend EventLoop::Watchable::Automatic
    on_readable(&block)
  end

  def on_writable &block
    extend EventLoop::Watchable::Automatic
    on_writable(&block)
  end

  def on_exceptional &block
    extend EventLoop::Watchable::Automatic
    on_exceptional(&block)
  end

  def will_block?
    require "fcntl"
    fcntl(Fcntl::F_GETFL, 0) & Fcntl::O_NONBLOCK == 0
  end

  def will_block= (wants_blocking)
    require "fcntl"
    flags = fcntl(Fcntl::F_GETFL, 0)
    if wants_blocking
      flags &= ~Fcntl::O_NONBLOCK
    else
      flags |= Fcntl::O_NONBLOCK
    end
    fcntl(Fcntl::F_SETFL, flags)
  end
end

class EventLoop::Timer
  include SignalEmitter

  DEFAULT_INTERVAL = 0.0
  DEFAULT_TOLERANCE = 0.001

  def initialize (options={}, &handler)
    @running = false
    @start_time = nil

    options = { :interval => options } if options.kind_of? Numeric

    if options[:interval]
      @interval = options[:interval].to_f
    else
      @interval = DEFAULT_INTERVAL
    end

    if options[:tolerance]
      @tolerance = options[:tolerance].to_f
    elsif DEFAULT_TOLERANCE < @interval
      @tolerance = DEFAULT_TOLERANCE
    else
      @tolerance = 0.0
    end

    @event_loop = options[:event_loop] || EventLoop.current

    if block_given?
      add_signal_handler(:alarm, &handler)
      start unless options[:start?] == false
    else
      start if options[:start?]
    end
  end

  define_readers :interval, :tolerance
  define_signal :alarm

  def stopped? ; @start_time == nil end
  def running? ; @start_time != nil end

  def interval= (new_interval)
    old_interval = @interval
    @interval = new_interval
    @event_loop.check_timer(self) if new_interval < old_interval
  end

  def end_time
    @start_time + @interval end
  def time_left
    end_time - Time.now end
  def ready?
    time_left <= @tolerance end

  def restart
    @start_time = Time.now
  end

  def sound_alarm
    signal :alarm
    restart if running?
  end

  def start
    @start_time = Time.now
    @event_loop.monitor_timer(self)
  end

  def stop
    @start_time = nil
    @event_loop.ignore_timer(self)
  end
end

if __FILE__ == $0
  require "test/unit"

  class TimerTest < Test::Unit::TestCase
    def setup
      @timer = EventLoop::Timer.new(:interval => 0.001)
    end

    def test_timer
      @timer.on_alarm do
        puts "[#{@timer.time_left} seconds left after alarm]"
        EventLoop.quit
      end
      8.times do
        t0 = Time.now
        @timer.start ; EventLoop.run
        t1 = Time.now
        assert(t1 - t0 > @timer.interval - @timer.tolerance)
      end
    end
  end
end

## event-loop.rb ends here.