From c57908241e68a3f2a9f5eb4ee0fff6207962023d Mon Sep 17 00:00:00 2001 From: sateesh Date: Fri, 18 Mar 2011 17:20:46 +0530 Subject: Using eventlets greenthreads for optimized image processing. Fixed minor issues and style related nits. --- nova/tests/test_vmwareapi.py | 2 +- nova/virt/vmwareapi/fake.py | 4 +- nova/virt/vmwareapi/io_util.py | 168 +++++++++++++++++++++++++++++++++ nova/virt/vmwareapi/read_write_util.py | 48 ++++++++-- nova/virt/vmwareapi/vmops.py | 17 ++-- nova/virt/vmwareapi/vmware_images.py | 81 +++++++++++++--- nova/virt/vmwareapi_conn.py | 2 +- 7 files changed, 293 insertions(+), 29 deletions(-) create mode 100644 nova/virt/vmwareapi/io_util.py (limited to 'nova') diff --git a/nova/tests/test_vmwareapi.py b/nova/tests/test_vmwareapi.py index d17805b99..b31ac11f1 100644 --- a/nova/tests/test_vmwareapi.py +++ b/nova/tests/test_vmwareapi.py @@ -45,7 +45,7 @@ class VMWareAPIVMTestCase(test.TestCase): super(VMWareAPIVMTestCase, self).setUp() self.flags(vmwareapi_host_ip='test_url', vmwareapi_host_username='test_username', - vmware_host_password='test_pass') + vmwareapi_host_password='test_pass') self.manager = manager.AuthManager() self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True) diff --git a/nova/virt/vmwareapi/fake.py b/nova/virt/vmwareapi/fake.py index 80768ad2d..3afb46590 100644 --- a/nova/virt/vmwareapi/fake.py +++ b/nova/virt/vmwareapi/fake.py @@ -192,7 +192,9 @@ class VirtualMachine(ManagedObject): ds_do.ManagedObjectReference = [kwargs.get("ds").obj] self.set("datastore", ds_do) self.set("summary.guest.toolsStatus", kwargs.get("toolsstatus", - "toolsOk")) + "toolsOk")) + self.set("summary.guest.toolsRunningStatus", kwargs.get( + "toolsrunningstate", "guestToolsRunning")) self.set("runtime.powerState", kwargs.get("powerstate", "poweredOn")) self.set("config.files.vmPathName", kwargs.get("vmPathName")) self.set("summary.config.numCpu", kwargs.get("numCpu", 1)) diff --git a/nova/virt/vmwareapi/io_util.py b/nova/virt/vmwareapi/io_util.py new file mode 100644 index 000000000..7f321c1e7 --- /dev/null +++ b/nova/virt/vmwareapi/io_util.py @@ -0,0 +1,168 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Citrix Systems, Inc. +# Copyright 2011 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Utility classes for defining the time saving transfer of data from the reader +to the write using a LightQueue as a Pipe between the reader and the writer. +""" + +from eventlet import event +from eventlet import greenthread +from eventlet.queue import LightQueue + +from glance import client + +from nova import exception +from nova import log as logging + +LOG = logging.getLogger("nova.virt.vmwareapi.io_util") + +IO_THREAD_SLEEP_TIME = .01 +GLANCE_POLL_INTERVAL = 5 + + +class ThreadSafePipe(LightQueue): + """The pipe to hold the data which the reader writes to and the writer + reads from.""" + + def __init__(self, maxsize, transfer_size): + LightQueue.__init__(self, maxsize) + self.transfer_size = transfer_size + self.transferred = 0 + + def read(self, chunk_size): + """Read data from the pipe. Chunksize if ignored for we have ensured + that the data chunks written to the pipe by readers is the same as the + chunks asked for by the Writer.""" + if self.transferred < self.transfer_size: + data_item = self.get() + self.transferred += len(data_item) + return data_item + else: + return "" + + def write(self, data): + """Put a data item in the pipe.""" + self.put(data) + + def close(self): + """A place-holder to maintain consistency.""" + pass + + +class GlanceWriteThread(object): + """Ensures that image data is written to in the glance client and that + it is in correct ('active')state.""" + + def __init__(self, input, glance_client, image_id, image_meta={}): + self.input = input + self.glance_client = glance_client + self.image_id = image_id + self.image_meta = image_meta + self._running = False + + def start(self): + self.done = event.Event() + + def _inner(): + """Function to do the image data transfer through an update + and thereon checks if the state is 'active'.""" + self.glance_client.update_image(self.image_id, + image_meta=self.image_meta, + image_data=self.input) + self._running = True + while self._running: + try: + image_status = \ + self.glance_client.get_image_meta(self.image_id).get( + "status") + if image_status == "active": + self.stop() + self.done.send(True) + # If the state is killed, then raise an exception. + elif image_status == "killed": + self.stop() + exc_msg = _("Glance image %s is in killed state") %\ + self.image_id + LOG.exception(exc_msg) + self.done.send_exception(exception.Error(exc_msg)) + elif image_status in ["saving", "queued"]: + greenthread.sleep(GLANCE_POLL_INTERVAL) + else: + self.stop() + exc_msg = _("Glance image " + "%(image_id)s is in unknown state " + "- %(state)s") % { + "image_id": self.image_id, + "state": image_status} + LOG.exception(exc_msg) + self.done.send_exception(exception.Error(exc_msg)) + except Exception, exc: + self.stop() + self.done.send_exception(exc) + + greenthread.spawn(_inner) + return self.done + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() + + def close(self): + pass + + +class IOThread(object): + """Class that reads chunks from the input file and writes them to the + output file till the transfer is completely done.""" + + def __init__(self, input, output): + self.input = input + self.output = output + self._running = False + self.got_exception = False + + def start(self): + self.done = event.Event() + + def _inner(): + """Read data from the input and write the same to the output + until the transfer completes.""" + self._running = True + while self._running: + try: + data = self.input.read(None) + if not data: + self.stop() + self.done.send(True) + self.output.write(data) + greenthread.sleep(IO_THREAD_SLEEP_TIME) + except Exception, exc: + self.stop() + LOG.exception(exc) + self.done.send_exception(exc) + + greenthread.spawn(_inner) + return self.done + + def stop(self): + self._running = False + + def wait(self): + return self.done.wait() diff --git a/nova/virt/vmwareapi/read_write_util.py b/nova/virt/vmwareapi/read_write_util.py index 52ed6f9ac..237fd44dc 100644 --- a/nova/virt/vmwareapi/read_write_util.py +++ b/nova/virt/vmwareapi/read_write_util.py @@ -27,16 +27,49 @@ import urllib import urllib2 import urlparse +from eventlet import event +from eventlet import greenthread + +from glance import client + from nova import flags from nova import log as logging -FLAGS = flags.FLAGS +LOG = logging.getLogger("nova.virt.vmwareapi.read_write_util") -READ_CHUNKSIZE = 2 * 1024 * 1024 +FLAGS = flags.FLAGS USER_AGENT = "OpenStack-ESX-Adapter" -LOG = logging.getLogger("nova.virt.vmwareapi.read_write_util") +try: + READ_CHUNKSIZE = client.BaseClient.CHUNKSIZE +except: + READ_CHUNKSIZE = 65536 + + +class GlanceFileRead(object): + """Glance file read handler class.""" + + def __init__(self, glance_read_iter): + self.glance_read_iter = glance_read_iter + self.iter = self.get_next() + + def read(self, chunk_size): + """Read an item from the queue. The chunk size is ignored for the + Client ImageBodyIterator uses its own CHUNKSIZE.""" + try: + return self.iter.next() + except StopIteration: + return "" + + def get_next(self): + """Get the next item from the image iterator.""" + for data in self.glance_read_iter: + yield data + + def close(self): + """A dummy close just to maintain consistency.""" + pass class VMwareHTTPFile(object): @@ -77,7 +110,7 @@ class VMwareHTTPFile(object): """Write data to the file.""" raise NotImplementedError - def read(self, chunk_size=READ_CHUNKSIZE): + def read(self, chunk_size): """Read a chunk of data.""" raise NotImplementedError @@ -137,9 +170,12 @@ class VmWareHTTPReadFile(VMwareHTTPFile): conn = urllib2.urlopen(request) VMwareHTTPFile.__init__(self, conn) - def read(self, chunk_size=READ_CHUNKSIZE): + def read(self, chunk_size): """Read a chunk of data.""" - return self.file_handle.read(chunk_size) + # We are ignoring the chunk size passed for we want the pipe to hold + # data items of the chunk-size that Glance Client uses for read + # while writing. + return self.file_handle.read(READ_CHUNKSIZE) def get_size(self): """Get size of the file to be read.""" diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py index 4b3c8adca..e09b89e39 100644 --- a/nova/virt/vmwareapi/vmops.py +++ b/nova/virt/vmwareapi/vmops.py @@ -482,27 +482,32 @@ class VMWareVMOps(object): if vm_ref is None: raise exception.NotFound(_("instance - %s not present") % instance.name) - lst_properties = ["summary.guest.toolsStatus", "runtime.powerState"] + lst_properties = ["summary.guest.toolsStatus", "runtime.powerState", + "summary.guest.toolsRunningStatus"] props = self._session._call_method(vim_util, "get_object_properties", None, vm_ref, "VirtualMachine", lst_properties) + pwr_state = None + tools_status = None + tools_running_status = False for elem in props: - pwr_state = None - tools_status = None for prop in elem.propSet: if prop.name == "runtime.powerState": pwr_state = prop.val elif prop.name == "summary.guest.toolsStatus": tools_status = prop.val + elif prop.name == "summary.guest.toolsRunningStatus": + tools_running_status = prop.val # Raise an exception if the VM is not powered On. if pwr_state not in ["poweredOn"]: raise exception.Invalid(_("instance - %s not poweredOn. So can't " "be rebooted.") % instance.name) - # If vmware tools are installed in the VM, then do a guest reboot. - # Otherwise do a hard reset. - if tools_status not in ['toolsNotInstalled', 'toolsNotRunning']: + # If latest vmware tools are installed in the VM, and that the tools + # are running, then only do a guest reboot. Otherwise do a hard reset. + if (tools_status == "toolsOk" and + tools_running_status == "guestToolsRunning"): LOG.debug(_("Rebooting guest OS of VM %s") % instance.name) self._session._call_method(self._session._get_vim(), "RebootGuest", vm_ref) diff --git a/nova/virt/vmwareapi/vmware_images.py b/nova/virt/vmwareapi/vmware_images.py index d9c7f52e5..50c6baedf 100644 --- a/nova/virt/vmwareapi/vmware_images.py +++ b/nova/virt/vmwareapi/vmware_images.py @@ -18,20 +18,70 @@ Utility functions for Image transfer. """ -import glance.client +from glance import client from nova import exception from nova import flags from nova import log as logging +from nova.virt.vmwareapi import io_util from nova.virt.vmwareapi import read_write_util -FLAGS = flags.FLAGS +LOG = logging.getLogger("nova.virt.vmwareapi.vmware_images") -QUEUE_BUFFER_SIZE = 5 -READ_CHUNKSIZE = 2 * 1024 * 1024 -WRITE_CHUNKSIZE = 2 * 1024 * 1024 +FLAGS = flags.FLAGS -LOG = logging.getLogger("nova.virt.vmwareapi.vmware_images") +QUEUE_BUFFER_SIZE = 10 + + +def start_transfer(read_file_handle, data_size, write_file_handle=None, + glance_client=None, image_id=None, image_meta={}): + """Start the data transfer from the reader to the writer. + Reader writes to the pipe and the writer reads from the pipe. This means + that the total transfer time boils down to the slower of the read/write + and not the addition of the two times.""" + # The pipe that acts as an intermediate store of data for reader to write + # to and writer to grab from. + thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size) + # The read thread. In case of glance it is the instance of the + # GlanceFileRead class. The glance client read returns an iterator + # and this class wraps that iterator to provide datachunks in calls + # to read. + read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe) + + # In case of Glance - VMWare transfer, we just need a handle to the + # HTTP Connection that is to send transfer data to the VMWare datastore. + if write_file_handle: + write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle) + # In case of VMWare - Glance transfer, we relinquish VMWare HTTP file read + # handle to Glance Client instance, but to be sure of the transfer we need + # to be sure of the status of the image on glnace changing to active. + # The GlanceWriteThread handles the same for us. + elif glance_client and image_id: + write_thread = io_util.GlanceWriteThread(thread_safe_pipe, + glance_client, image_id, image_meta) + # Start the read and write threads. + read_event = read_thread.start() + write_event = write_thread.start() + try: + # Wait on the read and write events to signal their end + read_event.wait() + write_event.wait() + except Exception, exc: + # In case of any of the reads or writes raising an exception, + # stop the threads so that we un-necessarily don't keep the other one + # waiting. + read_thread.stop() + write_thread.stop() + + # Log and raise the exception. + LOG.exception(exc) + raise exception.Error(exc) + finally: + # No matter what, try closing the read and write handles, if it so + # applies. + read_file_handle.close() + if write_file_handle: + write_file_handle.close() def fetch_image(image, instance, **kwargs): @@ -67,8 +117,9 @@ def upload_image(image, instance, **kwargs): def _get_glance_image(image, instance, **kwargs): """Download image from the glance image server.""" LOG.debug(_("Downloading image %s from glance image server") % image) - glance_client = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port) - metadata, read_file_handle = glance_client.get_image(image) + glance_client = client.Client(FLAGS.glance_host, FLAGS.glance_port) + metadata, read_iter = glance_client.get_image(image) + read_file_handle = read_write_util.GlanceFileRead(read_iter) file_size = int(metadata['size']) write_file_handle = read_write_util.VMWareHTTPWriteFile( kwargs.get("host"), @@ -77,8 +128,8 @@ def _get_glance_image(image, instance, **kwargs): kwargs.get("cookies"), kwargs.get("file_path"), file_size) - for chunk in read_file_handle: - write_file_handle.write(chunk) + start_transfer(read_file_handle, file_size, + write_file_handle=write_file_handle) LOG.debug(_("Downloaded image %s from glance image server") % image) @@ -101,7 +152,9 @@ def _put_glance_image(image, instance, **kwargs): kwargs.get("datastore_name"), kwargs.get("cookies"), kwargs.get("file_path")) - glance_client = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port) + file_size = read_file_handle.get_size() + glance_client = client.Client(FLAGS.glance_host, FLAGS.glance_port) + # The properties and other fields that we need to set for the image. image_metadata = {"is_public": True, "disk_format": "vmdk", "container_format": "bare", @@ -111,8 +164,8 @@ def _put_glance_image(image, instance, **kwargs): "vmware_ostype": kwargs.get("os_type"), "vmware_image_version": kwargs.get("image_version")}} - glance_client.update_image(image, image_meta=image_metadata, - image_data=read_file_handle) + start_transfer(read_file_handle, file_size, glance_client=glance_client, + image_id=image, image_meta=image_metadata) LOG.debug(_("Uploaded image %s to the Glance image server") % image) @@ -135,7 +188,7 @@ def get_vmdk_size_and_properties(image, instance): LOG.debug(_("Getting image size for the image %s") % image) if FLAGS.image_service == "nova.image.glance.GlanceImageService": - glance_client = glance.client.Client(FLAGS.glance_host, + glance_client = client.Client(FLAGS.glance_host, FLAGS.glance_port) meta_data = glance_client.get_image_meta(image) size, properties = meta_data["size"], meta_data["properties"] diff --git a/nova/virt/vmwareapi_conn.py b/nova/virt/vmwareapi_conn.py index bb10c6043..414b8731d 100644 --- a/nova/virt/vmwareapi_conn.py +++ b/nova/virt/vmwareapi_conn.py @@ -348,7 +348,7 @@ class VMWareAPISession(object): action["error"] = error_info LOG.warn(_("Task [%(task_name)s] %(task_ref)s " "status: error %(error_info)s") % locals()) - done.send_exception(Exception(error_info)) + done.send_exception(exception.Error(error_info)) db.instance_action_create(context.get_admin_context(), action) except Exception, excep: LOG.warn(_("In vmwareapi:_poll_task, Got this error %s") % excep) -- cgit