diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-20 08:42:04 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-20 08:42:04 +0000 |
| commit | ad325523968a89db74d649553ccd8bb53908899a (patch) | |
| tree | 9f47778ae2c5845b0408e6931385e29d6d4178ea | |
| parent | 22a1cb330c63b82bf9bd4a1bb9649ed0143de961 (diff) | |
| parent | d63f7c1b47509f31c1e5aa8d27392d7f83e6dad2 (diff) | |
Merge "Add support for lifecycle events in the libvirt driver"
| -rw-r--r-- | nova/tests/fakelibvirt.py | 38 | ||||
| -rw-r--r-- | nova/tests/test_libvirt.py | 84 | ||||
| -rwxr-xr-x | nova/virt/libvirt/driver.py | 162 |
3 files changed, 283 insertions, 1 deletions
diff --git a/nova/tests/fakelibvirt.py b/nova/tests/fakelibvirt.py index 6abe7771c..69a4e677e 100644 --- a/nova/tests/fakelibvirt.py +++ b/nova/tests/fakelibvirt.py @@ -70,6 +70,17 @@ VIR_DOMAIN_CRASHED = 6 VIR_DOMAIN_XML_SECURE = 1 +VIR_DOMAIN_EVENT_ID_LIFECYCLE = 0 + +VIR_DOMAIN_EVENT_DEFINED = 0 +VIR_DOMAIN_EVENT_UNDEFINED = 1 +VIR_DOMAIN_EVENT_STARTED = 2 +VIR_DOMAIN_EVENT_SUSPENDED = 3 +VIR_DOMAIN_EVENT_RESUMED = 4 +VIR_DOMAIN_EVENT_STOPPED = 5 +VIR_DOMAIN_EVENT_SHUTDOWN = 6 +VIR_DOMAIN_EVENT_PMSUSPENDED = 7 + VIR_DOMAIN_UNDEFINE_MANAGED_SAVE = 1 VIR_DOMAIN_AFFECT_CURRENT = 0 @@ -506,6 +517,7 @@ class Connection(object): self._running_vms = {} self._id_counter = 1 # libvirt reserves 0 for the hypervisor. self._nwfilters = {} + self._event_callbacks = {} self.fakeLibVersion = version self.fakeVersion = version @@ -517,6 +529,7 @@ class Connection(object): def _mark_running(self, dom): self._running_vms[self._id_counter] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0) self._id_counter += 1 def _mark_not_running(self, dom): @@ -528,10 +541,13 @@ class Connection(object): for (k, v) in self._running_vms.iteritems(): if v == dom: del self._running_vms[k] + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STOPPED, 0) return def _undefine(self, dom): del self._vms[dom.name()] + if not dom._transient: + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_UNDEFINED, 0) def getInfo(self): return [node_arch, @@ -563,14 +579,25 @@ class Connection(object): 'name "%s"' % name, VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU) + def _emit_lifecycle(self, dom, event, detail): + if VIR_DOMAIN_EVENT_ID_LIFECYCLE not in self._event_callbacks: + return + + cbinfo = self._event_callbacks[VIR_DOMAIN_EVENT_ID_LIFECYCLE] + callback = cbinfo[0] + opaque = cbinfo[1] + callback(self, dom, event, detail, opaque) + def defineXML(self, xml): dom = Domain(connection=self, running=False, transient=False, xml=xml) self._vms[dom.name()] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_DEFINED, 0) return dom def createXML(self, xml, flags): dom = Domain(connection=self, running=True, transient=True, xml=xml) self._vms[dom.name()] = dom + self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0) return dom def getType(self): @@ -586,6 +613,9 @@ class Connection(object): def getHostname(self): return 'compute1' + def domainEventRegisterAny(self, dom, eventid, callback, opaque): + self._event_callbacks[eventid] = [callback, opaque] + def getCapabilities(self): return '''<capabilities> <host> @@ -875,6 +905,14 @@ def openAuth(uri, auth, flags): return Connection(uri, readonly=False) +def virEventRunDefaultImpl(): + time.sleep(1) + + +def virEventRegisterDefaultImpl(): + pass + + virDomain = Domain diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py index 8ac74aabd..abb2b0c20 100644 --- a/nova/tests/test_libvirt.py +++ b/nova/tests/test_libvirt.py @@ -53,6 +53,7 @@ from nova import utils from nova import version from nova.virt.disk import api as disk from nova.virt import driver +from nova.virt import event as virtevent from nova.virt import fake from nova.virt import firewall as base_firewall from nova.virt import images @@ -100,7 +101,8 @@ class FakeVirDomainSnapshot(object): class FakeVirtDomain(object): - def __init__(self, fake_xml=None): + def __init__(self, fake_xml=None, uuidstr=None): + self.uuidstr = uuidstr if fake_xml: self._fake_dom_xml = fake_xml else: @@ -132,6 +134,9 @@ class FakeVirtDomain(object): def XMLDesc(self, *args): return self._fake_dom_xml + def UUIDString(self): + return self.uuidstr + class CacheConcurrencyTestCase(test.TestCase): def setUp(self): @@ -3350,6 +3355,83 @@ class LibvirtConnTestCase(test.TestCase): got = conn.get_instance_capabilities() self.assertEqual(want, got) + def test_event_dispatch(self): + # Validate that the libvirt self-pipe for forwarding + # events between threads is working sanely + conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + got_events = [] + + def handler(event): + got_events.append(event) + + conn.register_event_listener(handler) + + conn._init_events_pipe() + + event1 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STARTED) + event2 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_PAUSED) + conn._queue_event(event1) + conn._queue_event(event2) + conn._dispatch_events() + + want_events = [event1, event2] + self.assertEqual(want_events, got_events) + + event3 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_RESUMED) + event4 = virtevent.LifecycleEvent( + "cef19ce0-0ca2-11df-855d-b19fbce37686", + virtevent.EVENT_LIFECYCLE_STOPPED) + + conn._queue_event(event3) + conn._queue_event(event4) + conn._dispatch_events() + + want_events = [event1, event2, event3, event4] + self.assertEqual(want_events, got_events) + + def test_event_lifecycle(self): + # Validate that libvirt events are correctly translated + # to Nova events + conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False) + got_events = [] + + def handler(event): + got_events.append(event) + + conn.register_event_listener(handler) + conn._init_events_pipe() + fake_dom_xml = """ + <domain type='kvm'> + <uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid> + <devices> + <disk type='file'> + <source file='filename'/> + </disk> + </devices> + </domain> + """ + dom = FakeVirtDomain(fake_dom_xml, + "cef19ce0-0ca2-11df-855d-b19fbce37686") + + conn._event_lifecycle_callback(conn._conn, + dom, + libvirt.VIR_DOMAIN_EVENT_STOPPED, + 0, + conn) + conn._dispatch_events() + self.assertEqual(len(got_events), 1) + self.assertEqual(type(got_events[0]), virtevent.LifecycleEvent) + self.assertEqual(got_events[0].uuid, + "cef19ce0-0ca2-11df-855d-b19fbce37686") + self.assertEqual(got_events[0].transition, + virtevent.EVENT_LIFECYCLE_STOPPED) + class HostStateTestCase(test.TestCase): diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 1a5eaaa1a..2a68b39bd 100755 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -41,18 +41,23 @@ Supports KVM, LXC, QEMU, UML, and XEN. """ import errno +import eventlet import functools import glob import hashlib import os import shutil +import socket import sys import tempfile import time import uuid +from eventlet import greenio from eventlet import greenthread +from eventlet import patcher from eventlet import tpool +from eventlet import util as eventlet_util from lxml import etree from xml.dom import minidom @@ -76,6 +81,7 @@ from nova import version from nova.virt import configdrive from nova.virt.disk import api as disk from nova.virt import driver +from nova.virt import event as virtevent from nova.virt import firewall from nova.virt.libvirt import blockinfo from nova.virt.libvirt import config as vconfig @@ -85,6 +91,9 @@ from nova.virt.libvirt import imagecache from nova.virt.libvirt import utils as libvirt_utils from nova.virt import netutils +native_threading = patcher.original("threading") +native_Queue = patcher.original("Queue") + libvirt = None LOG = logging.getLogger(__name__) @@ -301,6 +310,7 @@ class LibvirtDriver(driver.ComputeDriver): CONF.libvirt_volume_drivers, self) self._host_state = None + self._event_queue = None self._disk_cachemode = None self.image_cache_manager = imagecache.ImageCacheManager() @@ -351,7 +361,146 @@ class LibvirtDriver(driver.ComputeDriver): except Exception: return False + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd and queues + them for later dispatch. This thread is only + permitted to use libvirt python APIs, and the + driver.queue_event method. In particular any use + of logging is forbidden, since it will confuse + eventlet's greenthread integration""" + + while True: + libvirt.virEventRunDefaultImpl() + + def _dispatch_thread(self): + """Dispatches async events coming in from libvirtd. + + This is a green thread which waits for events to + arrive from the libvirt event loop thread. This + then dispatches the events to the compute manager.""" + + while True: + self._dispatch_events() + + @staticmethod + def _event_lifecycle_callback(conn, dom, event, detail, opaque): + """Receives lifecycle events from libvirt. + + NB: this method is executing in a native thread, not + an eventlet coroutine. It can only invoke other libvirt + APIs, or use self.queue_event(). Any use of logging APIs + in particular is forbidden.""" + + self = opaque + + uuid = dom.UUIDString() + transition = None + if event == libvirt.VIR_DOMAIN_EVENT_STOPPED: + transition = virtevent.EVENT_LIFECYCLE_STOPPED + elif event == libvirt.VIR_DOMAIN_EVENT_STARTED: + transition = virtevent.EVENT_LIFECYCLE_STARTED + elif event == libvirt.VIR_DOMAIN_EVENT_SUSPENDED: + transition = virtevent.EVENT_LIFECYCLE_PAUSED + elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED: + transition = virtevent.EVENT_LIFECYCLE_RESUMED + + if transition is not None: + self._queue_event(virtevent.LifecycleEvent(uuid, transition)) + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + This method is called by the native event thread to + put events on the queue for later dispatch by the + green thread.""" + + if self._event_queue is None: + LOG.debug("Event loop thread is not active, " + "discarding event %s" % event) + return + + # Queue the event... + self._event_queue.put(event) + + # ...then wakeup the green thread to dispatch it + c = ' '.encode() + self._event_notify_send.write(c) + self._event_notify_send.flush() + + def _dispatch_events(self): + """Wait for & dispatch events from native thread + + Blocks until native thread indicates some events + are ready. Then dispatches all queued events.""" + + # Wait to be notified that there are some + # events pending + try: + _c = self._event_notify_recv.read(1) + assert _c + except ValueError: + return # will be raised when pipe is closed + + # Process as many events as possible without + # blocking + while not self._event_queue.empty(): + try: + event = self._event_queue.get(block=False) + self.emit_event(event) + except native_Queue.Empty: + pass + + def _init_events_pipe(self): + """Create a self-pipe for the native thread to synchronize on. + + This code is taken from the eventlet tpool module, under terms + of the Apache License v2.0.""" + + self._event_queue = native_Queue.Queue() + try: + rpipe, wpipe = os.pipe() + self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) + self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) + except (ImportError, NotImplementedError): + # This is Windows compatibility -- use a socket instead + # of a pipe because pipes don't really exist on Windows. + sock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + sock.listen(50) + csock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + csock.connect(('localhost', sock.getsockname()[1])) + nsock, addr = sock.accept() + self._event_notify_send = nsock.makefile('wb', 0) + gsock = greenio.GreenSocket(csock) + self._event_notify_recv = gsock.makefile('rb', 0) + + def _init_events(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread to provide the + libvirt event loop integration. This forwards events + to a green thread which does the actual dispatching. + """ + + self._init_events_pipe() + + LOG.debug("Starting native event thread") + event_thread = native_threading.Thread(target=self._native_thread) + event_thread.setDaemon(True) + event_thread.start() + + LOG.debug("Starting green dispatch thread") + dispatch_thread = eventlet.spawn(self._dispatch_thread) + def init_host(self, host): + libvirt.virEventRegisterDefaultImpl() + if not self.has_min_version(MIN_LIBVIRT_VERSION): major = MIN_LIBVIRT_VERSION[0] minor = MIN_LIBVIRT_VERSION[1] @@ -360,6 +509,8 @@ class LibvirtDriver(driver.ComputeDriver): '%(major)i.%(minor)i.%(micro)i or greater.') % locals()) + self._init_events() + def _get_connection(self): if not self._wrapped_conn or not self._test_connection(): LOG.debug(_('Connecting to libvirt: %s'), self.uri) @@ -371,6 +522,17 @@ class LibvirtDriver(driver.ComputeDriver): (libvirt.virDomain, libvirt.virConnect), self._connect, self.uri, self.read_only) + try: + LOG.debug("Registering for lifecycle events %s" % str(self)) + self._wrapped_conn.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + self._event_lifecycle_callback, + self) + except Exception, e: + LOG.warn(_("URI %s does not support events"), + self.uri) + return self._wrapped_conn _conn = property(_get_connection) |
