summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@gmail.com>2010-08-14 23:37:51 +0000
committerTarmac <>2010-08-14 23:37:51 +0000
commit1d2351f7bfd997be8b2a25ec74999a17b2b508ee (patch)
treef77242c85ba45be17470ec9dfaad6995a514902c
parent2dc2c9dc71bb50a5e322c8fdc5af3f831d41ddba (diff)
parent517e887a804905a20cee0ced0c37b93432814f47 (diff)
downloadnova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.tar.gz
nova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.tar.xz
nova-1d2351f7bfd997be8b2a25ec74999a17b2b508ee.zip
Catches and logs exceptions for rpc calls and raises a RemoteError exception on the caller side.
-rw-r--r--nova/endpoint/cloud.py11
-rw-r--r--nova/rpc.py143
-rw-r--r--nova/tests/rpc_unittest.py85
-rw-r--r--run_tests.py1
4 files changed, 202 insertions, 38 deletions
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index dcabad710..5366acec7 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -306,7 +306,7 @@ class CloudController(object):
"user_id": context.user.id,
"project_id": context.project.id}})
# NOTE(vish): rpc returned value is in the result key in the dictionary
- volume = self._get_volume(context, result['result'])
+ volume = self._get_volume(context, result)
defer.returnValue({'volumeSet': [self.format_volume(context, volume)]})
def _get_address(self, context, public_ip):
@@ -477,11 +477,10 @@ class CloudController(object):
@defer.inlineCallbacks
def allocate_address(self, context, **kwargs):
network_topic = yield self._get_network_topic(context)
- alloc_result = yield rpc.call(network_topic,
+ public_ip = yield rpc.call(network_topic,
{"method": "allocate_elastic_ip",
"args": {"user_id": context.user.id,
"project_id": context.project.id}})
- public_ip = alloc_result['result']
defer.returnValue({'addressSet': [{'publicIp': public_ip}]})
@rbac.allow('netadmin')
@@ -522,11 +521,10 @@ class CloudController(object):
"""Retrieves the network host for a project"""
host = network_service.get_host_for_project(context.project.id)
if not host:
- result = yield rpc.call(FLAGS.network_topic,
+ host = yield rpc.call(FLAGS.network_topic,
{"method": "set_network_host",
"args": {"user_id": context.user.id,
"project_id": context.project.id}})
- host = result['result']
defer.returnValue('%s.%s' %(FLAGS.network_topic, host))
@rbac.allow('projectmanager', 'sysadmin')
@@ -570,14 +568,13 @@ class CloudController(object):
if image_id == FLAGS.vpn_image_id:
is_vpn = True
inst = self.instdir.new()
- allocate_result = yield rpc.call(network_topic,
+ allocate_data = yield rpc.call(network_topic,
{"method": "allocate_fixed_ip",
"args": {"user_id": context.user.id,
"project_id": context.project.id,
"security_group": security_group,
"is_vpn": is_vpn,
"hostname": inst.instance_id}})
- allocate_data = allocate_result['result']
inst['image_id'] = image_id
inst['kernel_id'] = kernel_id
inst['ramdisk_id'] = ramdisk_id
diff --git a/nova/rpc.py b/nova/rpc.py
index 2a550c3ae..4ac546c2a 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -21,14 +21,13 @@ AMQP-based RPC. Queues have consumers and publishers.
No fan-out support yet.
"""
-from carrot import connection
+from carrot import connection as carrot_connection
from carrot import messaging
import json
import logging
import sys
import uuid
from twisted.internet import defer
-from twisted.internet import reactor
from twisted.internet import task
from nova import exception
@@ -39,13 +38,15 @@ from nova import flags
FLAGS = flags.FLAGS
-_log = logging.getLogger('amqplib')
-_log.setLevel(logging.WARN)
+LOG = logging.getLogger('amqplib')
+LOG.setLevel(logging.DEBUG)
-class Connection(connection.BrokerConnection):
+class Connection(carrot_connection.BrokerConnection):
+ """Connection instance object"""
@classmethod
def instance(cls):
+ """Returns the instance"""
if not hasattr(cls, '_instance'):
params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
@@ -56,18 +57,33 @@ class Connection(connection.BrokerConnection):
if FLAGS.fake_rabbit:
params['backend_cls'] = fakerabbit.Backend
+ # NOTE(vish): magic is fun!
+ # pylint: disable=W0142
cls._instance = cls(**params)
return cls._instance
@classmethod
def recreate(cls):
+ """Recreates the connection instance
+
+ This is necessary to recover from some network errors/disconnects"""
del cls._instance
return cls.instance()
+
class Consumer(messaging.Consumer):
+ """Consumer base class
+
+ Contains methods for connecting the fetch method to async loops
+ """
+ def __init__(self, *args, **kwargs):
+ self.failed_connection = False
+ super(Consumer, self).__init__(*args, **kwargs)
+
# TODO(termie): it would be nice to give these some way of automatically
# cleaning up after themselves
def attach_to_tornado(self, io_inst=None):
+ """Attach a callback to tornado that fires 10 times a second"""
from tornado import ioloop
if io_inst is None:
io_inst = ioloop.IOLoop.instance()
@@ -79,33 +95,44 @@ class Consumer(messaging.Consumer):
attachToTornado = attach_to_tornado
- def fetch(self, *args, **kwargs):
+ def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
+ """Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
# refactored into some sort of connection manager object
try:
- if getattr(self, 'failed_connection', False):
- # attempt to reconnect
+ if self.failed_connection:
+ # NOTE(vish): conn is defined in the parent class, we can
+ # recreate it as long as we create the backend too
+ # pylint: disable=W0201
self.conn = Connection.recreate()
self.backend = self.conn.create_backend()
- super(Consumer, self).fetch(*args, **kwargs)
- if getattr(self, 'failed_connection', False):
+ super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
+ if self.failed_connection:
logging.error("Reconnected to queue")
self.failed_connection = False
- except Exception, ex:
- if not getattr(self, 'failed_connection', False):
+ # NOTE(vish): This is catching all errors because we really don't
+ # exceptions to be logged 10 times a second if some
+ # persistent failure occurs.
+ except Exception: # pylint: disable=W0703
+ if not self.failed_connection:
logging.exception("Failed to fetch message from queue")
self.failed_connection = True
def attach_to_twisted(self):
+ """Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
+
class Publisher(messaging.Publisher):
+ """Publisher base class"""
pass
class TopicConsumer(Consumer):
+ """Consumes messages on a specific topic"""
exchange_type = "topic"
+
def __init__(self, connection=None, topic="broadcast"):
self.queue = topic
self.routing_key = topic
@@ -115,14 +142,24 @@ class TopicConsumer(Consumer):
class AdapterConsumer(TopicConsumer):
+ """Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None):
- _log.debug('Initing the Adapter Consumer for %s' % (topic))
+ LOG.debug('Initing the Adapter Consumer for %s' % (topic))
self.proxy = proxy
- super(AdapterConsumer, self).__init__(connection=connection, topic=topic)
+ super(AdapterConsumer, self).__init__(connection=connection,
+ topic=topic)
@exception.wrap_exception
def receive(self, message_data, message):
- _log.debug('received %s' % (message_data))
+ """Magically looks for a method on the proxy object and calls it
+
+ Message data should be a dictionary with two keys:
+ method: string representing the method to call
+ args: dictionary of arg: value
+
+ Example: {'method': 'echo', 'args': {'value': 42}}
+ """
+ LOG.debug('received %s' % (message_data))
msg_id = message_data.pop('_msg_id', None)
method = message_data.get('method')
@@ -133,21 +170,25 @@ class AdapterConsumer(TopicConsumer):
# messages stay in the queue indefinitely, so for now
# we just log the message and send an error string
# back to the caller
- _log.warn('no method for message: %s' % (message_data))
+ LOG.warn('no method for message: %s' % (message_data))
msg_reply(msg_id, 'No method for message: %s' % message_data)
return
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
+ # NOTE(vish): magic is fun!
+ # pylint: disable=W0142
d = defer.maybeDeferred(node_func, **node_args)
if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval))
- d.addErrback(lambda e: msg_reply(msg_id, str(e)))
+ d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
+ d.addErrback(lambda e: msg_reply(msg_id, None, e))
return
class TopicPublisher(Publisher):
+ """Publishes messages on a specific topic"""
exchange_type = "topic"
+
def __init__(self, connection=None, topic="broadcast"):
self.routing_key = topic
self.exchange = FLAGS.control_exchange
@@ -156,7 +197,9 @@ class TopicPublisher(Publisher):
class DirectConsumer(Consumer):
+ """Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
+
def __init__(self, connection=None, msg_id=None):
self.queue = msg_id
self.routing_key = msg_id
@@ -166,7 +209,9 @@ class DirectConsumer(Consumer):
class DirectPublisher(Publisher):
+ """Publishes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
+
def __init__(self, connection=None, msg_id=None):
self.routing_key = msg_id
self.exchange = msg_id
@@ -174,32 +219,63 @@ class DirectPublisher(Publisher):
super(DirectPublisher, self).__init__(connection=connection)
-def msg_reply(msg_id, reply):
+def msg_reply(msg_id, reply=None, failure=None):
+ """Sends a reply or an error on the channel signified by msg_id
+
+ failure should be a twisted failure object"""
+ if failure:
+ message = failure.getErrorMessage()
+ traceback = failure.getTraceback()
+ logging.error("Returning exception %s to caller", message)
+ logging.error(traceback)
+ failure = (failure.type.__name__, str(failure.value), traceback)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
-
try:
- publisher.send({'result': reply})
+ publisher.send({'result': reply, 'failure': failure})
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems())
- })
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure})
publisher.close()
+class RemoteError(exception.Error):
+ """Signifies that a remote class has raised an exception
+
+ Containes a string representation of the type of the original exception,
+ the value of the original exception, and the traceback. These are
+ sent to the parent as a joined string so printing the exception
+ contains all of the relevent info."""
+ def __init__(self, exc_type, value, traceback):
+ self.exc_type = exc_type
+ self.value = value
+ self.traceback = traceback
+ super(RemoteError, self).__init__("%s %s\n%s" % (exc_type,
+ value,
+ traceback))
+
+
def call(topic, msg):
- _log.debug("Making asynchronous call...")
+ """Sends a message on a topic and wait for a response"""
+ LOG.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
- _log.debug("MSG_ID is %s" % (msg_id))
+ LOG.debug("MSG_ID is %s" % (msg_id))
conn = Connection.instance()
d = defer.Deferred()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+
def deferred_receive(data, message):
+ """Acks message and callbacks or errbacks"""
message.ack()
- d.callback(data)
+ if data['failure']:
+ return d.errback(RemoteError(*data['failure']))
+ else:
+ return d.callback(data['result'])
+
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_tornado()
@@ -213,7 +289,8 @@ def call(topic, msg):
def cast(topic, msg):
- _log.debug("Making asynchronous cast...")
+ """Sends a message on a topic without waiting for a response"""
+ LOG.debug("Making asynchronous cast...")
conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
@@ -221,16 +298,18 @@ def cast(topic, msg):
def generic_response(message_data, message):
- _log.debug('response %s', message_data)
+ """Logs a result and exits"""
+ LOG.debug('response %s', message_data)
message.ack()
sys.exit(0)
def send_message(topic, message, wait=True):
+ """Sends a message for testing"""
msg_id = uuid.uuid4().hex
message.update({'_msg_id': msg_id})
- _log.debug('topic is %s', topic)
- _log.debug('message %s', message)
+ LOG.debug('topic is %s', topic)
+ LOG.debug('message %s', message)
if wait:
consumer = messaging.Consumer(connection=Connection.instance(),
@@ -253,6 +332,8 @@ def send_message(topic, message, wait=True):
consumer.wait()
-# TODO: Replace with a docstring test
if __name__ == "__main__":
+ # NOTE(vish): you can send messages from the command line using
+ # topic and a json sting representing a dictionary
+ # for the method
send_message(sys.argv[1], json.loads(sys.argv[2]))
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
new file mode 100644
index 000000000..764a97416
--- /dev/null
+++ b/nova/tests/rpc_unittest.py
@@ -0,0 +1,85 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for remote procedure calls using queue
+"""
+import logging
+
+from twisted.internet import defer
+
+from nova import flags
+from nova import rpc
+from nova import test
+
+
+FLAGS = flags.FLAGS
+
+
+class RpcTestCase(test.BaseTestCase):
+ """Test cases for rpc"""
+ def setUp(self): # pylint: disable=C0103
+ super(RpcTestCase, self).setUp()
+ self.conn = rpc.Connection.instance()
+ self.receiver = TestReceiver()
+ self.consumer = rpc.AdapterConsumer(connection=self.conn,
+ topic='test',
+ proxy=self.receiver)
+
+ self.injected.append(self.consumer.attach_to_tornado(self.ioloop))
+
+ def test_call_succeed(self):
+ """Get a value through rpc call"""
+ value = 42
+ result = yield rpc.call('test', {"method": "echo",
+ "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_call_exception(self):
+ """Test that exception gets passed back properly
+
+ rpc.call returns a RemoteError object. The value of the
+ exception is converted to a string, so we convert it back
+ to an int in the test.
+ """
+ value = 42
+ self.assertFailure(rpc.call('test', {"method": "fail",
+ "args": {"value": value}}),
+ rpc.RemoteError)
+ try:
+ yield rpc.call('test', {"method": "fail",
+ "args": {"value": value}})
+ self.fail("should have thrown rpc.RemoteError")
+ except rpc.RemoteError as exc:
+ self.assertEqual(int(exc.value), value)
+
+
+class TestReceiver(object):
+ """Simple Proxy class so the consumer has methods to call
+
+ Uses static methods because we aren't actually storing any state"""
+
+ @staticmethod
+ def echo(value):
+ """Simply returns whatever value is sent in"""
+ logging.debug("Received %s", value)
+ return defer.succeed(value)
+
+ @staticmethod
+ def fail(value):
+ """Raises an exception with the value sent in"""
+ raise Exception(value)
diff --git a/run_tests.py b/run_tests.py
index 7fe6e73ec..d90ac8175 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -59,6 +59,7 @@ from nova.tests.model_unittest import *
from nova.tests.network_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
+from nova.tests.rpc_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *