author    Russell Bryant <rbryant@redhat.com>    2012-06-06 21:22:40 -0400
committer Russell Bryant <rbryant@redhat.com>    2012-06-11 11:25:20 -0400
commit    4bfe5a40a523df45dffc128de2f263eedb51c2e8 (patch)
tree      e182e1942ab65a451a2beed01fd252363c277020
parent    2d4e7b3177f658a2437297f9495211d0075480b0 (diff)
Add impl_zmq to rpc.
Part of blueprint common-rpc.

This driver just went into nova.rpc. Pull it in over here. I haven't been
able to test this, but I can't get the tests to work in nova, either.

There is a bit of a complication with impl_zmq worth mentioning. It requires
a service to run (nova/bin/nova-rpc-zmq-receiver). Only one instance of this
service should run on a given machine, so it shouldn't be copied into each
project that uses rpc. Once openstack-common is an installed library, it can
be distributed with that, but until then ... we'll just leave it in nova for
now.

Change-Id: I2bd067be58d943d4f3a90e013b22282217484872
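For reference, a minimal sketch of what that receiver service does in-process,
mirroring the setUp of tests/unit/rpc/test_zmq.py below (the class and option
names come from this patch; the actual nova-rpc-zmq-receiver script may differ):

    import os

    import eventlet
    eventlet.monkey_patch()

    from eventlet.green import zmq

    from openstack.common import cfg
    from openstack.common import rpc
    from openstack.common.rpc import impl_zmq

    conf = cfg.CONF
    conf.register_opts(rpc.rpc_opts)   # rpc_thread_pool_size, timeouts, ...
    impl_zmq.register_opts(conf)       # zmq options, ZMQ context, matchmaker

    # The default ipc dir (/var/run/openstack) normally requires root.
    if not os.path.isdir(conf.rpc_zmq_ipc_dir):
        os.mkdir(conf.rpc_zmq_ipc_dir)

    # One receiver per machine: it forwards incoming TCP traffic onto the
    # per-topic IPC sockets that local consumers connect to.
    reactor = impl_zmq.ZmqProxy(conf)
    consume_in = "tcp://%s:%s" % (conf.rpc_zmq_bind_address, conf.rpc_zmq_port)
    reactor.register(impl_zmq.InternalContext(None), consume_in,
                     zmq.PULL, out_bind=True)
    reactor.consume_in_thread()
    reactor.wait()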
-rw-r--r--openstack/common/rpc/__init__.py4
-rw-r--r--openstack/common/rpc/impl_zmq.py714
-rw-r--r--tests/unit/rpc/test_zmq.py132
3 files changed, 850 insertions, 0 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py
index 116aa84..26bd048 100644
--- a/openstack/common/rpc/__init__.py
+++ b/openstack/common/rpc/__init__.py
@@ -42,6 +42,10 @@ rpc_opts = [
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from call or multicall'),
+ cfg.IntOpt('rpc_cast_timeout',
+ default=30,
+ help='Seconds to wait before a cast expires (TTL). '
+ 'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['openstack.common.exception', 'nova.exception'],
help='Modules of exceptions that are permitted to be recreated'
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
new file mode 100644
index 0000000..da69dd7
--- /dev/null
+++ b/openstack/common/rpc/impl_zmq.py
@@ -0,0 +1,714 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+import pprint
+import string
+import sys
+import types
+import uuid
+
+import eventlet
+from eventlet.green import zmq
+import greenlet
+
+from openstack.common import cfg
+from openstack.common.gettextutils import _
+from openstack.common import importutils
+from openstack.common.rpc import common as rpc_common
+
+
+# Module-level aliases for convenience; these are not modified.
+pformat = pprint.pformat
+Timeout = eventlet.timeout.Timeout
+LOG = rpc_common.LOG
+RemoteError = rpc_common.RemoteError
+RPCException = rpc_common.RPCException
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this address.'),
+
+ # The module.Class to use for matchmaking.
+ cfg.StrOpt('rpc_zmq_matchmaker',
+ default='openstack.common.rpc.matchmaker.MatchMakerLocalhost',
+ help='MatchMaker driver'),
+
+ # The following port is unassigned by IANA as of 2012-05-21
+ cfg.IntOpt('rpc_zmq_port', default=9501,
+ help='ZeroMQ receiver listening port'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ help='Number of ZeroMQ contexts, defaults to 1'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ help='Directory for holding IPC sockets'),
+ ]
+
+
+# These globals are defined in register_opts(conf),
+# a mandatory initialization call
+FLAGS = None
+ZMQ_CTX = None # ZeroMQ Context, must be global.
+matchmaker = None # memoized matchmaker object
+
+
+def _serialize(data):
+ """
+ Serialization wrapper
+ We prefer using JSON, but it cannot encode all types.
+    Raise an error if a developer passes us bad data.
+ """
+ try:
+ return str(json.dumps(data, ensure_ascii=True))
+ except TypeError:
+ LOG.error(_("JSON serialization failed."))
+ raise
+
+
+def _deserialize(data):
+ """
+ Deserialization wrapper
+ """
+ LOG.debug(_("Deserializing: %s"), data)
+ return json.loads(data)
+
+
+class ZmqSocket(object):
+ """
+ A tiny wrapper around ZeroMQ to simplify the send/recv protocol
+ and connection management.
+
+ Can be used as a Context (supports the 'with' statement).
+ """
+
+ def __init__(self, addr, zmq_type, bind=True, subscribe=None):
+ self.sock = ZMQ_CTX.socket(zmq_type)
+ self.addr = addr
+ self.type = zmq_type
+ self.subscriptions = []
+
+ # Support failures on sending/receiving on wrong socket type.
+ self.can_recv = zmq_type in (zmq.PULL, zmq.SUB)
+ self.can_send = zmq_type in (zmq.PUSH, zmq.PUB)
+ self.can_sub = zmq_type in (zmq.SUB, )
+
+ # Support list, str, & None for subscribe arg (cast to list)
+ do_sub = {
+ list: subscribe,
+ str: [subscribe],
+ type(None): []
+ }[type(subscribe)]
+
+ for f in do_sub:
+ self.subscribe(f)
+
+ LOG.debug(_("Connecting to %{addr}s with %{type}s"
+ "\n-> Subscribed to %{subscribe}s"
+ "\n-> bind: %{bind}s"),
+ {'addr': addr, 'type': self.socket_s(),
+ 'subscribe': subscribe, 'bind': bind})
+
+ try:
+ if bind:
+ self.sock.bind(addr)
+ else:
+ self.sock.connect(addr)
+ except Exception:
+ raise RPCException(_("Could not open socket."))
+
+ def socket_s(self):
+ """Get socket type as string."""
+ t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER',
+ 'DEALER')
+ return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type]
+
+ def subscribe(self, msg_filter):
+ """Subscribe."""
+ if not self.can_sub:
+ raise RPCException("Cannot subscribe on this socket.")
+ LOG.debug(_("Subscribing to %s"), msg_filter)
+
+ try:
+ self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
+ except Exception:
+ return
+
+ self.subscriptions.append(msg_filter)
+
+ def unsubscribe(self, msg_filter):
+ """Unsubscribe."""
+ if msg_filter not in self.subscriptions:
+ return
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
+ self.subscriptions.remove(msg_filter)
+
+ def close(self):
+ if self.sock is None or self.sock.closed:
+ return
+
+ # We must unsubscribe, or we'll leak descriptors.
+ if len(self.subscriptions) > 0:
+ for f in self.subscriptions:
+ try:
+ self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
+ except Exception:
+ pass
+ self.subscriptions = []
+
+ # Linger -1 prevents lost/dropped messages
+ try:
+ self.sock.close(linger=-1)
+ except Exception:
+ pass
+ self.sock = None
+
+ def recv(self):
+ if not self.can_recv:
+ raise RPCException(_("You cannot recv on this socket."))
+ return self.sock.recv_multipart()
+
+ def send(self, data):
+ if not self.can_send:
+ raise RPCException(_("You cannot send on this socket."))
+ self.sock.send_multipart(data)
+
+
+class ZmqClient(object):
+ """Client for ZMQ sockets."""
+
+ def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+ def cast(self, msg_id, topic, data):
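+        # Wire envelope: [msg_id, topic, 'cast', serialized data].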
+ self.outq.send([str(msg_id), str(topic), str('cast'),
+ _serialize(data)])
+
+ def close(self):
+ self.outq.close()
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call."""
+ def __init__(self, **kwargs):
+ self.replies = []
+ super(RpcContext, self).__init__(**kwargs)
+
+ def deepcopy(self):
+ values = self.to_dict()
+ values['replies'] = self.replies
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False):
+ if ending:
+ return
+ self.replies.append(reply)
+
+ @classmethod
+    def marshal(cls, ctx):
+ ctx_data = ctx.to_dict()
+ return _serialize(ctx_data)
+
+ @classmethod
+    def unmarshal(cls, data):
+ return RpcContext.from_dict(_deserialize(data))
+
+
+class InternalContext(object):
+ """Used by ConsumerBase as a private context for - methods."""
+
+ def __init__(self, proxy):
+ self.proxy = proxy
+ self.msg_waiter = None
+
+ def _get_response(self, ctx, proxy, topic, data):
+ """Process a curried message and cast the result to topic."""
+ LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+
+ try:
+ result = proxy.dispatch(
+ ctx, data['version'], data['method'], **data['args'])
+ return ConsumerBase.normalize_reply(result, ctx.replies)
+ except greenlet.GreenletExit:
+ # ignore these since they are just from shutdowns
+ pass
+ except Exception:
+ return {'exc':
+ rpc_common.serialize_remote_exception(sys.exc_info())}
+
+ def reply(self, ctx, proxy,
+ msg_id=None, context=None, topic=None, msg=None):
+ """Reply to a casted call."""
+ # Our real method is curried into msg['args']
+
+ child_ctx = RpcContext.unmarshal(msg[0])
+ response = ConsumerBase.normalize_reply(
+ self._get_response(child_ctx, proxy, topic, msg[1]),
+ ctx.replies)
+
+ LOG.debug(_("Sending reply"))
+ cast(FLAGS, ctx, topic, {
+ 'method': '-process_reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'response': response
+ }
+ })
+
+
+class ConsumerBase(object):
+ """Base Consumer."""
+
+ def __init__(self):
+ self.private_ctx = InternalContext(None)
+
+ @classmethod
+    def normalize_reply(cls, result, replies):
+ #TODO(ewindisch): re-evaluate and document this method.
+ if isinstance(result, types.GeneratorType):
+ return list(result)
+ elif replies:
+ return replies
+ else:
+ return [result]
+
+ def process(self, style, target, proxy, ctx, data):
+        # Methods starting with '-' are processed internally
+        # ('-' is not a valid character in a public method name).
+ method = data['method']
+
+ # Internal method
+ # uses internal context for safety.
+ if data['method'][0] == '-':
+ # For reply / process_reply
+ method = method[1:]
+ if method == 'reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
+ return
+
+ data.setdefault('version', None)
+ data.setdefault('args', [])
+ proxy.dispatch(ctx, data['version'],
+ data['method'], **data['args'])
+
+
+class ZmqBaseReactor(ConsumerBase):
+ """
+ A consumer class implementing a
+ centralized casting broker (PULL-PUSH)
+ for RoundRobin requests.
+ """
+
+ def __init__(self, conf):
+ super(ZmqBaseReactor, self).__init__()
+
+ self.conf = conf
+ self.mapping = {}
+ self.proxies = {}
+ self.threads = []
+ self.sockets = []
+ self.subscribe = {}
+
+ self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
+
+ def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
+ zmq_type_out=None, in_bind=True, out_bind=True,
+ subscribe=None):
+
+ LOG.info(_("Registering reactor"))
+
+ if zmq_type_in not in (zmq.PULL, zmq.SUB):
+ raise RPCException("Bad input socktype")
+
+ # Items push in.
+ inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind,
+ subscribe=subscribe)
+
+ self.proxies[inq] = proxy
+ self.sockets.append(inq)
+
+ LOG.info(_("In reactor registered"))
+
+ if not out_addr:
+ return
+
+ if zmq_type_out not in (zmq.PUSH, zmq.PUB):
+ raise RPCException("Bad output socktype")
+
+ # Items push out.
+ outq = ZmqSocket(out_addr, zmq_type_out,
+ bind=out_bind)
+
+ self.mapping[inq] = outq
+ self.mapping[outq] = inq
+ self.sockets.append(outq)
+
+ LOG.info(_("Out reactor registered"))
+
+ def consume_in_thread(self):
+ def _consume(sock):
+ LOG.info(_("Consuming socket"))
+ while True:
+ self.consume(sock)
+
+ for k in self.proxies.keys():
+ self.threads.append(
+ self.pool.spawn(_consume, k)
+ )
+
+ def wait(self):
+ for t in self.threads:
+ t.wait()
+
+ def close(self):
+ for s in self.sockets:
+ s.close()
+
+ for t in self.threads:
+ t.kill()
+
+
+class ZmqProxy(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+ topic-based proxy, forwarding to
+ IPC sockets.
+ """
+
+ def __init__(self, conf):
+ super(ZmqProxy, self).__init__(conf)
+
+ self.topic_proxy = {}
+ ipc_dir = conf.rpc_zmq_ipc_dir
+
+ self.topic_proxy['zmq_replies'] = \
+ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
+ zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['zmq_replies'])
+
+ def consume(self, sock):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ msg_id, topic, style, in_msg = data
+ topic = topic.split('.', 1)[0]
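+        # Proxy sockets are keyed by the base topic; any '.host' suffix
+        # is dropped here.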
+
+ LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+ # Handle zmq_replies magic
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ elif topic.startswith('zmq_replies'):
+ sock_type = zmq.PUB
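+            # Re-publish the reply keyed by msg_id so that the caller,
+            # which subscribes to its own msg_id in _call(), receives it.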
+ inside = _deserialize(in_msg)
+ msg_id = inside[-1]['args']['msg_id']
+ response = inside[-1]['args']['response']
+ LOG.debug(_("->response->%s"), response)
+ data = [str(msg_id), _serialize(response)]
+ else:
+ sock_type = zmq.PUSH
+
+        if topic not in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
+ self.topic_proxy[topic].send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+
+
+class ZmqReactor(ZmqBaseReactor):
+ """
+    A consumer class for messages.
+    Can also be used as a 1:1 proxy.
+ """
+
+ def __init__(self, conf):
+ super(ZmqReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+ LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+ if sock in self.mapping:
+ LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
+ 'data': data})
+ self.mapping[sock].send(data)
+ return
+
+ msg_id, topic, style, in_msg = data
+
+ ctx, request = _deserialize(in_msg)
+ ctx = RpcContext.unmarshal(ctx)
+
+ proxy = self.proxies[sock]
+
+ self.pool.spawn_n(self.process, style, topic,
+ proxy, ctx, request)
+
+
+class Connection(rpc_common.Connection):
+ """Manages connections and threads."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self.reactor = ZmqReactor(conf)
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ LOG.info(_("Create Consumer for topic (%(topic)s)") %
+ {'topic': topic})
+
+ # Subscription scenarios
+ if fanout:
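+            # If fanout is a string, subscribe only to that key;
+            # otherwise subscribe to everything ('').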
+ subscribe = ('', fanout)[type(fanout) == str]
+ sock_type = zmq.SUB
+ topic = 'fanout~' + topic
+ else:
+ sock_type = zmq.PULL
+ subscribe = None
+
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (self.conf.rpc_zmq_ipc_dir, topic)
+
+ LOG.debug(_("Consumer is a zmq.%s"),
+ ['PULL', 'SUB'][sock_type == zmq.SUB])
+
+ self.reactor.register(proxy, inaddr, sock_type,
+ subscribe=subscribe, in_bind=False)
+
+ def close(self):
+ self.reactor.close()
+
+ def wait(self):
+ self.reactor.wait()
+
+ def consume_in_thread(self):
+ self.reactor.consume_in_thread()
+
+
+def _cast(addr, context, msg_id, topic, msg, timeout=None):
+ timeout_cast = timeout or FLAGS.rpc_cast_timeout
+ payload = [RpcContext.marshal(context), msg]
+
+ with Timeout(timeout_cast, exception=rpc_common.Timeout):
+ try:
+ conn = ZmqClient(addr)
+
+ # assumes cast can't return an exception
+ conn.cast(msg_id, topic, payload)
+ except zmq.ZMQError:
+ raise RPCException("Cast failed. ZMQ Socket Exception")
+ finally:
+ if 'conn' in vars():
+ conn.close()
+
+
+def _call(addr, context, msg_id, topic, msg, timeout=None):
+    # timeout is how long we wait for a response
+ timeout = timeout or FLAGS.rpc_response_timeout
+
+ # The msg_id is used to track replies.
+ msg_id = str(uuid.uuid4().hex)
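+    # Note: any msg_id passed in by the caller is replaced with this fresh one.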
+
+ # Replies always come into the reply service.
+ # We require that FLAGS.host is a FQDN, IP, or resolvable hostname.
+ reply_topic = "zmq_replies.%s" % FLAGS.host
+
+ LOG.debug(_("Creating payload"))
+ # Curry the original request into a reply method.
+ mcontext = RpcContext.marshal(context)
+ payload = {
+ 'method': '-reply',
+ 'args': {
+ 'msg_id': msg_id,
+ 'context': mcontext,
+ 'topic': reply_topic,
+ 'msg': [mcontext, msg]
+ }
+ }
+
+ LOG.debug(_("Creating queue socket for reply waiter"))
+
+ # Messages arriving async.
+ # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
+ with Timeout(timeout, exception=rpc_common.Timeout):
+ try:
+ msg_waiter = ZmqSocket(
+ "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
+ zmq.SUB, subscribe=msg_id, bind=False
+ )
+
+ LOG.debug(_("Sending cast"))
+ _cast(addr, context, msg_id, topic, payload)
+
+ LOG.debug(_("Cast sent; Waiting reply"))
+ # Blocks until receives reply
+ msg = msg_waiter.recv()
+ LOG.debug(_("Received message: %s"), msg)
+ LOG.debug(_("Unpacking response"))
+ responses = _deserialize(msg[-1])
+ # ZMQError trumps the Timeout error.
+ except zmq.ZMQError:
+ raise RPCException("ZMQ Socket Error")
+ finally:
+ if 'msg_waiter' in vars():
+ msg_waiter.close()
+
+ # It seems we don't need to do all of the following,
+ # but perhaps it would be useful for multicall?
+ # One effect of this is that we're checking all
+ # responses for Exceptions.
+ for resp in responses:
+ if isinstance(resp, types.DictType) and 'exc' in resp:
+ raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
+
+ return responses[-1]
+
+
+def _multi_send(method, context, topic, msg, timeout=None):
+ """
+ Wraps the sending of messages,
+ dispatches to the matchmaker and sends
+ message to all relevant hosts.
+ """
+ conf = FLAGS
+ LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+
+ queues = matchmaker.queues(topic)
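+    # queues is a list of (topic, host/IP) pairs returned by the matchmaker.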
+ LOG.debug(_("Sending message(s) to: %s"), queues)
+
+ # Don't stack if we have no matchmaker results
+ if len(queues) == 0:
+ LOG.warn(_("No matchmaker results. Not casting."))
+ # While not strictly a timeout, callers know how to handle
+ # this exception and a timeout isn't too big a lie.
+        raise rpc_common.Timeout("No match from matchmaker.")
+
+ # This supports brokerless fanout (addresses > 1)
+ for queue in queues:
+ (_topic, ip_addr) = queue
+ _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port)
+
+ if method.__name__ == '_cast':
+ eventlet.spawn_n(method, _addr, context,
+ _topic, _topic, msg, timeout)
+ return
+ return method(_addr, context, _topic, _topic, msg, timeout)
+
+
+def create_connection(conf, new=True):
+ return Connection(conf)
+
+
+def multicall(conf, *args, **kwargs):
+ """Multiple calls."""
+ register_opts(conf)
+ return _multi_send(_call, *args, **kwargs)
+
+
+def call(conf, *args, **kwargs):
+ """Send a message, expect a response."""
+ register_opts(conf)
+ data = _multi_send(_call, *args, **kwargs)
+ return data[-1]
+
+
+def cast(conf, *args, **kwargs):
+ """Send a message expecting no reply."""
+ register_opts(conf)
+ _multi_send(_cast, *args, **kwargs)
+
+
+def fanout_cast(conf, context, topic, msg, **kwargs):
+ """Send a message to all listening and expect no reply."""
+ register_opts(conf)
+    # NOTE(ewindisch): fanout~ is used because it avoids splitting on .
+ # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+def notify(conf, context, topic, msg, **kwargs):
+ """
+ Send notification event.
+ Notifications are sent to topic-priority.
+ This differs from the AMQP drivers which send to topic.priority.
+ """
+ register_opts(conf)
+ # NOTE(ewindisch): dot-priority in rpc notifier does not
+ # work with our assumptions.
+    topic = topic.replace('.', '-')
+ cast(conf, context, topic, msg, **kwargs)
+
+
+def cleanup():
+ """Clean up resources in use by implementation."""
+ global ZMQ_CTX
+ global matchmaker
+ matchmaker = None
+ ZMQ_CTX.destroy()
+ ZMQ_CTX = None
+
+
+def register_opts(conf):
+ """Registration of options for this driver."""
+ #NOTE(ewindisch): ZMQ_CTX and matchmaker
+ # are initialized here as this is as good
+ # an initialization method as any.
+
+ # We memoize through these globals
+ global ZMQ_CTX
+ global matchmaker
+ global FLAGS
+
+ if not FLAGS:
+ conf.register_opts(zmq_opts)
+ FLAGS = conf
+ # Don't re-set, if this method is called twice.
+ if not ZMQ_CTX:
+ ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
+ if not matchmaker:
+ # rpc_zmq_matchmaker should be set to a 'module.Class'
+ mm_path = conf.rpc_zmq_matchmaker.split('.')
+ mm_module = '.'.join(mm_path[:-1])
+ mm_class = mm_path[-1]
+
+ # Only initialize a class.
+ if mm_path[-1][0] not in string.ascii_uppercase:
+ LOG.error(_("Matchmaker could not be loaded.\n"
+ "rpc_zmq_matchmaker is not a class."))
+            raise RPCException(_("rpc_zmq_matchmaker is not a class."))
+
+ mm_impl = importutils.import_module(mm_module)
+ mm_constructor = getattr(mm_impl, mm_class)
+ matchmaker = mm_constructor()
diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py
new file mode 100644
index 0000000..5c69445
--- /dev/null
+++ b/tests/unit/rpc/test_zmq.py
@@ -0,0 +1,132 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Cloudscaling Group, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+Unit Tests for remote procedure calls using zeromq
+"""
+
+import eventlet
+eventlet.monkey_patch()
+
+import logging
+import os
+
+from openstack.common import cfg
+from openstack.common import exception
+from openstack.common.gettextutils import _
+from openstack.common import rpc
+from openstack.common import testutils
+from openstack.common import utils
+from tests.unit.rpc import common
+
+try:
+ from eventlet.green import zmq
+ from openstack.common.rpc import impl_zmq
+except ImportError:
+ zmq = None
+ impl_zmq = None
+
+LOG = logging.getLogger(__name__)
+FLAGS = cfg.CONF
+
+
+class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
+ @testutils.skip_if(zmq is None, "Test requires zmq")
+ def setUp(self, topic='test', topic_nested='nested'):
+ if not impl_zmq:
+ return None
+
+ self.reactor = None
+ FLAGS.register_opts(rpc.rpc_opts)
+ self.rpc = impl_zmq
+ self.rpc.register_opts(FLAGS)
+ FLAGS.set_default('rpc_zmq_matchmaker',
+ 'mod_matchmaker.MatchMakerLocalhost')
+
+ # We'll change this if we detect no daemon running.
+ ipc_dir = FLAGS.rpc_zmq_ipc_dir
+
+ # Only launch the router if it isn't running independently.
+ if not os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
+ LOG.info(_("Running internal zmq receiver."))
+ # The normal ipc_dir default needs to run as root,
+ # /tmp is easier within a testing environment.
+ FLAGS.set_default('rpc_zmq_ipc_dir', '/tmp/openstack-zmq.ipc.test')
+
+ # Value has changed.
+ ipc_dir = FLAGS.rpc_zmq_ipc_dir
+
+ try:
+ # Only launch the receiver if it isn't running independently.
+ # This is checked again, with the (possibly) new ipc_dir.
+ if os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")):
+ LOG.warning(_("Detected zmq-receiver socket. "
+ "Assuming nova-rpc-zmq-receiver is running."))
+ return
+
+ if not os.path.isdir(ipc_dir):
+ os.mkdir(ipc_dir)
+
+ self.reactor = impl_zmq.ZmqProxy(FLAGS)
+ consume_in = "tcp://%s:%s" % \
+ (FLAGS.rpc_zmq_bind_address,
+ FLAGS.rpc_zmq_port)
+ consumption_proxy = impl_zmq.InternalContext(None)
+
+ self.reactor.register(consumption_proxy,
+ consume_in, zmq.PULL, out_bind=True)
+ self.reactor.consume_in_thread()
+ except zmq.ZMQError:
+ assert False, _("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use.")
+ except OSError:
+ assert False, _("Could not create IPC directory %s") % \
+ (ipc_dir, )
+ finally:
+ super(_RpcZmqBaseTestCase, self).setUp(
+ topic=topic, topic_nested=topic_nested)
+
+ def tearDown(self):
+ if not impl_zmq:
+ return None
+ if self.reactor:
+ self.reactor.close()
+
+ try:
+ utils.execute('rm', '-rf', FLAGS.rpc_zmq_ipc_dir)
+ except exception.Error:
+ pass
+
+ super(_RpcZmqBaseTestCase, self).tearDown()
+
+
+class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase):
+ """
+ This tests with topics such as 'test' and 'nested',
+ without any .host appended. Stresses the matchmaker.
+ """
+ pass
+
+
+class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
+ """
+ Test communication directly to a host,
+ tests use 'localhost'.
+ """
+ def setUp(self):
+ super(RpcZmqDirectTopicTestCase, self).setUp(
+ topic='test.localhost',
+ topic_nested='nested.localhost')