summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-21 20:57:11 +0000
committerGerrit Code Review <review@openstack.org>2013-02-21 20:57:11 +0000
commit607db3f06f52033b511716cfc80fba81c21b5600 (patch)
treeec3818258f110625685f3e1ee325c7762dfdc055 /nova/openstack
parentf13ce11ba634764f41f05938b0e3cee3469a9a0e (diff)
parentc04bdc81c9335fc26ba84890459820ac7538c7ab (diff)
downloadnova-607db3f06f52033b511716cfc80fba81c21b5600.tar.gz
nova-607db3f06f52033b511716cfc80fba81c21b5600.tar.xz
nova-607db3f06f52033b511716cfc80fba81c21b5600.zip
Merge "Sync rpc from oslo-incubator."
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/rpc/__init__.py1
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py1
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py121
3 files changed, 86 insertions, 37 deletions
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index ff72c3f8e..991820b7c 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -34,6 +34,7 @@ from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import local
+
LOG = logging.getLogger(__name__)
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 5e1846f91..180685cf3 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
+ 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index 20a7f923d..c1cca34e8 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
- if serialize:
- data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
+ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'cast', _serialize(data))))
+ return
+
+ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
+ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
+ self.outq.send(map(bytes,
+ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
- def process(self, style, target, proxy, ctx, data):
+ def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
- topic = topic.split('.', 1)[0]
+ topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
+def unflatten_envelope(packenv):
+ """Unflattens the RPC envelope.
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
+ """
+ i = iter(packenv)
+ h = {}
+ try:
+ while True:
+ k = i.next()
+ h[k] = i.next()
+ except StopIteration:
+ return h
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ proxy = self.proxies[sock]
- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
- ctx = RpcContext.unmarshal(ctx)
+ if data[2] == 'cast': # Legacy protocol
+ packenv = data[3]
- proxy = self.proxies[sock]
+ ctx, msg = _deserialize(packenv)
+ request = rpc_common.deserialize_msg(msg)
+ ctx = RpcContext.unmarshal(ctx)
+ elif data[2] == 'impl_zmq_v2':
+ packenv = data[4:]
+
+ msg = unflatten_envelope(packenv)
+ request = rpc_common.deserialize_msg(msg)
+
+ # Unmarshal only after verifying the message.
+ ctx = RpcContext.unmarshal(data[3])
+ else:
+ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+ return
- self.pool.spawn_n(self.process, style, topic,
- proxy, ctx, request)
+ self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -582,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
@@ -593,8 +630,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _cast(addr, context, topic, msg, timeout=None, envelope=False,
+ _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(_msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
- serialize=True, force_envelope=False):
+ envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, topic, payload,
- serialize=serialize, force_envelope=force_envelope)
+ _cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])[-1]['args']['response']
+
+ if msg[2] == 'cast': # Legacy version
+ raw_msg = _deserialize(msg[-1])[-1]
+ elif msg[2] == 'impl_zmq_v2':
+ rpc_envelope = unflatten_envelope(msg[4:])
+ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
+ else:
+ raise rpc_common.UnsupportedRpcEnvelopeVersion(
+ _("Unsupported or unknown ZMQ envelope returned."))
+
+ responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False, _msg_id=None):
+def _multi_send(method, context, topic, msg, timeout=None,
+ envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, msg, timeout, serialize,
- force_envelope, _msg_id)
+ _topic, msg, timeout, envelope,
+ _msg_id)
return
return method(_addr, context, _topic, msg, timeout,
- serialize, force_envelope)
+ envelope)
def create_connection(conf, new=True):
@@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = kwargs.pop('envelope')
- kwargs['force_envelope'] = True
+ kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)