From 3c85f1b7ed593a2d4d126a34241f217da5cf7ce6 Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Wed, 8 Dec 2010 17:18:27 -0800 Subject: intermediate commit to checkpoint progress all relevant tests are passing except volume, next step is volume manager fixery --- nova/rpc.py | 75 +++++++++++++++++-------------------------------------------- 1 file changed, 20 insertions(+), 55 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 86a29574f..652b9e4aa 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -25,18 +25,18 @@ import json import logging import sys import time +import traceback import uuid from carrot import connection as carrot_connection from carrot import messaging from eventlet import greenthread -from twisted.internet import defer -from twisted.internet import task +from nova import context from nova import exception from nova import fakerabbit from nova import flags -from nova import context +from nova import utils FLAGS = flags.FLAGS @@ -128,17 +128,9 @@ class Consumer(messaging.Consumer): def attach_to_eventlet(self): """Only needed for unit tests!""" - def fetch_repeatedly(): - while True: - self.fetch(enable_callbacks=True) - greenthread.sleep(0.1) - greenthread.spawn(fetch_repeatedly) - - def attach_to_twisted(self): - """Attach a callback to twisted that fires 10 times a second""" - loop = task.LoopingCall(self.fetch, enable_callbacks=True) - loop.start(interval=0.1) - return loop + timer = utils.LoopingCall(self.fetch, enable_callbacks=True) + timer.start(0.1) + return timer class Publisher(messaging.Publisher): @@ -197,10 +189,13 @@ class AdapterConsumer(TopicConsumer): node_args = dict((str(k), v) for k, v in args.iteritems()) # NOTE(vish): magic is fun! # pylint: disable-msg=W0142 - d = defer.maybeDeferred(node_func, context=ctxt, **node_args) - if msg_id: - d.addCallback(lambda rval: msg_reply(msg_id, rval, None)) - d.addErrback(lambda e: msg_reply(msg_id, None, e)) + try: + rval = node_func(context=ctxt, **node_args) + if msg_id: + msg_reply(msg_id, rval, None) + except Exception as e: + if msg_id: + msg_reply(msg_id, None, sys.exc_info()) return @@ -244,11 +239,11 @@ def msg_reply(msg_id, reply=None, failure=None): failure should be a twisted failure object""" if failure: - message = failure.getErrorMessage() - traceback = failure.getTraceback() + message = str(failure[1]) + tb = traceback.format_exception(*failure) logging.error("Returning exception %s to caller", message) - logging.error(traceback) - failure = (failure.type.__name__, str(failure.value), traceback) + logging.error(tb) + failure = (failure[0].__name__, str(failure[1]), tb) conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: @@ -313,8 +308,8 @@ def call(context, topic, msg): _pack_context(msg, context) class WaitMessage(object): - def __call__(self, data, message): + LOG.debug('data %s, msg %s', data, message) """Acks message and sets result.""" message.ack() if data['failure']: @@ -337,41 +332,11 @@ def call(context, topic, msg): except StopIteration: pass consumer.close() + if isinstance(wait_msg.result, Exception): + raise wait_msg.result return wait_msg.result -def call_twisted(context, topic, msg): - """Sends a message on a topic and wait for a response""" - LOG.debug("Making asynchronous call...") - msg_id = uuid.uuid4().hex - msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) - _pack_context(msg, context) - - conn = Connection.instance() - d = defer.Deferred() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) - - def deferred_receive(data, message): - """Acks message and callbacks or errbacks""" - message.ack() - if data['failure']: - return d.errback(RemoteError(*data['failure'])) - else: - return d.callback(data['result']) - - consumer.register_callback(deferred_receive) - injected = consumer.attach_to_twisted() - - # clean up after the injected listened and return x - d.addCallback(lambda x: injected.stop() and x or x) - - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - return d - - def cast(context, topic, msg): """Sends a message on a topic without waiting for a response""" LOG.debug("Making asynchronous cast...") -- cgit From a1640f352806ee12f6b485a8d69a65bd42b51411 Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Thu, 9 Dec 2010 16:05:13 -0800 Subject: formatting and naming cleanup --- nova/rpc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 652b9e4aa..6a634a4ec 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -309,7 +309,6 @@ def call(context, topic, msg): class WaitMessage(object): def __call__(self, data, message): - LOG.debug('data %s, msg %s', data, message) """Acks message and sets result.""" message.ack() if data['failure']: @@ -332,6 +331,10 @@ def call(context, topic, msg): except StopIteration: pass consumer.close() + # NOTE(termie): this is a little bit of a change from the original + # twisted-based code where returning a Failure + # instance from a deferred call is very similar to + # raising an exception if isinstance(wait_msg.result, Exception): raise wait_msg.result return wait_msg.result -- cgit From af5c175dbc77048fb74311bf92569866676eee9c Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Thu, 9 Dec 2010 16:18:52 -0800 Subject: removed a few more references to twisted --- nova/rpc.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 6a634a4ec..b5df4904b 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -237,7 +237,9 @@ class DirectPublisher(Publisher): def msg_reply(msg_id, reply=None, failure=None): """Sends a reply or an error on the channel signified by msg_id - failure should be a twisted failure object""" + failure should be a sys.exc_info() tuple. + + """ if failure: message = str(failure[1]) tb = traceback.format_exception(*failure) @@ -332,7 +334,7 @@ def call(context, topic, msg): pass consumer.close() # NOTE(termie): this is a little bit of a change from the original - # twisted-based code where returning a Failure + # non-eventlet code where returning a Failure # instance from a deferred call is very similar to # raising an exception if isinstance(wait_msg.result, Exception): -- cgit From 12802a76c775a35e9d5a651bf896cfa25bec547f Mon Sep 17 00:00:00 2001 From: "jaypipes@gmail.com" <> Date: Sat, 11 Dec 2010 15:23:40 -0500 Subject: First round of i18n-ifying strings in Nova --- nova/rpc.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 86a29574f..cc3c7dfc8 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -91,15 +91,15 @@ class Consumer(messaging.Consumer): self.failed_connection = False break except: # Catching all because carrot sucks - logging.exception("AMQP server on %s:%d is unreachable." \ - " Trying again in %d seconds." % ( + logging.exception(_("AMQP server on %s:%d is unreachable." + " Trying again in %d seconds.") % ( FLAGS.rabbit_host, FLAGS.rabbit_port, FLAGS.rabbit_retry_interval)) self.failed_connection = True if self.failed_connection: - logging.exception("Unable to connect to AMQP server" \ - " after %d tries. Shutting down." % FLAGS.rabbit_max_retries) + logging.exception(_("Unable to connect to AMQP server" + " after %d tries. Shutting down.") % FLAGS.rabbit_max_retries) sys.exit(1) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): @@ -116,14 +116,14 @@ class Consumer(messaging.Consumer): self.declare() super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) if self.failed_connection: - logging.error("Reconnected to queue") + logging.error(_("Reconnected to queue")) self.failed_connection = False # NOTE(vish): This is catching all errors because we really don't # exceptions to be logged 10 times a second if some # persistent failure occurs. except Exception: # pylint: disable-msg=W0703 if not self.failed_connection: - logging.exception("Failed to fetch message from queue") + logging.exception(_("Failed to fetch message from queue")) self.failed_connection = True def attach_to_eventlet(self): @@ -161,7 +161,7 @@ class TopicConsumer(Consumer): class AdapterConsumer(TopicConsumer): """Calls methods on a proxy object based on method and args""" def __init__(self, connection=None, topic="broadcast", proxy=None): - LOG.debug('Initing the Adapter Consumer for %s' % (topic)) + LOG.debug(_('Initing the Adapter Consumer for %s') % (topic)) self.proxy = proxy super(AdapterConsumer, self).__init__(connection=connection, topic=topic) @@ -176,7 +176,7 @@ class AdapterConsumer(TopicConsumer): Example: {'method': 'echo', 'args': {'value': 42}} """ - LOG.debug('received %s' % (message_data)) + LOG.debug(_('received %s') % (message_data)) msg_id = message_data.pop('_msg_id', None) ctxt = _unpack_context(message_data) @@ -189,8 +189,8 @@ class AdapterConsumer(TopicConsumer): # messages stay in the queue indefinitely, so for now # we just log the message and send an error string # back to the caller - LOG.warn('no method for message: %s' % (message_data)) - msg_reply(msg_id, 'No method for message: %s' % message_data) + LOG.warn(_('no method for message: %s') % (message_data)) + msg_reply(msg_id, _('No method for message: %s') % message_data) return node_func = getattr(self.proxy, str(method)) @@ -246,7 +246,7 @@ def msg_reply(msg_id, reply=None, failure=None): if failure: message = failure.getErrorMessage() traceback = failure.getTraceback() - logging.error("Returning exception %s to caller", message) + logging.error(_("Returning exception %s to caller"), message) logging.error(traceback) failure = (failure.type.__name__, str(failure.value), traceback) conn = Connection.instance() @@ -287,7 +287,7 @@ def _unpack_context(msg): if key.startswith('_context_'): value = msg.pop(key) context_dict[key[9:]] = value - LOG.debug('unpacked context: %s', context_dict) + LOG.debug(_('unpacked context: %s'), context_dict) return context.RequestContext.from_dict(context_dict) @@ -306,10 +306,10 @@ def _pack_context(msg, context): def call(context, topic, msg): """Sends a message on a topic and wait for a response""" - LOG.debug("Making asynchronous call...") + LOG.debug(_("Making asynchronous call...")) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) + LOG.debug(_("MSG_ID is %s") % (msg_id)) _pack_context(msg, context) class WaitMessage(object): @@ -345,7 +345,7 @@ def call_twisted(context, topic, msg): LOG.debug("Making asynchronous call...") msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - LOG.debug("MSG_ID is %s" % (msg_id)) + LOG.debug(_("MSG_ID is %s") % (msg_id)) _pack_context(msg, context) conn = Connection.instance() @@ -384,7 +384,7 @@ def cast(context, topic, msg): def generic_response(message_data, message): """Logs a result and exits""" - LOG.debug('response %s', message_data) + LOG.debug(_('response %s'), message_data) message.ack() sys.exit(0) @@ -393,8 +393,8 @@ def send_message(topic, message, wait=True): """Sends a message for testing""" msg_id = uuid.uuid4().hex message.update({'_msg_id': msg_id}) - LOG.debug('topic is %s', topic) - LOG.debug('message %s', message) + LOG.debug(_('topic is %s'), topic) + LOG.debug(_('message %s'), message) if wait: consumer = messaging.Consumer(connection=Connection.instance(), -- cgit From a2a8406b5d793545c8ecb359e18b80bba618c509 Mon Sep 17 00:00:00 2001 From: termie Date: Tue, 14 Dec 2010 16:05:39 -0800 Subject: updates per review --- nova/rpc.py | 1 - 1 file changed, 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index b5df4904b..6a3f552db 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -188,7 +188,6 @@ class AdapterConsumer(TopicConsumer): node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) # NOTE(vish): magic is fun! - # pylint: disable-msg=W0142 try: rval = node_func(context=ctxt, **node_args) if msg_id: -- cgit