diff options
Diffstat (limited to 'lib/puppet/event-loop/event-loop.rb')
-rw-r--r-- | lib/puppet/event-loop/event-loop.rb | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/lib/puppet/event-loop/event-loop.rb b/lib/puppet/event-loop/event-loop.rb new file mode 100644 index 000000000..5d78844ef --- /dev/null +++ b/lib/puppet/event-loop/event-loop.rb @@ -0,0 +1,355 @@ +## event-loop.rb --- high-level IO multiplexer +# Copyright (C) 2005 Daniel Brockman + +# This program is free software; you can redistribute it +# and/or modify it under the terms of the GNU General Public +# License as published by the Free Software Foundation; +# either version 2 of the License, or (at your option) any +# later version. + +# This file is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty +# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU General Public License for more details. + +# You should have received a copy of the GNU General Public +# License along with this program; if not, write to the Free +# Software Foundation, 51 Franklin Street, Fifth Floor, +# Boston, MA 02110-1301, USA. + +require "puppet/event-loop/better-definers" +require "puppet/event-loop/signal-system" + +require "fcntl" + +class EventLoop + include SignalEmitter + + IO_STATES = [:readable, :writable, :exceptional] + + class << self + def default ; @default ||= new end + def default= x ; @default = x end + + def current + Thread.current["event-loop::current"] || default end + def current= x + Thread.current["event-loop::current"] = x end + + def with_current (new) + if current == new + yield + else + begin + old = self.current + self.current = new + yield + ensure + self.current = old + end + end + end + + def method_missing (name, *args, &block) + if current.respond_to? name + current.__send__(name, *args, &block) + else + super + end + end + end + + define_signals :before_sleep, :after_sleep + + def initialize + @running = false + @awake = false + @wakeup_time = nil + @timers = [] + + @io_arrays = [[], [], []] + @ios = Hash.new do |h, k| raise ArgumentError, + "invalid IO event: #{k}", caller(2) end + IO_STATES.each_with_index { |x, i| @ios[x] = @io_arrays[i] } + + @notify_src, @notify_snk = IO.pipe + + @notify_src.will_block = false + @notify_snk.will_block = false + + # Each time a byte is sent through the notification pipe + # we need to read it, or IO.select will keep returning. + monitor_io(@notify_src, :readable) + @notify_src.extend(Watchable) + @notify_src.on_readable do + begin + @notify_src.sysread(256) + rescue Errno::EAGAIN + # The pipe wasn't readable after all. + end + end + end + + define_opposite_accessors \ + :stopped? => :running?, + :sleeping? => :awake? + + def run + if block_given? + thread = Thread.new { run } + yield ; quit ; thread.join + else + running! + iterate while running? + end + ensure + quit + end + + def iterate (user_timeout=nil) + t1, t2 = user_timeout, max_timeout + timeout = t1 && t2 ? [t1, t2].min : t1 || t2 + select(timeout).zip(IO_STATES) do |ios, state| + ios.each { |x| x.signal(state) } if ios + end + end + + private + + def select (timeout) + @wakeup_time = timeout ? Time.now + timeout : nil + # puts "waiting: #{timeout} seconds" + signal :before_sleep ; sleeping! + IO.select(*@io_arrays + [timeout]) || [] + ensure + awake! ; signal :after_sleep + @timers.each { |x| x.sound_alarm if x.ready? } + end + + public + + def quit ; stopped! ; wake_up ; self end + + def monitoring_io? (io, event) + @ios[event].include? io end + def monitoring_timer? (timer) + @timers.include? timer end + + def monitor_io (io, *events) + for event in events do + unless monitoring_io?(io, event) + @ios[event] << io ; wake_up + end + end + end + + def monitor_timer (timer) + unless monitoring_timer? timer + @timers << timer + end + end + + def check_timer (timer) + wake_up if timer.end_time < @wakeup_time + end + + def ignore_io (io, *events) + events = IO_STATES if events.empty? + for event in events do + wake_up if @ios[event].delete(io) + end + end + + def ignore_timer (timer) + # Don't need to wake up for this. + @timers.delete(timer) + end + + def max_timeout + return nil if @timers.empty? + [@timers.collect { |x| x.time_left }.min, 0].max + end + + def wake_up + @notify_snk.write('.') if sleeping? + end +end + +class Symbol + def io_state? + EventLoop::IO_STATES.include? self + end +end + +module EventLoop::Watchable + include SignalEmitter + + define_signals :readable, :writable, :exceptional + + def monitor_events (*events) + EventLoop.monitor_io(self, *events) end + def ignore_events (*events) + EventLoop.ignore_io(self, *events) end + + define_soft_aliases \ + :monitor_event => :monitor_events, + :ignore_event => :ignore_events + + def close ; super + ignore_events end + def close_read ; super + ignore_event :readable end + def close_write ; super + ignore_event :writable end + + module Automatic + include EventLoop::Watchable + + def add_signal_handler (name, &handler) super + monitor_event(name) if name.io_state? + end + + def remove_signal_handler (name, handler) super + if @signal_handlers[name].empty? + ignore_event(name) if name.io_state? + end + end + end +end + +class IO + def on_readable &block + extend EventLoop::Watchable::Automatic + on_readable(&block) + end + + def on_writable &block + extend EventLoop::Watchable::Automatic + on_writable(&block) + end + + def on_exceptional &block + extend EventLoop::Watchable::Automatic + on_exceptional(&block) + end + + def will_block? + require "fcntl" + fcntl(Fcntl::F_GETFL, 0) & Fcntl::O_NONBLOCK == 0 + end + + def will_block= (wants_blocking) + require "fcntl" + flags = fcntl(Fcntl::F_GETFL, 0) + if wants_blocking + flags &= ~Fcntl::O_NONBLOCK + else + flags |= Fcntl::O_NONBLOCK + end + fcntl(Fcntl::F_SETFL, flags) + end +end + +class EventLoop::Timer + include SignalEmitter + + DEFAULT_INTERVAL = 0.0 + DEFAULT_TOLERANCE = 0.001 + + def initialize (options={}, &handler) + @running = false + @start_time = nil + + if options.kind_of? Numeric + options = { :interval => options } + end + + if options[:interval] + @interval = options[:interval].to_f + else + @interval = DEFAULT_INTERVAL + end + + if options[:tolerance] + @tolerance = options[:tolerance].to_f + elsif DEFAULT_TOLERANCE < @interval + @tolerance = DEFAULT_TOLERANCE + else + @tolerance = 0.0 + end + + @event_loop = options[:event_loop] || EventLoop.current + + if block_given? + add_signal_handler(:alarm, &handler) + start unless options[:start?] == false + else + start if options[:start?] + end + end + + define_readers :interval, :tolerance + define_signal :alarm + + def stopped? ; @start_time == nil end + def running? ; @start_time != nil end + + def interval= (new_interval) + old_interval = @interval + @interval = new_interval + if new_interval < old_interval + @event_loop.check_timer(self) + end + end + + def end_time + @start_time + @interval end + def time_left + end_time - Time.now end + def ready? + time_left <= @tolerance end + + def restart + @start_time = Time.now + end + + def sound_alarm + signal :alarm + restart if running? + end + + def start + @start_time = Time.now + @event_loop.monitor_timer(self) + end + + def stop + @start_time = nil + @event_loop.ignore_timer(self) + end +end + +if __FILE__ == $0 + require "test/unit" + + class TimerTest < Test::Unit::TestCase + def setup + @timer = EventLoop::Timer.new(:interval => 0.001) + end + + def test_timer + @timer.on_alarm do + puts "[#{@timer.time_left} seconds left after alarm]" + EventLoop.quit + end + 8.times do + t0 = Time.now + @timer.start ; EventLoop.run + t1 = Time.now + assert(t1 - t0 > @timer.interval - @timer.tolerance) + end + end + end +end + +## event-loop.rb ends here. |