summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-05-23 14:28:48 -0400
committerEric Windisch <eric@cloudscaling.com>2013-05-23 14:28:48 -0400
commit35aad91fad4fb92c2d557e13328b01f492686c46 (patch)
tree7047ecf0a023af7342d8021bf7dada7619f18acc /openstack/common
parentcc1e163ddcd4de0deb58ff8caeb2afc62200c001 (diff)
downloadoslo-35aad91fad4fb92c2d557e13328b01f492686c46.tar.gz
oslo-35aad91fad4fb92c2d557e13328b01f492686c46.tar.xz
oslo-35aad91fad4fb92c2d557e13328b01f492686c46.zip
Use zero-copy in ZeroMQ proxy (zmq-rpc-receiver)
Now, messages will not be copied into the Python memory space from the C library. The 'topic' key *will* be copied, to make use of several string/bytes functions that are available to us. (Potentially, we might be able to work with this as a memoryview in the future as a further optimization?) Implements blueprint zmq-zero-copy Change-Id: I20a700867ac642b1a1788b31e3a21f8a299c97c8
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_zmq.py21
1 files changed, 7 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index add3973..139ee2c 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -199,15 +199,15 @@ class ZmqSocket(object):
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
- def recv(self):
+ def recv(self, **kwargs):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart()
+ return self.sock.recv_multipart(**kwargs)
- def send(self, data):
+ def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data)
+ self.sock.send_multipart(data, **kwargs)
class ZmqClient(object):
@@ -446,11 +446,8 @@ class ZmqProxy(ZmqBaseReactor):
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
- #TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- topic = data[1]
-
- LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+ data = sock.recv(copy=False)
+ topic = data[1].bytes
if topic.startswith('fanout~'):
sock_type = zmq.PUB
@@ -492,9 +489,7 @@ class ZmqProxy(ZmqBaseReactor):
while(True):
data = self.topic_proxy[topic].get()
- out_sock.send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
- {'data': data})
+ out_sock.send(data, copy=False)
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
@@ -507,8 +502,6 @@ class ZmqProxy(ZmqBaseReactor):
try:
self.topic_proxy[topic].put_nowait(data)
- LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
- {'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})