diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index c64c772..726bb52 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -198,15 +198,15 @@ class ZmqSocket(object): LOG.error("ZeroMQ socket could not be closed.") self.sock = None - def recv(self): + def recv(self, **kwargs): if not self.can_recv: raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart() + return self.sock.recv_multipart(**kwargs) - def send(self, data): + def send(self, data, **kwargs): if not self.can_send: raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data) + self.sock.send_multipart(data, **kwargs) class ZmqClient(object): @@ -445,11 +445,8 @@ class ZmqProxy(ZmqBaseReactor): def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir - #TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - topic = data[1] - - LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + data = sock.recv(copy=False) + topic = data[1].bytes if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -491,9 +488,7 @@ class ZmqProxy(ZmqBaseReactor): while(True): data = self.topic_proxy[topic].get() - out_sock.send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % - {'data': data}) + out_sock.send(data, copy=False) wait_sock_creation = eventlet.event.Event() eventlet.spawn(publisher, wait_sock_creation) @@ -506,8 +501,6 @@ class ZmqProxy(ZmqBaseReactor): try: self.topic_proxy[topic].put_nowait(data) - LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % - {'data': data}) except eventlet.queue.Full: LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) |
