summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-05-29 16:35:35 -0400
committerRussell Bryant <rbryant@redhat.com>2012-05-30 17:50:24 -0400
commit26353bb372081f674cf5fd3dbbffd990918f3803 (patch)
treef98ee9e002d310ee9bf81b195e92c9f567186125
parentcb7c4377b373857cc57e95b84d4e93b0f785673c (diff)
Move queue_get_for() from db to rpc.
Part of blueprint common-rpc. The function queue_get_for() is a utility function used by various consumers of the rpc API. This function lived in the db API, but never ended up using anything from the database. This patch moves it into the rpc API so that it can be used by other users of rpc once it moves into openstack-common. Change-Id: If92675beecff5471b416a929c161b810e3c71939
-rwxr-xr-xbin/nova-manage4
-rw-r--r--nova/compute/api.py3
-rw-r--r--nova/compute/manager.py22
-rw-r--r--nova/compute/rpcapi.py6
-rw-r--r--nova/console/api.py10
-rw-r--r--nova/console/manager.py2
-rw-r--r--nova/console/vmrc_manager.py6
-rw-r--r--nova/db/api.py8
-rw-r--r--nova/db/sqlalchemy/api.py8
-rw-r--r--nova/network/manager.py13
-rw-r--r--nova/network/quantum/manager.py6
-rw-r--r--nova/rpc/__init__.py5
-rw-r--r--nova/scheduler/driver.py24
-rw-r--r--nova/tests/api/openstack/compute/test_servers.py2
-rw-r--r--nova/tests/compute/test_compute.py6
-rw-r--r--nova/tests/db/fakes.py6
-rw-r--r--nova/tests/scheduler/test_scheduler.py78
-rw-r--r--nova/virt/xenapi/pool.py2
-rw-r--r--nova/volume/api.py16
19 files changed, 104 insertions, 123 deletions
diff --git a/bin/nova-manage b/bin/nova-manage
index f94497087..eff73d2e4 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -1152,7 +1152,7 @@ class VolumeCommands(object):
return
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.volume_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume['id']}})
@@ -1170,7 +1170,7 @@ class VolumeCommands(object):
instance = db.instance_get(ctxt, volume['instance_id'])
host = instance['host']
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.compute_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"instance_id": instance['id'],
"volume_id": volume['id'],
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 3fd358a34..edcbff9b9 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -835,7 +835,7 @@ class API(base.Base):
in self.db.service_get_all_compute_sorted(context)]
for host in hosts:
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ rpc.queue_get_for(context, FLAGS.compute_topic, host),
{'method': 'refresh_provider_fw_rules', 'args': {}})
def _is_security_group_associated_with_server(self, security_group,
@@ -1808,7 +1808,6 @@ class HostAPI(base.Base):
"""Reboots, shuts down or powers up the host."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
return self.compute_rpcapi.host_power_action(context, action=action,
host=host)
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 856175304..7ab27f740 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -315,9 +315,9 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
#TODO(mdragon): perhaps make this variable by console_type?
- return self.db.queue_get_for(context,
- FLAGS.console_topic,
- FLAGS.console_host)
+ return rpc.queue_get_for(context,
+ FLAGS.console_topic,
+ FLAGS.console_host)
def get_console_pool_info(self, context, console_type):
return self.driver.get_console_pool_info(console_type)
@@ -1257,7 +1257,7 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._get_instance_nw_info(context, instance_ref)
self.driver.destroy(instance_ref, self._legacy_nw_info(network_info))
- topic = self.db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
migration_ref['source_compute'])
rpc.cast(context, topic,
{'method': 'finish_revert_resize',
@@ -1349,7 +1349,7 @@ class ComputeManager(manager.SchedulerDependentManager):
'status': 'pre-migrating'})
LOG.audit(_('Migrating'), context=context, instance=instance_ref)
- topic = self.db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'resize_instance',
@@ -1412,9 +1412,9 @@ class ComputeManager(manager.SchedulerDependentManager):
service = self.db.service_get_by_host_and_topic(
context, migration_ref['dest_compute'], FLAGS.compute_topic)
- topic = self.db.queue_get_for(context,
- FLAGS.compute_topic,
- migration_ref['dest_compute'])
+ topic = rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ migration_ref['dest_compute'])
params = {'migration_id': migration_id,
'disk_info': disk_info,
'instance_uuid': instance_ref['uuid'],
@@ -2040,7 +2040,7 @@ class ComputeManager(manager.SchedulerDependentManager):
disk = None
rpc.call(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{'method': 'pre_live_migration',
'args': {'instance_id': instance_id,
'block_migration': block_migration,
@@ -2122,7 +2122,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# Define domain at destination host, without doing it,
# pause/suspend/terminate do not work.
rpc.call(ctxt,
- self.db.queue_get_for(ctxt, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(ctxt, FLAGS.compute_topic, dest),
{"method": "post_live_migration_at_destination",
"args": {'instance_id': instance_ref['id'],
'block_migration': block_migration}})
@@ -2227,7 +2227,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# any empty images has to be deleted.
if block_migration:
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": "rollback_live_migration_at_destination",
"args": {'instance_id': instance_ref['id']}})
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index a2a3b281c..5db0282c3 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -18,16 +18,16 @@
Client side of the compute RPC API.
"""
-from nova.db import base
from nova import exception
from nova import flags
+from nova import rpc
import nova.rpc.proxy
FLAGS = flags.FLAGS
-class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
+class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''Client side of the compute rpc API.
API version history:
@@ -58,7 +58,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
if not host:
raise exception.NovaException(_('Unable to find host for '
'Instance %s') % instance['uuid'])
- return self.db.queue_get_for(ctxt, self.topic, host)
+ return rpc.queue_get_for(ctxt, self.topic, host)
def add_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Add aggregate host.
diff --git a/nova/console/api.py b/nova/console/api.py
index 0feaae488..20f00030e 100644
--- a/nova/console/api.py
+++ b/nova/console/api.py
@@ -44,8 +44,8 @@ class API(base.Base):
def delete_console(self, context, instance_id, console_id):
instance_id = self._translate_uuid_if_necessary(context, instance_id)
console = self.db.console_get(context, console_id, instance_id)
- topic = self.db.queue_get_for(context, FLAGS.console_topic,
- pool['host'])
+ topic = rpc.queue_get_for(context, FLAGS.console_topic,
+ pool['host'])
rpcapi = console_rpcapi.ConsoleAPI(topic=topic)
rpcapi.remove_console(context, console['id'])
@@ -61,9 +61,9 @@ class API(base.Base):
rpcapi.add_console(context, instance['id'])
def _get_console_topic(self, context, instance_host):
- topic = self.db.queue_get_for(context,
- FLAGS.compute_topic,
- instance_host)
+ topic = rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ instance_host)
return rpc.call(context, topic, {'method': 'get_console_topic',
'args': {'fake': 1}})
diff --git a/nova/console/manager.py b/nova/console/manager.py
index 8a42b449a..96a9ef31c 100644
--- a/nova/console/manager.py
+++ b/nova/console/manager.py
@@ -122,7 +122,7 @@ class ConsoleProxyManager(manager.Manager):
'password': '1234pass'}
else:
pool_info = rpc.call(context,
- self.db.queue_get_for(context,
+ rpc.queue_get_for(context,
FLAGS.compute_topic,
instance_host),
{'method': 'get_console_pool_info',
diff --git a/nova/console/vmrc_manager.py b/nova/console/vmrc_manager.py
index 0968ecd31..2008ff1b2 100644
--- a/nova/console/vmrc_manager.py
+++ b/nova/console/vmrc_manager.py
@@ -138,9 +138,9 @@ class ConsoleVMRCManager(manager.Manager):
console_type)
except exception.NotFound:
pool_info = rpc.call(context,
- self.db.queue_get_for(context,
- FLAGS.compute_topic,
- instance_host),
+ rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ instance_host),
{'method': 'get_console_pool_info',
'args': {'console_type': console_type}})
pool_info['password'] = self.driver.fix_pool_password(
diff --git a/nova/db/api.py b/nova/db/api.py
index 75739df8d..d33584036 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -848,14 +848,6 @@ def network_update(context, network_id, values):
###################
-def queue_get_for(context, topic, physical_node_id):
- """Return a channel to send a message to a node with a topic."""
- return IMPL.queue_get_for(context, topic, physical_node_id)
-
-
-###################
-
-
def iscsi_target_count_by_host(context, host):
"""Return count of export devices."""
return IMPL.iscsi_target_count_by_host(context, host)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 22eb7a8fd..c28239453 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -2203,14 +2203,6 @@ def network_update(context, network_id, values):
###################
-def queue_get_for(context, topic, physical_node_id):
- # FIXME(ja): this should be servername?
- return "%s.%s" % (topic, physical_node_id)
-
-
-###################
-
-
@require_admin_context
def iscsi_target_count_by_host(context, host):
return model_query(context, models.IscsiTarget).\
diff --git a/nova/network/manager.py b/nova/network/manager.py
index e552dec7f..1f1580634 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -201,9 +201,7 @@ class RPCAllocateFixedIP(object):
jsonutils.to_primitive(network)}})
if host != self.host:
# need to call allocate_fixed_ip to correct network host
- topic = self.db.queue_get_for(context,
- FLAGS.network_topic,
- host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {}
args['instance_id'] = instance_id
args['network_id'] = network['id']
@@ -241,7 +239,7 @@ class RPCAllocateFixedIP(object):
host = network['host']
if host != self.host:
# need to call deallocate_fixed_ip on correct network host
- topic = self.db.queue_get_for(context, FLAGS.network_topic, host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {'address': address,
'host': host}
rpc.cast(context, topic,
@@ -513,7 +511,7 @@ class FloatingIP(object):
else:
# send to correct host
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.network_topic, host),
+ rpc.queue_get_for(context, FLAGS.network_topic, host),
{'method': '_associate_floating_ip',
'args': {'floating_address': floating_address,
'fixed_address': fixed_address,
@@ -583,7 +581,7 @@ class FloatingIP(object):
else:
# send to correct host
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.network_topic, host),
+ rpc.queue_get_for(context, FLAGS.network_topic, host),
{'method': '_disassociate_floating_ip',
'args': {'address': address,
'interface': interface}})
@@ -1545,8 +1543,7 @@ class NetworkManager(manager.SchedulerDependentManager):
call_func(context, network)
else:
# i'm not the right host, run call on correct host
- topic = self.db.queue_get_for(context, FLAGS.network_topic,
- host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {'network_id': network['id'], 'teardown': teardown}
# NOTE(tr3buchet): the call is just to wait for completion
green_pool.spawn_n(rpc.call, context, topic,
diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py
index 8765f1082..50bc8cf14 100644
--- a/nova/network/quantum/manager.py
+++ b/nova/network/quantum/manager.py
@@ -291,7 +291,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
if net_ref['host'] == self.host:
self.kill_dhcp(net_ref)
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic,
net_ref['host'])
@@ -389,7 +389,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
self.enable_dhcp(context, network['quantum_net_id'],
network, vif_rec, network['net_tenant_id'])
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'enable_dhcp',
'args': {'quantum_net_id': network['quantum_net_id'],
@@ -608,7 +608,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
self.update_dhcp(context, ipam_tenant_id, network,
vif, project_id)
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'update_dhcp',
'args': {'ipam_tenant_id': ipam_tenant_id,
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 93fa6b173..1980f9679 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -226,6 +226,11 @@ def fanout_cast_to_server(context, server_params, topic, msg):
topic, msg)
+def queue_get_for(context, topic, host):
+ """Get a queue name for a given topic + host."""
+ return '%s.%s' % (topic, host)
+
+
_RPCIMPL = None
diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py
index 8e49e5aa4..364278584 100644
--- a/nova/scheduler/driver.py
+++ b/nova/scheduler/driver.py
@@ -62,8 +62,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
db.volume_update(context, volume_id,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
- db.queue_get_for(context, 'volume', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'volume', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals())
@@ -79,8 +79,8 @@ def cast_to_compute_host(context, host, method, update_db=True, **kwargs):
db.instance_update(context, instance_uuid,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
- db.queue_get_for(context, 'compute', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'compute', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals())
@@ -88,8 +88,8 @@ def cast_to_network_host(context, host, method, update_db=False, **kwargs):
"""Cast request to a network host queue"""
rpc.cast(context,
- db.queue_get_for(context, 'network', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'network', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals())
@@ -106,8 +106,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
func(context, host, method, update_db=update_db, **kwargs)
else:
rpc.cast(context,
- db.queue_get_for(context, topic, host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, topic, host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
% locals())
@@ -355,7 +355,7 @@ class Scheduler(object):
# Checking cpuinfo.
try:
rpc.call(context,
- db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": 'compare_cpu',
"args": {'cpu_info': oservice_ref['cpu_info']}})
@@ -443,7 +443,7 @@ class Scheduler(object):
available = available_gb * (1024 ** 3)
# Getting necessary disk size
- topic = db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
ret = rpc.call(context, topic,
{"method": 'get_instance_disk_info',
@@ -492,8 +492,8 @@ class Scheduler(object):
"""
src = instance_ref['host']
- dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
- src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
+ dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
+ src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
filename = rpc.call(context, dst_t,
{"method": 'create_shared_storage_test_file'})
diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py
index aefe19581..83a8963a5 100644
--- a/nova/tests/api/openstack/compute/test_servers.py
+++ b/nova/tests/api/openstack/compute/test_servers.py
@@ -1491,7 +1491,7 @@ class ServersControllerCreateTest(test.TestCase):
self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper)
self.stubs.Set(nova.db, 'instance_update_and_get_original',
server_update)
- self.stubs.Set(nova.db, 'queue_get_for', queue_get_for)
+ self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index 51e36daab..e779e46a4 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -1425,7 +1425,7 @@ class ComputeTestCase(BaseTestCase):
inst_ref = self._create_fake_instance({'host': 'dummy'})
c = context.get_admin_context()
- topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
+ topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# creating volume testdata
volume_id = db.volume_create(c, {'size': 1})['id']
@@ -1480,7 +1480,7 @@ class ComputeTestCase(BaseTestCase):
instance_id = instance['id']
c = context.get_admin_context()
inst_ref = db.instance_get(c, instance_id)
- topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
+ topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# create
self.mox.StubOutWithMock(rpc, 'call')
@@ -1522,7 +1522,7 @@ class ComputeTestCase(BaseTestCase):
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref, [])
self.mox.StubOutWithMock(rpc, 'call')
- rpc.call(c, db.queue_get_for(c, FLAGS.compute_topic, dest),
+ rpc.call(c, rpc.queue_get_for(c, FLAGS.compute_topic, dest),
{"method": "post_live_migration_at_destination",
"args": {'instance_id': i_ref['id'], 'block_migration': False}})
self.mox.StubOutWithMock(self.compute.driver, 'unplug_vifs')
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
index f97ddb730..a78fd2e12 100644
--- a/nova/tests/db/fakes.py
+++ b/nova/tests/db/fakes.py
@@ -304,9 +304,6 @@ def stub_out_db_network_api(stubs):
return [FakeModel(n) for n in networks
if n['project_id'] == project_id]
- def fake_queue_get_for(context, topic, node):
- return "%s.%s" % (topic, node)
-
funcs = [fake_floating_ip_allocate_address,
fake_floating_ip_deallocate,
fake_floating_ip_disassociate,
@@ -335,8 +332,7 @@ def stub_out_db_network_api(stubs):
fake_network_get_all_by_instance,
fake_network_set_host,
fake_network_update,
- fake_project_get_networks,
- fake_queue_get_for]
+ fake_project_get_networks]
stub_out(stubs, funcs)
diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py
index 938f60b46..302f22939 100644
--- a/nova/tests/scheduler/test_scheduler.py
+++ b/nova/tests/scheduler/test_scheduler.py
@@ -476,7 +476,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
@@ -504,7 +504,7 @@ class SchedulerTestCase(test.TestCase):
# assert_compute_node_has_enough_disk()
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1025)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue1')
rpc.call(self.context, 'src_queue1',
{'method': 'get_instance_disk_info',
@@ -512,9 +512,9 @@ class SchedulerTestCase(test.TestCase):
json.dumps([{'disk_size': 1024 * (1024 ** 3)}]))
# Common checks (shared storage ok, same hypervisor,e tc)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -535,7 +535,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -696,7 +696,7 @@ class SchedulerTestCase(test.TestCase):
'assert_compute_node_has_enough_memory')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
dest = 'fake_host2'
@@ -717,7 +717,7 @@ class SchedulerTestCase(test.TestCase):
# Not enough disk
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1023)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
rpc.call(self.context, 'src_queue',
{'method': 'get_instance_disk_info',
@@ -737,7 +737,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -751,9 +751,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -779,7 +779,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -793,9 +793,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -819,7 +819,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -834,9 +834,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -868,7 +868,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -883,9 +883,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -916,7 +916,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -931,9 +931,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -953,7 +953,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -1018,13 +1018,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1039,10 +1039,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1057,10 +1057,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1078,13 +1078,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'instance_update')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.instance_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1099,10 +1099,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1117,10 +1117,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1135,10 +1135,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'network', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'network', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1193,10 +1193,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
topic = 'unknown'
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, topic, host).AndReturn(queue)
+ rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index f3cce22a9..6bf3edad4 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -195,7 +195,7 @@ def forward_request(context, request_type, master, aggregate_id,
# because this might be 169.254.0.1, i.e. xenapi
# NOTE: password in clear is not great, but it'll do for now
sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
- rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
+ rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master),
{"method": request_type,
"args": {"aggregate_id": aggregate_id,
"host": slave_compute,
diff --git a/nova/volume/api.py b/nova/volume/api.py
index e03e78779..0b9b4afff 100644
--- a/nova/volume/api.py
+++ b/nova/volume/api.py
@@ -152,7 +152,7 @@ class API(base.Base):
'terminated_at': now})
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_id}})
@@ -238,7 +238,7 @@ class API(base.Base):
def remove_from_compute(self, context, volume, instance_id, host):
"""Remove volume from specified compute host."""
rpc.call(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ rpc.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume_connection",
"args": {'instance_id': instance_id,
'volume_id': volume['id']}})
@@ -255,7 +255,7 @@ class API(base.Base):
@wrap_check_policy
def attach(self, context, volume, instance_uuid, mountpoint):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume['id'],
@@ -265,7 +265,7 @@ class API(base.Base):
@wrap_check_policy
def detach(self, context, volume):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "detach_volume",
"args": {"volume_id": volume['id']}})
@@ -273,7 +273,7 @@ class API(base.Base):
@wrap_check_policy
def initialize_connection(self, context, volume, connector):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "initialize_connection",
"args": {"volume_id": volume['id'],
@@ -283,7 +283,7 @@ class API(base.Base):
def terminate_connection(self, context, volume, connector):
self.unreserve_volume(context, volume)
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "terminate_connection",
"args": {"volume_id": volume['id'],
@@ -310,7 +310,7 @@ class API(base.Base):
snapshot = self.db.snapshot_create(context, options)
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "create_snapshot",
"args": {"volume_id": volume['id'],
"snapshot_id": snapshot['id']}})
@@ -334,7 +334,7 @@ class API(base.Base):
volume = self.db.volume_get(context, snapshot['volume_id'])
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_snapshot",
"args": {"snapshot_id": snapshot['id']}})