summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@gmail.com>2010-08-12 21:27:53 -0700
committerVishvananda Ishaya <vishvananda@gmail.com>2010-08-12 21:27:53 -0700
commita679cab031ec91dd719b9ba887cdae4f595b2ca4 (patch)
treecbc8b1772582f3f8b5ab5694147580e416e0fd68
parent2bbb2b86272c89b35a1042ab2866bbe4863bc3e3 (diff)
downloadnova-a679cab031ec91dd719b9ba887cdae4f595b2ca4.tar.gz
nova-a679cab031ec91dd719b9ba887cdae4f595b2ca4.tar.xz
nova-a679cab031ec91dd719b9ba887cdae4f595b2ca4.zip
make rpc.call propogate exception info. Includes tests
-rw-r--r--nova/endpoint/cloud.py15
-rw-r--r--nova/rpc.py38
-rw-r--r--nova/tests/rpc_unittest.py62
-rw-r--r--run_tests.py1
4 files changed, 98 insertions, 18 deletions
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index ad9188ff3..c32fb1f7f 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -103,7 +103,7 @@ class CloudController(object):
result = {}
for instance in self.instdir.all:
if instance['project_id'] == project_id:
- line = '%s slots=%d' % (instance['private_dns_name'],
+ line = '%s slots=%d' % (instance['private_dns_name'],
INSTANCE_TYPES[instance['instance_type']]['vcpus'])
if instance['key_name'] in result:
result[instance['key_name']].append(line)
@@ -300,7 +300,7 @@ class CloudController(object):
"user_id": context.user.id,
"project_id": context.project.id}})
# NOTE(vish): rpc returned value is in the result key in the dictionary
- volume = self._get_volume(context, result['result'])
+ volume = self._get_volume(context, result)
defer.returnValue({'volumeSet': [self.format_volume(context, volume)]})
def _get_address(self, context, public_ip):
@@ -423,7 +423,7 @@ class CloudController(object):
i['key_name'] = instance.get('key_name', None)
if context.user.is_admin():
i['key_name'] = '%s (%s, %s)' % (i['key_name'],
- instance.get('project_id', None),
+ instance.get('project_id', None),
instance.get('node_name', ''))
i['product_codes_set'] = self._convert_to_set(
instance.get('product_codes', None), 'product_code')
@@ -471,11 +471,10 @@ class CloudController(object):
@defer.inlineCallbacks
def allocate_address(self, context, **kwargs):
network_topic = yield self._get_network_topic(context)
- alloc_result = yield rpc.call(network_topic,
+ public_ip = yield rpc.call(network_topic,
{"method": "allocate_elastic_ip",
"args": {"user_id": context.user.id,
"project_id": context.project.id}})
- public_ip = alloc_result['result']
defer.returnValue({'addressSet': [{'publicIp': public_ip}]})
@rbac.allow('netadmin')
@@ -516,11 +515,10 @@ class CloudController(object):
"""Retrieves the network host for a project"""
host = network_service.get_host_for_project(context.project.id)
if not host:
- result = yield rpc.call(FLAGS.network_topic,
+ host = yield rpc.call(FLAGS.network_topic,
{"method": "set_network_host",
"args": {"user_id": context.user.id,
"project_id": context.project.id}})
- host = result['result']
defer.returnValue('%s.%s' %(FLAGS.network_topic, host))
@rbac.allow('projectmanager', 'sysadmin')
@@ -563,13 +561,12 @@ class CloudController(object):
vpn = False
if image_id == FLAGS.vpn_image_id:
vpn = True
- allocate_result = yield rpc.call(network_topic,
+ allocate_data = yield rpc.call(network_topic,
{"method": "allocate_fixed_ip",
"args": {"user_id": context.user.id,
"project_id": context.project.id,
"security_group": security_group,
"vpn": vpn}})
- allocate_data = allocate_result['result']
inst = self.instdir.new()
inst['image_id'] = image_id
inst['kernel_id'] = kernel_id
diff --git a/nova/rpc.py b/nova/rpc.py
index 2a550c3ae..e06a3e19b 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -40,7 +40,7 @@ FLAGS = flags.FLAGS
_log = logging.getLogger('amqplib')
-_log.setLevel(logging.WARN)
+_log.setLevel(logging.DEBUG)
class Connection(connection.BrokerConnection):
@@ -141,8 +141,8 @@ class AdapterConsumer(TopicConsumer):
node_args = dict((str(k), v) for k, v in args.iteritems())
d = defer.maybeDeferred(node_func, **node_args)
if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval))
- d.addErrback(lambda e: msg_reply(msg_id, str(e)))
+ d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
+ d.addErrback(lambda e: msg_reply(msg_id, None, e))
return
@@ -174,20 +174,37 @@ class DirectPublisher(Publisher):
super(DirectPublisher, self).__init__(connection=connection)
-def msg_reply(msg_id, reply):
+def msg_reply(msg_id, reply=None, failure=None):
+ if failure:
+ message = failure.getErrorMessage()
+ traceback = failure.getTraceback()
+ logging.error("Returning exception %s to caller", message)
+ logging.error(traceback)
+ failure = (failure.type.__name__, str(failure.value), traceback)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
-
try:
- publisher.send({'result': reply})
- except TypeError:
+ publisher.send({'result': reply, 'failure': failure})
+ except Exception, exc:
publisher.send(
{'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems())
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure
})
publisher.close()
+class RemoteError(exception.Error):
+ """signifies that a remote class has raised an exception"""
+ def __init__(self, type, value, traceback):
+ self.type = type
+ self.value = value
+ self.traceback = traceback
+ super(RemoteError, self).__init__("%s %s\n%s" % (type,
+ value,
+ traceback))
+
+
def call(topic, msg):
_log.debug("Making asynchronous call...")
msg_id = uuid.uuid4().hex
@@ -199,7 +216,10 @@ def call(topic, msg):
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
def deferred_receive(data, message):
message.ack()
- d.callback(data)
+ if data['failure']:
+ return d.errback(RemoteError(*data['failure']))
+ else:
+ return d.callback(data['result'])
consumer.register_callback(deferred_receive)
injected = consumer.attach_to_tornado()
diff --git a/nova/tests/rpc_unittest.py b/nova/tests/rpc_unittest.py
new file mode 100644
index 000000000..9c2e29344
--- /dev/null
+++ b/nova/tests/rpc_unittest.py
@@ -0,0 +1,62 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from nova import flags
+from nova import rpc
+from nova import test
+
+
+FLAGS = flags.FLAGS
+
+
+class RpcTestCase(test.BaseTestCase):
+ def setUp(self):
+ super(RpcTestCase, self).setUp()
+ self.conn = rpc.Connection.instance()
+ self.receiver = TestReceiver()
+ self.consumer = rpc.AdapterConsumer(connection=self.conn,
+ topic='test',
+ proxy=self.receiver)
+
+ self.injected.append(self.consumer.attach_to_tornado(self.ioloop))
+
+ def test_call_succeed(self):
+ value = 42
+ result = yield rpc.call('test', {"method": "echo", "args": {"value": value}})
+ self.assertEqual(value, result)
+
+ def test_call_exception(self):
+ value = 42
+ self.assertFailure(rpc.call('test', {"method": "fail", "args": {"value": value}}), rpc.RemoteError)
+ try:
+ yield rpc.call('test', {"method": "fail", "args": {"value": value}})
+ self.fail("should have thrown rpc.RemoteError")
+ except rpc.RemoteError as exc:
+ self.assertEqual(int(exc.value), value)
+
+class TestReceiver(object):
+ def echo(self, value):
+ logging.debug("Received %s", value)
+ return defer.succeed(value)
+
+ def fail(self, value):
+ raise Exception(value)
diff --git a/run_tests.py b/run_tests.py
index 7fe6e73ec..d90ac8175 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -59,6 +59,7 @@ from nova.tests.model_unittest import *
from nova.tests.network_unittest import *
from nova.tests.objectstore_unittest import *
from nova.tests.process_unittest import *
+from nova.tests.rpc_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.volume_unittest import *