summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-08-07 17:29:00 +0000
committerGerrit Code Review <review@openstack.org>2012-08-07 17:29:00 +0000
commit210ed0fc80442d7e7213f756d72ce24179e12be8 (patch)
tree48f4ba95c8d7ef002cd4a12ef40594cd9a7e7933 /nova/openstack
parent5c42f4735fac88591177ce0b709a0e94b8bfc961 (diff)
parent23c63e08e8e068c9913a95171773cba5052fd15b (diff)
downloadnova-210ed0fc80442d7e7213f756d72ce24179e12be8.tar.gz
nova-210ed0fc80442d7e7213f756d72ce24179e12be8.tar.xz
nova-210ed0fc80442d7e7213f756d72ce24179e12be8.zip
Merge "Update RPC code from common"
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/rpc/__init__.py4
-rw-r--r--nova/openstack/common/rpc/amqp.py2
-rw-r--r--nova/openstack/common/rpc/common.py6
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py2
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py45
-rw-r--r--nova/openstack/common/rpc/proxy.py12
6 files changed, 40 insertions, 31 deletions
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index 06f2dfb2c..d2faff3de 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -47,7 +47,9 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
- default=['nova.openstack.common.exception', 'nova.exception'],
+ default=['nova.openstack.common.exception',
+ 'nova.exception',
+ ],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 13ff5665e..09f20fe81 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -35,8 +35,8 @@ from eventlet import pools
from eventlet import semaphore
from nova.openstack.common import excutils
-from nova.openstack.common import local
from nova.openstack.common.gettextutils import _
+from nova.openstack.common import local
from nova.openstack.common.rpc import common as rpc_common
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 7bc65a6a6..01db756eb 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -108,7 +108,7 @@ class Connection(object):
"""
raise NotImplementedError()
- def create_consumer(self, conf, topic, proxy, fanout=False):
+ def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer on this connection.
A consumer is associated with a message queue on the backend message
@@ -117,7 +117,6 @@ class Connection(object):
off of the queue will determine which method gets called on the proxy
object.
- :param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic. For example, all instances of nova-compute consume
@@ -133,7 +132,7 @@ class Connection(object):
"""
raise NotImplementedError()
- def create_worker(self, conf, topic, proxy, pool_name):
+ def create_worker(self, topic, proxy, pool_name):
"""Create a worker on this connection.
A worker is like a regular consumer of messages directed to a
@@ -143,7 +142,6 @@ class Connection(object):
be asked to process it. Load is distributed across the members
of the pool in round-robin fashion.
- :param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic.
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index 202988faf..4663f0046 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -329,7 +329,7 @@ class Connection(object):
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
self.conf.qpid_reconnect_interval)
- self.connection.hearbeat = self.conf.qpid_heartbeat
+ self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index fc342de50..6f215891b 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -47,9 +47,12 @@ zmq_opts = [
'address.'),
# The module.Class to use for matchmaking.
- cfg.StrOpt('rpc_zmq_matchmaker',
- default='nova.openstack.common.rpc.matchmaker.MatchMakerLocalhost',
- help='MatchMaker driver'),
+ cfg.StrOpt(
+ 'rpc_zmq_matchmaker',
+ default=('nova.openstack.common.rpc.'
+ 'matchmaker.MatchMakerLocalhost'),
+ help='MatchMaker driver',
+ ),
# The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501,
@@ -60,15 +63,16 @@ zmq_opts = [
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
+
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address')
+ 'IP address. Must match "host" option, if running Nova.')
]
# These globals are defined in register_opts(conf),
# a mandatory initialization call
-FLAGS = None
+CONF = None
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
@@ -270,7 +274,7 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(FLAGS, ctx, topic, {
+ cast(CONF, ctx, topic, {
'method': '-process_reply',
'args': {
'msg_id': msg_id,
@@ -325,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
def __init__(self, conf):
super(ZmqBaseReactor, self).__init__()
- self.conf = conf
self.mapping = {}
self.proxies = {}
self.threads = []
@@ -401,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = conf.rpc_zmq_ipc_dir
+ ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
@@ -409,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
- ipc_dir = self.conf.rpc_zmq_ipc_dir
+ ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
@@ -483,7 +486,6 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
- self.conf = conf
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
@@ -504,7 +506,7 @@ class Connection(rpc_common.Connection):
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
- (self.conf.rpc_zmq_ipc_dir, topic)
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
@@ -523,7 +525,7 @@ class Connection(rpc_common.Connection):
def _cast(addr, context, msg_id, topic, msg, timeout=None):
- timeout_cast = timeout or FLAGS.rpc_cast_timeout
+ timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
with Timeout(timeout_cast, exception=rpc_common.Timeout):
@@ -541,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
def _call(addr, context, msg_id, topic, msg, timeout=None):
# timeout_response is how long we wait for a response
- timeout = timeout or FLAGS.rpc_response_timeout
+ timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies.
msg_id = str(uuid.uuid4().hex)
# Replies always come into the reply service.
- reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
+ reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
LOG.debug(_("Creating payload"))
# Curry the original request into a reply method.
@@ -569,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
zmq.SUB, subscribe=msg_id, bind=False
)
@@ -595,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
# responses for Exceptions.
for resp in responses:
if isinstance(resp, types.DictType) and 'exc' in resp:
- raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+ raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
return responses[-1]
@@ -606,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
dispatches to the matchmaker and sends
message to all relevant hosts.
"""
- conf = FLAGS
+ conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic)
@@ -693,11 +695,11 @@ def register_opts(conf):
# We memoize through these globals
global ZMQ_CTX
global matchmaker
- global FLAGS
+ global CONF
- if not FLAGS:
+ if not CONF:
conf.register_opts(zmq_opts)
- FLAGS = conf
+ CONF = conf
# Don't re-set, if this method is called twice.
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
@@ -716,3 +718,6 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
+
+
+register_opts(cfg.CONF)
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index fe28cdde5..a0775528e 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -112,11 +112,12 @@ class RpcProxy(object):
self._set_version(msg, version)
rpc.cast(context, self._get_topic(topic), msg)
- def fanout_cast(self, context, msg, version=None):
+ def fanout_cast(self, context, msg, topic=None, version=None):
"""rpc.fanout_cast() a remote method.
:param context: The request context
:param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
@@ -124,7 +125,7 @@ class RpcProxy(object):
from the remote method.
"""
self._set_version(msg, version)
- rpc.fanout_cast(context, self.topic, msg)
+ rpc.fanout_cast(context, self._get_topic(topic), msg)
def cast_to_server(self, context, server_params, msg, topic=None,
version=None):
@@ -144,13 +145,15 @@ class RpcProxy(object):
self._set_version(msg, version)
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
- def fanout_cast_to_server(self, context, server_params, msg, version=None):
+ def fanout_cast_to_server(self, context, server_params, msg, topic=None,
+ version=None):
"""rpc.fanout_cast_to_server() a remote method.
:param context: The request context
:param server_params: Server parameters. See rpc.cast_to_server() for
details.
:param msg: The message to send, including the method and args.
+ :param topic: Override the topic for this message.
:param version: (Optional) Override the requested API version in this
message.
@@ -158,4 +161,5 @@ class RpcProxy(object):
return values.
"""
self._set_version(msg, version)
- rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
+ rpc.fanout_cast_to_server(context, server_params,
+ self._get_topic(topic), msg)