diff options
| author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-08-14 23:37:51 +0000 |
|---|---|---|
| committer | Tarmac <> | 2010-08-14 23:37:51 +0000 |
| commit | 1d2351f7bfd997be8b2a25ec74999a17b2b508ee (patch) | |
| tree | f77242c85ba45be17470ec9dfaad6995a514902c | |
| parent | 2dc2c9dc71bb50a5e322c8fdc5af3f831d41ddba (diff) | |
| parent | 517e887a804905a20cee0ced0c37b93432814f47 (diff) | |
| download | nova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.tar.gz nova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.tar.xz nova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.zip | |
Catches and logs exceptions for rpc calls and raises a RemoteError exception on the caller side.
| -rw-r--r-- | nova/endpoint/cloud.py | 11 | ||||
| -rw-r--r-- | nova/rpc.py | 143 | ||||
| -rw-r--r-- | nova/tests/rpc_unittest.py | 85 | ||||
| -rw-r--r-- | run_tests.py | 1 |
4 files changed, 202 insertions, 38 deletions
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index dcabad710..5366acec7 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -306,7 +306,7 @@ class CloudController(object): "user_id": context.user.id, "project_id": context.project.id}}) # NOTE(vish): rpc returned value is in the result key in the dictionary - volume = self._get_volume(context, result['result']) + volume = self._get_volume(context, result) defer.returnValue({'volumeSet': [self.format_volume(context, volume)]}) def _get_address(self, context, public_ip): @@ -477,11 +477,10 @@ class CloudController(object): @defer.inlineCallbacks def allocate_address(self, context, **kwargs): network_topic = yield self._get_network_topic(context) - alloc_result = yield rpc.call(network_topic, + public_ip = yield rpc.call(network_topic, {"method": "allocate_elastic_ip", "args": {"user_id": context.user.id, "project_id": context.project.id}}) - public_ip = alloc_result['result'] defer.returnValue({'addressSet': [{'publicIp': public_ip}]}) @rbac.allow('netadmin') @@ -522,11 +521,10 @@ class CloudController(object): """Retrieves the network host for a project""" host = network_service.get_host_for_project(context.project.id) if not host: - result = yield rpc.call(FLAGS.network_topic, + host = yield rpc.call(FLAGS.network_topic, {"method": "set_network_host", "args": {"user_id": context.user.id, "project_id": context.project.id}}) - host = result['result'] defer.returnValue('%s.%s' %(FLAGS.network_topic, host)) @rbac.allow('projectmanager', 'sysadmin') @@ -570,14 +568,13 @@ class CloudController(object): if image_id == FLAGS.vpn_image_id: is_vpn = True inst = self.instdir.new() - allocate_result = yield rpc.call(network_topic, + allocate_data = yield rpc.call(network_topic, {"method": "allocate_fixed_ip", "args": {"user_id": context.user.id, "project_id": context.project.id, "security_group": security_group, "is_vpn": is_vpn, "hostname": inst.instance_id}}) - allocate_data = allocate_result['result'] inst['image_id'] = image_id inst['kernel_id'] = kernel_id inst['ramdisk_id'] = ramdisk_id diff --git a/nova/rpc.py b/nova/rpc.py index 2a550c3ae..4ac546c2a 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -21,14 +21,13 @@ AMQP-based RPC. Queues have consumers and publishers. No fan-out support yet. """ -from carrot import connection +from carrot import connection as carrot_connection from carrot import messaging import json import logging import sys import uuid from twisted.internet import defer -from twisted.internet import reactor from twisted.internet import task from nova import exception @@ -39,13 +38,15 @@ from nova import flags FLAGS = flags.FLAGS -_log = logging.getLogger('amqplib') -_log.setLevel(logging.WARN) +LOG = logging.getLogger('amqplib') +LOG.setLevel(logging.DEBUG) -class Connection(connection.BrokerConnection): +class Connection(carrot_connection.BrokerConnection): + """Connection instance object""" @classmethod def instance(cls): + """Returns the instance""" if not hasattr(cls, '_instance'): params = dict(hostname=FLAGS.rabbit_host, port=FLAGS.rabbit_port, @@ -56,18 +57,33 @@ class Connection(connection.BrokerConnection): if FLAGS.fake_rabbit: params['backend_cls'] = fakerabbit.Backend + # NOTE(vish): magic is fun! + # pylint: disable=W0142 cls._instance = cls(**params) return cls._instance @classmethod def recreate(cls): + """Recreates the connection instance + + This is necessary to recover from some network errors/disconnects""" del cls._instance return cls.instance() + class Consumer(messaging.Consumer): + """Consumer base class + + Contains methods for connecting the fetch method to async loops + """ + def __init__(self, *args, **kwargs): + self.failed_connection = False + super(Consumer, self).__init__(*args, **kwargs) + # TODO(termie): it would be nice to give these some way of automatically # cleaning up after themselves def attach_to_tornado(self, io_inst=None): + """Attach a callback to tornado that fires 10 times a second""" from tornado import ioloop if io_inst is None: io_inst = ioloop.IOLoop.instance() @@ -79,33 +95,44 @@ class Consumer(messaging.Consumer): attachToTornado = attach_to_tornado - def fetch(self, *args, **kwargs): + def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): + """Wraps the parent fetch with some logic for failed connections""" # TODO(vish): the logic for failed connections and logging should be # refactored into some sort of connection manager object try: - if getattr(self, 'failed_connection', False): - # attempt to reconnect + if self.failed_connection: + # NOTE(vish): conn is defined in the parent class, we can + # recreate it as long as we create the backend too + # pylint: disable=W0201 self.conn = Connection.recreate() self.backend = self.conn.create_backend() - super(Consumer, self).fetch(*args, **kwargs) - if getattr(self, 'failed_connection', False): + super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) + if self.failed_connection: logging.error("Reconnected to queue") self.failed_connection = False - except Exception, ex: - if not getattr(self, 'failed_connection', False): + # NOTE(vish): This is catching all errors because we really don't + # exceptions to be logged 10 times a second if some + # persistent failure occurs. + except Exception: # pylint: disable=W0703 + if not self.failed_connection: logging.exception("Failed to fetch message from queue") self.failed_connection = True def attach_to_twisted(self): + """Attach a callback to twisted that fires 10 times a second""" loop = task.LoopingCall(self.fetch, enable_callbacks=True) loop.start(interval=0.1) + class Publisher(messaging.Publisher): + """Publisher base class""" pass class TopicConsumer(Consumer): + """Consumes messages on a specific topic""" exchange_type = "topic" + def __init__(self, connection=None, topic="broadcast"): self.queue = topic self.routing_key = topic @@ -115,14 +142,24 @@ class TopicConsumer(Consumer): class AdapterConsumer(TopicConsumer): + """Calls methods on a proxy object based on method and args""" def __init__(self, connection=None, topic="broadcast", proxy=None): - _log.debug('Initing the Adapter Consumer for %s' % (topic)) + LOG.debug('Initing the Adapter Consumer for %s' % (topic)) self.proxy = proxy - super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + super(AdapterConsumer, self).__init__(connection=connection, + topic=topic) @exception.wrap_exception def receive(self, message_data, message): - _log.debug('received %s' % (message_data)) + """Magically looks for a method on the proxy object and calls it + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + """ + LOG.debug('received %s' % (message_data)) msg_id = message_data.pop('_msg_id', None) method = message_data.get('method') @@ -133,21 +170,25 @@ class AdapterConsumer(TopicConsumer): # messages stay in the queue indefinitely, so for now # we just log the message and send an error string # back to the caller - _log.warn('no method for message: %s' % (message_data)) + LOG.warn('no method for message: %s' % (message_data)) msg_reply(msg_id, 'No method for message: %s' % message_data) return node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) + # NOTE(vish): magic is fun! + # pylint: disable=W0142 d = defer.maybeDeferred(node_func, **node_args) if msg_id: - d.addCallback(lambda rval: msg_reply(msg_id, rval)) - d.addErrback(lambda e: msg_reply(msg_id, str(e))) + d.addCallback(lambda rval: msg_reply(msg_id, rval, None)) + d.addErrback(lambda e: msg_reply(msg_id, None, e)) return class TopicPublisher(Publisher): + """Publishes messages on a specific topic""" exchange_type = "topic" + def __init__(self, connection=None, topic="broadcast"): self.routing_key = topic self.exchange = FLAGS.control_exchange @@ -156,7 +197,9 @@ class TopicPublisher(Publisher): class DirectConsumer(Consumer): + """Consumes messages directly on a channel specified by msg_id""" exchange_type = "direct" + def __init__(self, connection=None, msg_id=None): self.queue = msg_id self.routing_key = msg_id @@ -166,7 +209,9 @@ class DirectConsumer(Consumer): class DirectPublisher(Publisher): + """Publishes messages directly on a channel specified by msg_id""" exchange_type = "direct" + def __init__(self, connection=None, msg_id=None): self.routing_key = msg_id self.exchange = msg_id @@ -174,32 +219,63 @@ class DirectPublisher(Publisher): super(DirectPublisher, self).__init__(connection=connection) -def msg_reply(msg_id, reply): +def msg_reply(msg_id, reply=None, failure=None): + """Sends a reply or an error on the channel signified by msg_id + + failure should be a twisted failure object""" + if failure: + message = failure.getErrorMessage() + traceback = failure.getTraceback() + logging.error("Returning exception %s to caller", message) + logging.error(traceback) + failure = (failure.type.__name__, str(failure.value), traceback) conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - publisher.send({'result': reply}) + publisher.send({'result': reply, 'failure': failure}) except TypeError: publisher.send( {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()) - }) + for k, v in reply.__dict__.iteritems()), + 'failure': failure}) publisher.close() +class RemoteError(exception.Error): + """Signifies that a remote class has raised an exception + + Containes a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevent info.""" + def __init__(self, exc_type, value, traceback): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__("%s %s\n%s" % (exc_type, + value, + traceback)) + + def call(topic, msg): - _log.debug("Making asynchronous call...") + """Sends a message on a topic and wait for a response""" + LOG.debug("Making asynchronous call...") msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - _log.debug("MSG_ID is %s" % (msg_id)) + LOG.debug("MSG_ID is %s" % (msg_id)) conn = Connection.instance() d = defer.Deferred() consumer = DirectConsumer(connection=conn, msg_id=msg_id) + def deferred_receive(data, message): + """Acks message and callbacks or errbacks""" message.ack() - d.callback(data) + if data['failure']: + return d.errback(RemoteError(*data['failure'])) + else: + return d.callback(data['result']) + consumer.register_callback(deferred_receive) injected = consumer.attach_to_tornado() @@ -213,7 +289,8 @@ def call(topic, msg): def cast(topic, msg): - _log.debug("Making asynchronous cast...") + """Sends a message on a topic without waiting for a response""" + LOG.debug("Making asynchronous cast...") conn = Connection.instance() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) @@ -221,16 +298,18 @@ def cast(topic, msg): def generic_response(message_data, message): - _log.debug('response %s', message_data) + """Logs a result and exits""" + LOG.debug('response %s', message_data) message.ack() sys.exit(0) def send_message(topic, message, wait=True): + """Sends a message for testing""" msg_id = uuid.uuid4().hex message.update({'_msg_id': msg_id}) - _log.debug('topic is %s', topic) - _log.debug('message %s', message) + LOG.debug('topic is %s', topic) + LOG.debug('message %s', message) if wait: consumer = messaging.Consumer(connection=Connection.instance(), @@ -253,6 +332,8 @@ def send_message(topic, message, wait=True): consumer.wait() -# TODO: Replace with a docstring test if __name__ == "__main__": + # NOTE(vish): you can send messages from the command line using + # topic and a json sting representing a dictionary + # for the method send_message(sys.argv[1], json.loads(sys.argv[2])) diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py new file mode 100644 index 000000000..764a97416 --- /dev/null +++ b/nova/tests/rpc_unittest.py @@ -0,0 +1,85 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using queue +""" +import logging + +from twisted.internet import defer + +from nova import flags +from nova import rpc +from nova import test + + +FLAGS = flags.FLAGS + + +class RpcTestCase(test.BaseTestCase): + """Test cases for rpc""" + def setUp(self): # pylint: disable=C0103 + super(RpcTestCase, self).setUp() + self.conn = rpc.Connection.instance() + self.receiver = TestReceiver() + self.consumer = rpc.AdapterConsumer(connection=self.conn, + topic='test', + proxy=self.receiver) + + self.injected.append(self.consumer.attach_to_tornado(self.ioloop)) + + def test_call_succeed(self): + """Get a value through rpc call""" + value = 42 + result = yield rpc.call('test', {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_call_exception(self): + """Test that exception gets passed back properly + + rpc.call returns a RemoteError object. The value of the + exception is converted to a string, so we convert it back + to an int in the test. + """ + value = 42 + self.assertFailure(rpc.call('test', {"method": "fail", + "args": {"value": value}}), + rpc.RemoteError) + try: + yield rpc.call('test', {"method": "fail", + "args": {"value": value}}) + self.fail("should have thrown rpc.RemoteError") + except rpc.RemoteError as exc: + self.assertEqual(int(exc.value), value) + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call + + Uses static methods because we aren't actually storing any state""" + + @staticmethod + def echo(value): + """Simply returns whatever value is sent in""" + logging.debug("Received %s", value) + return defer.succeed(value) + + @staticmethod + def fail(value): + """Raises an exception with the value sent in""" + raise Exception(value) diff --git a/run_tests.py b/run_tests.py index 7fe6e73ec..d90ac8175 100644 --- a/run_tests.py +++ b/run_tests.py @@ -59,6 +59,7 @@ from nova.tests.model_unittest import * from nova.tests.network_unittest import * from nova.tests.objectstore_unittest import * from nova.tests.process_unittest import * +from nova.tests.rpc_unittest import * from nova.tests.validator_unittest import * from nova.tests.volume_unittest import * |
