summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorjaypipes@gmail.com <>2010-12-17 11:10:11 -0500
committerjaypipes@gmail.com <>2010-12-17 11:10:11 -0500
commitafae367d63c6d38faa5d9be54725df29c2a5b903 (patch)
treee2282e9bbc6ef6d4c5f339a4fde38127a619f354 /nova/rpc.py
parentd283922defdda6ede5fa2e09656cd8d411a90096 (diff)
parentcd460a1f661eea7e050891f50a8218fdf24f2c6f (diff)
downloadnova-afae367d63c6d38faa5d9be54725df29c2a5b903.tar.gz
nova-afae367d63c6d38faa5d9be54725df29c2a5b903.tar.xz
nova-afae367d63c6d38faa5d9be54725df29c2a5b903.zip
Merge eventlet and resolve all conflicts
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py83
1 files changed, 26 insertions, 57 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index cc3c7dfc8..6e2cf051a 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
+import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
-from twisted.internet import defer
-from twisted.internet import task
+from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import context
+from nova import utils
FLAGS = flags.FLAGS
@@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
- def fetch_repeatedly():
- while True:
- self.fetch(enable_callbacks=True)
- greenthread.sleep(0.1)
- greenthread.spawn(fetch_repeatedly)
-
- def attach_to_twisted(self):
- """Attach a callback to twisted that fires 10 times a second"""
- loop = task.LoopingCall(self.fetch, enable_callbacks=True)
- loop.start(interval=0.1)
- return loop
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
class Publisher(messaging.Publisher):
@@ -196,11 +188,13 @@ class AdapterConsumer(TopicConsumer):
node_func = getattr(self.proxy, str(method))
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
- # pylint: disable-msg=W0142
- d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
- if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
- d.addErrback(lambda e: msg_reply(msg_id, None, e))
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ msg_reply(msg_id, rval, None)
+ except Exception as e:
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
return
@@ -242,13 +236,15 @@ class DirectPublisher(Publisher):
def msg_reply(msg_id, reply=None, failure=None):
"""Sends a reply or an error on the channel signified by msg_id
- failure should be a twisted failure object"""
+ failure should be a sys.exc_info() tuple.
+
+ """
if failure:
- message = failure.getErrorMessage()
- traceback = failure.getTraceback()
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
logging.error(_("Returning exception %s to caller"), message)
- logging.error(traceback)
- failure = (failure.type.__name__, str(failure.value), traceback)
+ logging.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@@ -313,7 +309,6 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
-
def __call__(self, data, message):
"""Acks message and sets result."""
message.ack()
@@ -337,41 +332,15 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
+ # NOTE(termie): this is a little bit of a change from the original
+ # non-eventlet code where returning a Failure
+ # instance from a deferred call is very similar to
+ # raising an exception
+ if isinstance(wait_msg.result, Exception):
+ raise wait_msg.result
return wait_msg.result
-def call_twisted(context, topic, msg):
- """Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug(_("MSG_ID is %s") % (msg_id))
- _pack_context(msg, context)
-
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
-
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_twisted()
-
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
-
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
- return d
-
-
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")