diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-02-16 21:41:43 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-02-16 21:41:43 +0000 |
| commit | 34d77ac8b1919a287865a4bef376579b6bf09b48 (patch) | |
| tree | 271e25c53088c1f1aa27d46fbe6dfc1a0a558b0f | |
| parent | 26227b79e9246a87eeb83766cfcc8e96d294d28b (diff) | |
| parent | c729ba8c0aa4d283e84d139bc98e0e89fd933c4a (diff) | |
Merge "Core modifications for future zones service."
| -rw-r--r-- | nova/compute/__init__.py | 7 | ||||
| -rw-r--r-- | nova/compute/api.py | 55 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/api.py | 16 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py | 44 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql | 35 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py | 31 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/models.py | 7 | ||||
| -rw-r--r-- | nova/flags.py | 11 | ||||
| -rw-r--r-- | nova/network/__init__.py | 7 | ||||
| -rw-r--r-- | nova/rpc/__init__.py | 31 | ||||
| -rw-r--r-- | nova/rpc/amqp.py | 22 | ||||
| -rw-r--r-- | nova/rpc/impl_kombu.py | 44 | ||||
| -rw-r--r-- | nova/rpc/impl_qpid.py | 36 | ||||
| -rw-r--r-- | nova/tests/api/openstack/compute/test_extensions.py | 1 | ||||
| -rw-r--r-- | nova/tests/api/openstack/compute/test_servers.py | 1 | ||||
| -rw-r--r-- | nova/tests/rpc/test_kombu.py | 60 | ||||
| -rw-r--r-- | nova/tests/rpc/test_qpid.py | 72 | ||||
| -rw-r--r-- | nova/volume/__init__.py | 7 |
18 files changed, 429 insertions, 58 deletions
diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py index c0cfe7c27..72df5ddd5 100644 --- a/nova/compute/__init__.py +++ b/nova/compute/__init__.py @@ -16,5 +16,10 @@ # License for the specific language governing permissions and limitations # under the License. -from nova.compute.api import API from nova.compute.api import AggregateAPI +# Importing full names to not pollute the namespace and cause possible +# collisions with use of 'from nova.compute import <foo>' elsewhere. +import nova.flags +import nova.utils + +API = nova.utils.import_class(nova.flags.FLAGS.compute_api_class) diff --git a/nova/compute/api.py b/nova/compute/api.py index 637be01af..d93443ea4 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -567,6 +567,20 @@ class API(base.Base): "requested_networks": requested_networks, "filter_properties": filter_properties}}) + def _check_create_policies(self, context, availability_zone, + requested_networks, block_device_mapping): + """Check policies for create().""" + target = {'project_id': context.project_id, + 'user_id': context.user_id, + 'availability_zone': availability_zone} + check_policy(context, 'create', target) + + if requested_networks: + check_policy(context, 'create:attach_network', target) + + if block_device_mapping: + check_policy(context, 'create:attach_volume', target) + def create(self, context, instance_type, image_href, kernel_id=None, ramdisk_id=None, min_count=None, max_count=None, @@ -586,16 +600,9 @@ class API(base.Base): could be 'None' or a list of instance dicts depending on if we waited for information from the scheduler or not. """ - target = {'project_id': context.project_id, - 'user_id': context.user_id, - 'availability_zone': availability_zone} - check_policy(context, 'create', target) - if requested_networks: - check_policy(context, 'create:attach_network', target) - - if block_device_mapping: - check_policy(context, 'create:attach_volume', target) + self._check_create_policies(context, availability_zone, + requested_networks, block_device_mapping) # We can create the DB entry for the instance here if we're # only going to create 1 instance. @@ -843,20 +850,28 @@ class API(base.Base): else: LOG.warning(_("No host for instance %s, deleting immediately"), instance["uuid"]) - self.db.instance_destroy(context, instance['id']) + try: + self.db.instance_destroy(context, instance['id']) + except exception.InstanceNotFound: + # NOTE(comstud): Race condition. Instance already gone. + pass def _delete(self, context, instance): host = instance['host'] - if host: - self.update(context, - instance, - task_state=task_states.DELETING, - progress=0) - - self._cast_compute_message('terminate_instance', context, - instance) - else: - self.db.instance_destroy(context, instance['id']) + try: + if host: + self.update(context, + instance, + task_state=task_states.DELETING, + progress=0) + + self._cast_compute_message('terminate_instance', context, + instance) + else: + self.db.instance_destroy(context, instance['id']) + except exception.InstanceNotFound: + # NOTE(comstud): Race condition. Instance already gone. + pass # NOTE(jerdfelt): The API implies that only ACTIVE and ERROR are # allowed but the EC2 API appears to allow from RESCUED and STOPPED diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 4e92c61b6..4bf89c7e8 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -1354,11 +1354,12 @@ def instance_create(context, values): context - request context object values - dict containing column values. """ + values = values.copy() values['metadata'] = _metadata_refs(values.get('metadata'), models.InstanceMetadata) instance_ref = models.Instance() - instance_ref['uuid'] = str(utils.gen_uuid()) - + if not values.get('uuid'): + values['uuid'] = str(utils.gen_uuid()) instance_ref.update(values) session = get_session() @@ -1388,7 +1389,13 @@ def instance_data_get_for_project(context, project_id): def instance_destroy(context, instance_id): session = get_session() with session.begin(): - instance_ref = instance_get(context, instance_id, session=session) + if utils.is_uuid_like(instance_id): + instance_ref = instance_get_by_uuid(context, instance_id, + session=session) + instance_id = instance_ref['id'] + else: + instance_ref = instance_get(context, instance_id, + session=session) session.query(models.Instance).\ filter_by(id=instance_id).\ update({'deleted': True, @@ -1412,6 +1419,7 @@ def instance_destroy(context, instance_id): instance_info_cache_delete(context, instance_ref['uuid'], session=session) + return instance_ref @require_context @@ -3541,7 +3549,7 @@ def zone_get(context, zone_id): @require_admin_context def zone_get_all(context): - return model_query(context, models.Zone, read_deleted="yes").all() + return model_query(context, models.Zone, read_deleted="no").all() #################### diff --git a/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py b/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py new file mode 100644 index 000000000..5f1000afe --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/078_add_rpc_info_to_zones.py @@ -0,0 +1,44 @@ +# Copyright 2012 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import * + +meta = MetaData() + +zones = Table('zones', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +is_parent = Column('is_parent', Boolean(), default=False) +rpc_host = Column('rpc_host', String(255)) +rpc_port = Column('rpc_port', Integer()) +rpc_virtual_host = Column('rpc_virtual_host', String(255)) + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + + zones.create_column(is_parent) + zones.create_column(rpc_host) + zones.create_column(rpc_port) + zones.create_column(rpc_virtual_host) + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + + zones.drop_column(rpc_virtual_host) + zones.drop_column(rpc_port) + zones.drop_column(rpc_host) + zones.drop_column(is_parent) diff --git a/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql b/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql new file mode 100644 index 000000000..cf18c1c29 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/078_sqlite_downgrade.sql @@ -0,0 +1,35 @@ +BEGIN TRANSACTION; + + CREATE TEMPORARY TABLE zones_temp ( + created_at DATETIME, + updated_at DATETIME, + deleted_at DATETIME, + deleted BOOLEAN, + id INTEGER NOT NULL, + name VARCHAR(255), + api_url VARVHAR(255), + username VARCHAR(255), + password VARCHAR(255), + weight_offset FLOAT, + weight_scale FLOAT, + PRIMARY KEY (id), + CHECK (deleted IN (0, 1)) + ); + + INSERT INTO zones_temp + SELECT created_at, + updated_at, + deleted_at, + deleted, + id, + name, + api_url, + username, + password, + weight_offset, + weight_scale FROM zones; + + DROP TABLE zones; + + ALTER TABLE zones_temp RENAME TO zones; +COMMIT; diff --git a/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py b/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py new file mode 100644 index 000000000..bff1b685b --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/079_add_zone_name_to_instances.py @@ -0,0 +1,31 @@ +# Copyright 2012 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import * + +meta = MetaData() + + +def upgrade(migrate_engine): + meta.bind = migrate_engine + instances = Table('instances', meta, autoload=True) + zone_name = Column('zone_name', String(255)) + instances.create_column(zone_name) + + +def downgrade(migrate_engine): + meta.bind = migrate_engine + instances = Table('instances', meta, autoload=True) + zone_name = Column('zone_name', String(255)) + instances.drop_column(zone_name) diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 74481bda8..c99d9489b 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -277,6 +277,9 @@ class Instance(BASE, NovaBase): # EC2 disable_api_termination disable_terminate = Column(Boolean(), default=False, nullable=False) + # Openstack zone name + zone_name = Column(String(255)) + class InstanceInfoCache(BASE, NovaBase): """ @@ -876,6 +879,10 @@ class Zone(BASE, NovaBase): password = Column(String(255)) weight_offset = Column(Float(), default=0.0) weight_scale = Column(Float(), default=1.0) + is_parent = Column(Boolean()) + rpc_host = Column(String(255)) + rpc_port = Column(Integer()) + rpc_virtual_host = Column(String(255)) class Aggregate(BASE, NovaBase): diff --git a/nova/flags.py b/nova/flags.py index 3b06760de..af490e287 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -441,7 +441,16 @@ global_opts = [ help='Cache glance images locally'), cfg.BoolOpt('use_cow_images', default=True, - help='Whether to use cow images') + help='Whether to use cow images'), + cfg.StrOpt('compute_api_class', + default='nova.compute.api.API', + help='The compute API class to use'), + cfg.StrOpt('network_api_class', + default='nova.network.api.API', + help='The network API class to use'), + cfg.StrOpt('volume_api_class', + default='nova.volume.api.API', + help='The volume API class to use'), ] FLAGS.register_opts(global_opts) diff --git a/nova/network/__init__.py b/nova/network/__init__.py index 6eb3e3ef6..4c272c50a 100644 --- a/nova/network/__init__.py +++ b/nova/network/__init__.py @@ -16,4 +16,9 @@ # License for the specific language governing permissions and limitations # under the License. -from nova.network.api import API +# Importing full names to not pollute the namespace and cause possible +# collisions with use of 'from nova.network import <foo>' elsewhere. +import nova.flags +import nova.utils + +API = nova.utils.import_class(nova.flags.FLAGS.network_api_class) diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 3ad6ddd2e..412a32b7d 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -161,6 +161,37 @@ def cleanup(): return _get_impl().cleanup() +def cast_to_server(context, server_params, topic, msg): + """Invoke a remote method that does not return anything. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().cast_to_server(context, server_params, topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Broadcast to a remote method invocation with no return. + + :param context: Information that identifies the user that has made this + request. + :param server_params: Connection information + :param topic: The topic to send the notification to. + :param msg: This is a dict in the form { "method" : "method_to_invoke", + "args" : dict_of_kwargs } + + :returns: None + """ + return _get_impl().fanout_cast_to_server(context, server_params, topic, + msg) + + _RPCIMPL = None diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 01e127764..95655d385 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection): the pool. """ - def __init__(self, connection_pool, pooled=True): + def __init__(self, connection_pool, pooled=True, server_params=None): """Create a new connection, or get one from the pool""" self.connection = None self.connection_pool = connection_pool if pooled: self.connection = connection_pool.get() else: - self.connection = connection_pool.connection_cls() + self.connection = connection_pool.connection_cls( + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool): conn.fanout_send(topic, msg) +def cast_to_server(context, server_params, topic, msg, connection_pool): + """Sends a message on a topic to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.topic_send(topic, msg) + + +def fanout_cast_to_server(context, server_params, topic, msg, + connection_pool): + """Sends a message on a fanout exchange to a specific server.""" + pack_context(msg, context) + with ConnectionContext(connection_pool, pooled=False, + server_params=server_params) as conn: + conn.fanout_send(topic, msg) + + def notify(context, topic, msg, connection_pool): """Sends a notification event on a topic.""" LOG.debug(_('Sending notification on %s...'), topic) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index a90d06a76..99402f6bc 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -27,8 +27,6 @@ import kombu.entity import kombu.messaging import kombu.connection -from nova import context -from nova import exception from nova import flags from nova.rpc import common as rpc_common from nova.rpc import amqp as rpc_amqp @@ -310,7 +308,7 @@ class NotifyPublisher(TopicPublisher): class Connection(object): """Connection object.""" - def __init__(self): + def __init__(self, server_params=None): self.consumers = [] self.consumer_thread = None self.max_retries = FLAGS.rabbit_max_retries @@ -323,11 +321,25 @@ class Connection(object): self.interval_max = 30 self.memory_transport = False - self.params = dict(hostname=FLAGS.rabbit_host, - port=FLAGS.rabbit_port, - userid=FLAGS.rabbit_userid, - password=FLAGS.rabbit_password, - virtual_host=FLAGS.rabbit_virtual_host) + if server_params is None: + server_params = {} + + # Keys to translate from server_params to kombu params + server_params_to_kombu_params = {'username': 'userid'} + + params = {} + for sp_key, value in server_params.iteritems(): + p_key = server_params_to_kombu_params.get(sp_key, sp_key) + params[p_key] = value + + params.setdefault('hostname', FLAGS.rabbit_host) + params.setdefault('port', FLAGS.rabbit_port) + params.setdefault('userid', FLAGS.rabbit_userid) + params.setdefault('password', FLAGS.rabbit_password) + params.setdefault('virtual_host', FLAGS.rabbit_virtual_host) + + self.params = params + if FLAGS.fake_rabbit: self.params['transport'] = 'memory' self.memory_transport = True @@ -588,10 +600,10 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: self.declare_fanout_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: self.declare_topic_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) Connection.pool = rpc_amqp.Pool(connection_cls=Connection) @@ -622,6 +634,18 @@ def fanout_cast(context, topic, msg): return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg, Connection.pool) diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 10bc162f7..98f8b06d6 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -272,19 +272,31 @@ class NotifyPublisher(Publisher): class Connection(object): """Connection object.""" - def __init__(self): + def __init__(self, server_params=None): self.session = None self.consumers = {} self.consumer_thread = None - self.broker = FLAGS.qpid_hostname + ":" + FLAGS.qpid_port + if server_params is None: + server_params = {} + + default_params = dict(hostname=FLAGS.qpid_hostname, + port=FLAGS.qpid_port, + username=FLAGS.qpid_username, + password=FLAGS.qpid_password) + + params = server_params + for key in default_params.keys(): + params.setdefault(key, default_params[key]) + + self.broker = params['hostname'] + ":" + str(params['port']) # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = FLAGS.qpid_username - self.connection.password = FLAGS.qpid_password + self.connection.username = params['username'] + self.connection.password = params['password'] self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms self.connection.reconnect = FLAGS.qpid_reconnect self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout @@ -474,10 +486,10 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: consumer = FanoutConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: consumer = TopicConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) self._register_consumer(consumer) return consumer @@ -510,6 +522,18 @@ def fanout_cast(context, topic, msg): return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) +def cast_to_server(context, server_params, topic, msg): + """Sends a message on a topic to a specific server.""" + return rpc_amqp.cast_to_server(context, server_params, topic, msg, + Connection.pool) + + +def fanout_cast_to_server(context, server_params, topic, msg): + """Sends a message on a fanout exchange to a specific server.""" + return rpc_amqp.fanout_cast_to_server(context, server_params, topic, + msg, Connection.pool) + + def notify(context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(context, topic, msg, Connection.pool) diff --git a/nova/tests/api/openstack/compute/test_extensions.py b/nova/tests/api/openstack/compute/test_extensions.py index bd292c533..1a281590f 100644 --- a/nova/tests/api/openstack/compute/test_extensions.py +++ b/nova/tests/api/openstack/compute/test_extensions.py @@ -29,7 +29,6 @@ from nova.api.openstack import xmlutil from nova import flags from nova import test from nova.tests.api.openstack import fakes -from nova import wsgi as base_wsgi FLAGS = flags.FLAGS diff --git a/nova/tests/api/openstack/compute/test_servers.py b/nova/tests/api/openstack/compute/test_servers.py index 45a726f39..281ba88e6 100644 --- a/nova/tests/api/openstack/compute/test_servers.py +++ b/nova/tests/api/openstack/compute/test_servers.py @@ -1103,6 +1103,7 @@ class ServersControllerTest(test.TestCase): 'display_name': 'server_test', } self.assertEqual(params, filtered_dict) + filtered_dict['uuid'] = id return filtered_dict self.stubs.Set(nova.db, 'instance_update', server_update) diff --git a/nova/tests/rpc/test_kombu.py b/nova/tests/rpc/test_kombu.py index 928582a2f..f90d111fc 100644 --- a/nova/tests/rpc/test_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -19,12 +19,15 @@ Unit Tests for remote procedure calls using kombu """ +from nova import context +from nova import flags from nova import log as logging from nova import test +from nova.rpc import amqp as rpc_amqp from nova.rpc import impl_kombu from nova.tests.rpc import common - +FLAGS = flags.FLAGS LOG = logging.getLogger(__name__) @@ -99,6 +102,61 @@ class RpcKombuTestCase(common._BaseRpcTestCase): self.assertEqual(self.received_message, message) + def test_cast_interface_uses_default_options(self): + """Test kombu rpc.cast""" + + ctxt = context.RequestContext('fake_user', 'fake_project') + + class MyConnection(impl_kombu.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.params, + {'hostname': FLAGS.rabbit_host, + 'userid': FLAGS.rabbit_userid, + 'password': FLAGS.rabbit_password, + 'port': FLAGS.rabbit_port, + 'virtual_host': FLAGS.rabbit_virtual_host, + 'transport': 'memory'}) + + def topic_send(_context, topic, msg): + pass + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_kombu, 'Connection', MyConnection) + + impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'}) + + def test_cast_to_server_uses_server_params(self): + """Test kombu rpc.cast""" + + ctxt = context.RequestContext('fake_user', 'fake_project') + + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337, + 'virtual_host': 'fake_virtual_host'} + + class MyConnection(impl_kombu.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.params, + {'hostname': server_params['hostname'], + 'userid': server_params['username'], + 'password': server_params['password'], + 'port': server_params['port'], + 'virtual_host': server_params['virtual_host'], + 'transport': 'memory'}) + + def topic_send(_context, topic, msg): + pass + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_kombu, 'Connection', MyConnection) + + impl_kombu.cast_to_server(ctxt, server_params, + 'fake_topic', {'msg': 'fake'}) + @test.skip_test("kombu memory transport seems buggy with fanout queues " "as this test passes when you use rabbit (fake_rabbit=False)") def test_fanout_send_receive(self): diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 61d47fb5d..fb8be4c78 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -24,6 +24,7 @@ import mox from nova import context from nova import log as logging +from nova.rpc import amqp as rpc_amqp from nova import test try: @@ -80,6 +81,10 @@ class RpcQpidTestCase(test.TestCase): qpid.messaging.Session = self.orig_session qpid.messaging.Sender = self.orig_sender qpid.messaging.Receiver = self.orig_receiver + if impl_qpid: + # Need to reset this in case we changed the connection_cls + # in self._setup_to_server_tests() + impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection self.mocker.ResetAll() @@ -147,13 +152,15 @@ class RpcQpidTestCase(test.TestCase): def test_create_consumer_fanout(self): self._test_create_consumer(fanout=True) - def _test_cast(self, fanout): + def _test_cast(self, fanout, server_params=None): + self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_session = self.mocker.CreateMock(self.orig_session) self.mock_sender = self.mocker.CreateMock(self.orig_sender) self.mock_connection.opened().AndReturn(False) self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) if fanout: expected_address = ('impl_qpid_test_fanout ; ' @@ -166,22 +173,34 @@ class RpcQpidTestCase(test.TestCase): '"create": "always"}') self.mock_session.sender(expected_address).AndReturn(self.mock_sender) self.mock_sender.send(mox.IgnoreArg()) - # This is a pooled connection, so instead of closing it, it gets reset, - # which is just creating a new session on the connection. - self.mock_session.close() - self.mock_connection.session().AndReturn(self.mock_session) + if not server_params: + # This is a pooled connection, so instead of closing it, it + # gets reset, which is just creating a new session on the + # connection. + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) self.mocker.ReplayAll() try: ctx = context.RequestContext("user", "project") - if fanout: - impl_qpid.fanout_cast(ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) + args = [ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}] + + if server_params: + args.insert(1, server_params) + if fanout: + method = impl_qpid.fanout_cast_to_server + else: + method = impl_qpid.cast_to_server else: - impl_qpid.cast(ctx, "impl_qpid_test", - {"method": "test_method", "args": {}}) + if fanout: + method = impl_qpid.fanout_cast + else: + method = impl_qpid.cast + + method(*args) self.mocker.VerifyAll() finally: @@ -198,6 +217,39 @@ class RpcQpidTestCase(test.TestCase): def test_fanout_cast(self): self._test_cast(fanout=True) + def _setup_to_server_tests(self, server_params): + class MyConnection(impl_qpid.Connection): + def __init__(myself, *args, **kwargs): + super(MyConnection, myself).__init__(*args, **kwargs) + self.assertEqual(myself.connection.username, + server_params['username']) + self.assertEqual(myself.connection.password, + server_params['password']) + self.assertEqual(myself.broker, + server_params['hostname'] + ':' + + str(server_params['port'])) + + MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection) + self.stubs.Set(impl_qpid, 'Connection', MyConnection) + + @test.skip_if(qpid is None, "Test requires qpid") + def test_cast_to_server(self): + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337} + self._setup_to_server_tests(server_params) + self._test_cast(fanout=False, server_params=server_params) + + @test.skip_if(qpid is None, "Test requires qpid") + def test_fanout_cast_to_server(self): + server_params = {'username': 'fake_username', + 'password': 'fake_password', + 'hostname': 'fake_hostname', + 'port': 31337} + self._setup_to_server_tests(server_params) + self._test_cast(fanout=True, server_params=server_params) + def _test_call(self, multi): self.mock_connection = self.mocker.CreateMock(self.orig_connection) self.mock_session = self.mocker.CreateMock(self.orig_session) diff --git a/nova/volume/__init__.py b/nova/volume/__init__.py index 56ef9332e..40c1e4e7d 100644 --- a/nova/volume/__init__.py +++ b/nova/volume/__init__.py @@ -16,4 +16,9 @@ # License for the specific language governing permissions and limitations # under the License. -from nova.volume.api import API +# Importing full names to not pollute the namespace and cause possible +# collisions with use of 'from nova.volume import <foo>' elsewhere. +import nova.flags +import nova.utils + +API = nova.utils.import_class(nova.flags.FLAGS.volume_api_class) |
