From 26353bb372081f674cf5fd3dbbffd990918f3803 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Tue, 29 May 2012 16:35:35 -0400 Subject: Move queue_get_for() from db to rpc. Part of blueprint common-rpc. The function queue_get_for() is a utility function used by various consumers of the rpc API. This function lived in the db API, but never ended up using anything from the database. This patch moves it into the rpc API so that it can be used by other users of rpc once it moves into openstack-common. Change-Id: If92675beecff5471b416a929c161b810e3c71939 --- bin/nova-manage | 4 +- nova/compute/api.py | 3 +- nova/compute/manager.py | 22 +++---- nova/compute/rpcapi.py | 6 +- nova/console/api.py | 10 +-- nova/console/manager.py | 2 +- nova/console/vmrc_manager.py | 6 +- nova/db/api.py | 8 --- nova/db/sqlalchemy/api.py | 8 --- nova/network/manager.py | 13 ++-- nova/network/quantum/manager.py | 6 +- nova/rpc/__init__.py | 5 ++ nova/scheduler/driver.py | 24 ++++---- nova/tests/api/openstack/compute/test_servers.py | 2 +- nova/tests/compute/test_compute.py | 6 +- nova/tests/db/fakes.py | 6 +- nova/tests/scheduler/test_scheduler.py | 78 ++++++++++++------------ nova/virt/xenapi/pool.py | 2 +- nova/volume/api.py | 16 ++--- 19 files changed, 104 insertions(+), 123 deletions(-) diff --git a/bin/nova-manage b/bin/nova-manage index f94497087..eff73d2e4 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -1152,7 +1152,7 @@ class VolumeCommands(object): return rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.volume_topic, host), + rpc.queue_get_for(ctxt, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume['id']}}) @@ -1170,7 +1170,7 @@ class VolumeCommands(object): instance = db.instance_get(ctxt, volume['instance_id']) host = instance['host'] rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.compute_topic, host), + rpc.queue_get_for(ctxt, FLAGS.compute_topic, host), {"method": "attach_volume", "args": {"instance_id": instance['id'], "volume_id": volume['id'], diff --git a/nova/compute/api.py b/nova/compute/api.py index 3fd358a34..edcbff9b9 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -835,7 +835,7 @@ class API(base.Base): in self.db.service_get_all_compute_sorted(context)] for host in hosts: rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), + rpc.queue_get_for(context, FLAGS.compute_topic, host), {'method': 'refresh_provider_fw_rules', 'args': {}}) def _is_security_group_associated_with_server(self, security_group, @@ -1808,7 +1808,6 @@ class HostAPI(base.Base): """Reboots, shuts down or powers up the host.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - topic = self.db.queue_get_for(context, FLAGS.compute_topic, host) return self.compute_rpcapi.host_power_action(context, action=action, host=host) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 856175304..7ab27f740 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -315,9 +315,9 @@ class ComputeManager(manager.SchedulerDependentManager): """ #TODO(mdragon): perhaps make this variable by console_type? - return self.db.queue_get_for(context, - FLAGS.console_topic, - FLAGS.console_host) + return rpc.queue_get_for(context, + FLAGS.console_topic, + FLAGS.console_host) def get_console_pool_info(self, context, console_type): return self.driver.get_console_pool_info(console_type) @@ -1257,7 +1257,7 @@ class ComputeManager(manager.SchedulerDependentManager): network_info = self._get_instance_nw_info(context, instance_ref) self.driver.destroy(instance_ref, self._legacy_nw_info(network_info)) - topic = self.db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, migration_ref['source_compute']) rpc.cast(context, topic, {'method': 'finish_revert_resize', @@ -1349,7 +1349,7 @@ class ComputeManager(manager.SchedulerDependentManager): 'status': 'pre-migrating'}) LOG.audit(_('Migrating'), context=context, instance=instance_ref) - topic = self.db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, instance_ref['host']) rpc.cast(context, topic, {'method': 'resize_instance', @@ -1412,9 +1412,9 @@ class ComputeManager(manager.SchedulerDependentManager): service = self.db.service_get_by_host_and_topic( context, migration_ref['dest_compute'], FLAGS.compute_topic) - topic = self.db.queue_get_for(context, - FLAGS.compute_topic, - migration_ref['dest_compute']) + topic = rpc.queue_get_for(context, + FLAGS.compute_topic, + migration_ref['dest_compute']) params = {'migration_id': migration_id, 'disk_info': disk_info, 'instance_uuid': instance_ref['uuid'], @@ -2040,7 +2040,7 @@ class ComputeManager(manager.SchedulerDependentManager): disk = None rpc.call(context, - self.db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {'method': 'pre_live_migration', 'args': {'instance_id': instance_id, 'block_migration': block_migration, @@ -2122,7 +2122,7 @@ class ComputeManager(manager.SchedulerDependentManager): # Define domain at destination host, without doing it, # pause/suspend/terminate do not work. rpc.call(ctxt, - self.db.queue_get_for(ctxt, FLAGS.compute_topic, dest), + rpc.queue_get_for(ctxt, FLAGS.compute_topic, dest), {"method": "post_live_migration_at_destination", "args": {'instance_id': instance_ref['id'], 'block_migration': block_migration}}) @@ -2227,7 +2227,7 @@ class ComputeManager(manager.SchedulerDependentManager): # any empty images has to be deleted. if block_migration: rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {"method": "rollback_live_migration_at_destination", "args": {'instance_id': instance_ref['id']}}) diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index a2a3b281c..5db0282c3 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -18,16 +18,16 @@ Client side of the compute RPC API. """ -from nova.db import base from nova import exception from nova import flags +from nova import rpc import nova.rpc.proxy FLAGS = flags.FLAGS -class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base): +class ComputeAPI(nova.rpc.proxy.RpcProxy): '''Client side of the compute rpc API. API version history: @@ -58,7 +58,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base): if not host: raise exception.NovaException(_('Unable to find host for ' 'Instance %s') % instance['uuid']) - return self.db.queue_get_for(ctxt, self.topic, host) + return rpc.queue_get_for(ctxt, self.topic, host) def add_aggregate_host(self, ctxt, aggregate_id, host_param, host): '''Add aggregate host. diff --git a/nova/console/api.py b/nova/console/api.py index 0feaae488..20f00030e 100644 --- a/nova/console/api.py +++ b/nova/console/api.py @@ -44,8 +44,8 @@ class API(base.Base): def delete_console(self, context, instance_id, console_id): instance_id = self._translate_uuid_if_necessary(context, instance_id) console = self.db.console_get(context, console_id, instance_id) - topic = self.db.queue_get_for(context, FLAGS.console_topic, - pool['host']) + topic = rpc.queue_get_for(context, FLAGS.console_topic, + pool['host']) rpcapi = console_rpcapi.ConsoleAPI(topic=topic) rpcapi.remove_console(context, console['id']) @@ -61,9 +61,9 @@ class API(base.Base): rpcapi.add_console(context, instance['id']) def _get_console_topic(self, context, instance_host): - topic = self.db.queue_get_for(context, - FLAGS.compute_topic, - instance_host) + topic = rpc.queue_get_for(context, + FLAGS.compute_topic, + instance_host) return rpc.call(context, topic, {'method': 'get_console_topic', 'args': {'fake': 1}}) diff --git a/nova/console/manager.py b/nova/console/manager.py index 8a42b449a..96a9ef31c 100644 --- a/nova/console/manager.py +++ b/nova/console/manager.py @@ -122,7 +122,7 @@ class ConsoleProxyManager(manager.Manager): 'password': '1234pass'} else: pool_info = rpc.call(context, - self.db.queue_get_for(context, + rpc.queue_get_for(context, FLAGS.compute_topic, instance_host), {'method': 'get_console_pool_info', diff --git a/nova/console/vmrc_manager.py b/nova/console/vmrc_manager.py index 0968ecd31..2008ff1b2 100644 --- a/nova/console/vmrc_manager.py +++ b/nova/console/vmrc_manager.py @@ -138,9 +138,9 @@ class ConsoleVMRCManager(manager.Manager): console_type) except exception.NotFound: pool_info = rpc.call(context, - self.db.queue_get_for(context, - FLAGS.compute_topic, - instance_host), + rpc.queue_get_for(context, + FLAGS.compute_topic, + instance_host), {'method': 'get_console_pool_info', 'args': {'console_type': console_type}}) pool_info['password'] = self.driver.fix_pool_password( diff --git a/nova/db/api.py b/nova/db/api.py index 75739df8d..d33584036 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -848,14 +848,6 @@ def network_update(context, network_id, values): ################### -def queue_get_for(context, topic, physical_node_id): - """Return a channel to send a message to a node with a topic.""" - return IMPL.queue_get_for(context, topic, physical_node_id) - - -################### - - def iscsi_target_count_by_host(context, host): """Return count of export devices.""" return IMPL.iscsi_target_count_by_host(context, host) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 22eb7a8fd..c28239453 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -2203,14 +2203,6 @@ def network_update(context, network_id, values): ################### -def queue_get_for(context, topic, physical_node_id): - # FIXME(ja): this should be servername? - return "%s.%s" % (topic, physical_node_id) - - -################### - - @require_admin_context def iscsi_target_count_by_host(context, host): return model_query(context, models.IscsiTarget).\ diff --git a/nova/network/manager.py b/nova/network/manager.py index e552dec7f..1f1580634 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -201,9 +201,7 @@ class RPCAllocateFixedIP(object): jsonutils.to_primitive(network)}}) if host != self.host: # need to call allocate_fixed_ip to correct network host - topic = self.db.queue_get_for(context, - FLAGS.network_topic, - host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {} args['instance_id'] = instance_id args['network_id'] = network['id'] @@ -241,7 +239,7 @@ class RPCAllocateFixedIP(object): host = network['host'] if host != self.host: # need to call deallocate_fixed_ip on correct network host - topic = self.db.queue_get_for(context, FLAGS.network_topic, host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {'address': address, 'host': host} rpc.cast(context, topic, @@ -513,7 +511,7 @@ class FloatingIP(object): else: # send to correct host rpc.cast(context, - self.db.queue_get_for(context, FLAGS.network_topic, host), + rpc.queue_get_for(context, FLAGS.network_topic, host), {'method': '_associate_floating_ip', 'args': {'floating_address': floating_address, 'fixed_address': fixed_address, @@ -583,7 +581,7 @@ class FloatingIP(object): else: # send to correct host rpc.cast(context, - self.db.queue_get_for(context, FLAGS.network_topic, host), + rpc.queue_get_for(context, FLAGS.network_topic, host), {'method': '_disassociate_floating_ip', 'args': {'address': address, 'interface': interface}}) @@ -1545,8 +1543,7 @@ class NetworkManager(manager.SchedulerDependentManager): call_func(context, network) else: # i'm not the right host, run call on correct host - topic = self.db.queue_get_for(context, FLAGS.network_topic, - host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {'network_id': network['id'], 'teardown': teardown} # NOTE(tr3buchet): the call is just to wait for completion green_pool.spawn_n(rpc.call, context, topic, diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py index 8765f1082..50bc8cf14 100644 --- a/nova/network/quantum/manager.py +++ b/nova/network/quantum/manager.py @@ -291,7 +291,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): if net_ref['host'] == self.host: self.kill_dhcp(net_ref) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, net_ref['host']) @@ -389,7 +389,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): self.enable_dhcp(context, network['quantum_net_id'], network, vif_rec, network['net_tenant_id']) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, network['host']) rpc.call(context, topic, {'method': 'enable_dhcp', 'args': {'quantum_net_id': network['quantum_net_id'], @@ -608,7 +608,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): self.update_dhcp(context, ipam_tenant_id, network, vif, project_id) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, network['host']) rpc.call(context, topic, {'method': 'update_dhcp', 'args': {'ipam_tenant_id': ipam_tenant_id, diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 93fa6b173..1980f9679 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -226,6 +226,11 @@ def fanout_cast_to_server(context, server_params, topic, msg): topic, msg) +def queue_get_for(context, topic, host): + """Get a queue name for a given topic + host.""" + return '%s.%s' % (topic, host) + + _RPCIMPL = None diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 8e49e5aa4..364278584 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -62,8 +62,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs): db.volume_update(context, volume_id, {'host': host, 'scheduled_at': now}) rpc.cast(context, - db.queue_get_for(context, 'volume', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'volume', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals()) @@ -79,8 +79,8 @@ def cast_to_compute_host(context, host, method, update_db=True, **kwargs): db.instance_update(context, instance_uuid, {'host': host, 'scheduled_at': now}) rpc.cast(context, - db.queue_get_for(context, 'compute', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'compute', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals()) @@ -88,8 +88,8 @@ def cast_to_network_host(context, host, method, update_db=False, **kwargs): """Cast request to a network host queue""" rpc.cast(context, - db.queue_get_for(context, 'network', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'network', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals()) @@ -106,8 +106,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs): func(context, host, method, update_db=update_db, **kwargs) else: rpc.cast(context, - db.queue_get_for(context, topic, host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, topic, host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'") % locals()) @@ -355,7 +355,7 @@ class Scheduler(object): # Checking cpuinfo. try: rpc.call(context, - db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {"method": 'compare_cpu', "args": {'cpu_info': oservice_ref['cpu_info']}}) @@ -443,7 +443,7 @@ class Scheduler(object): available = available_gb * (1024 ** 3) # Getting necessary disk size - topic = db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, instance_ref['host']) ret = rpc.call(context, topic, {"method": 'get_instance_disk_info', @@ -492,8 +492,8 @@ class Scheduler(object): """ src = instance_ref['host'] - dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest) - src_t = db.queue_get_for(context, FLAGS.compute_topic, src) + dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest) + src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src) filename = rpc.call(context, dst_t, {"method": 'create_shared_storage_test_file'}) diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index aefe19581..83a8963a5 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -1491,7 +1491,7 @@ class ServersControllerCreateTest(test.TestCase): self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper) self.stubs.Set(nova.db, 'instance_update_and_get_original', server_update) - self.stubs.Set(nova.db, 'queue_get_for', queue_get_for) + self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for) self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip', fake_method) diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index 51e36daab..e779e46a4 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -1425,7 +1425,7 @@ class ComputeTestCase(BaseTestCase): inst_ref = self._create_fake_instance({'host': 'dummy'}) c = context.get_admin_context() - topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) + topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) # creating volume testdata volume_id = db.volume_create(c, {'size': 1})['id'] @@ -1480,7 +1480,7 @@ class ComputeTestCase(BaseTestCase): instance_id = instance['id'] c = context.get_admin_context() inst_ref = db.instance_get(c, instance_id) - topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) + topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) # create self.mox.StubOutWithMock(rpc, 'call') @@ -1522,7 +1522,7 @@ class ComputeTestCase(BaseTestCase): self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance') self.compute.driver.unfilter_instance(i_ref, []) self.mox.StubOutWithMock(rpc, 'call') - rpc.call(c, db.queue_get_for(c, FLAGS.compute_topic, dest), + rpc.call(c, rpc.queue_get_for(c, FLAGS.compute_topic, dest), {"method": "post_live_migration_at_destination", "args": {'instance_id': i_ref['id'], 'block_migration': False}}) self.mox.StubOutWithMock(self.compute.driver, 'unplug_vifs') diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py index f97ddb730..a78fd2e12 100644 --- a/nova/tests/db/fakes.py +++ b/nova/tests/db/fakes.py @@ -304,9 +304,6 @@ def stub_out_db_network_api(stubs): return [FakeModel(n) for n in networks if n['project_id'] == project_id] - def fake_queue_get_for(context, topic, node): - return "%s.%s" % (topic, node) - funcs = [fake_floating_ip_allocate_address, fake_floating_ip_deallocate, fake_floating_ip_disassociate, @@ -335,8 +332,7 @@ def stub_out_db_network_api(stubs): fake_network_get_all_by_instance, fake_network_set_host, fake_network_update, - fake_project_get_networks, - fake_queue_get_for] + fake_project_get_networks] stub_out(stubs, funcs) diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py index 938f60b46..302f22939 100644 --- a/nova/tests/scheduler/test_scheduler.py +++ b/nova/tests/scheduler/test_scheduler.py @@ -476,7 +476,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') self.mox.StubOutWithMock(self.driver, '_get_compute_info') self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'instance_update_and_get_original') @@ -504,7 +504,7 @@ class SchedulerTestCase(test.TestCase): # assert_compute_node_has_enough_disk() self.driver._get_compute_info(self.context, dest, 'disk_available_least').AndReturn(1025) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue1') rpc.call(self.context, 'src_queue1', {'method': 'get_instance_disk_info', @@ -512,9 +512,9 @@ class SchedulerTestCase(test.TestCase): json.dumps([{'disk_size': 1024 * (1024 ** 3)}])) # Common checks (shared storage ok, same hypervisor,e tc) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -535,7 +535,7 @@ class SchedulerTestCase(test.TestCase): [{'compute_node': [{'hypervisor_type': 'xen', 'hypervisor_version': 1, 'cpu_info': 'fake_cpu_info'}]}]) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') rpc.call(self.context, 'dest_queue', {'method': 'compare_cpu', @@ -696,7 +696,7 @@ class SchedulerTestCase(test.TestCase): 'assert_compute_node_has_enough_memory') self.mox.StubOutWithMock(self.driver, '_get_compute_info') self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') dest = 'fake_host2' @@ -717,7 +717,7 @@ class SchedulerTestCase(test.TestCase): # Not enough disk self.driver._get_compute_info(self.context, dest, 'disk_available_least').AndReturn(1023) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') rpc.call(self.context, 'src_queue', {'method': 'get_instance_disk_info', @@ -737,7 +737,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') @@ -751,9 +751,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -779,7 +779,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') @@ -793,9 +793,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -819,7 +819,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -834,9 +834,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -868,7 +868,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -883,9 +883,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -916,7 +916,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -931,9 +931,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -953,7 +953,7 @@ class SchedulerTestCase(test.TestCase): [{'compute_node': [{'hypervisor_type': 'xen', 'hypervisor_version': 1, 'cpu_info': 'fake_cpu_info'}]}]) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') rpc.call(self.context, 'dest_queue', {'method': 'compare_cpu', @@ -1018,13 +1018,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): self.mox.StubOutWithMock(utils, 'utcnow') self.mox.StubOutWithMock(db, 'volume_update') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') utils.utcnow().AndReturn('fake-now') db.volume_update(self.context, 31337, {'host': host, 'scheduled_at': 'fake-now'}) - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1039,10 +1039,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1057,10 +1057,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1078,13 +1078,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): self.mox.StubOutWithMock(utils, 'utcnow') self.mox.StubOutWithMock(db, 'instance_update') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') utils.utcnow().AndReturn('fake-now') db.instance_update(self.context, 31337, {'host': host, 'scheduled_at': 'fake-now'}) - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1099,10 +1099,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1117,10 +1117,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1135,10 +1135,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'network', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'network', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1193,10 +1193,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): topic = 'unknown' queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, topic, host).AndReturn(queue) + rpc.queue_get_for(self.context, topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index f3cce22a9..6bf3edad4 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -195,7 +195,7 @@ def forward_request(context, request_type, master, aggregate_id, # because this might be 169.254.0.1, i.e. xenapi # NOTE: password in clear is not great, but it'll do for now sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) - rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master), + rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master), {"method": request_type, "args": {"aggregate_id": aggregate_id, "host": slave_compute, diff --git a/nova/volume/api.py b/nova/volume/api.py index e03e78779..0b9b4afff 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -152,7 +152,7 @@ class API(base.Base): 'terminated_at': now}) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume_id}}) @@ -238,7 +238,7 @@ class API(base.Base): def remove_from_compute(self, context, volume, instance_id, host): """Remove volume from specified compute host.""" rpc.call(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), + rpc.queue_get_for(context, FLAGS.compute_topic, host), {"method": "remove_volume_connection", "args": {'instance_id': instance_id, 'volume_id': volume['id']}}) @@ -255,7 +255,7 @@ class API(base.Base): @wrap_check_policy def attach(self, context, volume, instance_uuid, mountpoint): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "attach_volume", "args": {"volume_id": volume['id'], @@ -265,7 +265,7 @@ class API(base.Base): @wrap_check_policy def detach(self, context, volume): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "detach_volume", "args": {"volume_id": volume['id']}}) @@ -273,7 +273,7 @@ class API(base.Base): @wrap_check_policy def initialize_connection(self, context, volume, connector): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "initialize_connection", "args": {"volume_id": volume['id'], @@ -283,7 +283,7 @@ class API(base.Base): def terminate_connection(self, context, volume, connector): self.unreserve_volume(context, volume) host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "terminate_connection", "args": {"volume_id": volume['id'], @@ -310,7 +310,7 @@ class API(base.Base): snapshot = self.db.snapshot_create(context, options) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "create_snapshot", "args": {"volume_id": volume['id'], "snapshot_id": snapshot['id']}}) @@ -334,7 +334,7 @@ class API(base.Base): volume = self.db.volume_get(context, snapshot['volume_id']) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_snapshot", "args": {"snapshot_id": snapshot['id']}}) -- cgit