diff options
| author | Eric Windisch <eric@cloudscaling.com> | 2013-05-23 14:28:48 -0400 |
|---|---|---|
| committer | Eric Windisch <eric@cloudscaling.com> | 2013-05-23 14:28:48 -0400 |
| commit | 35aad91fad4fb92c2d557e13328b01f492686c46 (patch) | |
| tree | 7047ecf0a023af7342d8021bf7dada7619f18acc /openstack/common | |
| parent | cc1e163ddcd4de0deb58ff8caeb2afc62200c001 (diff) | |
| download | oslo-35aad91fad4fb92c2d557e13328b01f492686c46.tar.gz oslo-35aad91fad4fb92c2d557e13328b01f492686c46.tar.xz oslo-35aad91fad4fb92c2d557e13328b01f492686c46.zip | |
Use zero-copy in ZeroMQ proxy (zmq-rpc-receiver)
Now, messages will not be copied into the Python memory space
from the C library. The 'topic' key *will* be copied, to make
use of several string/bytes functions that are available to us.
(Potentially, we might be able to work with this as a memoryview
in the future as a further optimization?)
Implements blueprint zmq-zero-copy
Change-Id: I20a700867ac642b1a1788b31e3a21f8a299c97c8
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index add3973..139ee2c 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -199,15 +199,15 @@ class ZmqSocket(object): LOG.error("ZeroMQ socket could not be closed.") self.sock = None - def recv(self): + def recv(self, **kwargs): if not self.can_recv: raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart() + return self.sock.recv_multipart(**kwargs) - def send(self, data): + def send(self, data, **kwargs): if not self.can_send: raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data) + self.sock.send_multipart(data, **kwargs) class ZmqClient(object): @@ -446,11 +446,8 @@ class ZmqProxy(ZmqBaseReactor): def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir - #TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - topic = data[1] - - LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + data = sock.recv(copy=False) + topic = data[1].bytes if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -492,9 +489,7 @@ class ZmqProxy(ZmqBaseReactor): while(True): data = self.topic_proxy[topic].get() - out_sock.send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % - {'data': data}) + out_sock.send(data, copy=False) wait_sock_creation = eventlet.event.Event() eventlet.spawn(publisher, wait_sock_creation) @@ -507,8 +502,6 @@ class ZmqProxy(ZmqBaseReactor): try: self.topic_proxy[topic].put_nowait(data) - LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % - {'data': data}) except eventlet.queue.Full: LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) |
