From 35aad91fad4fb92c2d557e13328b01f492686c46 Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Thu, 23 May 2013 14:28:48 -0400 Subject: Use zero-copy in ZeroMQ proxy (zmq-rpc-receiver) Now, messages will not be copied into the Python memory space from the C library. The 'topic' key *will* be copied, to make use of several string/bytes functions that are available to us. (Potentially, we might be able to work with this as a memoryview in the future as a further optimization?) Implements blueprint zmq-zero-copy Change-Id: I20a700867ac642b1a1788b31e3a21f8a299c97c8 --- openstack/common/rpc/impl_zmq.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) (limited to 'openstack/common/rpc') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index add3973..139ee2c 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -199,15 +199,15 @@ class ZmqSocket(object): LOG.error("ZeroMQ socket could not be closed.") self.sock = None - def recv(self): + def recv(self, **kwargs): if not self.can_recv: raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart() + return self.sock.recv_multipart(**kwargs) - def send(self, data): + def send(self, data, **kwargs): if not self.can_send: raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data) + self.sock.send_multipart(data, **kwargs) class ZmqClient(object): @@ -446,11 +446,8 @@ class ZmqProxy(ZmqBaseReactor): def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir - #TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - topic = data[1] - - LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + data = sock.recv(copy=False) + topic = data[1].bytes if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -492,9 +489,7 @@ class ZmqProxy(ZmqBaseReactor): while(True): data = self.topic_proxy[topic].get() - out_sock.send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % - {'data': data}) + out_sock.send(data, copy=False) wait_sock_creation = eventlet.event.Event() eventlet.spawn(publisher, wait_sock_creation) @@ -507,8 +502,6 @@ class ZmqProxy(ZmqBaseReactor): try: self.topic_proxy[topic].put_nowait(data) - LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % - {'data': data}) except eventlet.queue.Full: LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) -- cgit