summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_zmq.py
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-05-31 14:23:37 +0000
committerGerrit Code Review <review@openstack.org>2013-05-31 14:23:37 +0000
commit3eac3ba29caa58a57b6c997eb3af6a129b44d94f (patch)
treead6b79bd45e76b31a823df26c30f962384269bc7 /openstack/common/rpc/impl_zmq.py
parent762e62317f1028e65aedd4ab9494479433264022 (diff)
parent35aad91fad4fb92c2d557e13328b01f492686c46 (diff)
downloadoslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.tar.gz
oslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.tar.xz
oslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.zip
Merge "Use zero-copy in ZeroMQ proxy (zmq-rpc-receiver)"
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r--openstack/common/rpc/impl_zmq.py21
1 files changed, 7 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index c64c772..726bb52 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -198,15 +198,15 @@ class ZmqSocket(object):
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
- def recv(self):
+ def recv(self, **kwargs):
if not self.can_recv:
raise RPCException(_("You cannot recv on this socket."))
- return self.sock.recv_multipart()
+ return self.sock.recv_multipart(**kwargs)
- def send(self, data):
+ def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
- self.sock.send_multipart(data)
+ self.sock.send_multipart(data, **kwargs)
class ZmqClient(object):
@@ -445,11 +445,8 @@ class ZmqProxy(ZmqBaseReactor):
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
- #TODO(ewindisch): use zero-copy (i.e. references, not copying)
- data = sock.recv()
- topic = data[1]
-
- LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+ data = sock.recv(copy=False)
+ topic = data[1].bytes
if topic.startswith('fanout~'):
sock_type = zmq.PUB
@@ -491,9 +488,7 @@ class ZmqProxy(ZmqBaseReactor):
while(True):
data = self.topic_proxy[topic].get()
- out_sock.send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
- {'data': data})
+ out_sock.send(data, copy=False)
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
@@ -506,8 +501,6 @@ class ZmqProxy(ZmqBaseReactor):
try:
self.topic_proxy[topic].put_nowait(data)
- LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
- {'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})