diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-09 02:36:42 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-09 02:36:42 +0000 |
| commit | 5200f7ca8c307880978be27aca2baff262115b96 (patch) | |
| tree | 7d968fca913ee0278b37f2eec50bb7f22cdd4cc1 /nova/openstack | |
| parent | 150cad4576c65c89a790bb6f205a0d96315c6375 (diff) | |
| parent | 1cfc526f50686286e64ac07566f715e981916d3c (diff) | |
| download | nova-5200f7ca8c307880978be27aca2baff262115b96.tar.gz nova-5200f7ca8c307880978be27aca2baff262115b96.tar.xz nova-5200f7ca8c307880978be27aca2baff262115b96.zip | |
Merge "Update modules from common required for rpc with lock detection."
Diffstat (limited to 'nova/openstack')
| -rw-r--r-- | nova/openstack/common/local.py | 11 | ||||
| -rw-r--r-- | nova/openstack/common/lockutils.py | 110 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/__init__.py | 60 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/amqp.py | 2 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/common.py | 2 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_fake.py | 2 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 25 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 39 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 237 |
9 files changed, 308 insertions, 180 deletions
diff --git a/nova/openstack/common/local.py b/nova/openstack/common/local.py index 19d962732..8bdc837a9 100644 --- a/nova/openstack/common/local.py +++ b/nova/openstack/common/local.py @@ -26,6 +26,9 @@ class WeakLocal(corolocal.local): def __getattribute__(self, attr): rval = corolocal.local.__getattribute__(self, attr) if rval: + # NOTE(mikal): this bit is confusing. What is stored is a weak + # reference, not the value itself. We therefore need to lookup + # the weak reference and return the inner value here. rval = rval() return rval @@ -34,4 +37,12 @@ class WeakLocal(corolocal.local): return corolocal.local.__setattr__(self, attr, value) +# NOTE(mikal): the name "store" should be deprecated in the future store = WeakLocal() + +# A "weak" store uses weak references and allows an object to fall out of scope +# when it falls out of scope in the code that uses the thread local storage. A +# "strong" store will hold a reference to the object so that it never falls out +# of scope. +weak_store = WeakLocal() +strong_store = corolocal.local diff --git a/nova/openstack/common/lockutils.py b/nova/openstack/common/lockutils.py index 6f80a1f67..930e265f6 100644 --- a/nova/openstack/common/lockutils.py +++ b/nova/openstack/common/lockutils.py @@ -29,6 +29,7 @@ from eventlet import semaphore from nova.openstack.common import cfg from nova.openstack.common import fileutils from nova.openstack.common.gettextutils import _ +from nova.openstack.common import local from nova.openstack.common import log as logging @@ -39,9 +40,8 @@ util_opts = [ cfg.BoolOpt('disable_process_locking', default=False, help='Whether to disable inter-process locks'), cfg.StrOpt('lock_path', - default=os.path.abspath(os.path.join(os.path.dirname(__file__), - '../')), - help='Directory to use for lock files') + help=('Directory to use for lock files. Default to a ' + 'temp directory')) ] @@ -140,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): def foo(self, *args): ... - ensures that only one thread will execute the bar method at a time. + ensures that only one thread will execute the foo method at a time. Different methods can share the same lock:: @@ -184,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None): LOG.debug(_('Got semaphore "%(lock)s" for method ' '"%(method)s"...'), {'lock': name, 'method': f.__name__}) - if external and not CONF.disable_process_locking: - LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' - 'method "%(method)s"...'), - {'lock': name, 'method': f.__name__}) - cleanup_dir = False - - # We need a copy of lock_path because it is non-local - local_lock_path = lock_path - if not local_lock_path: - local_lock_path = CONF.lock_path - - if not local_lock_path: - cleanup_dir = True - local_lock_path = tempfile.mkdtemp() - - if not os.path.exists(local_lock_path): - cleanup_dir = True - fileutils.ensure_tree(local_lock_path) - - # NOTE(mikal): the lock name cannot contain directory - # separators - safe_name = name.replace(os.sep, '_') - lock_file_name = '%s%s' % (lock_file_prefix, safe_name) - lock_file_path = os.path.join(local_lock_path, - lock_file_name) - - try: - lock = InterProcessLock(lock_file_path) - with lock: - LOG.debug(_('Got file lock "%(lock)s" at %(path)s ' - 'for method "%(method)s"...'), + + # NOTE(mikal): I know this looks odd + if not hasattr(local.strong_store, 'locks_held'): + local.strong_store.locks_held = [] + local.strong_store.locks_held.append(name) + + try: + if external and not CONF.disable_process_locking: + LOG.debug(_('Attempting to grab file lock "%(lock)s" ' + 'for method "%(method)s"...'), + {'lock': name, 'method': f.__name__}) + cleanup_dir = False + + # We need a copy of lock_path because it is non-local + local_lock_path = lock_path + if not local_lock_path: + local_lock_path = CONF.lock_path + + if not local_lock_path: + cleanup_dir = True + local_lock_path = tempfile.mkdtemp() + + if not os.path.exists(local_lock_path): + cleanup_dir = True + fileutils.ensure_tree(local_lock_path) + + # NOTE(mikal): the lock name cannot contain directory + # separators + safe_name = name.replace(os.sep, '_') + lock_file_name = '%s%s' % (lock_file_prefix, safe_name) + lock_file_path = os.path.join(local_lock_path, + lock_file_name) + + try: + lock = InterProcessLock(lock_file_path) + with lock: + LOG.debug(_('Got file lock "%(lock)s" at ' + '%(path)s for method ' + '"%(method)s"...'), + {'lock': name, + 'path': lock_file_path, + 'method': f.__name__}) + retval = f(*args, **kwargs) + finally: + LOG.debug(_('Released file lock "%(lock)s" at ' + '%(path)s for method "%(method)s"...'), {'lock': name, 'path': lock_file_path, 'method': f.__name__}) - retval = f(*args, **kwargs) - finally: - LOG.debug(_('Released file lock "%(lock)s" at %(path)s' - ' for method "%(method)s"...'), - {'lock': name, - 'path': lock_file_path, - 'method': f.__name__}) - # NOTE(vish): This removes the tempdir if we needed - # to create one. This is used to cleanup - # the locks left behind by unit tests. - if cleanup_dir: - shutil.rmtree(local_lock_path) - else: - retval = f(*args, **kwargs) + # NOTE(vish): This removes the tempdir if we needed + # to create one. This is used to + # cleanup the locks left behind by unit + # tests. + if cleanup_dir: + shutil.rmtree(local_lock_path) + else: + retval = f(*args, **kwargs) + + finally: + local.strong_store.locks_held.remove(name) return retval return inner diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index d43b48fa2..b98fef006 100644 --- a/nova/openstack/common/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see: rpc.proxy """ +import inspect +import logging + from nova.openstack.common import cfg +from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils +from nova.openstack.common import local + + +LOG = logging.getLogger(__name__) rpc_opts = [ @@ -62,7 +70,8 @@ rpc_opts = [ help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] -cfg.CONF.register_opts(rpc_opts) +CONF = cfg.CONF +CONF.register_opts(rpc_opts) def set_defaults(control_exchange): @@ -83,10 +92,27 @@ def create_connection(new=True): :returns: An instance of openstack.common.rpc.common.Connection """ - return _get_impl().create_connection(cfg.CONF, new=new) + return _get_impl().create_connection(CONF, new=new) + + +def _check_for_lock(): + if not CONF.debug: + return None + + if ((hasattr(local.strong_store, 'locks_held') + and local.strong_store.locks_held)): + stack = ' :: '.join([frame[3] for frame in inspect.stack()]) + LOG.warn(_('A RPC is being made while holding a lock. The locks ' + 'currently held are %(locks)s. This is probably a bug. ' + 'Please report it. Include the following: [%(stack)s].'), + {'locks': local.strong_store.locks_held, + 'stack': stack}) + return True + + return False -def call(context, topic, msg, timeout=None): +def call(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method that returns something. :param context: Information that identifies the user that has made this @@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: A dict from the remote method. :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().call(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().call(CONF, context, topic, msg, timeout) def cast(context, topic, msg): @@ -124,7 +154,7 @@ def cast(context, topic, msg): :returns: None """ - return _get_impl().cast(cfg.CONF, context, topic, msg) + return _get_impl().cast(CONF, context, topic, msg) def fanout_cast(context, topic, msg): @@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg): :returns: None """ - return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) + return _get_impl().fanout_cast(CONF, context, topic, msg) -def multicall(context, topic, msg, timeout=None): +def multicall(context, topic, msg, timeout=None, check_for_lock=False): """Invoke a remote method and get back an iterator. In this case, the remote method will be returning multiple values in @@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None): "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. If set, this overrides the rpc_response_timeout option. + :param check_for_lock: if True, a warning is emitted if a RPC call is made + with a lock held. :returns: An iterator. The iterator will yield a tuple (N, X) where N is an index that starts at 0 and increases by one for each value @@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None): :raises: openstack.common.rpc.common.Timeout if a complete response is not received before the timeout is reached. """ - return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) + if check_for_lock: + _check_for_lock() + return _get_impl().multicall(CONF, context, topic, msg, timeout) def notify(context, topic, msg, envelope=False): @@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, + return _get_impl().cast_to_server(CONF, context, server_params, topic, msg) @@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, + return _get_impl().fanout_cast_to_server(CONF, context, server_params, topic, msg) @@ -263,10 +297,10 @@ def _get_impl(): global _RPCIMPL if _RPCIMPL is None: try: - _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + _RPCIMPL = importutils.import_module(CONF.rpc_backend) except ImportError: # For backwards compatibility with older nova config. - impl = cfg.CONF.rpc_backend.replace('nova.rpc', - 'nova.openstack.common.rpc') + impl = CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 105e6fcbe..9aadce733 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index bf4f5a3de..d2f5a7b8f 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data): # NOTE(ameade): We DO NOT want to allow just any module to be imported, in # order to prevent arbitrary code execution. - if not module in conf.allowed_rpc_exception_modules: + if module not in conf.allowed_rpc_exception_modules: return RemoteError(name, failure.get('message'), trace) try: diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py index 4d133a1af..f43dbfe91 100644 --- a/nova/openstack/common/rpc/impl_fake.py +++ b/nova/openstack/common/rpc/impl_fake.py @@ -167,7 +167,7 @@ def cast(conf, context, topic, msg): pass -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): check_serialize(msg) diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index 305dc7877..4b5550d6f 100644 --- a/nova/openstack/common/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -66,7 +66,8 @@ kombu_opts = [ help='the RabbitMQ userid'), cfg.StrOpt('rabbit_password', default='guest', - help='the RabbitMQ password'), + help='the RabbitMQ password', + secret=True), cfg.StrOpt('rabbit_virtual_host', default='/', help='the RabbitMQ virtual host'), @@ -175,7 +176,7 @@ class ConsumerBase(object): try: self.queue.cancel(self.tag) except KeyError, e: - # NOTE(comstud): Kludge to get around an amqplib bug + # NOTE(comstud): Kludge to get around a amqplib bug if str(e) != "u'%s'" % self.tag: raise self.queue = None @@ -302,9 +303,15 @@ class Publisher(object): channel=channel, routing_key=self.routing_key) - def send(self, msg): + def send(self, msg, timeout=None): """Send a message""" - self.producer.publish(msg) + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. + # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) class DirectPublisher(Publisher): @@ -653,7 +660,7 @@ class Connection(object): for proxy_cb in self.proxy_callbacks: proxy_cb.wait() - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): """Send to a publisher based on the publisher class""" def _error_callback(exc): @@ -663,7 +670,7 @@ class Connection(object): def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) - publisher.send(msg) + publisher.send(msg, timeout) self.ensure(_error_callback, _publish) @@ -691,9 +698,9 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + self.publisher_send(TopicPublisher, topic, msg, timeout) def fanout_send(self, topic, msg): """Send a 'fanout' message""" @@ -701,7 +708,7 @@ class Connection(object): def notify_send(self, topic, msg, **kwargs): """Send a notify message on a topic""" - self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) def consume(self, limit=None): """Consume from all queues/consumers""" diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index 2e05f02f1..544d33790 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ +from nova.openstack.common import importutils from nova.openstack.common import jsonutils from nova.openstack.common import log as logging from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -49,7 +51,8 @@ qpid_opts = [ help='Username for qpid connection'), cfg.StrOpt('qpid_password', default='', - help='Password for qpid connection'), + help='Password for qpid connection', + secret=True), cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), @@ -275,6 +278,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +309,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +334,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +346,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +373,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +414,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() @@ -481,9 +487,20 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): """Send a 'fanout' message""" diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index d99d390f2..2c0631548 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -14,23 +14,24 @@ # License for the specific language governing permissions and limitations # under the License. +import os import pprint import socket -import string import sys import types import uuid import eventlet -from eventlet.green import zmq import greenlet from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils from nova.openstack.common import jsonutils +from nova.openstack.common import processutils as utils from nova.openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -61,6 +62,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -70,9 +75,9 @@ zmq_opts = [ ] -# These globals are defined in register_opts(conf), -# a mandatory initialization call -CONF = None +CONF = cfg.CONF +CONF.register_opts(zmq_opts) + ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -84,7 +89,7 @@ def _serialize(data): Error if a developer passes us bad data. """ try: - return str(jsonutils.dumps(data, ensure_ascii=True)) + return jsonutils.dumps(data, ensure_ascii=True) except TypeError: LOG.error(_("JSON serialization failed.")) raise @@ -107,7 +112,7 @@ class ZmqSocket(object): """ def __init__(self, addr, zmq_type, bind=True, subscribe=None): - self.sock = ZMQ_CTX.socket(zmq_type) + self.sock = _get_ctxt().socket(zmq_type) self.addr = addr self.type = zmq_type self.subscriptions = [] @@ -181,11 +186,15 @@ class ZmqSocket(object): pass self.subscriptions = [] - # Linger -1 prevents lost/dropped messages try: - self.sock.close(linger=-1) + # Default is to linger + self.sock.close() except Exception: - pass + # While this is a bad thing to happen, + # it would be much worse if some of the code calling this + # were to fail. For now, lets log, and later evaluate + # if we can safely raise here. + LOG.error("ZeroMQ socket could not be closed.") self.sock = None def recv(self): @@ -202,14 +211,17 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + msg_id = msg_id or 0 + if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) def close(self): self.outq.close() @@ -283,13 +295,13 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(CONF, ctx, topic, { + _multi_send(_cast, ctx, topic, { 'method': '-process_reply', 'args': { - 'msg_id': msg_id, + 'msg_id': msg_id, # Include for Folsom compat. 'response': response } - }) + }, _msg_id=msg_id) class ConsumerBase(object): @@ -309,21 +321,22 @@ class ConsumerBase(object): return [result] def process(self, style, target, proxy, ctx, data): + data.setdefault('version', None) + data.setdefault('args', {}) + # Method starting with - are # processed internally. (non-valid method name) - method = data['method'] + method = data.get('method') + if not method: + LOG.error(_("RPC message did not include method.")) + return # Internal method # uses internal context for safety. - if data['method'][0] == '-': - # For reply / process_reply - method = method[1:] - if method == 'reply': - self.private_ctx.reply(ctx, proxy, **data['args']) + if method == '-reply': + self.private_ctx.reply(ctx, proxy, **data['args']) return - data.setdefault('version', None) - data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -413,12 +426,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -430,34 +437,87 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - # Handle zmq_replies magic - if topic.startswith('fanout~'): + if topic.startswith('fanout~') or topic.startswith('zmq_replies'): sock_type = zmq.PUB - elif topic.startswith('zmq_replies'): - sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(_deserialize(in_msg)) - msg_id = inside[-1]['args']['msg_id'] - response = inside[-1]['args']['response'] - LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + if topic not in self.topic_proxy: + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) + + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) + + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + super(ZmqProxy, self).consume_in_thread() class ZmqReactor(ZmqBaseReactor): @@ -533,8 +593,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, - force_envelope=False): +def _cast(addr, context, topic, msg, timeout=None, serialize=True, + force_envelope=False, _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -543,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -551,7 +611,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None): +def _call(addr, context, topic, msg, timeout=None, + serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -586,17 +647,20 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload) + _cast(addr, context, topic, payload, + serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = _deserialize(msg[-1])[-1]['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") + except (IndexError, KeyError): + raise RPCException(_("RPC Message Invalid.")) finally: if 'msg_waiter' in vars(): msg_waiter.close() @@ -613,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False): + force_envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -622,7 +686,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) - queues = matchmaker.queues(topic) + queues = _get_matchmaker().queues(topic) LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results @@ -639,10 +703,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize, - force_envelope) + _topic, msg, timeout, serialize, + force_envelope, _msg_id) return - return method(_addr, context, _topic, _topic, msg, timeout) + return method(_addr, context, _topic, msg, timeout, + serialize, force_envelope) def create_connection(conf, new=True): @@ -689,44 +754,26 @@ def notify(conf, context, topic, msg, **kwargs): def cleanup(): """Clean up resources in use by implementation.""" global ZMQ_CTX + if ZMQ_CTX: + ZMQ_CTX.term() + ZMQ_CTX = None + global matchmaker matchmaker = None - ZMQ_CTX.term() - ZMQ_CTX = None -def register_opts(conf): - """Registration of options for this driver.""" - #NOTE(ewindisch): ZMQ_CTX and matchmaker - # are initialized here as this is as good - # an initialization method as any. +def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") - # We memoize through these globals global ZMQ_CTX - global matchmaker - global CONF - - if not CONF: - conf.register_opts(zmq_opts) - CONF = conf - # Don't re-set, if this method is called twice. if not ZMQ_CTX: - ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) - if not matchmaker: - # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = conf.rpc_zmq_matchmaker.split('.') - mm_module = '.'.join(mm_path[:-1]) - mm_class = mm_path[-1] + ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) + return ZMQ_CTX - # Only initialize a class. - if mm_path[-1][0] not in string.ascii_uppercase: - LOG.error(_("Matchmaker could not be loaded.\n" - "rpc_zmq_matchmaker is not a class.")) - raise RPCException(_("Error loading Matchmaker.")) - mm_impl = importutils.import_module(mm_module) - mm_constructor = getattr(mm_impl, mm_class) - matchmaker = mm_constructor() - - -register_opts(cfg.CONF) +def _get_matchmaker(): + global matchmaker + if not matchmaker: + matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker) + return matchmaker |
