diff options
author | Eric Day <eday@oddments.org> | 2010-09-29 02:13:12 +0000 |
---|---|---|
committer | Tarmac <> | 2010-09-29 02:13:12 +0000 |
commit | 2b65cf963b8afbc4703f79d3057e5c19f2894baa (patch) | |
tree | 5908d4245207865eda68d546cb8413cdd00757ee /nova/rpc.py | |
parent | 493047c2e475ff2c5d071e412d502081bd024078 (diff) | |
parent | 808b3421e7cd73e9ac55bd61cedfa42e75cb9780 (diff) | |
download | nova-2b65cf963b8afbc4703f79d3057e5c19f2894baa.tar.gz nova-2b65cf963b8afbc4703f79d3057e5c19f2894baa.tar.xz nova-2b65cf963b8afbc4703f79d3057e5c19f2894baa.zip |
Merged Termie's branch that starts tornado removal and fixed rpc test cases for twisted. Nothing is testing the Eventlet version of rpc.call though yet.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r-- | nova/rpc.py | 45 |
1 files changed, 32 insertions, 13 deletions
diff --git a/nova/rpc.py b/nova/rpc.py index 6363335ea..fe52ad35f 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -84,19 +84,6 @@ class Consumer(messaging.Consumer): self.failed_connection = False super(Consumer, self).__init__(*args, **kwargs) - # TODO(termie): it would be nice to give these some way of automatically - # cleaning up after themselves - def attach_to_tornado(self, io_inst=None): - """Attach a callback to tornado that fires 10 times a second""" - from tornado import ioloop - if io_inst is None: - io_inst = ioloop.IOLoop.instance() - - injected = ioloop.PeriodicCallback( - lambda: self.fetch(enable_callbacks=True), 100, io_loop=io_inst) - injected.start() - return injected - def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): """Wraps the parent fetch with some logic for failed connections""" # TODO(vish): the logic for failed connections and logging should be @@ -124,6 +111,7 @@ class Consumer(messaging.Consumer): """Attach a callback to twisted that fires 10 times a second""" loop = task.LoopingCall(self.fetch, enable_callbacks=True) loop.start(interval=0.1) + return loop class Publisher(messaging.Publisher): @@ -294,6 +282,37 @@ def call(topic, msg): return wait_msg.result +def call_twisted(topic, msg): + """Sends a message on a topic and wait for a response""" + LOG.debug("Making asynchronous call...") + msg_id = uuid.uuid4().hex + msg.update({'_msg_id': msg_id}) + LOG.debug("MSG_ID is %s" % (msg_id)) + + conn = Connection.instance() + d = defer.Deferred() + consumer = DirectConsumer(connection=conn, msg_id=msg_id) + + def deferred_receive(data, message): + """Acks message and callbacks or errbacks""" + message.ack() + if data['failure']: + return d.errback(RemoteError(*data['failure'])) + else: + return d.callback(data['result']) + + consumer.register_callback(deferred_receive) + injected = consumer.attach_to_twisted() + + # clean up after the injected listened and return x + d.addCallback(lambda x: injected.stop() and x or x) + + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() + return d + + def cast(topic, msg): """Sends a message on a topic without waiting for a response""" LOG.debug("Making asynchronous cast...") |