diff options
author | Ryan Lane <rlane@wikimedia.org> | 2011-03-03 23:04:11 +0000 |
---|---|---|
committer | Ryan Lane <rlane@wikimedia.org> | 2011-03-03 23:04:11 +0000 |
commit | df3a65793ec7bb9d85d2a3da47fbbfb9e97d03d4 (patch) | |
tree | aab853348a2df9f5cdb26a77dd836d4f2083f119 /nova/rpc.py | |
parent | 4c50ddee48971c76f0f6252295747b89de5d3697 (diff) | |
parent | 7ca1669603132e3afd14606dda3f95ccbce08a41 (diff) | |
download | nova-df3a65793ec7bb9d85d2a3da47fbbfb9e97d03d4.tar.gz nova-df3a65793ec7bb9d85d2a3da47fbbfb9e97d03d4.tar.xz nova-df3a65793ec7bb9d85d2a3da47fbbfb9e97d03d4.zip |
Merge from trunk
Diffstat (limited to 'nova/rpc.py')
-rw-r--r-- | nova/rpc.py | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/nova/rpc.py b/nova/rpc.py index 01fc6d44b..fbb90299b 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -29,6 +29,7 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging +from eventlet import greenpool from eventlet import greenthread from nova import context @@ -42,11 +43,13 @@ from nova import utils FLAGS = flags.FLAGS LOG = logging.getLogger('nova.rpc') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') + class Connection(carrot_connection.BrokerConnection): """Connection instance object""" @classmethod - def instance(cls, new=False): + def instance(cls, new=True): """Returns the instance""" if new or not hasattr(cls, '_instance'): params = dict(hostname=FLAGS.rabbit_host, @@ -88,18 +91,19 @@ class Consumer(messaging.Consumer): super(Consumer, self).__init__(*args, **kwargs) self.failed_connection = False break - except: # Catching all because carrot sucks + except Exception as e: # Catching all because carrot sucks fl_host = FLAGS.rabbit_host fl_port = FLAGS.rabbit_port fl_intv = FLAGS.rabbit_retry_interval - LOG.exception(_("AMQP server on %(fl_host)s:%(fl_port)d is" - " unreachable. Trying again in %(fl_intv)d seconds.") + LOG.error(_("AMQP server on %(fl_host)s:%(fl_port)d is" + " unreachable: %(e)s. Trying again in %(fl_intv)d" + " seconds.") % locals()) self.failed_connection = True if self.failed_connection: - LOG.exception(_("Unable to connect to AMQP server " - "after %d tries. Shutting down."), - FLAGS.rabbit_max_retries) + LOG.error(_("Unable to connect to AMQP server " + "after %d tries. Shutting down."), + FLAGS.rabbit_max_retries) sys.exit(1) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): @@ -119,7 +123,7 @@ class Consumer(messaging.Consumer): LOG.error(_("Reconnected to queue")) self.failed_connection = False # NOTE(vish): This is catching all errors because we really don't - # exceptions to be logged 10 times a second if some + # want exceptions to be logged 10 times a second if some # persistent failure occurs. except Exception: # pylint: disable-msg=W0703 if not self.failed_connection: @@ -155,11 +159,15 @@ class AdapterConsumer(TopicConsumer): def __init__(self, connection=None, topic="broadcast", proxy=None): LOG.debug(_('Initing the Adapter Consumer for %s') % topic) self.proxy = proxy + self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) super(AdapterConsumer, self).__init__(connection=connection, topic=topic) + def receive(self, *args, **kwargs): + self.pool.spawn_n(self._receive, *args, **kwargs) + @exception.wrap_exception - def receive(self, message_data, message): + def _receive(self, message_data, message): """Magically looks for a method on the proxy object and calls it Message data should be a dictionary with two keys: @@ -246,7 +254,7 @@ def msg_reply(msg_id, reply=None, failure=None): LOG.error(_("Returning exception %s to caller"), message) LOG.error(tb) failure = (failure[0].__name__, str(failure[1]), tb) - conn = Connection.instance(True) + conn = Connection.instance() publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: publisher.send({'result': reply, 'failure': failure}) @@ -319,7 +327,7 @@ def call(context, topic, msg): self.result = data['result'] wait_msg = WaitMessage() - conn = Connection.instance(True) + conn = Connection.instance() consumer = DirectConsumer(connection=conn, msg_id=msg_id) consumer.register_callback(wait_msg) |