diff options
| author | sateesh <sateesh.chodapuneedi@citrix.com> | 2011-03-18 17:20:46 +0530 |
|---|---|---|
| committer | sateesh <sateesh.chodapuneedi@citrix.com> | 2011-03-18 17:20:46 +0530 |
| commit | c57908241e68a3f2a9f5eb4ee0fff6207962023d (patch) | |
| tree | aff9783a873301b38dbb78f46a3f96dee69b208a /nova | |
| parent | cbcda1ec466fd498fb8e9fe47c72b52c2d4b3dde (diff) | |
Using eventlets greenthreads for optimized image processing.
Fixed minor issues and style related nits.
Diffstat (limited to 'nova')
| -rw-r--r-- | nova/tests/test_vmwareapi.py | 2 | ||||
| -rw-r--r-- | nova/virt/vmwareapi/fake.py | 4 | ||||
| -rw-r--r-- | nova/virt/vmwareapi/io_util.py | 168 | ||||
| -rw-r--r-- | nova/virt/vmwareapi/read_write_util.py | 48 | ||||
| -rw-r--r-- | nova/virt/vmwareapi/vmops.py | 17 | ||||
| -rw-r--r-- | nova/virt/vmwareapi/vmware_images.py | 81 | ||||
| -rw-r--r-- | nova/virt/vmwareapi_conn.py | 2 |
7 files changed, 293 insertions, 29 deletions
diff --git a/nova/tests/test_vmwareapi.py b/nova/tests/test_vmwareapi.py index d17805b99..b31ac11f1 100644 --- a/nova/tests/test_vmwareapi.py +++ b/nova/tests/test_vmwareapi.py @@ -45,7 +45,7 @@ class VMWareAPIVMTestCase(test.TestCase): super(VMWareAPIVMTestCase, self).setUp()
self.flags(vmwareapi_host_ip='test_url',
vmwareapi_host_username='test_username',
- vmware_host_password='test_pass')
+ vmwareapi_host_password='test_pass')
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
diff --git a/nova/virt/vmwareapi/fake.py b/nova/virt/vmwareapi/fake.py index 80768ad2d..3afb46590 100644 --- a/nova/virt/vmwareapi/fake.py +++ b/nova/virt/vmwareapi/fake.py @@ -192,7 +192,9 @@ class VirtualMachine(ManagedObject): ds_do.ManagedObjectReference = [kwargs.get("ds").obj]
self.set("datastore", ds_do)
self.set("summary.guest.toolsStatus", kwargs.get("toolsstatus",
- "toolsOk"))
+ "toolsOk"))
+ self.set("summary.guest.toolsRunningStatus", kwargs.get(
+ "toolsrunningstate", "guestToolsRunning"))
self.set("runtime.powerState", kwargs.get("powerstate", "poweredOn"))
self.set("config.files.vmPathName", kwargs.get("vmPathName"))
self.set("summary.config.numCpu", kwargs.get("numCpu", 1))
diff --git a/nova/virt/vmwareapi/io_util.py b/nova/virt/vmwareapi/io_util.py new file mode 100644 index 000000000..7f321c1e7 --- /dev/null +++ b/nova/virt/vmwareapi/io_util.py @@ -0,0 +1,168 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2011 Citrix Systems, Inc.
+# Copyright 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Utility classes for defining the time saving transfer of data from the reader
+to the write using a LightQueue as a Pipe between the reader and the writer.
+"""
+
+from eventlet import event
+from eventlet import greenthread
+from eventlet.queue import LightQueue
+
+from glance import client
+
+from nova import exception
+from nova import log as logging
+
+LOG = logging.getLogger("nova.virt.vmwareapi.io_util")
+
+IO_THREAD_SLEEP_TIME = .01
+GLANCE_POLL_INTERVAL = 5
+
+
+class ThreadSafePipe(LightQueue):
+ """The pipe to hold the data which the reader writes to and the writer
+ reads from."""
+
+ def __init__(self, maxsize, transfer_size):
+ LightQueue.__init__(self, maxsize)
+ self.transfer_size = transfer_size
+ self.transferred = 0
+
+ def read(self, chunk_size):
+ """Read data from the pipe. Chunksize if ignored for we have ensured
+ that the data chunks written to the pipe by readers is the same as the
+ chunks asked for by the Writer."""
+ if self.transferred < self.transfer_size:
+ data_item = self.get()
+ self.transferred += len(data_item)
+ return data_item
+ else:
+ return ""
+
+ def write(self, data):
+ """Put a data item in the pipe."""
+ self.put(data)
+
+ def close(self):
+ """A place-holder to maintain consistency."""
+ pass
+
+
+class GlanceWriteThread(object):
+ """Ensures that image data is written to in the glance client and that
+ it is in correct ('active')state."""
+
+ def __init__(self, input, glance_client, image_id, image_meta={}):
+ self.input = input
+ self.glance_client = glance_client
+ self.image_id = image_id
+ self.image_meta = image_meta
+ self._running = False
+
+ def start(self):
+ self.done = event.Event()
+
+ def _inner():
+ """Function to do the image data transfer through an update
+ and thereon checks if the state is 'active'."""
+ self.glance_client.update_image(self.image_id,
+ image_meta=self.image_meta,
+ image_data=self.input)
+ self._running = True
+ while self._running:
+ try:
+ image_status = \
+ self.glance_client.get_image_meta(self.image_id).get(
+ "status")
+ if image_status == "active":
+ self.stop()
+ self.done.send(True)
+ # If the state is killed, then raise an exception.
+ elif image_status == "killed":
+ self.stop()
+ exc_msg = _("Glance image %s is in killed state") %\
+ self.image_id
+ LOG.exception(exc_msg)
+ self.done.send_exception(exception.Error(exc_msg))
+ elif image_status in ["saving", "queued"]:
+ greenthread.sleep(GLANCE_POLL_INTERVAL)
+ else:
+ self.stop()
+ exc_msg = _("Glance image "
+ "%(image_id)s is in unknown state "
+ "- %(state)s") % {
+ "image_id": self.image_id,
+ "state": image_status}
+ LOG.exception(exc_msg)
+ self.done.send_exception(exception.Error(exc_msg))
+ except Exception, exc:
+ self.stop()
+ self.done.send_exception(exc)
+
+ greenthread.spawn(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
+
+ def close(self):
+ pass
+
+
+class IOThread(object):
+ """Class that reads chunks from the input file and writes them to the
+ output file till the transfer is completely done."""
+
+ def __init__(self, input, output):
+ self.input = input
+ self.output = output
+ self._running = False
+ self.got_exception = False
+
+ def start(self):
+ self.done = event.Event()
+
+ def _inner():
+ """Read data from the input and write the same to the output
+ until the transfer completes."""
+ self._running = True
+ while self._running:
+ try:
+ data = self.input.read(None)
+ if not data:
+ self.stop()
+ self.done.send(True)
+ self.output.write(data)
+ greenthread.sleep(IO_THREAD_SLEEP_TIME)
+ except Exception, exc:
+ self.stop()
+ LOG.exception(exc)
+ self.done.send_exception(exc)
+
+ greenthread.spawn(_inner)
+ return self.done
+
+ def stop(self):
+ self._running = False
+
+ def wait(self):
+ return self.done.wait()
diff --git a/nova/virt/vmwareapi/read_write_util.py b/nova/virt/vmwareapi/read_write_util.py index 52ed6f9ac..237fd44dc 100644 --- a/nova/virt/vmwareapi/read_write_util.py +++ b/nova/virt/vmwareapi/read_write_util.py @@ -27,16 +27,49 @@ import urllib import urllib2
import urlparse
+from eventlet import event
+from eventlet import greenthread
+
+from glance import client
+
from nova import flags
from nova import log as logging
-FLAGS = flags.FLAGS
+LOG = logging.getLogger("nova.virt.vmwareapi.read_write_util")
-READ_CHUNKSIZE = 2 * 1024 * 1024
+FLAGS = flags.FLAGS
USER_AGENT = "OpenStack-ESX-Adapter"
-LOG = logging.getLogger("nova.virt.vmwareapi.read_write_util")
+try:
+ READ_CHUNKSIZE = client.BaseClient.CHUNKSIZE
+except:
+ READ_CHUNKSIZE = 65536
+
+
+class GlanceFileRead(object):
+ """Glance file read handler class."""
+
+ def __init__(self, glance_read_iter):
+ self.glance_read_iter = glance_read_iter
+ self.iter = self.get_next()
+
+ def read(self, chunk_size):
+ """Read an item from the queue. The chunk size is ignored for the
+ Client ImageBodyIterator uses its own CHUNKSIZE."""
+ try:
+ return self.iter.next()
+ except StopIteration:
+ return ""
+
+ def get_next(self):
+ """Get the next item from the image iterator."""
+ for data in self.glance_read_iter:
+ yield data
+
+ def close(self):
+ """A dummy close just to maintain consistency."""
+ pass
class VMwareHTTPFile(object):
@@ -77,7 +110,7 @@ class VMwareHTTPFile(object): """Write data to the file."""
raise NotImplementedError
- def read(self, chunk_size=READ_CHUNKSIZE):
+ def read(self, chunk_size):
"""Read a chunk of data."""
raise NotImplementedError
@@ -137,9 +170,12 @@ class VmWareHTTPReadFile(VMwareHTTPFile): conn = urllib2.urlopen(request)
VMwareHTTPFile.__init__(self, conn)
- def read(self, chunk_size=READ_CHUNKSIZE):
+ def read(self, chunk_size):
"""Read a chunk of data."""
- return self.file_handle.read(chunk_size)
+ # We are ignoring the chunk size passed for we want the pipe to hold
+ # data items of the chunk-size that Glance Client uses for read
+ # while writing.
+ return self.file_handle.read(READ_CHUNKSIZE)
def get_size(self):
"""Get size of the file to be read."""
diff --git a/nova/virt/vmwareapi/vmops.py b/nova/virt/vmwareapi/vmops.py index 4b3c8adca..e09b89e39 100644 --- a/nova/virt/vmwareapi/vmops.py +++ b/nova/virt/vmwareapi/vmops.py @@ -482,27 +482,32 @@ class VMWareVMOps(object): if vm_ref is None:
raise exception.NotFound(_("instance - %s not present") %
instance.name)
- lst_properties = ["summary.guest.toolsStatus", "runtime.powerState"]
+ lst_properties = ["summary.guest.toolsStatus", "runtime.powerState",
+ "summary.guest.toolsRunningStatus"]
props = self._session._call_method(vim_util, "get_object_properties",
None, vm_ref, "VirtualMachine",
lst_properties)
+ pwr_state = None
+ tools_status = None
+ tools_running_status = False
for elem in props:
- pwr_state = None
- tools_status = None
for prop in elem.propSet:
if prop.name == "runtime.powerState":
pwr_state = prop.val
elif prop.name == "summary.guest.toolsStatus":
tools_status = prop.val
+ elif prop.name == "summary.guest.toolsRunningStatus":
+ tools_running_status = prop.val
# Raise an exception if the VM is not powered On.
if pwr_state not in ["poweredOn"]:
raise exception.Invalid(_("instance - %s not poweredOn. So can't "
"be rebooted.") % instance.name)
- # If vmware tools are installed in the VM, then do a guest reboot.
- # Otherwise do a hard reset.
- if tools_status not in ['toolsNotInstalled', 'toolsNotRunning']:
+ # If latest vmware tools are installed in the VM, and that the tools
+ # are running, then only do a guest reboot. Otherwise do a hard reset.
+ if (tools_status == "toolsOk" and
+ tools_running_status == "guestToolsRunning"):
LOG.debug(_("Rebooting guest OS of VM %s") % instance.name)
self._session._call_method(self._session._get_vim(), "RebootGuest",
vm_ref)
diff --git a/nova/virt/vmwareapi/vmware_images.py b/nova/virt/vmwareapi/vmware_images.py index d9c7f52e5..50c6baedf 100644 --- a/nova/virt/vmwareapi/vmware_images.py +++ b/nova/virt/vmwareapi/vmware_images.py @@ -18,20 +18,70 @@ Utility functions for Image transfer.
"""
-import glance.client
+from glance import client
from nova import exception
from nova import flags
from nova import log as logging
+from nova.virt.vmwareapi import io_util
from nova.virt.vmwareapi import read_write_util
-FLAGS = flags.FLAGS
+LOG = logging.getLogger("nova.virt.vmwareapi.vmware_images")
-QUEUE_BUFFER_SIZE = 5
-READ_CHUNKSIZE = 2 * 1024 * 1024
-WRITE_CHUNKSIZE = 2 * 1024 * 1024
+FLAGS = flags.FLAGS
-LOG = logging.getLogger("nova.virt.vmwareapi.vmware_images")
+QUEUE_BUFFER_SIZE = 10
+
+
+def start_transfer(read_file_handle, data_size, write_file_handle=None,
+ glance_client=None, image_id=None, image_meta={}):
+ """Start the data transfer from the reader to the writer.
+ Reader writes to the pipe and the writer reads from the pipe. This means
+ that the total transfer time boils down to the slower of the read/write
+ and not the addition of the two times."""
+ # The pipe that acts as an intermediate store of data for reader to write
+ # to and writer to grab from.
+ thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
+ # The read thread. In case of glance it is the instance of the
+ # GlanceFileRead class. The glance client read returns an iterator
+ # and this class wraps that iterator to provide datachunks in calls
+ # to read.
+ read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe)
+
+ # In case of Glance - VMWare transfer, we just need a handle to the
+ # HTTP Connection that is to send transfer data to the VMWare datastore.
+ if write_file_handle:
+ write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle)
+ # In case of VMWare - Glance transfer, we relinquish VMWare HTTP file read
+ # handle to Glance Client instance, but to be sure of the transfer we need
+ # to be sure of the status of the image on glnace changing to active.
+ # The GlanceWriteThread handles the same for us.
+ elif glance_client and image_id:
+ write_thread = io_util.GlanceWriteThread(thread_safe_pipe,
+ glance_client, image_id, image_meta)
+ # Start the read and write threads.
+ read_event = read_thread.start()
+ write_event = write_thread.start()
+ try:
+ # Wait on the read and write events to signal their end
+ read_event.wait()
+ write_event.wait()
+ except Exception, exc:
+ # In case of any of the reads or writes raising an exception,
+ # stop the threads so that we un-necessarily don't keep the other one
+ # waiting.
+ read_thread.stop()
+ write_thread.stop()
+
+ # Log and raise the exception.
+ LOG.exception(exc)
+ raise exception.Error(exc)
+ finally:
+ # No matter what, try closing the read and write handles, if it so
+ # applies.
+ read_file_handle.close()
+ if write_file_handle:
+ write_file_handle.close()
def fetch_image(image, instance, **kwargs):
@@ -67,8 +117,9 @@ def upload_image(image, instance, **kwargs): def _get_glance_image(image, instance, **kwargs):
"""Download image from the glance image server."""
LOG.debug(_("Downloading image %s from glance image server") % image)
- glance_client = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port)
- metadata, read_file_handle = glance_client.get_image(image)
+ glance_client = client.Client(FLAGS.glance_host, FLAGS.glance_port)
+ metadata, read_iter = glance_client.get_image(image)
+ read_file_handle = read_write_util.GlanceFileRead(read_iter)
file_size = int(metadata['size'])
write_file_handle = read_write_util.VMWareHTTPWriteFile(
kwargs.get("host"),
@@ -77,8 +128,8 @@ def _get_glance_image(image, instance, **kwargs): kwargs.get("cookies"),
kwargs.get("file_path"),
file_size)
- for chunk in read_file_handle:
- write_file_handle.write(chunk)
+ start_transfer(read_file_handle, file_size,
+ write_file_handle=write_file_handle)
LOG.debug(_("Downloaded image %s from glance image server") % image)
@@ -101,7 +152,9 @@ def _put_glance_image(image, instance, **kwargs): kwargs.get("datastore_name"),
kwargs.get("cookies"),
kwargs.get("file_path"))
- glance_client = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port)
+ file_size = read_file_handle.get_size()
+ glance_client = client.Client(FLAGS.glance_host, FLAGS.glance_port)
+ # The properties and other fields that we need to set for the image.
image_metadata = {"is_public": True,
"disk_format": "vmdk",
"container_format": "bare",
@@ -111,8 +164,8 @@ def _put_glance_image(image, instance, **kwargs): "vmware_ostype": kwargs.get("os_type"),
"vmware_image_version":
kwargs.get("image_version")}}
- glance_client.update_image(image, image_meta=image_metadata,
- image_data=read_file_handle)
+ start_transfer(read_file_handle, file_size, glance_client=glance_client,
+ image_id=image, image_meta=image_metadata)
LOG.debug(_("Uploaded image %s to the Glance image server") % image)
@@ -135,7 +188,7 @@ def get_vmdk_size_and_properties(image, instance): LOG.debug(_("Getting image size for the image %s") % image)
if FLAGS.image_service == "nova.image.glance.GlanceImageService":
- glance_client = glance.client.Client(FLAGS.glance_host,
+ glance_client = client.Client(FLAGS.glance_host,
FLAGS.glance_port)
meta_data = glance_client.get_image_meta(image)
size, properties = meta_data["size"], meta_data["properties"]
diff --git a/nova/virt/vmwareapi_conn.py b/nova/virt/vmwareapi_conn.py index bb10c6043..414b8731d 100644 --- a/nova/virt/vmwareapi_conn.py +++ b/nova/virt/vmwareapi_conn.py @@ -348,7 +348,7 @@ class VMWareAPISession(object): action["error"] = error_info
LOG.warn(_("Task [%(task_name)s] %(task_ref)s "
"status: error %(error_info)s") % locals())
- done.send_exception(Exception(error_info))
+ done.send_exception(exception.Error(error_info))
db.instance_action_create(context.get_admin_context(), action)
except Exception, excep:
LOG.warn(_("In vmwareapi:_poll_task, Got this error %s") % excep)
|
