diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-02-15 21:41:35 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-02-15 21:41:35 +0000 |
| commit | f69b0923b8a81bad075d1972cd342d4d018f8170 (patch) | |
| tree | 4588a826118c0aa079a3db49eddcd0349f45bcc2 | |
| parent | 963d24d87d489aaf4270bb7bdc1a4b2cbb02a83b (diff) | |
| parent | e0efd7551d0d4e1c23111419f0622a1994601fda (diff) | |
| download | nova-f69b0923b8a81bad075d1972cd342d4d018f8170.tar.gz nova-f69b0923b8a81bad075d1972cd342d4d018f8170.tar.xz nova-f69b0923b8a81bad075d1972cd342d4d018f8170.zip | |
Merge "Move connection pool back into impl_kombu/qpid."
| -rw-r--r-- | nova/rpc/amqp.py | 76 | ||||
| -rw-r--r-- | nova/rpc/impl_kombu.py | 21 | ||||
| -rw-r--r-- | nova/rpc/impl_qpid.py | 20 | ||||
| -rw-r--r-- | nova/tests/rpc/test_qpid.py | 9 |
4 files changed, 63 insertions, 63 deletions
diff --git a/nova/rpc/amqp.py b/nova/rpc/amqp.py index 0995d9ab8..01e127764 100644 --- a/nova/rpc/amqp.py +++ b/nova/rpc/amqp.py @@ -44,18 +44,22 @@ from nova.rpc.common import LOG FLAGS = flags.FLAGS -ConnectionClass = None - - class Pool(pools.Pool): """Class that implements a Pool of Connections.""" def __init__(self, *args, **kwargs): + self.connection_cls = kwargs.pop("connection_cls", None) + kwargs.setdefault("max_size", FLAGS.rpc_conn_pool_size) + kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) # TODO(comstud): Timeout connections not used in a while def create(self): LOG.debug('Pool creating new connection') - return ConnectionClass() + return self.connection_cls() + + def empty(self): + while self.free_items: + self.get().close() class ConnectionContext(rpc_common.Connection): @@ -69,16 +73,14 @@ class ConnectionContext(rpc_common.Connection): the pool. """ - _connection_pool = Pool(max_size=FLAGS.rpc_conn_pool_size, - order_as_stack=True) - - def __init__(self, pooled=True): + def __init__(self, connection_pool, pooled=True): """Create a new connection, or get one from the pool""" self.connection = None + self.connection_pool = connection_pool if pooled: - self.connection = self._connection_pool.get() + self.connection = connection_pool.get() else: - self.connection = ConnectionClass() + self.connection = connection_pool.connection_cls() self.pooled = pooled def __enter__(self): @@ -94,7 +96,7 @@ class ConnectionContext(rpc_common.Connection): # Reset the connection so it's ready for the next caller # to grab from the pool self.connection.reset() - self._connection_pool.put(self.connection) + self.connection_pool.put(self.connection) else: try: self.connection.close() @@ -127,19 +129,14 @@ class ConnectionContext(rpc_common.Connection): else: raise exception.InvalidRPCConnectionReuse() - @classmethod - def empty_pool(cls): - while cls._connection_pool.free_items: - cls._connection_pool.get().close() - -def msg_reply(msg_id, reply=None, failure=None, ending=False): +def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. """ - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: if failure: message = str(failure[1]) tb = traceback.format_exception(*failure) @@ -161,18 +158,19 @@ def msg_reply(msg_id, reply=None, failure=None, ending=False): class RpcContext(context.RequestContext): """Context that supports replying to a rpc.call""" def __init__(self, *args, **kwargs): - msg_id = kwargs.pop('msg_id', None) - self.msg_id = msg_id + self.msg_id = kwargs.pop('msg_id', None) + self.connection_pool = kwargs.pop('connection_pool', None) super(RpcContext, self).__init__(*args, **kwargs) def reply(self, reply=None, failure=None, ending=False): if self.msg_id: - msg_reply(self.msg_id, reply, failure, ending) + msg_reply(self.msg_id, self.connection_pool, reply, failure, + ending) if ending: self.msg_id = None -def unpack_context(msg): +def unpack_context(msg, connection_pool): """Unpack context from msg.""" context_dict = {} for key in list(msg.keys()): @@ -183,6 +181,7 @@ def unpack_context(msg): value = msg.pop(key) context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['connection_pool'] = connection_pool ctx = RpcContext.from_dict(context_dict) LOG.debug(_('unpacked context: %s'), ctx.to_dict()) return ctx @@ -205,9 +204,10 @@ def pack_context(msg, context): class ProxyCallback(object): """Calls methods on a proxy object based on method and args.""" - def __init__(self, proxy): + def __init__(self, proxy, connection_pool): self.proxy = proxy self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) + self.connection_pool = connection_pool def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -227,7 +227,7 @@ class ProxyCallback(object): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) - ctxt = unpack_context(message_data) + ctxt = unpack_context(message_data, self.connection_pool) method = message_data.get('method') args = message_data.get('args', {}) if not method: @@ -303,12 +303,12 @@ class MulticallWaiter(object): yield result -def create_connection(new=True): +def create_connection(new, connection_pool): """Create a connection""" - return ConnectionContext(pooled=not new) + return ConnectionContext(connection_pool, pooled=not new) -def multicall(context, topic, msg, timeout): +def multicall(context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. When it's done, @@ -320,16 +320,16 @@ def multicall(context, topic, msg, timeout): LOG.debug(_('MSG_ID is %s') % (msg_id)) pack_context(msg, context) - conn = ConnectionContext() + conn = ConnectionContext(connection_pool) wait_msg = MulticallWaiter(conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) conn.topic_send(topic, msg) return wait_msg -def call(context, topic, msg, timeout): +def call(context, topic, msg, timeout, connection_pool): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg, timeout) + rv = multicall(context, topic, msg, timeout, connection_pool) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: @@ -337,29 +337,29 @@ def call(context, topic, msg, timeout): return rv[-1] -def cast(context, topic, msg): +def cast(context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.topic_send(topic, msg) -def fanout_cast(context, topic, msg): +def fanout_cast(context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.fanout_send(topic, msg) -def notify(context, topic, msg): +def notify(context, topic, msg, connection_pool): """Sends a notification event on a topic.""" LOG.debug(_('Sending notification on %s...'), topic) pack_context(msg, context) - with ConnectionContext() as conn: + with ConnectionContext(connection_pool) as conn: conn.notify_send(topic, msg) -def cleanup(): - ConnectionContext.empty_pool() +def cleanup(connection_pool): + connection_pool.empty() diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 50459e5ad..a90d06a76 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -588,43 +588,44 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: self.declare_fanout_consumer(topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: - self.declare_topic_consumer(topic, rpc_amqp.ProxyCallback(proxy)) + self.declare_topic_consumer(topic, + rpc_amqp.ProxyCallback(proxy, Connection.pool)) -rpc_amqp.ConnectionClass = Connection +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) def create_connection(new=True): """Create a connection""" - return rpc_amqp.create_connection(new) + return rpc_amqp.create_connection(new, Connection.pool) def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout) + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout) + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg) + return rpc_amqp.cast(context, topic, msg, Connection.pool) def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg) + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) def notify(context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg) + return rpc_amqp.notify(context, topic, msg, Connection.pool) def cleanup(): - return rpc_amqp.cleanup() + return rpc_amqp.cleanup(Connection.pool) diff --git a/nova/rpc/impl_qpid.py b/nova/rpc/impl_qpid.py index 3898893d2..10bc162f7 100644 --- a/nova/rpc/impl_qpid.py +++ b/nova/rpc/impl_qpid.py @@ -474,46 +474,46 @@ class Connection(object): """Create a consumer that calls a method in a proxy object""" if fanout: consumer = FanoutConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) else: consumer = TopicConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy)) + rpc_amqp.ProxyCallback(proxy, Connection.pool)) self._register_consumer(consumer) return consumer -rpc_amqp.ConnectionClass = Connection +Connection.pool = rpc_amqp.Pool(connection_cls=Connection) def create_connection(new=True): """Create a connection""" - return rpc_amqp.create_connection(new) + return rpc_amqp.create_connection(new, Connection.pool) def multicall(context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout) + return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) def call(context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout) + return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg) + return rpc_amqp.cast(context, topic, msg, Connection.pool) def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg) + return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) def notify(context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg) + return rpc_amqp.notify(context, topic, msg, Connection.pool) def cleanup(): - return rpc_amqp.cleanup() + return rpc_amqp.cleanup(Connection.pool) diff --git a/nova/tests/rpc/test_qpid.py b/nova/tests/rpc/test_qpid.py index 38ecd33d9..61d47fb5d 100644 --- a/nova/tests/rpc/test_qpid.py +++ b/nova/tests/rpc/test_qpid.py @@ -25,7 +25,6 @@ import mox from nova import context from nova import log as logging from nova import test -import nova.rpc.amqp as rpc_amqp try: import qpid @@ -186,10 +185,10 @@ class RpcQpidTestCase(test.TestCase): self.mocker.VerifyAll() finally: - while rpc_amqp.ConnectionContext._connection_pool.free_items: + while impl_qpid.Connection.pool.free_items: # Pull the mock connection object out of the connection pool so # that it doesn't mess up other test cases. - rpc_amqp.ConnectionContext._connection_pool.get() + impl_qpid.Connection.pool.get() @test.skip_if(qpid is None, "Test requires qpid") def test_cast(self): @@ -265,10 +264,10 @@ class RpcQpidTestCase(test.TestCase): self.mocker.VerifyAll() finally: - while rpc_amqp.ConnectionContext._connection_pool.free_items: + while impl_qpid.Connection.pool.free_items: # Pull the mock connection object out of the connection pool so # that it doesn't mess up other test cases. - rpc_amqp.ConnectionContext._connection_pool.get() + impl_qpid.Connection.pool.get() @test.skip_if(qpid is None, "Test requires qpid") def test_call(self): |
