diff options
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 67 |
1 files changed, 48 insertions, 19 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index aab664c..5c9d190 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -58,6 +58,9 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), + cfg.IntOpt('rpc_zmq_port_pub', default=9502, + help='ZeroMQ fanout publisher port'), + cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -206,7 +209,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), + self.outq.send([str(topic), str(msg_id), str('cast'), _serialize(data)]) def close(self): @@ -299,6 +302,9 @@ class ConsumerBase(object): else: return [result] + def consume(self, sock): + raise NotImplementedError() + def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) + self.topic_proxy['fanout~'] = \ + ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['fanout~']) + def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + + # This doesn't change what is in the message, + # it only specifies that these messages go to + # the generic fanout topic. + topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -434,17 +450,12 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) @@ -471,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -488,6 +499,11 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) + def _consume_fanout(self, reactor, topic, proxy, bind=False): + for topic, host in matchmaker.queues("publishers~%s" % (topic, )): + inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) + reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -495,22 +511,35 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Subscription scenarios + # Consume direct-push fanout messages (relay to local consumers) if fanout: - subscribe = ('', fanout)[type(fanout) == str] + # If we're not in here, we can't receive direct fanout messages + if CONF.rpc_zmq_host in matchmaker.queues(topic): + # Consume from all remote publishers. + self._consume_fanout(self.reactor, topic, proxy) + else: + LOG.warn("This service cannot receive direct PUSH fanout " + "messages without being known by the matchmaker.") + return + + # Configure consumer for direct pushes. + subscribe = (topic, fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic + + inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) + # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) |
