diff options
Diffstat (limited to 'client')
33 files changed, 1752 insertions, 740 deletions
diff --git a/client/Makefile.am b/client/Makefile.am index 51d4857e..2a2d076d 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -50,6 +50,7 @@ RED_COMMON_SRCS = \ pixels_source.h \ platform.h \ playback_channel.cpp \ + process_loop.cpp \ quic.cpp \ read_write_mutex.h \ record_channel.cpp \ diff --git a/client/application.cpp b/client/application.cpp index 65937156..3b797539 100644 --- a/client/application.cpp +++ b/client/application.cpp @@ -50,72 +50,35 @@ mutex_t cairo_surface_user_data_mutex; #endif -SyncEvent::SyncEvent() - : _err (false) - , _ready (false) +void ConnectedEvent::response(AbstractProcessLoop& events_loop) { + static_cast<Application*>(events_loop.get_owner())->on_connected(); } -SyncEvent::~SyncEvent() +void DisconnectedEvent::response(AbstractProcessLoop& events_loop) { -} - -void SyncEvent::responce(Application& application) -{ - try { - do_responce(application); - } catch (Exception& e) { - LOG_WARN("unhandle exception: %s", e.what()); - _err = true; - } catch (...) { - _err = true; - } - Lock lock(_mutex); - _ready = true; - _condition.notify_one(); -} - -void SyncEvent::wait() -{ - //todo: process event if on main thread - Lock lock(_mutex); - while (!_ready) { - _condition.wait(lock); - } -} - -void ConnectedEvent::responce(Application& application) -{ - application.on_connected(); -} - -void DisconnectedEvent::responce(Application& application) -{ - application.show_splash(0); + Application* app = static_cast<Application*>(events_loop.get_owner()); + app->show_splash(0); #ifndef RED_DEBUG - application.quit(SPICEC_ERROR_CODE_SUCCESS); + app->do_quit(SPICEC_ERROR_CODE_SUCCESS); #endif } -void CoonnectionError::responce(Application& application) +void ConnectionErrorEvent::response(AbstractProcessLoop& events_loop) { - application.show_splash(0); + Application* app = static_cast<Application*>(events_loop.get_owner()); + app->show_splash(0); #ifndef RED_DEBUG - application.quit(_error_code); + app->do_quit(_error_code); #endif } -void ErrorEvent::responce(Application& application) -{ - application.quit(SPICEC_ERROR_CODE_ERROR); -} - -void MonitorsQuery::do_responce(Application& application) +void MonitorsQuery::do_response(AbstractProcessLoop& events_loop) { Monitor* mon; int i = 0; - while ((mon = application.find_monitor(i++))) { + while ((mon = (static_cast<Application*>(events_loop.get_owner()))->find_monitor(i++))) { MonitorInfo info; info.size = mon->get_size(); info.depth = 32; @@ -236,15 +199,14 @@ enum AppCommands { }; Application::Application() - : _client (*this) + : ProcessLoop (this) + , _client (*this) , _enabled_channels (RED_CHANNEL_END, true) , _main_screen (NULL) - , _quitting (false) , _active (false) , _full_screen (false) , _changing_screens (false) , _exit_code (0) - , _events_gen (0) , _active_screen (NULL) , _gui_layer (new GUILayer()) , _inputs_handler (&default_inputs_handler) @@ -252,6 +214,7 @@ Application::Application() , _title (L"SPICEc:%d") { DBG(0, ""); + Platform::set_process_loop(*this); init_monitors(); init_key_table(); @@ -325,59 +288,13 @@ void Application::remove_inputs_handler(InputsHandler& handler) _inputs_handler = &default_inputs_handler; } -void Application::process_events() -{ - _events_gen++; - for (;;) { - Event* event; - Lock lock(_events_lock); - if (_events.empty()) { - return; - } - event = _events.front(); - if (event->_generation == _events_gen) { - Platform::wakeup(); - return; - } - _events.pop_front(); - lock.unlock(); - event->responce(*this); - event->unref(); - } -} - -void Application::push_event(Event* event) -{ - Lock lock(_events_lock); - bool notify = _events.empty(); - _events.push_back(event); - event->_generation = _events_gen; - event->ref(); - if (notify) { - Platform::wakeup(); - } -} - -int Application::message_loop() -{ - for (;;) { - Platform::wait_events(); - if (Platform::process_events()) { - _quitting = true; - break; - } - process_events(); - } - return _exit_code; -} - void Application::abort() { Platform::set_event_listener(NULL); Platform::set_display_mode_listner(NULL); unpress_all(); while (!_client.abort()) { - process_events(); + ProcessLoop::process_events_queue(); Platform::msleep(100); } } @@ -398,22 +315,10 @@ void Application::connect() int Application::run() { - try { - _client.connect(); - _exit_code = message_loop(); - } catch (...) { - throw; - } - return _exit_code; -} + _client.connect(); + _exit_code = ProcessLoop::run(); + return _exit_code; -void Application::quit(int exit_code) -{ - if (!_quitting) { - _quitting = true; - _exit_code = exit_code; - Platform::send_quit_request(); - } } RedScreen* Application::find_screen(int id) diff --git a/client/application.h b/client/application.h index 1a14c5ed..91afb54e 100644 --- a/client/application.h +++ b/client/application.h @@ -25,6 +25,7 @@ #include "platform.h" #include "menu.h" #include "hot_keys.h" +#include "process_loop.h" class RedScreen; class Application; @@ -35,73 +36,25 @@ class Monitor; class CmdLineParser; class Menu; -class Event { -public: - Event() : _refs (1) {} - - virtual void responce(Application& application) = 0; - - Event* ref() { ++_refs; return this;} - void unref() {if (--_refs == 0) delete this;} - -protected: - virtual ~Event() {} - - AtomicCount _refs; - friend class Application; - uint32_t _generation; -}; - -class SyncEvent: public Event { -public: - SyncEvent(); - - void wait(); - bool success() { return !_err;} - - virtual void do_responce(Application& application) {} - -protected: - virtual ~SyncEvent(); - -private: - virtual void responce(Application& application); - -private: - Mutex _mutex; - Condition _condition; - bool _err; - bool _ready; -}; class ConnectedEvent: public Event { public: - ConnectedEvent() : Event() {} - virtual void responce(Application& application); + virtual void response(AbstractProcessLoop& events_loop); }; class DisconnectedEvent: public Event { public: - DisconnectedEvent() : Event() {} - virtual void responce(Application& application); + virtual void response(AbstractProcessLoop& events_loop); }; -class CoonnectionError: public Event { +class ConnectionErrorEvent: public Event { public: - CoonnectionError(int error_code) : Event(), _error_code (error_code) {} - - virtual void responce(Application& application); - + ConnectionErrorEvent(int error_code) : _error_code (error_code) {} + virtual void response(AbstractProcessLoop& events_loop); private: int _error_code; }; -class ErrorEvent: public Event { -public: - ErrorEvent() : Event() {} - virtual void responce(Application& application); -}; - struct MonitorInfo { int depth; Point size; @@ -112,8 +65,7 @@ class MonitorsQuery: public SyncEvent { public: MonitorsQuery() {} - virtual void do_responce(Application& application); - + virtual void do_response(AbstractProcessLoop& events_loop); std::vector<MonitorInfo>& get_monitors() {return _monitors;} private: @@ -138,7 +90,8 @@ enum CanvasOption { #endif }; -class Application : public Platform::EventListener, +class Application : public ProcessLoop, + public Platform::EventListener, public Platform::DisplayModeListner, public CommandTarget { public: @@ -146,8 +99,7 @@ public: virtual ~Application(); int run(); - void quit(int exit_code); - void push_event(Event* event); + void set_inputs_handler(InputsHandler& handler); void remove_inputs_handler(InputsHandler& handler); RedScreen* find_screen(int id); @@ -195,8 +147,6 @@ private: bool set_enable_channels(CmdLineParser& parser, bool enable, char *val); bool set_canvas_option(CmdLineParser& parser, char *val); bool process_cmd_line(int argc, char** argv); - void process_events(); - int message_loop(); void abort(); void init_scan_code(int index); void init_korean_scan_code(int index); @@ -234,6 +184,8 @@ private: static void init_logger(); static void init_globals(); + friend class DisconnectedEvent; + friend class ConnectionErrorEvent; friend class MonitorsQuery; friend class AutoAbort; @@ -242,15 +194,11 @@ private: PeerConnectionOptMap _peer_con_opt; std::vector<bool> _enabled_channels; std::vector<RedScreen*> _screens; - std::list<Event*> _events; RedScreen* _main_screen; - Mutex _events_lock; - bool _quitting; bool _active; bool _full_screen; bool _changing_screens; int _exit_code; - uint32_t _events_gen; RedScreen* _active_screen; KeyInfo _key_table[REDKEY_NUM_KEYS]; HotKeys _hot_keys; diff --git a/client/cursor_channel.cpp b/client/cursor_channel.cpp index a78eba35..d5a64b2b 100644 --- a/client/cursor_channel.cpp +++ b/client/cursor_channel.cpp @@ -331,7 +331,7 @@ public: cursor->set_opaque(native_cursor); } - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { CursorData *cursor = *_cursor; create_cursor(); @@ -345,7 +345,8 @@ public: _channel._cursor_rect.top = _y - cursor->header().hot_spot_y; _channel._cursor_rect.bottom = _channel._cursor_rect.top + cursor->header().height; - if (application.get_mouse_mode() == RED_MOUSE_MODE_CLIENT) { + if (static_cast<Application*>(events_loop.get_owner())->get_mouse_mode() == + RED_MOUSE_MODE_CLIENT) { RedScreen* screen = _channel.screen(); ASSERT(screen); screen->set_cursor(_visible ? cursor : NULL); @@ -381,10 +382,11 @@ public: { } - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { _channel._cursor_visible = true; - if (application.get_mouse_mode() == RED_MOUSE_MODE_CLIENT) { + if (static_cast<Application*>(events_loop.get_owner())->get_mouse_mode() == + RED_MOUSE_MODE_CLIENT) { RedScreen* screen = _channel.screen(); ASSERT(screen); screen->set_cursor(_channel._cursor); @@ -412,10 +414,11 @@ private: class CursorHideEvent: public Event { public: CursorHideEvent(CursorChannel& channel): _channel (channel) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { _channel._cursor_visible = false; - if (application.get_mouse_mode() == RED_MOUSE_MODE_CLIENT) { + if (static_cast<Application*>(events_loop.get_owner())->get_mouse_mode() == + RED_MOUSE_MODE_CLIENT) { RedScreen* screen = _channel.screen(); ASSERT(screen); screen->set_cursor(NULL); @@ -431,7 +434,7 @@ private: class CursorRemoveEvent: public Event { public: CursorRemoveEvent(CursorChannel& channel): _channel (channel) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { _channel._cursor_visible = false; _channel.clear_area(); @@ -455,13 +458,14 @@ class CursorModeEvent: public Event { public: CursorModeEvent(CursorChannel& channel): _channel (channel) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { RedScreen* screen = _channel.screen(); if (!screen) { return; } - if (application.get_mouse_mode() == RED_MOUSE_MODE_CLIENT) { + if (static_cast<Application*>(events_loop.get_owner())->get_mouse_mode() == + RED_MOUSE_MODE_CLIENT) { _channel.clear_area(); screen->set_cursor(_channel._cursor_visible ? _channel._cursor : NULL); } else { diff --git a/client/display_channel.cpp b/client/display_channel.cpp index 2e2e4f56..e9d4f192 100644 --- a/client/display_channel.cpp +++ b/client/display_channel.cpp @@ -54,11 +54,12 @@ public: { } - virtual void do_responce(Application& application) + virtual void do_response(AbstractProcessLoop& events_loop) { + Application* app = (Application*)events_loop.get_owner(); _channel.destroy_canvas(); _channel.screen()->set_mode(_width, _height, _depth); - _channel.create_canvas(application.get_canvas_types(), _width, _height, _depth); + _channel.create_canvas(app->get_canvas_types(), _width, _height, _depth); } private: @@ -75,9 +76,9 @@ public: { } - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { - application.hide_splash(_screen_id); + static_cast<Application*>(events_loop.get_owner())->hide_splash(_screen_id); } private: @@ -831,7 +832,7 @@ public: ASSERT(_timer == INVALID_TIMER); } - virtual void do_responce(Application& application) + virtual void do_response(AbstractProcessLoop& events_loop) { if ((_timer = Platform::create_interval_timer(_proc, _user_data)) == INVALID_TIMER) { THROW("create timer failed"); @@ -850,7 +851,7 @@ class DestroyTimerEvent: public Event { public: DestroyTimerEvent(TimerID timer) : _timer (timer) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { Platform::destroy_interval_timer(_timer); } @@ -866,7 +867,7 @@ public: { } - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { _channel.activate_streams_timer(); } diff --git a/client/event_sources.h b/client/event_sources.h new file mode 100644 index 00000000..a7b39aee --- /dev/null +++ b/client/event_sources.h @@ -0,0 +1,95 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_EVENT_SOURCES +#define _H_EVENT_SOURCES + +#include "common.h" +#include "event_sources_p.h" + +class EventSource; + +// TODO: the class is not thread safe +class EventSources: public EventSources_p { +public: + class Trigger; + class Socket; + class File; + class Handle; + + EventSources(); + virtual ~EventSources(); + + void add_trigger(Trigger& trigger); + void remove_trigger(Trigger& trigger); + void add_socket(Socket& socket); + void remove_socket(Socket& socket); + void add_file(File& file); + void remove_file(File& file); + void add_handle(Handle& handle); + void remove_handle(Handle& handle); + + /* return true if the events loop should quit */ + bool wait_events(int timeout_ms = INFINITE); +}; + +class EventSource { +public: + virtual ~EventSource() {} + virtual void on_event() = 0; + +private: + virtual void action() {on_event();} + + friend class EventSources; +}; + +class EventSources::Trigger: public EventSource, private Trigger_p { +public: + Trigger(); + virtual ~Trigger(); + virtual void trigger(); + virtual void reset(); + +private: + virtual void action(); + + friend class EventSources; +}; + +class EventSources::Socket: public EventSource { +protected: + virtual int get_socket() = 0; + + friend class EventSources; +}; + + +class EventSources::File: public EventSource { +protected: + virtual int get_fd() = 0; + + friend class EventSources; +}; + +class EventSources::Handle: public EventSource, public Handle_p { + + friend class EventSources; +}; + +#endif + diff --git a/client/events_loop.h b/client/events_loop.h index 50a83e1d..ac26a0e6 100644 --- a/client/events_loop.h +++ b/client/events_loop.h @@ -21,7 +21,7 @@ #include "common.h" #include "events_loop_p.h" -class EventSource; +class EventSourceOld; class EventsLoop: public EventsLoop_p { public: @@ -43,9 +43,9 @@ public: void run_once(int timeout_milli = INFINITE); }; -class EventSource { +class EventSourceOld { public: - virtual ~EventSource() {} + virtual ~EventSourceOld() {} virtual void on_event() = 0; private: @@ -54,7 +54,7 @@ private: friend class EventsLoop; }; -class EventsLoop::Trigger: public EventSource, private Trigger_p { +class EventsLoop::Trigger: public EventSourceOld, private EventsLoop_p::Trigger_p { public: Trigger(); virtual ~Trigger(); @@ -67,7 +67,7 @@ private: friend class EventsLoop; }; -class EventsLoop::Socket: public EventSource { +class EventsLoop::Socket: public EventSourceOld { protected: virtual int get_socket() = 0; @@ -75,7 +75,7 @@ protected: }; -class EventsLoop::File: public EventSource { +class EventsLoop::File: public EventSourceOld { protected: virtual int get_fd() = 0; diff --git a/client/inputs_channel.cpp b/client/inputs_channel.cpp index 8035bfb3..d686be22 100644 --- a/client/inputs_channel.cpp +++ b/client/inputs_channel.cpp @@ -28,9 +28,9 @@ class SetInputsHandlerEvent: public Event { public: SetInputsHandlerEvent(InputsChannel& channel) : _channel (channel) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { - application.set_inputs_handler(_channel); + static_cast<Application*>(events_loop.get_owner())->set_inputs_handler(_channel); } private: @@ -41,7 +41,7 @@ class KeyModifiersEvent: public Event { public: KeyModifiersEvent(InputsChannel& channel) : _channel (channel) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { Lock lock(_channel._update_modifiers_lock); _channel._active_modifiers_event = false; @@ -56,9 +56,9 @@ class RemoveInputsHandlerEvent: public SyncEvent { public: RemoveInputsHandlerEvent(InputsChannel& channel) : _channel (channel) {} - virtual void do_responce(Application& application) + virtual void do_response(AbstractProcessLoop& events_loop) { - application.remove_inputs_handler(_channel); + static_cast<Application*>(events_loop.get_owner())->remove_inputs_handler(_channel); } private: diff --git a/client/platform.h b/client/platform.h index 0982bf63..a4310d98 100644 --- a/client/platform.h +++ b/client/platform.h @@ -19,12 +19,10 @@ #define _H_PLATFORM #include "cursor.h" +#include "process_loop.h" +#include "event_sources.h" #include "events_loop.h" -#define INVALID_TIMER (~TimerID(0)) -typedef unsigned long TimerID; -typedef void (*timer_proc_t)(void* opaque, TimerID timer); - class WaveRecordAbstract; class WavePlaybackAbstract; class Icon; @@ -32,19 +30,19 @@ class Icon; class Monitor; typedef std::list<Monitor*> MonitorsList; +/* TODO: tmp till each channel will handle its own thread + timers or directly through the main thread */ +#define INVALID_TIMER (~TimerID(0)) +typedef unsigned long TimerID; +typedef void (*timer_proc_t)(void* opaque, TimerID timer); + + class Platform { public: static void init(); - static void wait_events(); - static bool process_events(); - static void wakeup(); + static void set_process_loop(ProcessLoop& main_process_loop); static void msleep(unsigned int millisec); static void yield(); - static void send_quit_request(); - static TimerID create_interval_timer(timer_proc_t proc, void* opaque); - static bool activate_interval_timer(TimerID timer, unsigned int millisec); - static bool deactivate_interval_timer(TimerID timer); - static void destroy_interval_timer(TimerID timer); static uint64_t get_monolithic_time(); static void get_temp_dir(std::string& path); @@ -52,6 +50,8 @@ public: static void destroy_monitors(); static bool is_monitors_pos_valid(); + static void send_quit_request(); + enum ThreadPriority { PRIORITY_INVALID, PRIORITY_TIME_CRITICAL, @@ -98,6 +98,12 @@ public: class DisplayModeListner; static void set_display_mode_listner(DisplayModeListner* listener); + /* TODO: tmp till each channel will handle its own thread + timers or directly through the main thread */ + static TimerID create_interval_timer(timer_proc_t proc, void* opaque); + static bool activate_interval_timer(TimerID timer, unsigned int millisec); + static bool deactivate_interval_timer(TimerID timer); + static void destroy_interval_timer(TimerID timer); }; class Platform::EventListener { @@ -108,6 +114,7 @@ public: virtual void on_monitors_change() = 0; }; +// TODO: tmp till all channels work with ProcessLoop class Platform::RecordClinet { public: virtual ~RecordClinet() {} diff --git a/client/process_loop.cpp b/client/process_loop.cpp new file mode 100644 index 00000000..41aac43e --- /dev/null +++ b/client/process_loop.cpp @@ -0,0 +1,390 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" +#include "process_loop.h" +#include "debug.h" +#include "platform.h" +#include "utils.h" + +SyncEvent::SyncEvent() + : _err (false) + , _ready (false) +{ +} + +SyncEvent::~SyncEvent() +{ +} + +void SyncEvent::response(AbstractProcessLoop& events_loop) +{ + try { + do_response(events_loop); + } catch (Exception& e) { + LOG_WARN("unhandle exception: %s", e.what()); + _err = true; + } catch (...) { + _err = true; + } + Lock lock(_mutex); + _ready = true; + _condition.notify_one(); +} + +void SyncEvent::wait() +{ +#ifdef RED_DEBUG + ASSERT(!_process_loop || !_process_loop->is_same_thread(pthread_self())); +#endif + Lock lock(_mutex); + while (!_ready) { + _condition.wait(lock); + } +} + +class ProcessLoop::QuitEvent: public Event { +public: + QuitEvent(int error_code) : _error_code(error_code) {} + virtual void response(AbstractProcessLoop& events_loop); +private: + int _error_code; +}; + +void ProcessLoop::QuitEvent::response(AbstractProcessLoop& events_loop) +{ + events_loop.do_quit(_error_code); +} + +/* EventsQueue */ + +EventsQueue::EventsQueue(AbstractProcessLoop& owner) + : _events_gen (0) + , _owner (owner) +{ +} + +EventsQueue::~EventsQueue() +{ + clear_queue(); +} + +void EventsQueue::clear_queue() +{ + Lock lock(_events_lock); + while (!_events.empty()) { + Event* event = _events.front(); + _events.pop_front(); + event->unref(); + } +} + +int EventsQueue::push_event(Event* event) +{ + Lock lock(_events_lock); + _events.push_back(event); + event->_generation = _events_gen; + event->ref(); +#ifdef RED_DEBUG + event->set_process_loop(&_owner); +#endif + return _events.size(); +} + +void EventsQueue::process_events() +{ + _events_gen++; + + for (;;) { + Event* event; + Lock lock(_events_lock); + if (_events.empty()) { + return; + } + event = _events.front(); + if (event->_generation == _events_gen) { + return; + } + _events.pop_front(); + + lock.unlock(); + event->response(_owner); +#ifdef RED_DEBUG + event->set_process_loop(NULL); +#endif + event->unref(); + } +} + +bool EventsQueue::is_empty() +{ + Lock lock(_events_lock); + return _events.empty(); +} + +/* Timers Queue */ + +Timer::Timer() + : _is_armed (false) +{ +} + +Timer::~Timer() +{ +} + +void Timer::arm(uint32_t msec) +{ + _interval = msec; + _expiration = get_now(); + calc_next_expiration_time(); + _is_armed = true; +} + +void Timer::disarm() +{ + _is_armed = false; +} + +uint64_t Timer::get_now() +{ + return (Platform::get_monolithic_time() / 1000 / 1000); +} + +TimersQueue::TimersQueue(AbstractProcessLoop& owner) + : _owner (owner) +{ +} + +TimersQueue::~TimersQueue() +{ + clear_queue(); +} + +void TimersQueue::clear_queue() +{ + RecurciveLock lock(_timers_lock); + TimersSet::iterator iter; + for (iter = _armed_timers.begin(); iter != _armed_timers.end(); iter++) { + (*iter)->disarm(); + } + _armed_timers.clear(); +} + +void TimersQueue::activate_interval_timer(Timer* timer, unsigned int millisec) +{ + RecurciveLock lock(_timers_lock); + timer->ref(); + deactivate_interval_timer(timer); + timer->arm(millisec); + _armed_timers.insert(timer); +} + +void TimersQueue::deactivate_interval_timer(Timer* timer) +{ + RecurciveLock lock(_timers_lock); + if (timer->is_armed()) { +#ifdef RED_DEBUG + int ret = +#endif + _armed_timers.erase(timer); + ASSERT(ret); + timer->disarm(); + timer->unref(); + } +} + +int TimersQueue::get_soonest_timeout() +{ + RecurciveLock lock(_timers_lock); + TimersSet::iterator iter; + iter = _armed_timers.begin(); + if (iter == _armed_timers.end()) { + return INFINITE; + } + + uint64_t now = Timer::get_now(); + uint64_t next_time = (*iter)->get_expiration(); + + if (next_time <= now) { + return 0; + } + return (int)(next_time - now); +} + +#define TIMER_COMPENSATION + +void TimersQueue::timers_action() +{ + RecurciveLock lock(_timers_lock); + uint64_t now = Timer::get_now(); + TimersSet::iterator iter; + + while (((iter = _armed_timers.begin()) != _armed_timers.end()) && + ((*iter)->get_expiration() <= now)) { + Timer* timer = *iter; + _armed_timers.erase(iter); +#ifndef TIMER_COMPENSATION + timer->_experatoin = now; +#endif + timer->calc_next_expiration_time(); +#ifdef TIMER_COMPENSATION + if (timer->_expiration <= now) { + timer->_expiration = now; + timer->calc_next_expiration_time(); + } +#endif + _armed_timers.insert(timer); + timer->response(_owner); + } +} + +ProcessLoop::ProcessLoop(void* owner) + : _events_queue (*this) + , _timers_queue (*this) + , _owner (owner) + , _quitting (false) + , _exit_code (0) + , _started (false) + +{ + _event_sources.add_trigger(_wakeup_trigger); +} + +ProcessLoop::~ProcessLoop() +{ + _event_sources.remove_trigger(_wakeup_trigger); +} + +int ProcessLoop::run() +{ + _thread = pthread_self(); + _started = true; + for (;;) { + if (_event_sources.wait_events(_timers_queue.get_soonest_timeout())) { + _quitting = true; + break; + } + _timers_queue.timers_action(); + process_events_queue(); + if (_quitting) { + break; + } + } + + return _exit_code; +} + +void ProcessLoop::do_quit(int error_code) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + if (_quitting) { + return; + } + _quitting = true; + _exit_code = error_code; +} + +void ProcessLoop::quit(int error_code) +{ + AutoRef<QuitEvent> quit_event(new QuitEvent(error_code)); + push_event(*quit_event); +} + +void ProcessLoop::process_events_queue() +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _events_queue.process_events(); + if (!_events_queue.is_empty()) { + wakeup(); + } +} + +void ProcessLoop::wakeup() +{ + _wakeup_trigger.trigger(); +} + +void ProcessLoop::add_trigger(EventSources::Trigger& trigger) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_trigger(trigger); +} + +void ProcessLoop::remove_trigger(EventSources::Trigger& trigger) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_trigger(trigger); +} + +void ProcessLoop::add_socket(EventSources::Socket& socket) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_socket(socket); +} + +void ProcessLoop::remove_socket(EventSources::Socket& socket) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_socket(socket); +} + +void ProcessLoop::add_file(EventSources::File& file) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_file(file); +} + +void ProcessLoop::remove_file(EventSources::File& file) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_file(file); +} + +void ProcessLoop::add_handle(EventSources::Handle& handle) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.add_handle(handle); +} + +void ProcessLoop::remove_handle(EventSources::Handle& handle) +{ + ASSERT(!_started || pthread_equal(pthread_self(), _thread)); + _event_sources.remove_handle(handle); +} + +void ProcessLoop::push_event(Event* event) +{ + int queue_size = _events_queue.push_event(event); + if (queue_size == 1) { // queue was empty before the push + wakeup(); + } +} + +void ProcessLoop::activate_interval_timer(Timer* timer, unsigned int millisec) +{ + _timers_queue.activate_interval_timer(timer, millisec); + + if (_started && !pthread_equal(pthread_self(), _thread)) { + wakeup(); + } +} + +void ProcessLoop::deactivate_interval_timer(Timer* timer) +{ + _timers_queue.deactivate_interval_timer(timer); +} diff --git a/client/process_loop.h b/client/process_loop.h new file mode 100644 index 00000000..0b052522 --- /dev/null +++ b/client/process_loop.h @@ -0,0 +1,233 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_PROCESS_LOOP +#define _H_PROCESS_LOOP + +#include "common.h" + +#include <set> + +#include "event_sources.h" +#include "threads.h" + +class AbstractProcessLoop { +public: + virtual ~AbstractProcessLoop() {} + virtual int run() = 0; + virtual void do_quit(int error_code) = 0; + virtual void* get_owner() = 0; + virtual bool is_same_thread(pthread_t thread) = 0; +}; + +class EventBase { +public: + EventBase() : _refs (1) {} + + virtual void response(AbstractProcessLoop& events_loop) = 0; + + EventBase* ref() { ++_refs; return this;} + void unref() {if (--_refs == 0) delete this;} + +protected: + virtual ~EventBase() {} + +private: + AtomicCount _refs; +}; + +class EventsQueue; + +class Event: public EventBase { +#ifdef RED_DEBUG +public: + Event() : _process_loop (NULL) {} + +private: + void set_process_loop(AbstractProcessLoop* process_loop) { _process_loop = process_loop;} + +protected: + AbstractProcessLoop* _process_loop; +#endif + +private: + uint32_t _generation; + + friend class EventsQueue; +}; + +class EventsQueue { +public: + EventsQueue(AbstractProcessLoop& owner); + virtual ~EventsQueue(); + /* return the size of the queue (post-push) */ + int push_event(Event* event); + void process_events(); + bool is_empty(); + +private: + void clear_queue(); + +private: + std::list<Event*> _events; + Mutex _events_lock; + uint32_t _events_gen; + + AbstractProcessLoop& _owner; +}; + +class SyncEvent: public Event { +public: + SyncEvent(); + + void wait(); + bool success() { return !_err;} + + virtual void do_response(AbstractProcessLoop& events_loop) {} + +protected: + virtual ~SyncEvent(); + +private: + virtual void response(AbstractProcessLoop& events_loop); + +private: + Mutex _mutex; + Condition _condition; + bool _err; + bool _ready; +}; + +class TimersQueue; + +class Timer: public EventBase { +public: + Timer(); + bool is_armed() {return _is_armed;} + +protected: + virtual ~Timer(); + +private: + void arm(uint32_t msec); + void disarm(); + uint64_t get_expiration() const { return _expiration;} + void calc_next_expiration_time() { _expiration += _interval;} + + static uint64_t get_now(); + +private: + bool _is_armed; + uint32_t _interval; + uint64_t _expiration; + + class Compare { + public: + bool operator () (const Timer* timer1, const Timer* timer2) const + { + if (timer1->get_expiration() < timer2->get_expiration()) { + return true; + } else if (timer1->get_expiration() > timer2->get_expiration()) { + return false; + } else { // elements must be unique (for insertion into set) + return timer1 < timer2; + } + } + }; + + friend class TimersQueue; +}; + +class TimersQueue { +public: + TimersQueue(AbstractProcessLoop& owner); + virtual ~TimersQueue(); + + void activate_interval_timer(Timer* timer, unsigned int millisec); + void deactivate_interval_timer(Timer* timer); + + int get_soonest_timeout(); + void timers_action(); + +private: + void clear_queue(); + +private: + typedef std::set<Timer*, Timer::Compare> TimersSet; + TimersSet _armed_timers; + RecurciveMutex _timers_lock; + AbstractProcessLoop& _owner; +}; + +class ProcessLoop: public AbstractProcessLoop { +public: + class QuitEvent; + ProcessLoop(void* owner); + virtual ~ProcessLoop(); + int run(); + + void quit(int error_code); + + /* Event sources to track. Note: the following methods are not thread safe, thus, + they mustn't be called from other thread than the process loop thread. */ + void add_trigger(EventSources::Trigger& trigger); + void remove_trigger(EventSources::Trigger& trigger); + void add_socket(EventSources::Socket& socket); + void remove_socket(EventSources::Socket& socket); + void add_file(EventSources::File& file); + void remove_file(EventSources::File& file); + void add_handle(EventSources::Handle& handle); + void remove_handle(EventSources::Handle& handle); + + /* events queue */ + void push_event(Event* event); + + void activate_interval_timer(Timer* timer, unsigned int millisec); + void deactivate_interval_timer(Timer* timer); + + void process_events_queue(); + void* get_owner() { return _owner;} + + bool is_same_thread(pthread_t thread) { return _started && pthread_equal(_thread, thread);} + +protected: + class WakeupTrigger: public EventSources::Trigger { + public: + virtual void on_event() {} + }; + + void wakeup(); + void do_quit(int error_code); + + friend class QuitEvent; // allowing access to quit + +private: + EventSources _event_sources; + EventsQueue _events_queue; + TimersQueue _timers_queue; + + WakeupTrigger _wakeup_trigger; + + void* _owner; + + bool _quitting; + int _exit_code; + bool _started; + pthread_t _thread; +}; + +#endif diff --git a/client/red_client.cpp b/client/red_client.cpp index 941321fb..2e4758cf 100644 --- a/client/red_client.cpp +++ b/client/red_client.cpp @@ -19,6 +19,7 @@ #include <math.h> #include "red_client.h" #include "application.h" +#include "process_loop.h" #include "utils.h" #include "debug.h" @@ -397,10 +398,12 @@ void RedClient::on_channel_disconnected(RedChannel& channel) _notify_disconnect = false; int connection_error = channel.get_connection_error(); if (connection_error == SPICEC_ERROR_CODE_SUCCESS) { + AutoRef<DisconnectedEvent> disconn_event(new DisconnectedEvent()); LOG_INFO("disconneted"); - push_event(new DisconnectedEvent()); + push_event(*disconn_event); } else { - push_event(new CoonnectionError(connection_error)); + AutoRef<ConnectionErrorEvent> error_event(new ConnectionErrorEvent(connection_error)); + push_event(*error_event); } } disconnect_channels(); diff --git a/client/red_client.h b/client/red_client.h index d799e788..e22cefd9 100644 --- a/client/red_client.h +++ b/client/red_client.h @@ -29,9 +29,9 @@ #include "audio_channels.h" #include "red.h" #include "vd_agent.h" +#include "process_loop.h" class Application; -class Event; class MigChannel: public RedChannelBase { public: diff --git a/client/screen.cpp b/client/screen.cpp index b33560ee..5d138a1c 100644 --- a/client/screen.cpp +++ b/client/screen.cpp @@ -30,9 +30,10 @@ class UpdateEvent: public Event { public: UpdateEvent(int screen) : _screen (screen) {} - virtual void responce(Application& application) + virtual void response(AbstractProcessLoop& events_loop) { - RedScreen* screen = application.find_screen(_screen); + Application* app = static_cast<Application*>(events_loop.get_owner()); + RedScreen* screen = app->find_screen(_screen); if (screen) { screen->update(); } diff --git a/client/screen_layer.cpp b/client/screen_layer.cpp index f521b465..9940ad86 100644 --- a/client/screen_layer.cpp +++ b/client/screen_layer.cpp @@ -24,18 +24,23 @@ class AttacheLayerEvent: public SyncEvent { public: - AttacheLayerEvent(ScreenLayer& layer, int screen_id) : _layer (layer), _screen_id (screen_id) {} + AttacheLayerEvent(ScreenLayer& layer, int screen_id) + : _layer (layer) + , _screen_id (screen_id) + { + } - virtual void do_responce(Application& application); + virtual void do_response(AbstractProcessLoop& events_loop); private: ScreenLayer& _layer; int _screen_id; }; -void AttacheLayerEvent::do_responce(Application& application) +void AttacheLayerEvent::do_response(AbstractProcessLoop& events_loop) { - AutoRef<RedScreen> screen(application.get_screen(_screen_id)); + Application* app = (Application*)(events_loop.get_owner()); + AutoRef<RedScreen> screen(app->get_screen(_screen_id)); (*screen)->attach_layer(_layer); } @@ -43,13 +48,13 @@ class DetacheLayerEvent: public SyncEvent { public: DetacheLayerEvent(ScreenLayer& _layer) : _layer (_layer) {} - virtual void do_responce(Application& application); + virtual void do_response(AbstractProcessLoop& events_loop); private: ScreenLayer& _layer; }; -void DetacheLayerEvent::do_responce(Application& application) +void DetacheLayerEvent::do_response(AbstractProcessLoop& events_loop) { _layer.screen()->detach_layer(_layer); } diff --git a/client/tunnel_channel.cpp b/client/tunnel_channel.cpp index 1d28ee48..5da18f56 100644 --- a/client/tunnel_channel.cpp +++ b/client/tunnel_channel.cpp @@ -256,18 +256,12 @@ TunnelChannel::TunnelChannel(RedClient& client, uint32_t id) sizeof(RedTunnelSocketClosedAck)); handler->set_handler(RED_TUNNEL_SOCKET_DATA, &TunnelChannel::handle_socket_data, sizeof(RedTunnelSocketData)); -#ifdef TUNNEL_CONFIG - _config_listener = new TunnelConfigListenerIfc(*this); -#endif } TunnelChannel::~TunnelChannel() { destroy_sockets(); OutSocketMessage::clear_free_messages(); -#ifdef TUNNEL_CONFIG - delete _config_listener; -#endif } void TunnelChannel::handle_init(RedPeer::InMessage* message) @@ -587,10 +581,21 @@ void TunnelChannel::destroy_sockets() } } +#ifdef TUNNEL_CONFIG +void TunnelChannel::on_connect() +{ + _config_listener = new TunnelConfigListenerIfc(*this); + +} +#endif + void TunnelChannel::on_disconnect() { destroy_sockets(); OutSocketMessage::clear_free_messages(); +#ifdef TUNNEL_CONFIG + delete _config_listener; +#endif } #ifdef TUNNEL_CONFIG @@ -646,23 +651,75 @@ ChannelFactory& TunnelChannel::Factory() } #ifdef TUNNEL_CONFIG +class CreatePipeListenerEvent: public SyncEvent { +public: + CreatePipeListenerEvent(NamedPipe::ListenerInterface& listener_ifc) + : _listener_ifc (listener_ifc) + { + } + + virtual void do_response(AbstractProcessLoop& events_loop) + { + _listener_ref = NamedPipe::create(TUNNEL_CONFIG_PIPE_NAME, _listener_ifc); + } + + NamedPipe::ListenerRef get_listener() { return _listener_ref;} +private: + NamedPipe::ListenerInterface& _listener_ifc; + NamedPipe::ListenerRef _listener_ref; +}; + +class DestroyPipeListenerEvent: public SyncEvent { +public: + DestroyPipeListenerEvent(NamedPipe::ListenerRef listener_ref) + : _listener_ref (listener_ref) + { + } + + virtual void do_response(AbstractProcessLoop& events_loop) + { + NamedPipe::destroy(_listener_ref); + } + +private: + NamedPipe::ListenerRef _listener_ref; +}; + +class DestroyPipeConnectionEvent: public SyncEvent { +public: + DestroyPipeConnectionEvent(NamedPipe::ConnectionRef ref) : _conn_ref(ref) {} + virtual void do_response(AbstractProcessLoop& events_loop) + { + NamedPipe::destroy_connection(_conn_ref); + } +private: + NamedPipe::ConnectionRef _conn_ref; +}; + TunnelConfigListenerIfc::TunnelConfigListenerIfc(TunnelChannel& tunnel) - : _tunnel(tunnel) + : _tunnel (tunnel) { - _listener_ref = NamedPipe::create(TUNNEL_CONFIG_PIPE_NAME, *this); + AutoRef<CreatePipeListenerEvent> event(new CreatePipeListenerEvent(*this)); + _tunnel.get_client().push_event(*event); + (*event)->wait(); + _listener_ref = (*event)->get_listener(); } TunnelConfigListenerIfc::~TunnelConfigListenerIfc() { + AutoRef<DestroyPipeListenerEvent> listen_event(new DestroyPipeListenerEvent(_listener_ref)); + _tunnel.get_client().push_event(*listen_event); + (*listen_event)->wait(); for (std::list<TunnelConfigConnectionIfc*>::iterator it = _connections.begin(); it != _connections.end(); ++it) { if ((*it)->get_ref() != NamedPipe::INVALID_CONNECTION) { - NamedPipe::destroy_connection((*it)->get_ref()); + AutoRef<DestroyPipeConnectionEvent> conn_event(new DestroyPipeConnectionEvent( + (*it)->get_ref())); + _tunnel.get_client().push_event(*conn_event); + (*conn_event)->wait(); } delete (*it); } - - NamedPipe::destroy(_listener_ref); } NamedPipe::ConnectionInterface& TunnelConfigListenerIfc::create() @@ -684,11 +741,11 @@ void TunnelConfigListenerIfc::destroy_connection(TunnelConfigConnectionIfc* conn TunnelConfigConnectionIfc::TunnelConfigConnectionIfc(TunnelChannel& tunnel, TunnelConfigListenerIfc& listener) - : _tunnel(tunnel) - , _listener(listener) - , _in_msg_len(0) - , _out_msg("") - , _out_msg_pos(0) + : _tunnel (tunnel) + , _listener (listener) + , _in_msg_len (0) + , _out_msg ("") + , _out_msg_pos (0) { } diff --git a/client/tunnel_channel.h b/client/tunnel_channel.h index 4fd3465c..b0f26977 100644 --- a/client/tunnel_channel.h +++ b/client/tunnel_channel.h @@ -61,6 +61,7 @@ protected: class TunnelSocket; virtual void on_disconnect(); + virtual void on_connect(); private: void handle_init(RedPeer::InMessage* message); @@ -88,6 +89,7 @@ private: uint32_t _service_group; #ifdef TUNNEL_CONFIG TunnelConfigListenerIfc* _config_listener; + friend class TunnelConfigListenerIfc; #endif }; diff --git a/client/windows/event_sources_p.cpp b/client/windows/event_sources_p.cpp new file mode 100644 index 00000000..4a11b779 --- /dev/null +++ b/client/windows/event_sources_p.cpp @@ -0,0 +1,209 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "common.h" +#include "event_sources.h" +#include "debug.h" +#include "utils.h" + +bool EventSources_p::process_system_events() +{ + MSG msg; + while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) { + + if (msg.message == WM_QUIT) { + return true; + } + TranslateMessage(&msg); + DispatchMessage(&msg); + } + return false; +} + +void EventSources_p::add_event(HANDLE event, EventSource* source) +{ + int size = _events.size(); + _events.resize(size + 1); + _handles.resize(size + 1); + _events[size] = source; + _handles[size] = event; +} + +void EventSources_p::remove_event(EventSource* source) +{ + int size = _events.size(); + for (int i = 0; i < size; i++) { + if (_events[i] == source) { + for (i++; i < size; i++) { + _events[i - 1] = _events[i]; + _handles[i - 1] = _handles[i]; + } + _events.resize(size - 1); + _handles.resize(size - 1); + return; + } + } + THROW("event not found"); +} + +EventSources::EventSources() +{ +} + +EventSources::~EventSources() +{ +} + +bool EventSources::wait_events(int timeout_ms) +{ + if (_handles.empty()) { + if (WaitMessage()) { + return process_system_events(); + } else { + THROW("wait failed %d", GetLastError()); + } + } + + DWORD wait_res = MsgWaitForMultipleObjectsEx(_handles.size(), &_handles[0], timeout_ms, + QS_ALLINPUT, 0); + if (wait_res == WAIT_TIMEOUT) { + return false; + } + + if (wait_res == WAIT_FAILED) { + THROW("wait failed %d", GetLastError()); + } + + int event_index = wait_res - WAIT_OBJECT_0; + if (event_index == _handles.size()) { + return process_system_events(); + } else if ((event_index >= 0) && (event_index < (int)_handles.size())) { + _events[event_index]->action(); + return false; + } else { + THROW("invalid event id"); + } +} + +void EventSources::add_socket(Socket& socket) +{ + HANDLE event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (!event) { + THROW("create event failed"); + } + if (WSAEventSelect(socket.get_socket(), event, + FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) { + CloseHandle(event); + THROW("event select failed"); + } + add_event(event, &socket); +} + +void EventSources::remove_socket(Socket& socket) +{ + int size = _events.size(); + for (int i = 0; i < size; i++) { + if (_events[i] == &socket) { + if (WSAEventSelect(socket.get_socket(), NULL, 0) == SOCKET_ERROR) { + THROW("event select failed"); + } + u_long arg = 0; + if (ioctlsocket(socket.get_socket(), FIONBIO, &arg) == SOCKET_ERROR) { + THROW("set blocking mode failed"); + } + CloseHandle(_handles[i]); + for (i++; i < size; i++) { + _events[i - 1] = _events[i]; + _handles[i - 1] = _handles[i]; + } + _events.resize(size - 1); + _handles.resize(size - 1); + return; + } + } + THROW("socket not found"); +} + +void EventSources::add_handle(Handle& handle) +{ + add_event(handle.get_handle(), &handle); +} + +void EventSources::remove_handle(Handle& handle) +{ + remove_event(&handle); +} + +Handle_p::Handle_p() +{ + if (!(_event = CreateEvent(NULL, FALSE, FALSE, NULL))) { + THROW("create event failed"); + } +} + +Handle_p::~Handle_p() +{ + CloseHandle(_event); +} + +void EventSources::add_trigger(Trigger& trigger) +{ + add_event(trigger.get_handle(), &trigger); +} + +void EventSources::remove_trigger(Trigger& trigger) +{ + remove_event(&trigger); +} + + +EventSources::Trigger::Trigger() +{ +} + +EventSources::Trigger::~Trigger() +{ +} + + +void EventSources::Trigger::trigger() +{ + if (!SetEvent(_event)) { + THROW("set event failed"); + } +} + +void EventSources::Trigger::reset() +{ + if (!ResetEvent(_event)) { + THROW("set event failed"); + } +} + +void EventSources::Trigger::action() +{ + on_event(); +} + +void EventSources::add_file(File& file) +{ +} + +void EventSources::remove_file(File& file) +{ +} + diff --git a/client/windows/event_sources_p.h b/client/windows/event_sources_p.h new file mode 100644 index 00000000..3477605e --- /dev/null +++ b/client/windows/event_sources_p.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_EVENT_SOURCES_P +#define _H_EVENT_SOURCES_P + +#include "common.h" + +#include <vector> + +class EventSource; +class Handle_p; + +class EventSources_p { +protected: + /* return true if quit should be performed */ + bool process_system_events(); + void add_event(HANDLE event, EventSource* source); + void remove_event(EventSource* source); +public: + std::vector<EventSource*> _events; + std::vector<HANDLE> _handles; +}; + +class Handle_p { +public: + Handle_p(); + virtual ~Handle_p(); + HANDLE get_handle() { return _event;} +protected: + HANDLE _event; +}; + +class Trigger_p: public Handle_p { +}; + +#endif + diff --git a/client/windows/events_loop_p.h b/client/windows/events_loop_p.h index 8361398d..6bac7b94 100644 --- a/client/windows/events_loop_p.h +++ b/client/windows/events_loop_p.h @@ -22,15 +22,17 @@ #include <vector> -class EventSource; +class EventSourceOld; class EventsLoop_p { public: - std::vector<EventSource*> _events; + class Trigger_p; +public: + std::vector<EventSourceOld*> _events; std::vector<HANDLE> _handles; }; -class Trigger_p { +class EventsLoop_p::Trigger_p { public: HANDLE get_handle() { return event;} diff --git a/client/windows/named_pipe.cpp b/client/windows/named_pipe.cpp index f33c476e..44459fab 100644 --- a/client/windows/named_pipe.cpp +++ b/client/windows/named_pipe.cpp @@ -20,23 +20,22 @@ #include "utils.h" #include "debug.h" -PipeBuffer::PipeBuffer(HANDLE pipe) +PipeBuffer::PipeBuffer(HANDLE pipe, ProcessLoop& process_loop) : _handler (NULL) , _pipe (pipe) , _start (0) , _end (0) , _pending (false) + , _process_loop(process_loop) { ZeroMemory(&_overlap, sizeof(_overlap)); - _overlap.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - _event_handle = _overlap.hEvent; - WinPlatform::add_event(*this); + _overlap.hEvent = this->get_handle(); + _process_loop.add_handle(*this); } PipeBuffer::~PipeBuffer() { - WinPlatform::remove_event(*this); - CloseHandle(_event_handle); + _process_loop.remove_handle(*this); } DWORD PipeBuffer::get_overlapped_bytes() @@ -127,10 +126,10 @@ void PipeWriter::on_event() } } -WinConnection::WinConnection(HANDLE pipe) +WinConnection::WinConnection(HANDLE pipe, ProcessLoop& process_loop) : _pipe (pipe) - , _writer (pipe) - , _reader (pipe) + , _writer (pipe, process_loop) + , _reader (pipe, process_loop) { } @@ -158,24 +157,24 @@ void WinConnection::set_handler(NamedPipe::ConnectionInterface* handler) _writer.set_handler(handler); } -WinListener::WinListener(const char *name, NamedPipe::ListenerInterface &listener_interface) +WinListener::WinListener(const char *name, NamedPipe::ListenerInterface &listener_interface, + ProcessLoop& process_loop) : _listener_interface (listener_interface) , _pipe (0) + , _process_loop (process_loop) { _pipename = new TCHAR[PIPE_MAX_NAME_LEN]; swprintf_s(_pipename, PIPE_MAX_NAME_LEN, L"%s%S", PIPE_PREFIX, name); ZeroMemory(&_overlap, sizeof(_overlap)); - _overlap.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - _event_handle = _overlap.hEvent; - WinPlatform::add_event(*this); + _overlap.hEvent = this->get_handle(); + _process_loop.add_handle(*this); create_pipe(); } WinListener::~WinListener() { CancelIo(_pipe); - WinPlatform::remove_event(*this); - CloseHandle(_event_handle); + _process_loop.remove_handle(*this); delete[] _pipename; } @@ -188,7 +187,7 @@ void WinListener::on_event() return; } DBG(0, "Pipe connected 0x%p", _pipe); - WinConnection *con = new WinConnection(_pipe); + WinConnection *con = new WinConnection(_pipe, _process_loop); NamedPipe::ConnectionInterface &con_interface = _listener_interface.create(); con->set_handler(&con_interface); con_interface.bind((NamedPipe::ConnectionRef)con); @@ -213,7 +212,7 @@ void WinListener::create_pipe() break; case ERROR_PIPE_CONNECTED: { DBG(0, "Pipe already connected"); - WinConnection *con = new WinConnection(_pipe); + WinConnection *con = new WinConnection(_pipe, _process_loop); NamedPipe::ConnectionInterface &con_interface = _listener_interface.create(); con->set_handler(&con_interface); con_interface.bind((NamedPipe::ConnectionRef)con); diff --git a/client/windows/named_pipe.h b/client/windows/named_pipe.h index 578c34d2..fcdc0dd7 100644 --- a/client/windows/named_pipe.h +++ b/client/windows/named_pipe.h @@ -19,8 +19,9 @@ #define _H_NAMED_PIPE #include <windows.h> +#include "process_loop.h" +#include "event_sources.h" #include "platform.h" -#include "win_platform.h" #define PIPE_TIMEOUT 5000 #define PIPE_BUF_SIZE 8192 @@ -29,9 +30,9 @@ class WinConnection; -class PipeBuffer: public EventOwner { +class PipeBuffer: public EventSources::Handle { public: - PipeBuffer(HANDLE pipe); + PipeBuffer(HANDLE pipe, ProcessLoop& process_loop); ~PipeBuffer(); void set_handler(NamedPipe::ConnectionInterface* handler) { _handler = handler;} DWORD get_overlapped_bytes(); @@ -44,25 +45,26 @@ protected: uint32_t _end; uint8_t _data[PIPE_BUF_SIZE]; bool _pending; + ProcessLoop& _process_loop; }; class PipeReader: public PipeBuffer { public: - PipeReader(HANDLE pipe) : PipeBuffer(pipe) {} + PipeReader(HANDLE pipe, ProcessLoop& process_loop) : PipeBuffer(pipe, process_loop) {} int32_t read(uint8_t *buf, int32_t size); void on_event(); }; class PipeWriter: public PipeBuffer { public: - PipeWriter(HANDLE pipe) : PipeBuffer(pipe) {} + PipeWriter(HANDLE pipe, ProcessLoop& process_loop) : PipeBuffer(pipe, process_loop) {} int32_t write(const uint8_t *buf, int32_t size); void on_event(); }; class WinConnection { public: - WinConnection(HANDLE pipe); + WinConnection(HANDLE pipe, ProcessLoop& process_loop); ~WinConnection(); int32_t read(uint8_t *buf, int32_t size); int32_t write(const uint8_t *buf, int32_t size); @@ -74,9 +76,10 @@ private: PipeReader _reader; }; -class WinListener: public EventOwner { +class WinListener: public EventSources::Handle { public: - WinListener(const char *name, NamedPipe::ListenerInterface &listener_interface); + WinListener(const char *name, NamedPipe::ListenerInterface &listener_interface, + ProcessLoop& process_loop); ~WinListener(); void on_event(); @@ -88,6 +91,7 @@ private: NamedPipe::ListenerInterface &_listener_interface; OVERLAPPED _overlap; HANDLE _pipe; + ProcessLoop& _process_loop; }; #endif diff --git a/client/windows/platform.cpp b/client/windows/platform.cpp index 773fa614..b5faf758 100644 --- a/client/windows/platform.cpp +++ b/client/windows/platform.cpp @@ -28,9 +28,6 @@ #include "cursor.h" #include "named_pipe.h" -#define WM_USER_WAKEUP WM_USER -#define NUM_TIMERS 100 - int gdi_handlers = 0; extern HINSTANCE instance; @@ -44,61 +41,17 @@ public: static DefaultEventListener default_event_listener; static Platform::EventListener* event_listener = &default_event_listener; static HWND paltform_win; -static HANDLE main_tread; - -struct Timer { - TimerID id; - timer_proc_t proc; - void* opaque; - Timer *next; -}; - -Timer timers[NUM_TIMERS]; -Timer* free_timers = NULL; -Mutex timers_lock; - -static void free_timer(Timer* timer) -{ - Lock lock(timers_lock); - timer->proc = NULL; - timer->next = free_timers; - free_timers = timer; -} +static ProcessLoop* main_loop = NULL; -static void init_timers() -{ - for (int i = 0; i < NUM_TIMERS; i++) { - timers[i].id = i; - free_timer(&timers[i]); - } -} - -static Timer* alloc_timer() +void Platform::send_quit_request() { - Timer* timer; - - Lock lock(timers_lock); - if (!(timer = free_timers)) { - return NULL; - } - - free_timers = free_timers->next; - return timer; + ASSERT(main_loop); + main_loop->quit(0); } static LRESULT CALLBACK PlatformWinProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam) { switch (message) { - case WM_TIMER: { - TimerID id = wParam - 1; - ASSERT(id < NUM_TIMERS); - Timer* timer = &timers[id]; - timer->proc(timer->opaque, id); - break; - } - case WM_USER_WAKEUP: { - break; - } case WM_ACTIVATEAPP: if (wParam) { event_listener->on_app_activated(); @@ -147,80 +100,21 @@ static void create_message_wind() paltform_win = window; } -void Platform::send_quit_request() -{ - ASSERT(GetCurrentThread() == main_tread); - PostQuitMessage(0); -} - -static std::vector<HANDLE> events; -static std::vector<EventOwner*> events_owners; - -void WinPlatform::add_event(EventOwner& event_owner) -{ - ASSERT(main_tread == GetCurrentThread()); - int size = events.size(); - if (size == MAXIMUM_WAIT_OBJECTS - 1) { - THROW("reached maximum allowed events to wait for"); - } - events.resize(size + 1); - events_owners.resize(size + 1); - events[size] = event_owner.get_event_handle(); - events_owners[size] = &event_owner; -} - -void WinPlatform::remove_event(EventOwner& event_owner) -{ - ASSERT(main_tread == GetCurrentThread()); - int size = events.size(); - for (int i = 0; i < size; i++) { - if (events_owners[i] == &event_owner) { - for (i++; i < size; i++) { - events[i - 1] = events[i]; - events_owners[i - 1] = events_owners[i]; - } - events.resize(size - 1); - events_owners.resize(size - 1); - return; - } - } - THROW("event owner not found"); -} - -void Platform::wait_events() -{ - if (!events.size()) { - if (!WaitMessage()) { - THROW("wait failed %d", GetLastError()); - } - return; - } - - DWORD r = MsgWaitForMultipleObjectsEx(events.size(), &events[0], INFINITE, QS_ALLINPUT, 0); - if (r == WAIT_OBJECT_0 + events.size()) { - return; - } - if (r >= WAIT_OBJECT_0 && r <= WAIT_OBJECT_0 + events.size() - 1) { - events_owners[r - WAIT_OBJECT_0]->on_event(); - } else if (r == WAIT_FAILED) { - THROW("wait multiple failed %d", GetLastError()); - } else { - THROW("unexpected wait return %u", r); - } -} - NamedPipe::ListenerRef NamedPipe::create(const char *name, ListenerInterface& listener_interface) { - return (ListenerRef)(new WinListener(name, listener_interface)); + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); + return (ListenerRef)(new WinListener(name, listener_interface, *main_loop)); } void NamedPipe::destroy(ListenerRef listener_ref) { + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); delete (WinListener *)listener_ref; } void NamedPipe::destroy_connection(ConnectionRef conn_ref) { + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); delete (WinConnection *)conn_ref; } @@ -234,26 +128,6 @@ int32_t NamedPipe::write(ConnectionRef conn_ref, const uint8_t *buf, int32_t siz return ((WinConnection *)conn_ref)->write(buf, size); } -void Platform::wakeup() -{ - if (!PostMessage(paltform_win, WM_USER_WAKEUP, 0, 0)) { - THROW("post failed %d", GetLastError()); - } -} - -bool Platform::process_events() -{ - MSG msg; - while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)) { - if (msg.message == WM_QUIT) { - return true; - } - TranslateMessage(&msg); - DispatchMessage(&msg); - } - return false; -} - void Platform::msleep(unsigned int msec) { Sleep(msec); @@ -302,48 +176,6 @@ void Platform::set_event_listener(EventListener* listener) event_listener = listener ? listener : &default_event_listener; } -TimerID Platform::create_interval_timer(timer_proc_t proc, void* opaque) -{ - Timer* timer = alloc_timer(); - if (!timer) { - return INVALID_TIMER; - } - timer->proc = proc; - timer->opaque = opaque; - return timer->id; -} - -bool Platform::activate_interval_timer(TimerID timer, unsigned int millisec) -{ - if (timer >= NUM_TIMERS) { - return false; - } - - if (!SetTimer(paltform_win, timer + 1, millisec, NULL)) { - return false; - } - return true; -} - -bool Platform::deactivate_interval_timer(TimerID timer) -{ - if (timer >= NUM_TIMERS) { - return false; - } - KillTimer(paltform_win, timer + 1); - return true; -} - -void Platform::destroy_interval_timer(TimerID timer) -{ - if (timer == INVALID_TIMER) { - return; - } - ASSERT(timer < NUM_TIMERS); - KillTimer(paltform_win, timer + 1); - free_timer(&timers[timer]); -} - uint64_t Platform::get_monolithic_time() { return uint64_t(GetTickCount()) * 1000 * 1000; @@ -565,9 +397,12 @@ bool Platform::is_monitors_pos_valid() void Platform::init() { - main_tread = GetCurrentThread(); create_message_wind(); - init_timers(); +} + +void Platform::set_process_loop(ProcessLoop& main_process_loop) +{ + main_loop = &main_process_loop; } WaveRecordAbstract* Platform::create_recorder(RecordClinet& client, @@ -664,7 +499,7 @@ WinLocalCursor::WinLocalCursor(CursorData* cursor_data) icon.yHotspot = header.hot_spot_y; icon.hbmColor = icon.hbmMask = NULL; HDC hdc = GetDC(NULL); - + switch (header.type) { case CURSOR_TYPE_ALPHA: case CURSOR_TYPE_COLOR32: @@ -784,3 +619,37 @@ Icon* Platform::load_icon(int id) return new WinIcon(icon); } +class PlatformTimer: public Timer { +public: + PlatformTimer(timer_proc_t proc, void* opaque) : _proc (proc), _opaque (opaque) {} + void response(AbstractProcessLoop& events_loop) {_proc(_opaque, (TimerID)this);} + +private: + timer_proc_t _proc; + void* _opaque; +}; + +TimerID Platform::create_interval_timer(timer_proc_t proc, void* opaque) +{ + return (TimerID)(new PlatformTimer(proc, opaque)); +} + +bool Platform::activate_interval_timer(TimerID timer, unsigned int millisec) +{ + ASSERT(main_loop); + main_loop->activate_interval_timer((PlatformTimer*)timer, millisec); + return true; +} + +bool Platform::deactivate_interval_timer(TimerID timer) +{ + ASSERT(main_loop); + main_loop->deactivate_interval_timer((PlatformTimer*)timer); + return true; +} + +void Platform::destroy_interval_timer(TimerID timer) +{ + deactivate_interval_timer(timer); + ((PlatformTimer*)timer)->unref(); +} diff --git a/client/windows/redc.vcproj b/client/windows/redc.vcproj index f65e3a2f..c63e1f1c 100644 --- a/client/windows/redc.vcproj +++ b/client/windows/redc.vcproj @@ -225,6 +225,10 @@ >
</File>
<File
+ RelativePath=".\event_sources_p.cpp"
+ >
+ </File>
+ <File
RelativePath=".\events_loop_p.cpp"
>
</File>
@@ -311,6 +315,10 @@ >
</File>
<File
+ RelativePath="..\process_loop.cpp"
+ >
+ </File>
+ <File
RelativePath="..\quic.cpp"
>
</File>
@@ -437,6 +445,14 @@ >
</File>
<File
+ RelativePath="..\event_sources.h"
+ >
+ </File>
+ <File
+ RelativePath=".\event_sources_p.h"
+ >
+ </File>
+ <File
RelativePath="..\events_loop.h"
>
</File>
@@ -513,6 +529,10 @@ >
</File>
<File
+ RelativePath="..\process_loop.h"
+ >
+ </File>
+ <File
RelativePath=".\record.h"
>
</File>
diff --git a/client/windows/win_platform.h b/client/windows/win_platform.h index ddd0cd59..a821f63d 100644 --- a/client/windows/win_platform.h +++ b/client/windows/win_platform.h @@ -20,22 +20,6 @@ #include "icon.h" -class EventOwner { -public: - EventOwner() : _event_handle (0) {} - HANDLE const get_event_handle() { return _event_handle;} - virtual void on_event() = 0; - -protected: - HANDLE _event_handle; -}; - -class WinPlatform { -public: - static void add_event(EventOwner& event_owner); - static void remove_event(EventOwner& event_owner); -}; - class WinIcon: public Icon { public: WinIcon(HICON icon) : _icon (icon) {} diff --git a/client/x11/Makefile.am b/client/x11/Makefile.am index 06909a81..fffd36c8 100644 --- a/client/x11/Makefile.am +++ b/client/x11/Makefile.am @@ -72,6 +72,7 @@ RED_COMMON_SRCS = \ $(top_srcdir)/client/pixels_source.h \ $(top_srcdir)/client/platform.h \ $(top_srcdir)/client/playback_channel.cpp \ + $(top_srcdir)/client/process_loop.cpp \ $(top_srcdir)/client/quic.cpp \ $(top_srcdir)/client/record_channel.cpp \ $(top_srcdir)/client/red_channel.cpp \ @@ -110,6 +111,7 @@ spicec_SOURCES = \ atomic_count.h \ events_loop_p.cpp \ events_loop_p.h \ + event_sources_p.cpp \ main.cpp \ named_pipe.h \ named_pipe.cpp \ diff --git a/client/x11/event_sources_p.cpp b/client/x11/event_sources_p.cpp new file mode 100644 index 00000000..2e6af138 --- /dev/null +++ b/client/x11/event_sources_p.cpp @@ -0,0 +1,339 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <sys/epoll.h> +#include <sys/fcntl.h> + +#include "event_sources.h" +#include "debug.h" +#include "utils.h" + +#ifdef USING_EVENT_FD +#include <sys/eventfd.h> +#endif + +#define NUM_EPOLL_EVENTS 10 + +#ifdef USING_EVENT_FD +#define WRITE_FD _event_fd +#define EVENT_DATA_TYPE eventfd_t +#else +#define WRITE_FD _event_write_fd +#define EVENT_DATA_TYPE uint8_t +#endif + +class EventWrapper { +public: + EventWrapper(EventSources& owner, EventSource& event) + : _owner (owner) + , _event (&event) + , _refs (1) + { + } + + EventWrapper* ref() + { + _refs++; + return this; + } + + void unref() + { + if (!--_refs) { + _owner.remove_wrapper(this); + delete this; + } + } + + EventSource* get_event() + { + return _event; + } + + void invalidate() + { + _event = NULL; + } + +private: + EventSources& _owner; + EventSource* _event; + int _refs; +}; + +EventSources::EventSources() +{ + _epoll = epoll_create(NUM_EPOLL_EVENTS); + if (_epoll == -1) { + THROW("create epool failed"); + } +} + +EventSources::~EventSources() +{ + Events::iterator iter = _events.begin(); + for (; iter != _events.end(); iter++) { + delete *iter; + } + close(_epoll); +} + +bool EventSources::wait_events(int timeout_ms) +{ + struct epoll_event events[NUM_EPOLL_EVENTS]; + int num_events = epoll_wait(_epoll, events, NUM_EPOLL_EVENTS, timeout_ms); + + if (num_events == -1) { + if (errno == EINTR) { + return false; + } + THROW("wait error eventfd failed"); + } + + for (int i = 0; i < num_events; i++) { + ((EventWrapper*)events[i].data.ptr)->ref(); + } + + for (int i = 0; i < num_events; i++) { + EventWrapper* wrapper; + EventSource* event; + + wrapper = (EventWrapper *)events[i].data.ptr; + if ((event = wrapper->get_event())) { + event->action(); + } + wrapper->unref(); + } + return false; +} + +void EventSources::add_trigger(Trigger& trigger) +{ + int fd = trigger.get_fd(); + EventWrapper* wrapper = new EventWrapper(*this, trigger); + struct epoll_event event; + event.data.ptr = wrapper; + event.events = EPOLLIN; + if (epoll_ctl(_epoll, EPOLL_CTL_ADD, fd, &event) == -1) { + THROW("epoll add failed"); + } + _events.push_back(wrapper); +} + +void EventSources_p::remove_wrapper(EventWrapper* wrapper) +{ + Events::iterator iter = _events.begin(); + for (;; iter++) { + if (iter == _events.end()) { + THROW("wrapper not found"); + } + if ((*iter) == wrapper) { + _events.erase(iter); + return; + } + } +} + +void EventSources::remove_trigger(Trigger& trigger) +{ + Events::iterator iter = _events.begin(); + for (;; iter++) { + if (iter == _events.end()) { + THROW("trigger not found"); + } + if ((*iter)->get_event() == &trigger) { + (*iter)->invalidate(); + (*iter)->unref(); + break; + } + } + int fd = trigger.get_fd(); + if (epoll_ctl(_epoll, EPOLL_CTL_DEL, fd, NULL) == -1) { + THROW("epoll remove failed"); + } +} + +EventSources::Trigger::Trigger() +{ +#ifdef USING_EVENT_FD + _event_fd = eventfd(0, 0); + if (_event_fd == -1) { + THROW("create eventfd failed"); + } +#else + int fd[2]; + if (pipe(fd) == -1) { + THROW("create pipe failed"); + } + _event_fd = fd[0]; + _event_write_fd = fd[1]; +#endif + int flags; + if ((flags = fcntl(_event_fd, F_GETFL)) == -1) { + THROW("failed to set eventfd non block: %s", strerror(errno)); + } + + if (fcntl(_event_fd, F_SETFL, flags | O_NONBLOCK) == -1) { + THROW("failed to set eventfd non block: %s", strerror(errno)); + } +} + +EventSources::Trigger::~Trigger() +{ + close(_event_fd); +#ifndef USING_EVENT_FD + close(_event_write_fd); +#endif +} + +void EventSources::Trigger::trigger() +{ + Lock lock(_lock); + if (_pending_int) { + return; + } + _pending_int = true; + static const EVENT_DATA_TYPE val = 1; + if (::write(WRITE_FD, &val, sizeof(val)) != sizeof(val)) { + THROW("write event failed"); + } +} + +bool Trigger_p::reset_event() +{ + Lock lock(_lock); + if (!_pending_int) { + return false; + } + EVENT_DATA_TYPE val; + if (read(_event_fd, &val, sizeof(val)) != sizeof(val)) { + THROW("event read error"); + } + _pending_int = false; + return true; +} + +void EventSources::Trigger::reset() +{ + reset_event(); +} + +void EventSources::Trigger::action() +{ + if (reset_event()) { + on_event(); + } +} + +static void set_non_blocking(int fd) +{ + int flags; + if ((flags = fcntl(fd, F_GETFL)) == -1) { + THROW("failed to set socket non block: %s", strerror(errno)); + } + + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + THROW("failed to set socket non block: %s", strerror(errno)); + } +} + +static void set_blocking(int fd) +{ + int flags; + if ((flags = fcntl(fd, F_GETFL)) == -1) { + THROW("failed to clear socket non block: %s", strerror(errno)); + } + + if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) { + THROW("failed to clear socket non block: %s", strerror(errno)); + } +} + +static void add_to_poll(int fd, int epoll, EventWrapper* wrapper) +{ + struct epoll_event event; + event.data.ptr = wrapper; + event.events = EPOLLIN | EPOLLOUT | EPOLLET; + if (epoll_ctl(epoll, EPOLL_CTL_ADD, fd, &event) == -1) { + THROW("epoll add failed"); + } +} + +void EventSources::add_socket(Socket& socket) +{ + int fd = socket.get_socket(); + set_non_blocking(fd); + EventWrapper* wrapper = new EventWrapper(*this, socket); + add_to_poll(fd, _epoll, wrapper); + _events.push_back(wrapper); +} + +static bool remove_event(EventSources_p::Events& events, EventSource& event) +{ + EventSources_p::Events::iterator iter = events.begin(); + for (;; iter++) { + if (iter == events.end()) { + return false; + } + if ((*iter)->get_event() == &event) { + (*iter)->invalidate(); + (*iter)->unref(); + return true; + } + } +} + +void EventSources::remove_socket(Socket& socket) +{ + if (!remove_event(_events, socket)) { + THROW("socket not found"); + } + int fd = socket.get_socket(); + if (epoll_ctl(_epoll, EPOLL_CTL_DEL, fd, NULL) == -1) { + THROW("epoll remove failed"); + } + set_blocking(fd); +} + +void EventSources::add_file(File& file) +{ + int fd = file.get_fd(); + set_non_blocking(fd); + EventWrapper* wrapper = new EventWrapper(*this, file); + add_to_poll(fd, _epoll, wrapper); + _events.push_back(wrapper); +} + +void EventSources::remove_file(File& file) +{ + if (!remove_event(_events, file)) { + THROW("file not found"); + } + int fd = file.get_fd(); + if (epoll_ctl(_epoll, EPOLL_CTL_DEL, fd, NULL) == -1) { + THROW("epoll remove failed"); + } + set_blocking(fd); +} + +void EventSources::add_handle(Handle& file) +{ +} + +void EventSources::remove_handle(Handle& file) +{ +} diff --git a/client/x11/event_sources_p.h b/client/x11/event_sources_p.h new file mode 100644 index 00000000..591a781f --- /dev/null +++ b/client/x11/event_sources_p.h @@ -0,0 +1,63 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _H_EVENT_SOURCES_P +#define _H_EVENT_SOURCES_P + +#include "common.h" +#include "threads.h" + +#if __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 8) +#define USING_EVENT_FD +#endif + +#define INFINITE -1 + +class EventWrapper; + +class EventSources_p { +public: + void remove_wrapper(EventWrapper*); + +public: + int _epoll; + typedef std::list<EventWrapper*> Events; + Events _events; + + friend class EventWrapper; +}; + +class Trigger_p { +public: + Trigger_p() : _pending_int (false) {} + int get_fd() { return _event_fd;} + bool reset_event(); + +public: + int _event_fd; +#ifndef USING_EVENT_FD + int _event_write_fd; +#endif + bool _pending_int; + Mutex _lock; +}; + +class Handle_p { +}; + +#endif + diff --git a/client/x11/events_loop_p.cpp b/client/x11/events_loop_p.cpp index 4db1469c..a1c450b5 100644 --- a/client/x11/events_loop_p.cpp +++ b/client/x11/events_loop_p.cpp @@ -38,7 +38,7 @@ class EventWrapper { public: - EventWrapper(EventsLoop& owner, EventSource& event) + EventWrapper(EventsLoop& owner, EventSourceOld& event) : _owner (owner) , _event (&event) , _refs (1) @@ -59,7 +59,7 @@ public: } } - EventSource* get_event() + EventSourceOld* get_event() { return _event; } @@ -71,7 +71,7 @@ public: private: EventsLoop& _owner; - EventSource* _event; + EventSourceOld* _event; int _refs; }; @@ -116,7 +116,7 @@ void EventsLoop::run_once(int timeout_milli) } for (int i = 0; i < num_events; i++) { EventWrapper* wrapper; - EventSource* event; + EventSourceOld* event; wrapper = (EventWrapper *)events[i].data.ptr; if ((event = wrapper->get_event())) { @@ -218,7 +218,7 @@ void EventsLoop::Trigger::trigger() } } -bool Trigger_p::reset_event() +bool EventsLoop_p::Trigger_p::reset_event() { Lock lock(_lock); if (!_pending_int) { @@ -287,7 +287,7 @@ void EventsLoop::add_socket(Socket& socket) _events.push_back(wrapper); } -static bool remove_event(EventsLoop_p::Events& events, EventSource& event) +static bool remove_event(EventsLoop_p::Events& events, EventSourceOld& event) { EventsLoop_p::Events::iterator iter = events.begin(); for (;; iter++) { diff --git a/client/x11/events_loop_p.h b/client/x11/events_loop_p.h index 0a70d3ed..d339ca4b 100644 --- a/client/x11/events_loop_p.h +++ b/client/x11/events_loop_p.h @@ -31,6 +31,7 @@ class EventWrapper; class EventsLoop_p { public: + class Trigger_p; void remove_wrapper(EventWrapper*); public: @@ -41,7 +42,7 @@ public: friend class EventWrapper; }; -class Trigger_p { +class EventsLoop_p::Trigger_p { public: Trigger_p() : _pending_int (false) {} int get_fd() { return _event_fd;} diff --git a/client/x11/named_pipe.cpp b/client/x11/named_pipe.cpp index 73d103c0..18c390e3 100644 --- a/client/x11/named_pipe.cpp +++ b/client/x11/named_pipe.cpp @@ -23,7 +23,7 @@ #include "utils.h" #include "debug.h" -Session::Session(int fd, EventsLoop& events_loop) +Session::Session(int fd, ProcessLoop& events_loop) : _fd_client(fd) , _events_loop(events_loop) { @@ -121,7 +121,7 @@ int LinuxListener::create_socket(const char *socket_name) } LinuxListener::LinuxListener(const char *name, NamedPipe::ListenerInterface &listener_interface, - EventsLoop &events_loop) + ProcessLoop& events_loop) : _listener_interface (listener_interface) , _events_loop (events_loop) { diff --git a/client/x11/named_pipe.h b/client/x11/named_pipe.h index ab41f7f3..b32ede88 100644 --- a/client/x11/named_pipe.h +++ b/client/x11/named_pipe.h @@ -20,11 +20,11 @@ #include "platform.h" #include "x_platform.h" -#include "events_loop.h" +#include "process_loop.h" -class Session: public EventsLoop::Socket { +class Session: public EventSources::Socket { public: - Session(int fd, EventsLoop& events_loop); + Session(int fd, ProcessLoop& events_loop); virtual ~Session(); void bind(NamedPipe::ConnectionInterface* conn_interface); @@ -37,13 +37,13 @@ public: private: NamedPipe::ConnectionInterface *_conn_interface; int _fd_client; - EventsLoop &_events_loop; + ProcessLoop &_events_loop; }; -class LinuxListener: public EventsLoop::Socket { +class LinuxListener: public EventSources::Socket { public: LinuxListener(const char *name, NamedPipe::ListenerInterface &listener_interface, - EventsLoop &events_loop); + ProcessLoop& events_loop); virtual ~LinuxListener(); void on_event(); virtual int get_socket() {return _listen_socket;} @@ -55,7 +55,7 @@ private: NamedPipe::ListenerInterface &_listener_interface; int _listen_socket; std::string _name; - EventsLoop &_events_loop; + ProcessLoop &_events_loop; }; #endif diff --git a/client/x11/platform.cpp b/client/x11/platform.cpp index f0c68235..8f318f2d 100644 --- a/client/x11/platform.cpp +++ b/client/x11/platform.cpp @@ -50,7 +50,7 @@ #include "resource.h" #include "res.h" #include "cursor.h" -#include "events_loop.h" +#include "process_loop.h" #define DWORD uint32_t #define BOOL bool @@ -66,10 +66,8 @@ static Display* x_display = NULL; static XVisualInfo **vinfo = NULL; static GLXFBConfig **fb_config; -static EventsLoop events_loop; static XContext win_proc_context; -static bool quit_request = false; -static pthread_t main_thread; +static ProcessLoop* main_loop = NULL; static int focus_count = 0; static bool using_xrandr_1_0 = false; @@ -108,16 +106,19 @@ static Platform::DisplayModeListner* display_mode_listener = &default_display_mo NamedPipe::ListenerRef NamedPipe::create(const char *name, ListenerInterface& listener_interface) { - return (ListenerRef)(new LinuxListener(name, listener_interface, events_loop)); + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); + return (ListenerRef)(new LinuxListener(name, listener_interface, *main_loop)); } void NamedPipe::destroy(ListenerRef listener_ref) { + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); delete (LinuxListener *)listener_ref; } void NamedPipe::destroy_connection(ConnectionRef conn_ref) { + ASSERT(main_loop && main_loop->is_same_thread(pthread_self())); delete (Session *)conn_ref; } @@ -137,81 +138,45 @@ int32_t NamedPipe::write(ConnectionRef conn_ref, const uint8_t* buf, int32_t siz return -1; } -class XEventHandler: public EventsLoop::File { +class XEventHandler: public EventSources::File { public: - XEventHandler(Display& x_display); - virtual void on_event() {} + XEventHandler(Display& x_display, XContext& win_proc_context); + virtual void on_event(); virtual int get_fd() {return _x_fd;} private: + Display& _x_display; + XContext& _win_proc_context; int _x_fd; }; -XEventHandler::XEventHandler(Display& x_display) +XEventHandler::XEventHandler(Display& x_display, XContext& win_proc_context) + : _x_display (x_display) + , _win_proc_context (win_proc_context) { if ((_x_fd = ConnectionNumber(&x_display)) == -1) { THROW("get x fd failed"); } } -class WakeupEventHandler: public EventsLoop::Trigger { -public: - virtual void on_event() {} -}; - -static WakeupEventHandler wakeup_handler; - -#define NSEC_PER_SEC (1000 * 1000 * 1000) - -class Timer { -public: - Timer(timer_proc_t proc, void* opaque); - ~Timer(); - - void arm(uint32_t msec); - void disarm(); - bool action(const timespec& now); - - const timespec& get_experatoin() const { return _experatoin;} - - static int get_timout(); - static void timers_action(); - -private: - void calc_next_experatoin_time(); - - static bool timespec_less(const timespec& time, const timespec& from); - static bool timespec_equal(const timespec& time, const timespec& from); - static bool timespec_less_or_equal(const timespec& time, const timespec& from); - -public: - static RecurciveMutex timers_lock; +void XEventHandler::on_event() +{ + while (XPending(&_x_display)) { + XPointer proc_pointer; + XEvent event; -private: - timer_proc_t _proc; - void* _opaque; - bool _armed; - timespec _interval; - timespec _experatoin; - - class Compare { - public: - bool operator () (const Timer* timer1, const Timer* timer2) const - { - if (!Timer::timespec_less(timer1->get_experatoin(), timer2->get_experatoin())) { - return Timer::timespec_equal(timer1->get_experatoin(), timer2->get_experatoin()) ? - timer1 < timer2 : false; - } - return true; + XNextEvent(&_x_display, &event); + if (event.xany.window == None) { + LOG_WARN("invalid window"); + continue; } - }; - typedef std::set<Timer*, Compare> TimersSet; - static TimersSet armed_timers; -}; - -Timer::TimersSet Timer::armed_timers; -RecurciveMutex Timer::timers_lock; + if (XFindContext(&_x_display, event.xany.window, _win_proc_context, &proc_pointer)) { + THROW("no window proc"); + } + ((XPlatform::win_proc_t)proc_pointer)(event); + } +} Display* XPlatform::get_display() { @@ -242,44 +207,8 @@ void XPlatform::cleare_win_proc(Window win) void Platform::send_quit_request() { - quit_request = true; - wakeup(); -} - -void Platform::wait_events() -{ - ASSERT(pthread_self() == main_thread); - XFlush(x_display); - if (!XPending(x_display)) { - events_loop.run_once(Timer::get_timout()); - Timer::timers_action(); - } -} - -void Platform::wakeup() -{ - wakeup_handler.trigger(); -} - -bool Platform::process_events() -{ - ASSERT(pthread_self() == main_thread); - while (XPending(x_display)) { - XPointer proc_pointer; - XEvent event; - - XNextEvent(x_display, &event); - if (event.xany.window == None) { - LOG_WARN("invalid window"); - continue; - } - - if (XFindContext(x_display, event.xany.window, win_proc_context, &proc_pointer)) { - THROW("no window proc"); - } - ((XPlatform::win_proc_t)proc_pointer)(event); - } - return quit_request; + ASSERT(main_loop); + main_loop->quit(0); } uint64_t Platform::get_monolithic_time() @@ -341,155 +270,6 @@ void Platform::set_thread_priority(void* thread, Platform::ThreadPriority in_pri } } -Timer::Timer(timer_proc_t proc, void* opaque) - : _proc (proc) - , _opaque (opaque) - , _armed (false) -{ -} - -Timer::~Timer() -{ - disarm(); -} - -void Timer::arm(uint32_t msec) -{ - disarm(); - _interval.tv_sec = msec / 1000; - _interval.tv_nsec = (msec % 1000) * 1000 * 1000; - if (clock_gettime(CLOCK_MONOTONIC, &_experatoin)) { - THROW("gettime failed %s", strerror(errno)); - } - calc_next_experatoin_time(); - _armed = true; - armed_timers.insert(this); -} - -void Timer::disarm() -{ - if (!_armed) { - return; - } - armed_timers.erase(this); - _armed = false; -} - -#define TINER_COMPENSATION - -bool Timer::action(const timespec& now) -{ - ASSERT(_armed); - ASSERT(now.tv_nsec < NSEC_PER_SEC); - - if (timespec_less(now, _experatoin)) { - return false; - } - armed_timers.erase(this); -#ifndef TINER_COMPENSATION - _experatoin = now; -#endif - calc_next_experatoin_time(); -#ifdef TINER_COMPENSATION - if (timespec_less_or_equal(_experatoin, now)) { - _experatoin = now; - calc_next_experatoin_time(); - } -#endif - armed_timers.insert(this); - _proc(_opaque, (TimerID)this); - return true; -} - -int Timer::get_timout() -{ - RecurciveLock lock(Timer::timers_lock); - TimersSet::iterator iter; - iter = armed_timers.begin(); - if (iter == armed_timers.end()) { - return -1; - } - - timespec now; - if (clock_gettime(CLOCK_MONOTONIC, &now)) { - THROW("gettime failed %s", strerror(errno)); - } - - const timespec& next_time = (*iter)->get_experatoin(); - - if (!timespec_less(now, next_time)) { - return 0; - } - return ((next_time.tv_nsec - now.tv_nsec) / 1000 / 1000) + - (next_time.tv_sec - now.tv_sec) * 1000; -} - -void Timer::timers_action() -{ - RecurciveLock lock(timers_lock); - timespec now; - if (clock_gettime(CLOCK_MONOTONIC, &now)) { - THROW("gettime failed %s", strerror(errno)); - } - - TimersSet::iterator iter; - while ((iter = armed_timers.begin()) != armed_timers.end() && (*iter)->action(now)); -} - -void Timer::calc_next_experatoin_time() -{ - _experatoin.tv_nsec += _interval.tv_nsec; - _experatoin.tv_sec += (_experatoin.tv_nsec / NSEC_PER_SEC) + _interval.tv_sec; - _experatoin.tv_nsec %= NSEC_PER_SEC; -} - -bool Timer::timespec_less(const timespec& time, const timespec& from) -{ - return time.tv_sec < from.tv_sec || (time.tv_sec == from.tv_sec && time.tv_nsec < from.tv_nsec); -} - -bool Timer::timespec_equal(const timespec& time, const timespec& from) -{ - return time.tv_sec == from.tv_sec && time.tv_nsec <= from.tv_nsec; -} - -bool Timer::timespec_less_or_equal(const timespec& time, const timespec& from) -{ - return time.tv_sec < from.tv_sec || - (time.tv_sec == from.tv_sec && time.tv_nsec <= from.tv_nsec); -} - -TimerID Platform::create_interval_timer(timer_proc_t proc, void* opaque) -{ - return (TimerID) new Timer(proc, opaque); -} - -bool Platform::activate_interval_timer(TimerID timer, unsigned int millisec) -{ - RecurciveLock lock(Timer::timers_lock); - ((Timer*)timer)->arm(millisec); - if (pthread_self() != main_thread) { - wakeup(); - } - return true; -} - -bool Platform::deactivate_interval_timer(TimerID timer) -{ - RecurciveLock lock(Timer::timers_lock); - ((Timer*)timer)->disarm(); - return true; -} - -void Platform::destroy_interval_timer(TimerID timer) -{ - if (timer == INVALID_TIMER) { - return; - } - RecurciveLock lock(Timer::timers_lock); - delete (Timer*)timer; -} - void Platform::set_event_listener(EventListener* listener) { event_listener = listener ? listener : &default_event_listener; @@ -2278,13 +2058,11 @@ void Platform::init() { int err, ev; int threads_enable; - XEventHandler *x_event_handler; DBG(0, ""); threads_enable = XInitThreads(); - main_thread = pthread_self(); if (!(x_display = XOpenDisplay(NULL))) { THROW("open X display failed"); @@ -2341,10 +2119,6 @@ void Platform::init() init_xrandr(); init_xrender(); - x_event_handler = new XEventHandler(*x_display); - events_loop.add_file(*x_event_handler); - events_loop.add_trigger(wakeup_handler); - struct sigaction act; memset(&act, 0, sizeof(act)); sigfillset(&act.sa_mask); @@ -2368,6 +2142,14 @@ void Platform::init() atexit(cleanup); } +void Platform::set_process_loop(ProcessLoop& main_process_loop) +{ + main_loop = &main_process_loop; + XEventHandler *x_event_handler; + x_event_handler = new XEventHandler(*x_display, win_proc_context); + main_loop->add_file(*x_event_handler); +} + uint32_t Platform::get_keyboard_modifiers() { XKeyboardState keyboard_state; @@ -2641,3 +2423,37 @@ LocalCursor* Platform::create_default_cursor() return new XDefaultCursor(); } +class PlatformTimer: public Timer { +public: + PlatformTimer(timer_proc_t proc, void* opaque) : _proc(proc), _opaque(opaque) {} + void response(AbstractProcessLoop& events_loop) {_proc(_opaque, (TimerID)this);} + +private: + timer_proc_t _proc; + void* _opaque; +}; + +TimerID Platform::create_interval_timer(timer_proc_t proc, void* opaque) +{ + return (TimerID)(new PlatformTimer(proc, opaque)); +} + +bool Platform::activate_interval_timer(TimerID timer, unsigned int millisec) +{ + ASSERT(main_loop); + main_loop->activate_interval_timer((PlatformTimer*)timer, millisec); + return true; +} + +bool Platform::deactivate_interval_timer(TimerID timer) +{ + ASSERT(main_loop); + main_loop->deactivate_interval_timer((PlatformTimer*)timer); + return true; +} + +void Platform::destroy_interval_timer(TimerID timer) +{ + deactivate_interval_timer(timer); + ((PlatformTimer*)timer)->unref(); +} |