diff options
Diffstat (limited to 'client/process_loop.cpp')
| -rw-r--r-- | client/process_loop.cpp | 390 |
1 files changed, 390 insertions, 0 deletions
diff --git a/client/process_loop.cpp b/client/process_loop.cpp new file mode 100644 index 00000000..41aac43e --- /dev/null +++ b/client/process_loop.cpp @@ -0,0 +1,390 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" +#include "process_loop.h" +#include "debug.h" +#include "platform.h" +#include "utils.h" + +SyncEvent::SyncEvent() + : _err (false) + , _ready (false) +{ +} + +SyncEvent::~SyncEvent() +{ +} + +void SyncEvent::response(AbstractProcessLoop& events_loop) +{ + try { + do_response(events_loop); + } catch (Exception& e) { + LOG_WARN("unhandle exception: %s", e.what()); + _err = true; + } catch (...) { + _err = true; + } + Lock lock(_mutex); + _ready = true; + _condition.notify_one(); +} + +void SyncEvent::wait() +{ +#ifdef RED_DEBUG + ASSERT(!_process_loop || !_process_loop->is_same_thread(pthread_self())); +#endif + Lock lock(_mutex); + while (!_ready) { + _condition.wait(lock); + } +} + +class ProcessLoop::QuitEvent: public Event { +public: + QuitEvent(int error_code) : _error_code(error_code) {} + virtual void response(AbstractProcessLoop& events_loop); +private: + int _error_code; +}; + +void ProcessLoop::QuitEvent::response(AbstractProcessLoop& events_loop) +{ + events_loop.do_quit(_error_code); +} + +/* EventsQueue */ + +EventsQueue::EventsQueue(AbstractProcessLoop& owner) + : _events_gen (0) + , _owner (owner) +{ +} + +EventsQueue::~EventsQueue() +{ + clear_queue(); +} + +void EventsQueue::clear_queue() +{ + Lock lock(_events_lock); + while (!_events.empty()) { + Event* event = _events.front(); + _events.pop_front(); + event->unref(); + } +} + +int EventsQueue::push_event(Event* event) +{ + Lock lock(_events_lock); + _events.push_back(event); + event->_generation = _events_gen; + event->ref(); +#ifdef RED_DEBUG + event->set_process_loop(&_owner); +#endif + return _events.size(); +} + +void EventsQueue::process_events() +{ + _events_gen++; + + for (;;) { + Event* event; + Lock lock(_events_lock); + if (_events.empty()) { + return; + } + event = _events.front(); + if (event->_generation == _events_gen) { + return; + } + _events.pop_front(); + + lock.unlock(); + event->response(_owner); +#ifdef RED_DEBUG + event->set_process_loop(NULL); +#endif + event->unref(); + } +} + +bool EventsQueue::is_empty() +{ + Lock lock(_events_lock); + return _events.empty(); +} + +/* Timers Queue */ + +Timer::Timer() + : _is_armed (false) +{ +} + +Timer::~Timer() +{ +} + +void Timer::arm(uint32_t msec) +{ + _interval = msec; + _expiration = get_now(); + calc_next_expiration_time(); + _is_armed = true; +} + +void Timer::disarm() +{ + _is_armed = false; +} + +uint64_t Timer::get_now() +{ + return (Platform::get_monolithic_time() / 1000 / 1000); +} + +TimersQueue::TimersQueue(AbstractProcessLoop& owner) + : _owner (owner) +{ +} + +TimersQueue::~TimersQueue() +{ + clear_queue(); +} + +void TimersQueue::clear_queue() +{ + RecurciveLock lock(_timers_lock); + TimersSet::iterator iter; + for (iter = _armed_timers.begin(); iter != _armed_timers.end(); iter++) { + (*iter)->disarm(); + } + _armed_timers.clear(); +} + +void TimersQueue::activate_interval_timer(Timer* timer, unsigned int millisec) +{ + RecurciveLock lock(_timers_lock); + timer->ref(); + deactivate_interval_timer(timer); + timer->arm(millisec); + _armed_timers.insert(timer); +} + +void TimersQueue::deactivate_interval_timer(Timer* timer) +{ + RecurciveLock lock(_timers_lock); + if (timer->is_armed()) { +#ifdef RED_DEBUG + int ret = +#endif + _armed_timers.erase(timer); + ASSERT(ret); + timer->disarm(); + timer->unref(); + } +} + +int TimersQueue::get_soonest_timeout() +{ + RecurciveLock lock(_timers_lock); + TimersSet::iterator iter; + iter = _armed_timers.begin(); + if (iter == _armed_timers.end()) { + return INFINITE; + } + + uint64_t now = Timer::get_now(); + uint64_t next_time = (*iter)->get_expiration(); + + if (next_time <= now) { + return 0; + } + return (int)(next_time - now); +} + +#define TIMER_COMPENSATION + +void TimersQueue::timers_action() +{ + RecurciveLock lock(_timers_lock); + uint64_t now = Timer::get_now(); + TimersSet::iterator iter; + + while (((iter = _armed_timers.begin()) != _armed_timers.end()) && + ((*iter)->get_expiration() <= now)) { + Timer* timer = *iter; + _armed_timers.erase(iter); +#ifndef TIMER_COMPENSATION + timer->_experatoin = now; +#endif + timer->calc_next_expiration_time(); +#ifdef TIMER_COMPENSATION + if (timer->_expiration <= now) { + timer->_expiration = now; + timer->calc_next_expiration_time(); + } +#endif + _armed_timers.insert(timer); + timer->response(_owner); + } +} + +ProcessLoop::ProcessLoop(void* owner) + : _events_queue (*this) + , _timers_queue (*this) + , _owner (owner) + , _quitting (false) + , _exit_code (0) + , _started (false) + +{ + _event_sources.add_trigger(_wakeup_trigger); +} + +ProcessLoop::~ProcessLoop() +{ + _event_sources.remove_trigger(_wakeup_trigger); +} + +int ProcessLoop::run() +{ + _thread = pthread_self(); + _started = true; + for (;;) { + if (_event_sources.wait_events(_timers_queue.get_soonest_timeout())) { + _quitting = true; + break; + } + _timers_queue.timers_action(); + process_events_queue(); + if (_quitting) { + break; + } + } + + return _exit_code; +} + +void ProcessLoop::do_quit(int error_code) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + if (_quitting) { + return; + } + _quitting = true; + _exit_code = error_code; +} + +void ProcessLoop::quit(int error_code) +{ + AutoRef<QuitEvent> quit_event(new QuitEvent(error_code)); + push_event(*quit_event); +} + +void ProcessLoop::process_events_queue() +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _events_queue.process_events(); + if (!_events_queue.is_empty()) { + wakeup(); + } +} + +void ProcessLoop::wakeup() +{ + _wakeup_trigger.trigger(); +} + +void ProcessLoop::add_trigger(EventSources::Trigger& trigger) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_trigger(trigger); +} + +void ProcessLoop::remove_trigger(EventSources::Trigger& trigger) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_trigger(trigger); +} + +void ProcessLoop::add_socket(EventSources::Socket& socket) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_socket(socket); +} + +void ProcessLoop::remove_socket(EventSources::Socket& socket) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_socket(socket); +} + +void ProcessLoop::add_file(EventSources::File& file) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_file(file); +} + +void ProcessLoop::remove_file(EventSources::File& file) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_file(file); +} + +void ProcessLoop::add_handle(EventSources::Handle& handle) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_handle(handle); +} + +void ProcessLoop::remove_handle(EventSources::Handle& handle) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_handle(handle); +} + +void ProcessLoop::push_event(Event* event) +{ + int queue_size = _events_queue.push_event(event); + if (queue_size == 1) { // queue was empty before the push + wakeup(); + } +} + +void ProcessLoop::activate_interval_timer(Timer* timer, unsigned int millisec) +{ + _timers_queue.activate_interval_timer(timer, millisec); + + if (_started && !pthread_equal(pthread_self(), _thread)) { + wakeup(); + } +} + +void ProcessLoop::deactivate_interval_timer(Timer* timer) +{ + _timers_queue.deactivate_interval_timer(timer); +} |
