From d63f7c1b47509f31c1e5aa8d27392d7f83e6dad2 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Tue, 12 Feb 2013 13:32:06 +0000 Subject: Add support for lifecycle events in the libvirt driver This wires up the libvirt compute driver to process async events from libvirtd and emit lifecycle events. This makes use of the native libvirt event loop to handle I/O processing which requires a native thread. The native thread uses a queue + self-pipe to forward events onto a green thread. The green thread reads events off the queue and uses the emit_event method to dispatch them to the compute manager. Blueprint: compute-driver-events Change-Id: Icd2cb7081adde10420ae55beebe60350afe21379 Signed-off-by: Daniel P. Berrange --- nova/tests/fakelibvirt.py | 38 +++++++++++ nova/tests/test_libvirt.py | 84 ++++++++++++++++++++++- nova/virt/libvirt/driver.py | 162 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 283 insertions(+), 1 deletion(-) diff --git a/nova/tests/fakelibvirt.py b/nova/tests/fakelibvirt.py index 6abe7771c..69a4e677e 100644 --- a/nova/tests/fakelibvirt.py +++ b/nova/tests/fakelibvirt.py @@ -70,6 +70,17 @@ VIR_DOMAIN_CRASHED = 6 VIR_DOMAIN_XML_SECURE = 1 +VIR_DOMAIN_EVENT_ID_LIFECYCLE = 0 + +VIR_DOMAIN_EVENT_DEFINED = 0 +VIR_DOMAIN_EVENT_UNDEFINED = 1 +VIR_DOMAIN_EVENT_STARTED = 2 +VIR_DOMAIN_EVENT_SUSPENDED = 3 +VIR_DOMAIN_EVENT_RESUMED = 4 +VIR_DOMAIN_EVENT_STOPPED = 5 +VIR_DOMAIN_EVENT_SHUTDOWN = 6 +VIR_DOMAIN_EVENT_PMSUSPENDED = 7 + VIR_DOMAIN_UNDEFINE_MANAGED_SAVE = 1 VIR_DOMAIN_AFFECT_CURRENT = 0 @@ -506,6 +517,7 @@ class Connection(object): self._running_vms = {} self._id_counter = 1 # libvirt reserves 0 for the hypervisor. self._nwfilters = {} + self._event_callbacks = {} self.fakeLibVersion = version self.fakeVersion = version @@ -517,6 +529,7 @@ class Connection(object): def _mark_running(self, dom): self._running_vms[self._id_counter] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0) self._id_counter += 1 def _mark_not_running(self, dom): @@ -528,10 +541,13 @@ class Connection(object): for (k, v) in self._running_vms.iteritems(): if v == dom: del self._running_vms[k] + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STOPPED, 0) return def _undefine(self, dom): del self._vms[dom.name()] + if not dom._transient: + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_UNDEFINED, 0) def getInfo(self): return [node_arch, @@ -563,14 +579,25 @@ class Connection(object): 'name "%s"' % name, VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU) + def _emit_lifecycle(self, dom, event, detail): + if VIR_DOMAIN_EVENT_ID_LIFECYCLE not in self._event_callbacks: + return + + cbinfo = self._event_callbacks[VIR_DOMAIN_EVENT_ID_LIFECYCLE] + callback = cbinfo[0] + opaque = cbinfo[1] + callback(self, dom, event, detail, opaque) + def defineXML(self, xml): dom = Domain(connection=self, running=False, transient=False, xml=xml) self._vms[dom.name()] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_DEFINED, 0) return dom def createXML(self, xml, flags): dom = Domain(connection=self, running=True, transient=True, xml=xml) self._vms[dom.name()] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0) return dom def getType(self): @@ -586,6 +613,9 @@ class Connection(object): def getHostname(self): return 'compute1' + def domainEventRegisterAny(self, dom, eventid, callback, opaque): + self._event_callbacks[eventid] = [callback, opaque] + def getCapabilities(self): return ''' @@ -875,6 +905,14 @@ def openAuth(uri, auth, flags): return Connection(uri, readonly=False) +def virEventRunDefaultImpl(): + time.sleep(1) + + +def virEventRegisterDefaultImpl(): + pass + + virDomain = Domain diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py index 8a2130c80..039189a9e 100644 --- a/nova/tests/test_libvirt.py +++ b/nova/tests/test_libvirt.py @@ -52,6 +52,7 @@ from nova import utils from nova import version from nova.virt.disk import api as disk from nova.virt import driver +from nova.virt import event as virtevent from nova.virt import fake from nova.virt import firewall as base_firewall from nova.virt import images @@ -99,7 +100,8 @@ class FakeVirDomainSnapshot(object): class FakeVirtDomain(object): - def __init__(self, fake_xml=None): + def __init__(self, fake_xml=None, uuidstr=None): + self.uuidstr = uuidstr if fake_xml: self._fake_dom_xml = fake_xml else: @@ -131,6 +133,9 @@ class FakeVirtDomain(object): def XMLDesc(self, *args): return self._fake_dom_xml + def UUIDString(self): + return self.uuidstr + class CacheConcurrencyTestCase(test.TestCase): def setUp(self): @@ -3340,6 +3345,83 @@ class LibvirtConnTestCase(test.TestCase): got = conn.get_instance_capabilities() self.assertEqual(want, got) + def test_event_dispatch(self): + # Validate that the libvirt self-pipe for forwarding + # events between threads is working sanely + conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + got_events = [] + + def handler(event): + got_events.append(event) + + conn.register_event_listener(handler) + + conn._init_events_pipe() + + event1 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STARTED) + event2 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_PAUSED) + conn._queue_event(event1) + conn._queue_event(event2) + conn._dispatch_events() + + want_events = [event1, event2] + self.assertEqual(want_events, got_events) + + event3 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_RESUMED) + event4 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STOPPED) + + conn._queue_event(event3) + conn._queue_event(event4) + conn._dispatch_events() + + want_events = [event1, event2, event3, event4] + self.assertEqual(want_events, got_events) + + def test_event_lifecycle(self): + # Validate that libvirt events are correctly translated + # to Nova events + conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + got_events = [] + + def handler(event): + got_events.append(event) + + conn.register_event_listener(handler) + conn._init_events_pipe() + fake_dom_xml = """ + + cef19ce0-0ca2-11df-855d-b19fbce37686 + + + + + + + """ + dom = FakeVirtDomain(fake_dom_xml, + "cef19ce0-0ca2-11df-855d-b19fbce37686") + + conn._event_lifecycle_callback(conn._conn, + dom, + libvirt.VIR_DOMAIN_EVENT_STOPPED, + 0, + conn) + conn._dispatch_events() + self.assertEqual(len(got_events), 1) + self.assertEqual(type(got_events[0]), virtevent.LifecycleEvent) + self.assertEqual(got_events[0].uuid, + "cef19ce0-0ca2-11df-855d-b19fbce37686") + self.assertEqual(got_events[0].transition, + virtevent.EVENT_LIFECYCLE_STOPPED) + class HostStateTestCase(test.TestCase): diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 386fe836c..0567bf8c3 100755 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -40,18 +40,23 @@ Supports KVM, LXC, QEMU, UML, and XEN. """ import errno +import eventlet import functools import glob import hashlib import os import shutil +import socket import sys import tempfile import time import uuid +from eventlet import greenio from eventlet import greenthread +from eventlet import patcher from eventlet import tpool +from eventlet import util as eventlet_util from lxml import etree from xml.dom import minidom @@ -75,6 +80,7 @@ from nova import version from nova.virt import configdrive from nova.virt.disk import api as disk from nova.virt import driver +from nova.virt import event as virtevent from nova.virt import firewall from nova.virt.libvirt import blockinfo from nova.virt.libvirt import config as vconfig @@ -84,6 +90,9 @@ from nova.virt.libvirt import imagecache from nova.virt.libvirt import utils as libvirt_utils from nova.virt import netutils +native_threading = patcher.original("threading") +native_Queue = patcher.original("Queue") + libvirt = None LOG = logging.getLogger(__name__) @@ -296,6 +305,7 @@ class LibvirtDriver(driver.ComputeDriver): CONF.libvirt_volume_drivers, self) self._host_state = None + self._event_queue = None self._disk_cachemode = None self.image_cache_manager = imagecache.ImageCacheManager() @@ -346,7 +356,146 @@ class LibvirtDriver(driver.ComputeDriver): except Exception: return False + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd and queues + them for later dispatch. This thread is only + permitted to use libvirt python APIs, and the + driver.queue_event method. In particular any use + of logging is forbidden, since it will confuse + eventlet's greenthread integration""" + + while True: + libvirt.virEventRunDefaultImpl() + + def _dispatch_thread(self): + """Dispatches async events coming in from libvirtd. + + This is a green thread which waits for events to + arrive from the libvirt event loop thread. This + then dispatches the events to the compute manager.""" + + while True: + self._dispatch_events() + + @staticmethod + def _event_lifecycle_callback(conn, dom, event, detail, opaque): + """Receives lifecycle events from libvirt. + + NB: this method is executing in a native thread, not + an eventlet coroutine. It can only invoke other libvirt + APIs, or use self.queue_event(). Any use of logging APIs + in particular is forbidden.""" + + self = opaque + + uuid = dom.UUIDString() + transition = None + if event == libvirt.VIR_DOMAIN_EVENT_STOPPED: + transition = virtevent.EVENT_LIFECYCLE_STOPPED + elif event == libvirt.VIR_DOMAIN_EVENT_STARTED: + transition = virtevent.EVENT_LIFECYCLE_STARTED + elif event == libvirt.VIR_DOMAIN_EVENT_SUSPENDED: + transition = virtevent.EVENT_LIFECYCLE_PAUSED + elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED: + transition = virtevent.EVENT_LIFECYCLE_RESUMED + + if transition is not None: + self._queue_event(virtevent.LifecycleEvent(uuid, transition)) + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + This method is called by the native event thread to + put events on the queue for later dispatch by the + green thread.""" + + if self._event_queue is None: + LOG.debug("Event loop thread is not active, " + "discarding event %s" % event) + return + + # Queue the event... + self._event_queue.put(event) + + # ...then wakeup the green thread to dispatch it + c = ' '.encode() + self._event_notify_send.write(c) + self._event_notify_send.flush() + + def _dispatch_events(self): + """Wait for & dispatch events from native thread + + Blocks until native thread indicates some events + are ready. Then dispatches all queued events.""" + + # Wait to be notified that there are some + # events pending + try: + _c = self._event_notify_recv.read(1) + assert _c + except ValueError: + return # will be raised when pipe is closed + + # Process as many events as possible without + # blocking + while not self._event_queue.empty(): + try: + event = self._event_queue.get(block=False) + self.emit_event(event) + except native_Queue.Empty: + pass + + def _init_events_pipe(self): + """Create a self-pipe for the native thread to synchronize on. + + This code is taken from the eventlet tpool module, under terms + of the Apache License v2.0.""" + + self._event_queue = native_Queue.Queue() + try: + rpipe, wpipe = os.pipe() + self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) + self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) + except (ImportError, NotImplementedError): + # This is Windows compatibility -- use a socket instead + # of a pipe because pipes don't really exist on Windows. + sock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + sock.listen(50) + csock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + csock.connect(('localhost', sock.getsockname()[1])) + nsock, addr = sock.accept() + self._event_notify_send = nsock.makefile('wb', 0) + gsock = greenio.GreenSocket(csock) + self._event_notify_recv = gsock.makefile('rb', 0) + + def _init_events(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread to provide the + libvirt event loop integration. This forwards events + to a green thread which does the actual dispatching. + """ + + self._init_events_pipe() + + LOG.debug("Starting native event thread") + event_thread = native_threading.Thread(target=self._native_thread) + event_thread.setDaemon(True) + event_thread.start() + + LOG.debug("Starting green dispatch thread") + dispatch_thread = eventlet.spawn(self._dispatch_thread) + def init_host(self, host): + libvirt.virEventRegisterDefaultImpl() + if not self.has_min_version(MIN_LIBVIRT_VERSION): major = MIN_LIBVIRT_VERSION[0] minor = MIN_LIBVIRT_VERSION[1] @@ -355,6 +504,8 @@ class LibvirtDriver(driver.ComputeDriver): '%(major)i.%(minor)i.%(micro)i or greater.') % locals()) + self._init_events() + def _get_connection(self): if not self._wrapped_conn or not self._test_connection(): LOG.debug(_('Connecting to libvirt: %s'), self.uri) @@ -366,6 +517,17 @@ class LibvirtDriver(driver.ComputeDriver): (libvirt.virDomain, libvirt.virConnect), self._connect, self.uri, self.read_only) + try: + LOG.debug("Registering for lifecycle events %s" % str(self)) + self._wrapped_conn.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + self._event_lifecycle_callback, + self) + except Exception, e: + LOG.warn(_("URI %s does not support events"), + self.uri) + return self._wrapped_conn _conn = property(_get_connection) -- cgit