diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-03-09 21:10:15 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-03-09 21:10:15 +0000 |
| commit | 07f8da5c46ded41aa9ccf9b9d56712a182270974 (patch) | |
| tree | d3ff17da217511f50496c0eabb7a5d4cdbbe150c | |
| parent | 1c68d20e4bc9ca48a01b932f4c511f55cd54f4a9 (diff) | |
| parent | 627d9e99481db537fe71639dd38daf205a9e0728 (diff) | |
Merge "Switch all xenapi async plugin calls to be sync"
| -rw-r--r-- | nova/tests/xenapi/stubs.py | 1 | ||||
| -rw-r--r-- | nova/virt/xenapi/firewall.py | 4 | ||||
| -rw-r--r-- | nova/virt/xenapi/host.py | 11 | ||||
| -rw-r--r-- | nova/virt/xenapi/pool.py | 4 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 21 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 21 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 77 |
7 files changed, 30 insertions, 109 deletions
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 775572769..e6bf9dd4b 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -412,7 +412,6 @@ def stub_out_migration_methods(stubs): stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr) stubs.Set(vmops.VMOps, '_create_snapshot', fake_create_snapshot) stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi) - stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None) stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path) stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network) stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown) diff --git a/nova/virt/xenapi/firewall.py b/nova/virt/xenapi/firewall.py index 3a3ce779c..506ffcb2e 100644 --- a/nova/virt/xenapi/firewall.py +++ b/nova/virt/xenapi/firewall.py @@ -50,9 +50,7 @@ class Dom0IptablesFirewallDriver(firewall.IptablesFirewallDriver): args = {} args.update(map(lambda x: (x, str(kwargs[x])), kwargs)) args['cmd_args'] = json.dumps(cmd) - task = self._session.async_call_plugin( - 'xenhost', 'iptables_config', args) - ret = self._session.wait_for_task(task) + ret = self._session.call_plugin('xenhost', 'iptables_config', args) json_ret = json.loads(ret) return (json_ret['out'], json_ret['err']) diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py index 8de7a722a..141700500 100644 --- a/nova/virt/xenapi/host.py +++ b/nova/virt/xenapi/host.py @@ -21,7 +21,6 @@ Management class for host-related functions (start, reboot, etc). import logging import json -import random from nova.compute import vm_states from nova import context @@ -160,14 +159,12 @@ def call_xenhost(session, method, arg_dict): out that behavior. """ # Create a task ID as something that won't match any instance ID - task_id = random.randint(-80000, -70000) XenAPI = session.get_imported_xenapi() try: - task = session.async_call_plugin("xenhost", method, args=arg_dict) - task_result = session.wait_for_task(task, str(task_id)) - if not task_result: - task_result = json.dumps("") - return json.loads(task_result) + result = session.call_plugin('xenhost', method, args=arg_dict) + if not result: + return '' + return json.loads(result) except ValueError: LOG.exception(_("Unable to get updated status")) return None diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index 6635d4c19..a5157fd0a 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -138,9 +138,7 @@ class ResourcePool(object): 'master_addr': self._host_addr, 'master_user': FLAGS.xenapi_connection_username, 'master_pass': FLAGS.xenapi_connection_password, } - task = self._session.async_call_plugin('xenhost', - 'host_join', args) - self._session.wait_for_task(task) + self._session.call_plugin('xenhost', 'host_join', args) except self.XenAPI.Failure as e: LOG.error(_("Pool-Join failed: %(e)s") % locals()) raise exception.AggregateError(aggregate_id=aggregate_id, diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index a929449b4..da4c3ca88 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -426,8 +426,7 @@ class VMHelper(xenapi.HelperBase): 'properties': properties} kwargs = {'params': pickle.dumps(params)} - task = session.async_call_plugin('glance', 'upload_vhd', kwargs) - session.wait_for_task(task, instance['uuid']) + session.call_plugin('glance', 'upload_vhd', kwargs) @classmethod def resize_disk(cls, session, vdi_ref, instance_type): @@ -584,9 +583,8 @@ class VMHelper(xenapi.HelperBase): args = {} args['cached-image'] = image args['new-image-uuid'] = str(uuid.uuid4()) - task = session.async_call_plugin('glance', "create_kernel_ramdisk", - args) - filename = session.wait_for_task(task, instance.id) + filename = session.call_plugin('glance', 'create_kernel_ramdisk', + args) if filename == "": return cls.fetch_image(context, session, instance, image, @@ -689,7 +687,7 @@ class VMHelper(xenapi.HelperBase): session, instance, image, image_type) @classmethod - def _retry_glance_download_vhd(cls, context, session, instance, image): + def _retry_glance_download_vhd(cls, context, session, image): # NOTE(sirp): The Glance plugin runs under Python 2.4 # which does not have the `uuid` module. To work around this, # we generate the uuids here (under Python 2.6+) and @@ -713,9 +711,8 @@ class VMHelper(xenapi.HelperBase): 'attempt %(attempt_num)d/%(max_attempts)d ' 'from %(glance_host)s:%(glance_port)s') % locals()) - task = session.async_call_plugin('glance', 'download_vhd', kwargs) try: - result = session.wait_for_task(task, instance['uuid']) + result = session.call_plugin('glance', 'download_vhd', kwargs) return json.loads(result) except cls.XenAPI.Failure as exc: _type, _method, error = exc.details[:3] @@ -741,8 +738,7 @@ class VMHelper(xenapi.HelperBase): % locals()) sr_ref = cls.safe_find_sr(session) - vdis = cls._retry_glance_download_vhd(context, session, instance, - image) + vdis = cls._retry_glance_download_vhd(context, session, image) # 'download_vhd' will return a list of dictionaries describing VDIs. # The dictionary will contain 'vdi_type' and 'vdi_uuid' keys. @@ -863,12 +859,13 @@ class VMHelper(xenapi.HelperBase): fn = "copy_kernel_vdi" args = {} args['vdi-ref'] = vdi_ref + # Let the plugin copy the correct number of bytes. args['image-size'] = str(vdi_size) if FLAGS.cache_images: args['cached-image'] = image - task = session.async_call_plugin('glance', fn, args) - filename = session.wait_for_task(task, instance['uuid']) + filename = session.call_plugin('glance', fn, args) + # Remove the VDI as it is not needed anymore. session.call_xenapi("VDI.destroy", vdi_ref) LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 8669fbd09..aada50965 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -706,10 +706,8 @@ class VMOps(object): try: _params = {'params': pickle.dumps(params)} - task = self._session.async_call_plugin('migration', - 'transfer_vhd', - _params) - self._session.wait_for_task(task, instance_uuid) + self._session.call_plugin('migration', 'transfer_vhd', + _params) except self.XenAPI.Failure: msg = _("Failed to transfer vhd to new host") raise exception.MigrationError(reason=msg) @@ -869,9 +867,8 @@ class VMOps(object): else: new_uuid = new_base_copy_uuid - task = self._session.async_call_plugin('migration', - 'move_vhds_into_sr', {'params': pickle.dumps(params)}) - self._session.wait_for_task(task, instance['uuid']) + self._session.call_plugin('migration', 'move_vhds_into_sr', + {'params': pickle.dumps(params)}) # Now we rescan the SR so we find the VHDs VMHelper.scan_default_sr(self._session) @@ -1128,9 +1125,7 @@ class VMOps(object): args['kernel-file'] = kernel if ramdisk: args['ramdisk-file'] = ramdisk - task = self._session.async_call_plugin( - 'glance', 'remove_kernel_ramdisk', args) - self._session.wait_for_task(task) + self._session.call_plugin('glance', 'remove_kernel_ramdisk', args) def _destroy_kernel_ramdisk(self, instance, vm_ref): """Three situations can occur: @@ -1615,10 +1610,8 @@ class VMOps(object): args = {'dom_id': vm_rec['domid'], 'path': path} args.update(addl_args or {}) try: - task = self._session.async_call_plugin(plugin, method, args) - ret = self._session.wait_for_task(task, instance_uuid) + return self._session.call_plugin(plugin, method, args) except self.XenAPI.Failure, e: - ret = None err_msg = e.details[-1].splitlines()[-1] if 'TIMEOUT:' in err_msg: LOG.error(_('TIMEOUT: The call to %(method)s timed out. ' @@ -1633,7 +1626,7 @@ class VMOps(object): LOG.error(_('The call to %(method)s returned an error: %(e)s. ' 'VM id=%(instance_uuid)s; args=%(args)r') % locals()) return {'returncode': 'error', 'message': err_msg} - return ret + return None def add_to_xenstore(self, vm, path, key, value): """ diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index d6d05e8f8..54eb886c5 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -24,17 +24,6 @@ All XenAPI calls are on a green thread (using eventlet's "tpool" thread pool). They are remote calls, and so may hang for the usual reasons. -All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. - -This combination of techniques means that we don't block the main thread at -all, and at the same time we don't hold lots of threads waiting for -long-running operations. - -FIXME: get_info currently doesn't conform to these rules, and will block the -reactor thread if the VM.get_by_name_label or VM.get_record calls block. - **Related Flags** :xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. @@ -42,9 +31,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. Platform (default: root). :xenapi_connection_password: Password for connection to XenServer/Xen Cloud Platform. -:xenapi_task_poll_interval: The interval (seconds) used for polling of - remote tasks (Async.VM.start, etc) - (default: 0.5). :target_host: the iSCSI Target Host IP address, i.e. the IP address for the nova-volume host :target_port: iSCSI Target Port, 3260 Default @@ -99,11 +85,6 @@ xenapi_opts = [ default=5, help='Maximum number of concurrent XenAPI connections. ' 'Used only if connection_type=xenapi.'), - cfg.FloatOpt('xenapi_task_poll_interval', - default=0.5, - help='The interval used for polling of remote tasks ' - '(Async.VM.start, etc). ' - 'Used only if connection_type=xenapi.'), cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval', default=5.0, help='The interval used for polling of coalescing vhds. ' @@ -121,9 +102,6 @@ xenapi_opts = [ cfg.StrOpt('xenapi_sr_base_path', default='/var/run/sr-mount', help='Base path to the storage repository'), - cfg.BoolOpt('xenapi_log_instance_actions', - default=False, - help='Log all instance calls to XenAPI in the database.'), cfg.StrOpt('target_host', default=None, help='iSCSI Target Host'), @@ -604,62 +582,23 @@ class XenAPISession(object): f = session.xenapi_request return tpool.execute(f, method, *args) - def async_call_plugin(self, plugin, fn, args): - """Call Async.host.call_plugin on a background thread.""" + def call_plugin(self, plugin, fn, args): + """Call host.call_plugin on a background thread.""" # NOTE(johannes): Fetch host before we acquire a session. Since - # _get_session() acquires a session too, it can result in a deadlock - # if multiple greenthreads race with each other. See bug 924918 + # get_xenapi_host() acquires a session too, it can result in a + # deadlock if multiple greenthreads race with each other. See + # bug 924918 host = self.get_xenapi_host() + # NOTE(armando): pass the host uuid along with the args so that # the plugin gets executed on the right host when using XS pools args['host_uuid'] = self.host_uuid + with self._get_session() as session: return tpool.execute(self._unwrap_plugin_exceptions, - session.xenapi.Async.host.call_plugin, + session.xenapi.host.call_plugin, host, plugin, fn, args) - def wait_for_task(self, task, uuid=None): - """Return the result of the given task. The task is polled - until it completes.""" - while True: - """Poll the given XenAPI task, and return the result if the - action was completed successfully or not. - """ - ctxt = context.get_admin_context() - name = self.call_xenapi("task.get_name_label", task) - status = self.call_xenapi("task.get_status", task) - - # Ensure action is never > 255 - action = dict(action=name[:255], error=None) - log_instance_actions = (FLAGS.xenapi_log_instance_actions and - uuid) - if log_instance_actions: - action["instance_uuid"] = uuid - - if status == "pending": - pass - elif status == "success": - result = self.call_xenapi("task.get_result", task) - LOG.info(_("Task [%(name)s] %(task)s status:" - " success %(result)s") % locals()) - - if log_instance_actions: - db.instance_action_create(ctxt, action) - - return _parse_xmlrpc_value(result) - else: - error_info = self.call_xenapi("task.get_error_info", task) - LOG.warn(_("Task [%(name)s] %(task)s status:" - " %(status)s %(error_info)s") % locals()) - - if log_instance_actions: - action["error"] = str(error_info) - db.instance_action_create(ctxt, action) - - raise self.XenAPI.Failure(error_info) - - greenthread.sleep(FLAGS.xenapi_task_poll_interval) - def _create_session(self, url): """Stubout point. This can be replaced with a mock session.""" return self.XenAPI.Session(url) |
