diff options
-rw-r--r-- | Authors | 1 | ||||
-rw-r--r-- | nova/rpc/__init__.py | 66 | ||||
-rw-r--r-- | nova/rpc/amqp.py (renamed from nova/rpc.py) | 23 | ||||
-rw-r--r-- | nova/rpc/common.py | 23 | ||||
-rw-r--r-- | nova/service.py | 28 | ||||
-rw-r--r-- | nova/test.py | 16 | ||||
-rw-r--r-- | nova/tests/test_adminapi.py | 2 | ||||
-rw-r--r-- | nova/tests/test_cloud.py | 32 | ||||
-rw-r--r-- | nova/tests/test_rpc.py | 61 | ||||
-rw-r--r-- | nova/tests/test_rpc_amqp.py | 68 | ||||
-rw-r--r-- | nova/tests/test_service.py | 170 | ||||
-rw-r--r-- | nova/tests/test_test.py | 13 |
12 files changed, 198 insertions, 305 deletions
@@ -105,3 +105,4 @@ Yoshiaki Tamura <yoshi@midokura.jp> Youcef Laribi <Youcef.Laribi@eu.citrix.com> Yuriy Taraday <yorik.sar@gmail.com> Zhixue Wu <Zhixue.Wu@citrix.com> +Zed Shaw <zedshaw@zedshaw.com> diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py new file mode 100644 index 000000000..bdf7f705b --- /dev/null +++ b/nova/rpc/__init__.py @@ -0,0 +1,66 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from nova.utils import import_object +from nova.rpc.common import RemoteError, LOG +from nova import flags + +FLAGS = flags.FLAGS +flags.DEFINE_string('rpc_backend', + 'nova.rpc.amqp', + "The messaging module to use, defaults to AMQP.") + +RPCIMPL = import_object(FLAGS.rpc_backend) + + +def create_connection(new=True): + return RPCIMPL.Connection.instance(new=True) + + +def create_consumer(conn, topic, proxy, fanout=False): + if fanout: + return RPCIMPL.FanoutAdapterConsumer( + connection=conn, + topic=topic, + proxy=proxy) + else: + return RPCIMPL.TopicAdapterConsumer( + connection=conn, + topic=topic, + proxy=proxy) + + +def create_consumer_set(conn, consumers): + return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers) + + +def call(context, topic, msg): + return RPCIMPL.call(context, topic, msg) + + +def cast(context, topic, msg): + return RPCIMPL.cast(context, topic, msg) + + +def fanout_cast(context, topic, msg): + return RPCIMPL.fanout_cast(context, topic, msg) + + +def multicall(context, topic, msg): + return RPCIMPL.multicall(context, topic, msg) diff --git a/nova/rpc.py b/nova/rpc/amqp.py index e2771ca88..61555795a 100644 --- a/nova/rpc.py +++ b/nova/rpc/amqp.py @@ -44,9 +44,7 @@ from nova import fakerabbit from nova import flags from nova import log as logging from nova import utils - - -LOG = logging.getLogger('nova.rpc') +from nova.rpc.common import RemoteError, LOG FLAGS = flags.FLAGS @@ -418,25 +416,6 @@ def msg_reply(msg_id, reply=None, failure=None): publisher.close() -class RemoteError(exception.Error): - """Signifies that a remote class has raised an exception. - - Containes a string representation of the type of the original exception, - the value of the original exception, and the traceback. These are - sent to the parent as a joined string so printing the exception - contains all of the relevent info. - - """ - - def __init__(self, exc_type, value, traceback): - self.exc_type = exc_type - self.value = value - self.traceback = traceback - super(RemoteError, self).__init__('%s %s\n%s' % (exc_type, - value, - traceback)) - - def _unpack_context(msg): """Unpack context from msg.""" context_dict = {} diff --git a/nova/rpc/common.py b/nova/rpc/common.py new file mode 100644 index 000000000..1d3065a83 --- /dev/null +++ b/nova/rpc/common.py @@ -0,0 +1,23 @@ +from nova import exception +from nova import log as logging + +LOG = logging.getLogger('nova.rpc') + + +class RemoteError(exception.Error): + """Signifies that a remote class has raised an exception. + + Containes a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevent info. + + """ + + def __init__(self, exc_type, value, traceback): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__('%s %s\n%s' % (exc_type, + value, + traceback)) diff --git a/nova/service.py b/nova/service.py index 00e4f61e5..6e9eddc5a 100644 --- a/nova/service.py +++ b/nova/service.py @@ -149,26 +149,22 @@ class Service(object): if 'nova-compute' == self.binary: self.manager.update_available_resource(ctxt) - self.conn = rpc.Connection.instance(new=True) + self.conn = rpc.create_connection(new=True) logging.debug("Creating Consumer connection for Service %s" % self.topic) # Share this same connection for these Consumers - consumer_all = rpc.TopicAdapterConsumer( - connection=self.conn, - topic=self.topic, - proxy=self) - consumer_node = rpc.TopicAdapterConsumer( - connection=self.conn, - topic='%s.%s' % (self.topic, self.host), - proxy=self) - fanout = rpc.FanoutAdapterConsumer( - connection=self.conn, - topic=self.topic, - proxy=self) - consumer_set = rpc.ConsumerSet( - connection=self.conn, - consumer_list=[consumer_all, consumer_node, fanout]) + consumer_all = rpc.create_consumer(self.conn, self.topic, self, + fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + consumer_node = rpc.create_consumer(self.conn, node_topic, self, + fanout=False) + + fanout = rpc.create_consumer(self.conn, self.topic, self, fanout=True) + + consumers = [consumer_all, consumer_node, fanout] + consumer_set = rpc.create_consumer_set(self.conn, consumers) # Wait forever, processing these consumers def _wait(): diff --git a/nova/test.py b/nova/test.py index 9790b0aa1..549aa6fcf 100644 --- a/nova/test.py +++ b/nova/test.py @@ -99,9 +99,7 @@ class TestCase(unittest.TestCase): self.flag_overrides = {} self.injected = [] self._services = [] - self._monkey_patch_attach() self._original_flags = FLAGS.FlagValuesDict() - rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size) def tearDown(self): """Runs after each test method to tear down test environment.""" @@ -126,9 +124,6 @@ class TestCase(unittest.TestCase): # Reset any overriden flags self.reset_flags() - # Reset our monkey-patches - rpc.Consumer.attach_to_eventlet = self.original_attach - # Stop any timers for x in self.injected: try: @@ -172,17 +167,6 @@ class TestCase(unittest.TestCase): self._services.append(svc) return svc - def _monkey_patch_attach(self): - self.original_attach = rpc.Consumer.attach_to_eventlet - - def _wrapped(inner_self): - rv = self.original_attach(inner_self) - self.injected.append(rv) - return rv - - _wrapped.func_name = self.original_attach.func_name - rpc.Consumer.attach_to_eventlet = _wrapped - # Useful assertions def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001): """Assert two dicts are equivalent. diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py index 877cf4ea1..6bbe15f53 100644 --- a/nova/tests/test_adminapi.py +++ b/nova/tests/test_adminapi.py @@ -39,7 +39,7 @@ class AdminApiTestCase(test.TestCase): super(AdminApiTestCase, self).setUp() self.flags(connection_type='fake') - self.conn = rpc.Connection.instance() + self.conn = rpc.create_connection() # set up our cloud self.api = admin.AdminController() diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index e419e7a50..50df344d6 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -50,7 +50,7 @@ class CloudTestCase(test.TestCase): self.flags(connection_type='fake', stub_network=True) - self.conn = rpc.Connection.instance() + self.conn = rpc.create_connection() # set up our cloud self.cloud = cloud.CloudController() @@ -326,22 +326,15 @@ class CloudTestCase(test.TestCase): revoke = self.cloud.revoke_security_group_ingress self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs)) - def test_revoke_security_group_ingress_by_id(self): - kwargs = {'project_id': self.context.project_id, 'name': 'test'} - sec = db.security_group_create(self.context, kwargs) - authz = self.cloud.authorize_security_group_ingress - kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'} - authz(self.context, group_id=sec['id'], **kwargs) - revoke = self.cloud.revoke_security_group_ingress - self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs)) - - def test_authorize_security_group_ingress_by_id(self): + def test_authorize_revoke_security_group_ingress_by_id(self): sec = db.security_group_create(self.context, {'project_id': self.context.project_id, 'name': 'test'}) authz = self.cloud.authorize_security_group_ingress kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'} - self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs)) + authz(self.context, group_id=sec['id'], **kwargs) + revoke = self.cloud.revoke_security_group_ingress + self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs)) def test_authorize_security_group_ingress_missing_protocol_params(self): sec = db.security_group_create(self.context, @@ -961,21 +954,6 @@ class CloudTestCase(test.TestCase): self._wait_for_running(ec2_instance_id) return ec2_instance_id - def test_rescue_unrescue_instance(self): - instance_id = self._run_instance( - image_id='ami-1', - instance_type=FLAGS.default_instance_type, - max_count=1) - self.cloud.rescue_instance(context=self.context, - instance_id=instance_id) - # NOTE(vish): This currently does no validation, it simply makes sure - # that the code path doesn't throw an exception. - self.cloud.unrescue_instance(context=self.context, - instance_id=instance_id) - # TODO(soren): We need this until we can stop polling in the rpc code - # for unit tests. - self.cloud.terminate_instances(self.context, [instance_id]) - def test_console_output(self): instance_id = self._run_instance( image_id='ami-1', diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index ffd748efe..2d2436175 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc') class RpcTestCase(test.TestCase): def setUp(self): super(RpcTestCase, self).setUp() - self.conn = rpc.Connection.instance(True) + self.conn = rpc.create_connection(True) self.receiver = TestReceiver() - self.consumer = rpc.TopicAdapterConsumer(connection=self.conn, - topic='test', - proxy=self.receiver) + self.consumer = rpc.create_consumer(self.conn, + 'test', + self.receiver, + False) self.consumer.attach_to_eventlet() self.context = context.get_admin_context() @@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase): """Calls echo in the passed queue""" LOG.debug(_("Nested received %(queue)s, %(value)s") % locals()) + # TODO: so, it will replay the context and use the same REQID? + # that's bizarre. ret = rpc.call(context, queue, {"method": "echo", @@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase): return value nested = Nested() - conn = rpc.Connection.instance(True) - consumer = rpc.TopicAdapterConsumer(connection=conn, - topic='nested', - proxy=nested) + conn = rpc.create_connection(True) + consumer = rpc.create_consumer(conn, + 'nested', + nested, + False) consumer.attach_to_eventlet() value = 42 result = rpc.call(self.context, @@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase): "value": value}}) self.assertEqual(value, result) - def test_connectionpool_single(self): - """Test that ConnectionPool recycles a single connection.""" - conn1 = rpc.ConnectionPool.get() - rpc.ConnectionPool.put(conn1) - conn2 = rpc.ConnectionPool.get() - rpc.ConnectionPool.put(conn2) - self.assertEqual(conn1, conn2) - - def test_connectionpool_double(self): - """Test that ConnectionPool returns and reuses separate connections. - - When called consecutively we should get separate connections and upon - returning them those connections should be reused for future calls - before generating a new connection. - - """ - conn1 = rpc.ConnectionPool.get() - conn2 = rpc.ConnectionPool.get() - - self.assertNotEqual(conn1, conn2) - rpc.ConnectionPool.put(conn1) - rpc.ConnectionPool.put(conn2) - - conn3 = rpc.ConnectionPool.get() - conn4 = rpc.ConnectionPool.get() - self.assertEqual(conn1, conn3) - self.assertEqual(conn2, conn4) - - def test_connectionpool_limit(self): - """Test connection pool limit and connection uniqueness.""" - max_size = FLAGS.rpc_conn_pool_size - conns = [] - - for i in xrange(max_size): - conns.append(rpc.ConnectionPool.get()) - - self.assertFalse(rpc.ConnectionPool.free_items) - self.assertEqual(rpc.ConnectionPool.current_size, - rpc.ConnectionPool.max_size) - self.assertEqual(len(set(conns)), max_size) - class TestReceiver(object): """Simple Proxy class so the consumer has methods to call. diff --git a/nova/tests/test_rpc_amqp.py b/nova/tests/test_rpc_amqp.py new file mode 100644 index 000000000..d29f7ae32 --- /dev/null +++ b/nova/tests/test_rpc_amqp.py @@ -0,0 +1,68 @@ +from nova import context +from nova import flags +from nova import log as logging +from nova import rpc +from nova.rpc import amqp +from nova import test + + +FLAGS = flags.FLAGS +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcAMQPTestCase(test.TestCase): + def setUp(self): + super(RpcAMQPTestCase, self).setUp() + self.conn = rpc.create_connection(True) + self.receiver = TestReceiver() + self.consumer = rpc.create_consumer(self.conn, + 'test', + self.receiver, + False) + self.consumer.attach_to_eventlet() + self.context = context.get_admin_context() + + def test_connectionpool_single(self): + """Test that ConnectionPool recycles a single connection.""" + conn1 = amqp.ConnectionPool.get() + amqp.ConnectionPool.put(conn1) + conn2 = amqp.ConnectionPool.get() + amqp.ConnectionPool.put(conn2) + self.assertEqual(conn1, conn2) + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. + + """ + + @staticmethod + def echo(context, value): + """Simply returns whatever value is sent in.""" + LOG.debug(_("Received %s"), value) + return value + + @staticmethod + def context(context, value): + """Returns dictionary version of context.""" + LOG.debug(_("Received %s"), context) + return context.to_dict() + + @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value + 1) + context.reply(value + 2) + + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod + def fail(context, value): + """Raises an exception with the value sent in.""" + raise Exception(value) diff --git a/nova/tests/test_service.py b/nova/tests/test_service.py index f45f76b73..bbf47b50f 100644 --- a/nova/tests/test_service.py +++ b/nova/tests/test_service.py @@ -109,103 +109,8 @@ class ServiceTestCase(test.TestCase): # the looping calls are created in StartService. app = service.Service.create(host=host, binary=binary, topic=topic) - self.mox.StubOutWithMock(service.rpc.Connection, 'instance') - service.rpc.Connection.instance(new=mox.IgnoreArg()) - - self.mox.StubOutWithMock(rpc, - 'TopicAdapterConsumer', - use_mock_anything=True) - self.mox.StubOutWithMock(rpc, - 'FanoutAdapterConsumer', - use_mock_anything=True) - - self.mox.StubOutWithMock(rpc, - 'ConsumerSet', - use_mock_anything=True) - - rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), - topic=topic, - proxy=mox.IsA(service.Service)).AndReturn( - rpc.TopicAdapterConsumer) - - rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), - topic='%s.%s' % (topic, host), - proxy=mox.IsA(service.Service)).AndReturn( - rpc.TopicAdapterConsumer) - - rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), - topic=topic, - proxy=mox.IsA(service.Service)).AndReturn( - rpc.FanoutAdapterConsumer) - - def wait_func(self, limit=None): - return None - - mock_cset = self.mox.CreateMock(rpc.ConsumerSet, - {'wait': wait_func}) - rpc.ConsumerSet(connection=mox.IgnoreArg(), - consumer_list=mox.IsA(list)).AndReturn(mock_cset) - wait_func(mox.IgnoreArg()) - - service_create = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova'} - service_ref = {'host': host, - 'binary': binary, - 'report_count': 0, - 'id': 1} - - service.db.service_get_by_args(mox.IgnoreArg(), - host, - binary).AndRaise(exception.NotFound()) - service.db.service_create(mox.IgnoreArg(), - service_create).AndReturn(service_ref) - self.mox.ReplayAll() - - app.start() - app.stop() self.assert_(app) - # We're testing sort of weird behavior in how report_state decides - # whether it is disconnected, it looks for a variable on itself called - # 'model_disconnected' and report_state doesn't really do much so this - # these are mostly just for coverage - def test_report_state_no_service(self): - host = 'foo' - binary = 'bar' - topic = 'test' - service_create = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova'} - service_ref = {'host': host, - 'binary': binary, - 'topic': topic, - 'report_count': 0, - 'availability_zone': 'nova', - 'id': 1} - - service.db.service_get_by_args(mox.IgnoreArg(), - host, - binary).AndRaise(exception.NotFound()) - service.db.service_create(mox.IgnoreArg(), - service_create).AndReturn(service_ref) - service.db.service_get(mox.IgnoreArg(), - service_ref['id']).AndReturn(service_ref) - service.db.service_update(mox.IgnoreArg(), service_ref['id'], - mox.ContainsKeyValue('report_count', 1)) - - self.mox.ReplayAll() - serv = service.Service(host, - binary, - topic, - 'nova.tests.test_service.FakeManager') - serv.start() - serv.report_state() - def test_report_state_newly_disconnected(self): host = 'foo' binary = 'bar' @@ -276,81 +181,6 @@ class ServiceTestCase(test.TestCase): self.assert_(not serv.model_disconnected) - def test_compute_can_update_available_resource(self): - """Confirm compute updates their record of compute-service table.""" - host = 'foo' - binary = 'nova-compute' - topic = 'compute' - - # Any mocks are not working without UnsetStubs() here. - self.mox.UnsetStubs() - ctxt = context.get_admin_context() - service_ref = db.service_create(ctxt, {'host': host, - 'binary': binary, - 'topic': topic}) - serv = service.Service(host, - binary, - topic, - 'nova.compute.manager.ComputeManager') - - # This testcase want to test calling update_available_resource. - # No need to call periodic call, then below variable must be set 0. - serv.report_interval = 0 - serv.periodic_interval = 0 - - # Creating mocks - self.mox.StubOutWithMock(service.rpc.Connection, 'instance') - service.rpc.Connection.instance(new=mox.IgnoreArg()) - - self.mox.StubOutWithMock(rpc, - 'TopicAdapterConsumer', - use_mock_anything=True) - self.mox.StubOutWithMock(rpc, - 'FanoutAdapterConsumer', - use_mock_anything=True) - - self.mox.StubOutWithMock(rpc, - 'ConsumerSet', - use_mock_anything=True) - - rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), - topic=topic, - proxy=mox.IsA(service.Service)).AndReturn( - rpc.TopicAdapterConsumer) - - rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), - topic='%s.%s' % (topic, host), - proxy=mox.IsA(service.Service)).AndReturn( - rpc.TopicAdapterConsumer) - - rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), - topic=topic, - proxy=mox.IsA(service.Service)).AndReturn( - rpc.FanoutAdapterConsumer) - - def wait_func(self, limit=None): - return None - - mock_cset = self.mox.CreateMock(rpc.ConsumerSet, - {'wait': wait_func}) - rpc.ConsumerSet(connection=mox.IgnoreArg(), - consumer_list=mox.IsA(list)).AndReturn(mock_cset) - wait_func(mox.IgnoreArg()) - - self.mox.StubOutWithMock(serv.manager.driver, - 'update_available_resource') - serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) - - # Just doing start()-stop(), not confirm new db record is created, - # because update_available_resource() works only in - # libvirt environment. This testcase confirms - # update_available_resource() is called. Otherwise, mox complains. - self.mox.ReplayAll() - serv.start() - serv.stop() - - db.service_destroy(ctxt, service_ref['id']) - class TestWSGIService(test.TestCase): diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py index 35c838065..64f11fa45 100644 --- a/nova/tests/test_test.py +++ b/nova/tests/test_test.py @@ -33,8 +33,13 @@ class IsolationTestCase(test.TestCase): self.start_service('compute') def test_rpc_consumer_isolation(self): - connection = rpc.Connection.instance(new=True) - consumer = rpc.TopicAdapterConsumer(connection, topic='compute') - consumer.register_callback( - lambda x, y: self.fail('I should never be called')) + class NeverCalled(object): + + def __getattribute__(*args): + assert False, "I should never get called." + + connection = rpc.create_connection(new=True) + proxy = NeverCalled() + consumer = rpc.create_consumer(connection, 'compute', + proxy, fanout=False) consumer.attach_to_eventlet() |