summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorAndy Smith <code@term.ie>2010-12-08 17:18:27 -0800
committerAndy Smith <code@term.ie>2010-12-08 17:18:27 -0800
commit3c85f1b7ed593a2d4d126a34241f217da5cf7ce6 (patch)
tree50b88411d51a9a5bbc49ba75cd40a21b635d60c7 /nova/rpc.py
parent0324b6609fe413866fd4cc52a87760f2b5ab2d85 (diff)
downloadnova-3c85f1b7ed593a2d4d126a34241f217da5cf7ce6.tar.gz
nova-3c85f1b7ed593a2d4d126a34241f217da5cf7ce6.tar.xz
nova-3c85f1b7ed593a2d4d126a34241f217da5cf7ce6.zip
intermediate commit to checkpoint progress
all relevant tests are passing except volume, next step is volume manager fixery
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py75
1 files changed, 20 insertions, 55 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 86a29574f..652b9e4aa 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -25,18 +25,18 @@ import json
import logging
import sys
import time
+import traceback
import uuid
from carrot import connection as carrot_connection
from carrot import messaging
from eventlet import greenthread
-from twisted.internet import defer
-from twisted.internet import task
+from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import context
+from nova import utils
FLAGS = flags.FLAGS
@@ -128,17 +128,9 @@ class Consumer(messaging.Consumer):
def attach_to_eventlet(self):
"""Only needed for unit tests!"""
- def fetch_repeatedly():
- while True:
- self.fetch(enable_callbacks=True)
- greenthread.sleep(0.1)
- greenthread.spawn(fetch_repeatedly)
-
- def attach_to_twisted(self):
- """Attach a callback to twisted that fires 10 times a second"""
- loop = task.LoopingCall(self.fetch, enable_callbacks=True)
- loop.start(interval=0.1)
- return loop
+ timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
+ timer.start(0.1)
+ return timer
class Publisher(messaging.Publisher):
@@ -197,10 +189,13 @@ class AdapterConsumer(TopicConsumer):
node_args = dict((str(k), v) for k, v in args.iteritems())
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
- d = defer.maybeDeferred(node_func, context=ctxt, **node_args)
- if msg_id:
- d.addCallback(lambda rval: msg_reply(msg_id, rval, None))
- d.addErrback(lambda e: msg_reply(msg_id, None, e))
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ if msg_id:
+ msg_reply(msg_id, rval, None)
+ except Exception as e:
+ if msg_id:
+ msg_reply(msg_id, None, sys.exc_info())
return
@@ -244,11 +239,11 @@ def msg_reply(msg_id, reply=None, failure=None):
failure should be a twisted failure object"""
if failure:
- message = failure.getErrorMessage()
- traceback = failure.getTraceback()
+ message = str(failure[1])
+ tb = traceback.format_exception(*failure)
logging.error("Returning exception %s to caller", message)
- logging.error(traceback)
- failure = (failure.type.__name__, str(failure.value), traceback)
+ logging.error(tb)
+ failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
@@ -313,8 +308,8 @@ def call(context, topic, msg):
_pack_context(msg, context)
class WaitMessage(object):
-
def __call__(self, data, message):
+ LOG.debug('data %s, msg %s', data, message)
"""Acks message and sets result."""
message.ack()
if data['failure']:
@@ -337,41 +332,11 @@ def call(context, topic, msg):
except StopIteration:
pass
consumer.close()
+ if isinstance(wait_msg.result, Exception):
+ raise wait_msg.result
return wait_msg.result
-def call_twisted(context, topic, msg):
- """Sends a message on a topic and wait for a response"""
- LOG.debug("Making asynchronous call...")
- msg_id = uuid.uuid4().hex
- msg.update({'_msg_id': msg_id})
- LOG.debug("MSG_ID is %s" % (msg_id))
- _pack_context(msg, context)
-
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
-
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_twisted()
-
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
-
- publisher = TopicPublisher(connection=conn, topic=topic)
- publisher.send(msg)
- publisher.close()
- return d
-
-
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")