summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-20 08:42:04 +0000
committerGerrit Code Review <review@openstack.org>2013-02-20 08:42:04 +0000
commitad325523968a89db74d649553ccd8bb53908899a (patch)
tree9f47778ae2c5845b0408e6931385e29d6d4178ea
parent22a1cb330c63b82bf9bd4a1bb9649ed0143de961 (diff)
parentd63f7c1b47509f31c1e5aa8d27392d7f83e6dad2 (diff)
Merge "Add support for lifecycle events in the libvirt driver"
-rw-r--r--nova/tests/fakelibvirt.py38
-rw-r--r--nova/tests/test_libvirt.py84
-rwxr-xr-xnova/virt/libvirt/driver.py162
3 files changed, 283 insertions, 1 deletions
diff --git a/nova/tests/fakelibvirt.py b/nova/tests/fakelibvirt.py
index 6abe7771c..69a4e677e 100644
--- a/nova/tests/fakelibvirt.py
+++ b/nova/tests/fakelibvirt.py
@@ -70,6 +70,17 @@ VIR_DOMAIN_CRASHED = 6
VIR_DOMAIN_XML_SECURE = 1
+VIR_DOMAIN_EVENT_ID_LIFECYCLE = 0
+
+VIR_DOMAIN_EVENT_DEFINED = 0
+VIR_DOMAIN_EVENT_UNDEFINED = 1
+VIR_DOMAIN_EVENT_STARTED = 2
+VIR_DOMAIN_EVENT_SUSPENDED = 3
+VIR_DOMAIN_EVENT_RESUMED = 4
+VIR_DOMAIN_EVENT_STOPPED = 5
+VIR_DOMAIN_EVENT_SHUTDOWN = 6
+VIR_DOMAIN_EVENT_PMSUSPENDED = 7
+
VIR_DOMAIN_UNDEFINE_MANAGED_SAVE = 1
VIR_DOMAIN_AFFECT_CURRENT = 0
@@ -506,6 +517,7 @@ class Connection(object):
self._running_vms = {}
self._id_counter = 1 # libvirt reserves 0 for the hypervisor.
self._nwfilters = {}
+ self._event_callbacks = {}
self.fakeLibVersion = version
self.fakeVersion = version
@@ -517,6 +529,7 @@ class Connection(object):
def _mark_running(self, dom):
self._running_vms[self._id_counter] = dom
+ self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0)
self._id_counter += 1
def _mark_not_running(self, dom):
@@ -528,10 +541,13 @@ class Connection(object):
for (k, v) in self._running_vms.iteritems():
if v == dom:
del self._running_vms[k]
+ self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STOPPED, 0)
return
def _undefine(self, dom):
del self._vms[dom.name()]
+ if not dom._transient:
+ self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_UNDEFINED, 0)
def getInfo(self):
return [node_arch,
@@ -563,14 +579,25 @@ class Connection(object):
'name "%s"' % name,
VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU)
+ def _emit_lifecycle(self, dom, event, detail):
+ if VIR_DOMAIN_EVENT_ID_LIFECYCLE not in self._event_callbacks:
+ return
+
+ cbinfo = self._event_callbacks[VIR_DOMAIN_EVENT_ID_LIFECYCLE]
+ callback = cbinfo[0]
+ opaque = cbinfo[1]
+ callback(self, dom, event, detail, opaque)
+
def defineXML(self, xml):
dom = Domain(connection=self, running=False, transient=False, xml=xml)
self._vms[dom.name()] = dom
+ self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_DEFINED, 0)
return dom
def createXML(self, xml, flags):
dom = Domain(connection=self, running=True, transient=True, xml=xml)
self._vms[dom.name()] = dom
+ self._emit_lifecycle(dom, VIR_DOMAIN_EVENT_STARTED, 0)
return dom
def getType(self):
@@ -586,6 +613,9 @@ class Connection(object):
def getHostname(self):
return 'compute1'
+ def domainEventRegisterAny(self, dom, eventid, callback, opaque):
+ self._event_callbacks[eventid] = [callback, opaque]
+
def getCapabilities(self):
return '''<capabilities>
<host>
@@ -875,6 +905,14 @@ def openAuth(uri, auth, flags):
return Connection(uri, readonly=False)
+def virEventRunDefaultImpl():
+ time.sleep(1)
+
+
+def virEventRegisterDefaultImpl():
+ pass
+
+
virDomain = Domain
diff --git a/nova/tests/test_libvirt.py b/nova/tests/test_libvirt.py
index 8ac74aabd..abb2b0c20 100644
--- a/nova/tests/test_libvirt.py
+++ b/nova/tests/test_libvirt.py
@@ -53,6 +53,7 @@ from nova import utils
from nova import version
from nova.virt.disk import api as disk
from nova.virt import driver
+from nova.virt import event as virtevent
from nova.virt import fake
from nova.virt import firewall as base_firewall
from nova.virt import images
@@ -100,7 +101,8 @@ class FakeVirDomainSnapshot(object):
class FakeVirtDomain(object):
- def __init__(self, fake_xml=None):
+ def __init__(self, fake_xml=None, uuidstr=None):
+ self.uuidstr = uuidstr
if fake_xml:
self._fake_dom_xml = fake_xml
else:
@@ -132,6 +134,9 @@ class FakeVirtDomain(object):
def XMLDesc(self, *args):
return self._fake_dom_xml
+ def UUIDString(self):
+ return self.uuidstr
+
class CacheConcurrencyTestCase(test.TestCase):
def setUp(self):
@@ -3350,6 +3355,83 @@ class LibvirtConnTestCase(test.TestCase):
got = conn.get_instance_capabilities()
self.assertEqual(want, got)
+ def test_event_dispatch(self):
+ # Validate that the libvirt self-pipe for forwarding
+ # events between threads is working sanely
+ conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
+ got_events = []
+
+ def handler(event):
+ got_events.append(event)
+
+ conn.register_event_listener(handler)
+
+ conn._init_events_pipe()
+
+ event1 = virtevent.LifecycleEvent(
+ "cef19ce0-0ca2-11df-855d-b19fbce37686",
+ virtevent.EVENT_LIFECYCLE_STARTED)
+ event2 = virtevent.LifecycleEvent(
+ "cef19ce0-0ca2-11df-855d-b19fbce37686",
+ virtevent.EVENT_LIFECYCLE_PAUSED)
+ conn._queue_event(event1)
+ conn._queue_event(event2)
+ conn._dispatch_events()
+
+ want_events = [event1, event2]
+ self.assertEqual(want_events, got_events)
+
+ event3 = virtevent.LifecycleEvent(
+ "cef19ce0-0ca2-11df-855d-b19fbce37686",
+ virtevent.EVENT_LIFECYCLE_RESUMED)
+ event4 = virtevent.LifecycleEvent(
+ "cef19ce0-0ca2-11df-855d-b19fbce37686",
+ virtevent.EVENT_LIFECYCLE_STOPPED)
+
+ conn._queue_event(event3)
+ conn._queue_event(event4)
+ conn._dispatch_events()
+
+ want_events = [event1, event2, event3, event4]
+ self.assertEqual(want_events, got_events)
+
+ def test_event_lifecycle(self):
+ # Validate that libvirt events are correctly translated
+ # to Nova events
+ conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
+ got_events = []
+
+ def handler(event):
+ got_events.append(event)
+
+ conn.register_event_listener(handler)
+ conn._init_events_pipe()
+ fake_dom_xml = """
+ <domain type='kvm'>
+ <uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid>
+ <devices>
+ <disk type='file'>
+ <source file='filename'/>
+ </disk>
+ </devices>
+ </domain>
+ """
+ dom = FakeVirtDomain(fake_dom_xml,
+ "cef19ce0-0ca2-11df-855d-b19fbce37686")
+
+ conn._event_lifecycle_callback(conn._conn,
+ dom,
+ libvirt.VIR_DOMAIN_EVENT_STOPPED,
+ 0,
+ conn)
+ conn._dispatch_events()
+ self.assertEqual(len(got_events), 1)
+ self.assertEqual(type(got_events[0]), virtevent.LifecycleEvent)
+ self.assertEqual(got_events[0].uuid,
+ "cef19ce0-0ca2-11df-855d-b19fbce37686")
+ self.assertEqual(got_events[0].transition,
+ virtevent.EVENT_LIFECYCLE_STOPPED)
+
class HostStateTestCase(test.TestCase):
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
index 1a5eaaa1a..2a68b39bd 100755
--- a/nova/virt/libvirt/driver.py
+++ b/nova/virt/libvirt/driver.py
@@ -41,18 +41,23 @@ Supports KVM, LXC, QEMU, UML, and XEN.
"""
import errno
+import eventlet
import functools
import glob
import hashlib
import os
import shutil
+import socket
import sys
import tempfile
import time
import uuid
+from eventlet import greenio
from eventlet import greenthread
+from eventlet import patcher
from eventlet import tpool
+from eventlet import util as eventlet_util
from lxml import etree
from xml.dom import minidom
@@ -76,6 +81,7 @@ from nova import version
from nova.virt import configdrive
from nova.virt.disk import api as disk
from nova.virt import driver
+from nova.virt import event as virtevent
from nova.virt import firewall
from nova.virt.libvirt import blockinfo
from nova.virt.libvirt import config as vconfig
@@ -85,6 +91,9 @@ from nova.virt.libvirt import imagecache
from nova.virt.libvirt import utils as libvirt_utils
from nova.virt import netutils
+native_threading = patcher.original("threading")
+native_Queue = patcher.original("Queue")
+
libvirt = None
LOG = logging.getLogger(__name__)
@@ -301,6 +310,7 @@ class LibvirtDriver(driver.ComputeDriver):
CONF.libvirt_volume_drivers, self)
self._host_state = None
+ self._event_queue = None
self._disk_cachemode = None
self.image_cache_manager = imagecache.ImageCacheManager()
@@ -351,7 +361,146 @@ class LibvirtDriver(driver.ComputeDriver):
except Exception:
return False
+ def _native_thread(self):
+ """Receives async events coming in from libvirtd.
+
+ This is a native thread which runs the default
+ libvirt event loop implementation. This processes
+ any incoming async events from libvirtd and queues
+ them for later dispatch. This thread is only
+ permitted to use libvirt python APIs, and the
+ driver.queue_event method. In particular any use
+ of logging is forbidden, since it will confuse
+ eventlet's greenthread integration"""
+
+ while True:
+ libvirt.virEventRunDefaultImpl()
+
+ def _dispatch_thread(self):
+ """Dispatches async events coming in from libvirtd.
+
+ This is a green thread which waits for events to
+ arrive from the libvirt event loop thread. This
+ then dispatches the events to the compute manager."""
+
+ while True:
+ self._dispatch_events()
+
+ @staticmethod
+ def _event_lifecycle_callback(conn, dom, event, detail, opaque):
+ """Receives lifecycle events from libvirt.
+
+ NB: this method is executing in a native thread, not
+ an eventlet coroutine. It can only invoke other libvirt
+ APIs, or use self.queue_event(). Any use of logging APIs
+ in particular is forbidden."""
+
+ self = opaque
+
+ uuid = dom.UUIDString()
+ transition = None
+ if event == libvirt.VIR_DOMAIN_EVENT_STOPPED:
+ transition = virtevent.EVENT_LIFECYCLE_STOPPED
+ elif event == libvirt.VIR_DOMAIN_EVENT_STARTED:
+ transition = virtevent.EVENT_LIFECYCLE_STARTED
+ elif event == libvirt.VIR_DOMAIN_EVENT_SUSPENDED:
+ transition = virtevent.EVENT_LIFECYCLE_PAUSED
+ elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED:
+ transition = virtevent.EVENT_LIFECYCLE_RESUMED
+
+ if transition is not None:
+ self._queue_event(virtevent.LifecycleEvent(uuid, transition))
+
+ def _queue_event(self, event):
+ """Puts an event on the queue for dispatch.
+
+ This method is called by the native event thread to
+ put events on the queue for later dispatch by the
+ green thread."""
+
+ if self._event_queue is None:
+ LOG.debug("Event loop thread is not active, "
+ "discarding event %s" % event)
+ return
+
+ # Queue the event...
+ self._event_queue.put(event)
+
+ # ...then wakeup the green thread to dispatch it
+ c = ' '.encode()
+ self._event_notify_send.write(c)
+ self._event_notify_send.flush()
+
+ def _dispatch_events(self):
+ """Wait for & dispatch events from native thread
+
+ Blocks until native thread indicates some events
+ are ready. Then dispatches all queued events."""
+
+ # Wait to be notified that there are some
+ # events pending
+ try:
+ _c = self._event_notify_recv.read(1)
+ assert _c
+ except ValueError:
+ return # will be raised when pipe is closed
+
+ # Process as many events as possible without
+ # blocking
+ while not self._event_queue.empty():
+ try:
+ event = self._event_queue.get(block=False)
+ self.emit_event(event)
+ except native_Queue.Empty:
+ pass
+
+ def _init_events_pipe(self):
+ """Create a self-pipe for the native thread to synchronize on.
+
+ This code is taken from the eventlet tpool module, under terms
+ of the Apache License v2.0."""
+
+ self._event_queue = native_Queue.Queue()
+ try:
+ rpipe, wpipe = os.pipe()
+ self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
+ self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
+ except (ImportError, NotImplementedError):
+ # This is Windows compatibility -- use a socket instead
+ # of a pipe because pipes don't really exist on Windows.
+ sock = eventlet_util.__original_socket__(socket.AF_INET,
+ socket.SOCK_STREAM)
+ sock.bind(('localhost', 0))
+ sock.listen(50)
+ csock = eventlet_util.__original_socket__(socket.AF_INET,
+ socket.SOCK_STREAM)
+ csock.connect(('localhost', sock.getsockname()[1]))
+ nsock, addr = sock.accept()
+ self._event_notify_send = nsock.makefile('wb', 0)
+ gsock = greenio.GreenSocket(csock)
+ self._event_notify_recv = gsock.makefile('rb', 0)
+
+ def _init_events(self):
+ """Initializes the libvirt events subsystem.
+
+ This requires running a native thread to provide the
+ libvirt event loop integration. This forwards events
+ to a green thread which does the actual dispatching.
+ """
+
+ self._init_events_pipe()
+
+ LOG.debug("Starting native event thread")
+ event_thread = native_threading.Thread(target=self._native_thread)
+ event_thread.setDaemon(True)
+ event_thread.start()
+
+ LOG.debug("Starting green dispatch thread")
+ dispatch_thread = eventlet.spawn(self._dispatch_thread)
+
def init_host(self, host):
+ libvirt.virEventRegisterDefaultImpl()
+
if not self.has_min_version(MIN_LIBVIRT_VERSION):
major = MIN_LIBVIRT_VERSION[0]
minor = MIN_LIBVIRT_VERSION[1]
@@ -360,6 +509,8 @@ class LibvirtDriver(driver.ComputeDriver):
'%(major)i.%(minor)i.%(micro)i or greater.') %
locals())
+ self._init_events()
+
def _get_connection(self):
if not self._wrapped_conn or not self._test_connection():
LOG.debug(_('Connecting to libvirt: %s'), self.uri)
@@ -371,6 +522,17 @@ class LibvirtDriver(driver.ComputeDriver):
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri, self.read_only)
+ try:
+ LOG.debug("Registering for lifecycle events %s" % str(self))
+ self._wrapped_conn.domainEventRegisterAny(
+ None,
+ libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
+ self._event_lifecycle_callback,
+ self)
+ except Exception, e:
+ LOG.warn(_("URI %s does not support events"),
+ self.uri)
+
return self._wrapped_conn
_conn = property(_get_connection)