diff options
| author | Mark McLoughlin <markmc@redhat.com> | 2012-12-05 15:12:15 +0000 |
|---|---|---|
| committer | Russell Bryant <rbryant@redhat.com> | 2012-12-05 14:38:08 -0500 |
| commit | b6d589389fefce024934bb5bd977da8efc89f6dd (patch) | |
| tree | b36c458e97c62df721d352304721941c6b593ce0 /nova/openstack | |
| parent | 02f4b1b9f674a074a91c30585a190e3e5ed85c7a (diff) | |
| download | nova-b6d589389fefce024934bb5bd977da8efc89f6dd.tar.gz nova-b6d589389fefce024934bb5bd977da8efc89f6dd.tar.xz nova-b6d589389fefce024934bb5bd977da8efc89f6dd.zip | |
Sync latest openstack.common.rpc
Changes since last sync:
202f568 Use json instead of jsonutils in rpc.impl_fake.
6d102bc Provide i18n to those messages without _()
8695285 Qpid H/A cluster support
faeafe1 Fixes import order
cf705c5 Make project pyflakes clean.
05f8ec7 Fix common rpc to use common logging instead of python logging
b6d24bb updating sphinx documentation
33fbd87 Added initialize_service_hook for rpc.Service.
cf849e0 Clean up dictionary use in RPC drivers
Change-Id: I4fbade51390e159bd9cccd2afc918a4f07740993
Diffstat (limited to 'nova/openstack')
| -rw-r--r-- | nova/openstack/common/rpc/amqp.py | 11 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/common.py | 6 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/dispatcher.py | 10 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_fake.py | 9 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 25 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/matchmaker.py | 2 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/service.py | 5 |
7 files changed, 43 insertions, 25 deletions
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 3324e3758..60bff59fe 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code. """ import inspect -import logging import sys import uuid @@ -38,6 +37,7 @@ from nova.openstack.common import cfg from nova.openstack.common import excutils from nova.openstack.common.gettextutils import _ from nova.openstack.common import local +from nova.openstack.common import log as logging from nova.openstack.common.rpc import common as rpc_common @@ -55,7 +55,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug('Pool creating new connection') + LOG.debug(_('Pool creating new connection')) return self.connection_cls(self.conf) def empty(self): @@ -282,7 +282,7 @@ class ProxyCallback(object): ctxt.reply(rval, None, connection_pool=self.connection_pool) # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) - except Exception as e: + except Exception: LOG.exception(_('Exception during message handling')) ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) @@ -407,8 +407,9 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, def notify(conf, context, topic, msg, connection_pool): """Sends a notification event on a topic.""" - event_type = msg.get('event_type') - LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) + LOG.debug(_('Sending %(event_type)s on %(topic)s'), + dict(event_type=msg.get('event_type'), + topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.notify_send(topic, msg) diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index eb3416804..73a18012c 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -18,13 +18,13 @@ # under the License. import copy -import logging import traceback from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils from nova.openstack.common import jsonutils from nova.openstack.common import local +from nova.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -40,7 +40,7 @@ class RPCException(Exception): try: message = self.message % kwargs - except Exception as e: + except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) @@ -258,7 +258,7 @@ def deserialize_remote_exception(conf, data): # we cannot necessarily change an exception message so we must override # the __str__ method. failure.__class__ = new_ex_type - except TypeError as e: + except TypeError: # NOTE(ameade): If a core exception then just add the traceback to the # first exception argument. failure.args = (message,) + failure.args[1:] diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 34c2954db..4dee5d509 100644 --- a/nova/openstack/common/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. - -EXAMPLES: +EXAMPLES +======== Nova was the first project to use versioned rpc APIs. Consider the compute rpc API as an example. The client side is in nova/compute/rpcapi.py and the server @@ -50,12 +50,13 @@ side is in nova/compute/manager.py. Example 1) Adding a new method. +------------------------------- Adding a new method is a backwards compatible change. It should be added to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should have a specific version specified to indicate the minimum API version that must -be implemented for the method to be supported. For example: +be implemented for the method to be supported. For example:: def get_host_uptime(self, ctxt, host): topic = _compute_topic(self.topic, ctxt, host, None) @@ -67,10 +68,11 @@ get_host_uptime() method. Example 2) Adding a new parameter. +---------------------------------- Adding a new parameter to an rpc method can be made backwards compatible. The RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. -The implementation of the method must not expect the parameter to be present. +The implementation of the method must not expect the parameter to be present.:: def some_remote_method(self, arg1, arg2, newarg=None): # The code needs to deal with newarg=None for cases diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py index 8db0da015..0e60da3f7 100644 --- a/nova/openstack/common/rpc/impl_fake.py +++ b/nova/openstack/common/rpc/impl_fake.py @@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +# NOTE(russellb): We specifically want to use json, not our own jsonutils. +# jsonutils has some extra logic to automatically convert objects to primitive +# types so that they can be serialized. We want to catch all cases where +# non-primitive types make it into this code and treat it as an error. +import json import time import eventlet -from nova.openstack.common import jsonutils from nova.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -121,7 +125,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - jsonutils.dumps(msg) + json.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): @@ -154,6 +158,7 @@ def call(conf, context, topic, msg, timeout=None): def cast(conf, context, topic, msg): + check_serialize(msg) try: call(conf, context, topic, msg) except Exception: diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index b87050753..5570ea867 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -17,7 +17,6 @@ import functools import itertools -import logging import time import uuid @@ -29,6 +28,7 @@ import qpid.messaging.exceptions from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import jsonutils +from nova.openstack.common import log as logging from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import common as rpc_common @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -277,22 +280,21 @@ class Connection(object): self.conf = conf params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +322,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +339,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: diff --git a/nova/openstack/common/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py index 783e3713c..8b2c67a44 100644 --- a/nova/openstack/common/rpc/matchmaker.py +++ b/nova/openstack/common/rpc/matchmaker.py @@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance. import contextlib import itertools import json -import logging from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging matchmaker_opts = [ diff --git a/nova/openstack/common/rpc/service.py b/nova/openstack/common/rpc/service.py index 15508e432..94dc7960e 100644 --- a/nova/openstack/common/rpc/service.py +++ b/nova/openstack/common/rpc/service.py @@ -57,6 +57,11 @@ class Service(service.Service): self.conn.create_consumer(self.topic, dispatcher, fanout=True) + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + # Consume from all consumers in a thread self.conn.consume_in_thread() |
