diff options
author | Russell Bryant <rbryant@redhat.com> | 2012-06-06 21:22:40 -0400 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2012-06-11 11:25:20 -0400 |
commit | 4bfe5a40a523df45dffc128de2f263eedb51c2e8 (patch) | |
tree | e182e1942ab65a451a2beed01fd252363c277020 | |
parent | 2d4e7b3177f658a2437297f9495211d0075480b0 (diff) | |
download | oslo-4bfe5a40a523df45dffc128de2f263eedb51c2e8.tar.gz oslo-4bfe5a40a523df45dffc128de2f263eedb51c2e8.tar.xz oslo-4bfe5a40a523df45dffc128de2f263eedb51c2e8.zip |
Add impl_zmq to rpc.
Part of blueprint common-rpc.
This driver just went into nova.rpc. Pull it in over here. I haven't
been able to test this, but I can't get the tests to work in nova,
either.
There is a bit of a complication with impl_zmq worth mentioning. It
requires a service to run (nova/bin/nova-rpc-zmq-receiver). Only once
instance of this service should run on a given machine, so it shouldn't
be copied into each project that uses rpc. Once openstack-common is an
installed library, it can be distributed with that, but until then ...
we'll just leave it in nova for now.
Change-Id: I2bd067be58d943d4f3a90e013b22282217484872
-rw-r--r-- | openstack/common/rpc/__init__.py | 4 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 714 | ||||
-rw-r--r-- | tests/unit/rpc/test_zmq.py | 132 |
3 files changed, 850 insertions, 0 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 116aa84..26bd048 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -42,6 +42,10 @@ rpc_opts = [ cfg.IntOpt('rpc_response_timeout', default=60, help='Seconds to wait for a response from call or multicall'), + cfg.IntOpt('rpc_cast_timeout', + default=30, + help='Seconds to wait before a cast expires (TTL). ' + 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', default=['openstack.common.exception', 'nova.exception'], help='Modules of exceptions that are permitted to be recreated' diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py new file mode 100644 index 0000000..da69dd7 --- /dev/null +++ b/openstack/common/rpc/impl_zmq.py @@ -0,0 +1,714 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import pprint +import string +import sys +import types +import uuid + +import eventlet +from eventlet.green import zmq +import greenlet + +from openstack.common import cfg +from openstack.common.gettextutils import _ +from openstack.common import importutils +from openstack.common.rpc import common as rpc_common + + +# for convenience, are not modified. +pformat = pprint.pformat +Timeout = eventlet.timeout.Timeout +LOG = rpc_common.LOG +RemoteError = rpc_common.RemoteError +RPCException = rpc_common.RPCException + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this address.'), + + # The module.Class to use for matchmaking. + cfg.StrOpt('rpc_zmq_matchmaker', + default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', + help='MatchMaker driver'), + + # The following port is unassigned by IANA as of 2012-05-21 + cfg.IntOpt('rpc_zmq_port', default=9501, + help='ZeroMQ receiver listening port'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + help='Number of ZeroMQ contexts, defaults to 1'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + help='Directory for holding IPC sockets'), + ] + + +# These globals are defined in register_opts(conf), +# a mandatory initialization call +FLAGS = None +ZMQ_CTX = None # ZeroMQ Context, must be global. +matchmaker = None # memoized matchmaker object + + +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(json.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return json.loads(data) + + +class ZmqSocket(object): + """ + A tiny wrapper around ZeroMQ to simplify the send/recv protocol + and connection management. + + Can be used as a Context (supports the 'with' statement). + """ + + def __init__(self, addr, zmq_type, bind=True, subscribe=None): + self.sock = ZMQ_CTX.socket(zmq_type) + self.addr = addr + self.type = zmq_type + self.subscriptions = [] + + # Support failures on sending/receiving on wrong socket type. + self.can_recv = zmq_type in (zmq.PULL, zmq.SUB) + self.can_send = zmq_type in (zmq.PUSH, zmq.PUB) + self.can_sub = zmq_type in (zmq.SUB, ) + + # Support list, str, & None for subscribe arg (cast to list) + do_sub = { + list: subscribe, + str: [subscribe], + type(None): [] + }[type(subscribe)] + + for f in do_sub: + self.subscribe(f) + + LOG.debug(_("Connecting to %{addr}s with %{type}s" + "\n-> Subscribed to %{subscribe}s" + "\n-> bind: %{bind}s"), + {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind}) + + try: + if bind: + self.sock.bind(addr) + else: + self.sock.connect(addr) + except Exception: + raise RPCException(_("Could not open socket.")) + + def socket_s(self): + """Get socket type as string.""" + t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER', + 'DEALER') + return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type] + + def subscribe(self, msg_filter): + """Subscribe.""" + if not self.can_sub: + raise RPCException("Cannot subscribe on this socket.") + LOG.debug(_("Subscribing to %s"), msg_filter) + + try: + self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) + except Exception: + return + + self.subscriptions.append(msg_filter) + + def unsubscribe(self, msg_filter): + """Unsubscribe.""" + if msg_filter not in self.subscriptions: + return + self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) + self.subscriptions.remove(msg_filter) + + def close(self): + if self.sock is None or self.sock.closed: + return + + # We must unsubscribe, or we'll leak descriptors. + if len(self.subscriptions) > 0: + for f in self.subscriptions: + try: + self.sock.setsockopt(zmq.UNSUBSCRIBE, f) + except Exception: + pass + self.subscriptions = [] + + # Linger -1 prevents lost/dropped messages + try: + self.sock.close(linger=-1) + except Exception: + pass + self.sock = None + + def recv(self): + if not self.can_recv: + raise RPCException(_("You cannot recv on this socket.")) + return self.sock.recv_multipart() + + def send(self, data): + if not self.can_send: + raise RPCException(_("You cannot send on this socket.")) + self.sock.send_multipart(data) + + +class ZmqClient(object): + """Client for ZMQ sockets.""" + + def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + self.outq = ZmqSocket(addr, socket_type, bind=bind) + + def cast(self, msg_id, topic, data): + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) + + def close(self): + self.outq.close() + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.replies = [] + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['replies'] = self.replies + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False): + if ending: + return + self.replies.append(reply) + + @classmethod + def marshal(self, ctx): + ctx_data = ctx.to_dict() + return _serialize(ctx_data) + + @classmethod + def unmarshal(self, data): + return RpcContext.from_dict(_deserialize(data)) + + +class InternalContext(object): + """Used by ConsumerBase as a private context for - methods.""" + + def __init__(self, proxy): + self.proxy = proxy + self.msg_waiter = None + + def _get_response(self, ctx, proxy, topic, data): + """Process a curried message and cast the result to topic.""" + LOG.debug(_("Running func with context: %s"), ctx.to_dict()) + data.setdefault('version', None) + data.setdefault('args', []) + + try: + result = proxy.dispatch( + ctx, data['version'], data['method'], **data['args']) + return ConsumerBase.normalize_reply(result, ctx.replies) + except greenlet.GreenletExit: + # ignore these since they are just from shutdowns + pass + except Exception: + return {'exc': + rpc_common.serialize_remote_exception(sys.exc_info())} + + def reply(self, ctx, proxy, + msg_id=None, context=None, topic=None, msg=None): + """Reply to a casted call.""" + # Our real method is curried into msg['args'] + + child_ctx = RpcContext.unmarshal(msg[0]) + response = ConsumerBase.normalize_reply( + self._get_response(child_ctx, proxy, topic, msg[1]), + ctx.replies) + + LOG.debug(_("Sending reply")) + cast(FLAGS, ctx, topic, { + 'method': '-process_reply', + 'args': { + 'msg_id': msg_id, + 'response': response + } + }) + + +class ConsumerBase(object): + """Base Consumer.""" + + def __init__(self): + self.private_ctx = InternalContext(None) + + @classmethod + def normalize_reply(self, result, replies): + #TODO(ewindisch): re-evaluate and document this method. + if isinstance(result, types.GeneratorType): + return list(result) + elif replies: + return replies + else: + return [result] + + def process(self, style, target, proxy, ctx, data): + # Method starting with - are + # processed internally. (non-valid method name) + method = data['method'] + + # Internal method + # uses internal context for safety. + if data['method'][0] == '-': + # For reply / process_reply + method = method[1:] + if method == 'reply': + self.private_ctx.reply(ctx, proxy, **data['args']) + return + + data.setdefault('version', None) + data.setdefault('args', []) + proxy.dispatch(ctx, data['version'], + data['method'], **data['args']) + + +class ZmqBaseReactor(ConsumerBase): + """ + A consumer class implementing a + centralized casting broker (PULL-PUSH) + for RoundRobin requests. + """ + + def __init__(self, conf): + super(ZmqBaseReactor, self).__init__() + + self.conf = conf + self.mapping = {} + self.proxies = {} + self.threads = [] + self.sockets = [] + self.subscribe = {} + + self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) + + def register(self, proxy, in_addr, zmq_type_in, out_addr=None, + zmq_type_out=None, in_bind=True, out_bind=True, + subscribe=None): + + LOG.info(_("Registering reactor")) + + if zmq_type_in not in (zmq.PULL, zmq.SUB): + raise RPCException("Bad input socktype") + + # Items push in. + inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, + subscribe=subscribe) + + self.proxies[inq] = proxy + self.sockets.append(inq) + + LOG.info(_("In reactor registered")) + + if not out_addr: + return + + if zmq_type_out not in (zmq.PUSH, zmq.PUB): + raise RPCException("Bad output socktype") + + # Items push out. + outq = ZmqSocket(out_addr, zmq_type_out, + bind=out_bind) + + self.mapping[inq] = outq + self.mapping[outq] = inq + self.sockets.append(outq) + + LOG.info(_("Out reactor registered")) + + def consume_in_thread(self): + def _consume(sock): + LOG.info(_("Consuming socket")) + while True: + self.consume(sock) + + for k in self.proxies.keys(): + self.threads.append( + self.pool.spawn(_consume, k) + ) + + def wait(self): + for t in self.threads: + t.wait() + + def close(self): + for s in self.sockets: + s.close() + + for t in self.threads: + t.kill() + + +class ZmqProxy(ZmqBaseReactor): + """ + A consumer class implementing a + topic-based proxy, forwarding to + IPC sockets. + """ + + def __init__(self, conf): + super(ZmqProxy, self).__init__(conf) + + self.topic_proxy = {} + ipc_dir = conf.rpc_zmq_ipc_dir + + self.topic_proxy['zmq_replies'] = \ + ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), + zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['zmq_replies']) + + def consume(self, sock): + ipc_dir = self.conf.rpc_zmq_ipc_dir + + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + msg_id, topic, style, in_msg = data + topic = topic.split('.', 1)[0] + + LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + + # Handle zmq_replies magic + if topic.startswith('fanout~'): + sock_type = zmq.PUB + elif topic.startswith('zmq_replies'): + sock_type = zmq.PUB + inside = _deserialize(in_msg) + msg_id = inside[-1]['args']['msg_id'] + response = inside[-1]['args']['response'] + LOG.debug(_("->response->%s"), response) + data = [str(msg_id), _serialize(response)] + else: + sock_type = zmq.PUSH + + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) + self.topic_proxy[topic].send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + + +class ZmqReactor(ZmqBaseReactor): + """ + A consumer class implementing a + consumer for messages. Can also be + used as a 1:1 proxy + """ + + def __init__(self, conf): + super(ZmqReactor, self).__init__(conf) + + def consume(self, sock): + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) + if sock in self.mapping: + LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { + 'data': data}) + self.mapping[sock].send(data) + return + + msg_id, topic, style, in_msg = data + + ctx, request = _deserialize(in_msg) + ctx = RpcContext.unmarshal(ctx) + + proxy = self.proxies[sock] + + self.pool.spawn_n(self.process, style, topic, + proxy, ctx, request) + + +class Connection(rpc_common.Connection): + """Manages connections and threads.""" + + def __init__(self, conf): + self.conf = conf + self.reactor = ZmqReactor(conf) + + def create_consumer(self, topic, proxy, fanout=False): + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + LOG.info(_("Create Consumer for topic (%(topic)s)") % + {'topic': topic}) + + # Subscription scenarios + if fanout: + subscribe = ('', fanout)[type(fanout) == str] + sock_type = zmq.SUB + topic = 'fanout~' + topic + else: + sock_type = zmq.PULL + subscribe = None + + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (self.conf.rpc_zmq_ipc_dir, topic) + + LOG.debug(_("Consumer is a zmq.%s"), + ['PULL', 'SUB'][sock_type == zmq.SUB]) + + self.reactor.register(proxy, inaddr, sock_type, + subscribe=subscribe, in_bind=False) + + def close(self): + self.reactor.close() + + def wait(self): + self.reactor.wait() + + def consume_in_thread(self): + self.reactor.consume_in_thread() + + +def _cast(addr, context, msg_id, topic, msg, timeout=None): + timeout_cast = timeout or FLAGS.rpc_cast_timeout + payload = [RpcContext.marshal(context), msg] + + with Timeout(timeout_cast, exception=rpc_common.Timeout): + try: + conn = ZmqClient(addr) + + # assumes cast can't return an exception + conn.cast(msg_id, topic, payload) + except zmq.ZMQError: + raise RPCException("Cast failed. ZMQ Socket Exception") + finally: + if 'conn' in vars(): + conn.close() + + +def _call(addr, context, msg_id, topic, msg, timeout=None): + # timeout_response is how long we wait for a response + timeout = timeout or FLAGS.rpc_response_timeout + + # The msg_id is used to track replies. + msg_id = str(uuid.uuid4().hex) + + # Replies always come into the reply service. + # We require that FLAGS.host is a FQDN, IP, or resolvable hostname. + reply_topic = "zmq_replies.%s" % FLAGS.host + + LOG.debug(_("Creating payload")) + # Curry the original request into a reply method. + mcontext = RpcContext.marshal(context) + payload = { + 'method': '-reply', + 'args': { + 'msg_id': msg_id, + 'context': mcontext, + 'topic': reply_topic, + 'msg': [mcontext, msg] + } + } + + LOG.debug(_("Creating queue socket for reply waiter")) + + # Messages arriving async. + # TODO(ewindisch): have reply consumer with dynamic subscription mgmt + with Timeout(timeout, exception=rpc_common.Timeout): + try: + msg_waiter = ZmqSocket( + "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, + zmq.SUB, subscribe=msg_id, bind=False + ) + + LOG.debug(_("Sending cast")) + _cast(addr, context, msg_id, topic, payload) + + LOG.debug(_("Cast sent; Waiting reply")) + # Blocks until receives reply + msg = msg_waiter.recv() + LOG.debug(_("Received message: %s"), msg) + LOG.debug(_("Unpacking response")) + responses = _deserialize(msg[-1]) + # ZMQError trumps the Timeout error. + except zmq.ZMQError: + raise RPCException("ZMQ Socket Error") + finally: + if 'msg_waiter' in vars(): + msg_waiter.close() + + # It seems we don't need to do all of the following, + # but perhaps it would be useful for multicall? + # One effect of this is that we're checking all + # responses for Exceptions. + for resp in responses: + if isinstance(resp, types.DictType) and 'exc' in resp: + raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) + + return responses[-1] + + +def _multi_send(method, context, topic, msg, timeout=None): + """ + Wraps the sending of messages, + dispatches to the matchmaker and sends + message to all relevant hosts. + """ + conf = FLAGS + LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) + + queues = matchmaker.queues(topic) + LOG.debug(_("Sending message(s) to: %s"), queues) + + # Don't stack if we have no matchmaker results + if len(queues) == 0: + LOG.warn(_("No matchmaker results. Not casting.")) + # While not strictly a timeout, callers know how to handle + # this exception and a timeout isn't too big a lie. + raise rpc_common.Timeout, "No match from matchmaker." + + # This supports brokerless fanout (addresses > 1) + for queue in queues: + (_topic, ip_addr) = queue + _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) + + if method.__name__ == '_cast': + eventlet.spawn_n(method, _addr, context, + _topic, _topic, msg, timeout) + return + return method(_addr, context, _topic, _topic, msg, timeout) + + +def create_connection(conf, new=True): + return Connection(conf) + + +def multicall(conf, *args, **kwargs): + """Multiple calls.""" + register_opts(conf) + return _multi_send(_call, *args, **kwargs) + + +def call(conf, *args, **kwargs): + """Send a message, expect a response.""" + register_opts(conf) + data = _multi_send(_call, *args, **kwargs) + return data[-1] + + +def cast(conf, *args, **kwargs): + """Send a message expecting no reply.""" + register_opts(conf) + _multi_send(_cast, *args, **kwargs) + + +def fanout_cast(conf, context, topic, msg, **kwargs): + """Send a message to all listening and expect no reply.""" + register_opts(conf) + # NOTE(ewindisch): fanout~ is used because it avoid splitting on . + # and acts as a non-subtle hint to the matchmaker and ZmqProxy. + _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) + + +def notify(conf, context, topic, msg, **kwargs): + """ + Send notification event. + Notifications are sent to topic-priority. + This differs from the AMQP drivers which send to topic.priority. + """ + register_opts(conf) + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + topic.replace('.', '-') + cast(conf, context, topic, msg, **kwargs) + + +def cleanup(): + """Clean up resources in use by implementation.""" + global ZMQ_CTX + global matchmaker + matchmaker = None + ZMQ_CTX.destroy() + ZMQ_CTX = None + + +def register_opts(conf): + """Registration of options for this driver.""" + #NOTE(ewindisch): ZMQ_CTX and matchmaker + # are initialized here as this is as good + # an initialization method as any. + + # We memoize through these globals + global ZMQ_CTX + global matchmaker + global FLAGS + + if not FLAGS: + conf.register_opts(zmq_opts) + FLAGS = conf + # Don't re-set, if this method is called twice. + if not ZMQ_CTX: + ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + if not matchmaker: + # rpc_zmq_matchmaker should be set to a 'module.Class' + mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_module = '.'.join(mm_path[:-1]) + mm_class = mm_path[-1] + + # Only initialize a class. + if mm_path[-1][0] not in string.ascii_uppercase: + LOG.error(_("Matchmaker could not be loaded.\n" + "rpc_zmq_matchmaker is not a class.")) + raise + + mm_impl = importutils.import_module(mm_module) + mm_constructor = getattr(mm_impl, mm_class) + matchmaker = mm_constructor() diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py new file mode 100644 index 0000000..5c69445 --- /dev/null +++ b/tests/unit/rpc/test_zmq.py @@ -0,0 +1,132 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using zeromq +""" + +import eventlet +eventlet.monkey_patch() + +import logging +import os + +from openstack.common import cfg +from openstack.common import exception +from openstack.common.gettextutils import _ +from openstack.common import rpc +from openstack.common import testutils +from openstack.common import utils +from tests.unit.rpc import common + +try: + from eventlet.green import zmq + from openstack.common.rpc import impl_zmq +except ImportError: + zmq = None + impl_zmq = None + +LOG = logging.getLogger(__name__) +FLAGS = cfg.CONF + + +class _RpcZmqBaseTestCase(common.BaseRpcTestCase): + @testutils.skip_if(zmq is None, "Test requires zmq") + def setUp(self, topic='test', topic_nested='nested'): + if not impl_zmq: + return None + + self.reactor = None + FLAGS.register_opts(rpc.rpc_opts) + self.rpc = impl_zmq + self.rpc.register_opts(FLAGS) + FLAGS.set_default('rpc_zmq_matchmaker', + 'mod_matchmaker.MatchMakerLocalhost') + + # We'll change this if we detect no daemon running. + ipc_dir = FLAGS.rpc_zmq_ipc_dir + + # Only launch the router if it isn't running independently. + if not os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")): + LOG.info(_("Running internal zmq receiver.")) + # The normal ipc_dir default needs to run as root, + # /tmp is easier within a testing environment. + FLAGS.set_default('rpc_zmq_ipc_dir', '/tmp/openstack-zmq.ipc.test') + + # Value has changed. + ipc_dir = FLAGS.rpc_zmq_ipc_dir + + try: + # Only launch the receiver if it isn't running independently. + # This is checked again, with the (possibly) new ipc_dir. + if os.path.exists(os.path.join(ipc_dir, "zmq_topic_zmq_replies")): + LOG.warning(_("Detected zmq-receiver socket. " + "Assuming nova-rpc-zmq-receiver is running.")) + return + + if not os.path.isdir(ipc_dir): + os.mkdir(ipc_dir) + + self.reactor = impl_zmq.ZmqProxy(FLAGS) + consume_in = "tcp://%s:%s" % \ + (FLAGS.rpc_zmq_bind_address, + FLAGS.rpc_zmq_port) + consumption_proxy = impl_zmq.InternalContext(None) + + self.reactor.register(consumption_proxy, + consume_in, zmq.PULL, out_bind=True) + self.reactor.consume_in_thread() + except zmq.ZMQError: + assert False, _("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.") + except OSError: + assert False, _("Could not create IPC directory %s") % \ + (ipc_dir, ) + finally: + super(_RpcZmqBaseTestCase, self).setUp( + topic=topic, topic_nested=topic_nested) + + def tearDown(self): + if not impl_zmq: + return None + if self.reactor: + self.reactor.close() + + try: + utils.execute('rm', '-rf', FLAGS.rpc_zmq_ipc_dir) + except exception.Error: + pass + + super(_RpcZmqBaseTestCase, self).tearDown() + + +class RpcZmqBaseTopicTestCase(_RpcZmqBaseTestCase): + """ + This tests with topics such as 'test' and 'nested', + without any .host appended. Stresses the matchmaker. + """ + pass + + +class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase): + """ + Test communication directly to a host, + tests use 'localhost'. + """ + def setUp(self): + super(RpcZmqDirectTopicTestCase, self).setUp( + topic='test.localhost', + topic_nested='nested.localhost') |