diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-21 20:57:11 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-21 20:57:11 +0000 |
| commit | 607db3f06f52033b511716cfc80fba81c21b5600 (patch) | |
| tree | ec3818258f110625685f3e1ee325c7762dfdc055 /nova/openstack | |
| parent | f13ce11ba634764f41f05938b0e3cee3469a9a0e (diff) | |
| parent | c04bdc81c9335fc26ba84890459820ac7538c7ab (diff) | |
| download | nova-607db3f06f52033b511716cfc80fba81c21b5600.tar.gz nova-607db3f06f52033b511716cfc80fba81c21b5600.tar.xz nova-607db3f06f52033b511716cfc80fba81c21b5600.zip | |
Merge "Sync rpc from oslo-incubator."
Diffstat (limited to 'nova/openstack')
| -rw-r--r-- | nova/openstack/common/rpc/__init__.py | 1 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 1 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 121 |
3 files changed, 86 insertions, 37 deletions
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index ff72c3f8e..991820b7c 100644 --- a/nova/openstack/common/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -34,6 +34,7 @@ from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils from nova.openstack.common import local + LOG = logging.getLogger(__name__) diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index 5e1846f91..180685cf3 100644 --- a/nova/openstack/common/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': False} options.update(kwargs) diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index 20a7f923d..c1cca34e8 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -216,12 +216,18 @@ class ZmqClient(object): socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if serialize: - data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) + if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) def close(self): self.outq.close() @@ -320,7 +326,7 @@ class ConsumerBase(object): else: return [result] - def process(self, style, target, proxy, ctx, data): + def process(self, proxy, ctx, data): data.setdefault('version', None) data.setdefault('args', {}) @@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data - topic = topic.split('.', 1)[0] + topic = data[1] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).consume_in_thread() +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + proxy = self.proxies[sock] - ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) - ctx = RpcContext.unmarshal(ctx) + if data[2] == 'cast': # Legacy protocol + packenv = data[3] - proxy = self.proxies[sock] + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] + + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + return - self.pool.spawn_n(self.process, style, topic, - proxy, ctx, request) + self.pool.spawn_n(self.process, proxy, ctx, request) class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -582,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -593,8 +630,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, def _call(addr, context, topic, msg, timeout=None, - serialize=True, force_envelope=False): + envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) LOG.debug(_("Sending cast")) - _cast(addr, context, topic, payload, - serialize=serialize, force_envelope=force_envelope) + _cast(addr, context, topic, payload, envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1])[-1]['args']['response'] + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, msg, timeout, serialize, - force_envelope, _msg_id) + _topic, msg, timeout, envelope, + _msg_id) return return method(_addr, context, _topic, msg, timeout, - serialize, force_envelope) + envelope) def create_connection(conf, new=True): @@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = kwargs.pop('envelope') - kwargs['force_envelope'] = True + kwargs['envelope'] = kwargs.get('envelope', True) cast(conf, context, topic, msg, **kwargs) |
