summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2011-11-29 20:16:10 +0000
committerGerrit Code Review <review@openstack.org>2011-11-29 20:16:10 +0000
commit01f55fdd674f453d16d6e20629ab35c3161d5809 (patch)
tree6c4395803dc28ee5ca52cebdcd660e2d769353df
parent8efd022956dfe1e925710d8ec7d011607257e116 (diff)
parent84693b4a16413830be61f465f602de9d13b45161 (diff)
Merge "Fix RPC responses to allow None response correctly."
-rw-r--r--nova/rpc/impl_carrot.py86
-rw-r--r--nova/rpc/impl_fake.py146
-rw-r--r--nova/rpc/impl_kombu.py23
-rw-r--r--nova/test.py1
-rw-r--r--nova/tests/api/ec2/test_cloud.py1
-rw-r--r--nova/tests/fake_flags.py1
-rw-r--r--nova/tests/rpc/__init__.py19
-rw-r--r--nova/tests/rpc/common.py (renamed from nova/tests/test_rpc_common.py)18
-rw-r--r--nova/tests/rpc/test_carrot.py (renamed from nova/tests/test_rpc_carrot.py)4
-rw-r--r--nova/tests/rpc/test_fake.py36
-rw-r--r--nova/tests/rpc/test_kombu.py (renamed from nova/tests/test_rpc_kombu.py)4
-rw-r--r--nova/tests/rpc/test_rpc.py (renamed from nova/tests/test_rpc.py)8
-rw-r--r--nova/tests/test_adminapi.py11
-rw-r--r--nova/tests/xenapi/stubs.py10
-rw-r--r--run_tests.py2
15 files changed, 295 insertions, 75 deletions
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
index 2a518d7d7..57fd074f0 100644
--- a/nova/rpc/impl_carrot.py
+++ b/nova/rpc/impl_carrot.py
@@ -266,14 +266,13 @@ class AdapterConsumer(Consumer):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
- if msg_id:
- msg_reply(msg_id,
- _('No method for message: %s') % message_data)
+ ctxt.reply(msg_id,
+ _('No method for message: %s') % message_data)
return
- self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, method, args)
@exception.wrap_exception()
- def _process_data(self, msg_id, ctxt, method, args):
+ def _process_data(self, ctxt, method, args):
"""Thread that magically looks for a method on the proxy
object and calls it.
"""
@@ -283,23 +282,18 @@ class AdapterConsumer(Consumer):
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
- if msg_id:
- # Check if the result was a generator
- if isinstance(rval, types.GeneratorType):
- for x in rval:
- msg_reply(msg_id, x, None)
- else:
- msg_reply(msg_id, rval, None)
-
- # This final None tells multicall that it is done.
- msg_reply(msg_id, None, None)
- elif isinstance(rval, types.GeneratorType):
- # NOTE(vish): this iterates through the generator
- list(rval)
+ # Check if the result was a generator
+ if isinstance(rval, types.GeneratorType):
+ for x in rval:
+ ctxt.reply(x, None)
+ else:
+ ctxt.reply(rval, None)
+
+ # This final None tells multicall that it is done.
+ ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
- if msg_id:
- msg_reply(msg_id, None, sys.exc_info())
+ ctxt.reply(None, sys.exc_info())
return
@@ -447,7 +441,7 @@ class DirectPublisher(Publisher):
super(DirectPublisher, self).__init__(connection=connection)
-def msg_reply(msg_id, reply=None, failure=None):
+def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -463,12 +457,17 @@ def msg_reply(msg_id, reply=None, failure=None):
with ConnectionPool.item() as conn:
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
- publisher.send({'result': reply, 'failure': failure})
+ msg = {'result': reply, 'failure': failure}
+ if ending:
+ msg['ending'] = True
+ publisher.send(msg)
except TypeError:
- publisher.send(
- {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure})
+ msg = {'result': dict((k, repr(v))
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
+ if ending:
+ msg['ending'] = True
+ publisher.send(msg)
publisher.close()
@@ -508,8 +507,11 @@ class RpcContext(context.RequestContext):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
- def reply(self, *args, **kwargs):
- msg_reply(self.msg_id, *args, **kwargs)
+ def reply(self, reply=None, failure=None, ending=False):
+ if self.msg_id:
+ msg_reply(self.msg_id, reply, failure, ending)
+ if ending:
+ self.msg_id = None
def multicall(context, topic, msg):
@@ -537,8 +539,11 @@ class MulticallWaiter(object):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
+ self._got_ending = False
def close(self):
+ if self._closed:
+ return
self._closed = True
self._consumer.close()
ConnectionPool.put(self._consumer.connection)
@@ -548,6 +553,8 @@ class MulticallWaiter(object):
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
+ elif data.get('ending', False):
+ self._got_ending = True
else:
self._results.put(data['result'])
@@ -555,23 +562,22 @@ class MulticallWaiter(object):
return self.wait()
def wait(self):
- while True:
- rv = None
- while rv is None and not self._closed:
- try:
- rv = self._consumer.fetch(enable_callbacks=True)
- except Exception:
- self.close()
- raise
+ while not self._closed:
+ try:
+ rv = self._consumer.fetch(enable_callbacks=True)
+ except Exception:
+ self.close()
+ raise
+ if rv is None:
time.sleep(0.01)
-
+ continue
+ if self._got_ending:
+ self.close()
+ raise StopIteration
result = self._results.get()
if isinstance(result, Exception):
self.close()
raise result
- if result == None:
- self.close()
- raise StopIteration
yield result
diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py
new file mode 100644
index 000000000..9a94be07e
--- /dev/null
+++ b/nova/rpc/impl_fake.py
@@ -0,0 +1,146 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Fake RPC implementation which calls proxy methods directly with no
+queues. Casts will block, but this is very useful for tests.
+"""
+
+import sys
+import traceback
+import types
+
+from nova import context
+from nova.rpc import common as rpc_common
+
+CONSUMERS = {}
+
+
+class RpcContext(context.RequestContext):
+ def __init__(self, *args, **kwargs):
+ super(RpcContext, self).__init__(*args, **kwargs)
+ self._response = []
+ self._done = False
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ self._done = True
+ if not self._done:
+ self._response.append((reply, failure))
+
+
+class Consumer(object):
+ def __init__(self, topic, proxy):
+ self.topic = topic
+ self.proxy = proxy
+
+ def call(self, context, method, args):
+ node_func = getattr(self.proxy, method)
+ node_args = dict((str(k), v) for k, v in args.iteritems())
+
+ ctxt = RpcContext.from_dict(context.to_dict())
+ try:
+ rval = node_func(context=ctxt, **node_args)
+ # Caller might have called ctxt.reply() manually
+ for (reply, failure) in ctxt._response:
+ if failure:
+ raise failure[0], failure[1], failure[2]
+ yield reply
+ # if ending not 'sent'...we might have more data to
+ # return from the function itself
+ if not ctxt._done:
+ if isinstance(rval, types.GeneratorType):
+ for val in rval:
+ yield val
+ else:
+ yield rval
+ except Exception:
+ exc_info = sys.exc_info()
+ raise rpc_common.RemoteError(exc_info[0].__name__,
+ str(exc_info[1]),
+ traceback.format_exception(*exc_info))
+
+
+class Connection(object):
+ """Connection object."""
+
+ def __init__(self):
+ self.consumers = []
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ consumer = Consumer(topic, proxy)
+ self.consumers.append(consumer)
+ if topic not in CONSUMERS:
+ CONSUMERS[topic] = []
+ CONSUMERS[topic].append(consumer)
+
+ def close(self):
+ for consumer in self.consumers:
+ CONSUMERS[consumer.topic].remove(consumer)
+ self.consumers = []
+
+ def consume_in_thread(self):
+ pass
+
+
+def create_connection(new=True):
+ """Create a connection"""
+ return Connection()
+
+
+def multicall(context, topic, msg):
+ """Make a call that returns multiple times."""
+
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+
+ try:
+ consumer = CONSUMERS[topic][0]
+ except (KeyError, IndexError):
+ return iter([None])
+ else:
+ return consumer.call(context, method, args)
+
+
+def call(context, topic, msg):
+ """Sends a message on a topic and wait for a response."""
+ rv = multicall(context, topic, msg)
+ # NOTE(vish): return the last result from the multicall
+ rv = list(rv)
+ if not rv:
+ return
+ return rv[-1]
+
+
+def cast(context, topic, msg):
+ try:
+ call(context, topic, msg)
+ except rpc_common.RemoteError:
+ pass
+
+
+def fanout_cast(context, topic, msg):
+ """Cast to all consumers of a topic"""
+ method = msg.get('method')
+ if not method:
+ return
+ args = msg.get('args', {})
+
+ for consumer in CONSUMERS.get(topic, []):
+ try:
+ consumer.call(context, method, args)
+ except rpc_common.RemoteError:
+ pass
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 1b80fce04..757e7636a 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -625,7 +625,7 @@ class ProxyCallback(object):
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
- ctxt.reply(None, None)
+ ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
@@ -668,9 +668,11 @@ class RpcContext(context.RequestContext):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)
- def reply(self, *args, **kwargs):
+ def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
- msg_reply(self.msg_id, *args, **kwargs)
+ msg_reply(self.msg_id, reply, failure, ending)
+ if ending:
+ self.msg_id = None
class MulticallWaiter(object):
@@ -679,8 +681,11 @@ class MulticallWaiter(object):
self._iterator = connection.iterconsume()
self._result = None
self._done = False
+ self._got_ending = False
def done(self):
+ if self._done:
+ return
self._done = True
self._iterator.close()
self._iterator = None
@@ -690,6 +695,8 @@ class MulticallWaiter(object):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
+ elif data.get('ending', False):
+ self._got_ending = True
else:
self._result = data['result']
@@ -699,13 +706,13 @@ class MulticallWaiter(object):
raise StopIteration
while True:
self._iterator.next()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
result = self._result
if isinstance(result, Exception):
self.done()
raise result
- if result == None:
- self.done()
- raise StopIteration
yield result
@@ -759,7 +766,7 @@ def fanout_cast(context, topic, msg):
conn.fanout_send(topic, msg)
-def msg_reply(msg_id, reply=None, failure=None):
+def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -779,4 +786,6 @@ def msg_reply(msg_id, reply=None, failure=None):
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
+ if ending:
+ msg['ending'] = True
conn.direct_send(msg_id, msg)
diff --git a/nova/test.py b/nova/test.py
index abd1294d4..6c565f53d 100644
--- a/nova/test.py
+++ b/nova/test.py
@@ -34,7 +34,6 @@ import nose.plugins.skip
import nova.image.fake
import shutil
import stubout
-from eventlet import greenthread
from nova import fakerabbit
from nova import flags
diff --git a/nova/tests/api/ec2/test_cloud.py b/nova/tests/api/ec2/test_cloud.py
index e7cb4289b..47100b7be 100644
--- a/nova/tests/api/ec2/test_cloud.py
+++ b/nova/tests/api/ec2/test_cloud.py
@@ -1537,7 +1537,6 @@ class CloudTestCase(test.TestCase):
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1['id'])
- greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2['id'])
self.assertTrue(vol['deleted'])
diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py
index 13fb6c6ca..fc7bb059a 100644
--- a/nova/tests/fake_flags.py
+++ b/nova/tests/fake_flags.py
@@ -24,6 +24,7 @@ flags.DECLARE('volume_driver', 'nova.volume.manager')
FLAGS['volume_driver'].SetDefault('nova.volume.driver.FakeISCSIDriver')
FLAGS['connection_type'].SetDefault('fake')
FLAGS['fake_rabbit'].SetDefault(True)
+FLAGS['rpc_backend'].SetDefault('nova.rpc.impl_fake')
flags.DECLARE('auth_driver', 'nova.auth.manager')
FLAGS['auth_driver'].SetDefault('nova.auth.dbdriver.DbDriver')
flags.DECLARE('network_size', 'nova.network.manager')
diff --git a/nova/tests/rpc/__init__.py b/nova/tests/rpc/__init__.py
new file mode 100644
index 000000000..6dab802f2
--- /dev/null
+++ b/nova/tests/rpc/__init__.py
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Openstack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
+from nova.tests import *
diff --git a/nova/tests/test_rpc_common.py b/nova/tests/rpc/common.py
index 4ab4e8a0e..dc8aafcfe 100644
--- a/nova/tests/test_rpc_common.py
+++ b/nova/tests/rpc/common.py
@@ -81,6 +81,17 @@ class _BaseRpcTestCase(test.TestCase):
for i, x in enumerate(result):
self.assertEqual(value + i, x)
+ def test_multicall_three_nones(self):
+ value = 42
+ result = self.rpc.multicall(self.context,
+ 'test',
+ {"method": "multicall_three_nones",
+ "args": {"value": value}})
+ for i, x in enumerate(result):
+ self.assertEqual(x, None)
+ # i should have been 0, 1, and finally 2:
+ self.assertEqual(i, 2)
+
def test_multicall_succeed_three_times_yield(self):
value = 42
result = self.rpc.multicall(self.context,
@@ -176,6 +187,13 @@ class TestReceiver(object):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
+ context.reply(ending=True)
+
+ @staticmethod
+ def multicall_three_nones(context, value):
+ yield None
+ yield None
+ yield None
@staticmethod
def echo_three_times_yield(context, value):
diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/rpc/test_carrot.py
index 57cdebf4f..fa9f73961 100644
--- a/nova/tests/test_rpc_carrot.py
+++ b/nova/tests/rpc/test_carrot.py
@@ -22,13 +22,13 @@ Unit Tests for remote procedure calls using carrot
from nova import context
from nova import log as logging
from nova.rpc import impl_carrot
-from nova.tests import test_rpc_common
+from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
-class RpcCarrotTestCase(test_rpc_common._BaseRpcTestCase):
+class RpcCarrotTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_carrot
super(RpcCarrotTestCase, self).setUp()
diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py
new file mode 100644
index 000000000..344c44628
--- /dev/null
+++ b/nova/tests/rpc/test_fake.py
@@ -0,0 +1,36 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for remote procedure calls using fake_impl
+"""
+
+from nova import log as logging
+from nova.rpc import impl_fake
+from nova.tests.rpc import common
+
+
+LOG = logging.getLogger('nova.tests.rpc')
+
+
+class RpcFakeTestCase(common._BaseRpcTestCase):
+ def setUp(self):
+ self.rpc = impl_fake
+ super(RpcFakeTestCase, self).setUp()
+
+ def tearDown(self):
+ super(RpcFakeTestCase, self).tearDown()
diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/rpc/test_kombu.py
index 101ed14af..01b00f33d 100644
--- a/nova/tests/test_rpc_kombu.py
+++ b/nova/tests/rpc/test_kombu.py
@@ -23,13 +23,13 @@ from nova import context
from nova import log as logging
from nova import test
from nova.rpc import impl_kombu
-from nova.tests import test_rpc_common
+from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
-class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase):
+class RpcKombuTestCase(common._BaseRpcTestCase):
def setUp(self):
self.rpc = impl_kombu
super(RpcKombuTestCase, self).setUp()
diff --git a/nova/tests/test_rpc.py b/nova/tests/rpc/test_rpc.py
index 6b4454747..4524391f8 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/rpc/test_rpc.py
@@ -16,20 +16,20 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
-Unit Tests for remote procedure calls using queue
+Unit Tests for remote procedure call interfaces
"""
-from nova import context
from nova import log as logging
from nova import rpc
-from nova.tests import test_rpc_common
+from nova.tests.rpc import common
LOG = logging.getLogger('nova.tests.rpc')
-class RpcTestCase(test_rpc_common._BaseRpcTestCase):
+class RpcTestCase(common._BaseRpcTestCase):
def setUp(self):
+ self.flags(rpc_backend='nova.tests.rpc.fake')
self.rpc = rpc
super(RpcTestCase, self).setUp()
diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py
index aaa633adc..08c8f707a 100644
--- a/nova/tests/test_adminapi.py
+++ b/nova/tests/test_adminapi.py
@@ -61,14 +61,9 @@ class AdminApiTestCase(test.TestCase):
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
- # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
- rpc_cast = rpc.cast
-
- def finish_cast(*args, **kwargs):
- rpc_cast(*args, **kwargs)
- greenthread.sleep(0.2)
-
- self.stubs.Set(rpc, 'cast', finish_cast)
+ # NOTE(comstud): Make 'cast' behave like a 'call' which will
+ # ensure that operations complete
+ self.stubs.Set(rpc, 'cast', rpc.call)
def test_block_external_ips(self):
"""Make sure provider firewall rules are created."""
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 7c5634c5a..96c4d9cbe 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -132,16 +132,6 @@ def stubout_loopingcall_start(stubs):
stubs.Set(utils.LoopingCall, 'start', fake_start)
-def stubout_loopingcall_delay(stubs):
- def fake_start(self, interval, now=True):
- self._running = True
- eventlet.sleep(1)
- self.f(*self.args, **self.kw)
- # This would fail before parallel xenapi calls were fixed
- assert self._running == False
- stubs.Set(utils.LoopingCall, 'start', fake_start)
-
-
def _make_fake_vdi():
sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False)
diff --git a/run_tests.py b/run_tests.py
index fd836967e..17547b8e0 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -64,6 +64,7 @@ import time
gettext.install('nova', unicode=1)
+import eventlet
from nose import config
from nose import core
from nose import result
@@ -336,6 +337,7 @@ class NovaTestRunner(core.TextTestRunner):
if __name__ == '__main__':
+ eventlet.monkey_patch()
logging.setup()
# If any argument looks like a test name but doesn't have "nova.tests" in
# front of it, automatically add that so we don't have to type as much