diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-01-05 00:47:26 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-01-05 00:47:26 +0000 |
commit | aefdd01621c7e43ba93c91f9db852caf2a08935b (patch) | |
tree | 09ffefe32938189e99190743dfdd4dc95bc6a670 /openstack/common/rpc | |
parent | a08daf15d42890a16af0a60ceadd0aaced04ef7f (diff) | |
parent | 70942a357f992ec3efd820bc0736baa276c31660 (diff) | |
download | oslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.tar.gz oslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.tar.xz oslo-aefdd01621c7e43ba93c91f9db852caf2a08935b.zip |
Merge "Ensure proxy callback threads exit during tests"
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r-- | openstack/common/rpc/amqp.py | 4 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 10 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 10 |
3 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index ce4c028..c32815d 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -293,6 +293,10 @@ class ProxyCallback(object): ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + class MulticallWaiter(object): def __init__(self, conf, connection, timeout): diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 96e81d7..fa3e25b 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -388,6 +388,7 @@ class Connection(object): def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -574,12 +575,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.channel.close() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 @@ -645,6 +648,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" @@ -720,6 +728,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -731,6 +740,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index a6dd983..f3e558e 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -278,6 +278,7 @@ class Connection(object): self.session = None self.consumers = {} self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf if server_params and 'hostname' in server_params: @@ -375,12 +376,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.close() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.session.close() self.session = self.connection.session() self.consumers = {} @@ -435,6 +438,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" @@ -510,6 +518,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -525,6 +534,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) |