summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py119
1 files changed, 44 insertions, 75 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 86a29574f..844088348 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
+import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
-from twisted.internet import defer
-from twisted.internet import task
+from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import context
+from nova import utils
FLAGS = flags.FLAGS
@@ -91,15 +91,15 @@ class Consumer(messaging.Consumer):
self.failed_connection = False
break
except: # Catching all because carrot sucks
- logging.exception("AMQP server on %s:%d is unreachable." \
- " Trying again in %d seconds." % (
+ logging.exception(_("AMQP server on %s:%d is unreachable."
+ " Trying again in %d seconds.") % (
FLAGS.rabbit_host,
FLAGS.rabbit_port,
FLAGS.rabbit_retry_interval))
self.failed_connection = True
if self.failed_connection:
- logging.exception("Unable to connect to AMQP server" \
- " after %d tries. Shutting down." % FLAGS.rabbit_max_retries)
+ logging.exception(_("Unable to connect to AMQP server"
+ " after %d tries. Shutting down.") % FLAGS.rabbit_max_retries)
sys.exit(1)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
@@ -116,29 +116,21 @@ class Consumer(messaging.Consumer):
self.declare()
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
if self.failed_connection:
- logging.error("Reconnected to queue")
+ logging.error(_("Reconnected to queue"))
self.failed_connection = False
# NOTE(vish): This is catching all errors because we really don't
# exceptions to be logged 10 times a second if some
# persistent failure occurs.
except Exception: # pylint: disable-msg=W0703
if not self.failed_connection:
- logging.exception("Failed to fetch message from queue")
+ logging.exception(_("Failed to fetch message from queue"))
self.failed_connection = True
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
- def fetch_repeatedly():
- while True:
- self.fetch(enable_callbacks=True)
- greenthread.sleep(0.1)
- greenthread.spawn(fetch_repeatedly)
-
- def attach_to_twisted(self):
- """Attach a callback to twisted that fires 10 times a second"""
- loop = task.LoopingCall(self.fetch, enable_callbacks=True)
- loop.start(interval=0.1)
- return loop
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
class Publisher(messaging.Publisher):
@@ -161,7 +153,7 @@ class TopicConsumer(Consumer):
class AdapterConsumer(TopicConsumer):
"""Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None):
- LOG.debug('Initing the Adapter Consumer for %s' % (topic))
+ LOG.debug(_('Initing the Adapter Consumer for %s') % (topic))
self.proxy = proxy
super(AdapterConsumer, self).__init__(connection=connection,
topic=topic)
@@ -176,7 +168,7 @@ class AdapterConsumer(TopicConsumer):
Example: {'method': 'echo', 'args': {'value': 42}}
"""
- LOG.debug('received %s' % (message_data))
+ LOG.debug(_('received %s') % (message_data))
msg_id = message_data.pop('_msg_id', None)
ctxt = _unpack_context(message_data)
@@ -189,18 +181,20 @@ class AdapterConsumer(TopicConsumer):
# messages stay in the queue indefinitely, so for now
# we just log the message and send an error string
# back to the caller
- LOG.warn('no method for message: %s' % (message_data))
- msg_reply(msg_id, 'No method for message: %s' % message_data)
+ LOG.warn(_('no method for message: %s') % (message_data))
+ msg_reply(msg_id, _('No method for message: %s') % message_data)
return
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
- # pylint: disable-msg=W0142
- d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
- if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
- d.addErrback(lambda e: msg_reply(msg_id, None, e))
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ msg_reply(msg_id, rval, None)
+ except Exception as e:
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
return
@@ -242,14 +236,16 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
- failure should be a twisted failure object"""
+ failure should be a sys.exc_info() tuple.
+
+ """
if failure:
- message = failure.getErrorMessage()
- traceback = failure.getTraceback()
- logging.error("Returning exception %s to caller", message)
- logging.error(traceback)
- failure = (failure.type.__name__, str(failure.value), traceback)
- conn = Connection.instance()
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
+ logging.error(_("Returning exception %s to caller"), message)
+ logging.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
+ conn = Connection.instance(True)
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
@@ -287,7 +283,7 @@ def _unpack_context(msg):
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
- LOG.debug('unpacked context: %s', context_dict)
+ LOG.debug(_('unpacked context: %s'), context_dict)
return context.RequestContext.from_dict(context_dict)
@@ -306,14 +302,13 @@ def _pack_context(msg, context):
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
+ LOG.debug(_("Making asynchronous call..."))
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
- LOG.debug("MSG_ID is %s" % (msg_id))
+ LOG.debug(_("MSG_ID is %s") % (msg_id))
_pack_context(msg, context)
class WaitMessage(object):
-
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
@@ -337,41 +332,15 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
+ # NOTE(termie): this is a little bit of a change from the original
+ # non-eventlet code where returning a Failure
+ # instance from a deferred call is very similar to
+ # raising an exception
+ if isinstance(wait_msg.result, Exception):
+ raise wait_msg.result
return wait_msg.result
-def call_twisted(context, topic, msg):
- """Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug("MSG_ID is %s" % (msg_id))
- _pack_context(msg, context)
-
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
-
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_twisted()
-
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
-
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
- return d
-
-
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")
@@ -384,7 +353,7 @@ def cast(context, topic, msg):
def generic_response(message_data, message):
"""Logs a result and exits"""
- LOG.debug('response %s', message_data)
+ LOG.debug(_('response %s'), message_data)
message.ack()
sys.exit(0)
@@ -393,8 +362,8 @@ def send_message(topic, message, wait=True):
"""Sends a message for testing"""
msg_id = uuid.uuid4().hex
message.update({'_msg_id': msg_id})
- LOG.debug('topic is %s', topic)
- LOG.debug('message %s', message)
+ LOG.debug(_('topic is %s'), topic)
+ LOG.debug(_('message %s'), message)
if wait:
consumer = messaging.Consumer(connection=Connection.instance(),