summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorSoren Hansen <soren.hansen@rackspace.com>2010-09-29 13:29:45 +0200
committerSoren Hansen <soren.hansen@rackspace.com>2010-09-29 13:29:45 +0200
commitc43fb8325eb068e970df02ec99d02f37de38ecc4 (patch)
treeb4ecce6394cdb4ceb8fb382cb12045870177bb9b /nova/rpc.py
parente609dd8620796f4eadefcff6130dfeae06b97ef4 (diff)
parent43ce84290964b433fd9d9898772d29bffc385dd8 (diff)
downloadnova-c43fb8325eb068e970df02ec99d02f37de38ecc4.tar.gz
nova-c43fb8325eb068e970df02ec99d02f37de38ecc4.tar.xz
nova-c43fb8325eb068e970df02ec99d02f37de38ecc4.zip
Merge trunk.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py45
1 files changed, 32 insertions, 13 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 6363335ea..fe52ad35f 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -84,19 +84,6 @@ class Consumer(messaging.Consumer):
self.failed_connection = False
super(Consumer, self).__init__(*args, **kwargs)
- # TODO(termie): it would be nice to give these some way of automatically
- # cleaning up after themselves
- def attach_to_tornado(self, io_inst=None):
- """Attach a callback to tornado that fires 10 times a second"""
- from tornado import ioloop
- if io_inst is None:
- io_inst = ioloop.IOLoop.instance()
-
- injected = ioloop.PeriodicCallback(
- lambda: self.fetch(enable_callbacks=True), 100, io_loop=io_inst)
- injected.start()
- return injected
-
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -124,6 +111,7 @@ class Consumer(messaging.Consumer):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
+ return loop
class Publisher(messaging.Publisher):
@@ -294,6 +282,37 @@ def call(topic, msg):
return wait_msg.result
+def call_twisted(topic, msg):
+ """Sends a message on a topic and wait for a response"""
+ LOG.debug("Making asynchronous call...")
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug("MSG_ID is %s" % (msg_id))
+
+ conn = Connection.instance()
+ d = defer.Deferred()
+ consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+
+ def deferred_receive(data, message):
+ """Acks message and callbacks or errbacks"""
+ message.ack()
+ if data['failure']:
+ return d.errback(RemoteError(*data['failure']))
+ else:
+ return d.callback(data['result'])
+
+ consumer.register_callback(deferred_receive)
+ injected = consumer.attach_to_twisted()
+
+ # clean up after the injected listened and return x
+ d.addCallback(lambda x: injected.stop() and x or x)
+
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
+ return d
+
+
def cast(topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")