diff options
author | Michael Gundlach <michael.gundlach@rackspace.com> | 2010-09-23 21:18:12 +0000 |
---|---|---|
committer | Tarmac <> | 2010-09-23 21:18:12 +0000 |
commit | cb311a3deb42094261b91467b7717f4eb3e9eaba (patch) | |
tree | 64bc1ce0f07dac7bbd1ec31c24c0b82de933d180 /nova/rpc.py | |
parent | 2b2313d5c2fad31d36ac75dbad48773e7538cf1e (diff) | |
parent | 203cd3f34cf40d5597f9791083096d70c2c3edc7 (diff) | |
download | nova-cb311a3deb42094261b91467b7717f4eb3e9eaba.tar.gz nova-cb311a3deb42094261b91467b7717f4eb3e9eaba.tar.xz nova-cb311a3deb42094261b91467b7717f4eb3e9eaba.zip |
Put EC2 API -> eventlet back into trunk, fixing the bits that I missed when I put it into trunk on 9/21.
Note that some of this got into trunk via r291 accidentally because r291 was a branch based off of the trunk that was reverted on 9/22.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r-- | nova/rpc.py | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/nova/rpc.py b/nova/rpc.py index 84a9b5590..6363335ea 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -46,9 +46,9 @@ LOG.setLevel(logging.DEBUG) class Connection(carrot_connection.BrokerConnection): """Connection instance object""" @classmethod - def instance(cls): + def instance(cls, new=False): """Returns the instance""" - if not hasattr(cls, '_instance'): + if new or not hasattr(cls, '_instance'): params = dict(hostname=FLAGS.rabbit_host, port=FLAGS.rabbit_port, userid=FLAGS.rabbit_userid, @@ -60,7 +60,10 @@ class Connection(carrot_connection.BrokerConnection): # NOTE(vish): magic is fun! # pylint: disable-msg=W0142 - cls._instance = cls(**params) + if new: + return cls(**params) + else: + cls._instance = cls(**params) return cls._instance @classmethod @@ -94,8 +97,6 @@ class Consumer(messaging.Consumer): injected.start() return injected - attachToTornado = attach_to_tornado - def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): """Wraps the parent fetch with some logic for failed connections""" # TODO(vish): the logic for failed connections and logging should be @@ -265,28 +266,32 @@ def call(topic, msg): msg.update({'_msg_id': msg_id}) LOG.debug("MSG_ID is %s" % (msg_id)) - conn = Connection.instance() - d = defer.Deferred() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) - - def deferred_receive(data, message): - """Acks message and callbacks or errbacks""" - message.ack() - if data['failure']: - return d.errback(RemoteError(*data['failure'])) - else: - return d.callback(data['result']) + class WaitMessage(object): - consumer.register_callback(deferred_receive) - injected = consumer.attach_to_tornado() + def __call__(self, data, message): + """Acks message and sets result.""" + message.ack() + if data['failure']: + self.result = RemoteError(*data['failure']) + else: + self.result = data['result'] - # clean up after the injected listened and return x - d.addCallback(lambda x: injected.stop() and x or x) + wait_msg = WaitMessage() + conn = Connection.instance(True) + consumer = DirectConsumer(connection=conn, msg_id=msg_id) + consumer.register_callback(wait_msg) + conn = Connection.instance() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) publisher.close() - return d + + try: + consumer.wait(limit=1) + except StopIteration: + pass + consumer.close() + return wait_msg.result def cast(topic, msg): |