summaryrefslogtreecommitdiffstats
path: root/nova/rpc.py
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@yahoo.com>2010-09-28 09:22:36 -0700
committerVishvananda Ishaya <vishvananda@yahoo.com>2010-09-28 09:22:36 -0700
commit669cf475d11700064aa16f959077d0512e6b1531 (patch)
tree7de0a38210cf41b2c819af388004191be19aff53 /nova/rpc.py
parentbe1b1e320c17630430cfa567d8685f8cfc5773e4 (diff)
parentcb311a3deb42094261b91467b7717f4eb3e9eaba (diff)
downloadnova-669cf475d11700064aa16f959077d0512e6b1531.tar.gz
nova-669cf475d11700064aa16f959077d0512e6b1531.tar.xz
nova-669cf475d11700064aa16f959077d0512e6b1531.zip
merged trunk
Diffstat (limited to 'nova/rpc.py')
-rw-r--r--nova/rpc.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 84a9b5590..6363335ea 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -46,9 +46,9 @@ LOG.setLevel(logging.DEBUG)
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object"""
@classmethod
- def instance(cls):
+ def instance(cls, new=False):
"""Returns the instance"""
- if not hasattr(cls, '_instance'):
+ if new or not hasattr(cls, '_instance'):
params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
userid=FLAGS.rabbit_userid,
@@ -60,7 +60,10 @@ class Connection(carrot_connection.BrokerConnection):
# NOTE(vish): magic is fun!
# pylint: disable-msg=W0142
- cls._instance = cls(**params)
+ if new:
+ return cls(**params)
+ else:
+ cls._instance = cls(**params)
return cls._instance
@classmethod
@@ -94,8 +97,6 @@ class Consumer(messaging.Consumer):
injected.start()
return injected
- attachToTornado = attach_to_tornado
-
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connections"""
# TODO(vish): the logic for failed connections and logging should be
@@ -265,28 +266,32 @@ def call(topic, msg):
msg.update({'_msg_id': msg_id})
LOG.debug("MSG_ID is %s" % (msg_id))
- conn = Connection.instance()
- d = defer.Deferred()
- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
-
- def deferred_receive(data, message):
- """Acks message and callbacks or errbacks"""
- message.ack()
- if data['failure']:
- return d.errback(RemoteError(*data['failure']))
- else:
- return d.callback(data['result'])
+ class WaitMessage(object):
- consumer.register_callback(deferred_receive)
- injected = consumer.attach_to_tornado()
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self.result = RemoteError(*data['failure'])
+ else:
+ self.result = data['result']
- # clean up after the injected listened and return x
- d.addCallback(lambda x: injected.stop() and x or x)
+ wait_msg = WaitMessage()
+ conn = Connection.instance(True)
+ consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ consumer.register_callback(wait_msg)
+ conn = Connection.instance()
publisher = TopicPublisher(connection=conn, topic=topic)
publisher.send(msg)
publisher.close()
- return d
+
+ try:
+ consumer.wait(limit=1)
+ except StopIteration:
+ pass
+ consumer.close()
+ return wait_msg.result
def cast(topic, msg):