summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
committertermie <github@anarkystic.com>2011-05-25 15:42:24 -0700
commitfdd27860724cd57db6df059a97e98289f88ce6ac (patch)
tree16df4112803d456994bf98bec56ffa25816edd35
parentdb18a792414240cbdb1221d0e79e8a63313f103e (diff)
add support to rpc for multicall
-rw-r--r--nova/rpc.py99
-rw-r--r--nova/tests/test_rpc.py17
2 files changed, 90 insertions, 26 deletions
diff --git a/nova/rpc.py b/nova/rpc.py
index 2116f22c3..04198a4a6 100644
--- a/nova/rpc.py
+++ b/nova/rpc.py
@@ -32,8 +32,11 @@ import uuid
from carrot import connection as carrot_connection
from carrot import messaging
+import eventlet
from eventlet import greenpool
from eventlet import greenthread
+from eventlet import queue
+
from nova import context
from nova import exception
@@ -131,7 +134,8 @@ class Consumer(messaging.Consumer):
self.connection = Connection.recreate()
self.backend = self.connection.create_backend()
self.declare()
- super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
+ return super(Consumer, self).fetch(
+ no_ack, auto_ack, enable_callbacks)
if self.failed_connection:
LOG.error(_('Reconnected to queue'))
self.failed_connection = False
@@ -347,8 +351,9 @@ def _unpack_context(msg):
if key.startswith('_context_'):
value = msg.pop(key)
context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
LOG.debug(_('unpacked context: %s'), context_dict)
- return context.RequestContext.from_dict(context_dict)
+ return RpcContext.from_dict(context_dict)
def _pack_context(msg, context):
@@ -365,26 +370,27 @@ def _pack_context(msg, context):
msg.update(context)
-def call(context, topic, msg):
- """Sends a message on a topic and wait for a response."""
+class RpcContext(context.RequestContext):
+ def __init__(self, *args, **kwargs):
+ msg_id = kwargs.pop('msg_id', None)
+ self.msg_id = msg_id
+ super(RpcContext, self).__init__(*args, **kwargs)
+
+ def reply(self, *args, **kwargs):
+ msg_reply(self.msg_id, *args, **kwargs)
+
+
+def multicall(context, topic, msg):
+ """Make a call that returns multiple times."""
LOG.debug(_('Making asynchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_pack_context(msg, context)
- class WaitMessage(object):
- def __call__(self, data, message):
- """Acks message and sets result."""
- message.ack()
- if data['failure']:
- self.result = RemoteError(*data['failure'])
- else:
- self.result = data['result']
-
- wait_msg = WaitMessage()
conn = Connection.instance()
consumer = DirectConsumer(connection=conn, msg_id=msg_id)
+ wait_msg = MulticallWaiter(consumer)
consumer.register_callback(wait_msg)
conn = Connection.instance()
@@ -392,18 +398,59 @@ def call(context, topic, msg):
publisher.send(msg)
publisher.close()
- try:
- consumer.wait(limit=1)
- except StopIteration:
- pass
- consumer.close()
- # NOTE(termie): this is a little bit of a change from the original
- # non-eventlet code where returning a Failure
- # instance from a deferred call is very similar to
- # raising an exception
- if isinstance(wait_msg.result, Exception):
- raise wait_msg.result
- return wait_msg.result
+ return wait_msg
+
+
+class MulticallWaiter(object):
+ def __init__(self, consumer):
+ self._consumer = consumer
+ self._results = queue.Queue()
+ self._closed = False
+
+ def close(self):
+ self._closed = True
+ self._consumer.close()
+
+ def __call__(self, data, message):
+ """Acks message and sets result."""
+ message.ack()
+ if data['failure']:
+ self._results.put(RemoteError(*data['failure']))
+ else:
+ self._results.put(data['result'])
+
+ def __iter__(self):
+ return self.wait()
+
+ def wait(self):
+ # TODO(termie): This is probably really a much simpler issue but am
+ # trying to solve the problem quickly. This works but
+ # I'd prefer to dig in and do it the best way later on.
+
+ def _waiter():
+ while not self._closed:
+ try:
+ self._consumer.wait(limit=1)
+ except StopIteration:
+ pass
+ eventlet.spawn(_waiter)
+
+ while True:
+ result = self._results.get()
+ if isinstance(result, Exception):
+ raise result
+ if result == None:
+ self.close()
+ raise StopIteration
+ yield result
+
+
+def call(context, topic, msg):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(context, topic, msg)
+ for x in rv:
+ rv.close()
+ return x
def cast(context, topic, msg):
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 44d7c91eb..92ddfcffc 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -49,6 +49,17 @@ class RpcTestCase(test.TestCase):
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_multicall_succeed_three_times(self):
+ """Get a value through rpc call"""
+ value = 42
+ result = rpc.multicall(self.context,
+ 'test',
+ {"method": "echo_three_times",
+ "args": {"value": value}})
+
+ for x in result:
+ self.assertEqual(value, x)
+
def test_context_passed(self):
"""Makes sure a context is passed through rpc call"""
value = 42
@@ -127,6 +138,12 @@ class TestReceiver(object):
return context.to_dict()
@staticmethod
+ def echo_three_times(context, value):
+ context.reply(value)
+ context.reply(value)
+ context.reply(value)
+
+ @staticmethod
def fail(context, value):
"""Raises an exception with the value sent in"""
raise Exception(value)