summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-05-31 01:29:40 +0000
committerGerrit Code Review <review@openstack.org>2012-05-31 01:29:40 +0000
commitf3edf7ff7edefede242de50aca59abc4c6adc7f9 (patch)
tree68e8260744e8ae02515c02fe71251362a031506f
parent069f8f3ea4ac20271cddca0c3e209ef0af8cd0d6 (diff)
parent26353bb372081f674cf5fd3dbbffd990918f3803 (diff)
downloadnova-f3edf7ff7edefede242de50aca59abc4c6adc7f9.tar.gz
nova-f3edf7ff7edefede242de50aca59abc4c6adc7f9.tar.xz
nova-f3edf7ff7edefede242de50aca59abc4c6adc7f9.zip
Merge "Move queue_get_for() from db to rpc."
-rwxr-xr-xbin/nova-manage4
-rw-r--r--nova/compute/api.py3
-rw-r--r--nova/compute/manager.py22
-rw-r--r--nova/compute/rpcapi.py6
-rw-r--r--nova/console/api.py10
-rw-r--r--nova/console/manager.py2
-rw-r--r--nova/console/vmrc_manager.py6
-rw-r--r--nova/db/api.py8
-rw-r--r--nova/db/sqlalchemy/api.py8
-rw-r--r--nova/network/manager.py13
-rw-r--r--nova/network/quantum/manager.py6
-rw-r--r--nova/rpc/__init__.py5
-rw-r--r--nova/scheduler/driver.py24
-rw-r--r--nova/tests/api/openstack/compute/test_servers.py2
-rw-r--r--nova/tests/compute/test_compute.py6
-rw-r--r--nova/tests/db/fakes.py6
-rw-r--r--nova/tests/scheduler/test_scheduler.py78
-rw-r--r--nova/virt/xenapi/pool.py2
-rw-r--r--nova/volume/api.py16
19 files changed, 104 insertions, 123 deletions
diff --git a/bin/nova-manage b/bin/nova-manage
index 20326b3c4..1554251a0 100755
--- a/bin/nova-manage
+++ b/bin/nova-manage
@@ -1153,7 +1153,7 @@ class VolumeCommands(object):
return
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.volume_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume['id']}})
@@ -1171,7 +1171,7 @@ class VolumeCommands(object):
instance = db.instance_get(ctxt, volume['instance_id'])
host = instance['host']
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.compute_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"instance_id": instance['id'],
"volume_id": volume['id'],
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 3990a9596..9f96b8e6b 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -841,7 +841,7 @@ class API(base.Base):
in self.db.service_get_all_compute_sorted(context)]
for host in hosts:
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ rpc.queue_get_for(context, FLAGS.compute_topic, host),
{'method': 'refresh_provider_fw_rules', 'args': {}})
def _is_security_group_associated_with_server(self, security_group,
@@ -1814,7 +1814,6 @@ class HostAPI(base.Base):
"""Reboots, shuts down or powers up the host."""
# NOTE(comstud): No instance_uuid argument to this compute manager
# call
- topic = self.db.queue_get_for(context, FLAGS.compute_topic, host)
return self.compute_rpcapi.host_power_action(context, action=action,
host=host)
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 856175304..7ab27f740 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -315,9 +315,9 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
#TODO(mdragon): perhaps make this variable by console_type?
- return self.db.queue_get_for(context,
- FLAGS.console_topic,
- FLAGS.console_host)
+ return rpc.queue_get_for(context,
+ FLAGS.console_topic,
+ FLAGS.console_host)
def get_console_pool_info(self, context, console_type):
return self.driver.get_console_pool_info(console_type)
@@ -1257,7 +1257,7 @@ class ComputeManager(manager.SchedulerDependentManager):
network_info = self._get_instance_nw_info(context, instance_ref)
self.driver.destroy(instance_ref, self._legacy_nw_info(network_info))
- topic = self.db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
migration_ref['source_compute'])
rpc.cast(context, topic,
{'method': 'finish_revert_resize',
@@ -1349,7 +1349,7 @@ class ComputeManager(manager.SchedulerDependentManager):
'status': 'pre-migrating'})
LOG.audit(_('Migrating'), context=context, instance=instance_ref)
- topic = self.db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'resize_instance',
@@ -1412,9 +1412,9 @@ class ComputeManager(manager.SchedulerDependentManager):
service = self.db.service_get_by_host_and_topic(
context, migration_ref['dest_compute'], FLAGS.compute_topic)
- topic = self.db.queue_get_for(context,
- FLAGS.compute_topic,
- migration_ref['dest_compute'])
+ topic = rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ migration_ref['dest_compute'])
params = {'migration_id': migration_id,
'disk_info': disk_info,
'instance_uuid': instance_ref['uuid'],
@@ -2040,7 +2040,7 @@ class ComputeManager(manager.SchedulerDependentManager):
disk = None
rpc.call(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{'method': 'pre_live_migration',
'args': {'instance_id': instance_id,
'block_migration': block_migration,
@@ -2122,7 +2122,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# Define domain at destination host, without doing it,
# pause/suspend/terminate do not work.
rpc.call(ctxt,
- self.db.queue_get_for(ctxt, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(ctxt, FLAGS.compute_topic, dest),
{"method": "post_live_migration_at_destination",
"args": {'instance_id': instance_ref['id'],
'block_migration': block_migration}})
@@ -2227,7 +2227,7 @@ class ComputeManager(manager.SchedulerDependentManager):
# any empty images has to be deleted.
if block_migration:
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": "rollback_live_migration_at_destination",
"args": {'instance_id': instance_ref['id']}})
diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py
index a2a3b281c..5db0282c3 100644
--- a/nova/compute/rpcapi.py
+++ b/nova/compute/rpcapi.py
@@ -18,16 +18,16 @@
Client side of the compute RPC API.
"""
-from nova.db import base
from nova import exception
from nova import flags
+from nova import rpc
import nova.rpc.proxy
FLAGS = flags.FLAGS
-class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
+class ComputeAPI(nova.rpc.proxy.RpcProxy):
'''Client side of the compute rpc API.
API version history:
@@ -58,7 +58,7 @@ class ComputeAPI(nova.rpc.proxy.RpcProxy, base.Base):
if not host:
raise exception.NovaException(_('Unable to find host for '
'Instance %s') % instance['uuid'])
- return self.db.queue_get_for(ctxt, self.topic, host)
+ return rpc.queue_get_for(ctxt, self.topic, host)
def add_aggregate_host(self, ctxt, aggregate_id, host_param, host):
'''Add aggregate host.
diff --git a/nova/console/api.py b/nova/console/api.py
index 0feaae488..20f00030e 100644
--- a/nova/console/api.py
+++ b/nova/console/api.py
@@ -44,8 +44,8 @@ class API(base.Base):
def delete_console(self, context, instance_id, console_id):
instance_id = self._translate_uuid_if_necessary(context, instance_id)
console = self.db.console_get(context, console_id, instance_id)
- topic = self.db.queue_get_for(context, FLAGS.console_topic,
- pool['host'])
+ topic = rpc.queue_get_for(context, FLAGS.console_topic,
+ pool['host'])
rpcapi = console_rpcapi.ConsoleAPI(topic=topic)
rpcapi.remove_console(context, console['id'])
@@ -61,9 +61,9 @@ class API(base.Base):
rpcapi.add_console(context, instance['id'])
def _get_console_topic(self, context, instance_host):
- topic = self.db.queue_get_for(context,
- FLAGS.compute_topic,
- instance_host)
+ topic = rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ instance_host)
return rpc.call(context, topic, {'method': 'get_console_topic',
'args': {'fake': 1}})
diff --git a/nova/console/manager.py b/nova/console/manager.py
index 8a42b449a..96a9ef31c 100644
--- a/nova/console/manager.py
+++ b/nova/console/manager.py
@@ -122,7 +122,7 @@ class ConsoleProxyManager(manager.Manager):
'password': '1234pass'}
else:
pool_info = rpc.call(context,
- self.db.queue_get_for(context,
+ rpc.queue_get_for(context,
FLAGS.compute_topic,
instance_host),
{'method': 'get_console_pool_info',
diff --git a/nova/console/vmrc_manager.py b/nova/console/vmrc_manager.py
index 0968ecd31..2008ff1b2 100644
--- a/nova/console/vmrc_manager.py
+++ b/nova/console/vmrc_manager.py
@@ -138,9 +138,9 @@ class ConsoleVMRCManager(manager.Manager):
console_type)
except exception.NotFound:
pool_info = rpc.call(context,
- self.db.queue_get_for(context,
- FLAGS.compute_topic,
- instance_host),
+ rpc.queue_get_for(context,
+ FLAGS.compute_topic,
+ instance_host),
{'method': 'get_console_pool_info',
'args': {'console_type': console_type}})
pool_info['password'] = self.driver.fix_pool_password(
diff --git a/nova/db/api.py b/nova/db/api.py
index 75739df8d..d33584036 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -848,14 +848,6 @@ def network_update(context, network_id, values):
###################
-def queue_get_for(context, topic, physical_node_id):
- """Return a channel to send a message to a node with a topic."""
- return IMPL.queue_get_for(context, topic, physical_node_id)
-
-
-###################
-
-
def iscsi_target_count_by_host(context, host):
"""Return count of export devices."""
return IMPL.iscsi_target_count_by_host(context, host)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 718438569..30c556b4d 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -2203,14 +2203,6 @@ def network_update(context, network_id, values):
###################
-def queue_get_for(context, topic, physical_node_id):
- # FIXME(ja): this should be servername?
- return "%s.%s" % (topic, physical_node_id)
-
-
-###################
-
-
@require_admin_context
def iscsi_target_count_by_host(context, host):
return model_query(context, models.IscsiTarget).\
diff --git a/nova/network/manager.py b/nova/network/manager.py
index e552dec7f..1f1580634 100644
--- a/nova/network/manager.py
+++ b/nova/network/manager.py
@@ -201,9 +201,7 @@ class RPCAllocateFixedIP(object):
jsonutils.to_primitive(network)}})
if host != self.host:
# need to call allocate_fixed_ip to correct network host
- topic = self.db.queue_get_for(context,
- FLAGS.network_topic,
- host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {}
args['instance_id'] = instance_id
args['network_id'] = network['id']
@@ -241,7 +239,7 @@ class RPCAllocateFixedIP(object):
host = network['host']
if host != self.host:
# need to call deallocate_fixed_ip on correct network host
- topic = self.db.queue_get_for(context, FLAGS.network_topic, host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {'address': address,
'host': host}
rpc.cast(context, topic,
@@ -513,7 +511,7 @@ class FloatingIP(object):
else:
# send to correct host
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.network_topic, host),
+ rpc.queue_get_for(context, FLAGS.network_topic, host),
{'method': '_associate_floating_ip',
'args': {'floating_address': floating_address,
'fixed_address': fixed_address,
@@ -583,7 +581,7 @@ class FloatingIP(object):
else:
# send to correct host
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.network_topic, host),
+ rpc.queue_get_for(context, FLAGS.network_topic, host),
{'method': '_disassociate_floating_ip',
'args': {'address': address,
'interface': interface}})
@@ -1545,8 +1543,7 @@ class NetworkManager(manager.SchedulerDependentManager):
call_func(context, network)
else:
# i'm not the right host, run call on correct host
- topic = self.db.queue_get_for(context, FLAGS.network_topic,
- host)
+ topic = rpc.queue_get_for(context, FLAGS.network_topic, host)
args = {'network_id': network['id'], 'teardown': teardown}
# NOTE(tr3buchet): the call is just to wait for completion
green_pool.spawn_n(rpc.call, context, topic,
diff --git a/nova/network/quantum/manager.py b/nova/network/quantum/manager.py
index 8765f1082..50bc8cf14 100644
--- a/nova/network/quantum/manager.py
+++ b/nova/network/quantum/manager.py
@@ -291,7 +291,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
if net_ref['host'] == self.host:
self.kill_dhcp(net_ref)
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic,
net_ref['host'])
@@ -389,7 +389,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
self.enable_dhcp(context, network['quantum_net_id'],
network, vif_rec, network['net_tenant_id'])
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'enable_dhcp',
'args': {'quantum_net_id': network['quantum_net_id'],
@@ -608,7 +608,7 @@ class QuantumManager(manager.FloatingIP, manager.FlatManager):
self.update_dhcp(context, ipam_tenant_id, network,
vif, project_id)
else:
- topic = self.db.queue_get_for(context,
+ topic = rpc.queue_get_for(context,
FLAGS.network_topic, network['host'])
rpc.call(context, topic, {'method': 'update_dhcp',
'args': {'ipam_tenant_id': ipam_tenant_id,
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 93fa6b173..1980f9679 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -226,6 +226,11 @@ def fanout_cast_to_server(context, server_params, topic, msg):
topic, msg)
+def queue_get_for(context, topic, host):
+ """Get a queue name for a given topic + host."""
+ return '%s.%s' % (topic, host)
+
+
_RPCIMPL = None
diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py
index 8e49e5aa4..364278584 100644
--- a/nova/scheduler/driver.py
+++ b/nova/scheduler/driver.py
@@ -62,8 +62,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
db.volume_update(context, volume_id,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
- db.queue_get_for(context, 'volume', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'volume', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals())
@@ -79,8 +79,8 @@ def cast_to_compute_host(context, host, method, update_db=True, **kwargs):
db.instance_update(context, instance_uuid,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
- db.queue_get_for(context, 'compute', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'compute', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals())
@@ -88,8 +88,8 @@ def cast_to_network_host(context, host, method, update_db=False, **kwargs):
"""Cast request to a network host queue"""
rpc.cast(context,
- db.queue_get_for(context, 'network', host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, 'network', host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals())
@@ -106,8 +106,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
func(context, host, method, update_db=update_db, **kwargs)
else:
rpc.cast(context,
- db.queue_get_for(context, topic, host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, topic, host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
% locals())
@@ -355,7 +355,7 @@ class Scheduler(object):
# Checking cpuinfo.
try:
rpc.call(context,
- db.queue_get_for(context, FLAGS.compute_topic, dest),
+ rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": 'compare_cpu',
"args": {'cpu_info': oservice_ref['cpu_info']}})
@@ -443,7 +443,7 @@ class Scheduler(object):
available = available_gb * (1024 ** 3)
# Getting necessary disk size
- topic = db.queue_get_for(context, FLAGS.compute_topic,
+ topic = rpc.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
ret = rpc.call(context, topic,
{"method": 'get_instance_disk_info',
@@ -492,8 +492,8 @@ class Scheduler(object):
"""
src = instance_ref['host']
- dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
- src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
+ dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
+ src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
filename = rpc.call(context, dst_t,
{"method": 'create_shared_storage_test_file'})
diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py
index aefe19581..83a8963a5 100644
--- a/nova/tests/api/openstack/compute/test_servers.py
+++ b/nova/tests/api/openstack/compute/test_servers.py
@@ -1491,7 +1491,7 @@ class ServersControllerCreateTest(test.TestCase):
self.stubs.Set(nova.rpc, 'call', rpc_call_wrapper)
self.stubs.Set(nova.db, 'instance_update_and_get_original',
server_update)
- self.stubs.Set(nova.db, 'queue_get_for', queue_get_for)
+ self.stubs.Set(nova.rpc, 'queue_get_for', queue_get_for)
self.stubs.Set(nova.network.manager.VlanManager, 'allocate_fixed_ip',
fake_method)
diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py
index 83671d265..f1903bd4c 100644
--- a/nova/tests/compute/test_compute.py
+++ b/nova/tests/compute/test_compute.py
@@ -1426,7 +1426,7 @@ class ComputeTestCase(BaseTestCase):
inst_ref = self._create_fake_instance({'host': 'dummy'})
c = context.get_admin_context()
- topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
+ topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# creating volume testdata
volume_id = db.volume_create(c, {'size': 1})['id']
@@ -1481,7 +1481,7 @@ class ComputeTestCase(BaseTestCase):
instance_id = instance['id']
c = context.get_admin_context()
inst_ref = db.instance_get(c, instance_id)
- topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
+ topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# create
self.mox.StubOutWithMock(rpc, 'call')
@@ -1523,7 +1523,7 @@ class ComputeTestCase(BaseTestCase):
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref, [])
self.mox.StubOutWithMock(rpc, 'call')
- rpc.call(c, db.queue_get_for(c, FLAGS.compute_topic, dest),
+ rpc.call(c, rpc.queue_get_for(c, FLAGS.compute_topic, dest),
{"method": "post_live_migration_at_destination",
"args": {'instance_id': i_ref['id'], 'block_migration': False}})
self.mox.StubOutWithMock(self.compute.driver, 'unplug_vifs')
diff --git a/nova/tests/db/fakes.py b/nova/tests/db/fakes.py
index f97ddb730..a78fd2e12 100644
--- a/nova/tests/db/fakes.py
+++ b/nova/tests/db/fakes.py
@@ -304,9 +304,6 @@ def stub_out_db_network_api(stubs):
return [FakeModel(n) for n in networks
if n['project_id'] == project_id]
- def fake_queue_get_for(context, topic, node):
- return "%s.%s" % (topic, node)
-
funcs = [fake_floating_ip_allocate_address,
fake_floating_ip_deallocate,
fake_floating_ip_disassociate,
@@ -335,8 +332,7 @@ def stub_out_db_network_api(stubs):
fake_network_get_all_by_instance,
fake_network_set_host,
fake_network_update,
- fake_project_get_networks,
- fake_queue_get_for]
+ fake_project_get_networks]
stub_out(stubs, funcs)
diff --git a/nova/tests/scheduler/test_scheduler.py b/nova/tests/scheduler/test_scheduler.py
index 938f60b46..302f22939 100644
--- a/nova/tests/scheduler/test_scheduler.py
+++ b/nova/tests/scheduler/test_scheduler.py
@@ -476,7 +476,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
@@ -504,7 +504,7 @@ class SchedulerTestCase(test.TestCase):
# assert_compute_node_has_enough_disk()
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1025)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue1')
rpc.call(self.context, 'src_queue1',
{'method': 'get_instance_disk_info',
@@ -512,9 +512,9 @@ class SchedulerTestCase(test.TestCase):
json.dumps([{'disk_size': 1024 * (1024 ** 3)}]))
# Common checks (shared storage ok, same hypervisor,e tc)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -535,7 +535,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -696,7 +696,7 @@ class SchedulerTestCase(test.TestCase):
'assert_compute_node_has_enough_memory')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
dest = 'fake_host2'
@@ -717,7 +717,7 @@ class SchedulerTestCase(test.TestCase):
# Not enough disk
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1023)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
rpc.call(self.context, 'src_queue',
{'method': 'get_instance_disk_info',
@@ -737,7 +737,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -751,9 +751,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -779,7 +779,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -793,9 +793,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -819,7 +819,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -834,9 +834,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -868,7 +868,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -883,9 +883,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -916,7 +916,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -931,9 +931,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -953,7 +953,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
- db.queue_get_for(self.context, FLAGS.compute_topic,
+ rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -1018,13 +1018,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1039,10 +1039,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1057,10 +1057,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1078,13 +1078,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'instance_update')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.instance_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1099,10 +1099,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1117,10 +1117,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1135,10 +1135,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, 'network', host).AndReturn(queue)
+ rpc.queue_get_for(self.context, 'network', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1193,10 +1193,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
topic = 'unknown'
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, topic, host).AndReturn(queue)
+ rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index f3cce22a9..6bf3edad4 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -195,7 +195,7 @@ def forward_request(context, request_type, master, aggregate_id,
# because this might be 169.254.0.1, i.e. xenapi
# NOTE: password in clear is not great, but it'll do for now
sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
- rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
+ rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master),
{"method": request_type,
"args": {"aggregate_id": aggregate_id,
"host": slave_compute,
diff --git a/nova/volume/api.py b/nova/volume/api.py
index e03e78779..0b9b4afff 100644
--- a/nova/volume/api.py
+++ b/nova/volume/api.py
@@ -152,7 +152,7 @@ class API(base.Base):
'terminated_at': now})
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_id}})
@@ -238,7 +238,7 @@ class API(base.Base):
def remove_from_compute(self, context, volume, instance_id, host):
"""Remove volume from specified compute host."""
rpc.call(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ rpc.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume_connection",
"args": {'instance_id': instance_id,
'volume_id': volume['id']}})
@@ -255,7 +255,7 @@ class API(base.Base):
@wrap_check_policy
def attach(self, context, volume, instance_uuid, mountpoint):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume['id'],
@@ -265,7 +265,7 @@ class API(base.Base):
@wrap_check_policy
def detach(self, context, volume):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "detach_volume",
"args": {"volume_id": volume['id']}})
@@ -273,7 +273,7 @@ class API(base.Base):
@wrap_check_policy
def initialize_connection(self, context, volume, connector):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "initialize_connection",
"args": {"volume_id": volume['id'],
@@ -283,7 +283,7 @@ class API(base.Base):
def terminate_connection(self, context, volume, connector):
self.unreserve_volume(context, volume)
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "terminate_connection",
"args": {"volume_id": volume['id'],
@@ -310,7 +310,7 @@ class API(base.Base):
snapshot = self.db.snapshot_create(context, options)
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "create_snapshot",
"args": {"volume_id": volume['id'],
"snapshot_id": snapshot['id']}})
@@ -334,7 +334,7 @@ class API(base.Base):
volume = self.db.volume_get(context, snapshot['volume_id'])
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_snapshot",
"args": {"snapshot_id": snapshot['id']}})