summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohannes Erdfelt <johannes.erdfelt@rackspace.com>2012-03-09 16:20:01 +0000
committerJohannes Erdfelt <johannes.erdfelt@rackspace.com>2012-03-09 17:03:32 +0000
commit627d9e99481db537fe71639dd38daf205a9e0728 (patch)
tree939eb56e620771afe9fe972b3ac43cfac4ad3aab
parent0c3c67e085018316d2e6174d852bbc8cd2e4ccb1 (diff)
Switch all xenapi async plugin calls to be sync
Originally all calls were required to be async because of integration with twisted, but that has long since been changed and just using the synchronous calls is easier and less code. Change-Id: Ib86fc721326de34fd71d68bbec42ecd65280aa8a
-rw-r--r--nova/tests/xenapi/stubs.py1
-rw-r--r--nova/virt/xenapi/firewall.py4
-rw-r--r--nova/virt/xenapi/host.py11
-rw-r--r--nova/virt/xenapi/pool.py4
-rw-r--r--nova/virt/xenapi/vm_utils.py21
-rw-r--r--nova/virt/xenapi/vmops.py21
-rw-r--r--nova/virt/xenapi_conn.py77
7 files changed, 30 insertions, 109 deletions
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 775572769..e6bf9dd4b 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -412,7 +412,6 @@ def stub_out_migration_methods(stubs):
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
stubs.Set(vmops.VMOps, '_create_snapshot', fake_create_snapshot)
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
- stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)
diff --git a/nova/virt/xenapi/firewall.py b/nova/virt/xenapi/firewall.py
index 3a3ce779c..506ffcb2e 100644
--- a/nova/virt/xenapi/firewall.py
+++ b/nova/virt/xenapi/firewall.py
@@ -50,9 +50,7 @@ class Dom0IptablesFirewallDriver(firewall.IptablesFirewallDriver):
args = {}
args.update(map(lambda x: (x, str(kwargs[x])), kwargs))
args['cmd_args'] = json.dumps(cmd)
- task = self._session.async_call_plugin(
- 'xenhost', 'iptables_config', args)
- ret = self._session.wait_for_task(task)
+ ret = self._session.call_plugin('xenhost', 'iptables_config', args)
json_ret = json.loads(ret)
return (json_ret['out'], json_ret['err'])
diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py
index 8de7a722a..141700500 100644
--- a/nova/virt/xenapi/host.py
+++ b/nova/virt/xenapi/host.py
@@ -21,7 +21,6 @@ Management class for host-related functions (start, reboot, etc).
import logging
import json
-import random
from nova.compute import vm_states
from nova import context
@@ -160,14 +159,12 @@ def call_xenhost(session, method, arg_dict):
out that behavior.
"""
# Create a task ID as something that won't match any instance ID
- task_id = random.randint(-80000, -70000)
XenAPI = session.get_imported_xenapi()
try:
- task = session.async_call_plugin("xenhost", method, args=arg_dict)
- task_result = session.wait_for_task(task, str(task_id))
- if not task_result:
- task_result = json.dumps("")
- return json.loads(task_result)
+ result = session.call_plugin('xenhost', method, args=arg_dict)
+ if not result:
+ return ''
+ return json.loads(result)
except ValueError:
LOG.exception(_("Unable to get updated status"))
return None
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index 6635d4c19..a5157fd0a 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -138,9 +138,7 @@ class ResourcePool(object):
'master_addr': self._host_addr,
'master_user': FLAGS.xenapi_connection_username,
'master_pass': FLAGS.xenapi_connection_password, }
- task = self._session.async_call_plugin('xenhost',
- 'host_join', args)
- self._session.wait_for_task(task)
+ self._session.call_plugin('xenhost', 'host_join', args)
except self.XenAPI.Failure as e:
LOG.error(_("Pool-Join failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index a929449b4..da4c3ca88 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -426,8 +426,7 @@ class VMHelper(xenapi.HelperBase):
'properties': properties}
kwargs = {'params': pickle.dumps(params)}
- task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
- session.wait_for_task(task, instance['uuid'])
+ session.call_plugin('glance', 'upload_vhd', kwargs)
@classmethod
def resize_disk(cls, session, vdi_ref, instance_type):
@@ -584,9 +583,8 @@ class VMHelper(xenapi.HelperBase):
args = {}
args['cached-image'] = image
args['new-image-uuid'] = str(uuid.uuid4())
- task = session.async_call_plugin('glance', "create_kernel_ramdisk",
- args)
- filename = session.wait_for_task(task, instance.id)
+ filename = session.call_plugin('glance', 'create_kernel_ramdisk',
+ args)
if filename == "":
return cls.fetch_image(context, session, instance, image,
@@ -689,7 +687,7 @@ class VMHelper(xenapi.HelperBase):
session, instance, image, image_type)
@classmethod
- def _retry_glance_download_vhd(cls, context, session, instance, image):
+ def _retry_glance_download_vhd(cls, context, session, image):
# NOTE(sirp): The Glance plugin runs under Python 2.4
# which does not have the `uuid` module. To work around this,
# we generate the uuids here (under Python 2.6+) and
@@ -713,9 +711,8 @@ class VMHelper(xenapi.HelperBase):
'attempt %(attempt_num)d/%(max_attempts)d '
'from %(glance_host)s:%(glance_port)s') % locals())
- task = session.async_call_plugin('glance', 'download_vhd', kwargs)
try:
- result = session.wait_for_task(task, instance['uuid'])
+ result = session.call_plugin('glance', 'download_vhd', kwargs)
return json.loads(result)
except cls.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
@@ -741,8 +738,7 @@ class VMHelper(xenapi.HelperBase):
% locals())
sr_ref = cls.safe_find_sr(session)
- vdis = cls._retry_glance_download_vhd(context, session, instance,
- image)
+ vdis = cls._retry_glance_download_vhd(context, session, image)
# 'download_vhd' will return a list of dictionaries describing VDIs.
# The dictionary will contain 'vdi_type' and 'vdi_uuid' keys.
@@ -863,12 +859,13 @@ class VMHelper(xenapi.HelperBase):
fn = "copy_kernel_vdi"
args = {}
args['vdi-ref'] = vdi_ref
+
# Let the plugin copy the correct number of bytes.
args['image-size'] = str(vdi_size)
if FLAGS.cache_images:
args['cached-image'] = image
- task = session.async_call_plugin('glance', fn, args)
- filename = session.wait_for_task(task, instance['uuid'])
+ filename = session.call_plugin('glance', fn, args)
+
# Remove the VDI as it is not needed anymore.
session.call_xenapi("VDI.destroy", vdi_ref)
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 8669fbd09..aada50965 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -706,10 +706,8 @@ class VMOps(object):
try:
_params = {'params': pickle.dumps(params)}
- task = self._session.async_call_plugin('migration',
- 'transfer_vhd',
- _params)
- self._session.wait_for_task(task, instance_uuid)
+ self._session.call_plugin('migration', 'transfer_vhd',
+ _params)
except self.XenAPI.Failure:
msg = _("Failed to transfer vhd to new host")
raise exception.MigrationError(reason=msg)
@@ -869,9 +867,8 @@ class VMOps(object):
else:
new_uuid = new_base_copy_uuid
- task = self._session.async_call_plugin('migration',
- 'move_vhds_into_sr', {'params': pickle.dumps(params)})
- self._session.wait_for_task(task, instance['uuid'])
+ self._session.call_plugin('migration', 'move_vhds_into_sr',
+ {'params': pickle.dumps(params)})
# Now we rescan the SR so we find the VHDs
VMHelper.scan_default_sr(self._session)
@@ -1128,9 +1125,7 @@ class VMOps(object):
args['kernel-file'] = kernel
if ramdisk:
args['ramdisk-file'] = ramdisk
- task = self._session.async_call_plugin(
- 'glance', 'remove_kernel_ramdisk', args)
- self._session.wait_for_task(task)
+ self._session.call_plugin('glance', 'remove_kernel_ramdisk', args)
def _destroy_kernel_ramdisk(self, instance, vm_ref):
"""Three situations can occur:
@@ -1615,10 +1610,8 @@ class VMOps(object):
args = {'dom_id': vm_rec['domid'], 'path': path}
args.update(addl_args or {})
try:
- task = self._session.async_call_plugin(plugin, method, args)
- ret = self._session.wait_for_task(task, instance_uuid)
+ return self._session.call_plugin(plugin, method, args)
except self.XenAPI.Failure, e:
- ret = None
err_msg = e.details[-1].splitlines()[-1]
if 'TIMEOUT:' in err_msg:
LOG.error(_('TIMEOUT: The call to %(method)s timed out. '
@@ -1633,7 +1626,7 @@ class VMOps(object):
LOG.error(_('The call to %(method)s returned an error: %(e)s. '
'VM id=%(instance_uuid)s; args=%(args)r') % locals())
return {'returncode': 'error', 'message': err_msg}
- return ret
+ return None
def add_to_xenstore(self, vm, path, key, value):
"""
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index d6d05e8f8..54eb886c5 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -24,17 +24,6 @@ All XenAPI calls are on a green thread (using eventlet's "tpool"
thread pool). They are remote calls, and so may hang for the usual
reasons.
-All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion.
-
-This combination of techniques means that we don't block the main thread at
-all, and at the same time we don't hold lots of threads waiting for
-long-running operations.
-
-FIXME: get_info currently doesn't conform to these rules, and will block the
-reactor thread if the VM.get_by_name_label or VM.get_record calls block.
-
**Related Flags**
:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
@@ -42,9 +31,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
Platform (default: root).
:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
Platform.
-:xenapi_task_poll_interval: The interval (seconds) used for polling of
- remote tasks (Async.VM.start, etc)
- (default: 0.5).
:target_host: the iSCSI Target Host IP address, i.e. the IP
address for the nova-volume host
:target_port: iSCSI Target Port, 3260 Default
@@ -99,11 +85,6 @@ xenapi_opts = [
default=5,
help='Maximum number of concurrent XenAPI connections. '
'Used only if connection_type=xenapi.'),
- cfg.FloatOpt('xenapi_task_poll_interval',
- default=0.5,
- help='The interval used for polling of remote tasks '
- '(Async.VM.start, etc). '
- 'Used only if connection_type=xenapi.'),
cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval',
default=5.0,
help='The interval used for polling of coalescing vhds. '
@@ -121,9 +102,6 @@ xenapi_opts = [
cfg.StrOpt('xenapi_sr_base_path',
default='/var/run/sr-mount',
help='Base path to the storage repository'),
- cfg.BoolOpt('xenapi_log_instance_actions',
- default=False,
- help='Log all instance calls to XenAPI in the database.'),
cfg.StrOpt('target_host',
default=None,
help='iSCSI Target Host'),
@@ -604,62 +582,23 @@ class XenAPISession(object):
f = session.xenapi_request
return tpool.execute(f, method, *args)
- def async_call_plugin(self, plugin, fn, args):
- """Call Async.host.call_plugin on a background thread."""
+ def call_plugin(self, plugin, fn, args):
+ """Call host.call_plugin on a background thread."""
# NOTE(johannes): Fetch host before we acquire a session. Since
- # _get_session() acquires a session too, it can result in a deadlock
- # if multiple greenthreads race with each other. See bug 924918
+ # get_xenapi_host() acquires a session too, it can result in a
+ # deadlock if multiple greenthreads race with each other. See
+ # bug 924918
host = self.get_xenapi_host()
+
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
+
with self._get_session() as session:
return tpool.execute(self._unwrap_plugin_exceptions,
- session.xenapi.Async.host.call_plugin,
+ session.xenapi.host.call_plugin,
host, plugin, fn, args)
- def wait_for_task(self, task, uuid=None):
- """Return the result of the given task. The task is polled
- until it completes."""
- while True:
- """Poll the given XenAPI task, and return the result if the
- action was completed successfully or not.
- """
- ctxt = context.get_admin_context()
- name = self.call_xenapi("task.get_name_label", task)
- status = self.call_xenapi("task.get_status", task)
-
- # Ensure action is never > 255
- action = dict(action=name[:255], error=None)
- log_instance_actions = (FLAGS.xenapi_log_instance_actions and
- uuid)
- if log_instance_actions:
- action["instance_uuid"] = uuid
-
- if status == "pending":
- pass
- elif status == "success":
- result = self.call_xenapi("task.get_result", task)
- LOG.info(_("Task [%(name)s] %(task)s status:"
- " success %(result)s") % locals())
-
- if log_instance_actions:
- db.instance_action_create(ctxt, action)
-
- return _parse_xmlrpc_value(result)
- else:
- error_info = self.call_xenapi("task.get_error_info", task)
- LOG.warn(_("Task [%(name)s] %(task)s status:"
- " %(status)s %(error_info)s") % locals())
-
- if log_instance_actions:
- action["error"] = str(error_info)
- db.instance_action_create(ctxt, action)
-
- raise self.XenAPI.Failure(error_info)
-
- greenthread.sleep(FLAGS.xenapi_task_poll_interval)
-
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
return self.XenAPI.Session(url)