/* Copyright (C) 2009 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, see . */ #ifdef HAVE_CONFIG_H #include #endif #include "common.h" #include "process_loop.h" #include "debug.h" #include "platform.h" #include "utils.h" SyncEvent::SyncEvent() : _err (false) , _ready (false) { } SyncEvent::~SyncEvent() { } void SyncEvent::response(AbstractProcessLoop& events_loop) { try { do_response(events_loop); } catch (Exception& e) { LOG_WARN("unhandled exception: %s", e.what()); _err = true; } catch (...) { _err = true; } Lock lock(_mutex); _ready = true; _condition.notify_one(); } void SyncEvent::wait() { #ifdef RED_DEBUG ASSERT(_process_loop && !_process_loop->is_same_thread(pthread_self())); #endif Lock lock(_mutex); while (!_ready) { _condition.wait(lock); } } class ProcessLoop::QuitEvent: public Event { public: QuitEvent(int error_code) : _error_code(error_code) {} virtual void response(AbstractProcessLoop& events_loop); private: int _error_code; }; void ProcessLoop::QuitEvent::response(AbstractProcessLoop& events_loop) { events_loop.do_quit(_error_code); } /* EventsQueue */ EventsQueue::EventsQueue(AbstractProcessLoop& owner) : _events_gen (0) , _owner (owner) { } EventsQueue::~EventsQueue() { clear_queue(); } void EventsQueue::clear_queue() { Lock lock(_events_lock); while (!_events.empty()) { Event* event = _events.front(); _events.pop_front(); event->unref(); } } int EventsQueue::push_event(Event* event) { Lock lock(_events_lock); _events.push_back(event); event->set_generation(_events_gen); event->ref(); #ifdef RED_DEBUG event->set_process_loop(&_owner); #endif return _events.size(); } void EventsQueue::process_events() { _events_gen++; for (;;) { Event* event; Lock lock(_events_lock); if (_events.empty()) { return; } event = _events.front(); if (event->get_generation() == _events_gen) { return; } _events.pop_front(); lock.unlock(); event->response(_owner); event->unref(); } } bool EventsQueue::is_empty() { Lock lock(_events_lock); return _events.empty(); } /* Timers Queue */ Timer::Timer() : _is_armed (false) { } Timer::~Timer() { } void Timer::arm(uint32_t msec) { _interval = msec; _expiration = get_now(); calc_next_expiration_time(); _is_armed = true; } void Timer::disarm() { _is_armed = false; } #define TIMER_COMPENSATION void Timer::calc_next_expiration_time(uint64_t now) { #ifndef TIMER_COMPENSATION _expiratoin = now; #endif calc_next_expiration_time(); #ifdef TIMER_COMPENSATION if (_expiration <= now) { _expiration = now; calc_next_expiration_time(); } #endif } uint64_t Timer::get_now() { return (Platform::get_monolithic_time() / 1000 / 1000); } TimersQueue::TimersQueue(AbstractProcessLoop& owner) : _owner (owner) { } TimersQueue::~TimersQueue() { clear_queue(); } void TimersQueue::clear_queue() { RecurciveLock lock(_timers_lock); TimersSet::iterator iter; for (iter = _armed_timers.begin(); iter != _armed_timers.end(); iter++) { (*iter)->disarm(); } _armed_timers.clear(); } void TimersQueue::activate_interval_timer(Timer* timer, unsigned int millisec) { RecurciveLock lock(_timers_lock); timer->ref(); deactivate_interval_timer(timer); timer->arm(millisec); _armed_timers.insert(timer); } void TimersQueue::deactivate_interval_timer(Timer* timer) { RecurciveLock lock(_timers_lock); if (timer->is_armed()) { #ifdef RED_DEBUG int ret = #endif _armed_timers.erase(timer); ASSERT(ret); timer->disarm(); timer->unref(); } } unsigned int TimersQueue::get_soonest_timeout() { RecurciveLock lock(_timers_lock); TimersSet::iterator iter; iter = _armed_timers.begin(); if (iter == _armed_timers.end()) { return INFINITE; } uint64_t now = Timer::get_now(); uint64_t next_time = (*iter)->get_expiration(); if (next_time <= now) { return 0; } return (int)(next_time - now); } void TimersQueue::timers_action() { RecurciveLock lock(_timers_lock); uint64_t now = Timer::get_now(); TimersSet::iterator iter; while (((iter = _armed_timers.begin()) != _armed_timers.end()) && ((*iter)->get_expiration() <= now)) { Timer* timer = *iter; _armed_timers.erase(iter); timer->calc_next_expiration_time(now); _armed_timers.insert(timer); timer->response(_owner); } } ProcessLoop::ProcessLoop(void* owner) : _events_queue (*this) , _timers_queue (*this) , _owner (owner) , _quitting (false) , _exit_code (0) , _started (false) { _event_sources.add_trigger(_wakeup_trigger); } ProcessLoop::~ProcessLoop() { _event_sources.remove_trigger(_wakeup_trigger); } int ProcessLoop::run() { _thread = pthread_self(); _started = true; on_start_running(); for (;;) { if (_event_sources.wait_events(_timers_queue.get_soonest_timeout())) { _quitting = true; break; } _timers_queue.timers_action(); process_events_queue(); if (_quitting) { break; } } return _exit_code; } void ProcessLoop::do_quit(int error_code) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); if (_quitting) { return; } _quitting = true; _exit_code = error_code; } void ProcessLoop::quit(int error_code) { AutoRef quit_event(new QuitEvent(error_code)); push_event(*quit_event); } void ProcessLoop::process_events_queue() { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _events_queue.process_events(); if (!_events_queue.is_empty()) { wakeup(); } } void ProcessLoop::wakeup() { _wakeup_trigger.trigger(); } void ProcessLoop::add_trigger(EventSources::Trigger& trigger) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.add_trigger(trigger); } void ProcessLoop::remove_trigger(EventSources::Trigger& trigger) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.remove_trigger(trigger); } void ProcessLoop::add_socket(EventSources::Socket& socket) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.add_socket(socket); } void ProcessLoop::remove_socket(EventSources::Socket& socket) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.remove_socket(socket); } void ProcessLoop::add_file(EventSources::File& file) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.add_file(file); } void ProcessLoop::remove_file(EventSources::File& file) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.remove_file(file); } void ProcessLoop::add_handle(EventSources::Handle& handle) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.add_handle(handle); } void ProcessLoop::remove_handle(EventSources::Handle& handle) { ASSERT(!_started || pthread_equal(pthread_self(), _thread)); _event_sources.remove_handle(handle); } void ProcessLoop::push_event(Event* event) { int queue_size = _events_queue.push_event(event); if (queue_size == 1) { // queue was empty before the push wakeup(); } } void ProcessLoop::activate_interval_timer(Timer* timer, unsigned int millisec) { _timers_queue.activate_interval_timer(timer, millisec); if (_started && !pthread_equal(pthread_self(), _thread)) { wakeup(); } } void ProcessLoop::deactivate_interval_timer(Timer* timer) { _timers_queue.deactivate_interval_timer(timer); } unsigned ProcessLoop::get_soonest_timeout() { return _timers_queue.get_soonest_timeout(); } void ProcessLoop::timers_action() { _timers_queue.timers_action(); }