summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-01-05 00:47:26 +0000
committerGerrit Code Review <review@openstack.org>2013-01-05 00:47:26 +0000
commitaefdd01621c7e43ba93c91f9db852caf2a08935b (patch)
tree09ffefe32938189e99190743dfdd4dc95bc6a670 /openstack/common
parenta08daf15d42890a16af0a60ceadd0aaced04ef7f (diff)
parent70942a357f992ec3efd820bc0736baa276c31660 (diff)
downloadoslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.tar.gz
oslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.tar.xz
oslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.zip
Merge "Ensure proxy callback threads exit during tests"
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/amqp.py4
-rw-r--r--openstack/common/rpc/impl_kombu.py10
-rw-r--r--openstack/common/rpc/impl_qpid.py10
3 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index ce4c028..c32815d 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -293,6 +293,10 @@ class ProxyCallback(object):
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
+ def wait(self):
+ """Wait for all callback threads to exit."""
+ self.pool.waitall()
+
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 96e81d7..fa3e25b 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -388,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@@ -574,12 +575,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
@@ -645,6 +648,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
@@ -720,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@@ -731,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index a6dd983..f3e558e 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -278,6 +278,7 @@ class Connection(object):
self.session = None
self.consumers = {}
self.consumer_thread = None
+ self.proxy_callbacks = []
self.conf = conf
if server_params and 'hostname' in server_params:
@@ -375,12 +376,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
+ self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
@@ -435,6 +438,11 @@ class Connection(object):
pass
self.consumer_thread = None
+ def wait_on_proxy_callbacks(self):
+ """Wait for all proxy callback threads to exit."""
+ for proxy_cb in self.proxy_callbacks:
+ proxy_cb.wait()
+
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
@@ -510,6 +518,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -525,6 +534,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
+ self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)