author     Russell Bryant <rbryant@redhat.com>    2013-03-12 14:24:49 -0400
committer  Russell Bryant <rbryant@redhat.com>    2013-03-12 14:24:49 -0400
commit     d984b0b0bcbd17567f127395c8b890cddd9edd30 (patch)
tree       91ff2359350497c8619d5cecd63737e3f04ef579 /nova/openstack
parent     562b0787421b145c4a91ccbbcacfc74c340cead8 (diff)
Sync rpc from oslo-incubator.
Changes include:

 * Fix Copyright Headers - Rename LLC to Foundation
 * Preserve exceptions in impl_zmq.
 * Don't log a traceback on rpc call timeout.
 * Redis-based matchmaker
 * AMQP: skip duplicate messages
 * some other minor fixes / cleanups

Fix bug 1154232.

Change-Id: I8afb6b9a919fca34b4a93f7fad1551a023eba6b2
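
One of the changes pulled in here, "AMQP: skip duplicate messages", works by stamping every outgoing message with a random unique id and having consumers remember the last few ids they have seen. Below is a minimal, standalone sketch of that idea, mirroring the _MsgIdCache and _add_unique_id additions in amqp.py further down; the names and the DuplicateMessageError stand-in are illustrative only, not the oslo API.

import collections
import uuid

UNIQUE_ID = '_unique_id'


class DuplicateMessageError(Exception):
    """Stand-in for rpc_common.DuplicateMessageError."""


class MsgIdCache(object):
    """Remember the last N unique ids and reject anything seen twice."""

    DUP_MSG_CHECK_SIZE = 16

    def __init__(self):
        # Bounded deque: the oldest ids fall off automatically.
        self.prev_msgids = collections.deque([], maxlen=self.DUP_MSG_CHECK_SIZE)

    def check_duplicate_message(self, message_data):
        msg_id = message_data.get(UNIQUE_ID)
        if msg_id is None:
            return
        if msg_id in self.prev_msgids:
            raise DuplicateMessageError(msg_id)
        self.prev_msgids.append(msg_id)


def add_unique_id(msg):
    # Stamp an outgoing message so consumers can de-duplicate redeliveries.
    msg[UNIQUE_ID] = uuid.uuid4().hex


msg = {'method': 'ping', 'args': {}}
add_unique_id(msg)
cache = MsgIdCache()
cache.check_duplicate_message(msg)       # first delivery: accepted
try:
    cache.check_duplicate_message(msg)   # redelivery: rejected
except DuplicateMessageError:
    print('duplicate dropped')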
Diffstat (limited to 'nova/openstack')
-rw-r--r--  nova/openstack/common/rpc/amqp.py        59
-rw-r--r--  nova/openstack/common/rpc/common.py       4
-rw-r--r--  nova/openstack/common/rpc/impl_kombu.py    4
-rw-r--r--  nova/openstack/common/rpc/impl_qpid.py     4
-rw-r--r--  nova/openstack/common/rpc/impl_zmq.py     34
-rw-r--r--  nova/openstack/common/rpc/matchmaker.py  169
6 files changed, 250 insertions(+), 24 deletions(-)
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 3f25eed67..c23e36ec8 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -25,25 +25,27 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
+import collections
import inspect
import sys
import uuid
from eventlet import greenpool
from eventlet import pools
-from eventlet import semaphore
from eventlet import queue
-
+from eventlet import semaphore
# TODO(pekowski): Remove import cfg and below comment in Havana.
# This import should no longer be needed when the amqp_rpc_single_reply_queue
# option is removed.
from oslo.config import cfg
+
from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _
from nova.openstack.common import local
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import common as rpc_common
+
# TODO(pekowski): Remove this option in Havana.
amqp_opts = [
cfg.BoolOpt('amqp_rpc_single_reply_queue',
@@ -54,6 +56,7 @@ amqp_opts = [
cfg.CONF.register_opts(amqp_opts)
+UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@@ -236,6 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
'failure': failure}
if ending:
msg['ending'] = True
+ _add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibility.
@@ -302,6 +306,37 @@ def pack_context(msg, context):
msg.update(context_d)
+class _MsgIdCache(object):
+ """This class checks any duplicate messages."""
+
+ # NOTE: This value could be made a configuration option, but there is
+ # rarely a need to change it, so it is kept static for now.
+ DUP_MSG_CHECK_SIZE = 16
+
+ def __init__(self, **kwargs):
+ self.prev_msgids = collections.deque([],
+ maxlen=self.DUP_MSG_CHECK_SIZE)
+
+ def check_duplicate_message(self, message_data):
+ """AMQP consumers may read same message twice when exceptions occur
+ before ack is returned. This method prevents doing it.
+ """
+ if UNIQUE_ID in message_data:
+ msg_id = message_data[UNIQUE_ID]
+ if msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
+ else:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+ """Add unique_id for checking duplicate messages."""
+ unique_id = uuid.uuid4().hex
+ msg.update({UNIQUE_ID: unique_id})
+ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
@@ -349,6 +384,7 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=connection_pool,
)
self.proxy = proxy
+ self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@@ -368,6 +404,7 @@ class ProxyCallback(_ThreadPoolWithWait):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
@@ -406,9 +443,11 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
- LOG.exception(_('Exception during message handling'))
- ctxt.reply(None, sys.exc_info(),
- connection_pool=self.connection_pool)
+ # sys.exc_info() is deleted by LOG.exception().
+ exc_info = sys.exc_info()
+ LOG.error(_('Exception during message handling'),
+ exc_info=exc_info)
+ ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
class MulticallProxyWaiter(object):
@@ -422,6 +461,7 @@ class MulticallProxyWaiter(object):
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
+ self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
@@ -435,6 +475,7 @@ class MulticallProxyWaiter(object):
def _process_data(self, data):
result = None
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
@@ -479,6 +520,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
+ self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@@ -490,6 +532,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@@ -542,6 +585,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _add_unique_id(msg)
pack_context(msg, context)
# TODO(pekowski): Remove this flag and the code under the if clause
@@ -575,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@@ -583,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@@ -590,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@@ -599,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@@ -610,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
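
The ProxyCallback change above stops relying on LOG.exception() alone and instead captures sys.exc_info() first, so the original traceback can still be sent back to the caller via ctxt.reply() after the error is logged. A small standalone sketch of that pattern follows; reply() here is a hypothetical stand-in for ctxt.reply(), not the nova API.

import logging
import sys

logging.basicConfig()
LOG = logging.getLogger(__name__)


def reply(result, failure_info=None):
    # Hypothetical stand-in for ctxt.reply(); the real code serializes the
    # failure and sends it back on the reply queue.
    print(result, failure_info)


def handle_message():
    try:
        raise ValueError('boom')   # stand-in for dispatching to the proxy
    except Exception:
        # Capture exc_info before logging so the traceback is still
        # available to hand back to the caller.
        exc_info = sys.exc_info()
        LOG.error('Exception during message handling', exc_info=exc_info)
        reply(None, exc_info)


handle_message()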
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 55fc5b044..5fca30717 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -125,6 +125,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
+class DuplicateMessageError(RPCException):
+ message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 0d83253f1..81afc2a8b 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -624,8 +624,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index 542256d0c..fd4b25e7f 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -415,8 +415,8 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
- LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ LOG.debug(_('Timed out waiting for RPC response: %s') %
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index c1cca34e8..4102146fb 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -25,6 +25,7 @@ import eventlet
import greenlet
from oslo.config import cfg
+from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
@@ -91,8 +92,8 @@ def _serialize(data):
try:
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
- LOG.error(_("JSON serialization failed."))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("JSON serialization failed."))
def _deserialize(data):
@@ -511,9 +512,9 @@ class ZmqProxy(ZmqBaseReactor):
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
- LOG.error(_("Could not create IPC directory %s") %
- (ipc_dir, ))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
try:
self.register(consumption_proxy,
@@ -521,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
- LOG.error(_("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
@@ -594,6 +595,9 @@ class Connection(rpc_common.Connection):
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
+ # Register with matchmaker.
+ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
# Subscription scenarios
if fanout:
sock_type = zmq.SUB
@@ -620,6 +624,10 @@ class Connection(rpc_common.Connection):
self.topics.append(topic)
def close(self):
+ _get_matchmaker().stop_heartbeat()
+ for topic in self.topics:
+ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
self.reactor.close()
self.topics = []
@@ -627,6 +635,7 @@ class Connection(rpc_common.Connection):
self.reactor.wait()
def consume_in_thread(self):
+ _get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
@@ -742,7 +751,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
LOG.warn(_("No matchmaker results. Not casting."))
# While not strictly a timeout, callers know how to handle
# this exception and a timeout isn't too big a lie.
- raise rpc_common.Timeout, "No match from matchmaker."
+ raise rpc_common.Timeout(_("No match from matchmaker."))
# This supports brokerless fanout (addresses > 1)
for queue in queues:
@@ -785,7 +794,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
-def notify(conf, context, topic, msg, **kwargs):
+def notify(conf, context, topic, msg, envelope):
"""
Send notification event.
Notifications are sent to topic-priority.
@@ -793,9 +802,8 @@ def notify(conf, context, topic, msg, **kwargs):
"""
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
- topic.replace('.', '-')
- kwargs['envelope'] = kwargs.get('envelope', True)
- cast(conf, context, topic, msg, **kwargs)
+ topic = topic.replace('.', '-')
+ cast(conf, context, topic, msg, envelope=envelope)
def cleanup():
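
The impl_zmq changes above now register each consumer's topic with the matchmaker, heartbeat while consuming, and unregister on close; the matchmaker.py additions below supply those hooks via HeartbeatMatchMakerBase. Here is a toy, in-memory version of that lifecycle, using a plain thread instead of an eventlet greenthread so it is self-contained; the class and its "backend" are illustrative only.

import threading
import time

HEARTBEAT_FREQ = 300   # seconds, mirrors the new matchmaker_heartbeat_freq option


class ToyHeartbeatMatchMaker(object):
    def __init__(self):
        self.registrations = {}   # (topic, host) -> last heartbeat timestamp
        self._stop = threading.Event()
        self._heart = None

    def register(self, topic, host):
        self.ack_alive(topic, host)

    def ack_alive(self, topic, host):
        self.registrations[(topic, host)] = time.time()

    def unregister(self, topic, host):
        self.registrations.pop((topic, host), None)

    def send_heartbeats(self):
        # Refresh every registration; a real backend would also push a TTL.
        for topic, host in list(self.registrations):
            self.ack_alive(topic, host)

    def start_heartbeat(self):
        def loop():
            while not self._stop.wait(HEARTBEAT_FREQ):
                self.send_heartbeats()
        self._heart = threading.Thread(target=loop)
        self._heart.daemon = True
        self._heart.start()

    def stop_heartbeat(self):
        self._stop.set()


mm = ToyHeartbeatMatchMaker()
mm.register('compute', 'host1')   # create_consumer() does this per topic
mm.start_heartbeat()              # consume_in_thread() starts the loop
mm.stop_heartbeat()               # close() stops the loop...
mm.unregister('compute', 'host1') # ...and unregisters each topic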
diff --git a/nova/openstack/common/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py
index 57cc0b34c..e4862396a 100644
--- a/nova/openstack/common/rpc/matchmaker.py
+++ b/nova/openstack/common/rpc/matchmaker.py
@@ -22,6 +22,7 @@ import contextlib
import itertools
import json
+import eventlet
from oslo.config import cfg
from nova.openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default=300,
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default=600,
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keep the registration alive.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+ but may also be used publicly to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a host.topic is alive.
+ Used internally for updating heartbeats,
+ but may also be used publicly to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+ Called by register(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+ Called by unregister(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keep the registration alive.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if len(self.hosts) == 0:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character