summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-20 08:42:04 +0000
committerGerrit Code Review <review@openstack.org>2013-02-20 08:42:04 +0000
commitad325523968a89db74d649553ccd8bb53908899a (patch)
tree9f47778ae2c5845b0408e6931385e29d6d4178ea /nova/virt
parent22a1cb330c63b82bf9bd4a1bb9649ed0143de961 (diff)
parentd63f7c1b47509f31c1e5aa8d27392d7f83e6dad2 (diff)
Merge "Add support for lifecycle events in the libvirt driver"
Diffstat (limited to 'nova/virt')
-rwxr-xr-xnova/virt/libvirt/driver.py162
1 files changed, 162 insertions, 0 deletions
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py
index 1a5eaaa1a..2a68b39bd 100755
--- a/nova/virt/libvirt/driver.py
+++ b/nova/virt/libvirt/driver.py
@@ -41,18 +41,23 @@ Supports KVM, LXC, QEMU, UML, and XEN.
"""
import errno
+import eventlet
import functools
import glob
import hashlib
import os
import shutil
+import socket
import sys
import tempfile
import time
import uuid
+from eventlet import greenio
from eventlet import greenthread
+from eventlet import patcher
from eventlet import tpool
+from eventlet import util as eventlet_util
from lxml import etree
from xml.dom import minidom
@@ -76,6 +81,7 @@ from nova import version
from nova.virt import configdrive
from nova.virt.disk import api as disk
from nova.virt import driver
+from nova.virt import event as virtevent
from nova.virt import firewall
from nova.virt.libvirt import blockinfo
from nova.virt.libvirt import config as vconfig
@@ -85,6 +91,9 @@ from nova.virt.libvirt import imagecache
from nova.virt.libvirt import utils as libvirt_utils
from nova.virt import netutils
+native_threading = patcher.original("threading")
+native_Queue = patcher.original("Queue")
+
libvirt = None
LOG = logging.getLogger(__name__)
@@ -301,6 +310,7 @@ class LibvirtDriver(driver.ComputeDriver):
CONF.libvirt_volume_drivers, self)
self._host_state = None
+ self._event_queue = None
self._disk_cachemode = None
self.image_cache_manager = imagecache.ImageCacheManager()
@@ -351,7 +361,146 @@ class LibvirtDriver(driver.ComputeDriver):
except Exception:
return False
+ def _native_thread(self):
+ """Receives async events coming in from libvirtd.
+
+ This is a native thread which runs the default
+ libvirt event loop implementation. This processes
+ any incoming async events from libvirtd and queues
+ them for later dispatch. This thread is only
+ permitted to use libvirt python APIs, and the
+ driver.queue_event method. In particular any use
+ of logging is forbidden, since it will confuse
+ eventlet's greenthread integration"""
+
+ while True:
+ libvirt.virEventRunDefaultImpl()
+
+ def _dispatch_thread(self):
+ """Dispatches async events coming in from libvirtd.
+
+ This is a green thread which waits for events to
+ arrive from the libvirt event loop thread. This
+ then dispatches the events to the compute manager."""
+
+ while True:
+ self._dispatch_events()
+
+ @staticmethod
+ def _event_lifecycle_callback(conn, dom, event, detail, opaque):
+ """Receives lifecycle events from libvirt.
+
+ NB: this method is executing in a native thread, not
+ an eventlet coroutine. It can only invoke other libvirt
+ APIs, or use self.queue_event(). Any use of logging APIs
+ in particular is forbidden."""
+
+ self = opaque
+
+ uuid = dom.UUIDString()
+ transition = None
+ if event == libvirt.VIR_DOMAIN_EVENT_STOPPED:
+ transition = virtevent.EVENT_LIFECYCLE_STOPPED
+ elif event == libvirt.VIR_DOMAIN_EVENT_STARTED:
+ transition = virtevent.EVENT_LIFECYCLE_STARTED
+ elif event == libvirt.VIR_DOMAIN_EVENT_SUSPENDED:
+ transition = virtevent.EVENT_LIFECYCLE_PAUSED
+ elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED:
+ transition = virtevent.EVENT_LIFECYCLE_RESUMED
+
+ if transition is not None:
+ self._queue_event(virtevent.LifecycleEvent(uuid, transition))
+
+ def _queue_event(self, event):
+ """Puts an event on the queue for dispatch.
+
+ This method is called by the native event thread to
+ put events on the queue for later dispatch by the
+ green thread."""
+
+ if self._event_queue is None:
+ LOG.debug("Event loop thread is not active, "
+ "discarding event %s" % event)
+ return
+
+ # Queue the event...
+ self._event_queue.put(event)
+
+ # ...then wakeup the green thread to dispatch it
+ c = ' '.encode()
+ self._event_notify_send.write(c)
+ self._event_notify_send.flush()
+
+ def _dispatch_events(self):
+ """Wait for & dispatch events from native thread
+
+ Blocks until native thread indicates some events
+ are ready. Then dispatches all queued events."""
+
+ # Wait to be notified that there are some
+ # events pending
+ try:
+ _c = self._event_notify_recv.read(1)
+ assert _c
+ except ValueError:
+ return # will be raised when pipe is closed
+
+ # Process as many events as possible without
+ # blocking
+ while not self._event_queue.empty():
+ try:
+ event = self._event_queue.get(block=False)
+ self.emit_event(event)
+ except native_Queue.Empty:
+ pass
+
+ def _init_events_pipe(self):
+ """Create a self-pipe for the native thread to synchronize on.
+
+ This code is taken from the eventlet tpool module, under terms
+ of the Apache License v2.0."""
+
+ self._event_queue = native_Queue.Queue()
+ try:
+ rpipe, wpipe = os.pipe()
+ self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0)
+ self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0)
+ except (ImportError, NotImplementedError):
+ # This is Windows compatibility -- use a socket instead
+ # of a pipe because pipes don't really exist on Windows.
+ sock = eventlet_util.__original_socket__(socket.AF_INET,
+ socket.SOCK_STREAM)
+ sock.bind(('localhost', 0))
+ sock.listen(50)
+ csock = eventlet_util.__original_socket__(socket.AF_INET,
+ socket.SOCK_STREAM)
+ csock.connect(('localhost', sock.getsockname()[1]))
+ nsock, addr = sock.accept()
+ self._event_notify_send = nsock.makefile('wb', 0)
+ gsock = greenio.GreenSocket(csock)
+ self._event_notify_recv = gsock.makefile('rb', 0)
+
+ def _init_events(self):
+ """Initializes the libvirt events subsystem.
+
+ This requires running a native thread to provide the
+ libvirt event loop integration. This forwards events
+ to a green thread which does the actual dispatching.
+ """
+
+ self._init_events_pipe()
+
+ LOG.debug("Starting native event thread")
+ event_thread = native_threading.Thread(target=self._native_thread)
+ event_thread.setDaemon(True)
+ event_thread.start()
+
+ LOG.debug("Starting green dispatch thread")
+ dispatch_thread = eventlet.spawn(self._dispatch_thread)
+
def init_host(self, host):
+ libvirt.virEventRegisterDefaultImpl()
+
if not self.has_min_version(MIN_LIBVIRT_VERSION):
major = MIN_LIBVIRT_VERSION[0]
minor = MIN_LIBVIRT_VERSION[1]
@@ -360,6 +509,8 @@ class LibvirtDriver(driver.ComputeDriver):
'%(major)i.%(minor)i.%(micro)i or greater.') %
locals())
+ self._init_events()
+
def _get_connection(self):
if not self._wrapped_conn or not self._test_connection():
LOG.debug(_('Connecting to libvirt: %s'), self.uri)
@@ -371,6 +522,17 @@ class LibvirtDriver(driver.ComputeDriver):
(libvirt.virDomain, libvirt.virConnect),
self._connect, self.uri, self.read_only)
+ try:
+ LOG.debug("Registering for lifecycle events %s" % str(self))
+ self._wrapped_conn.domainEventRegisterAny(
+ None,
+ libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE,
+ self._event_lifecycle_callback,
+ self)
+ except Exception, e:
+ LOG.warn(_("URI %s does not support events"),
+ self.uri)
+
return self._wrapped_conn
_conn = property(_get_connection)