summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorMichael Still <mikal@stillhq.com>2013-02-07 16:04:17 +1100
committerMichael Still <mikal@stillhq.com>2013-02-07 16:04:17 +1100
commit1cfc526f50686286e64ac07566f715e981916d3c (patch)
tree96b429be7e014bbf827b60c0196cda3b080150d4 /nova/openstack
parentb7a0a193f5ee5d9ad55f0c8a8d7b7da9692dbffb (diff)
Update modules from common required for rpc with lock detection.
This change updates the versions of the local, lockutils and rpc modules from common. This is required so that I can implement automated detection of rpcs being made while holding locks, which will come in a later review. Sneaks up on bug 1063222. Change-Id: I3689c133d1efc7425a44662abbdb434b5f36c64e
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/local.py11
-rw-r--r--nova/openstack/common/lockutils.py110
-rw-r--r--nova/openstack/common/rpc/__init__.py60
-rw-r--r--nova/openstack/common/rpc/amqp.py2
-rw-r--r--nova/openstack/common/rpc/common.py2
-rw-r--r--nova/openstack/common/rpc/impl_fake.py2
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py25
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py39
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py237
9 files changed, 308 insertions, 180 deletions
diff --git a/nova/openstack/common/local.py b/nova/openstack/common/local.py
index 19d962732..8bdc837a9 100644
--- a/nova/openstack/common/local.py
+++ b/nova/openstack/common/local.py
@@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr)
if rval:
+ # NOTE(mikal): this bit is confusing. What is stored is a weak
+ # reference, not the value itself. We therefore need to lookup
+ # the weak reference and return the inner value here.
rval = rval()
return rval
@@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value)
+# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal()
+
+# A "weak" store uses weak references and allows an object to fall out of scope
+# when it falls out of scope in the code that uses the thread local storage. A
+# "strong" store will hold a reference to the object so that it never falls out
+# of scope.
+weak_store = WeakLocal()
+strong_store = corolocal.local
diff --git a/nova/openstack/common/lockutils.py b/nova/openstack/common/lockutils.py
index 6f80a1f67..930e265f6 100644
--- a/nova/openstack/common/lockutils.py
+++ b/nova/openstack/common/lockutils.py
@@ -29,6 +29,7 @@ from eventlet import semaphore
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common.gettextutils import _
+from nova.openstack.common import local
from nova.openstack.common import log as logging
@@ -39,9 +40,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
- default=os.path.abspath(os.path.join(os.path.dirname(__file__),
- '../')),
- help='Directory to use for lock files')
+ help=('Directory to use for lock files. Default to a '
+ 'temp directory'))
]
@@ -140,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
def foo(self, *args):
...
- ensures that only one thread will execute the bar method at a time.
+ ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@@ -184,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
- if external and not CONF.disable_process_locking:
- LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
- 'method "%(method)s"...'),
- {'lock': name, 'method': f.__name__})
- cleanup_dir = False
-
- # We need a copy of lock_path because it is non-local
- local_lock_path = lock_path
- if not local_lock_path:
- local_lock_path = CONF.lock_path
-
- if not local_lock_path:
- cleanup_dir = True
- local_lock_path = tempfile.mkdtemp()
-
- if not os.path.exists(local_lock_path):
- cleanup_dir = True
- fileutils.ensure_tree(local_lock_path)
-
- # NOTE(mikal): the lock name cannot contain directory
- # separators
- safe_name = name.replace(os.sep, '_')
- lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
- lock_file_path = os.path.join(local_lock_path,
- lock_file_name)
-
- try:
- lock = InterProcessLock(lock_file_path)
- with lock:
- LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
- 'for method "%(method)s"...'),
+
+ # NOTE(mikal): I know this looks odd
+ if not hasattr(local.strong_store, 'locks_held'):
+ local.strong_store.locks_held = []
+ local.strong_store.locks_held.append(name)
+
+ try:
+ if external and not CONF.disable_process_locking:
+ LOG.debug(_('Attempting to grab file lock "%(lock)s" '
+ 'for method "%(method)s"...'),
+ {'lock': name, 'method': f.__name__})
+ cleanup_dir = False
+
+ # We need a copy of lock_path because it is non-local
+ local_lock_path = lock_path
+ if not local_lock_path:
+ local_lock_path = CONF.lock_path
+
+ if not local_lock_path:
+ cleanup_dir = True
+ local_lock_path = tempfile.mkdtemp()
+
+ if not os.path.exists(local_lock_path):
+ cleanup_dir = True
+ fileutils.ensure_tree(local_lock_path)
+
+ # NOTE(mikal): the lock name cannot contain directory
+ # separators
+ safe_name = name.replace(os.sep, '_')
+ lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
+ lock_file_path = os.path.join(local_lock_path,
+ lock_file_name)
+
+ try:
+ lock = InterProcessLock(lock_file_path)
+ with lock:
+ LOG.debug(_('Got file lock "%(lock)s" at '
+ '%(path)s for method '
+ '"%(method)s"...'),
+ {'lock': name,
+ 'path': lock_file_path,
+ 'method': f.__name__})
+ retval = f(*args, **kwargs)
+ finally:
+ LOG.debug(_('Released file lock "%(lock)s" at '
+ '%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
- retval = f(*args, **kwargs)
- finally:
- LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
- ' for method "%(method)s"...'),
- {'lock': name,
- 'path': lock_file_path,
- 'method': f.__name__})
- # NOTE(vish): This removes the tempdir if we needed
- # to create one. This is used to cleanup
- # the locks left behind by unit tests.
- if cleanup_dir:
- shutil.rmtree(local_lock_path)
- else:
- retval = f(*args, **kwargs)
+ # NOTE(vish): This removes the tempdir if we needed
+ # to create one. This is used to
+ # cleanup the locks left behind by unit
+ # tests.
+ if cleanup_dir:
+ shutil.rmtree(local_lock_path)
+ else:
+ retval = f(*args, **kwargs)
+
+ finally:
+ local.strong_store.locks_held.remove(name)
return retval
return inner
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index d43b48fa2..b98fef006 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy
"""
+import inspect
+import logging
+
from nova.openstack.common import cfg
+from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
+from nova.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
rpc_opts = [
@@ -62,7 +70,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
-cfg.CONF.register_opts(rpc_opts)
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
@@ -83,10 +92,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection
"""
- return _get_impl().create_connection(cfg.CONF, new=new)
+ return _get_impl().create_connection(CONF, new=new)
+
+
+def _check_for_lock():
+ if not CONF.debug:
+ return None
+
+ if ((hasattr(local.strong_store, 'locks_held')
+ and local.strong_store.locks_held)):
+ stack = ' :: '.join([frame[3] for frame in inspect.stack()])
+ LOG.warn(_('A RPC is being made while holding a lock. The locks '
+ 'currently held are %(locks)s. This is probably a bug. '
+ 'Please report it. Include the following: [%(stack)s].'),
+ {'locks': local.strong_store.locks_held,
+ 'stack': stack})
+ return True
+
+ return False
-def call(context, topic, msg, timeout=None):
+def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg):
@@ -124,7 +154,7 @@ def cast(context, topic, msg):
:returns: None
"""
- return _get_impl().cast(cfg.CONF, context, topic, msg)
+ return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg):
@@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+ return _get_impl().fanout_cast(CONF, context, topic, msg)
-def multicall(context, topic, msg, timeout=None):
+def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
+ :param check_for_lock: if True, a warning is emitted if a RPC call is made
+ with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
@@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
- return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+ if check_for_lock:
+ _check_for_lock()
+ return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False):
@@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None
"""
- return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+ return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg)
@@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None
"""
- return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+ return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg)
@@ -263,10 +297,10 @@ def _get_impl():
global _RPCIMPL
if _RPCIMPL is None:
try:
- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+ _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError:
# For backwards compatibility with older nova config.
- impl = cfg.CONF.rpc_backend.replace('nova.rpc',
- 'nova.openstack.common.rpc')
+ impl = CONF.rpc_backend.replace('nova.rpc',
+ 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl)
return _RPCIMPL
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 105e6fcbe..9aadce733 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index bf4f5a3de..d2f5a7b8f 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution.
- if not module in conf.allowed_rpc_exception_modules:
+ if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace)
try:
diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py
index 4d133a1af..f43dbfe91 100644
--- a/nova/openstack/common/rpc/impl_fake.py
+++ b/nova/openstack/common/rpc/impl_fake.py
@@ -167,7 +167,7 @@ def cast(conf, context, topic, msg):
pass
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
check_serialize(msg)
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 305dc7877..4b5550d6f 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
- help='the RabbitMQ password'),
+ help='the RabbitMQ password',
+ secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
@@ -175,7 +176,7 @@ class ConsumerBase(object):
try:
self.queue.cancel(self.tag)
except KeyError, e:
- # NOTE(comstud): Kludge to get around an amqplib bug
+ # NOTE(comstud): Kludge to get around a amqplib bug
if str(e) != "u'%s'" % self.tag:
raise
self.queue = None
@@ -302,9 +303,15 @@ class Publisher(object):
channel=channel,
routing_key=self.routing_key)
- def send(self, msg):
+ def send(self, msg, timeout=None):
"""Send a message"""
- self.producer.publish(msg)
+ if timeout:
+ #
+ # AMQP TTL is in milliseconds when set in the header.
+ #
+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+ else:
+ self.producer.publish(msg)
class DirectPublisher(Publisher):
@@ -653,7 +660,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
- def publisher_send(self, cls, topic, msg, **kwargs):
+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
@@ -663,7 +670,7 @@ class Connection(object):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg)
+ publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
@@ -691,9 +698,9 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
@@ -701,7 +708,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
- self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index 2e05f02f1..544d33790 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -22,16 +22,18 @@ import uuid
import eventlet
import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _
+from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
@@ -49,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'),
cfg.StrOpt('qpid_password',
default='',
- help='Password for qpid connection'),
+ help='Password for qpid connection',
+ secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
@@ -275,6 +278,9 @@ class Connection(object):
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
@@ -303,7 +309,7 @@ class Connection(object):
def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@@ -328,7 +334,7 @@ class Connection(object):
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
attempt = 0
@@ -340,7 +346,7 @@ class Connection(object):
try:
self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@@ -367,8 +373,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@@ -408,7 +414,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
+ if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
@@ -481,9 +487,20 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ #
+ # We want to create a message with attributes, e.g. a TTL. We
+ # don't really need to keep 'msg' in its JSON format any longer
+ # so let's create an actual qpid message here and get some
+ # value-add on the go.
+ #
+ # WARNING: Request timeout happens to be in the same units as
+ # qpid's TTL (seconds). If this changes in the future, then this
+ # will need to be altered accordingly.
+ #
+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+ self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index d99d390f2..2c0631548 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -14,23 +14,24 @@
# License for the specific language governing permissions and limitations
# under the License.
+import os
import pprint
import socket
-import string
import sys
import types
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
+from nova.openstack.common import processutils as utils
from nova.openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
@@ -61,6 +62,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@@ -70,9 +75,9 @@ zmq_opts = [
]
-# These globals are defined in register_opts(conf),
-# a mandatory initialization call
-CONF = None
+CONF = cfg.CONF
+CONF.register_opts(zmq_opts)
+
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
@@ -84,7 +89,7 @@ def _serialize(data):
Error if a developer passes us bad data.
"""
try:
- return str(jsonutils.dumps(data, ensure_ascii=True))
+ return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
@@ -107,7 +112,7 @@ class ZmqSocket(object):
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
- self.sock = ZMQ_CTX.socket(zmq_type)
+ self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
@@ -181,11 +186,15 @@ class ZmqSocket(object):
pass
self.subscriptions = []
- # Linger -1 prevents lost/dropped messages
try:
- self.sock.close(linger=-1)
+ # Default is to linger
+ self.sock.close()
except Exception:
- pass
+ # While this is a bad thing to happen,
+ # it would be much worse if some of the code calling this
+ # were to fail. For now, lets log, and later evaluate
+ # if we can safely raise here.
+ LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
@@ -202,14 +211,17 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ msg_id = msg_id or 0
+
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
- self.outq.send([str(msg_id), str(topic), str('cast'),
- _serialize(data)])
+ self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
def close(self):
self.outq.close()
@@ -283,13 +295,13 @@ class InternalContext(object):
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
@@ -309,21 +321,22 @@ class ConsumerBase(object):
return [result]
def process(self, style, target, proxy, ctx, data):
+ data.setdefault('version', None)
+ data.setdefault('args', {})
+
# Method starting with - are
# processed internally. (non-valid method name)
- method = data['method']
+ method = data.get('method')
+ if not method:
+ LOG.error(_("RPC message did not include method."))
+ return
# Internal method
# uses internal context for safety.
- if data['method'][0] == '-':
- # For reply / process_reply
- method = method[1:]
- if method == 'reply':
- self.private_ctx.reply(ctx, proxy, **data['args'])
+ if method == '-reply':
+ self.private_ctx.reply(ctx, proxy, **data['args'])
return
- data.setdefault('version', None)
- data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@@ -413,12 +426,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@@ -430,34 +437,87 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
- if topic.startswith('fanout~'):
+ if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- elif topic.startswith('zmq_replies'):
- sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = [str(msg_id), _serialize(response)]
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ if topic not in self.topic_proxy:
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
+
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+ while(True):
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
+
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
+
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
@@ -533,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -543,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -551,7 +611,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, topic, msg, timeout=None,
+ serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -586,17 +647,20 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, topic, payload,
+ serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@@ -613,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+ force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -622,7 +686,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
- queues = matchmaker.queues(topic)
+ queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
@@ -639,10 +703,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, serialize,
+ force_envelope, _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, msg, timeout,
+ serialize, force_envelope)
def create_connection(conf, new=True):
@@ -689,44 +754,26 @@ def notify(conf, context, topic, msg, **kwargs):
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
+ if ZMQ_CTX:
+ ZMQ_CTX.term()
+ ZMQ_CTX = None
+
global matchmaker
matchmaker = None
- ZMQ_CTX.term()
- ZMQ_CTX = None
-def register_opts(conf):
- """Registration of options for this driver."""
- #NOTE(ewindisch): ZMQ_CTX and matchmaker
- # are initialized here as this is as good
- # an initialization method as any.
+def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
- # We memoize through these globals
global ZMQ_CTX
- global matchmaker
- global CONF
-
- if not CONF:
- conf.register_opts(zmq_opts)
- CONF = conf
- # Don't re-set, if this method is called twice.
if not ZMQ_CTX:
- ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
- if not matchmaker:
- # rpc_zmq_matchmaker should be set to a 'module.Class'
- mm_path = conf.rpc_zmq_matchmaker.split('.')
- mm_module = '.'.join(mm_path[:-1])
- mm_class = mm_path[-1]
+ ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
+ return ZMQ_CTX
- # Only initialize a class.
- if mm_path[-1][0] not in string.ascii_uppercase:
- LOG.error(_("Matchmaker could not be loaded.\n"
- "rpc_zmq_matchmaker is not a class."))
- raise RPCException(_("Error loading Matchmaker."))
- mm_impl = importutils.import_module(mm_module)
- mm_constructor = getattr(mm_impl, mm_class)
- matchmaker = mm_constructor()
-
-
-register_opts(cfg.CONF)
+def _get_matchmaker():
+ global matchmaker
+ if not matchmaker:
+ matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
+ return matchmaker