summaryrefslogtreecommitdiffstats
path: root/client/process_loop.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/process_loop.cpp')
-rw-r--r--client/process_loop.cpp390
1 files changed, 390 insertions, 0 deletions
diff --git a/client/process_loop.cpp b/client/process_loop.cpp
new file mode 100644
index 00000000..41aac43e
--- /dev/null
+++ b/client/process_loop.cpp
@@ -0,0 +1,390 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "common.h"
+#include "process_loop.h"
+#include "debug.h"
+#include "platform.h"
+#include "utils.h"
+
+SyncEvent::SyncEvent()
+ : _err (false)
+ , _ready (false)
+{
+}
+
+SyncEvent::~SyncEvent()
+{
+}
+
+void SyncEvent::response(AbstractProcessLoop& events_loop)
+{
+ try {
+ do_response(events_loop);
+ } catch (Exception& e) {
+ LOG_WARN("unhandle exception: %s", e.what());
+ _err = true;
+ } catch (...) {
+ _err = true;
+ }
+ Lock lock(_mutex);
+ _ready = true;
+ _condition.notify_one();
+}
+
+void SyncEvent::wait()
+{
+#ifdef RED_DEBUG
+ ASSERT(!_process_loop || !_process_loop->is_same_thread(pthread_self()));
+#endif
+ Lock lock(_mutex);
+ while (!_ready) {
+ _condition.wait(lock);
+ }
+}
+
+class ProcessLoop::QuitEvent: public Event {
+public:
+ QuitEvent(int error_code) : _error_code(error_code) {}
+ virtual void response(AbstractProcessLoop& events_loop);
+private:
+ int _error_code;
+};
+
+void ProcessLoop::QuitEvent::response(AbstractProcessLoop& events_loop)
+{
+ events_loop.do_quit(_error_code);
+}
+
+/* EventsQueue */
+
+EventsQueue::EventsQueue(AbstractProcessLoop& owner)
+ : _events_gen (0)
+ , _owner (owner)
+{
+}
+
+EventsQueue::~EventsQueue()
+{
+ clear_queue();
+}
+
+void EventsQueue::clear_queue()
+{
+ Lock lock(_events_lock);
+ while (!_events.empty()) {
+ Event* event = _events.front();
+ _events.pop_front();
+ event->unref();
+ }
+}
+
+int EventsQueue::push_event(Event* event)
+{
+ Lock lock(_events_lock);
+ _events.push_back(event);
+ event->_generation = _events_gen;
+ event->ref();
+#ifdef RED_DEBUG
+ event->set_process_loop(&_owner);
+#endif
+ return _events.size();
+}
+
+void EventsQueue::process_events()
+{
+ _events_gen++;
+
+ for (;;) {
+ Event* event;
+ Lock lock(_events_lock);
+ if (_events.empty()) {
+ return;
+ }
+ event = _events.front();
+ if (event->_generation == _events_gen) {
+ return;
+ }
+ _events.pop_front();
+
+ lock.unlock();
+ event->response(_owner);
+#ifdef RED_DEBUG
+ event->set_process_loop(NULL);
+#endif
+ event->unref();
+ }
+}
+
+bool EventsQueue::is_empty()
+{
+ Lock lock(_events_lock);
+ return _events.empty();
+}
+
+/* Timers Queue */
+
+Timer::Timer()
+ : _is_armed (false)
+{
+}
+
+Timer::~Timer()
+{
+}
+
+void Timer::arm(uint32_t msec)
+{
+ _interval = msec;
+ _expiration = get_now();
+ calc_next_expiration_time();
+ _is_armed = true;
+}
+
+void Timer::disarm()
+{
+ _is_armed = false;
+}
+
+uint64_t Timer::get_now()
+{
+ return (Platform::get_monolithic_time() / 1000 / 1000);
+}
+
+TimersQueue::TimersQueue(AbstractProcessLoop& owner)
+ : _owner (owner)
+{
+}
+
+TimersQueue::~TimersQueue()
+{
+ clear_queue();
+}
+
+void TimersQueue::clear_queue()
+{
+ RecurciveLock lock(_timers_lock);
+ TimersSet::iterator iter;
+ for (iter = _armed_timers.begin(); iter != _armed_timers.end(); iter++) {
+ (*iter)->disarm();
+ }
+ _armed_timers.clear();
+}
+
+void TimersQueue::activate_interval_timer(Timer* timer, unsigned int millisec)
+{
+ RecurciveLock lock(_timers_lock);
+ timer->ref();
+ deactivate_interval_timer(timer);
+ timer->arm(millisec);
+ _armed_timers.insert(timer);
+}
+
+void TimersQueue::deactivate_interval_timer(Timer* timer)
+{
+ RecurciveLock lock(_timers_lock);
+ if (timer->is_armed()) {
+#ifdef RED_DEBUG
+ int ret =
+#endif
+ _armed_timers.erase(timer);
+ ASSERT(ret);
+ timer->disarm();
+ timer->unref();
+ }
+}
+
+int TimersQueue::get_soonest_timeout()
+{
+ RecurciveLock lock(_timers_lock);
+ TimersSet::iterator iter;
+ iter = _armed_timers.begin();
+ if (iter == _armed_timers.end()) {
+ return INFINITE;
+ }
+
+ uint64_t now = Timer::get_now();
+ uint64_t next_time = (*iter)->get_expiration();
+
+ if (next_time <= now) {
+ return 0;
+ }
+ return (int)(next_time - now);
+}
+
+#define TIMER_COMPENSATION
+
+void TimersQueue::timers_action()
+{
+ RecurciveLock lock(_timers_lock);
+ uint64_t now = Timer::get_now();
+ TimersSet::iterator iter;
+
+ while (((iter = _armed_timers.begin()) != _armed_timers.end()) &&
+ ((*iter)->get_expiration() <= now)) {
+ Timer* timer = *iter;
+ _armed_timers.erase(iter);
+#ifndef TIMER_COMPENSATION
+ timer->_experatoin = now;
+#endif
+ timer->calc_next_expiration_time();
+#ifdef TIMER_COMPENSATION
+ if (timer->_expiration <= now) {
+ timer->_expiration = now;
+ timer->calc_next_expiration_time();
+ }
+#endif
+ _armed_timers.insert(timer);
+ timer->response(_owner);
+ }
+}
+
+ProcessLoop::ProcessLoop(void* owner)
+ : _events_queue (*this)
+ , _timers_queue (*this)
+ , _owner (owner)
+ , _quitting (false)
+ , _exit_code (0)
+ , _started (false)
+
+{
+ _event_sources.add_trigger(_wakeup_trigger);
+}
+
+ProcessLoop::~ProcessLoop()
+{
+ _event_sources.remove_trigger(_wakeup_trigger);
+}
+
+int ProcessLoop::run()
+{
+ _thread = pthread_self();
+ _started = true;
+ for (;;) {
+ if (_event_sources.wait_events(_timers_queue.get_soonest_timeout())) {
+ _quitting = true;
+ break;
+ }
+ _timers_queue.timers_action();
+ process_events_queue();
+ if (_quitting) {
+ break;
+ }
+ }
+
+ return _exit_code;
+}
+
+void ProcessLoop::do_quit(int error_code)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ if (_quitting) {
+ return;
+ }
+ _quitting = true;
+ _exit_code = error_code;
+}
+
+void ProcessLoop::quit(int error_code)
+{
+ AutoRef<QuitEvent> quit_event(new QuitEvent(error_code));
+ push_event(*quit_event);
+}
+
+void ProcessLoop::process_events_queue()
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _events_queue.process_events();
+ if (!_events_queue.is_empty()) {
+ wakeup();
+ }
+}
+
+void ProcessLoop::wakeup()
+{
+ _wakeup_trigger.trigger();
+}
+
+void ProcessLoop::add_trigger(EventSources::Trigger& trigger)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.add_trigger(trigger);
+}
+
+void ProcessLoop::remove_trigger(EventSources::Trigger& trigger)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.remove_trigger(trigger);
+}
+
+void ProcessLoop::add_socket(EventSources::Socket& socket)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.add_socket(socket);
+}
+
+void ProcessLoop::remove_socket(EventSources::Socket& socket)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.remove_socket(socket);
+}
+
+void ProcessLoop::add_file(EventSources::File& file)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.add_file(file);
+}
+
+void ProcessLoop::remove_file(EventSources::File& file)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.remove_file(file);
+}
+
+void ProcessLoop::add_handle(EventSources::Handle& handle)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.add_handle(handle);
+}
+
+void ProcessLoop::remove_handle(EventSources::Handle& handle)
+{
+ ASSERT(!_started || pthread_equal(pthread_self(), _thread));
+ _event_sources.remove_handle(handle);
+}
+
+void ProcessLoop::push_event(Event* event)
+{
+ int queue_size = _events_queue.push_event(event);
+ if (queue_size == 1) { // queue was empty before the push
+ wakeup();
+ }
+}
+
+void ProcessLoop::activate_interval_timer(Timer* timer, unsigned int millisec)
+{
+ _timers_queue.activate_interval_timer(timer, millisec);
+
+ if (_started && !pthread_equal(pthread_self(), _thread)) {
+ wakeup();
+ }
+}
+
+void ProcessLoop::deactivate_interval_timer(Timer* timer)
+{
+ _timers_queue.deactivate_interval_timer(timer);
+}