summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-03-09 21:10:15 +0000
committerGerrit Code Review <review@openstack.org>2012-03-09 21:10:15 +0000
commit07f8da5c46ded41aa9ccf9b9d56712a182270974 (patch)
treed3ff17da217511f50496c0eabb7a5d4cdbbe150c
parent1c68d20e4bc9ca48a01b932f4c511f55cd54f4a9 (diff)
parent627d9e99481db537fe71639dd38daf205a9e0728 (diff)
Merge "Switch all xenapi async plugin calls to be sync"
-rw-r--r--nova/tests/xenapi/stubs.py1
-rw-r--r--nova/virt/xenapi/firewall.py4
-rw-r--r--nova/virt/xenapi/host.py11
-rw-r--r--nova/virt/xenapi/pool.py4
-rw-r--r--nova/virt/xenapi/vm_utils.py21
-rw-r--r--nova/virt/xenapi/vmops.py21
-rw-r--r--nova/virt/xenapi_conn.py77
7 files changed, 30 insertions, 109 deletions
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 775572769..e6bf9dd4b 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -412,7 +412,6 @@ def stub_out_migration_methods(stubs):
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
stubs.Set(vmops.VMOps, '_create_snapshot', fake_create_snapshot)
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
- stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)
diff --git a/nova/virt/xenapi/firewall.py b/nova/virt/xenapi/firewall.py
index 3a3ce779c..506ffcb2e 100644
--- a/nova/virt/xenapi/firewall.py
+++ b/nova/virt/xenapi/firewall.py
@@ -50,9 +50,7 @@ class Dom0IptablesFirewallDriver(firewall.IptablesFirewallDriver):
args = {}
args.update(map(lambda x: (x, str(kwargs[x])), kwargs))
args['cmd_args'] = json.dumps(cmd)
- task = self._session.async_call_plugin(
- 'xenhost', 'iptables_config', args)
- ret = self._session.wait_for_task(task)
+ ret = self._session.call_plugin('xenhost', 'iptables_config', args)
json_ret = json.loads(ret)
return (json_ret['out'], json_ret['err'])
diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py
index 8de7a722a..141700500 100644
--- a/nova/virt/xenapi/host.py
+++ b/nova/virt/xenapi/host.py
@@ -21,7 +21,6 @@ Management class for host-related functions (start, reboot, etc).
import logging
import json
-import random
from nova.compute import vm_states
from nova import context
@@ -160,14 +159,12 @@ def call_xenhost(session, method, arg_dict):
out that behavior.
"""
# Create a task ID as something that won't match any instance ID
- task_id = random.randint(-80000, -70000)
XenAPI = session.get_imported_xenapi()
try:
- task = session.async_call_plugin("xenhost", method, args=arg_dict)
- task_result = session.wait_for_task(task, str(task_id))
- if not task_result:
- task_result = json.dumps("")
- return json.loads(task_result)
+ result = session.call_plugin('xenhost', method, args=arg_dict)
+ if not result:
+ return ''
+ return json.loads(result)
except ValueError:
LOG.exception(_("Unable to get updated status"))
return None
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index 6635d4c19..a5157fd0a 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -138,9 +138,7 @@ class ResourcePool(object):
'master_addr': self._host_addr,
'master_user': FLAGS.xenapi_connection_username,
'master_pass': FLAGS.xenapi_connection_password, }
- task = self._session.async_call_plugin('xenhost',
- 'host_join', args)
- self._session.wait_for_task(task)
+ self._session.call_plugin('xenhost', 'host_join', args)
except self.XenAPI.Failure as e:
LOG.error(_("Pool-Join failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index a929449b4..da4c3ca88 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -426,8 +426,7 @@ class VMHelper(xenapi.HelperBase):
'properties': properties}
kwargs = {'params': pickle.dumps(params)}
- task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
- session.wait_for_task(task, instance['uuid'])
+ session.call_plugin('glance', 'upload_vhd', kwargs)
@classmethod
def resize_disk(cls, session, vdi_ref, instance_type):
@@ -584,9 +583,8 @@ class VMHelper(xenapi.HelperBase):
args = {}
args['cached-image'] = image
args['new-image-uuid'] = str(uuid.uuid4())
- task = session.async_call_plugin('glance', "create_kernel_ramdisk",
- args)
- filename = session.wait_for_task(task, instance.id)
+ filename = session.call_plugin('glance', 'create_kernel_ramdisk',
+ args)
if filename == "":
return cls.fetch_image(context, session, instance, image,
@@ -689,7 +687,7 @@ class VMHelper(xenapi.HelperBase):
session, instance, image, image_type)
@classmethod
- def _retry_glance_download_vhd(cls, context, session, instance, image):
+ def _retry_glance_download_vhd(cls, context, session, image):
# NOTE(sirp): The Glance plugin runs under Python 2.4
# which does not have the `uuid` module. To work around this,
# we generate the uuids here (under Python 2.6+) and
@@ -713,9 +711,8 @@ class VMHelper(xenapi.HelperBase):
'attempt %(attempt_num)d/%(max_attempts)d '
'from %(glance_host)s:%(glance_port)s') % locals())
- task = session.async_call_plugin('glance', 'download_vhd', kwargs)
try:
- result = session.wait_for_task(task, instance['uuid'])
+ result = session.call_plugin('glance', 'download_vhd', kwargs)
return json.loads(result)
except cls.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
@@ -741,8 +738,7 @@ class VMHelper(xenapi.HelperBase):
% locals())
sr_ref = cls.safe_find_sr(session)
- vdis = cls._retry_glance_download_vhd(context, session, instance,
- image)
+ vdis = cls._retry_glance_download_vhd(context, session, image)
# 'download_vhd' will return a list of dictionaries describing VDIs.
# The dictionary will contain 'vdi_type' and 'vdi_uuid' keys.
@@ -863,12 +859,13 @@ class VMHelper(xenapi.HelperBase):
fn = "copy_kernel_vdi"
args = {}
args['vdi-ref'] = vdi_ref
+
# Let the plugin copy the correct number of bytes.
args['image-size'] = str(vdi_size)
if FLAGS.cache_images:
args['cached-image'] = image
- task = session.async_call_plugin('glance', fn, args)
- filename = session.wait_for_task(task, instance['uuid'])
+ filename = session.call_plugin('glance', fn, args)
+
# Remove the VDI as it is not needed anymore.
session.call_xenapi("VDI.destroy", vdi_ref)
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 8669fbd09..aada50965 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -706,10 +706,8 @@ class VMOps(object):
try:
_params = {'params': pickle.dumps(params)}
- task = self._session.async_call_plugin('migration',
- 'transfer_vhd',
- _params)
- self._session.wait_for_task(task, instance_uuid)
+ self._session.call_plugin('migration', 'transfer_vhd',
+ _params)
except self.XenAPI.Failure:
msg = _("Failed to transfer vhd to new host")
raise exception.MigrationError(reason=msg)
@@ -869,9 +867,8 @@ class VMOps(object):
else:
new_uuid = new_base_copy_uuid
- task = self._session.async_call_plugin('migration',
- 'move_vhds_into_sr', {'params': pickle.dumps(params)})
- self._session.wait_for_task(task, instance['uuid'])
+ self._session.call_plugin('migration', 'move_vhds_into_sr',
+ {'params': pickle.dumps(params)})
# Now we rescan the SR so we find the VHDs
VMHelper.scan_default_sr(self._session)
@@ -1128,9 +1125,7 @@ class VMOps(object):
args['kernel-file'] = kernel
if ramdisk:
args['ramdisk-file'] = ramdisk
- task = self._session.async_call_plugin(
- 'glance', 'remove_kernel_ramdisk', args)
- self._session.wait_for_task(task)
+ self._session.call_plugin('glance', 'remove_kernel_ramdisk', args)
def _destroy_kernel_ramdisk(self, instance, vm_ref):
"""Three situations can occur:
@@ -1615,10 +1610,8 @@ class VMOps(object):
args = {'dom_id': vm_rec['domid'], 'path': path}
args.update(addl_args or {})
try:
- task = self._session.async_call_plugin(plugin, method, args)
- ret = self._session.wait_for_task(task, instance_uuid)
+ return self._session.call_plugin(plugin, method, args)
except self.XenAPI.Failure, e:
- ret = None
err_msg = e.details[-1].splitlines()[-1]
if 'TIMEOUT:' in err_msg:
LOG.error(_('TIMEOUT: The call to %(method)s timed out. '
@@ -1633,7 +1626,7 @@ class VMOps(object):
LOG.error(_('The call to %(method)s returned an error: %(e)s. '
'VM id=%(instance_uuid)s; args=%(args)r') % locals())
return {'returncode': 'error', 'message': err_msg}
- return ret
+ return None
def add_to_xenstore(self, vm, path, key, value):
"""
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index d6d05e8f8..54eb886c5 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -24,17 +24,6 @@ All XenAPI calls are on a green thread (using eventlet's "tpool"
thread pool). They are remote calls, and so may hang for the usual
reasons.
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion.
-
-This combination of techniques means that we don't block the main thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
**Related Flags**
:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
@@ -42,9 +31,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
Platform (default: root).
:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
:target_host: the iSCSI Target Host IP address, i.e. the IP
address for the nova-volume host
:target_port: iSCSI Target Port, 3260 Default
@@ -99,11 +85,6 @@ xenapi_opts = [
default=5,
help='Maximum number of concurrent XenAPI connections. '
'Used only if connection_type=xenapi.'),
- cfg.FloatOpt('xenapi_task_poll_interval',
- default=0.5,
- help='The interval used for polling of remote tasks '
- '(Async.VM.start, etc). '
- 'Used only if connection_type=xenapi.'),
cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval',
default=5.0,
help='The interval used for polling of coalescing vhds. '
@@ -121,9 +102,6 @@ xenapi_opts = [
cfg.StrOpt('xenapi_sr_base_path',
default='/var/run/sr-mount',
help='Base path to the storage repository'),
- cfg.BoolOpt('xenapi_log_instance_actions',
- default=False,
- help='Log all instance calls to XenAPI in the database.'),
cfg.StrOpt('target_host',
default=None,
help='iSCSI Target Host'),
@@ -604,62 +582,23 @@ class XenAPISession(object):
f = session.xenapi_request
return tpool.execute(f, method, *args)
- def async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread."""
+ def call_plugin(self, plugin, fn, args):
+ """Call host.call_plugin on a background thread."""
# NOTE(johannes): Fetch host before we acquire a session. Since
- # _get_session() acquires a session too, it can result in a deadlock
- # if multiple greenthreads race with each other. See bug 924918
+ # get_xenapi_host() acquires a session too, it can result in a
+ # deadlock if multiple greenthreads race with each other. See
+ # bug 924918
host = self.get_xenapi_host()
+
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
+
with self._get_session() as session:
return tpool.execute(self._unwrap_plugin_exceptions,
- session.xenapi.Async.host.call_plugin,
+ session.xenapi.host.call_plugin,
host, plugin, fn, args)
- def wait_for_task(self, task, uuid=None):
- """Return the result of the given task. The task is polled
- until it completes."""
- while True:
- """Poll the given XenAPI task, and return the result if the
- action was completed successfully or not.
- """
- ctxt = context.get_admin_context()
- name = self.call_xenapi("task.get_name_label", task)
- status = self.call_xenapi("task.get_status", task)
-
- # Ensure action is never > 255
- action = dict(action=name[:255], error=None)
- log_instance_actions = (FLAGS.xenapi_log_instance_actions and
- uuid)
- if log_instance_actions:
- action["instance_uuid"] = uuid
-
- if status == "pending":
- pass
- elif status == "success":
- result = self.call_xenapi("task.get_result", task)
- LOG.info(_("Task [%(name)s] %(task)s status:"
- " success %(result)s") % locals())
-
- if log_instance_actions:
- db.instance_action_create(ctxt, action)
-
- return _parse_xmlrpc_value(result)
- else:
- error_info = self.call_xenapi("task.get_error_info", task)
- LOG.warn(_("Task [%(name)s] %(task)s status:"
- " %(status)s %(error_info)s") % locals())
-
- if log_instance_actions:
- action["error"] = str(error_info)
- db.instance_action_create(ctxt, action)
-
- raise self.XenAPI.Failure(error_info)
-
- greenthread.sleep(FLAGS.xenapi_task_poll_interval)
-
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
return self.XenAPI.Session(url)