diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-20 08:42:04 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-20 08:42:04 +0000 |
| commit | ad325523968a89db74d649553ccd8bb53908899a (patch) | |
| tree | 9f47778ae2c5845b0408e6931385e29d6d4178ea /nova/virt | |
| parent | 22a1cb330c63b82bf9bd4a1bb9649ed0143de961 (diff) | |
| parent | d63f7c1b47509f31c1e5aa8d27392d7f83e6dad2 (diff) | |
Merge "Add support for lifecycle events in the libvirt driver"
Diffstat (limited to 'nova/virt')
| -rwxr-xr-x | nova/virt/libvirt/driver.py | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 1a5eaaa1a..2a68b39bd 100755 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -41,18 +41,23 @@ Supports KVM, LXC, QEMU, UML, and XEN. """ import errno +import eventlet import functools import glob import hashlib import os import shutil +import socket import sys import tempfile import time import uuid +from eventlet import greenio from eventlet import greenthread +from eventlet import patcher from eventlet import tpool +from eventlet import util as eventlet_util from lxml import etree from xml.dom import minidom @@ -76,6 +81,7 @@ from nova import version from nova.virt import configdrive from nova.virt.disk import api as disk from nova.virt import driver +from nova.virt import event as virtevent from nova.virt import firewall from nova.virt.libvirt import blockinfo from nova.virt.libvirt import config as vconfig @@ -85,6 +91,9 @@ from nova.virt.libvirt import imagecache from nova.virt.libvirt import utils as libvirt_utils from nova.virt import netutils +native_threading = patcher.original("threading") +native_Queue = patcher.original("Queue") + libvirt = None LOG = logging.getLogger(__name__) @@ -301,6 +310,7 @@ class LibvirtDriver(driver.ComputeDriver): CONF.libvirt_volume_drivers, self) self._host_state = None + self._event_queue = None self._disk_cachemode = None self.image_cache_manager = imagecache.ImageCacheManager() @@ -351,7 +361,146 @@ class LibvirtDriver(driver.ComputeDriver): except Exception: return False + def _native_thread(self): + """Receives async events coming in from libvirtd. + + This is a native thread which runs the default + libvirt event loop implementation. This processes + any incoming async events from libvirtd and queues + them for later dispatch. This thread is only + permitted to use libvirt python APIs, and the + driver.queue_event method. In particular any use + of logging is forbidden, since it will confuse + eventlet's greenthread integration""" + + while True: + libvirt.virEventRunDefaultImpl() + + def _dispatch_thread(self): + """Dispatches async events coming in from libvirtd. + + This is a green thread which waits for events to + arrive from the libvirt event loop thread. This + then dispatches the events to the compute manager.""" + + while True: + self._dispatch_events() + + @staticmethod + def _event_lifecycle_callback(conn, dom, event, detail, opaque): + """Receives lifecycle events from libvirt. + + NB: this method is executing in a native thread, not + an eventlet coroutine. It can only invoke other libvirt + APIs, or use self.queue_event(). Any use of logging APIs + in particular is forbidden.""" + + self = opaque + + uuid = dom.UUIDString() + transition = None + if event == libvirt.VIR_DOMAIN_EVENT_STOPPED: + transition = virtevent.EVENT_LIFECYCLE_STOPPED + elif event == libvirt.VIR_DOMAIN_EVENT_STARTED: + transition = virtevent.EVENT_LIFECYCLE_STARTED + elif event == libvirt.VIR_DOMAIN_EVENT_SUSPENDED: + transition = virtevent.EVENT_LIFECYCLE_PAUSED + elif event == libvirt.VIR_DOMAIN_EVENT_RESUMED: + transition = virtevent.EVENT_LIFECYCLE_RESUMED + + if transition is not None: + self._queue_event(virtevent.LifecycleEvent(uuid, transition)) + + def _queue_event(self, event): + """Puts an event on the queue for dispatch. + + This method is called by the native event thread to + put events on the queue for later dispatch by the + green thread.""" + + if self._event_queue is None: + LOG.debug("Event loop thread is not active, " + "discarding event %s" % event) + return + + # Queue the event... + self._event_queue.put(event) + + # ...then wakeup the green thread to dispatch it + c = ' '.encode() + self._event_notify_send.write(c) + self._event_notify_send.flush() + + def _dispatch_events(self): + """Wait for & dispatch events from native thread + + Blocks until native thread indicates some events + are ready. Then dispatches all queued events.""" + + # Wait to be notified that there are some + # events pending + try: + _c = self._event_notify_recv.read(1) + assert _c + except ValueError: + return # will be raised when pipe is closed + + # Process as many events as possible without + # blocking + while not self._event_queue.empty(): + try: + event = self._event_queue.get(block=False) + self.emit_event(event) + except native_Queue.Empty: + pass + + def _init_events_pipe(self): + """Create a self-pipe for the native thread to synchronize on. + + This code is taken from the eventlet tpool module, under terms + of the Apache License v2.0.""" + + self._event_queue = native_Queue.Queue() + try: + rpipe, wpipe = os.pipe() + self._event_notify_send = greenio.GreenPipe(wpipe, 'wb', 0) + self._event_notify_recv = greenio.GreenPipe(rpipe, 'rb', 0) + except (ImportError, NotImplementedError): + # This is Windows compatibility -- use a socket instead + # of a pipe because pipes don't really exist on Windows. + sock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + sock.bind(('localhost', 0)) + sock.listen(50) + csock = eventlet_util.__original_socket__(socket.AF_INET, + socket.SOCK_STREAM) + csock.connect(('localhost', sock.getsockname()[1])) + nsock, addr = sock.accept() + self._event_notify_send = nsock.makefile('wb', 0) + gsock = greenio.GreenSocket(csock) + self._event_notify_recv = gsock.makefile('rb', 0) + + def _init_events(self): + """Initializes the libvirt events subsystem. + + This requires running a native thread to provide the + libvirt event loop integration. This forwards events + to a green thread which does the actual dispatching. + """ + + self._init_events_pipe() + + LOG.debug("Starting native event thread") + event_thread = native_threading.Thread(target=self._native_thread) + event_thread.setDaemon(True) + event_thread.start() + + LOG.debug("Starting green dispatch thread") + dispatch_thread = eventlet.spawn(self._dispatch_thread) + def init_host(self, host): + libvirt.virEventRegisterDefaultImpl() + if not self.has_min_version(MIN_LIBVIRT_VERSION): major = MIN_LIBVIRT_VERSION[0] minor = MIN_LIBVIRT_VERSION[1] @@ -360,6 +509,8 @@ class LibvirtDriver(driver.ComputeDriver): '%(major)i.%(minor)i.%(micro)i or greater.') % locals()) + self._init_events() + def _get_connection(self): if not self._wrapped_conn or not self._test_connection(): LOG.debug(_('Connecting to libvirt: %s'), self.uri) @@ -371,6 +522,17 @@ class LibvirtDriver(driver.ComputeDriver): (libvirt.virDomain, libvirt.virConnect), self._connect, self.uri, self.read_only) + try: + LOG.debug("Registering for lifecycle events %s" % str(self)) + self._wrapped_conn.domainEventRegisterAny( + None, + libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, + self._event_lifecycle_callback, + self) + except Exception, e: + LOG.warn(_("URI %s does not support events"), + self.uri) + return self._wrapped_conn _conn = property(_get_connection) |
