summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorEric Day <eday@oddments.org>2010-09-24 19:25:12 -0700
committerEric Day <eday@oddments.org>2010-09-24 19:25:12 -0700
commit808b3421e7cd73e9ac55bd61cedfa42e75cb9780 (patch)
tree6450be6895ad48751e37241983e494a5c06580a9 /nova/rpc.py
parentcb311a3deb42094261b91467b7717f4eb3e9eaba (diff)
parent387671f9bc0299116ffbab7acfc47127afb989aa (diff)
downloadnova-808b3421e7cd73e9ac55bd61cedfa42e75cb9780.tar.gz
nova-808b3421e7cd73e9ac55bd61cedfa42e75cb9780.tar.xz
nova-808b3421e7cd73e9ac55bd61cedfa42e75cb9780.zip
Merged Termie's branch and fixed rpc test cases for tesited. Nothing is testing the Eventlet version of rpc.call though yet.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py45
1 files changed, 32 insertions, 13 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 6363335ea..fe52ad35f 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -84,19 +84,6 @@ class Consumer(messaging.Consumer):
self.failed_connection = False
super(Consumer, self).__init__(*args, **kwargs)
- # TODO(termie): it would be nice to give these some way of automatically
- # cleaning up after themselves
- def attach_to_tornado(self, io_inst=None):
- """Attach a callback to tornado that fires 10 times a second"""
- from tornado import ioloop
- if io_inst is None:
- io_inst = ioloop.IOLoop.instance()
-
- injected = ioloop.PeriodicCallback(
- lambda: self.fetch(enable_callbacks=True), 100, io_loop=io_inst)
- injected.start()
- return injected
-
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -124,6 +111,7 @@ class Consumer(messaging.Consumer):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
+ return loop
class Publisher(messaging.Publisher):
@@ -294,6 +282,37 @@ def call(topic, msg):
return wait_msg.result
+def call_twisted(topic, msg):
+ """Sends a message on a topic and wait for a response"""
+ LOG.debug("Making asynchronous call...")
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug("MSG_ID is %s" % (msg_id))
+
+ conn = Connection.instance()
+ d = defer.Deferred()
+ consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+
+ def deferred_receive(data, message):
+ """Acks message and callbacks or errbacks"""
+ message.ack()
+ if data['failure']:
+ return d.errback(RemoteError(*data['failure']))
+ else:
+ return d.callback(data['result'])
+
+ consumer.register_callback(deferred_receive)
+ injected = consumer.attach_to_twisted()
+
+ # clean up after the injected listened and return x
+ d.addCallback(lambda x: injected.stop() and x or x)
+
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
+ return d
+
+
def cast(topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")