diff options
Diffstat (limited to 'nova/rpc.py')
-rw-r--r-- | nova/rpc.py | 119 |
1 files changed, 44 insertions, 75 deletions
diff --git a/nova/rpc.py b/nova/rpc.py index 86a29574f..844088348 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -25,18 +25,18 @@ import json import logging import sys import time +import traceback import uuid from carrot import connection as carrot_connection from carrot import messaging from eventlet import greenthread -from twisted.internet import defer -from twisted.internet import task +from nova import context from nova import exception from nova import fakerabbit from nova import flags -from nova import context +from nova import utils FLAGS = flags.FLAGS @@ -91,15 +91,15 @@ class Consumer(messaging.Consumer): self.failed_connection = False break except: # Catching all because carrot sucks - logging.exception("AMQP server on %s:%d is unreachable." \ - " Trying again in %d seconds." % ( + logging.exception(_("AMQP server on %s:%d is unreachable." + " Trying again in %d seconds.") % ( FLAGS.rabbit_host, FLAGS.rabbit_port, FLAGS.rabbit_retry_interval)) self.failed_connection = True if self.failed_connection: - logging.exception("Unable to connect to AMQP server" \ - " after %d tries. Shutting down." % FLAGS.rabbit_max_retries) + logging.exception(_("Unable to connect to AMQP server" + " after %d tries. Shutting down.") % FLAGS.rabbit_max_retries) sys.exit(1) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): @@ -116,29 +116,21 @@ class Consumer(messaging.Consumer): self.declare() super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) if self.failed_connection: - logging.error("Reconnected to queue") + logging.error(_("Reconnected to queue")) self.failed_connection = False # NOTE(vish): This is catching all errors because we really don't # exceptions to be logged 10 times a second if some # persistent failure occurs. except Exception: # pylint: disable-msg=W0703 if not self.failed_connection: - logging.exception("Failed to fetch message from queue") + logging.exception(_("Failed to fetch message from queue")) self.failed_connection = True def attach_to_eventlet(self): """Only needed for unit tests!""" - def fetch_repeatedly(): - while True: - self.fetch(enable_callbacks=True) - greenthread.sleep(0.1) - greenthread.spawn(fetch_repeatedly) - - def attach_to_twisted(self): - """Attach a callback to twisted that fires 10 times a second""" - loop = task.LoopingCall(self.fetch, enable_callbacks=True) - loop.start(interval=0.1) - return loop + timer = utils.LoopingCall(self.fetch, enable_callbacks=True) + timer.start(0.1) + return timer class Publisher(messaging.Publisher): @@ -161,7 +153,7 @@ class TopicConsumer(Consumer): class AdapterConsumer(TopicConsumer): """Calls methods on a proxy object based on method and args""" def __init__(self, connection=None, topic="broadcast", proxy=None): - LOG.debug('Initing the Adapter Consumer for %s' % (topic)) + LOG.debug(_('Initing the Adapter Consumer for %s') % (topic)) self.proxy = proxy super(AdapterConsumer, self).__init__(connection=connection, topic=topic) @@ -176,7 +168,7 @@ class AdapterConsumer(TopicConsumer): Example: {'method': 'echo', 'args': {'value': 42}} """ - LOG.debug('received %s' % (message_data)) + LOG.debug(_('received %s') % (message_data)) msg_id = message_data.pop('_msg_id', None) ctxt = _unpack_context(message_data) @@ -189,18 +181,20 @@ class AdapterConsumer(TopicConsumer): # messages stay in the queue indefinitely, so for now # we just log the message and send an error string # back to the caller - LOG.warn('no method for message: %s' % (message_data)) - msg_reply(msg_id, 'No method for message: %s' % message_data) + LOG.warn(_('no method for message: %s') % (message_data)) + msg_reply(msg_id, _('No method for message: %s') % message_data) return node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) # NOTE(vish): magic is fun! - # pylint: disable-msg=W0142 - d = defer.maybeDeferred(node_func, context=ctxt, **node_args) - if msg_id: - d.addCallback(lambda rval: msg_reply(msg_id, rval, None)) - d.addErrback(lambda e: msg_reply(msg_id, None, e)) + try: + rval = node_func(context=ctxt, **node_args) + if msg_id: + msg_reply(msg_id, rval, None) + except Exception as e: + if msg_id: + msg_reply(msg_id, None, sys.exc_info()) return @@ -242,14 +236,16 @@ class DirectPublisher(Publisher): def msg_reply(msg_id, reply=None, failure=None): """Sends a reply or an error on the channel signified by msg_id - failure should be a twisted failure object""" + failure should be a sys.exc_info() tuple. + + """ if failure: - message = failure.getErrorMessage() - traceback = failure.getTraceback() - logging.error("Returning exception %s to caller", message) - logging.error(traceback) - failure = (failure.type.__name__, str(failure.value), traceback) - conn = Connection.instance() + message = str(failure[1]) + tb = traceback.format_exception(*failure) + logging.error(_("Returning exception %s to caller"), message) + logging.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) + conn = Connection.instance(True) publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) @@ -287,7 +283,7 @@ def _unpack_context(msg): if key.startswith('_context_'): value = msg.pop(key) context_dict[key[9:]] = value - LOG.debug('unpacked context: %s', context_dict) + LOG.debug(_('unpacked context: %s'), context_dict) return context.RequestContext.from_dict(context_dict) @@ -306,14 +302,13 @@ def _pack_context(msg, context): def call(context, topic, msg): """Sends a message on a topic and wait for a response""" - LOG.debug("Making asynchronous call...") + LOG.debug(_("Making asynchronous call...")) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) + LOG.debug(_("MSG_ID is %s") % (msg_id)) _pack_context(msg, context) class WaitMessage(object): - def __call__(self, data, message): """Acks message and sets result.""" message.ack() @@ -337,41 +332,15 @@ def call(context, topic, msg): except StopIteration: pass consumer.close() + # NOTE(termie): this is a little bit of a change from the original + # non-eventlet code where returning a Failure + # instance from a deferred call is very similar to + # raising an exception + if isinstance(wait_msg.result, Exception): + raise wait_msg.result return wait_msg.result -def call_twisted(context, topic, msg): - """Sends a message on a topic and wait for a response""" - LOG.debug("Making asynchronous call...") - msg_id = uuid.uuid4().hex - msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) - _pack_context(msg, context) - - conn = Connection.instance() - d = defer.Deferred() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) - - def deferred_receive(data, message): - """Acks message and callbacks or errbacks""" - message.ack() - if data['failure']: - return d.errback(RemoteError(*data['failure'])) - else: - return d.callback(data['result']) - - consumer.register_callback(deferred_receive) - injected = consumer.attach_to_twisted() - - # clean up after the injected listened and return x - d.addCallback(lambda x: injected.stop() and x or x) - - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - return d - - def cast(context, topic, msg): """Sends a message on a topic without waiting for a response""" LOG.debug("Making asynchronous cast...") @@ -384,7 +353,7 @@ def cast(context, topic, msg): def generic_response(message_data, message): """Logs a result and exits""" - LOG.debug('response %s', message_data) + LOG.debug(_('response %s'), message_data) message.ack() sys.exit(0) @@ -393,8 +362,8 @@ def send_message(topic, message, wait=True): """Sends a message for testing""" msg_id = uuid.uuid4().hex message.update({'_msg_id': msg_id}) - LOG.debug('topic is %s', topic) - LOG.debug('message %s', message) + LOG.debug(_('topic is %s'), topic) + LOG.debug(_('message %s'), message) if wait: consumer = messaging.Consumer(connection=Connection.instance(), |