summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-02-07 22:31:27 -0500
committerRussell Bryant <rbryant@redhat.com>2012-02-08 12:21:01 -0500
commite0efd7551d0d4e1c23111419f0622a1994601fda (patch)
treef41e1e8e269b6cc12b986a4ab4d5d6768d6d7519
parent27ac9d5f0ca4279753ee2984769094f2b4191619 (diff)
downloadnova-e0efd7551d0d4e1c23111419f0622a1994601fda.tar.gz
nova-e0efd7551d0d4e1c23111419f0622a1994601fda.tar.xz
nova-e0efd7551d0d4e1c23111419f0622a1994601fda.zip
Move connection pool back into impl_kombu/qpid.
Fix bug 928996. This patch moves the creation of the connection pool from nova.rpc.amqp back into nova.rpc.impl_kombu and nova.rpc.impl_qpid. The pool now gets passed into nova.rpc.amqp using arguments as needed. The previous method worked fine unless both rpc implementations got loaded into the same Python instance. In that case, whichever one got loaded 2nd had control over what type of connections nova.rpc.amqp would create. With these changes in place, this conflict between impl_kombu and impl_qpid is resolved. Change-Id: I72bc0c95bfc04ccdfb89d3456332f622ca5ffa42
-rw-r--r--nova/rpc/amqp.py76
-rw-r--r--nova/rpc/impl_kombu.py21
-rw-r--r--nova/rpc/impl_qpid.py20
-rw-r--r--nova/tests/rpc/test_qpid.py9
4 files changed, 63 insertions, 63 deletions
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 0995d9ab8..01e127764 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -44,18 +44,22 @@ from nova.rpc.common import LOG
FLAGS = flags.FLAGS
-ConnectionClass = None
-
-
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs):
+ self.connection_cls = kwargs.pop("connection_cls", None)
+ kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
+ kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
- return ConnectionClass()
+ return self.connection_cls()
+
+ def empty(self):
+ while self.free_items:
+ self.get().close()
class ConnectionContext(rpc_common.Connection):
@@ -69,16 +73,14 @@ class ConnectionContext(rpc_common.Connection):
the pool.
"""
- _connection_pool = Pool(max_size=FLAGS.rpc_conn_pool_size,
- order_as_stack=True)
-
- def __init__(self, pooled=True):
+ def __init__(self, connection_pool, pooled=True):
"""Create a new connection, or get one from the pool"""
self.connection = None
+ self.connection_pool = connection_pool
if pooled:
- self.connection = self._connection_pool.get()
+ self.connection = connection_pool.get()
else:
- self.connection = ConnectionClass()
+ self.connection = connection_pool.connection_cls()
self.pooled = pooled
def __enter__(self):
@@ -94,7 +96,7 @@ class ConnectionContext(rpc_common.Connection):
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
- self._connection_pool.put(self.connection)
+ self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
@@ -127,19 +129,14 @@ class ConnectionContext(rpc_common.Connection):
else:
raise exception.InvalidRPCConnectionReuse()
- @classmethod
- def empty_pool(cls):
- while cls._connection_pool.free_items:
- cls._connection_pool.get().close()
-
-def msg_reply(msg_id, reply=None, failure=None, ending=False):
+def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
if failure:
message = str(failure[1])
tb = traceback.format_exception(*failure)
@@ -161,18 +158,19 @@ def msg_reply(msg_id, reply=None, failure=None, ending=False):
class RpcContext(context.RequestContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, *args, **kwargs):
- msg_id = kwargs.pop('msg_id', None)
- self.msg_id = msg_id
+ self.msg_id = kwargs.pop('msg_id', None)
+ self.connection_pool = kwargs.pop('connection_pool', None)
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
- msg_reply(self.msg_id, reply, failure, ending)
+ msg_reply(self.msg_id, self.connection_pool, reply, failure,
+ ending)
if ending:
self.msg_id = None
-def unpack_context(msg):
+def unpack_context(msg, connection_pool):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
@@ -183,6 +181,7 @@ def unpack_context(msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['connection_pool'] = connection_pool
ctx = RpcContext.from_dict(context_dict)
LOG.debug(_('unpacked context: %s'), ctx.to_dict())
return ctx
@@ -205,9 +204,10 @@ def pack_context(msg, context):
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
- def __init__(self, proxy):
+ def __init__(self, proxy, connection_pool):
self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
+ self.connection_pool = connection_pool
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@@ -227,7 +227,7 @@ class ProxyCallback(object):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
- ctxt = unpack_context(message_data)
+ ctxt = unpack_context(message_data, self.connection_pool)
method = message_data.get('method')
args = message_data.get('args', {})
if not method:
@@ -303,12 +303,12 @@ class MulticallWaiter(object):
yield result
-def create_connection(new=True):
+def create_connection(new, connection_pool):
"""Create a connection"""
- return ConnectionContext(pooled=not new)
+ return ConnectionContext(connection_pool, pooled=not new)
-def multicall(context, topic, msg, timeout):
+def multicall(context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
@@ -320,16 +320,16 @@ def multicall(context, topic, msg, timeout):
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext()
+ conn = ConnectionContext(connection_pool)
wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
-def call(context, topic, msg, timeout):
+def call(context, topic, msg, timeout, connection_pool):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg, timeout)
+ rv = multicall(context, topic, msg, timeout, connection_pool)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
@@ -337,29 +337,29 @@ def call(context, topic, msg, timeout):
return rv[-1]
-def cast(context, topic, msg):
+def cast(context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.topic_send(topic, msg)
-def fanout_cast(context, topic, msg):
+def fanout_cast(context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.fanout_send(topic, msg)
-def notify(context, topic, msg):
+def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.notify_send(topic, msg)
-def cleanup():
- ConnectionContext.empty_pool()
+def cleanup(connection_pool):
+ connection_pool.empty()
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 50459e5ad..a90d06a76 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -588,43 +588,44 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
- self.declare_topic_consumer(topic, rpc_amqp.ProxyCallback(proxy))
+ self.declare_topic_consumer(topic,
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
-rpc_amqp.ConnectionClass = Connection
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new)
+ return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout)
+ return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout)
+ return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg)
+ return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg)
+ return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg)
+ return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
- return rpc_amqp.cleanup()
+ return rpc_amqp.cleanup(Connection.pool)
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index 1ed20ba81..e01687375 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -474,46 +474,46 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
consumer = FanoutConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
consumer = TopicConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer)
return consumer
-rpc_amqp.ConnectionClass = Connection
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new)
+ return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout)
+ return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout)
+ return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg)
+ return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg)
+ return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg)
+ return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
- return rpc_amqp.cleanup()
+ return rpc_amqp.cleanup(Connection.pool)
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
index 9e318fbfd..f96b714bd 100644
--- a/nova/tests/rpc/test_qpid.py
+++ b/nova/tests/rpc/test_qpid.py
@@ -25,7 +25,6 @@ import mox
from nova import context
from nova import log as logging
from nova import test
-import nova.rpc.amqp as rpc_amqp
try:
import qpid
@@ -186,10 +185,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll()
finally:
- while rpc_amqp.ConnectionContext._connection_pool.free_items:
+ while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
- rpc_amqp.ConnectionContext._connection_pool.get()
+ impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast(self):
@@ -265,10 +264,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll()
finally:
- while rpc_amqp.ConnectionContext._connection_pool.free_items:
+ while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
- rpc_amqp.ConnectionContext._connection_pool.get()
+ impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_call(self):