summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@yahoo.com>2010-09-28 16:54:57 -0700
committerVishvananda Ishaya <vishvananda@yahoo.com>2010-09-28 16:54:57 -0700
commitbd5816698316f64a2df700ed361b66e533eb9a31 (patch)
tree6a5b111ae15e2b9d7ca607864803bc2b6b1cfc8f /nova/rpc.py
parent669cf475d11700064aa16f959077d0512e6b1531 (diff)
parent2b65cf963b8afbc4703f79d3057e5c19f2894baa (diff)
downloadnova-bd5816698316f64a2df700ed361b66e533eb9a31.tar.gz
nova-bd5816698316f64a2df700ed361b66e533eb9a31.tar.xz
nova-bd5816698316f64a2df700ed361b66e533eb9a31.zip
fixed merge conflicts
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py45
1 files changed, 32 insertions, 13 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 6363335ea..fe52ad35f 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -84,19 +84,6 @@ class Consumer(messaging.Consumer):
self.failed_connection = False
super(Consumer, self).__init__(*args, **kwargs)
- # TODO(termie): it would be nice to give these some way of automatically
- # cleaning up after themselves
- def attach_to_tornado(self, io_inst=None):
- """Attach a callback to tornado that fires 10 times a second"""
- from tornado import ioloop
- if io_inst is None:
- io_inst = ioloop.IOLoop.instance()
-
- injected = ioloop.PeriodicCallback(
- lambda: self.fetch(enable_callbacks=True), 100, io_loop=io_inst)
- injected.start()
- return injected
-
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -124,6 +111,7 @@ class Consumer(messaging.Consumer):
"""Attach a callback to twisted that fires 10 times a second"""
loop = task.LoopingCall(self.fetch, enable_callbacks=True)
loop.start(interval=0.1)
+ return loop
class Publisher(messaging.Publisher):
@@ -294,6 +282,37 @@ def call(topic, msg):
return wait_msg.result
+def call_twisted(topic, msg):
+ """Sends a message on a topic and wait for a response"""
+ LOG.debug("Making asynchronous call...")
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug("MSG_ID is %s" % (msg_id))
+
+ conn = Connection.instance()
+ d = defer.Deferred()
+ consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+
+ def deferred_receive(data, message):
+ """Acks message and callbacks or errbacks"""
+ message.ack()
+ if data['failure']:
+ return d.errback(RemoteError(*data['failure']))
+ else:
+ return d.callback(data['result'])
+
+ consumer.register_callback(deferred_receive)
+ injected = consumer.attach_to_twisted()
+
+ # clean up after the injected listened and return x
+ d.addCallback(lambda x: injected.stop() and x or x)
+
+ publisher = TopicPublisher(connection=conn, topic=topic)
+ publisher.send(msg)
+ publisher.close()
+ return d
+
+
def cast(topic, msg):
"""Sends a message on a topic without waiting for a response"""
LOG.debug("Making asynchronous cast...")