From fdd27860724cd57db6df059a97e98289f88ce6ac Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: add support to rpc for multicall --- nova/rpc.py | 99 +++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 26 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 2116f22c3..04198a4a6 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -32,8 +32,11 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging +import eventlet from eventlet import greenpool from eventlet import greenthread +from eventlet import queue + from nova import context from nova import exception @@ -131,7 +134,8 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch( + no_ack, auto_ack, enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -347,8 +351,9 @@ def _unpack_context(msg): if key.startswith('_context_'): value = msg.pop(key) context_dict[key[9:]] = value + context_dict['msg_id'] = msg.pop('_msg_id', None) LOG.debug(_('unpacked context: %s'), context_dict) - return context.RequestContext.from_dict(context_dict) + return RpcContext.from_dict(context_dict) def _pack_context(msg, context): @@ -365,26 +370,27 @@ def _pack_context(msg, context): msg.update(context) -def call(context, topic, msg): - """Sends a message on a topic and wait for a response.""" +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + msg_id = kwargs.pop('msg_id', None) + self.msg_id = msg_id + super(RpcContext, self).__init__(*args, **kwargs) + + def reply(self, *args, **kwargs): + msg_reply(self.msg_id, *args, **kwargs) + + +def multicall(context, topic, msg): + """Make a call that returns multiple times.""" LOG.debug(_('Making asynchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - class WaitMessage(object): - def __call__(self, data, message): - """Acks message and sets result.""" - message.ack() - if data['failure']: - self.result = RemoteError(*data['failure']) - else: - self.result = data['result'] - - wait_msg = WaitMessage() conn = Connection.instance() consumer = DirectConsumer(connection=conn, msg_id=msg_id) + wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) conn = Connection.instance() @@ -392,18 +398,59 @@ def call(context, topic, msg): publisher.send(msg) publisher.close() - try: - consumer.wait(limit=1) - except StopIteration: - pass - consumer.close() - # NOTE(termie): this is a little bit of a change from the original - # non-eventlet code where returning a Failure - # instance from a deferred call is very similar to - # raising an exception - if isinstance(wait_msg.result, Exception): - raise wait_msg.result - return wait_msg.result + return wait_msg + + +class MulticallWaiter(object): + def __init__(self, consumer): + self._consumer = consumer + self._results = queue.Queue() + self._closed = False + + def close(self): + self._closed = True + self._consumer.close() + + def __call__(self, data, message): + """Acks message and sets result.""" + message.ack() + if data['failure']: + self._results.put(RemoteError(*data['failure'])) + else: + self._results.put(data['result']) + + def __iter__(self): + return self.wait() + + def wait(self): + # TODO(termie): This is probably really a much simpler issue but am + # trying to solve the problem quickly. This works but + # I'd prefer to dig in and do it the best way later on. + + def _waiter(): + while not self._closed: + try: + self._consumer.wait(limit=1) + except StopIteration: + pass + eventlet.spawn(_waiter) + + while True: + result = self._results.get() + if isinstance(result, Exception): + raise result + if result == None: + self.close() + raise StopIteration + yield result + + +def call(context, topic, msg): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg) + for x in rv: + rv.close() + return x def cast(context, topic, msg): -- cgit From 7622e854ef68fbdbfc531690cf74916301956c8e Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: add commented out unworking code for yield-based returns --- nova/rpc.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 04198a4a6..f43291c4b 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -201,6 +201,11 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: + # TODO(termie): re-enable when fix the yielding issue + #if hasattr(rval, 'send'): + # logging.error('rval! %s', rval) + # for x in rval: + # msg_reply(msg_id, x, None) msg_reply(msg_id, rval, None) except Exception as e: logging.exception('Exception during message handling') -- cgit From b44c1fe9561ee8754137d2700bab295f20a4032b Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: Add a connection pool for rpc cast/call Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each. --- nova/rpc.py | 96 +++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 24 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index f43291c4b..62590ca92 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -35,9 +35,9 @@ from carrot import messaging import eventlet from eventlet import greenpool from eventlet import greenthread +from eventlet import pools from eventlet import queue - from nova import context from nova import exception from nova import fakerabbit @@ -92,6 +92,11 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() +class Pool(pools.Pool): + def create(self): + return Connection.instance(new=True) + +ConnectionPool = Pool(max_size=20) class Consumer(messaging.Consumer): """Consumer base class. @@ -163,21 +168,9 @@ class AdapterConsumer(Consumer): self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + self.register_callback(self.process_data) - def receive(self, *args, **kwargs): - self.pool.spawn_n(self._receive, *args, **kwargs) - - @exception.wrap_exception - def _receive(self, message_data, message): - """Magically looks for a method on the proxy object and calls it. - - Message data should be a dictionary with two keys: - method: string representing the method to call - args: dictionary of arg: value - - Example: {'method': 'echo', 'args': {'value': 42}} - - """ + def process_data(self, message_data, message): LOG.debug(_('received %s') % message_data) msg_id = message_data.pop('_msg_id', None) @@ -194,6 +187,19 @@ class AdapterConsumer(Consumer): LOG.warn(_('no method for message: %s') % message_data) msg_reply(msg_id, _('No method for message: %s') % message_data) return + self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) + + @exception.wrap_exception + def _process_data(self, msg_id, ctxt, method, args): + """Magically looks for a method on the proxy object and calls it. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ node_func = getattr(self.proxy, str(method)) node_args = dict((str(k), v) for k, v in args.iteritems()) @@ -214,11 +220,6 @@ class AdapterConsumer(Consumer): return -class Publisher(messaging.Publisher): - """Publisher base class.""" - pass - - class TopicAdapterConsumer(AdapterConsumer): """Consumes messages on a specific topic.""" @@ -251,6 +252,50 @@ class FanoutAdapterConsumer(AdapterConsumer): topic=topic, proxy=proxy) +class ConsumerSet(object): + """Groups consumers to listen on together on a single connection""" + + def __init__(self, conn, consumer_list): + self.consumer_list = set(consumer_list) + self.consumer_set = None + self.init(conn) + + def init(self, conn): + if not conn: + conn = Connection.instance(new=True) + if self.consumer_set: + self.consumer_set.close() + self.consumer_set = messaging.ConsumerSet(conn) + for consumer in self.consumer_list: + consumer.connection = conn + # consumer.backend is set for us + self.consumer_set.add_consumer(consumer) + + def reconnect(self): + self.init(None) + + def wait(self, limit=None): + while True: + it = self.consumer_set.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + return + except Exception as e: + LOG.error(_("Received exception %s " % str(e) + \ + "while processing consumer")) + fuck + self.reconnect() + # Break to outer loop + break + + +class Publisher(messaging.Publisher): + """Publisher base class.""" + pass + + class TopicPublisher(Publisher): """Publishes messages on a specific topic.""" @@ -315,7 +360,7 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) @@ -324,7 +369,9 @@ def msg_reply(msg_id, reply=None, failure=None): {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()), 'failure': failure}) + publisher.close() + ConnectionPool.put(conn) class RemoteError(exception.Error): @@ -393,12 +440,11 @@ def multicall(context, topic, msg): LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() consumer = DirectConsumer(connection=conn, msg_id=msg_id) wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - conn = Connection.instance() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) publisher.close() @@ -462,10 +508,11 @@ def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = TopicPublisher(connection=conn, topic=topic) publisher.send(msg) publisher.close() + ConnectionPool.put(conn) def fanout_cast(context, topic, msg): @@ -511,6 +558,7 @@ def send_message(topic, message, wait=True): if wait: consumer.wait() + consumer.close() if __name__ == '__main__': -- cgit From e1a47584cc63136280cf3ca9ef02da3efc1dff7f Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: pep8 and comment fixes --- nova/rpc.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 62590ca92..db5aec826 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -92,12 +92,16 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() + class Pool(pools.Pool): + """Class that implements a Pool of Connections""" + def create(self): return Connection.instance(new=True) ConnectionPool = Pool(max_size=20) + class Consumer(messaging.Consumer): """Consumer base class. @@ -171,6 +175,16 @@ class AdapterConsumer(Consumer): self.register_callback(self.process_data) def process_data(self, message_data, message): + """Consumer callback that parses the message for validity and + fires off a thread to call the proxy object method. + + Message data should be a dictionary with two keys: + method: string representing the method to call + args: dictionary of arg: value + + Example: {'method': 'echo', 'args': {'value': 42}} + + """ LOG.debug(_('received %s') % message_data) msg_id = message_data.pop('_msg_id', None) @@ -191,14 +205,8 @@ class AdapterConsumer(Consumer): @exception.wrap_exception def _process_data(self, msg_id, ctxt, method, args): - """Magically looks for a method on the proxy object and calls it. - - Message data should be a dictionary with two keys: - method: string representing the method to call - args: dictionary of arg: value - - Example: {'method': 'echo', 'args': {'value': 42}} - + """Thread that maigcally looks for a method on the proxy + object and calls it. """ node_func = getattr(self.proxy, str(method)) @@ -285,7 +293,6 @@ class ConsumerSet(object): except Exception as e: LOG.error(_("Received exception %s " % str(e) + \ "while processing consumer")) - fuck self.reconnect() # Break to outer loop break -- cgit From d0be426d4e7bbfb1ecb3f078c71c1e176da441a5 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: convert fanout_cast to ConnectionPool --- nova/rpc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index db5aec826..fdb228695 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -526,10 +526,11 @@ def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = Connection.instance() + conn = ConnectionPool.get() publisher = FanoutPublisher(topic, connection=conn) publisher.send(msg) publisher.close() + ConnectionPool.put(conn) def generic_response(message_data, message): -- cgit From 90e30806a2e0c235612eb09792656cd861997f84 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: fix consumers to actually be deleted and clean up cloud test --- nova/rpc.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index fdb228695..e2e962fcc 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -30,11 +30,11 @@ import time import traceback import uuid +import greenlet from carrot import connection as carrot_connection from carrot import messaging import eventlet from eventlet import greenpool -from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -266,6 +266,7 @@ class ConsumerSet(object): def __init__(self, conn, consumer_list): self.consumer_list = set(consumer_list) self.consumer_set = None + self.enabled = True self.init(conn) def init(self, conn): @@ -283,15 +284,21 @@ class ConsumerSet(object): self.init(None) def wait(self, limit=None): - while True: + running = True + while running: it = self.consumer_set.iterconsume(limit=limit) + if not it: + break while True: try: it.next() except StopIteration: return + except greenlet.GreenletExit: + running = False + break except Exception as e: - LOG.error(_("Received exception %s " % str(e) + \ + LOG.error(_("Received exception %s " % type(e) + \ "while processing consumer")) self.reconnect() # Break to outer loop -- cgit From 8f2557dcd3e3d88c0eabb63bcce90ced79347ae4 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: catch greenlet.GreenletExit when shutting service down --- nova/rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index e2e962fcc..02052ecf5 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -24,13 +24,13 @@ No fan-out support yet. """ +import greenlet import json import sys import time import traceback import uuid -import greenlet from carrot import connection as carrot_connection from carrot import messaging import eventlet -- cgit From 11d3672ad655c39265e5d2477a30db3a12adc65c Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: Add rpc_conn_pool_size flag for the new connection pool --- nova/rpc.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 02052ecf5..82869fc46 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -50,7 +50,10 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -99,7 +102,7 @@ class Pool(pools.Pool): def create(self): return Connection.instance(new=True) -ConnectionPool = Pool(max_size=20) +ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size) class Consumer(messaging.Consumer): -- cgit From b193b97054f11664a72cd53547f355d1c9044f88 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Wed, 25 May 2011 15:42:24 -0700 Subject: connection pool tests and make the pool LIFO --- nova/rpc.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 82869fc46..3cc0dadd4 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -99,10 +99,16 @@ class Connection(carrot_connection.BrokerConnection): class Pool(pools.Pool): """Class that implements a Pool of Connections""" + # TODO(comstud): Timeout connections not used in a while def create(self): return Connection.instance(new=True) -ConnectionPool = Pool(max_size=FLAGS.rpc_conn_pool_size) +# Create a ConnectionPool to use for RPC calls. We'll order the +# pool as a stack (LIFO), so that we can potentially loop through and +# timeout old unused connections at some point +ConnectionPool = Pool( + max_size=FLAGS.rpc_conn_pool_size, + order_as_stack=True) class Consumer(messaging.Consumer): -- cgit From 51e8eeb9b3a23f811bcbf52d9700d94c5c8b15e4 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: bring back commits lost in merge --- nova/rpc.py | 107 +++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 44 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 3cc0dadd4..d7d7bb014 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -35,6 +35,7 @@ from carrot import connection as carrot_connection from carrot import messaging import eventlet from eventlet import greenpool +from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -140,30 +141,30 @@ class Consumer(messaging.Consumer): FLAGS.rabbit_max_retries) sys.exit(1) - def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): - """Wraps the parent fetch with some logic for failed connection.""" - # TODO(vish): the logic for failed connections and logging should be - # refactored into some sort of connection manager object - try: - if self.failed_connection: - # NOTE(vish): connection is defined in the parent class, we can - # recreate it as long as we create the backend too - # pylint: disable=W0201 - self.connection = Connection.recreate() - self.backend = self.connection.create_backend() - self.declare() - return super(Consumer, self).fetch( - no_ack, auto_ack, enable_callbacks) - if self.failed_connection: - LOG.error(_('Reconnected to queue')) - self.failed_connection = False - # NOTE(vish): This is catching all errors because we really don't - # want exceptions to be logged 10 times a second if some - # persistent failure occurs. - except Exception, e: # pylint: disable=W0703 - if not self.failed_connection: - LOG.exception(_('Failed to fetch message from queue: %s' % e)) - self.failed_connection = True + #def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): + # """Wraps the parent fetch with some logic for failed connection.""" + # # TODO(vish): the logic for failed connections and logging should be + # # refactored into some sort of connection manager object + # try: + # if self.failed_connection: + # # NOTE(vish): connection is defined in the parent class, we can + # # recreate it as long as we create the backend too + # # pylint: disable=W0201 + # self.connection = Connection.recreate() + # self.backend = self.connection.create_backend() + # self.declare() + # return super(Consumer, self).fetch( + # no_ack, auto_ack, enable_callbacks) + # if self.failed_connection: + # LOG.error(_('Reconnected to queue')) + # self.failed_connection = False + # # NOTE(vish): This is catching all errors because we really don't + # # want exceptions to be logged 10 times a second if some + # # persistent failure occurs. + # except Exception, e: # pylint: disable=W0703 + # if not self.failed_connection: + # LOG.exception(_('Failed to fetch message from queue: %s' % e)) + # self.failed_connection = True def attach_to_eventlet(self): """Only needed for unit tests!""" @@ -195,7 +196,7 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) - msg_id = message_data.pop('_msg_id', None) + msg_id = message_data.get('_msg_id', None) ctxt = _unpack_context(message_data) @@ -225,11 +226,14 @@ class AdapterConsumer(Consumer): rval = node_func(context=ctxt, **node_args) if msg_id: # TODO(termie): re-enable when fix the yielding issue - #if hasattr(rval, 'send'): - # logging.error('rval! %s', rval) - # for x in rval: - # msg_reply(msg_id, x, None) - msg_reply(msg_id, rval, None) + if hasattr(rval, 'send'): + logging.error('rval! %s', rval) + for x in rval: + msg_reply(msg_id, x, None) + msg_reply(msg_id, None, None) + else: + msg_reply(msg_id, rval, None) + #msg_reply(msg_id, rval, None) except Exception as e: logging.exception('Exception during message handling') if msg_id: @@ -355,7 +359,7 @@ class DirectConsumer(Consumer): self.routing_key = msg_id self.exchange = msg_id self.auto_delete = True - self.exclusive = True + self.exclusive = False super(DirectConsumer, self).__init__(connection=connection) @@ -387,7 +391,9 @@ def msg_reply(msg_id, reply=None, failure=None): publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) + LOG.error('MSG REPLY SUCCESS') except TypeError: + LOG.error('MSG REPLY FAILURE') publisher.send( {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()), @@ -440,9 +446,9 @@ def _pack_context(msg, context): for args at some point. """ - context = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) - msg.update(context) + context_d = dict([('_context_%s' % key, value) + for (key, value) in context.to_dict().iteritems()]) + msg.update(context_d) class RpcContext(context.RequestContext): @@ -463,12 +469,13 @@ def multicall(context, topic, msg): LOG.debug(_('MSG_ID is %s') % (msg_id)) _pack_context(msg, context) - conn = ConnectionPool.get() - consumer = DirectConsumer(connection=conn, msg_id=msg_id) + con_conn = ConnectionPool.get() + consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - publisher = TopicPublisher(connection=conn, topic=topic) + pub_conn = ConnectionPool.get() + publisher = TopicPublisher(connection=pub_conn, topic=topic) publisher.send(msg) publisher.close() @@ -484,6 +491,7 @@ class MulticallWaiter(object): def close(self): self._closed = True self._consumer.close() + ConnectionPool.put(self._consumer.connection) def __call__(self, data, message): """Acks message and sets result.""" @@ -501,15 +509,26 @@ class MulticallWaiter(object): # trying to solve the problem quickly. This works but # I'd prefer to dig in and do it the best way later on. - def _waiter(): - while not self._closed: - try: - self._consumer.wait(limit=1) - except StopIteration: - pass - eventlet.spawn(_waiter) + #def _waiter(): + # i = 0 + # while not self._closed: + # LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag) + # i += 1 + # try: + # self._consumer.wait(limit=1) + # except StopIteration: + # pass + # self._consumer.close() + # ConnectionPool.put(self._consumer.connection) + #eventlet.spawn(_waiter) while True: + rv = None + while rv is None and not self._closed: + rv = self._consumer.fetch(enable_callbacks=True) + time.sleep(0.01) + + LOG.error('RV %s', rv) result = self._results.get() if isinstance(result, Exception): raise result -- cgit From 64b13a2aad676d2310947e3bf8b9e3dde6b763e7 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: almost everything working with fake_rabbit --- nova/rpc.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index d7d7bb014..e1f594a99 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -102,6 +102,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): + LOG.debug('Creating new connection') return Connection.instance(new=True) # Create a ConnectionPool to use for RPC calls. We'll order the @@ -166,6 +167,10 @@ class Consumer(messaging.Consumer): # LOG.exception(_('Failed to fetch message from queue: %s' % e)) # self.failed_connection = True + def close(self, *args, **kwargs): + LOG.debug('Closing consumer %s', self.consumer_tag) + return super(Consumer, self).close(*args, **kwargs) + def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -317,6 +322,8 @@ class ConsumerSet(object): # Break to outer loop break + def close(self): + self.consumer_set.close() class Publisher(messaging.Publisher): """Publisher base class.""" @@ -525,12 +532,19 @@ class MulticallWaiter(object): while True: rv = None while rv is None and not self._closed: - rv = self._consumer.fetch(enable_callbacks=True) + try: + rv = self._consumer.fetch(enable_callbacks=True) + except Exception: + self.close() + raise + #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) LOG.error('RV %s', rv) result = self._results.get() + LOG.error('RESULT %s', result) if isinstance(result, Exception): + self.close() raise result if result == None: self.close() -- cgit From e3a88390fd62308cde3d4c597d653c8dc245bed4 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:24 -0700 Subject: don't need to use a separate connection --- nova/rpc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index e1f594a99..a212383fd 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -481,8 +481,7 @@ def multicall(context, topic, msg): wait_msg = MulticallWaiter(consumer) consumer.register_callback(wait_msg) - pub_conn = ConnectionPool.get() - publisher = TopicPublisher(connection=pub_conn, topic=topic) + publisher = TopicPublisher(connection=con_conn, topic=topic) publisher.send(msg) publisher.close() -- cgit From c9b21b0619891c069251c568e4d89be791af56c3 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 25 May 2011 15:42:24 -0700 Subject: lots of fixes for rpc and extra imports --- nova/rpc.py | 68 +++++++++++++++++++++++-------------------------------------- 1 file changed, 25 insertions(+), 43 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index a212383fd..7faed4d3a 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -33,9 +33,7 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging -import eventlet from eventlet import greenpool -from eventlet import greenthread from eventlet import pools from eventlet import queue @@ -142,30 +140,30 @@ class Consumer(messaging.Consumer): FLAGS.rabbit_max_retries) sys.exit(1) - #def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): - # """Wraps the parent fetch with some logic for failed connection.""" - # # TODO(vish): the logic for failed connections and logging should be - # # refactored into some sort of connection manager object - # try: - # if self.failed_connection: - # # NOTE(vish): connection is defined in the parent class, we can - # # recreate it as long as we create the backend too - # # pylint: disable=W0201 - # self.connection = Connection.recreate() - # self.backend = self.connection.create_backend() - # self.declare() - # return super(Consumer, self).fetch( - # no_ack, auto_ack, enable_callbacks) - # if self.failed_connection: - # LOG.error(_('Reconnected to queue')) - # self.failed_connection = False - # # NOTE(vish): This is catching all errors because we really don't - # # want exceptions to be logged 10 times a second if some - # # persistent failure occurs. - # except Exception, e: # pylint: disable=W0703 - # if not self.failed_connection: - # LOG.exception(_('Failed to fetch message from queue: %s' % e)) - # self.failed_connection = True + def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): + """Wraps the parent fetch with some logic for failed connection.""" + # TODO(vish): the logic for failed connections and logging should be + # refactored into some sort of connection manager object + try: + if self.failed_connection: + # NOTE(vish): connection is defined in the parent class, we can + # recreate it as long as we create the backend too + # pylint: disable=W0201 + self.connection = Connection.recreate() + self.backend = self.connection.create_backend() + self.declare() + return super(Consumer, self).fetch( + no_ack, auto_ack, enable_callbacks) + if self.failed_connection: + LOG.error(_('Reconnected to queue')) + self.failed_connection = False + # NOTE(vish): This is catching all errors because we really don't + # want exceptions to be logged 10 times a second if some + # persistent failure occurs. + except Exception, e: # pylint: disable=W0703 + if not self.failed_connection: + LOG.exception(_('Failed to fetch message from queue: %s' % e)) + self.failed_connection = True def close(self, *args, **kwargs): LOG.debug('Closing consumer %s', self.consumer_tag) @@ -325,6 +323,7 @@ class ConsumerSet(object): def close(self): self.consumer_set.close() + class Publisher(messaging.Publisher): """Publisher base class.""" pass @@ -511,23 +510,6 @@ class MulticallWaiter(object): return self.wait() def wait(self): - # TODO(termie): This is probably really a much simpler issue but am - # trying to solve the problem quickly. This works but - # I'd prefer to dig in and do it the best way later on. - - #def _waiter(): - # i = 0 - # while not self._closed: - # LOG.error('Iteration #%s (%s)', i, self._consumer.consumer_tag) - # i += 1 - # try: - # self._consumer.wait(limit=1) - # except StopIteration: - # pass - # self._consumer.close() - # ConnectionPool.put(self._consumer.connection) - #eventlet.spawn(_waiter) - while True: rv = None while rv is None and not self._closed: -- cgit From 9334d41c6fe638a3119327702094695cfbd38271 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:25 -0700 Subject: make sure that using multicall on a call with a single result still functions --- nova/rpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 7faed4d3a..84493271f 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -233,10 +233,10 @@ class AdapterConsumer(Consumer): logging.error('rval! %s', rval) for x in rval: msg_reply(msg_id, x, None) - msg_reply(msg_id, None, None) else: msg_reply(msg_id, rval, None) - #msg_reply(msg_id, rval, None) + # This final None tells multicall that it is done. + msg_reply(msg_id, None, None) except Exception as e: logging.exception('Exception during message handling') if msg_id: -- cgit From c7fe7e5e28b9f4bb999c8309f56953f6609cbc57 Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:42:49 -0700 Subject: cleanup the code for merging --- nova/rpc.py | 78 +++++++++++++++++++++++++++---------------------------------- 1 file changed, 35 insertions(+), 43 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 84493271f..8d14494f0 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -24,7 +24,6 @@ No fan-out support yet. """ -import greenlet import json import sys import time @@ -36,6 +35,7 @@ from carrot import messaging from eventlet import greenpool from eventlet import pools from eventlet import queue +import greenlet from nova import context from nova import exception @@ -50,9 +50,9 @@ LOG = logging.getLogger('nova.rpc') FLAGS = flags.FLAGS flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') + 'Size of RPC thread pool') flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') + 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): @@ -96,7 +96,7 @@ class Connection(carrot_connection.BrokerConnection): class Pool(pools.Pool): - """Class that implements a Pool of Connections""" + """Class that implements a Pool of Connections.""" # TODO(comstud): Timeout connections not used in a while def create(self): @@ -152,8 +152,9 @@ class Consumer(messaging.Consumer): self.connection = Connection.recreate() self.backend = self.connection.create_backend() self.declare() - return super(Consumer, self).fetch( - no_ack, auto_ack, enable_callbacks) + return super(Consumer, self).fetch(no_ack, + auto_ack, + enable_callbacks) if self.failed_connection: LOG.error(_('Reconnected to queue')) self.failed_connection = False @@ -165,10 +166,6 @@ class Consumer(messaging.Consumer): LOG.exception(_('Failed to fetch message from queue: %s' % e)) self.failed_connection = True - def close(self, *args, **kwargs): - LOG.debug('Closing consumer %s', self.consumer_tag) - return super(Consumer, self).close(*args, **kwargs) - def attach_to_eventlet(self): """Only needed for unit tests!""" timer = utils.LoopingCall(self.fetch, enable_callbacks=True) @@ -188,8 +185,10 @@ class AdapterConsumer(Consumer): self.register_callback(self.process_data) def process_data(self, message_data, message): - """Consumer callback that parses the message for validity and - fires off a thread to call the proxy object method. + """Consumer callback to call a method on a proxy object. + + Parses the message for validity and fires off a thread to call the + proxy object method. Message data should be a dictionary with two keys: method: string representing the method to call @@ -199,8 +198,8 @@ class AdapterConsumer(Consumer): """ LOG.debug(_('received %s') % message_data) + # This will be popped off in _unpack_context msg_id = message_data.get('_msg_id', None) - ctxt = _unpack_context(message_data) method = message_data.get('method') @@ -228,13 +227,13 @@ class AdapterConsumer(Consumer): try: rval = node_func(context=ctxt, **node_args) if msg_id: - # TODO(termie): re-enable when fix the yielding issue + # Check if the result was a generator if hasattr(rval, 'send'): - logging.error('rval! %s', rval) for x in rval: msg_reply(msg_id, x, None) else: msg_reply(msg_id, rval, None) + # This final None tells multicall that it is done. msg_reply(msg_id, None, None) except Exception as e: @@ -277,7 +276,7 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): - """Groups consumers to listen on together on a single connection""" + """Groups consumers to listen on together on a single connection.""" def __init__(self, conn, consumer_list): self.consumer_list = set(consumer_list) @@ -365,7 +364,7 @@ class DirectConsumer(Consumer): self.routing_key = msg_id self.exchange = msg_id self.auto_delete = True - self.exclusive = False + self.exclusive = True super(DirectConsumer, self).__init__(connection=connection) @@ -393,20 +392,18 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = ConnectionPool.get() - publisher = DirectPublisher(connection=conn, msg_id=msg_id) - try: - publisher.send({'result': reply, 'failure': failure}) - LOG.error('MSG REPLY SUCCESS') - except TypeError: - LOG.error('MSG REPLY FAILURE') - publisher.send( - {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure}) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = DirectPublisher(connection=conn, msg_id=msg_id) + try: + publisher.send({'result': reply, 'failure': failure}) + except TypeError: + publisher.send( + {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure}) + + publisher.close() class RemoteError(exception.Error): @@ -518,12 +515,9 @@ class MulticallWaiter(object): except Exception: self.close() raise - #rv = self._consumer.fetch(enable_callbacks=True) time.sleep(0.01) - LOG.error('RV %s', rv) result = self._results.get() - LOG.error('RESULT %s', result) if isinstance(result, Exception): self.close() raise result @@ -545,22 +539,20 @@ def cast(context, topic, msg): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = TopicPublisher(connection=conn, topic=topic) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = TopicPublisher(connection=conn, topic=topic) + publisher.send(msg) + publisher.close() def fanout_cast(context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) _pack_context(msg, context) - conn = ConnectionPool.get() - publisher = FanoutPublisher(topic, connection=conn) - publisher.send(msg) - publisher.close() - ConnectionPool.put(conn) + with ConnectionPool.item() as conn: + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() def generic_response(message_data, message): -- cgit From feb04f0117450bcd6e8f4966f4487575073be41c Mon Sep 17 00:00:00 2001 From: termie Date: Wed, 25 May 2011 15:43:04 -0700 Subject: change the behavior of calling a multicall --- nova/rpc.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 8d14494f0..493978e57 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -236,6 +236,9 @@ class AdapterConsumer(Consumer): # This final None tells multicall that it is done. msg_reply(msg_id, None, None) + elif hasattr(rval, 'send'): + # NOTE(vish): this iterates through the generator + list(rval) except Exception as e: logging.exception('Exception during message handling') if msg_id: @@ -530,9 +533,8 @@ class MulticallWaiter(object): def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) - for x in rv: - rv.close() - return x + # NOTE(vish): return the last result from the multicall + return list(rv)[-1] def cast(context, topic, msg): -- cgit From d7e0b45a9bc415e87beee32f10c8d6bdff9819ed Mon Sep 17 00:00:00 2001 From: termie Date: Thu, 26 May 2011 15:08:53 -0700 Subject: changes per review --- nova/rpc.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 493978e57..1ec495bc8 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -28,6 +28,7 @@ import json import sys import time import traceback +import types import uuid from carrot import connection as carrot_connection @@ -228,7 +229,7 @@ class AdapterConsumer(Consumer): rval = node_func(context=ctxt, **node_args) if msg_id: # Check if the result was a generator - if hasattr(rval, 'send'): + if isinstance(rval, types.GeneratorType): for x in rval: msg_reply(msg_id, x, None) else: @@ -236,7 +237,7 @@ class AdapterConsumer(Consumer): # This final None tells multicall that it is done. msg_reply(msg_id, None, None) - elif hasattr(rval, 'send'): + elif isinstance(rval, types.GeneratorType): # NOTE(vish): this iterates through the generator list(rval) except Exception as e: @@ -281,11 +282,11 @@ class FanoutAdapterConsumer(AdapterConsumer): class ConsumerSet(object): """Groups consumers to listen on together on a single connection.""" - def __init__(self, conn, consumer_list): + def __init__(self, connection, consumer_list): self.consumer_list = set(consumer_list) self.consumer_set = None self.enabled = True - self.init(conn) + self.init(connection) def init(self, conn): if not conn: @@ -316,8 +317,7 @@ class ConsumerSet(object): running = False break except Exception as e: - LOG.error(_("Received exception %s " % type(e) + \ - "while processing consumer")) + LOG.exception(_("Exception while processing consumer")) self.reconnect() # Break to outer loop break @@ -534,7 +534,10 @@ def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) # NOTE(vish): return the last result from the multicall - return list(rv)[-1] + rv = list(rv) + if not rv: + return + return rv[-1] def cast(context, topic, msg): -- cgit From 103bcae9f172dfee64e7b9235807bcfe1a8aefb3 Mon Sep 17 00:00:00 2001 From: termie Date: Thu, 26 May 2011 17:06:52 -0700 Subject: fix a minor bug unrelated to this change --- nova/rpc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'nova/rpc.py') diff --git a/nova/rpc.py b/nova/rpc.py index 1ec495bc8..c5277c6a9 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -212,7 +212,9 @@ class AdapterConsumer(Consumer): # we just log the message and send an error string # back to the caller LOG.warn(_('no method for message: %s') % message_data) - msg_reply(msg_id, _('No method for message: %s') % message_data) + if msg_id: + msg_reply(msg_id, + _('No method for message: %s') % message_data) return self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) -- cgit