author    Jenkins <jenkins@review.openstack.org>        2012-06-28 17:51:55 +0000
committer Gerrit Code Review <review@openstack.org>     2012-06-28 17:51:55 +0000
commit    bc22a271027ef61bcdb43faeacee2be400d874f8 (patch)
tree      97d0f3ba95a8ad82bd0e6269fe6c14c27cce5b99
parent    7f4ec4054e77d424c8142847e86eff0cd6d14c37 (diff)
parent    169e601c8c387555609d67be11784fdd514d957f (diff)
Merge "Refactor Dom0 Glance plugin."
-rw-r--r--  nova/tests/test_xenapi.py                               4
-rw-r--r--  nova/tests/xenapi/stubs.py                              7
-rw-r--r--  nova/virt/xenapi/vm_utils.py                           63
-rwxr-xr-x  plugins/xenserver/xenapi/etc/xapi.d/plugins/glance    372
-rw-r--r--  plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py  336
5 files changed, 427 insertions, 355 deletions
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 668172a92..c4b3262f5 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -561,7 +561,7 @@ class XenAPIVMTestCase(test.TestCase):
"""
vdi_recs_start = self._list_vdis()
- stubs.stubout_fetch_image_glance_disk(self.stubs, raise_failure=True)
+ stubs.stubout_fetch_disk_image(self.stubs, raise_failure=True)
self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3)
# No additional VDI should be found.
@@ -627,7 +627,7 @@ class XenAPIVMTestCase(test.TestCase):
self.check_vm_params_for_windows()
def test_spawn_glance(self):
- stubs.stubout_fetch_image_glance_disk(self.stubs)
+ stubs.stubout_fetch_disk_image(self.stubs)
self._test_spawn(IMAGE_MACHINE,
IMAGE_KERNEL,
IMAGE_RAMDISK)
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 2b3722f62..f6689c794 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -118,10 +118,10 @@ def stubout_lookup_image(stubs):
stubs.Set(vm_utils, 'lookup_image', f)
-def stubout_fetch_image_glance_disk(stubs, raise_failure=False):
+def stubout_fetch_disk_image(stubs, raise_failure=False):
"""Simulates a failure in fetch image_glance_disk."""
- def _fake_fetch_image_glance_disk(context, session, instance, image,
+ def _fake_fetch_disk_image(context, session, instance, image,
image_type):
if raise_failure:
raise fake.Failure("Test Exception raised by "
@@ -136,8 +136,7 @@ def stubout_fetch_image_glance_disk(stubs, raise_failure=False):
vdi_type = vm_utils.ImageType.to_string(image_type)
return {vdi_type: dict(uuid=None, file=filename)}
- stubs.Set(vm_utils, '_fetch_image_glance_disk',
- _fake_fetch_image_glance_disk)
+ stubs.Set(vm_utils, '_fetch_disk_image', _fake_fetch_disk_image)
def stubout_create_vm(stubs):
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index b0f617e48..84987d7f7 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -727,37 +727,36 @@ def fetch_image(context, session, instance, image_id, image_type):
A list of dictionaries that describe VDIs, otherwise
"""
if image_type == ImageType.DISK_VHD:
- return _fetch_image_glance_vhd(context, session, instance, image_id)
+ return _fetch_vhd_image(context, session, instance, image_id)
else:
- return _fetch_image_glance_disk(context, session, instance, image_id,
- image_type)
+ return _fetch_disk_image(context, session, instance, image_id,
+ image_type)
-def _retry_glance_download_vhd(context, session, image_id):
- # NOTE(sirp): The Glance plugin runs under Python 2.4
+def _fetch_using_dom0_plugin_with_retry(context, session, image_id,
+ plugin_name, params):
+ # NOTE(sirp): The XenAPI plugins run under Python 2.4
# which does not have the `uuid` module. To work around this,
# we generate the uuids here (under Python 2.6+) and
# pass them as arguments
- uuid_stack = [str(uuid.uuid4()) for i in xrange(3)]
+ extra_params = {
+ 'image_id': image_id,
+ 'uuid_stack': [str(uuid.uuid4()) for i in xrange(3)],
+ 'sr_path': get_sr_path(session),
+ 'auth_token': getattr(context, 'auth_token', None)}
+
+ extra_params.update(params)
+ kwargs = {'params': pickle.dumps(extra_params)}
max_attempts = FLAGS.glance_num_retries + 1
sleep_time = 0.5
for attempt_num in xrange(1, max_attempts + 1):
- glance_host, glance_port = glance.pick_glance_api_server()
- params = {'image_id': image_id,
- 'glance_host': glance_host,
- 'glance_port': glance_port,
- 'uuid_stack': uuid_stack,
- 'sr_path': get_sr_path(session),
- 'auth_token': getattr(context, 'auth_token', None)}
- kwargs = {'params': pickle.dumps(params)}
-
- LOG.info(_('download_vhd %(image_id)s '
- 'attempt %(attempt_num)d/%(max_attempts)d '
- 'from %(glance_host)s:%(glance_port)s') % locals())
+ LOG.info(_('download_vhd %(image_id)s, '
+ 'attempt %(attempt_num)d/%(max_attempts)d, '
+ 'params: %(params)s') % locals())
try:
- result = session.call_plugin('glance', 'download_vhd', kwargs)
+ result = session.call_plugin(plugin_name, 'download_vhd', kwargs)
return jsonutils.loads(result)
except session.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
@@ -773,29 +772,31 @@ def _retry_glance_download_vhd(context, session, image_id):
raise exception.CouldNotFetchImage(image_id=image_id)
-def _fetch_image_glance_vhd(context, session, instance, image_id):
+def _fetch_vhd_image(context, session, instance, image_id):
"""Tell glance to download an image and put the VHDs into the SR
Returns: A list of dictionaries that describe VDIs
"""
LOG.debug(_("Asking xapi to fetch vhd image %(image_id)s"), locals(),
instance=instance)
- sr_ref = safe_find_sr(session)
- fetched_vdis = _retry_glance_download_vhd(context, session, image_id)
+ plugin_name = 'glance'
+ glance_host, glance_port = glance.pick_glance_api_server()
+ params = {'glance_host': glance_host, 'glance_port': glance_port}
- # 'download_vhd' will return a list of dictionaries describing VDIs.
- # The dictionary will contain 'vdi_type' and 'vdi_uuid' keys.
- # 'vdi_type' can be 'root' or 'swap' right now.
- for vdi in fetched_vdis:
- LOG.debug(_("xapi 'download_vhd' returned VDI of "
- "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'"),
- vdi, instance=instance)
+ fetched_vdis = _fetch_using_dom0_plugin_with_retry(
+ context, session, image_id, plugin_name, params)
+ sr_ref = safe_find_sr(session)
scan_sr(session, sr_ref)
+ # TODO(sirp): the plugin should return the correct format rather than
+ # munging here
vdis = {}
for vdi in fetched_vdis:
+ LOG.debug(_("xapi 'download_vhd' returned VDI of "
+ "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'"),
+ vdi, instance=instance)
vdis[vdi['vdi_type']] = dict(uuid=vdi['vdi_uuid'], file=None)
# Pull out the UUID of the root VDI
@@ -845,11 +846,11 @@ def _check_vdi_size(context, session, instance, vdi_uuid):
raise exception.ImageTooLarge()
-def _fetch_image_glance_disk(context, session, instance, image_id, image_type):
+def _fetch_disk_image(context, session, instance, image_id, image_type):
"""Fetch the image from Glance
NOTE:
- Unlike _fetch_image_glance_vhd, this method does not use the Glance
+ Unlike _fetch_vhd_image, this method does not use the Glance
plugin; instead, it streams the disks through domU to the VDI
directly.
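
For context, a minimal sketch of the calling convention this refactor settles on: the compute host packs a flat parameter dict (image id, pre-generated uuid stack, SR path, auth token, plus any plugin-specific values), pickles it, and hands it to the dom0 plugin through session.call_plugin. The helper below only illustrates that packing; the name build_plugin_kwargs and the usage comment are assumptions, not part of the change.

    import cPickle as pickle
    import uuid


    def build_plugin_kwargs(image_id, sr_path, auth_token, plugin_params=None):
        # Mirrors the packing done in _fetch_using_dom0_plugin_with_retry:
        # UUIDs are generated on the compute host because the dom0 plugin
        # runs under Python 2.4, which has no `uuid` module.
        params = {
            'image_id': image_id,
            'uuid_stack': [str(uuid.uuid4()) for _ in xrange(3)],
            'sr_path': sr_path,
            'auth_token': auth_token,
        }
        params.update(plugin_params or {})
        return {'params': pickle.dumps(params)}

    # Hypothetical usage against a XenAPI session wrapper:
    #   kwargs = build_plugin_kwargs(image_id, get_sr_path(session), token,
    #                                {'glance_host': host, 'glance_port': port})
    #   result = session.call_plugin('glance', 'download_vhd', kwargs)
    #   vdis = jsonutils.loads(result)
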
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
index 76e9349d4..b826a8711 100755
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance
@@ -1,5 +1,6 @@
#!/usr/bin/env python
+# Copyright (c) 2012 Openstack, LLC
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
@@ -17,9 +18,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-#
-# XenAPI plugin for managing glance images
-#
+"""Handle the uploading and downloading of images via Glance."""
import cPickle as pickle
import httplib
@@ -27,24 +26,20 @@ try:
import json
except ImportError:
import simplejson as json
-# NOTE: XenServer 6 and below use python 2.4 so md5 is needed and not hashlib
import md5
import os
import os.path
-import shlex
import shutil
-import subprocess
-import tempfile
-import time
-import urllib2
+import urllib2
import XenAPIPlugin
+import utils
+
#FIXME(sirp): should this use pluginlib from 5.6?
from pluginlib_nova import *
configure_logging('glance')
-CHUNK_SIZE = 8192
KERNEL_DIR = '/boot/guest'
@@ -98,34 +93,33 @@ def _download_tarball_and_verify(request, staging_path):
# don't have a useful __repr__ or __str__
raise RetryableError('%s: %s' % (error.__class__.__name__, error))
- tar_cmd = "tar -zx --directory=%(staging_path)s" % locals()
- tar_proc = _make_subprocess(tar_cmd, stderr=True, stdin=True)
-
- checksum = md5.new()
- etag = response.info().getheader('etag', None)
- if etag is None:
- etag = response.info().getheader('x-image-meta-checksum', None)
-
url = request.get_full_url()
logging.info("Reading image data from %s" % url)
- length_read = 0
- while True:
- chunk = response.read(CHUNK_SIZE)
- if chunk == '':
- break
- length_read += len(chunk)
- checksum.update(chunk)
- tar_proc.stdin.write(chunk)
+ callback_data = {'bytes_read': 0}
+ checksum = md5.new()
- logging.info("Read %(length_read)s bytes from %(url)s" % locals())
+ def update_md5(chunk):
+ callback_data['bytes_read'] += len(chunk)
+ checksum.update(chunk)
try:
- _finish_subprocess(tar_proc, tar_cmd)
- except Exception, error:
- raise RetryableError(error)
+ try:
+ utils.extract_tarball(response, staging_path, callback=update_md5)
+ except Exception, error:
+ raise RetryableError(error)
+ finally:
+ bytes_read = callback_data['bytes_read']
+ logging.info("Read %d bytes from %s", bytes_read, url)
+ # Use ETag if available, otherwise X-Image-Meta-Checksum
+ etag = response.info().getheader('etag', None)
+ if etag is None:
+ etag = response.info().getheader('x-image-meta-checksum', None)
+
+ # Verify checksum using ETag
checksum = checksum.hexdigest()
+
if etag is None:
msg = "No ETag found for comparison to checksum %(checksum)s"
logging.info(msg % locals())
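
As an aside, the streaming-verification pattern used above can be shown in isolation: hash every chunk as it flows past, hand the chunk on, and compare the digest with the ETag (or X-Image-Meta-Checksum) afterwards. This is a simplified, hypothetical sketch; the real code feeds the chunks to utils.extract_tarball through the update_md5 callback rather than driving the read loop itself.

    import md5  # hashlib is unavailable on XenServer's Python 2.4

    CHUNK_SIZE = 8192


    def checksum_stream(response, consume_chunk):
        """Read `response` in chunks, md5 each chunk, and pass it along.

        Returns (hexdigest, bytes_read) so the caller can compare against
        the ETag header and log how much was transferred.
        """
        digest = md5.new()
        bytes_read = 0
        while True:
            chunk = response.read(CHUNK_SIZE)
            if chunk == '':
                break
            bytes_read += len(chunk)
            digest.update(chunk)
            consume_chunk(chunk)  # e.g. write into the tar subprocess
        return digest.hexdigest(), bytes_read
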
@@ -137,7 +131,8 @@ def _download_tarball_and_verify(request, staging_path):
logging.info(msg % locals())
-def _download_tarball(sr_path, image_id, glance_host, glance_port, auth_token):
+def _download_tarball(sr_path, staging_path, image_id, glance_host,
+ glance_port, auth_token):
"""Download the tarball image from Glance and extract it into the staging
area. Retry if there is any failure.
"""
@@ -151,202 +146,12 @@ def _download_tarball(sr_path, image_id, glance_host, glance_port, auth_token):
logging.info("Downloading %s" % url)
request = urllib2.Request(url, headers=headers)
- staging_path = _make_staging_area(sr_path)
try:
_download_tarball_and_verify(request, staging_path)
except Exception:
logging.exception('Failed to retrieve %(url)s' % locals())
- _cleanup_staging_area(staging_path)
raise
- return staging_path
-
-
-def _import_vhds(sr_path, staging_path, uuid_stack):
- """Import the VHDs found in the staging path.
-
- We cannot extract VHDs directly into the SR since they don't yet have
- UUIDs, aren't properly associated with each other, and would be subject to
- a race-condition of one-file being present and the other not being
- downloaded yet.
-
- To avoid these we problems, we use a staging area to fixup the VHDs before
- moving them into the SR. The steps involved are:
-
- 1. Extracting tarball into staging area (done prior to this call)
-
- 2. Renaming VHDs to use UUIDs ('snap.vhd' -> 'ffff-aaaa-...vhd')
-
- 3. Linking VHDs together if there's a snap.vhd
-
- 4. Pseudo-atomically moving the images into the SR. (It's not really
- atomic because it takes place as multiple os.rename operations;
- however, the chances of an SR.scan occuring between the rename()s
- invocations is so small that we can safely ignore it)
-
- Returns: A list of VDIs. Each list element is a dictionary containing
- information about the VHD. Dictionary keys are:
- 1. "vdi_type" - The type of VDI. Currently they can be "root" or
- "swap"
- 2. "vdi_uuid" - The UUID of the VDI
-
- Example return: [{"vdi_type": "root","vdi_uuid": "ffff-aaa..vhd"},
- {"vdi_type": "swap","vdi_uuid": "ffff-bbb..vhd"}]
- """
- def rename_with_uuid(orig_path):
- """Rename VHD using UUID so that it will be recognized by SR on a
- subsequent scan.
-
- Since Python2.4 doesn't have the `uuid` module, we pass a stack of
- pre-computed UUIDs from the compute worker.
- """
- orig_dirname = os.path.dirname(orig_path)
- uuid = uuid_stack.pop()
- new_path = os.path.join(orig_dirname, "%s.vhd" % uuid)
- os.rename(orig_path, new_path)
- return new_path, uuid
-
- def link_vhds(child_path, parent_path):
- """Use vhd-util to associate the snapshot VHD with its base_copy.
-
- This needs to be done before we move both VHDs into the SR to prevent
- the base_copy from being DOA (deleted-on-arrival).
- """
- modify_cmd = ("vhd-util modify -n %(child_path)s -p %(parent_path)s"
- % locals())
- modify_proc = _make_subprocess(modify_cmd, stderr=True)
- _finish_subprocess(modify_proc, modify_cmd)
-
- def move_into_sr(orig_path):
- """Move a file into the SR"""
- filename = os.path.basename(orig_path)
- new_path = os.path.join(sr_path, filename)
- os.rename(orig_path, new_path)
- return new_path
-
- def assert_vhd_not_hidden(path):
- """
- This is a sanity check on the image; if a snap.vhd isn't
- present, then the image.vhd better not be marked 'hidden' or it will
- be deleted when moved into the SR.
- """
- query_cmd = "vhd-util query -n %(path)s -f" % locals()
- query_proc = _make_subprocess(query_cmd, stdout=True, stderr=True)
- out, err = _finish_subprocess(query_proc, query_cmd)
-
- for line in out.splitlines():
- if line.startswith('hidden'):
- value = line.split(':')[1].strip()
- if value == "1":
- raise Exception(
- "VHD %(path)s is marked as hidden without child" %
- locals())
-
- def prepare_if_exists(staging_path, vhd_name, parent_path=None):
- """
- Check for existance of a particular VHD in the staging path and
- preparing it for moving into the SR.
-
- Returns: Tuple of (Path to move into the SR, VDI_UUID)
- None, if the vhd_name doesn't exist in the staging path
-
- If the VHD exists, we will do the following:
- 1. Rename it with a UUID.
- 2. If parent_path exists, we'll link up the VHDs.
- """
- orig_path = os.path.join(staging_path, vhd_name)
- if not os.path.exists(orig_path):
- return None
- new_path, vdi_uuid = rename_with_uuid(orig_path)
- if parent_path:
- # NOTE(sirp): this step is necessary so that an SR scan won't
- # delete the base_copy out from under us (since it would be
- # orphaned)
- link_vhds(new_path, parent_path)
- return (new_path, vdi_uuid)
-
- def validate_vdi_chain(vdi_path):
- """
- This check ensures that the parent pointers on the VHDs are valid
- before we move the VDI chain to the SR. This is *very* important
- because a bad parent pointer will corrupt the SR causing a cascade of
- failures.
- """
- def get_parent_path(path):
- query_cmd = "vhd-util query -n %(path)s -p" % locals()
- query_proc = _make_subprocess(query_cmd, stdout=True, stderr=True)
- out, err = _finish_subprocess(
- query_proc, query_cmd, ok_exit_codes=[0, 22])
- first_line = out.splitlines()[0].strip()
- if first_line.endswith(".vhd"):
- return first_line
- elif 'has no parent' in first_line:
- return None
- elif 'query failed' in first_line:
- raise Exception("VDI '%(path)s' not present which breaks"
- " the VDI chain, bailing out" % locals())
- else:
- raise Exception("Unexpected output '%(out)s' from vhd-util" %
- locals())
-
- cur_path = vdi_path
- while cur_path:
- cur_path = get_parent_path(cur_path)
-
- vdi_return_list = []
- paths_to_move = []
-
- image_parent = None
- base_info = prepare_if_exists(staging_path, 'base.vhd')
- if base_info:
- paths_to_move.append(base_info[0])
- image_parent = base_info[0]
-
- image_info = prepare_if_exists(staging_path, 'image.vhd', image_parent)
- if not image_info:
- raise Exception("Invalid image: image.vhd not present")
-
- paths_to_move.insert(0, image_info[0])
-
- snap_info = prepare_if_exists(staging_path, 'snap.vhd',
- image_info[0])
- if snap_info:
- validate_vdi_chain(snap_info[0])
- # NOTE(sirp): this is an insert rather than an append since the
- # 'snapshot' vhd needs to be copied into the SR before the base copy.
- # If it doesn't, then there is a possibliity that snapwatchd will
- # delete the base_copy since it is an unreferenced parent.
- paths_to_move.insert(0, snap_info[0])
- # We return this snap as the VDI instead of image.vhd
- vdi_return_list.append(dict(vdi_type="root", vdi_uuid=snap_info[1]))
- else:
- validate_vdi_chain(image_info[0])
- assert_vhd_not_hidden(image_info[0])
- # If there's no snap, we return the image.vhd UUID
- vdi_return_list.append(dict(vdi_type="root", vdi_uuid=image_info[1]))
-
- swap_info = prepare_if_exists(staging_path, 'swap.vhd')
- if swap_info:
- assert_vhd_not_hidden(swap_info[0])
- paths_to_move.append(swap_info[0])
- vdi_return_list.append(dict(vdi_type="swap", vdi_uuid=swap_info[1]))
-
- for path in paths_to_move:
- move_into_sr(path)
-
- return vdi_return_list
-
-
-def _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids):
- """Hard-link VHDs into staging area with appropriate filename
- ('snap' or 'image.vhd')
- """
- for name, uuid in vdi_uuids.items():
- if uuid:
- source = os.path.join(sr_path, "%s.vhd" % uuid)
- link_name = os.path.join(staging_path, "%s.vhd" % name)
- os.link(source, link_name)
-
def _upload_tarball(staging_path, image_id, glance_host, glance_port,
auth_token, properties):
@@ -394,19 +199,20 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port,
conn.putheader(header, value)
conn.endheaders()
- tar_cmd = "tar -zc --directory=%(staging_path)s ." % locals()
- tar_proc = _make_subprocess(tar_cmd, stdout=True, stderr=True)
+ callback_data = {'bytes_written': 0}
+
+ def send_chunked_transfer_encoded(chunk):
+ chunk_len = len(chunk)
+ callback_data['bytes_written'] += chunk_len
+ conn.send("%x\r\n%s\r\n" % (chunk_len, chunk))
+
+ utils.create_tarball(
+ None, staging_path, callback=send_chunked_transfer_encoded)
- length = 0
- chunk = tar_proc.stdout.read(CHUNK_SIZE)
- while chunk:
- length += len(chunk)
- conn.send("%x\r\n%s\r\n" % (len(chunk), chunk))
- chunk = tar_proc.stdout.read(CHUNK_SIZE)
- conn.send("0\r\n\r\n")
- logging.info("Wrote %s bytes to %s" % (length, url))
+ conn.send("0\r\n\r\n") # Chunked-Transfer terminator
- _finish_subprocess(tar_proc, tar_cmd)
+ bytes_written = callback_data['bytes_written']
+ logging.info("Wrote %d bytes to %s" % (bytes_written, url))
resp = conn.getresponse()
if resp.status != httplib.OK:
@@ -419,81 +225,6 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port,
conn.close()
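
For reference, send_chunked_transfer_encoded above emits standard HTTP/1.1 chunked transfer framing: each chunk is prefixed by its length in hexadecimal, and a zero-length chunk terminates the body. A tiny, hypothetical illustration of the framing only:

    def frame_chunk(chunk):
        # "<hex length>\r\n<data>\r\n" -- exactly what the callback sends
        # for every chunk produced by utils.create_tarball.
        return "%x\r\n%s\r\n" % (len(chunk), chunk)


    def chunked_terminator():
        # A zero-length chunk marks the end of the chunked body.
        return "0\r\n\r\n"

    # frame_chunk("abc") == "3\r\nabc\r\n"
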
-def _make_staging_area(sr_path):
- """
- The staging area is a place where we can temporarily store and
- manipulate VHDs. The use of the staging area is different for upload and
- download:
-
- Download
- ========
-
- When we download the tarball, the VHDs contained within will have names
- like "snap.vhd" and "image.vhd". We need to assign UUIDs to them before
- moving them into the SR. However, since 'image.vhd' may be a base_copy, we
- need to link it to 'snap.vhd' (using vhd-util modify) before moving both
- into the SR (otherwise the SR.scan will cause 'image.vhd' to be deleted).
- The staging area gives us a place to perform these operations before they
- are moved to the SR, scanned, and then registered with XenServer.
-
- Upload
- ======
-
- On upload, we want to rename the VHDs to reflect what they are, 'snap.vhd'
- in the case of the snapshot VHD, and 'image.vhd' in the case of the
- base_copy. The staging area provides a directory in which we can create
- hard-links to rename the VHDs without affecting what's in the SR.
-
-
- NOTE
- ====
-
- The staging area is created as a subdirectory within the SR in order to
- guarantee that it resides within the same filesystem and therefore permit
- hard-linking and cheap file moves.
- """
- staging_path = tempfile.mkdtemp(dir=sr_path)
- return staging_path
-
-
-def _cleanup_staging_area(staging_path):
- """Remove staging area directory
-
- On upload, the staging area contains hard-links to the VHDs in the SR;
- it's safe to remove the staging-area because the SR will keep the link
- count > 0 (so the VHDs in the SR will not be deleted).
- """
- if os.path.exists(staging_path):
- shutil.rmtree(staging_path)
-
-
-def _make_subprocess(cmdline, stdout=False, stderr=False, stdin=False):
- """Make a subprocess according to the given command-line string
- """
- kwargs = {}
- kwargs['stdout'] = stdout and subprocess.PIPE or None
- kwargs['stderr'] = stderr and subprocess.PIPE or None
- kwargs['stdin'] = stdin and subprocess.PIPE or None
- args = shlex.split(cmdline)
- proc = subprocess.Popen(args, **kwargs)
- return proc
-
-
-def _finish_subprocess(proc, cmdline, ok_exit_codes=None):
- """Ensure that the process returned a zero exit code indicating success
- """
- if ok_exit_codes is None:
- ok_exit_codes = [0]
-
- out, err = proc.communicate()
- ret = proc.returncode
- if ret not in ok_exit_codes:
- raise Exception("'%(cmdline)s' returned non-zero exit code: "
- "retcode=%(ret)i, out='%(out)s', stderr='%(err)s'"
- % locals())
- return out, err
-
-
def create_kernel_ramdisk(session, args):
"""Creates a copy of the kernel/ramdisk image if it is present in the
cache. If the image is not present in the cache, it does nothing.
@@ -524,16 +255,21 @@ def download_vhd(session, args):
sr_path = params["sr_path"]
auth_token = params["auth_token"]
- staging_path = None
+ staging_path = utils.make_staging_area(sr_path)
try:
- staging_path = _download_tarball(sr_path, image_id, glance_host,
- glance_port, auth_token)
- # Right now, it's easier to return a single string via XenAPI,
- # so we'll json encode the list of VHDs.
- return json.dumps(_import_vhds(sr_path, staging_path, uuid_stack))
+ # Download tarball into staging area and extract it
+ _download_tarball(
+ sr_path, staging_path, image_id, glance_host, glance_port,
+ auth_token)
+
+ # Move the VHDs from the staging area into the storage repository
+ vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)
finally:
- if staging_path is not None:
- _cleanup_staging_area(staging_path)
+ utils.cleanup_staging_area(staging_path)
+
+ # Right now, it's easier to return a single string via XenAPI,
+ # so we'll json encode the list of VHDs.
+ return json.dumps(vdi_list)
def upload_vhd(session, args):
@@ -548,13 +284,13 @@ def upload_vhd(session, args):
auth_token = params["auth_token"]
properties = params["properties"]
- staging_path = _make_staging_area(sr_path)
+ staging_path = utils.make_staging_area(sr_path)
try:
- _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids)
+ utils.prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids)
_upload_tarball(staging_path, image_id, glance_host, glance_port,
auth_token, properties)
finally:
- _cleanup_staging_area(staging_path)
+ utils.cleanup_staging_area(staging_path)
return "" # Nothing useful to return on an upload
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py
new file mode 100644
index 000000000..ff88501f3
--- /dev/null
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py
@@ -0,0 +1,336 @@
+# Copyright (c) 2012 Openstack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Various utilities used by XenServer plugins."""
+
+import logging
+import os
+import shlex
+import shutil
+import subprocess
+import tempfile
+
+
+CHUNK_SIZE = 8192
+
+
+def make_subprocess(cmdline, stdout=False, stderr=False, stdin=False):
+ """Make a subprocess according to the given command-line string
+ """
+ logging.info("Running cmd '%s'" % cmdline)
+ kwargs = {}
+ kwargs['stdout'] = stdout and subprocess.PIPE or None
+ kwargs['stderr'] = stderr and subprocess.PIPE or None
+ kwargs['stdin'] = stdin and subprocess.PIPE or None
+ args = shlex.split(cmdline)
+ logging.info("Running args '%s'" % args)
+ proc = subprocess.Popen(args, **kwargs)
+ return proc
+
+
+def finish_subprocess(proc, cmdline, ok_exit_codes=None):
+ """Ensure that the process returned a zero exit code indicating success
+ """
+ if ok_exit_codes is None:
+ ok_exit_codes = [0]
+
+ out, err = proc.communicate()
+ ret = proc.returncode
+ if ret not in ok_exit_codes:
+ raise Exception("'%(cmdline)s' returned non-zero exit code: "
+ "retcode=%(ret)i, out='%(out)s', stderr='%(err)s'"
+ % locals())
+ return out, err
+
+
+def make_staging_area(sr_path):
+ """
+ The staging area is a place where we can temporarily store and
+ manipulate VHDs. The use of the staging area is different for upload and
+ download:
+
+ Download
+ ========
+
+ When we download the tarball, the VHDs contained within will have names
+ like "snap.vhd" and "image.vhd". We need to assign UUIDs to them before
+ moving them into the SR. However, since 'image.vhd' may be a base_copy, we
+ need to link it to 'snap.vhd' (using vhd-util modify) before moving both
+ into the SR (otherwise the SR.scan will cause 'image.vhd' to be deleted).
+ The staging area gives us a place to perform these operations before they
+ are moved to the SR, scanned, and then registered with XenServer.
+
+ Upload
+ ======
+
+ On upload, we want to rename the VHDs to reflect what they are, 'snap.vhd'
+ in the case of the snapshot VHD, and 'image.vhd' in the case of the
+ base_copy. The staging area provides a directory in which we can create
+ hard-links to rename the VHDs without affecting what's in the SR.
+
+
+ NOTE
+ ====
+
+ The staging area is created as a subdirectory within the SR in order to
+ guarantee that it resides within the same filesystem and therefore permit
+ hard-linking and cheap file moves.
+ """
+ staging_path = tempfile.mkdtemp(dir=sr_path)
+ return staging_path
+
+
+def cleanup_staging_area(staging_path):
+ """Remove staging area directory
+
+ On upload, the staging area contains hard-links to the VHDs in the SR;
+ it's safe to remove the staging-area because the SR will keep the link
+ count > 0 (so the VHDs in the SR will not be deleted).
+ """
+ if os.path.exists(staging_path):
+ shutil.rmtree(staging_path)
+
+
+def import_vhds(sr_path, staging_path, uuid_stack):
+ """Import the VHDs found in the staging path.
+
+ We cannot extract VHDs directly into the SR since they don't yet have
+ UUIDs, aren't properly associated with each other, and would be subject to
+ a race condition of one file being present and the other not being
+ downloaded yet.
+
+ To avoid these problems, we use a staging area to fix up the VHDs before
+ moving them into the SR. The steps involved are:
+
+ 1. Extracting tarball into staging area (done prior to this call)
+
+ 2. Renaming VHDs to use UUIDs ('snap.vhd' -> 'ffff-aaaa-...vhd')
+
+ 3. Linking VHDs together if there's a snap.vhd
+
+ 4. Pseudo-atomically moving the images into the SR. (It's not really
+ atomic because it takes place as multiple os.rename operations;
+ however, the chances of an SR.scan occurring between the rename()
+ invocations are so small that we can safely ignore it)
+
+ Returns: A list of VDIs. Each list element is a dictionary containing
+ information about the VHD. Dictionary keys are:
+ 1. "vdi_type" - The type of VDI. Currently they can be "root" or
+ "swap"
+ 2. "vdi_uuid" - The UUID of the VDI
+
+ Example return: [{"vdi_type": "root","vdi_uuid": "ffff-aaa..vhd"},
+ {"vdi_type": "swap","vdi_uuid": "ffff-bbb..vhd"}]
+ """
+ def rename_with_uuid(orig_path):
+ """Rename VHD using UUID so that it will be recognized by SR on a
+ subsequent scan.
+
+ Since Python2.4 doesn't have the `uuid` module, we pass a stack of
+ pre-computed UUIDs from the compute worker.
+ """
+ orig_dirname = os.path.dirname(orig_path)
+ uuid = uuid_stack.pop()
+ new_path = os.path.join(orig_dirname, "%s.vhd" % uuid)
+ os.rename(orig_path, new_path)
+ return new_path, uuid
+
+ def link_vhds(child_path, parent_path):
+ """Use vhd-util to associate the snapshot VHD with its base_copy.
+
+ This needs to be done before we move both VHDs into the SR to prevent
+ the base_copy from being DOA (deleted-on-arrival).
+ """
+ modify_cmd = ("vhd-util modify -n %(child_path)s -p %(parent_path)s"
+ % locals())
+ modify_proc = make_subprocess(modify_cmd, stderr=True)
+ finish_subprocess(modify_proc, modify_cmd)
+
+ def move_into_sr(orig_path):
+ """Move a file into the SR"""
+ filename = os.path.basename(orig_path)
+ new_path = os.path.join(sr_path, filename)
+ os.rename(orig_path, new_path)
+ return new_path
+
+ def assert_vhd_not_hidden(path):
+ """
+ This is a sanity check on the image; if a snap.vhd isn't
+ present, then the image.vhd better not be marked 'hidden' or it will
+ be deleted when moved into the SR.
+ """
+ query_cmd = "vhd-util query -n %(path)s -f" % locals()
+ query_proc = make_subprocess(query_cmd, stdout=True, stderr=True)
+ out, err = finish_subprocess(query_proc, query_cmd)
+
+ for line in out.splitlines():
+ if line.startswith('hidden'):
+ value = line.split(':')[1].strip()
+ if value == "1":
+ raise Exception(
+ "VHD %(path)s is marked as hidden without child" %
+ locals())
+
+ def prepare_if_exists(staging_path, vhd_name, parent_path=None):
+ """
+ Check for the existence of a particular VHD in the staging path and
+ prepare it for moving into the SR.
+
+ Returns: Tuple of (Path to move into the SR, VDI_UUID)
+ None, if the vhd_name doesn't exist in the staging path
+
+ If the VHD exists, we will do the following:
+ 1. Rename it with a UUID.
+ 2. If parent_path exists, we'll link up the VHDs.
+ """
+ orig_path = os.path.join(staging_path, vhd_name)
+ if not os.path.exists(orig_path):
+ return None
+ new_path, vdi_uuid = rename_with_uuid(orig_path)
+ if parent_path:
+ # NOTE(sirp): this step is necessary so that an SR scan won't
+ # delete the base_copy out from under us (since it would be
+ # orphaned)
+ link_vhds(new_path, parent_path)
+ return (new_path, vdi_uuid)
+
+ def validate_vdi_chain(vdi_path):
+ """
+ This check ensures that the parent pointers on the VHDs are valid
+ before we move the VDI chain to the SR. This is *very* important
+ because a bad parent pointer will corrupt the SR causing a cascade of
+ failures.
+ """
+ def get_parent_path(path):
+ query_cmd = "vhd-util query -n %(path)s -p" % locals()
+ query_proc = make_subprocess(query_cmd, stdout=True, stderr=True)
+ out, err = finish_subprocess(
+ query_proc, query_cmd, ok_exit_codes=[0, 22])
+ first_line = out.splitlines()[0].strip()
+ if first_line.endswith(".vhd"):
+ return first_line
+ elif 'has no parent' in first_line:
+ return None
+ elif 'query failed' in first_line:
+ raise Exception("VDI '%(path)s' not present which breaks"
+ " the VDI chain, bailing out" % locals())
+ else:
+ raise Exception("Unexpected output '%(out)s' from vhd-util" %
+ locals())
+
+ cur_path = vdi_path
+ while cur_path:
+ cur_path = get_parent_path(cur_path)
+
+ vdi_return_list = []
+ paths_to_move = []
+
+ image_parent = None
+ base_info = prepare_if_exists(staging_path, 'base.vhd')
+ if base_info:
+ paths_to_move.append(base_info[0])
+ image_parent = base_info[0]
+
+ image_info = prepare_if_exists(staging_path, 'image.vhd', image_parent)
+ if not image_info:
+ raise Exception("Invalid image: image.vhd not present")
+
+ paths_to_move.insert(0, image_info[0])
+
+ snap_info = prepare_if_exists(staging_path, 'snap.vhd',
+ image_info[0])
+ if snap_info:
+ validate_vdi_chain(snap_info[0])
+ # NOTE(sirp): this is an insert rather than an append since the
+ # 'snapshot' vhd needs to be copied into the SR before the base copy.
+ # If it doesn't, then there is a possibility that snapwatchd will
+ # delete the base_copy since it is an unreferenced parent.
+ paths_to_move.insert(0, snap_info[0])
+ # We return this snap as the VDI instead of image.vhd
+ vdi_return_list.append(dict(vdi_type="root", vdi_uuid=snap_info[1]))
+ else:
+ validate_vdi_chain(image_info[0])
+ assert_vhd_not_hidden(image_info[0])
+ # If there's no snap, we return the image.vhd UUID
+ vdi_return_list.append(dict(vdi_type="root", vdi_uuid=image_info[1]))
+
+ swap_info = prepare_if_exists(staging_path, 'swap.vhd')
+ if swap_info:
+ assert_vhd_not_hidden(swap_info[0])
+ paths_to_move.append(swap_info[0])
+ vdi_return_list.append(dict(vdi_type="swap", vdi_uuid=swap_info[1]))
+
+ for path in paths_to_move:
+ move_into_sr(path)
+
+ return vdi_return_list
+
+
+def prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids):
+ """Hard-link VHDs into staging area with appropriate filename
+ ('snap.vhd' or 'image.vhd')
+ """
+ for name, uuid in vdi_uuids.items():
+ if uuid:
+ source = os.path.join(sr_path, "%s.vhd" % uuid)
+ link_name = os.path.join(staging_path, "%s.vhd" % name)
+ os.link(source, link_name)
+
+
+def create_tarball(fileobj, path, callback=None):
+ """Create a tarball from a given path.
+
+ :param fileobj: a file-like object holding the tarball byte-stream.
+ If None, then only the callback will be used.
+ :param path: path to create tarball from
+ :param callback: optional callback to call on each chunk written
+ """
+ tar_cmd = "tar -zc --directory=%(path)s ." % locals()
+ tar_proc = make_subprocess(tar_cmd, stdout=True, stderr=True)
+
+ while True:
+ chunk = tar_proc.stdout.read(CHUNK_SIZE)
+ if chunk == '':
+ break
+
+ if callback:
+ callback(chunk)
+
+ if fileobj:
+ fileobj.write(chunk)
+
+ finish_subprocess(tar_proc, tar_cmd)
+
+
+def extract_tarball(fileobj, path, callback=None):
+ """Extract a tarball to a given path.
+
+ :param fileobj: a file-like object holding the tarball byte-stream
+ :param path: path to extract tarball into
+ :param callback: optional callback to call on each chunk read
+ """
+ tar_cmd = "tar -zx --directory=%(path)s" % locals()
+ tar_proc = make_subprocess(tar_cmd, stderr=True, stdin=True)
+
+ while True:
+ chunk = fileobj.read(CHUNK_SIZE)
+ if chunk == '':
+ break
+
+ if callback:
+ callback(chunk)
+
+ tar_proc.stdin.write(chunk)
+
+ finish_subprocess(tar_proc, tar_cmd)
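
Finally, a small, hypothetical example of driving the new tarball helpers from another plugin. The callback sees every raw chunk, so callers can keep byte counts or checksums without the helpers knowing about either; the name copy_tree_as_tarball and the temporary-file spooling are assumptions made for a self-contained sketch.

    import tempfile

    import utils  # the plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py above


    def copy_tree_as_tarball(src_dir, dst_dir):
        """Tar up src_dir and extract it into dst_dir.

        Returns the number of bytes seen by the callback (counted once while
        creating the tarball and once again while extracting it).
        """
        counters = {'bytes': 0}

        def count(chunk):
            counters['bytes'] += len(chunk)

        tarball = tempfile.TemporaryFile()
        try:
            utils.create_tarball(tarball, src_dir, callback=count)
            tarball.seek(0)
            utils.extract_tarball(tarball, dst_dir, callback=count)
        finally:
            tarball.close()
        return counters['bytes']
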