summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorMichael Gundlach <michael.gundlach@rackspace.com>2010-09-23 21:18:12 +0000
committerTarmac <>2010-09-23 21:18:12 +0000
commitcb311a3deb42094261b91467b7717f4eb3e9eaba (patch)
tree64bc1ce0f07dac7bbd1ec31c24c0b82de933d180 /nova/rpc.py
parent2b2313d5c2fad31d36ac75dbad48773e7538cf1e (diff)
parent203cd3f34cf40d5597f9791083096d70c2c3edc7 (diff)
downloadnova-cb311a3deb42094261b91467b7717f4eb3e9eaba.tar.gz
nova-cb311a3deb42094261b91467b7717f4eb3e9eaba.tar.xz
nova-cb311a3deb42094261b91467b7717f4eb3e9eaba.zip
Put EC2 API -> eventlet back into trunk, fixing the bits that I missed when I put it into trunk on 9/21.
Note that some of this got into trunk via r291 accidentally because r291 was a branch based off of the trunk that was reverted on 9/22.
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 84a9b5590..6363335ea 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -46,9 +46,9 @@ LOG.setLevel(logging.DEBUG)
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object"""
@classmethod
- def instance(cls):
+ def instance(cls, new=False):
"""Returns the instance"""
- if not hasattr(cls, '_instance'):
+ if new or not hasattr(cls, '_instance'):
params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
userid=FLAGS.rabbit_userid,
@@ -60,7 +60,10 @@ class Connection(carrot_connection.BrokerConnection):
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
- cls._instance = cls(**params)
+ if new:
+ return cls(**params)
+ else:
+ cls._instance = cls(**params)
return cls._instance
@classmethod
@@ -94,8 +97,6 @@ class Consumer(messaging.Consumer):
injected.start()
return injected
- attachToTornado = attach_to_tornado
-
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -265,28 +266,32 @@ def call(topic, msg):
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
+ class WaitMessage(object):
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_tornado()
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self.result = RemoteError(*data['failure'])
+ else:
+ self.result = data['result']
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
+ wait_msg = WaitMessage()
+ conn = Connection.instance(True)
+ consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ consumer.register_callback(wait_msg)
+ conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
- return d
+
+ try:
+ consumer.wait(limit=1)
+ except StopIteration:
+ pass
+ consumer.close()
+ return wait_msg.result
def cast(topic, msg):