diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-05-31 14:23:37 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-05-31 14:23:37 +0000 |
commit | 3eac3ba29caa58a57b6c997eb3af6a129b44d94f (patch) | |
tree | ad6b79bd45e76b31a823df26c30f962384269bc7 /openstack/common/rpc/impl_zmq.py | |
parent | 762e62317f1028e65aedd4ab9494479433264022 (diff) | |
parent | 35aad91fad4fb92c2d557e13328b01f492686c46 (diff) | |
download | oslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.tar.gz oslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.tar.xz oslo-3eac3ba29caa58a57b6c997eb3af6a129b44d94f.zip |
Merge "Use zero-copy in ZeroMQ proxy (zmq-rpc-receiver)"
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index c64c772..726bb52 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -198,15 +198,15 @@ class ZmqSocket(object): LOG.error("ZeroMQ socket could not be closed.") self.sock = None - def recv(self): + def recv(self, **kwargs): if not self.can_recv: raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart() + return self.sock.recv_multipart(**kwargs) - def send(self, data): + def send(self, data, **kwargs): if not self.can_send: raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data) + self.sock.send_multipart(data, **kwargs) class ZmqClient(object): @@ -445,11 +445,8 @@ class ZmqProxy(ZmqBaseReactor): def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir - #TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - topic = data[1] - - LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + data = sock.recv(copy=False) + topic = data[1].bytes if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -491,9 +488,7 @@ class ZmqProxy(ZmqBaseReactor): while(True): data = self.topic_proxy[topic].get() - out_sock.send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % - {'data': data}) + out_sock.send(data, copy=False) wait_sock_creation = eventlet.event.Event() eventlet.spawn(publisher, wait_sock_creation) @@ -506,8 +501,6 @@ class ZmqProxy(ZmqBaseReactor): try: self.topic_proxy[topic].put_nowait(data) - LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % - {'data': data}) except eventlet.queue.Full: LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) |