summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-15 21:41:35 +0000
committerGerrit Code Review <review@openstack.org>2012-02-15 21:41:35 +0000
commitf69b0923b8a81bad075d1972cd342d4d018f8170 (patch)
tree4588a826118c0aa079a3db49eddcd0349f45bcc2
parent963d24d87d489aaf4270bb7bdc1a4b2cbb02a83b (diff)
parente0efd7551d0d4e1c23111419f0622a1994601fda (diff)
downloadnova-f69b0923b8a81bad075d1972cd342d4d018f8170.tar.gz
nova-f69b0923b8a81bad075d1972cd342d4d018f8170.tar.xz
nova-f69b0923b8a81bad075d1972cd342d4d018f8170.zip
Merge "Move connection pool back into impl_kombu/qpid."
-rw-r--r--nova/rpc/amqp.py76
-rw-r--r--nova/rpc/impl_kombu.py21
-rw-r--r--nova/rpc/impl_qpid.py20
-rw-r--r--nova/tests/rpc/test_qpid.py9
4 files changed, 63 insertions, 63 deletions
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py
index 0995d9ab8..01e127764 100644
--- a/nova/rpc/amqp.py
+++ b/nova/rpc/amqp.py
@@ -44,18 +44,22 @@ from nova.rpc.common import LOG
FLAGS = flags.FLAGS
-ConnectionClass = None
-
-
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, *args, **kwargs):
+ self.connection_cls = kwargs.pop("connection_cls", None)
+ kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size)
+ kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
- return ConnectionClass()
+ return self.connection_cls()
+
+ def empty(self):
+ while self.free_items:
+ self.get().close()
class ConnectionContext(rpc_common.Connection):
@@ -69,16 +73,14 @@ class ConnectionContext(rpc_common.Connection):
the pool.
"""
- _connection_pool = Pool(max_size=FLAGS.rpc_conn_pool_size,
- order_as_stack=True)
-
- def __init__(self, pooled=True):
+ def __init__(self, connection_pool, pooled=True):
"""Create a new connection, or get one from the pool"""
self.connection = None
+ self.connection_pool = connection_pool
if pooled:
- self.connection = self._connection_pool.get()
+ self.connection = connection_pool.get()
else:
- self.connection = ConnectionClass()
+ self.connection = connection_pool.connection_cls()
self.pooled = pooled
def __enter__(self):
@@ -94,7 +96,7 @@ class ConnectionContext(rpc_common.Connection):
# Reset the connection so it's ready for the next caller
# to grab from the pool
self.connection.reset()
- self._connection_pool.put(self.connection)
+ self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
@@ -127,19 +129,14 @@ class ConnectionContext(rpc_common.Connection):
else:
raise exception.InvalidRPCConnectionReuse()
- @classmethod
- def empty_pool(cls):
- while cls._connection_pool.free_items:
- cls._connection_pool.get().close()
-
-def msg_reply(msg_id, reply=None, failure=None, ending=False):
+def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
"""
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
if failure:
message = str(failure[1])
tb = traceback.format_exception(*failure)
@@ -161,18 +158,19 @@ def msg_reply(msg_id, reply=None, failure=None, ending=False):
class RpcContext(context.RequestContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, *args, **kwargs):
- msg_id = kwargs.pop('msg_id', None)
- self.msg_id = msg_id
+ self.msg_id = kwargs.pop('msg_id', None)
+ self.connection_pool = kwargs.pop('connection_pool', None)
super(RpcContext, self).__init__(*args, **kwargs)
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
- msg_reply(self.msg_id, reply, failure, ending)
+ msg_reply(self.msg_id, self.connection_pool, reply, failure,
+ ending)
if ending:
self.msg_id = None
-def unpack_context(msg):
+def unpack_context(msg, connection_pool):
"""Unpack context from msg."""
context_dict = {}
for key in list(msg.keys()):
@@ -183,6 +181,7 @@ def unpack_context(msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['connection_pool'] = connection_pool
ctx = RpcContext.from_dict(context_dict)
LOG.debug(_('unpacked context: %s'), ctx.to_dict())
return ctx
@@ -205,9 +204,10 @@ def pack_context(msg, context):
class ProxyCallback(object):
"""Calls methods on a proxy object based on method and args."""
- def __init__(self, proxy):
+ def __init__(self, proxy, connection_pool):
self.proxy = proxy
self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
+ self.connection_pool = connection_pool
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@@ -227,7 +227,7 @@ class ProxyCallback(object):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
- ctxt = unpack_context(message_data)
+ ctxt = unpack_context(message_data, self.connection_pool)
method = message_data.get('method')
args = message_data.get('args', {})
if not method:
@@ -303,12 +303,12 @@ class MulticallWaiter(object):
yield result
-def create_connection(new=True):
+def create_connection(new, connection_pool):
"""Create a connection"""
- return ConnectionContext(pooled=not new)
+ return ConnectionContext(connection_pool, pooled=not new)
-def multicall(context, topic, msg, timeout):
+def multicall(context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
@@ -320,16 +320,16 @@ def multicall(context, topic, msg, timeout):
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext()
+ conn = ConnectionContext(connection_pool)
wait_msg = MulticallWaiter(conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
return wait_msg
-def call(context, topic, msg, timeout):
+def call(context, topic, msg, timeout, connection_pool):
"""Sends a message on a topic and wait for a response."""
- rv = multicall(context, topic, msg, timeout)
+ rv = multicall(context, topic, msg, timeout, connection_pool)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
@@ -337,29 +337,29 @@ def call(context, topic, msg, timeout):
return rv[-1]
-def cast(context, topic, msg):
+def cast(context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.topic_send(topic, msg)
-def fanout_cast(context, topic, msg):
+def fanout_cast(context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.fanout_send(topic, msg)
-def notify(context, topic, msg):
+def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)
pack_context(msg, context)
- with ConnectionContext() as conn:
+ with ConnectionContext(connection_pool) as conn:
conn.notify_send(topic, msg)
-def cleanup():
- ConnectionContext.empty_pool()
+def cleanup(connection_pool):
+ connection_pool.empty()
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 50459e5ad..a90d06a76 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -588,43 +588,44 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
- self.declare_topic_consumer(topic, rpc_amqp.ProxyCallback(proxy))
+ self.declare_topic_consumer(topic,
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
-rpc_amqp.ConnectionClass = Connection
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new)
+ return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout)
+ return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout)
+ return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg)
+ return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg)
+ return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg)
+ return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
- return rpc_amqp.cleanup()
+ return rpc_amqp.cleanup(Connection.pool)
diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py
index 3898893d2..10bc162f7 100644
--- a/nova/rpc/impl_qpid.py
+++ b/nova/rpc/impl_qpid.py
@@ -474,46 +474,46 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
consumer = FanoutConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
consumer = TopicConsumer(self.session, topic,
- rpc_amqp.ProxyCallback(proxy))
+ rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer)
return consumer
-rpc_amqp.ConnectionClass = Connection
+Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
def create_connection(new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(new)
+ return rpc_amqp.create_connection(new, Connection.pool)
def multicall(context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(context, topic, msg, timeout)
+ return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool)
def call(context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(context, topic, msg, timeout)
+ return rpc_amqp.call(context, topic, msg, timeout, Connection.pool)
def cast(context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(context, topic, msg)
+ return rpc_amqp.cast(context, topic, msg, Connection.pool)
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(context, topic, msg)
+ return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
- return rpc_amqp.notify(context, topic, msg)
+ return rpc_amqp.notify(context, topic, msg, Connection.pool)
def cleanup():
- return rpc_amqp.cleanup()
+ return rpc_amqp.cleanup(Connection.pool)
diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py
index 38ecd33d9..61d47fb5d 100644
--- a/nova/tests/rpc/test_qpid.py
+++ b/nova/tests/rpc/test_qpid.py
@@ -25,7 +25,6 @@ import mox
from nova import context
from nova import log as logging
from nova import test
-import nova.rpc.amqp as rpc_amqp
try:
import qpid
@@ -186,10 +185,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll()
finally:
- while rpc_amqp.ConnectionContext._connection_pool.free_items:
+ while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
- rpc_amqp.ConnectionContext._connection_pool.get()
+ impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast(self):
@@ -265,10 +264,10 @@ class RpcQpidTestCase(test.TestCase):
self.mocker.VerifyAll()
finally:
- while rpc_amqp.ConnectionContext._connection_pool.free_items:
+ while impl_qpid.Connection.pool.free_items:
# Pull the mock connection object out of the connection pool so
# that it doesn't mess up other test cases.
- rpc_amqp.ConnectionContext._connection_pool.get()
+ impl_qpid.Connection.pool.get()
@test.skip_if(qpid is None, "Test requires qpid")
def test_call(self):