author     Jenkins <jenkins@review.openstack.org>     2012-06-28 17:51:55 +0000
committer  Gerrit Code Review <review@openstack.org>  2012-06-28 17:51:55 +0000
commit     bc22a271027ef61bcdb43faeacee2be400d874f8
tree       97d0f3ba95a8ad82bd0e6269fe6c14c27cce5b99
parent     7f4ec4054e77d424c8142847e86eff0cd6d14c37
parent     169e601c8c387555609d67be11784fdd514d957f
Merge "Refactor Dom0 Glance plugin."
-rw-r--r--  nova/tests/test_xenapi.py                              |   4
-rw-r--r--  nova/tests/xenapi/stubs.py                             |   7
-rw-r--r--  nova/virt/xenapi/vm_utils.py                           |  63
-rwxr-xr-x  plugins/xenserver/xenapi/etc/xapi.d/plugins/glance     | 372
-rw-r--r--  plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py   | 336
5 files changed, 427 insertions(+), 355 deletions(-)
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 668172a92..c4b3262f5 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -561,7 +561,7 @@ class XenAPIVMTestCase(test.TestCase): """ vdi_recs_start = self._list_vdis() - stubs.stubout_fetch_image_glance_disk(self.stubs, raise_failure=True) + stubs.stubout_fetch_disk_image(self.stubs, raise_failure=True) self.assertRaises(xenapi_fake.Failure, self._test_spawn, 1, 2, 3) # No additional VDI should be found. @@ -627,7 +627,7 @@ class XenAPIVMTestCase(test.TestCase): self.check_vm_params_for_windows() def test_spawn_glance(self): - stubs.stubout_fetch_image_glance_disk(self.stubs) + stubs.stubout_fetch_disk_image(self.stubs) self._test_spawn(IMAGE_MACHINE, IMAGE_KERNEL, IMAGE_RAMDISK) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 2b3722f62..f6689c794 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -118,10 +118,10 @@ def stubout_lookup_image(stubs): stubs.Set(vm_utils, 'lookup_image', f) -def stubout_fetch_image_glance_disk(stubs, raise_failure=False): +def stubout_fetch_disk_image(stubs, raise_failure=False): """Simulates a failure in fetch image_glance_disk.""" - def _fake_fetch_image_glance_disk(context, session, instance, image, + def _fake_fetch_disk_image(context, session, instance, image, image_type): if raise_failure: raise fake.Failure("Test Exception raised by " @@ -136,8 +136,7 @@ def stubout_fetch_image_glance_disk(stubs, raise_failure=False): vdi_type = vm_utils.ImageType.to_string(image_type) return {vdi_type: dict(uuid=None, file=filename)} - stubs.Set(vm_utils, '_fetch_image_glance_disk', - _fake_fetch_image_glance_disk) + stubs.Set(vm_utils, '_fetch_disk_image', _fake_fetch_disk_image) def stubout_create_vm(stubs): diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index b0f617e48..84987d7f7 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -727,37 +727,36 @@ def fetch_image(context, session, instance, image_id, image_type): A list of dictionaries that describe VDIs, otherwise """ if image_type == ImageType.DISK_VHD: - return _fetch_image_glance_vhd(context, session, instance, image_id) + return _fetch_vhd_image(context, session, instance, image_id) else: - return _fetch_image_glance_disk(context, session, instance, image_id, - image_type) + return _fetch_disk_image(context, session, instance, image_id, + image_type) -def _retry_glance_download_vhd(context, session, image_id): - # NOTE(sirp): The Glance plugin runs under Python 2.4 +def _fetch_using_dom0_plugin_with_retry(context, session, image_id, + plugin_name, params): + # NOTE(sirp): The XenAPI plugins run under Python 2.4 # which does not have the `uuid` module. 
To work around this, # we generate the uuids here (under Python 2.6+) and # pass them as arguments - uuid_stack = [str(uuid.uuid4()) for i in xrange(3)] + extra_params = { + 'image_id': image_id, + 'uuid_stack': [str(uuid.uuid4()) for i in xrange(3)], + 'sr_path': get_sr_path(session), + 'auth_token': getattr(context, 'auth_token', None)} + + extra_params.update(params) + kwargs = {'params': pickle.dumps(extra_params)} max_attempts = FLAGS.glance_num_retries + 1 sleep_time = 0.5 for attempt_num in xrange(1, max_attempts + 1): - glance_host, glance_port = glance.pick_glance_api_server() - params = {'image_id': image_id, - 'glance_host': glance_host, - 'glance_port': glance_port, - 'uuid_stack': uuid_stack, - 'sr_path': get_sr_path(session), - 'auth_token': getattr(context, 'auth_token', None)} - kwargs = {'params': pickle.dumps(params)} - - LOG.info(_('download_vhd %(image_id)s ' - 'attempt %(attempt_num)d/%(max_attempts)d ' - 'from %(glance_host)s:%(glance_port)s') % locals()) + LOG.info(_('download_vhd %(image_id)s, ' + 'attempt %(attempt_num)d/%(max_attempts)d, ' + 'params: %(params)s') % locals()) try: - result = session.call_plugin('glance', 'download_vhd', kwargs) + result = session.call_plugin(plugin_name, 'download_vhd', kwargs) return jsonutils.loads(result) except session.XenAPI.Failure as exc: _type, _method, error = exc.details[:3] @@ -773,29 +772,31 @@ def _retry_glance_download_vhd(context, session, image_id): raise exception.CouldNotFetchImage(image_id=image_id) -def _fetch_image_glance_vhd(context, session, instance, image_id): +def _fetch_vhd_image(context, session, instance, image_id): """Tell glance to download an image and put the VHDs into the SR Returns: A list of dictionaries that describe VDIs """ LOG.debug(_("Asking xapi to fetch vhd image %(image_id)s"), locals(), instance=instance) - sr_ref = safe_find_sr(session) - fetched_vdis = _retry_glance_download_vhd(context, session, image_id) + plugin_name = 'glance' + glance_host, glance_port = glance.pick_glance_api_server() + params = {'glance_host': glance_host, 'glance_port': glance_port} - # 'download_vhd' will return a list of dictionaries describing VDIs. - # The dictionary will contain 'vdi_type' and 'vdi_uuid' keys. - # 'vdi_type' can be 'root' or 'swap' right now. - for vdi in fetched_vdis: - LOG.debug(_("xapi 'download_vhd' returned VDI of " - "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'"), - vdi, instance=instance) + fetched_vdis = _fetch_using_dom0_plugin_with_retry( + context, session, image_id, plugin_name, params) + sr_ref = safe_find_sr(session) scan_sr(session, sr_ref) + # TODO(sirp): the plugin should return the correct format rather than + # munging here vdis = {} for vdi in fetched_vdis: + LOG.debug(_("xapi 'download_vhd' returned VDI of " + "type '%(vdi_type)s' with UUID '%(vdi_uuid)s'"), + vdi, instance=instance) vdis[vdi['vdi_type']] = dict(uuid=vdi['vdi_uuid'], file=None) # Pull out the UUID of the root VDI @@ -845,11 +846,11 @@ def _check_vdi_size(context, session, instance, vdi_uuid): raise exception.ImageTooLarge() -def _fetch_image_glance_disk(context, session, instance, image_id, image_type): +def _fetch_disk_image(context, session, instance, image_id, image_type): """Fetch the image from Glance NOTE: - Unlike _fetch_image_glance_vhd, this method does not use the Glance + Unlike _fetch_vhd_image, this method does not use the Glance plugin; instead, it streams the disks through domU to the VDI directly. 
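The new `_fetch_using_dom0_plugin_with_retry` wrapper pre-computes UUIDs in domU (the dom0 plugins run under Python 2.4, which lacks the `uuid` module), pickles them into the plugin parameters, and retries the plugin call up to `glance_num_retries + 1` times. A minimal sketch of that pattern, assuming a `session` object with a `call_plugin` method and a simplified fixed backoff (the real code also distinguishes retryable from non-retryable XenAPI failures), might look like this:

```python
# Simplified sketch, not the exact nova code: retry a dom0 plugin call,
# passing pre-generated UUIDs because the plugin side runs Python 2.4.
import pickle
import time
import uuid


def fetch_via_dom0_plugin(session, plugin_name, image_id, sr_path,
                          auth_token=None, max_attempts=3):
    params = {
        'image_id': image_id,
        # The plugin pops UUIDs off this stack when renaming VHDs.
        'uuid_stack': [str(uuid.uuid4()) for _ in range(3)],
        'sr_path': sr_path,
        'auth_token': auth_token,
    }
    kwargs = {'params': pickle.dumps(params)}

    sleep_time = 0.5
    for attempt in range(1, max_attempts + 1):
        try:
            # 'download_vhd' is the plugin entry point; the result is a
            # JSON-encoded list of {"vdi_type": ..., "vdi_uuid": ...} dicts.
            return session.call_plugin(plugin_name, 'download_vhd', kwargs)
        except Exception:
            if attempt == max_attempts:
                raise
            time.sleep(sleep_time)
```

Because the glance-specific host/port parameters are merged in by the caller (`_fetch_vhd_image`), the retry wrapper itself stays plugin-agnostic.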
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance index 76e9349d4..b826a8711 100755 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance @@ -1,5 +1,6 @@ #!/usr/bin/env python +# Copyright (c) 2012 Openstack, LLC # Copyright (c) 2010 Citrix Systems, Inc. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. @@ -17,9 +18,7 @@ # License for the specific language governing permissions and limitations # under the License. -# -# XenAPI plugin for managing glance images -# +"""Handle the uploading and downloading of images via Glance.""" import cPickle as pickle import httplib @@ -27,24 +26,20 @@ try: import json except ImportError: import simplejson as json -# NOTE: XenServer 6 and below use python 2.4 so md5 is needed and not hashlib import md5 import os import os.path -import shlex import shutil -import subprocess -import tempfile -import time -import urllib2 +import urllib2 import XenAPIPlugin +import utils + #FIXME(sirp): should this use pluginlib from 5.6? from pluginlib_nova import * configure_logging('glance') -CHUNK_SIZE = 8192 KERNEL_DIR = '/boot/guest' @@ -98,34 +93,33 @@ def _download_tarball_and_verify(request, staging_path): # don't have a useful __repr__ or __str__ raise RetryableError('%s: %s' % (error.__class__.__name__, error)) - tar_cmd = "tar -zx --directory=%(staging_path)s" % locals() - tar_proc = _make_subprocess(tar_cmd, stderr=True, stdin=True) - - checksum = md5.new() - etag = response.info().getheader('etag', None) - if etag is None: - etag = response.info().getheader('x-image-meta-checksum', None) - url = request.get_full_url() logging.info("Reading image data from %s" % url) - length_read = 0 - while True: - chunk = response.read(CHUNK_SIZE) - if chunk == '': - break - length_read += len(chunk) - checksum.update(chunk) - tar_proc.stdin.write(chunk) + callback_data = {'bytes_read': 0} + checksum = md5.new() - logging.info("Read %(length_read)s bytes from %(url)s" % locals()) + def update_md5(chunk): + callback_data['bytes_read'] += len(chunk) + checksum.update(chunk) try: - _finish_subprocess(tar_proc, tar_cmd) - except Exception, error: - raise RetryableError(error) + try: + utils.extract_tarball(response, staging_path, callback=update_md5) + except Exception, error: + raise RetryableError(error) + finally: + bytes_read = callback_data['bytes_read'] + logging.info("Read %d bytes from %s", bytes_read, url) + # Use ETag if available, otherwise X-Image-Meta-Checksum + etag = response.info().getheader('etag', None) + if etag is None: + etag = response.info().getheader('x-image-meta-checksum', None) + + # Verify checksum using ETag checksum = checksum.hexdigest() + if etag is None: msg = "No ETag found for comparison to checksum %(checksum)s" logging.info(msg % locals()) @@ -137,7 +131,8 @@ def _download_tarball_and_verify(request, staging_path): logging.info(msg % locals()) -def _download_tarball(sr_path, image_id, glance_host, glance_port, auth_token): +def _download_tarball(sr_path, staging_path, image_id, glance_host, + glance_port, auth_token): """Download the tarball image from Glance and extract it into the staging area. Retry if there is any failure. 
""" @@ -151,202 +146,12 @@ def _download_tarball(sr_path, image_id, glance_host, glance_port, auth_token): logging.info("Downloading %s" % url) request = urllib2.Request(url, headers=headers) - staging_path = _make_staging_area(sr_path) try: _download_tarball_and_verify(request, staging_path) except Exception: logging.exception('Failed to retrieve %(url)s' % locals()) - _cleanup_staging_area(staging_path) raise - return staging_path - - -def _import_vhds(sr_path, staging_path, uuid_stack): - """Import the VHDs found in the staging path. - - We cannot extract VHDs directly into the SR since they don't yet have - UUIDs, aren't properly associated with each other, and would be subject to - a race-condition of one-file being present and the other not being - downloaded yet. - - To avoid these we problems, we use a staging area to fixup the VHDs before - moving them into the SR. The steps involved are: - - 1. Extracting tarball into staging area (done prior to this call) - - 2. Renaming VHDs to use UUIDs ('snap.vhd' -> 'ffff-aaaa-...vhd') - - 3. Linking VHDs together if there's a snap.vhd - - 4. Pseudo-atomically moving the images into the SR. (It's not really - atomic because it takes place as multiple os.rename operations; - however, the chances of an SR.scan occuring between the rename()s - invocations is so small that we can safely ignore it) - - Returns: A list of VDIs. Each list element is a dictionary containing - information about the VHD. Dictionary keys are: - 1. "vdi_type" - The type of VDI. Currently they can be "root" or - "swap" - 2. "vdi_uuid" - The UUID of the VDI - - Example return: [{"vdi_type": "root","vdi_uuid": "ffff-aaa..vhd"}, - {"vdi_type": "swap","vdi_uuid": "ffff-bbb..vhd"}] - """ - def rename_with_uuid(orig_path): - """Rename VHD using UUID so that it will be recognized by SR on a - subsequent scan. - - Since Python2.4 doesn't have the `uuid` module, we pass a stack of - pre-computed UUIDs from the compute worker. - """ - orig_dirname = os.path.dirname(orig_path) - uuid = uuid_stack.pop() - new_path = os.path.join(orig_dirname, "%s.vhd" % uuid) - os.rename(orig_path, new_path) - return new_path, uuid - - def link_vhds(child_path, parent_path): - """Use vhd-util to associate the snapshot VHD with its base_copy. - - This needs to be done before we move both VHDs into the SR to prevent - the base_copy from being DOA (deleted-on-arrival). - """ - modify_cmd = ("vhd-util modify -n %(child_path)s -p %(parent_path)s" - % locals()) - modify_proc = _make_subprocess(modify_cmd, stderr=True) - _finish_subprocess(modify_proc, modify_cmd) - - def move_into_sr(orig_path): - """Move a file into the SR""" - filename = os.path.basename(orig_path) - new_path = os.path.join(sr_path, filename) - os.rename(orig_path, new_path) - return new_path - - def assert_vhd_not_hidden(path): - """ - This is a sanity check on the image; if a snap.vhd isn't - present, then the image.vhd better not be marked 'hidden' or it will - be deleted when moved into the SR. 
- """ - query_cmd = "vhd-util query -n %(path)s -f" % locals() - query_proc = _make_subprocess(query_cmd, stdout=True, stderr=True) - out, err = _finish_subprocess(query_proc, query_cmd) - - for line in out.splitlines(): - if line.startswith('hidden'): - value = line.split(':')[1].strip() - if value == "1": - raise Exception( - "VHD %(path)s is marked as hidden without child" % - locals()) - - def prepare_if_exists(staging_path, vhd_name, parent_path=None): - """ - Check for existance of a particular VHD in the staging path and - preparing it for moving into the SR. - - Returns: Tuple of (Path to move into the SR, VDI_UUID) - None, if the vhd_name doesn't exist in the staging path - - If the VHD exists, we will do the following: - 1. Rename it with a UUID. - 2. If parent_path exists, we'll link up the VHDs. - """ - orig_path = os.path.join(staging_path, vhd_name) - if not os.path.exists(orig_path): - return None - new_path, vdi_uuid = rename_with_uuid(orig_path) - if parent_path: - # NOTE(sirp): this step is necessary so that an SR scan won't - # delete the base_copy out from under us (since it would be - # orphaned) - link_vhds(new_path, parent_path) - return (new_path, vdi_uuid) - - def validate_vdi_chain(vdi_path): - """ - This check ensures that the parent pointers on the VHDs are valid - before we move the VDI chain to the SR. This is *very* important - because a bad parent pointer will corrupt the SR causing a cascade of - failures. - """ - def get_parent_path(path): - query_cmd = "vhd-util query -n %(path)s -p" % locals() - query_proc = _make_subprocess(query_cmd, stdout=True, stderr=True) - out, err = _finish_subprocess( - query_proc, query_cmd, ok_exit_codes=[0, 22]) - first_line = out.splitlines()[0].strip() - if first_line.endswith(".vhd"): - return first_line - elif 'has no parent' in first_line: - return None - elif 'query failed' in first_line: - raise Exception("VDI '%(path)s' not present which breaks" - " the VDI chain, bailing out" % locals()) - else: - raise Exception("Unexpected output '%(out)s' from vhd-util" % - locals()) - - cur_path = vdi_path - while cur_path: - cur_path = get_parent_path(cur_path) - - vdi_return_list = [] - paths_to_move = [] - - image_parent = None - base_info = prepare_if_exists(staging_path, 'base.vhd') - if base_info: - paths_to_move.append(base_info[0]) - image_parent = base_info[0] - - image_info = prepare_if_exists(staging_path, 'image.vhd', image_parent) - if not image_info: - raise Exception("Invalid image: image.vhd not present") - - paths_to_move.insert(0, image_info[0]) - - snap_info = prepare_if_exists(staging_path, 'snap.vhd', - image_info[0]) - if snap_info: - validate_vdi_chain(snap_info[0]) - # NOTE(sirp): this is an insert rather than an append since the - # 'snapshot' vhd needs to be copied into the SR before the base copy. - # If it doesn't, then there is a possibliity that snapwatchd will - # delete the base_copy since it is an unreferenced parent. 
- paths_to_move.insert(0, snap_info[0]) - # We return this snap as the VDI instead of image.vhd - vdi_return_list.append(dict(vdi_type="root", vdi_uuid=snap_info[1])) - else: - validate_vdi_chain(image_info[0]) - assert_vhd_not_hidden(image_info[0]) - # If there's no snap, we return the image.vhd UUID - vdi_return_list.append(dict(vdi_type="root", vdi_uuid=image_info[1])) - - swap_info = prepare_if_exists(staging_path, 'swap.vhd') - if swap_info: - assert_vhd_not_hidden(swap_info[0]) - paths_to_move.append(swap_info[0]) - vdi_return_list.append(dict(vdi_type="swap", vdi_uuid=swap_info[1])) - - for path in paths_to_move: - move_into_sr(path) - - return vdi_return_list - - -def _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids): - """Hard-link VHDs into staging area with appropriate filename - ('snap' or 'image.vhd') - """ - for name, uuid in vdi_uuids.items(): - if uuid: - source = os.path.join(sr_path, "%s.vhd" % uuid) - link_name = os.path.join(staging_path, "%s.vhd" % name) - os.link(source, link_name) - def _upload_tarball(staging_path, image_id, glance_host, glance_port, auth_token, properties): @@ -394,19 +199,20 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port, conn.putheader(header, value) conn.endheaders() - tar_cmd = "tar -zc --directory=%(staging_path)s ." % locals() - tar_proc = _make_subprocess(tar_cmd, stdout=True, stderr=True) + callback_data = {'bytes_written': 0} + + def send_chunked_transfer_encoded(chunk): + chunk_len = len(chunk) + callback_data['bytes_written'] += chunk_len + conn.send("%x\r\n%s\r\n" % (chunk_len, chunk)) + + utils.create_tarball( + None, staging_path, callback=send_chunked_transfer_encoded) - length = 0 - chunk = tar_proc.stdout.read(CHUNK_SIZE) - while chunk: - length += len(chunk) - conn.send("%x\r\n%s\r\n" % (len(chunk), chunk)) - chunk = tar_proc.stdout.read(CHUNK_SIZE) - conn.send("0\r\n\r\n") - logging.info("Wrote %s bytes to %s" % (length, url)) + conn.send("0\r\n\r\n") # Chunked-Transfer terminator - _finish_subprocess(tar_proc, tar_cmd) + bytes_written = callback_data['bytes_written'] + logging.info("Wrote %d bytes to %s" % (bytes_written, url)) resp = conn.getresponse() if resp.status != httplib.OK: @@ -419,81 +225,6 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port, conn.close() -def _make_staging_area(sr_path): - """ - The staging area is a place where we can temporarily store and - manipulate VHDs. The use of the staging area is different for upload and - download: - - Download - ======== - - When we download the tarball, the VHDs contained within will have names - like "snap.vhd" and "image.vhd". We need to assign UUIDs to them before - moving them into the SR. However, since 'image.vhd' may be a base_copy, we - need to link it to 'snap.vhd' (using vhd-util modify) before moving both - into the SR (otherwise the SR.scan will cause 'image.vhd' to be deleted). - The staging area gives us a place to perform these operations before they - are moved to the SR, scanned, and then registered with XenServer. - - Upload - ====== - - On upload, we want to rename the VHDs to reflect what they are, 'snap.vhd' - in the case of the snapshot VHD, and 'image.vhd' in the case of the - base_copy. The staging area provides a directory in which we can create - hard-links to rename the VHDs without affecting what's in the SR. 
- - - NOTE - ==== - - The staging area is created as a subdirectory within the SR in order to - guarantee that it resides within the same filesystem and therefore permit - hard-linking and cheap file moves. - """ - staging_path = tempfile.mkdtemp(dir=sr_path) - return staging_path - - -def _cleanup_staging_area(staging_path): - """Remove staging area directory - - On upload, the staging area contains hard-links to the VHDs in the SR; - it's safe to remove the staging-area because the SR will keep the link - count > 0 (so the VHDs in the SR will not be deleted). - """ - if os.path.exists(staging_path): - shutil.rmtree(staging_path) - - -def _make_subprocess(cmdline, stdout=False, stderr=False, stdin=False): - """Make a subprocess according to the given command-line string - """ - kwargs = {} - kwargs['stdout'] = stdout and subprocess.PIPE or None - kwargs['stderr'] = stderr and subprocess.PIPE or None - kwargs['stdin'] = stdin and subprocess.PIPE or None - args = shlex.split(cmdline) - proc = subprocess.Popen(args, **kwargs) - return proc - - -def _finish_subprocess(proc, cmdline, ok_exit_codes=None): - """Ensure that the process returned a zero exit code indicating success - """ - if ok_exit_codes is None: - ok_exit_codes = [0] - - out, err = proc.communicate() - ret = proc.returncode - if ret not in ok_exit_codes: - raise Exception("'%(cmdline)s' returned non-zero exit code: " - "retcode=%(ret)i, out='%(out)s', stderr='%(err)s'" - % locals()) - return out, err - - def create_kernel_ramdisk(session, args): """Creates a copy of the kernel/ramdisk image if it is present in the cache. If the image is not present in the cache, it does nothing. @@ -524,16 +255,21 @@ def download_vhd(session, args): sr_path = params["sr_path"] auth_token = params["auth_token"] - staging_path = None + staging_path = utils.make_staging_area(sr_path) try: - staging_path = _download_tarball(sr_path, image_id, glance_host, - glance_port, auth_token) - # Right now, it's easier to return a single string via XenAPI, - # so we'll json encode the list of VHDs. - return json.dumps(_import_vhds(sr_path, staging_path, uuid_stack)) + # Download tarball into staging area and extract it + _download_tarball( + sr_path, staging_path, image_id, glance_host, glance_port, + auth_token) + + # Move the VHDs from the staging area into the storage repository + vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack) finally: - if staging_path is not None: - _cleanup_staging_area(staging_path) + utils.cleanup_staging_area(staging_path) + + # Right now, it's easier to return a single string via XenAPI, + # so we'll json encode the list of VHDs. 
+ return json.dumps(vdi_list) def upload_vhd(session, args): @@ -548,13 +284,13 @@ def upload_vhd(session, args): auth_token = params["auth_token"] properties = params["properties"] - staging_path = _make_staging_area(sr_path) + staging_path = utils.make_staging_area(sr_path) try: - _prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids) + utils.prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids) _upload_tarball(staging_path, image_id, glance_host, glance_port, auth_token, properties) finally: - _cleanup_staging_area(staging_path) + utils.cleanup_staging_area(staging_path) return "" # Nothing useful to return on an upload diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py new file mode 100644 index 000000000..ff88501f3 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py @@ -0,0 +1,336 @@ +# Copyright (c) 2012 Openstack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Various utilities used by XenServer plugins.""" + +import logging +import os +import shlex +import shutil +import subprocess +import tempfile + + +CHUNK_SIZE = 8192 + + +def make_subprocess(cmdline, stdout=False, stderr=False, stdin=False): + """Make a subprocess according to the given command-line string + """ + logging.info("Running cmd '%s'" % cmdline) + kwargs = {} + kwargs['stdout'] = stdout and subprocess.PIPE or None + kwargs['stderr'] = stderr and subprocess.PIPE or None + kwargs['stdin'] = stdin and subprocess.PIPE or None + args = shlex.split(cmdline) + logging.info("Running args '%s'" % args) + proc = subprocess.Popen(args, **kwargs) + return proc + + +def finish_subprocess(proc, cmdline, ok_exit_codes=None): + """Ensure that the process returned a zero exit code indicating success + """ + if ok_exit_codes is None: + ok_exit_codes = [0] + + out, err = proc.communicate() + ret = proc.returncode + if ret not in ok_exit_codes: + raise Exception("'%(cmdline)s' returned non-zero exit code: " + "retcode=%(ret)i, out='%(out)s', stderr='%(err)s'" + % locals()) + return out, err + + +def make_staging_area(sr_path): + """ + The staging area is a place where we can temporarily store and + manipulate VHDs. The use of the staging area is different for upload and + download: + + Download + ======== + + When we download the tarball, the VHDs contained within will have names + like "snap.vhd" and "image.vhd". We need to assign UUIDs to them before + moving them into the SR. However, since 'image.vhd' may be a base_copy, we + need to link it to 'snap.vhd' (using vhd-util modify) before moving both + into the SR (otherwise the SR.scan will cause 'image.vhd' to be deleted). + The staging area gives us a place to perform these operations before they + are moved to the SR, scanned, and then registered with XenServer. + + Upload + ====== + + On upload, we want to rename the VHDs to reflect what they are, 'snap.vhd' + in the case of the snapshot VHD, and 'image.vhd' in the case of the + base_copy. 
The staging area provides a directory in which we can create + hard-links to rename the VHDs without affecting what's in the SR. + + + NOTE + ==== + + The staging area is created as a subdirectory within the SR in order to + guarantee that it resides within the same filesystem and therefore permit + hard-linking and cheap file moves. + """ + staging_path = tempfile.mkdtemp(dir=sr_path) + return staging_path + + +def cleanup_staging_area(staging_path): + """Remove staging area directory + + On upload, the staging area contains hard-links to the VHDs in the SR; + it's safe to remove the staging-area because the SR will keep the link + count > 0 (so the VHDs in the SR will not be deleted). + """ + if os.path.exists(staging_path): + shutil.rmtree(staging_path) + + +def import_vhds(sr_path, staging_path, uuid_stack): + """Import the VHDs found in the staging path. + + We cannot extract VHDs directly into the SR since they don't yet have + UUIDs, aren't properly associated with each other, and would be subject to + a race-condition of one-file being present and the other not being + downloaded yet. + + To avoid these we problems, we use a staging area to fixup the VHDs before + moving them into the SR. The steps involved are: + + 1. Extracting tarball into staging area (done prior to this call) + + 2. Renaming VHDs to use UUIDs ('snap.vhd' -> 'ffff-aaaa-...vhd') + + 3. Linking VHDs together if there's a snap.vhd + + 4. Pseudo-atomically moving the images into the SR. (It's not really + atomic because it takes place as multiple os.rename operations; + however, the chances of an SR.scan occuring between the rename()s + invocations is so small that we can safely ignore it) + + Returns: A list of VDIs. Each list element is a dictionary containing + information about the VHD. Dictionary keys are: + 1. "vdi_type" - The type of VDI. Currently they can be "root" or + "swap" + 2. "vdi_uuid" - The UUID of the VDI + + Example return: [{"vdi_type": "root","vdi_uuid": "ffff-aaa..vhd"}, + {"vdi_type": "swap","vdi_uuid": "ffff-bbb..vhd"}] + """ + def rename_with_uuid(orig_path): + """Rename VHD using UUID so that it will be recognized by SR on a + subsequent scan. + + Since Python2.4 doesn't have the `uuid` module, we pass a stack of + pre-computed UUIDs from the compute worker. + """ + orig_dirname = os.path.dirname(orig_path) + uuid = uuid_stack.pop() + new_path = os.path.join(orig_dirname, "%s.vhd" % uuid) + os.rename(orig_path, new_path) + return new_path, uuid + + def link_vhds(child_path, parent_path): + """Use vhd-util to associate the snapshot VHD with its base_copy. + + This needs to be done before we move both VHDs into the SR to prevent + the base_copy from being DOA (deleted-on-arrival). + """ + modify_cmd = ("vhd-util modify -n %(child_path)s -p %(parent_path)s" + % locals()) + modify_proc = make_subprocess(modify_cmd, stderr=True) + finish_subprocess(modify_proc, modify_cmd) + + def move_into_sr(orig_path): + """Move a file into the SR""" + filename = os.path.basename(orig_path) + new_path = os.path.join(sr_path, filename) + os.rename(orig_path, new_path) + return new_path + + def assert_vhd_not_hidden(path): + """ + This is a sanity check on the image; if a snap.vhd isn't + present, then the image.vhd better not be marked 'hidden' or it will + be deleted when moved into the SR. 
+ """ + query_cmd = "vhd-util query -n %(path)s -f" % locals() + query_proc = make_subprocess(query_cmd, stdout=True, stderr=True) + out, err = finish_subprocess(query_proc, query_cmd) + + for line in out.splitlines(): + if line.startswith('hidden'): + value = line.split(':')[1].strip() + if value == "1": + raise Exception( + "VHD %(path)s is marked as hidden without child" % + locals()) + + def prepare_if_exists(staging_path, vhd_name, parent_path=None): + """ + Check for existance of a particular VHD in the staging path and + preparing it for moving into the SR. + + Returns: Tuple of (Path to move into the SR, VDI_UUID) + None, if the vhd_name doesn't exist in the staging path + + If the VHD exists, we will do the following: + 1. Rename it with a UUID. + 2. If parent_path exists, we'll link up the VHDs. + """ + orig_path = os.path.join(staging_path, vhd_name) + if not os.path.exists(orig_path): + return None + new_path, vdi_uuid = rename_with_uuid(orig_path) + if parent_path: + # NOTE(sirp): this step is necessary so that an SR scan won't + # delete the base_copy out from under us (since it would be + # orphaned) + link_vhds(new_path, parent_path) + return (new_path, vdi_uuid) + + def validate_vdi_chain(vdi_path): + """ + This check ensures that the parent pointers on the VHDs are valid + before we move the VDI chain to the SR. This is *very* important + because a bad parent pointer will corrupt the SR causing a cascade of + failures. + """ + def get_parent_path(path): + query_cmd = "vhd-util query -n %(path)s -p" % locals() + query_proc = make_subprocess(query_cmd, stdout=True, stderr=True) + out, err = finish_subprocess( + query_proc, query_cmd, ok_exit_codes=[0, 22]) + first_line = out.splitlines()[0].strip() + if first_line.endswith(".vhd"): + return first_line + elif 'has no parent' in first_line: + return None + elif 'query failed' in first_line: + raise Exception("VDI '%(path)s' not present which breaks" + " the VDI chain, bailing out" % locals()) + else: + raise Exception("Unexpected output '%(out)s' from vhd-util" % + locals()) + + cur_path = vdi_path + while cur_path: + cur_path = get_parent_path(cur_path) + + vdi_return_list = [] + paths_to_move = [] + + image_parent = None + base_info = prepare_if_exists(staging_path, 'base.vhd') + if base_info: + paths_to_move.append(base_info[0]) + image_parent = base_info[0] + + image_info = prepare_if_exists(staging_path, 'image.vhd', image_parent) + if not image_info: + raise Exception("Invalid image: image.vhd not present") + + paths_to_move.insert(0, image_info[0]) + + snap_info = prepare_if_exists(staging_path, 'snap.vhd', + image_info[0]) + if snap_info: + validate_vdi_chain(snap_info[0]) + # NOTE(sirp): this is an insert rather than an append since the + # 'snapshot' vhd needs to be copied into the SR before the base copy. + # If it doesn't, then there is a possibliity that snapwatchd will + # delete the base_copy since it is an unreferenced parent. 
+ paths_to_move.insert(0, snap_info[0]) + # We return this snap as the VDI instead of image.vhd + vdi_return_list.append(dict(vdi_type="root", vdi_uuid=snap_info[1])) + else: + validate_vdi_chain(image_info[0]) + assert_vhd_not_hidden(image_info[0]) + # If there's no snap, we return the image.vhd UUID + vdi_return_list.append(dict(vdi_type="root", vdi_uuid=image_info[1])) + + swap_info = prepare_if_exists(staging_path, 'swap.vhd') + if swap_info: + assert_vhd_not_hidden(swap_info[0]) + paths_to_move.append(swap_info[0]) + vdi_return_list.append(dict(vdi_type="swap", vdi_uuid=swap_info[1])) + + for path in paths_to_move: + move_into_sr(path) + + return vdi_return_list + + +def prepare_staging_area_for_upload(sr_path, staging_path, vdi_uuids): + """Hard-link VHDs into staging area with appropriate filename + ('snap' or 'image.vhd') + """ + for name, uuid in vdi_uuids.items(): + if uuid: + source = os.path.join(sr_path, "%s.vhd" % uuid) + link_name = os.path.join(staging_path, "%s.vhd" % name) + os.link(source, link_name) + + +def create_tarball(fileobj, path, callback=None): + """Create a tarball from a given path. + + :param fileobj: a file-like object holding the tarball byte-stream. + If None, then only the callback will be used. + :param path: path to create tarball from + :param callback: optional callback to call on each chunk written + """ + tar_cmd = "tar -zc --directory=%(path)s ." % locals() + tar_proc = make_subprocess(tar_cmd, stdout=True, stderr=True) + + while True: + chunk = tar_proc.stdout.read(CHUNK_SIZE) + if chunk == '': + break + + if callback: + callback(chunk) + + if fileobj: + fileobj.write(chunk) + + finish_subprocess(tar_proc, tar_cmd) + + +def extract_tarball(fileobj, path, callback=None): + """Extract a tarball to a given path. + + :param fileobj: a file-like object holding the tarball byte-stream + :param path: path to extract tarball into + :param callback: optional callback to call on each chunk read + """ + tar_cmd = "tar -zx --directory=%(path)s" % locals() + tar_proc = make_subprocess(tar_cmd, stderr=True, stdin=True) + + while True: + chunk = fileobj.read(CHUNK_SIZE) + if chunk == '': + break + + if callback: + callback(chunk) + + tar_proc.stdin.write(chunk) + + finish_subprocess(tar_proc, tar_cmd) |
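Taken together, the refactored `download_vhd` handler in the glance plugin becomes a thin wrapper around these `utils` helpers. A condensed sketch of that flow, assuming `utils.py` is importable and with a hypothetical `fetch_image_stream` callable standing in for the urllib2 request that `_download_tarball` builds, is shown below; it is an illustration of the control flow, not the plugin's actual handler:

```python
# Condensed sketch of the post-refactor download path (hypothetical
# fetch_image_stream stands in for the HTTP request to Glance).
import json

import utils  # the new plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py


def download_vhd_sketch(fetch_image_stream, sr_path, uuid_stack):
    staging_path = utils.make_staging_area(sr_path)
    try:
        response = fetch_image_stream()
        # Untar the Glance image into the staging area; a callback could
        # accumulate an md5 here, as the glance plugin does for ETag checks.
        utils.extract_tarball(response, staging_path)
        # Rename/link the VHDs and move them into the SR.
        vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)
    finally:
        utils.cleanup_staging_area(staging_path)
    # XenAPI plugins return a single string, so JSON-encode the VDI list.
    return json.dumps(vdi_list)
```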