summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorMark McLoughlin <markmc@redhat.com>2013-01-02 14:44:45 +0000
committerMark McLoughlin <markmc@redhat.com>2013-01-03 07:33:35 +0000
commit70942a357f992ec3efd820bc0736baa276c31660 (patch)
tree2332aec02176ba2cdb40bb9e80b8852d2224b900 /openstack
parentffeb0855085617095f19296770a1223cb5641d1c (diff)
downloadoslo-70942a357f992ec3efd820bc0736baa276c31660.tar.gz
oslo-70942a357f992ec3efd820bc0736baa276c31660.tar.xz
oslo-70942a357f992ec3efd820bc0736baa276c31660.zip
Ensure proxy callback threads exit during tests
On the consumer side, each message is consumed in a separate thread but we currently have no way of waiting for all these threads to complete. In BaseRpcTestCase.test_call_timeout(), we complete the thread while there is still a message handler running on the other side. This can mean the consumer attempting to send its reply while other tests are running, which clearly is a bad thing. Make the connection close() and reset() methods in the kombu and qpid drivers wait on proxy object threads before returning to avoid this situation. Change-Id: I8fb84b1dc7f31196ef27a5d1f8c4a3346b03e0ae
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py4
-rw-r--r--openstack/common/rpc/impl_kombu.py10
-rw-r--r--openstack/common/rpc/impl_qpid.py10
3 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 00b2f24..2f90a9b 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -293,6 +293,10 @@ class ProxyCallback(object):
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index ced4e96..4c93504 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -387,6 +387,7 @@ class Connection(object):
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@@ -573,12 +574,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
@@ -644,6 +647,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
@@ -719,6 +727,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@@ -730,6 +739,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 7f21461..623812a 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -277,6 +277,7 @@ class Connection(object):
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
if server_params and 'hostname' in server_params:
@@ -374,12 +375,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
@@ -434,6 +437,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
@@ -509,6 +517,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -524,6 +533,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)