diff options
| author | Mark McLoughlin <markmc@redhat.com> | 2013-01-02 14:44:45 +0000 |
|---|---|---|
| committer | Mark McLoughlin <markmc@redhat.com> | 2013-01-03 07:33:35 +0000 |
| commit | 70942a357f992ec3efd820bc0736baa276c31660 (patch) | |
| tree | 2332aec02176ba2cdb40bb9e80b8852d2224b900 /openstack | |
| parent | ffeb0855085617095f19296770a1223cb5641d1c (diff) | |
| download | oslo-70942a357f992ec3efd820bc0736baa276c31660.tar.gz oslo-70942a357f992ec3efd820bc0736baa276c31660.tar.xz oslo-70942a357f992ec3efd820bc0736baa276c31660.zip | |
Ensure proxy callback threads exit during tests
On the consumer side, each message is consumed in a separate thread
but we currently have no way of waiting for all these threads to
complete.
In BaseRpcTestCase.test_call_timeout(), we complete the thread while
there is still a message handler running on the other side. This can
mean the consumer attempting to send its reply while other tests
are running, which clearly is a bad thing.
Make the connection close() and reset() methods in the kombu and qpid
drivers wait on proxy object threads before returning to avoid this
situation.
Change-Id: I8fb84b1dc7f31196ef27a5d1f8c4a3346b03e0ae
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 4 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 10 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 10 |
3 files changed, 24 insertions, 0 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 00b2f24..2f90a9b 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -293,6 +293,10 @@ class ProxyCallback(object): ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + class MulticallWaiter(object): def __init__(self, conf, connection, timeout): diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index ced4e96..4c93504 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -387,6 +387,7 @@ class Connection(object): def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -573,12 +574,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.channel.close() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 @@ -644,6 +647,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" @@ -719,6 +727,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -730,6 +739,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 7f21461..623812a 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -277,6 +277,7 @@ class Connection(object): self.session = None self.consumers = {} self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf if server_params and 'hostname' in server_params: @@ -374,12 +375,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.close() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.session.close() self.session = self.connection.session() self.consumers = {} @@ -434,6 +437,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" @@ -509,6 +517,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -524,6 +533,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) |
