From 3d9e3178c4d2e18425ba6df23f27ee7b1da07453 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Fri, 22 Nov 2013 14:36:37 +0000 Subject: Move python example programs into python/examples/ subdirectory Signed-off-by: Daniel P. Berrange --- examples/event-test.py | 591 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 591 insertions(+) create mode 100644 examples/event-test.py (limited to 'examples/event-test.py') diff --git a/examples/event-test.py b/examples/event-test.py new file mode 100644 index 0000000..84f5259 --- /dev/null +++ b/examples/event-test.py @@ -0,0 +1,591 @@ +#!/usr/bin/python -u +# +# +# +################################################################################# +# Start off by implementing a general purpose event loop for anyones use +################################################################################# + +import sys +import getopt +import os +import libvirt +import select +import errno +import time +import threading + +# For the sake of demonstration, this example program includes +# an implementation of a pure python event loop. Most applications +# would be better off just using the default libvirt event loop +# APIs, instead of implementing this in python. The exception is +# where an application wants to integrate with an existing 3rd +# party event loop impl +# +# Change this to 'False' to make the demo use the native +# libvirt event loop impl +use_pure_python_event_loop = True + +do_debug = False +def debug(msg): + global do_debug + if do_debug: + print msg + +# +# This general purpose event loop will support waiting for file handle +# I/O and errors events, as well as scheduling repeatable timers with +# a fixed interval. +# +# It is a pure python implementation based around the poll() API +# +class virEventLoopPure: + # This class contains the data we need to track for a + # single file handle + class virEventLoopPureHandle: + def __init__(self, handle, fd, events, cb, opaque): + self.handle = handle + self.fd = fd + self.events = events + self.cb = cb + self.opaque = opaque + + def get_id(self): + return self.handle + + def get_fd(self): + return self.fd + + def get_events(self): + return self.events + + def set_events(self, events): + self.events = events + + def dispatch(self, events): + self.cb(self.handle, + self.fd, + events, + self.opaque) + + # This class contains the data we need to track for a + # single periodic timer + class virEventLoopPureTimer: + def __init__(self, timer, interval, cb, opaque): + self.timer = timer + self.interval = interval + self.cb = cb + self.opaque = opaque + self.lastfired = 0 + + def get_id(self): + return self.timer + + def get_interval(self): + return self.interval + + def set_interval(self, interval): + self.interval = interval + + def get_last_fired(self): + return self.lastfired + + def set_last_fired(self, now): + self.lastfired = now + + def dispatch(self): + self.cb(self.timer, + self.opaque) + + + def __init__(self): + self.poll = select.poll() + self.pipetrick = os.pipe() + self.pendingWakeup = False + self.runningPoll = False + self.nextHandleID = 1 + self.nextTimerID = 1 + self.handles = [] + self.timers = [] + self.quit = False + + # The event loop can be used from multiple threads at once. + # Specifically while the main thread is sleeping in poll() + # waiting for events to occur, another thread may come along + # and add/update/remove a file handle, or timer. When this + # happens we need to interrupt the poll() sleep in the other + # thread, so that it'll see the file handle / timer changes. + # + # Using OS level signals for this is very unreliable and + # hard to implement correctly. Thus we use the real classic + # "self pipe" trick. A anonymous pipe, with one end registered + # with the event loop for input events. When we need to force + # the main thread out of a poll() sleep, we simple write a + # single byte of data to the other end of the pipe. + debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1])) + self.poll.register(self.pipetrick[0], select.POLLIN) + + + # Calculate when the next timeout is due to occur, returning + # the absolute timestamp for the next timeout, or 0 if there is + # no timeout due + def next_timeout(self): + next = 0 + for t in self.timers: + last = t.get_last_fired() + interval = t.get_interval() + if interval < 0: + continue + if next == 0 or (last + interval) < next: + next = last + interval + + return next + + # Lookup a virEventLoopPureHandle object based on file descriptor + def get_handle_by_fd(self, fd): + for h in self.handles: + if h.get_fd() == fd: + return h + return None + + # Lookup a virEventLoopPureHandle object based on its event loop ID + def get_handle_by_id(self, handleID): + for h in self.handles: + if h.get_id() == handleID: + return h + return None + + + # This is the heart of the event loop, performing one single + # iteration. It asks when the next timeout is due, and then + # calcuates the maximum amount of time it is able to sleep + # for in poll() pending file handle events. + # + # It then goes into the poll() sleep. + # + # When poll() returns, there will zero or more file handle + # events which need to be dispatched to registered callbacks + # It may also be time to fire some periodic timers. + # + # Due to the coarse granularity of schedular timeslices, if + # we ask for a sleep of 500ms in order to satisfy a timer, we + # may return up to 1 schedular timeslice early. So even though + # our sleep timeout was reached, the registered timer may not + # technically be at its expiry point. This leads to us going + # back around the loop with a crazy 5ms sleep. So when checking + # if timeouts are due, we allow a margin of 20ms, to avoid + # these pointless repeated tiny sleeps. + def run_once(self): + sleep = -1 + self.runningPoll = True + try: + next = self.next_timeout() + debug("Next timeout due at %d" % next) + if next > 0: + now = int(time.time() * 1000) + if now >= next: + sleep = 0 + else: + sleep = (next - now) / 1000.0 + + debug("Poll with a sleep of %d" % sleep) + events = self.poll.poll(sleep) + + # Dispatch any file handle events that occurred + for (fd, revents) in events: + # See if the events was from the self-pipe + # telling us to wakup. if so, then discard + # the data just continue + if fd == self.pipetrick[0]: + self.pendingWakeup = False + data = os.read(fd, 1) + continue + + h = self.get_handle_by_fd(fd) + if h: + debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents)) + h.dispatch(self.events_from_poll(revents)) + + now = int(time.time() * 1000) + for t in self.timers: + interval = t.get_interval() + if interval < 0: + continue + + want = t.get_last_fired() + interval + # Deduct 20ms, since scheduler timeslice + # means we could be ever so slightly early + if now >= (want-20): + debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want))) + t.set_last_fired(now) + t.dispatch() + + except (os.error, select.error), e: + if e.args[0] != errno.EINTR: + raise + finally: + self.runningPoll = False + + + # Actually the event loop forever + def run_loop(self): + self.quit = False + while not self.quit: + self.run_once() + + def interrupt(self): + if self.runningPoll and not self.pendingWakeup: + self.pendingWakeup = True + os.write(self.pipetrick[1], 'c') + + + # Registers a new file handle 'fd', monitoring for 'events' (libvirt + # event constants), firing the callback cb() when an event occurs. + # Returns a unique integer identier for this handle, that should be + # used to later update/remove it + def add_handle(self, fd, events, cb, opaque): + handleID = self.nextHandleID + 1 + self.nextHandleID = self.nextHandleID + 1 + + h = self.virEventLoopPureHandle(handleID, fd, events, cb, opaque) + self.handles.append(h) + + self.poll.register(fd, self.events_to_poll(events)) + self.interrupt() + + debug("Add handle %d fd %d events %d" % (handleID, fd, events)) + + return handleID + + # Registers a new timer with periodic expiry at 'interval' ms, + # firing cb() each time the timer expires. If 'interval' is -1, + # then the timer is registered, but not enabled + # Returns a unique integer identier for this handle, that should be + # used to later update/remove it + def add_timer(self, interval, cb, opaque): + timerID = self.nextTimerID + 1 + self.nextTimerID = self.nextTimerID + 1 + + h = self.virEventLoopPureTimer(timerID, interval, cb, opaque) + self.timers.append(h) + self.interrupt() + + debug("Add timer %d interval %d" % (timerID, interval)) + + return timerID + + # Change the set of events to be monitored on the file handle + def update_handle(self, handleID, events): + h = self.get_handle_by_id(handleID) + if h: + h.set_events(events) + self.poll.unregister(h.get_fd()) + self.poll.register(h.get_fd(), self.events_to_poll(events)) + self.interrupt() + + debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events)) + + # Change the periodic frequency of the timer + def update_timer(self, timerID, interval): + for h in self.timers: + if h.get_id() == timerID: + h.set_interval(interval) + self.interrupt() + + debug("Update timer %d interval %d" % (timerID, interval)) + break + + # Stop monitoring for events on the file handle + def remove_handle(self, handleID): + handles = [] + for h in self.handles: + if h.get_id() == handleID: + self.poll.unregister(h.get_fd()) + debug("Remove handle %d fd %d" % (handleID, h.get_fd())) + else: + handles.append(h) + self.handles = handles + self.interrupt() + + # Stop firing the periodic timer + def remove_timer(self, timerID): + timers = [] + for h in self.timers: + if h.get_id() != timerID: + timers.append(h) + debug("Remove timer %d" % timerID) + self.timers = timers + self.interrupt() + + # Convert from libvirt event constants, to poll() events constants + def events_to_poll(self, events): + ret = 0 + if events & libvirt.VIR_EVENT_HANDLE_READABLE: + ret |= select.POLLIN + if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: + ret |= select.POLLOUT + if events & libvirt.VIR_EVENT_HANDLE_ERROR: + ret |= select.POLLERR + if events & libvirt.VIR_EVENT_HANDLE_HANGUP: + ret |= select.POLLHUP + return ret + + # Convert from poll() event constants, to libvirt events constants + def events_from_poll(self, events): + ret = 0 + if events & select.POLLIN: + ret |= libvirt.VIR_EVENT_HANDLE_READABLE + if events & select.POLLOUT: + ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE + if events & select.POLLNVAL: + ret |= libvirt.VIR_EVENT_HANDLE_ERROR + if events & select.POLLERR: + ret |= libvirt.VIR_EVENT_HANDLE_ERROR + if events & select.POLLHUP: + ret |= libvirt.VIR_EVENT_HANDLE_HANGUP + return ret + + +########################################################################### +# Now glue an instance of the general event loop into libvirt's event loop +########################################################################### + +# This single global instance of the event loop wil be used for +# monitoring libvirt events +eventLoop = virEventLoopPure() + +# This keeps track of what thread is running the event loop, +# (if it is run in a background thread) +eventLoopThread = None + + +# These next set of 6 methods are the glue between the official +# libvirt events API, and our particular impl of the event loop +# +# There is no reason why the 'virEventLoopPure' has to be used. +# An application could easily may these 6 glue methods hook into +# another event loop such as GLib's, or something like the python +# Twisted event framework. + +def virEventAddHandleImpl(fd, events, cb, opaque): + global eventLoop + return eventLoop.add_handle(fd, events, cb, opaque) + +def virEventUpdateHandleImpl(handleID, events): + global eventLoop + return eventLoop.update_handle(handleID, events) + +def virEventRemoveHandleImpl(handleID): + global eventLoop + return eventLoop.remove_handle(handleID) + +def virEventAddTimerImpl(interval, cb, opaque): + global eventLoop + return eventLoop.add_timer(interval, cb, opaque) + +def virEventUpdateTimerImpl(timerID, interval): + global eventLoop + return eventLoop.update_timer(timerID, interval) + +def virEventRemoveTimerImpl(timerID): + global eventLoop + return eventLoop.remove_timer(timerID) + +# This tells libvirt what event loop implementation it +# should use +def virEventLoopPureRegister(): + libvirt.virEventRegisterImpl(virEventAddHandleImpl, + virEventUpdateHandleImpl, + virEventRemoveHandleImpl, + virEventAddTimerImpl, + virEventUpdateTimerImpl, + virEventRemoveTimerImpl) + +# Directly run the event loop in the current thread +def virEventLoopPureRun(): + global eventLoop + eventLoop.run_loop() + +def virEventLoopNativeRun(): + while True: + libvirt.virEventRunDefaultImpl() + +# Spawn a background thread to run the event loop +def virEventLoopPureStart(): + global eventLoopThread + virEventLoopPureRegister() + eventLoopThread = threading.Thread(target=virEventLoopPureRun, name="libvirtEventLoop") + eventLoopThread.setDaemon(True) + eventLoopThread.start() + +def virEventLoopNativeStart(): + global eventLoopThread + libvirt.virEventRegisterDefaultImpl() + eventLoopThread = threading.Thread(target=virEventLoopNativeRun, name="libvirtEventLoop") + eventLoopThread.setDaemon(True) + eventLoopThread.start() + + +########################################################################## +# Everything that now follows is a simple demo of domain lifecycle events +########################################################################## +def eventToString(event): + eventStrings = ( "Defined", + "Undefined", + "Started", + "Suspended", + "Resumed", + "Stopped", + "Shutdown", + "PMSuspended", + "Crashed" ) + return eventStrings[event] + +def detailToString(event, detail): + eventStrings = ( + ( "Added", "Updated" ), + ( "Removed", ), + ( "Booted", "Migrated", "Restored", "Snapshot", "Wakeup" ), + ( "Paused", "Migrated", "IOError", "Watchdog", "Restored", "Snapshot", "API error" ), + ( "Unpaused", "Migrated", "Snapshot" ), + ( "Shutdown", "Destroyed", "Crashed", "Migrated", "Saved", "Failed", "Snapshot"), + ( "Finished", ), + ( "Memory", "Disk" ), + ( "Panicked", ) + ) + return eventStrings[event][detail] + +def myDomainEventCallback1 (conn, dom, event, detail, opaque): + print "myDomainEventCallback1 EVENT: Domain %s(%s) %s %s" % (dom.name(), dom.ID(), + eventToString(event), + detailToString(event, detail)) + +def myDomainEventCallback2 (conn, dom, event, detail, opaque): + print "myDomainEventCallback2 EVENT: Domain %s(%s) %s %s" % (dom.name(), dom.ID(), + eventToString(event), + detailToString(event, detail)) + +def myDomainEventRebootCallback(conn, dom, opaque): + print "myDomainEventRebootCallback: Domain %s(%s)" % (dom.name(), dom.ID()) + +def myDomainEventRTCChangeCallback(conn, dom, utcoffset, opaque): + print "myDomainEventRTCChangeCallback: Domain %s(%s) %d" % (dom.name(), dom.ID(), utcoffset) + +def myDomainEventWatchdogCallback(conn, dom, action, opaque): + print "myDomainEventWatchdogCallback: Domain %s(%s) %d" % (dom.name(), dom.ID(), action) + +def myDomainEventIOErrorCallback(conn, dom, srcpath, devalias, action, opaque): + print "myDomainEventIOErrorCallback: Domain %s(%s) %s %s %d" % (dom.name(), dom.ID(), srcpath, devalias, action) + +def myDomainEventGraphicsCallback(conn, dom, phase, localAddr, remoteAddr, authScheme, subject, opaque): + print "myDomainEventGraphicsCallback: Domain %s(%s) %d %s" % (dom.name(), dom.ID(), phase, authScheme) + +def myDomainEventDiskChangeCallback(conn, dom, oldSrcPath, newSrcPath, devAlias, reason, opaque): + print "myDomainEventDiskChangeCallback: Domain %s(%s) disk change oldSrcPath: %s newSrcPath: %s devAlias: %s reason: %s" % ( + dom.name(), dom.ID(), oldSrcPath, newSrcPath, devAlias, reason) +def myDomainEventTrayChangeCallback(conn, dom, devAlias, reason, opaque): + print "myDomainEventTrayChangeCallback: Domain %s(%s) tray change devAlias: %s reason: %s" % ( + dom.name(), dom.ID(), devAlias, reason) +def myDomainEventPMWakeupCallback(conn, dom, reason, opaque): + print "myDomainEventPMWakeupCallback: Domain %s(%s) system pmwakeup" % ( + dom.name(), dom.ID()) +def myDomainEventPMSuspendCallback(conn, dom, reason, opaque): + print "myDomainEventPMSuspendCallback: Domain %s(%s) system pmsuspend" % ( + dom.name(), dom.ID()) +def myDomainEventBalloonChangeCallback(conn, dom, actual, opaque): + print "myDomainEventBalloonChangeCallback: Domain %s(%s) %d" % (dom.name(), dom.ID(), actual) +def myDomainEventPMSuspendDiskCallback(conn, dom, reason, opaque): + print "myDomainEventPMSuspendDiskCallback: Domain %s(%s) system pmsuspend_disk" % ( + dom.name(), dom.ID()) +def myDomainEventDeviceRemovedCallback(conn, dom, dev, opaque): + print "myDomainEventDeviceRemovedCallback: Domain %s(%s) device removed: %s" % ( + dom.name(), dom.ID(), dev) + +run = True + +def myConnectionCloseCallback(conn, reason, opaque): + reasonStrings = ( + "Error", "End-of-file", "Keepalive", "Client", + ) + print "myConnectionCloseCallback: %s: %s" % (conn.getURI(), reasonStrings[reason]) + run = False + +def usage(out=sys.stderr): + print >>out, "usage: "+os.path.basename(sys.argv[0])+" [-hdl] [uri]" + print >>out, " uri will default to qemu:///system" + print >>out, " --help, -h Print this help message" + print >>out, " --debug, -d Print debug output" + print >>out, " --loop, -l Toggle event-loop-implementation" + +def main(): + try: + opts, args = getopt.getopt(sys.argv[1:], "hdl", ["help", "debug", "loop"]) + except getopt.GetoptError, err: + # print help information and exit: + print str(err) # will print something like "option -a not recognized" + usage() + sys.exit(2) + for o, a in opts: + if o in ("-h", "--help"): + usage(sys.stdout) + sys.exit() + if o in ("-d", "--debug"): + global do_debug + do_debug = True + if o in ("-l", "--loop"): + global use_pure_python_event_loop + use_pure_python_event_loop ^= True + + if len(args) >= 1: + uri = args[0] + else: + uri = "qemu:///system" + + print "Using uri:" + uri + + # Run a background thread with the event loop + if use_pure_python_event_loop: + virEventLoopPureStart() + else: + virEventLoopNativeStart() + + vc = libvirt.openReadOnly(uri) + + # Close connection on exit (to test cleanup paths) + old_exitfunc = getattr(sys, 'exitfunc', None) + def exit(): + print "Closing " + str(vc) + vc.close() + if (old_exitfunc): old_exitfunc() + sys.exitfunc = exit + + vc.registerCloseCallback(myConnectionCloseCallback, None) + + #Add 2 callbacks to prove this works with more than just one + vc.domainEventRegister(myDomainEventCallback1,None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, myDomainEventCallback2, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_REBOOT, myDomainEventRebootCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_RTC_CHANGE, myDomainEventRTCChangeCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_IO_ERROR, myDomainEventIOErrorCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_WATCHDOG, myDomainEventWatchdogCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_GRAPHICS, myDomainEventGraphicsCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DISK_CHANGE, myDomainEventDiskChangeCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_TRAY_CHANGE, myDomainEventTrayChangeCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMWAKEUP, myDomainEventPMWakeupCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMSUSPEND, myDomainEventPMSuspendCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_BALLOON_CHANGE, myDomainEventBalloonChangeCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_PMSUSPEND_DISK, myDomainEventPMSuspendDiskCallback, None) + vc.domainEventRegisterAny(None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED, myDomainEventDeviceRemovedCallback, None) + + vc.setKeepAlive(5, 3) + + # The rest of your app would go here normally, but for sake + # of demo we'll just go to sleep. The other option is to + # run the event loop in your main thread if your app is + # totally event based. + while run: + time.sleep(1) + + +if __name__ == "__main__": + main() -- cgit