summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-16 21:41:43 +0000
committerGerrit Code Review <review@openstack.org>2012-02-16 21:41:43 +0000
commit34d77ac8b1919a287865a4bef376579b6bf09b48 (patch)
tree271e25c53088c1f1aa27d46fbe6dfc1a0a558b0f
parent26227b79e9246a87eeb83766cfcc8e96d294d28b (diff)
parentc729ba8c0aa4d283e84d139bc98e0e89fd933c4a (diff)
Merge "Core modifications for future zones service."
-rw-r--r--nova/compute/__init__.py7
-rw-r--r--nova/compute/api.py55
-rw-r--r--nova/db/sqlalchemy/api.py16
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py44
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql35
-rw-r--r--nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py31
-rw-r--r--nova/db/sqlalchemy/models.py7
-rw-r--r--nova/flags.py11
-rw-r--r--nova/network/__init__.py7
-rw-r--r--nova/rpc/__init__.py31
-rw-r--r--nova/rpc/amqp.py22
-rw-r--r--nova/rpc/impl_kombu.py44
-rw-r--r--nova/rpc/impl_qpid.py36
-rw-r--r--nova/tests/api/openstack/compute/test_extensions.py1
-rw-r--r--nova/tests/api/openstack/compute/test_servers.py1
-rw-r--r--nova/tests/rpc/test_kombu.py60
-rw-r--r--nova/tests/rpc/test_qpid.py72
-rw-r--r--nova/volume/__init__.py7
18 files changed, 429 insertions, 58 deletions
diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py
index c0cfe7c27..72df5ddd5 100644
--- a/nova/compute/__init__.py
+++ b/nova/compute/__init__.py
@@ -16,5 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
-from nova.compute.api import API
from nova.compute.api import AggregateAPI
+# Importing full names to not pollute the namespace and cause possible
+# collisions with use of 'from nova.compute import <foo>' elsewhere.
+import nova.flags
+import nova.utils
+
+API = nova.utils.import_class(nova.flags.FLAGS.compute_api_class)
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 637be01af..d93443ea4 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -567,6 +567,20 @@ class API(base.Base):
"requested_networks": requested_networks,
"filter_properties": filter_properties}})
+ def _check_create_policies(self, context, availability_zone,
+ requested_networks, block_device_mapping):
+ """Check policies for create()."""
+ target = {'project_id': context.project_id,
+ 'user_id': context.user_id,
+ 'availability_zone': availability_zone}
+ check_policy(context, 'create', target)
+
+ if requested_networks:
+ check_policy(context, 'create:attach_network', target)
+
+ if block_device_mapping:
+ check_policy(context, 'create:attach_volume', target)
+
def create(self, context, instance_type,
image_href, kernel_id=None, ramdisk_id=None,
min_count=None, max_count=None,
@@ -586,16 +600,9 @@ class API(base.Base):
could be 'None' or a list of instance dicts depending on if
we waited for information from the scheduler or not.
"""
- target = {'project_id': context.project_id,
- 'user_id': context.user_id,
- 'availability_zone': availability_zone}
- check_policy(context, 'create', target)
- if requested_networks:
- check_policy(context, 'create:attach_network', target)
-
- if block_device_mapping:
- check_policy(context, 'create:attach_volume', target)
+ self._check_create_policies(context, availability_zone,
+ requested_networks, block_device_mapping)
# We can create the DB entry for the instance here if we're
# only going to create 1 instance.
@@ -843,20 +850,28 @@ class API(base.Base):
else:
LOG.warning(_("No host for instance %s, deleting immediately"),
instance["uuid"])
- self.db.instance_destroy(context, instance['id'])
+ try:
+ self.db.instance_destroy(context, instance['id'])
+ except exception.InstanceNotFound:
+ # NOTE(comstud): Race condition. Instance already gone.
+ pass
def _delete(self, context, instance):
host = instance['host']
- if host:
- self.update(context,
- instance,
- task_state=task_states.DELETING,
- progress=0)
-
- self._cast_compute_message('terminate_instance', context,
- instance)
- else:
- self.db.instance_destroy(context, instance['id'])
+ try:
+ if host:
+ self.update(context,
+ instance,
+ task_state=task_states.DELETING,
+ progress=0)
+
+ self._cast_compute_message('terminate_instance', context,
+ instance)
+ else:
+ self.db.instance_destroy(context, instance['id'])
+ except exception.InstanceNotFound:
+ # NOTE(comstud): Race condition. Instance already gone.
+ pass
# NOTE(jerdfelt): The API implies that only ACTIVE and ERROR are
# allowed but the EC2 API appears to allow from RESCUED and STOPPED
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 4e92c61b6..4bf89c7e8 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -1354,11 +1354,12 @@ def instance_create(context, values):
context - request context object
values - dict containing column values.
"""
+ values = values.copy()
values['metadata'] = _metadata_refs(values.get('metadata'),
models.InstanceMetadata)
instance_ref = models.Instance()
- instance_ref['uuid'] = str(utils.gen_uuid())
-
+ if not values.get('uuid'):
+ values['uuid'] = str(utils.gen_uuid())
instance_ref.update(values)
session = get_session()
@@ -1388,7 +1389,13 @@ def instance_data_get_for_project(context, project_id):
def instance_destroy(context, instance_id):
session = get_session()
with session.begin():
- instance_ref = instance_get(context, instance_id, session=session)
+ if utils.is_uuid_like(instance_id):
+ instance_ref = instance_get_by_uuid(context, instance_id,
+ session=session)
+ instance_id = instance_ref['id']
+ else:
+ instance_ref = instance_get(context, instance_id,
+ session=session)
session.query(models.Instance).\
filter_by(id=instance_id).\
update({'deleted': True,
@@ -1412,6 +1419,7 @@ def instance_destroy(context, instance_id):
instance_info_cache_delete(context, instance_ref['uuid'],
session=session)
+ return instance_ref
@require_context
@@ -3541,7 +3549,7 @@ def zone_get(context, zone_id):
@require_admin_context
def zone_get_all(context):
- return model_query(context, models.Zone, read_deleted="yes").all()
+ return model_query(context, models.Zone, read_deleted="no").all()
####################
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py b/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py
new file mode 100644
index 000000000..5f1000afe
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py
@@ -0,0 +1,44 @@
+# Copyright 2012 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import *
+
+meta = MetaData()
+
+zones = Table('zones', meta,
+ Column('id', Integer(), primary_key=True, nullable=False),
+ )
+
+is_parent = Column('is_parent', Boolean(), default=False)
+rpc_host = Column('rpc_host', String(255))
+rpc_port = Column('rpc_port', Integer())
+rpc_virtual_host = Column('rpc_virtual_host', String(255))
+
+
+def upgrade(migrate_engine):
+ meta.bind = migrate_engine
+
+ zones.create_column(is_parent)
+ zones.create_column(rpc_host)
+ zones.create_column(rpc_port)
+ zones.create_column(rpc_virtual_host)
+
+
+def downgrade(migrate_engine):
+ meta.bind = migrate_engine
+
+ zones.drop_column(rpc_virtual_host)
+ zones.drop_column(rpc_port)
+ zones.drop_column(rpc_host)
+ zones.drop_column(is_parent)
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql b/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql
new file mode 100644
index 000000000..cf18c1c29
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql
@@ -0,0 +1,35 @@
+BEGIN TRANSACTION;
+
+ CREATE TEMPORARY TABLE zones_temp (
+ created_at DATETIME,
+ updated_at DATETIME,
+ deleted_at DATETIME,
+ deleted BOOLEAN,
+ id INTEGER NOT NULL,
+ name VARCHAR(255),
+ api_url VARVHAR(255),
+ username VARCHAR(255),
+ password VARCHAR(255),
+ weight_offset FLOAT,
+ weight_scale FLOAT,
+ PRIMARY KEY (id),
+ CHECK (deleted IN (0, 1))
+ );
+
+ INSERT INTO zones_temp
+ SELECT created_at,
+ updated_at,
+ deleted_at,
+ deleted,
+ id,
+ name,
+ api_url,
+ username,
+ password,
+ weight_offset,
+ weight_scale FROM zones;
+
+ DROP TABLE zones;
+
+ ALTER TABLE zones_temp RENAME TO zones;
+COMMIT;
diff --git a/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py b/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py
new file mode 100644
index 000000000..bff1b685b
--- /dev/null
+++ b/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py
@@ -0,0 +1,31 @@
+# Copyright 2012 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import *
+
+meta = MetaData()
+
+
+def upgrade(migrate_engine):
+ meta.bind = migrate_engine
+ instances = Table('instances', meta, autoload=True)
+ zone_name = Column('zone_name', String(255))
+ instances.create_column(zone_name)
+
+
+def downgrade(migrate_engine):
+ meta.bind = migrate_engine
+ instances = Table('instances', meta, autoload=True)
+ zone_name = Column('zone_name', String(255))
+ instances.drop_column(zone_name)
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 74481bda8..c99d9489b 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -277,6 +277,9 @@ class Instance(BASE, NovaBase):
# EC2 disable_api_termination
disable_terminate = Column(Boolean(), default=False, nullable=False)
+ # Openstack zone name
+ zone_name = Column(String(255))
+
class InstanceInfoCache(BASE, NovaBase):
"""
@@ -876,6 +879,10 @@ class Zone(BASE, NovaBase):
password = Column(String(255))
weight_offset = Column(Float(), default=0.0)
weight_scale = Column(Float(), default=1.0)
+ is_parent = Column(Boolean())
+ rpc_host = Column(String(255))
+ rpc_port = Column(Integer())
+ rpc_virtual_host = Column(String(255))
class Aggregate(BASE, NovaBase):
diff --git a/nova/flags.py b/nova/flags.py
index 3b06760de..af490e287 100644
--- a/nova/flags.py
+++ b/nova/flags.py
@@ -441,7 +441,16 @@ global_opts = [
help='Cache glance images locally'),
cfg.BoolOpt('use_cow_images',
default=True,
- help='Whether to use cow images')
+ help='Whether to use cow images'),
+ cfg.StrOpt('compute_api_class',
+ default='nova.compute.api.API',
+ help='The compute API class to use'),
+ cfg.StrOpt('network_api_class',
+ default='nova.network.api.API',
+ help='The network API class to use'),
+ cfg.StrOpt('volume_api_class',
+ default='nova.volume.api.API',
+ help='The volume API class to use'),
]
FLAGS.register_opts(global_opts)
diff --git a/nova/network/__init__.py b/nova/network/__init__.py
index 6eb3e3ef6..4c272c50a 100644
--- a/nova/network/__init__.py
+++ b/nova/network/__init__.py
@@ -16,4 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-from nova.network.api import API
+# Importing full names to not pollute the namespace and cause possible
+# collisions with use of 'from nova.network import <foo>' elsewhere.
+import nova.flags
+import nova.utils
+
+API = nova.utils.import_class(nova.flags.FLAGS.network_api_class)
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 3ad6ddd2e..412a32b7d 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -161,6 +161,37 @@ def cleanup():
return _get_impl().cleanup()
+def cast_to_server(context, server_params, topic, msg):
+ """Invoke a remote method that does not return anything.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param server_params: Connection information
+ :param topic: The topic to send the notification to.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().cast_to_server(context, server_params, topic, msg)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+ """Broadcast to a remote method invocation with no return.
+
+ :param context: Information that identifies the user that has made this
+ request.
+ :param server_params: Connection information
+ :param topic: The topic to send the notification to.
+ :param msg: This is a dict in the form { "method" : "method_to_invoke",
+ "args" : dict_of_kwargs }
+
+ :returns: None
+ """
+ return _get_impl().fanout_cast_to_server(context, server_params, topic,
+ msg)
+
+
_RPCIMPL = None
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 01e127764..95655d385 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection):
the pool.
"""
- def __init__(self, connection_pool, pooled=True):
+ def __init__(self, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool"""
self.connection = None
self.connection_pool = connection_pool
if pooled:
self.connection = connection_pool.get()
else:
- self.connection = connection_pool.connection_cls()
+ self.connection = connection_pool.connection_cls(
+ server_params=server_params)
self.pooled = pooled
def __enter__(self):
@@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool):
conn.fanout_send(topic, msg)
+def cast_to_server(context, server_params, topic, msg, connection_pool):
+ """Sends a message on a topic to a specific server."""
+ pack_context(msg, context)
+ with ConnectionContext(connection_pool, pooled=False,
+ server_params=server_params) as conn:
+ conn.topic_send(topic, msg)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg,
+ connection_pool):
+ """Sends a message on a fanout exchange to a specific server."""
+ pack_context(msg, context)
+ with ConnectionContext(connection_pool, pooled=False,
+ server_params=server_params) as conn:
+ conn.fanout_send(topic, msg)
+
+
def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index a90d06a76..99402f6bc 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -27,8 +27,6 @@ import kombu.entity
import kombu.messaging
import kombu.connection
-from nova import context
-from nova import exception
from nova import flags
from nova.rpc import common as rpc_common
from nova.rpc import amqp as rpc_amqp
@@ -310,7 +308,7 @@ class NotifyPublisher(TopicPublisher):
class Connection(object):
"""Connection object."""
- def __init__(self):
+ def __init__(self, server_params=None):
self.consumers = []
self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries
@@ -323,11 +321,25 @@ class Connection(object):
self.interval_max = 30
self.memory_transport = False
- self.params = dict(hostname=FLAGS.rabbit_host,
- port=FLAGS.rabbit_port,
- userid=FLAGS.rabbit_userid,
- password=FLAGS.rabbit_password,
- virtual_host=FLAGS.rabbit_virtual_host)
+ if server_params is None:
+ server_params = {}
+
+ # Keys to translate from server_params to kombu params
+ server_params_to_kombu_params = {'username': 'userid'}
+
+ params = {}
+ for sp_key, value in server_params.iteritems():
+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+ params[p_key] = value
+
+ params.setdefault('hostname', FLAGS.rabbit_host)
+ params.setdefault('port', FLAGS.rabbit_port)
+ params.setdefault('userid', FLAGS.rabbit_userid)
+ params.setdefault('password', FLAGS.rabbit_password)
+ params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
+
+ self.params = params
+
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
@@ -588,10 +600,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
self.declare_topic_consumer(topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
@@ -622,6 +634,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+def cast_to_server(context, server_params, topic, msg):
+ """Sends a message on a topic to a specific server."""
+ return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+ Connection.pool)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+ """Sends a message on a fanout exchange to a specific server."""
+ return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+ Connection.pool)
+
+
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index 10bc162f7..98f8b06d6 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -272,19 +272,31 @@ class NotifyPublisher(Publisher):
class Connection(object):
"""Connection object."""
- def __init__(self):
+ def __init__(self, server_params=None):
self.session = None
self.consumers = {}
self.consumer_thread = None
- self.broker = FLAGS.qpid_hostname + ":" + FLAGS.qpid_port
+ if server_params is None:
+ server_params = {}
+
+ default_params = dict(hostname=FLAGS.qpid_hostname,
+ port=FLAGS.qpid_port,
+ username=FLAGS.qpid_username,
+ password=FLAGS.qpid_password)
+
+ params = server_params
+ for key in default_params.keys():
+ params.setdefault(key, default_params[key])
+
+ self.broker = params['hostname'] + ":" + str(params['port'])
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection
# before we call open
- self.connection.username = FLAGS.qpid_username
- self.connection.password = FLAGS.qpid_password
+ self.connection.username = params['username']
+ self.connection.password = params['password']
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = FLAGS.qpid_reconnect
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
@@ -474,10 +486,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
consumer = FanoutConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
consumer = TopicConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy, Connection.pool))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer)
return consumer
@@ -510,6 +522,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
+def cast_to_server(context, server_params, topic, msg):
+ """Sends a message on a topic to a specific server."""
+ return rpc_amqp.cast_to_server(context, server_params, topic, msg,
+ Connection.pool)
+
+
+def fanout_cast_to_server(context, server_params, topic, msg):
+ """Sends a message on a fanout exchange to a specific server."""
+ return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
+ msg, Connection.pool)
+
+
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)
diff --git a/nova/tests/api/openstack/compute/test_extensions.py b/nova/tests/api/openstack/compute/test_extensions.py
index bd292c533..1a281590f 100644
--- a/nova/tests/api/openstack/compute/test_extensions.py
+++ b/nova/tests/api/openstack/compute/test_extensions.py
@@ -29,7 +29,6 @@ from nova.api.openstack import xmlutil
from nova import flags
from nova import test
from nova.tests.api.openstack import fakes
-from nova import wsgi as base_wsgi
FLAGS = flags.FLAGS
diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py
index 45a726f39..281ba88e6 100644
--- a/nova/tests/api/openstack/compute/test_servers.py
+++ b/nova/tests/api/openstack/compute/test_servers.py
@@ -1103,6 +1103,7 @@ class ServersControllerTest(test.TestCase):
'display_name': 'server_test',
}
self.assertEqual(params, filtered_dict)
+ filtered_dict['uuid'] = id
return filtered_dict
self.stubs.Set(nova.db, 'instance_update', server_update)
diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py
index 928582a2f..f90d111fc 100644
--- a/nova/tests/rpc/test_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -19,12 +19,15 @@
Unit Tests for remote procedure calls using kombu
"""
+from nova import context
+from nova import flags
from nova import log as logging
from nova import test
+from nova.rpc import amqp as rpc_amqp
from nova.rpc import impl_kombu
from nova.tests.rpc import common
-
+FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
@@ -99,6 +102,61 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
self.assertEqual(self.received_message, message)
+ def test_cast_interface_uses_default_options(self):
+ """Test kombu rpc.cast"""
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+
+ class MyConnection(impl_kombu.Connection):
+ def __init__(myself, *args, **kwargs):
+ super(MyConnection, myself).__init__(*args, **kwargs)
+ self.assertEqual(myself.params,
+ {'hostname': FLAGS.rabbit_host,
+ 'userid': FLAGS.rabbit_userid,
+ 'password': FLAGS.rabbit_password,
+ 'port': FLAGS.rabbit_port,
+ 'virtual_host': FLAGS.rabbit_virtual_host,
+ 'transport': 'memory'})
+
+ def topic_send(_context, topic, msg):
+ pass
+
+ MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
+ self.stubs.Set(impl_kombu, 'Connection', MyConnection)
+
+ impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'})
+
+ def test_cast_to_server_uses_server_params(self):
+ """Test kombu rpc.cast"""
+
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+
+ server_params = {'username': 'fake_username',
+ 'password': 'fake_password',
+ 'hostname': 'fake_hostname',
+ 'port': 31337,
+ 'virtual_host': 'fake_virtual_host'}
+
+ class MyConnection(impl_kombu.Connection):
+ def __init__(myself, *args, **kwargs):
+ super(MyConnection, myself).__init__(*args, **kwargs)
+ self.assertEqual(myself.params,
+ {'hostname': server_params['hostname'],
+ 'userid': server_params['username'],
+ 'password': server_params['password'],
+ 'port': server_params['port'],
+ 'virtual_host': server_params['virtual_host'],
+ 'transport': 'memory'})
+
+ def topic_send(_context, topic, msg):
+ pass
+
+ MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
+ self.stubs.Set(impl_kombu, 'Connection', MyConnection)
+
+ impl_kombu.cast_to_server(ctxt, server_params,
+ 'fake_topic', {'msg': 'fake'})
+
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
index 61d47fb5d..fb8be4c78 100644
--- a/nova/tests/rpc/test_qpid.py
+++ b/nova/tests/rpc/test_qpid.py
@@ -24,6 +24,7 @@ import mox
from nova import context
from nova import log as logging
+from nova.rpc import amqp as rpc_amqp
from nova import test
try:
@@ -80,6 +81,10 @@ class RpcQpidTestCase(test.TestCase):
qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
+ if impl_qpid:
+ # Need to reset this in case we changed the connection_cls
+ # in self._setup_to_server_tests()
+ impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
self.mocker.ResetAll()
@@ -147,13 +152,15 @@ class RpcQpidTestCase(test.TestCase):
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
- def _test_cast(self, fanout):
+ def _test_cast(self, fanout, server_params=None):
+
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session)
self.mock_sender = self.mocker.CreateMock(self.orig_sender)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
+
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
expected_address = ('impl_qpid_test_fanout ; '
@@ -166,22 +173,34 @@ class RpcQpidTestCase(test.TestCase):
'"create": "always"}')
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
- # This is a pooled connection, so instead of closing it, it gets reset,
- # which is just creating a new session on the connection.
- self.mock_session.close()
- self.mock_connection.session().AndReturn(self.mock_session)
+ if not server_params:
+ # This is a pooled connection, so instead of closing it, it
+ # gets reset, which is just creating a new session on the
+ # connection.
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
self.mocker.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
- if fanout:
- impl_qpid.fanout_cast(ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}})
+ args = [ctx, "impl_qpid_test",
+ {"method": "test_method", "args": {}}]
+
+ if server_params:
+ args.insert(1, server_params)
+ if fanout:
+ method = impl_qpid.fanout_cast_to_server
+ else:
+ method = impl_qpid.cast_to_server
else:
- impl_qpid.cast(ctx, "impl_qpid_test",
- {"method": "test_method", "args": {}})
+ if fanout:
+ method = impl_qpid.fanout_cast
+ else:
+ method = impl_qpid.cast
+
+ method(*args)
self.mocker.VerifyAll()
finally:
@@ -198,6 +217,39 @@ class RpcQpidTestCase(test.TestCase):
def test_fanout_cast(self):
self._test_cast(fanout=True)
+ def _setup_to_server_tests(self, server_params):
+ class MyConnection(impl_qpid.Connection):
+ def __init__(myself, *args, **kwargs):
+ super(MyConnection, myself).__init__(*args, **kwargs)
+ self.assertEqual(myself.connection.username,
+ server_params['username'])
+ self.assertEqual(myself.connection.password,
+ server_params['password'])
+ self.assertEqual(myself.broker,
+ server_params['hostname'] + ':' +
+ str(server_params['port']))
+
+ MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
+ self.stubs.Set(impl_qpid, 'Connection', MyConnection)
+
+ @test.skip_if(qpid is None, "Test requires qpid")
+ def test_cast_to_server(self):
+ server_params = {'username': 'fake_username',
+ 'password': 'fake_password',
+ 'hostname': 'fake_hostname',
+ 'port': 31337}
+ self._setup_to_server_tests(server_params)
+ self._test_cast(fanout=False, server_params=server_params)
+
+ @test.skip_if(qpid is None, "Test requires qpid")
+ def test_fanout_cast_to_server(self):
+ server_params = {'username': 'fake_username',
+ 'password': 'fake_password',
+ 'hostname': 'fake_hostname',
+ 'port': 31337}
+ self._setup_to_server_tests(server_params)
+ self._test_cast(fanout=True, server_params=server_params)
+
def _test_call(self, multi):
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session)
diff --git a/nova/volume/__init__.py b/nova/volume/__init__.py
index 56ef9332e..40c1e4e7d 100644
--- a/nova/volume/__init__.py
+++ b/nova/volume/__init__.py
@@ -16,4 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-from nova.volume.api import API
+# Importing full names to not pollute the namespace and cause possible
+# collisions with use of 'from nova.volume import <foo>' elsewhere.
+import nova.flags
+import nova.utils
+
+API = nova.utils.import_class(nova.flags.FLAGS.volume_api_class)