diff options
-rwxr-xr-x | bin/nova-manage | 4 | ||||
-rw-r--r-- | nova/compute/api.py | 3 | ||||
-rw-r--r-- | nova/compute/manager.py | 22 | ||||
-rw-r--r-- | nova/compute/rpcapi.py | 6 | ||||
-rw-r--r-- | nova/console/api.py | 10 | ||||
-rw-r--r-- | nova/console/manager.py | 2 | ||||
-rw-r--r-- | nova/console/vmrc_manager.py | 6 | ||||
-rw-r--r-- | nova/db/api.py | 8 | ||||
-rw-r--r-- | nova/db/sqlalchemy/api.py | 8 | ||||
-rw-r--r-- | nova/network/manager.py | 13 | ||||
-rw-r--r-- | nova/network/quantum/manager.py | 6 | ||||
-rw-r--r-- | nova/rpc/__init__.py | 5 | ||||
-rw-r--r-- | nova/scheduler/driver.py | 24 | ||||
-rw-r--r-- | nova/tests/api/openstack/compute/test_servers.py | 2 | ||||
-rw-r--r-- | nova/tests/compute/test_compute.py | 6 | ||||
-rw-r--r-- | nova/tests/db/fakes.py | 6 | ||||
-rw-r--r-- | nova/tests/scheduler/test_scheduler.py | 78 | ||||
-rw-r--r-- | nova/virt/xenapi/pool.py | 2 | ||||
-rw-r--r-- | nova/volume/api.py | 16 |
19 files changed, 104 insertions, 123 deletions
diff --git a/bin/nova-manage b/bin/nova-manage index 20326b3c4..1554251a0 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -1153,7 +1153,7 @@ class VolumeCommands(object): return rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.volume_topic, host), + rpc.queue_get_for(ctxt, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume['id']}}) @@ -1171,7 +1171,7 @@ class VolumeCommands(object): instance = db.instance_get(ctxt, volume['instance_id']) host = instance['host'] rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.compute_topic, host), + rpc.queue_get_for(ctxt, FLAGS.compute_topic, host), {"method": "attach_volume", "args": {"instance_id": instance['id'], "volume_id": volume['id'], diff --git a/nova/compute/api.py b/nova/compute/api.py index 3990a9596..9f96b8e6b 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -841,7 +841,7 @@ class API(base.Base): in self.db.service_get_all_compute_sorted(context)] for host in hosts: rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), + rpc.queue_get_for(context, FLAGS.compute_topic, host), {'method': 'refresh_provider_fw_rules', 'args': {}}) def _is_security_group_associated_with_server(self, security_group, @@ -1814,7 +1814,6 @@ class HostAPI(base.Base): """Reboots, shuts down or powers up the host.""" # NOTE(comstud): No instance_uuid argument to this compute manager # call - topic = self.db.queue_get_for(context, FLAGS.compute_topic, host) return self.compute_rpcapi.host_power_action(context, action=action, host=host) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 856175304..7ab27f740 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -315,9 +315,9 @@ class ComputeManager(manager.SchedulerDependentManager): """ #TODO(mdragon): perhaps make this variable by console_type? - return self.db.queue_get_for(context, - FLAGS.console_topic, - FLAGS.console_host) + return rpc.queue_get_for(context, + FLAGS.console_topic, + FLAGS.console_host) def get_console_pool_info(self, context, console_type): return self.driver.get_console_pool_info(console_type) @@ -1257,7 +1257,7 @@ class ComputeManager(manager.SchedulerDependentManager): network_info = self._get_instance_nw_info(context, instance_ref) self.driver.destroy(instance_ref, self._legacy_nw_info(network_info)) - topic = self.db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, migration_ref['source_compute']) rpc.cast(context, topic, {'method': 'finish_revert_resize', @@ -1349,7 +1349,7 @@ class ComputeManager(manager.SchedulerDependentManager): 'status': 'pre-migrating'}) LOG.audit(_('Migrating'), context=context, instance=instance_ref) - topic = self.db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, instance_ref['host']) rpc.cast(context, topic, {'method': 'resize_instance', @@ -1412,9 +1412,9 @@ class ComputeManager(manager.SchedulerDependentManager): service = self.db.service_get_by_host_and_topic( context, migration_ref['dest_compute'], FLAGS.compute_topic) - topic = self.db.queue_get_for(context, - FLAGS.compute_topic, - migration_ref['dest_compute']) + topic = rpc.queue_get_for(context, + FLAGS.compute_topic, + migration_ref['dest_compute']) params = {'migration_id': migration_id, 'disk_info': disk_info, 'instance_uuid': instance_ref['uuid'], @@ -2040,7 +2040,7 @@ class ComputeManager(manager.SchedulerDependentManager): disk = None rpc.call(context, - self.db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {'method': 'pre_live_migration', 'args': {'instance_id': instance_id, 'block_migration': block_migration, @@ -2122,7 +2122,7 @@ class ComputeManager(manager.SchedulerDependentManager): # Define domain at destination host, without doing it, # pause/suspend/terminate do not work. rpc.call(ctxt, - self.db.queue_get_for(ctxt, FLAGS.compute_topic, dest), + rpc.queue_get_for(ctxt, FLAGS.compute_topic, dest), {"method": "post_live_migration_at_destination", "args": {'instance_id': instance_ref['id'], 'block_migration': block_migration}}) @@ -2227,7 +2227,7 @@ class ComputeManager(manager.SchedulerDependentManager): # any empty images has to be deleted. if block_migration: rpc.cast(context, - self.db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {"method": "rollback_live_migration_at_destination", "args": {'instance_id': instance_ref['id']}}) diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index a2a3b281c..5db0282c3 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -18,16 +18,16 @@ Client side of the compute RPC API. """ -from nova.db import base from nova import exception from nova import flags +from nova import rpc import nova.rpc.proxy FLAGS = flags.FLAGS -class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base): +class ComputeAPI(nova.rpc.proxy.RpcProxy): '''Client side of the compute rpc API. API version history: @@ -58,7 +58,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base): if not host: raise exception.NovaException(_('Unable to find host for ' 'Instance %s') % instance['uuid']) - return self.db.queue_get_for(ctxt, self.topic, host) + return rpc.queue_get_for(ctxt, self.topic, host) def add_aggregate_host(self, ctxt, aggregate_id, host_param, host): '''Add aggregate host. diff --git a/nova/console/api.py b/nova/console/api.py index 0feaae488..20f00030e 100644 --- a/nova/console/api.py +++ b/nova/console/api.py @@ -44,8 +44,8 @@ class API(base.Base): def delete_console(self, context, instance_id, console_id): instance_id = self._translate_uuid_if_necessary(context, instance_id) console = self.db.console_get(context, console_id, instance_id) - topic = self.db.queue_get_for(context, FLAGS.console_topic, - pool['host']) + topic = rpc.queue_get_for(context, FLAGS.console_topic, + pool['host']) rpcapi = console_rpcapi.ConsoleAPI(topic=topic) rpcapi.remove_console(context, console['id']) @@ -61,9 +61,9 @@ class API(base.Base): rpcapi.add_console(context, instance['id']) def _get_console_topic(self, context, instance_host): - topic = self.db.queue_get_for(context, - FLAGS.compute_topic, - instance_host) + topic = rpc.queue_get_for(context, + FLAGS.compute_topic, + instance_host) return rpc.call(context, topic, {'method': 'get_console_topic', 'args': {'fake': 1}}) diff --git a/nova/console/manager.py b/nova/console/manager.py index 8a42b449a..96a9ef31c 100644 --- a/nova/console/manager.py +++ b/nova/console/manager.py @@ -122,7 +122,7 @@ class ConsoleProxyManager(manager.Manager): 'password': '1234pass'} else: pool_info = rpc.call(context, - self.db.queue_get_for(context, + rpc.queue_get_for(context, FLAGS.compute_topic, instance_host), {'method': 'get_console_pool_info', diff --git a/nova/console/vmrc_manager.py b/nova/console/vmrc_manager.py index 0968ecd31..2008ff1b2 100644 --- a/nova/console/vmrc_manager.py +++ b/nova/console/vmrc_manager.py @@ -138,9 +138,9 @@ class ConsoleVMRCManager(manager.Manager): console_type) except exception.NotFound: pool_info = rpc.call(context, - self.db.queue_get_for(context, - FLAGS.compute_topic, - instance_host), + rpc.queue_get_for(context, + FLAGS.compute_topic, + instance_host), {'method': 'get_console_pool_info', 'args': {'console_type': console_type}}) pool_info['password'] = self.driver.fix_pool_password( diff --git a/nova/db/api.py b/nova/db/api.py index 75739df8d..d33584036 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -848,14 +848,6 @@ def network_update(context, network_id, values): ################### -def queue_get_for(context, topic, physical_node_id): - """Return a channel to send a message to a node with a topic.""" - return IMPL.queue_get_for(context, topic, physical_node_id) - - -################### - - def iscsi_target_count_by_host(context, host): """Return count of export devices.""" return IMPL.iscsi_target_count_by_host(context, host) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 718438569..30c556b4d 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -2203,14 +2203,6 @@ def network_update(context, network_id, values): ################### -def queue_get_for(context, topic, physical_node_id): - # FIXME(ja): this should be servername? - return "%s.%s" % (topic, physical_node_id) - - -################### - - @require_admin_context def iscsi_target_count_by_host(context, host): return model_query(context, models.IscsiTarget).\ diff --git a/nova/network/manager.py b/nova/network/manager.py index e552dec7f..1f1580634 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -201,9 +201,7 @@ class RPCAllocateFixedIP(object): jsonutils.to_primitive(network)}}) if host != self.host: # need to call allocate_fixed_ip to correct network host - topic = self.db.queue_get_for(context, - FLAGS.network_topic, - host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {} args['instance_id'] = instance_id args['network_id'] = network['id'] @@ -241,7 +239,7 @@ class RPCAllocateFixedIP(object): host = network['host'] if host != self.host: # need to call deallocate_fixed_ip on correct network host - topic = self.db.queue_get_for(context, FLAGS.network_topic, host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {'address': address, 'host': host} rpc.cast(context, topic, @@ -513,7 +511,7 @@ class FloatingIP(object): else: # send to correct host rpc.cast(context, - self.db.queue_get_for(context, FLAGS.network_topic, host), + rpc.queue_get_for(context, FLAGS.network_topic, host), {'method': '_associate_floating_ip', 'args': {'floating_address': floating_address, 'fixed_address': fixed_address, @@ -583,7 +581,7 @@ class FloatingIP(object): else: # send to correct host rpc.cast(context, - self.db.queue_get_for(context, FLAGS.network_topic, host), + rpc.queue_get_for(context, FLAGS.network_topic, host), {'method': '_disassociate_floating_ip', 'args': {'address': address, 'interface': interface}}) @@ -1545,8 +1543,7 @@ class NetworkManager(manager.SchedulerDependentManager): call_func(context, network) else: # i'm not the right host, run call on correct host - topic = self.db.queue_get_for(context, FLAGS.network_topic, - host) + topic = rpc.queue_get_for(context, FLAGS.network_topic, host) args = {'network_id': network['id'], 'teardown': teardown} # NOTE(tr3buchet): the call is just to wait for completion green_pool.spawn_n(rpc.call, context, topic, diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py index 8765f1082..50bc8cf14 100644 --- a/nova/network/quantum/manager.py +++ b/nova/network/quantum/manager.py @@ -291,7 +291,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): if net_ref['host'] == self.host: self.kill_dhcp(net_ref) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, net_ref['host']) @@ -389,7 +389,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): self.enable_dhcp(context, network['quantum_net_id'], network, vif_rec, network['net_tenant_id']) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, network['host']) rpc.call(context, topic, {'method': 'enable_dhcp', 'args': {'quantum_net_id': network['quantum_net_id'], @@ -608,7 +608,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager): self.update_dhcp(context, ipam_tenant_id, network, vif, project_id) else: - topic = self.db.queue_get_for(context, + topic = rpc.queue_get_for(context, FLAGS.network_topic, network['host']) rpc.call(context, topic, {'method': 'update_dhcp', 'args': {'ipam_tenant_id': ipam_tenant_id, diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 93fa6b173..1980f9679 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -226,6 +226,11 @@ def fanout_cast_to_server(context, server_params, topic, msg): topic, msg) +def queue_get_for(context, topic, host): + """Get a queue name for a given topic + host.""" + return '%s.%s' % (topic, host) + + _RPCIMPL = None diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 8e49e5aa4..364278584 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -62,8 +62,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs): db.volume_update(context, volume_id, {'host': host, 'scheduled_at': now}) rpc.cast(context, - db.queue_get_for(context, 'volume', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'volume', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals()) @@ -79,8 +79,8 @@ def cast_to_compute_host(context, host, method, update_db=True, **kwargs): db.instance_update(context, instance_uuid, {'host': host, 'scheduled_at': now}) rpc.cast(context, - db.queue_get_for(context, 'compute', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'compute', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals()) @@ -88,8 +88,8 @@ def cast_to_network_host(context, host, method, update_db=False, **kwargs): """Cast request to a network host queue""" rpc.cast(context, - db.queue_get_for(context, 'network', host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, 'network', host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals()) @@ -106,8 +106,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs): func(context, host, method, update_db=update_db, **kwargs) else: rpc.cast(context, - db.queue_get_for(context, topic, host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, topic, host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'") % locals()) @@ -355,7 +355,7 @@ class Scheduler(object): # Checking cpuinfo. try: rpc.call(context, - db.queue_get_for(context, FLAGS.compute_topic, dest), + rpc.queue_get_for(context, FLAGS.compute_topic, dest), {"method": 'compare_cpu', "args": {'cpu_info': oservice_ref['cpu_info']}}) @@ -443,7 +443,7 @@ class Scheduler(object): available = available_gb * (1024 ** 3) # Getting necessary disk size - topic = db.queue_get_for(context, FLAGS.compute_topic, + topic = rpc.queue_get_for(context, FLAGS.compute_topic, instance_ref['host']) ret = rpc.call(context, topic, {"method": 'get_instance_disk_info', @@ -492,8 +492,8 @@ class Scheduler(object): """ src = instance_ref['host'] - dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest) - src_t = db.queue_get_for(context, FLAGS.compute_topic, src) + dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest) + src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src) filename = rpc.call(context, dst_t, {"method": 'create_shared_storage_test_file'}) diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index aefe19581..83a8963a5 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -1491,7 +1491,7 @@ class ServersControllerCreateTest(test.TestCase): self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper) self.stubs.Set(nova.db, 'instance_update_and_get_original', server_update) - self.stubs.Set(nova.db, 'queue_get_for', queue_get_for) + self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for) self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip', fake_method) diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index 83671d265..f1903bd4c 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -1426,7 +1426,7 @@ class ComputeTestCase(BaseTestCase): inst_ref = self._create_fake_instance({'host': 'dummy'}) c = context.get_admin_context() - topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) + topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) # creating volume testdata volume_id = db.volume_create(c, {'size': 1})['id'] @@ -1481,7 +1481,7 @@ class ComputeTestCase(BaseTestCase): instance_id = instance['id'] c = context.get_admin_context() inst_ref = db.instance_get(c, instance_id) - topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) + topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host']) # create self.mox.StubOutWithMock(rpc, 'call') @@ -1523,7 +1523,7 @@ class ComputeTestCase(BaseTestCase): self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance') self.compute.driver.unfilter_instance(i_ref, []) self.mox.StubOutWithMock(rpc, 'call') - rpc.call(c, db.queue_get_for(c, FLAGS.compute_topic, dest), + rpc.call(c, rpc.queue_get_for(c, FLAGS.compute_topic, dest), {"method": "post_live_migration_at_destination", "args": {'instance_id': i_ref['id'], 'block_migration': False}}) self.mox.StubOutWithMock(self.compute.driver, 'unplug_vifs') diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py index f97ddb730..a78fd2e12 100644 --- a/nova/tests/db/fakes.py +++ b/nova/tests/db/fakes.py @@ -304,9 +304,6 @@ def stub_out_db_network_api(stubs): return [FakeModel(n) for n in networks if n['project_id'] == project_id] - def fake_queue_get_for(context, topic, node): - return "%s.%s" % (topic, node) - funcs = [fake_floating_ip_allocate_address, fake_floating_ip_deallocate, fake_floating_ip_disassociate, @@ -335,8 +332,7 @@ def stub_out_db_network_api(stubs): fake_network_get_all_by_instance, fake_network_set_host, fake_network_update, - fake_project_get_networks, - fake_queue_get_for] + fake_project_get_networks] stub_out(stubs, funcs) diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py index 938f60b46..302f22939 100644 --- a/nova/tests/scheduler/test_scheduler.py +++ b/nova/tests/scheduler/test_scheduler.py @@ -476,7 +476,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') self.mox.StubOutWithMock(self.driver, '_get_compute_info') self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'instance_update_and_get_original') @@ -504,7 +504,7 @@ class SchedulerTestCase(test.TestCase): # assert_compute_node_has_enough_disk() self.driver._get_compute_info(self.context, dest, 'disk_available_least').AndReturn(1025) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue1') rpc.call(self.context, 'src_queue1', {'method': 'get_instance_disk_info', @@ -512,9 +512,9 @@ class SchedulerTestCase(test.TestCase): json.dumps([{'disk_size': 1024 * (1024 ** 3)}])) # Common checks (shared storage ok, same hypervisor,e tc) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -535,7 +535,7 @@ class SchedulerTestCase(test.TestCase): [{'compute_node': [{'hypervisor_type': 'xen', 'hypervisor_version': 1, 'cpu_info': 'fake_cpu_info'}]}]) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') rpc.call(self.context, 'dest_queue', {'method': 'compare_cpu', @@ -696,7 +696,7 @@ class SchedulerTestCase(test.TestCase): 'assert_compute_node_has_enough_memory') self.mox.StubOutWithMock(self.driver, '_get_compute_info') self.mox.StubOutWithMock(db, 'instance_get_all_by_host') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') dest = 'fake_host2' @@ -717,7 +717,7 @@ class SchedulerTestCase(test.TestCase): # Not enough disk self.driver._get_compute_info(self.context, dest, 'disk_available_least').AndReturn(1023) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') rpc.call(self.context, 'src_queue', {'method': 'get_instance_disk_info', @@ -737,7 +737,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') @@ -751,9 +751,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -779,7 +779,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') @@ -793,9 +793,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -819,7 +819,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -834,9 +834,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -868,7 +868,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -883,9 +883,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -916,7 +916,7 @@ class SchedulerTestCase(test.TestCase): self.mox.StubOutWithMock(db, 'instance_get') self.mox.StubOutWithMock(self.driver, '_live_migration_src_check') self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'cast') self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host') @@ -931,9 +931,9 @@ class SchedulerTestCase(test.TestCase): self.driver._live_migration_dest_check(self.context, instance, dest, block_migration, disk_over_commit) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, instance['host']).AndReturn('src_queue') tmp_filename = 'test-filename' rpc.call(self.context, 'dest_queue', @@ -953,7 +953,7 @@ class SchedulerTestCase(test.TestCase): [{'compute_node': [{'hypervisor_type': 'xen', 'hypervisor_version': 1, 'cpu_info': 'fake_cpu_info'}]}]) - db.queue_get_for(self.context, FLAGS.compute_topic, + rpc.queue_get_for(self.context, FLAGS.compute_topic, dest).AndReturn('dest_queue') rpc.call(self.context, 'dest_queue', {'method': 'compare_cpu', @@ -1018,13 +1018,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): self.mox.StubOutWithMock(utils, 'utcnow') self.mox.StubOutWithMock(db, 'volume_update') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') utils.utcnow().AndReturn('fake-now') db.volume_update(self.context, 31337, {'host': host, 'scheduled_at': 'fake-now'}) - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1039,10 +1039,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1057,10 +1057,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'volume', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1078,13 +1078,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): self.mox.StubOutWithMock(utils, 'utcnow') self.mox.StubOutWithMock(db, 'instance_update') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') utils.utcnow().AndReturn('fake-now') db.instance_update(self.context, 31337, {'host': host, 'scheduled_at': 'fake-now'}) - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1099,10 +1099,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1117,10 +1117,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'compute', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1135,10 +1135,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, 'network', host).AndReturn(queue) + rpc.queue_get_for(self.context, 'network', host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) @@ -1193,10 +1193,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): topic = 'unknown' queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, topic, host).AndReturn(queue) + rpc.queue_get_for(self.context, topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index f3cce22a9..6bf3edad4 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -195,7 +195,7 @@ def forward_request(context, request_type, master, aggregate_id, # because this might be 169.254.0.1, i.e. xenapi # NOTE: password in clear is not great, but it'll do for now sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) - rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master), + rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master), {"method": request_type, "args": {"aggregate_id": aggregate_id, "host": slave_compute, diff --git a/nova/volume/api.py b/nova/volume/api.py index e03e78779..0b9b4afff 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -152,7 +152,7 @@ class API(base.Base): 'terminated_at': now}) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume_id}}) @@ -238,7 +238,7 @@ class API(base.Base): def remove_from_compute(self, context, volume, instance_id, host): """Remove volume from specified compute host.""" rpc.call(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), + rpc.queue_get_for(context, FLAGS.compute_topic, host), {"method": "remove_volume_connection", "args": {'instance_id': instance_id, 'volume_id': volume['id']}}) @@ -255,7 +255,7 @@ class API(base.Base): @wrap_check_policy def attach(self, context, volume, instance_uuid, mountpoint): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "attach_volume", "args": {"volume_id": volume['id'], @@ -265,7 +265,7 @@ class API(base.Base): @wrap_check_policy def detach(self, context, volume): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "detach_volume", "args": {"volume_id": volume['id']}}) @@ -273,7 +273,7 @@ class API(base.Base): @wrap_check_policy def initialize_connection(self, context, volume, connector): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "initialize_connection", "args": {"volume_id": volume['id'], @@ -283,7 +283,7 @@ class API(base.Base): def terminate_connection(self, context, volume, connector): self.unreserve_volume(context, volume) host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "terminate_connection", "args": {"volume_id": volume['id'], @@ -310,7 +310,7 @@ class API(base.Base): snapshot = self.db.snapshot_create(context, options) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "create_snapshot", "args": {"volume_id": volume['id'], "snapshot_id": snapshot['id']}}) @@ -334,7 +334,7 @@ class API(base.Base): volume = self.db.volume_get(context, snapshot['volume_id']) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_snapshot", "args": {"snapshot_id": snapshot['id']}}) |