summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-11-13 14:24:25 -0500
committerRussell Bryant <rbryant@redhat.com>2012-11-13 14:25:31 -0500
commitb2facb4f4ff2ee75366ec9e51dd59eede5bbb47d (patch)
tree915db75cdef90a31a90785059f22d8e8f6e6204b
parent083bccd34bc35dc043f17ff6ade66c64dd4bead2 (diff)
downloadnova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.tar.gz
nova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.tar.xz
nova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.zip
Sync latest code from oslo-incubator.
Change-Id: I463a6d934d17f3374763472cee7291234238c50d
-rw-r--r--nova/openstack/common/cfg.py6
-rw-r--r--nova/openstack/common/gettextutils.py2
-rw-r--r--nova/openstack/common/lockutils.py1
-rw-r--r--nova/openstack/common/notifier/rabbit_notifier.py31
-rw-r--r--nova/openstack/common/notifier/rpc_notifier.py46
-rw-r--r--nova/openstack/common/rpc/impl_kombu.py14
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py88
-rw-r--r--nova/openstack/common/rpc/service.py70
-rw-r--r--nova/openstack/common/setup.py16
9 files changed, 178 insertions, 96 deletions
diff --git a/nova/openstack/common/cfg.py b/nova/openstack/common/cfg.py
index 36e5e0ab0..8775a5f8a 100644
--- a/nova/openstack/common/cfg.py
+++ b/nova/openstack/common/cfg.py
@@ -236,10 +236,10 @@ log files:
This module also contains a global instance of the CommonConfigOpts class
in order to support a common usage pattern in OpenStack:
- from openstack.common import cfg
+ from nova.openstack.common import cfg
opts = [
- cfg.StrOpt('bind_host' default='0.0.0.0'),
+ cfg.StrOpt('bind_host', default='0.0.0.0'),
cfg.IntOpt('bind_port', default=9292),
]
@@ -1507,7 +1507,7 @@ class ConfigOpts(collections.Mapping):
if ('default' in info or 'override' in info):
continue
- if self._get(opt.name, group) is None:
+ if self._get(opt.dest, group) is None:
raise RequiredOptError(opt.name, group)
def _parse_cli_opts(self, args):
diff --git a/nova/openstack/common/gettextutils.py b/nova/openstack/common/gettextutils.py
index 235350cc4..d52309e62 100644
--- a/nova/openstack/common/gettextutils.py
+++ b/nova/openstack/common/gettextutils.py
@@ -20,7 +20,7 @@ gettext for openstack-common modules.
Usual usage in an openstack.common module:
- from openstack.common.gettextutils import _
+ from nova.openstack.common.gettextutils import _
"""
import gettext
diff --git a/nova/openstack/common/lockutils.py b/nova/openstack/common/lockutils.py
index 2840ce6f7..ba390dc69 100644
--- a/nova/openstack/common/lockutils.py
+++ b/nova/openstack/common/lockutils.py
@@ -24,7 +24,6 @@ import tempfile
import time
import weakref
-from eventlet import greenthread
from eventlet import semaphore
from nova.openstack.common import cfg
diff --git a/nova/openstack/common/notifier/rabbit_notifier.py b/nova/openstack/common/notifier/rabbit_notifier.py
index c7b3f54fe..11067fb0a 100644
--- a/nova/openstack/common/notifier/rabbit_notifier.py
+++ b/nova/openstack/common/notifier/rabbit_notifier.py
@@ -1,4 +1,4 @@
-# Copyright 2011 OpenStack LLC.
+# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -14,33 +14,16 @@
# under the License.
-from nova.openstack.common import cfg
-from nova.openstack.common import context as req_context
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
-from nova.openstack.common import rpc
+from nova.openstack.common.notifier import rpc_notifier
LOG = logging.getLogger(__name__)
-notification_topic_opt = cfg.ListOpt(
- 'notification_topics', default=['notifications', ],
- help='AMQP topic used for openstack notifications')
-
-CONF = cfg.CONF
-CONF.register_opt(notification_topic_opt)
-
def notify(context, message):
- """Sends a notification to the RabbitMQ"""
- if not context:
- context = req_context.get_admin_context()
- priority = message.get('priority',
- CONF.default_notification_level)
- priority = priority.lower()
- for topic in CONF.notification_topics:
- topic = '%s.%s' % (topic, priority)
- try:
- rpc.notify(context, topic, message)
- except Exception, e:
- LOG.exception(_("Could not send notification to %(topic)s. "
- "Payload=%(message)s"), locals())
+ """Deprecated in Grizzly. Please use rpc_notifier instead."""
+
+ LOG.deprecated(_("The rabbit_notifier is now deprecated."
+ " Please use rpc_notifier instead."))
+ rpc_notifier.notify(context, message)
diff --git a/nova/openstack/common/notifier/rpc_notifier.py b/nova/openstack/common/notifier/rpc_notifier.py
new file mode 100644
index 000000000..aa9e8860e
--- /dev/null
+++ b/nova/openstack/common/notifier/rpc_notifier.py
@@ -0,0 +1,46 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from nova.openstack.common import cfg
+from nova.openstack.common import context as req_context
+from nova.openstack.common.gettextutils import _
+from nova.openstack.common import log as logging
+from nova.openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'notification_topics', default=['notifications', ],
+ help='AMQP topic used for openstack notifications')
+
+CONF = cfg.CONF
+CONF.register_opt(notification_topic_opt)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.notification_topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message)
+ except Exception, e:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 46295d90f..bb0ade27c 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -409,18 +409,18 @@ class Connection(object):
hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
- params = {}
+ params = {
+ 'hostname': hostname,
+ 'port': port,
+ 'userid': self.conf.rabbit_userid,
+ 'password': self.conf.rabbit_password,
+ 'virtual_host': self.conf.rabbit_virtual_host,
+ }
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
- params.setdefault('hostname', hostname)
- params.setdefault('port', port)
- params.setdefault('userid', self.conf.rabbit_userid)
- params.setdefault('password', self.conf.rabbit_password)
- params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
-
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index 70a03c5bf..b87050753 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -50,24 +50,6 @@ qpid_opts = [
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
help='Space separated list of SASL mechanisms to use for auth'),
- cfg.BoolOpt('qpid_reconnect',
- default=True,
- help='Automatically reconnect'),
- cfg.IntOpt('qpid_reconnect_timeout',
- default=0,
- help='Reconnection timeout in seconds'),
- cfg.IntOpt('qpid_reconnect_limit',
- default=0,
- help='Max reconnections before giving up'),
- cfg.IntOpt('qpid_reconnect_interval_min',
- default=0,
- help='Minimum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval_max',
- default=0,
- help='Maximum seconds between reconnection attempts'),
- cfg.IntOpt('qpid_reconnect_interval',
- default=0,
- help='Equivalent to setting max and min to the same value'),
cfg.IntOpt('qpid_heartbeat',
default=60,
help='Seconds between connection keepalive heartbeats'),
@@ -294,50 +276,36 @@ class Connection(object):
self.consumer_thread = None
self.conf = conf
- if server_params is None:
- server_params = {}
-
- default_params = dict(hostname=self.conf.qpid_hostname,
- port=self.conf.qpid_port,
- username=self.conf.qpid_username,
- password=self.conf.qpid_password)
-
- params = server_params
- for key in default_params.keys():
- params.setdefault(key, default_params[key])
+ params = {
+ 'hostname': self.conf.qpid_hostname,
+ 'port': self.conf.qpid_port,
+ 'username': self.conf.qpid_username,
+ 'password': self.conf.qpid_password,
+ }
+ params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port'])
+ self.username = params['username']
+ self.password = params['password']
+ self.connection_create()
+ self.reconnect()
+
+ def connection_create(self):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection
# before we call open
- self.connection.username = params['username']
- self.connection.password = params['password']
+ self.connection.username = self.username
+ self.connection.password = self.password
+
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
- self.connection.reconnect = self.conf.qpid_reconnect
- if self.conf.qpid_reconnect_timeout:
- self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
- if self.conf.qpid_reconnect_limit:
- self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
- if self.conf.qpid_reconnect_interval_max:
- self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
- if self.conf.qpid_reconnect_interval_min:
- self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
- if self.conf.qpid_reconnect_interval:
- self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ # Reconnection is done by self.reconnect()
+ self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
- # Open is part of reconnect -
- # NOTE(WGH) not sure we need this with the reconnect flags
- self.reconnect()
-
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
@@ -352,12 +320,18 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
pass
+ delay = 1
while True:
try:
+ self.connection_create()
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
- LOG.error(_('Unable to connect to AMQP server: %s'), e)
- time.sleep(self.conf.qpid_reconnect_interval or 1)
+ msg_dict = dict(e=e, delay=delay)
+ msg = _("Unable to connect to AMQP server: %(e)s. "
+ "Sleeping %(delay)s seconds") % msg_dict
+ LOG.error(msg)
+ time.sleep(delay)
+ delay = min(2 * delay, 60)
else:
break
@@ -365,10 +339,14 @@ class Connection(object):
self.session = self.connection.session()
- for consumer in self.consumers.itervalues():
- consumer.reconnect(self.session)
-
if self.consumers:
+ consumers = self.consumers
+ self.consumers = {}
+
+ for consumer in consumers.itervalues():
+ consumer.reconnect(self.session)
+ self._register_consumer(consumer)
+
LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs):
diff --git a/nova/openstack/common/rpc/service.py b/nova/openstack/common/rpc/service.py
new file mode 100644
index 000000000..15508e432
--- /dev/null
+++ b/nova/openstack/common/rpc/service.py
@@ -0,0 +1,70 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nova.openstack.common.gettextutils import _
+from nova.openstack.common import log as logging
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
+from nova.openstack.common import service
+
+
+LOG = logging.getLogger(__name__)
+
+
+class Service(service.Service):
+ """Service object for binaries running on hosts.
+
+ A service enables rpc by listening to queues based on topic and host."""
+ def __init__(self, host, topic, manager=None):
+ super(Service, self).__init__()
+ self.host = host
+ self.topic = topic
+ if manager is None:
+ self.manager = self
+ else:
+ self.manager = manager
+
+ def start(self):
+ super(Service, self).start()
+
+ self.conn = rpc.create_connection(new=True)
+ LOG.debug(_("Creating Consumer connection for Service %s") %
+ self.topic)
+
+ dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
+
+ # Share this same connection for these Consumers
+ self.conn.create_consumer(self.topic, dispatcher, fanout=False)
+
+ node_topic = '%s.%s' % (self.topic, self.host)
+ self.conn.create_consumer(node_topic, dispatcher, fanout=False)
+
+ self.conn.create_consumer(self.topic, dispatcher, fanout=True)
+
+ # Consume from all consumers in a thread
+ self.conn.consume_in_thread()
+
+ def stop(self):
+ # Try to shut the connection down, but if we get any sort of
+ # errors, go ahead and ignore them.. as we're shutting down anyway
+ try:
+ self.conn.close()
+ except Exception:
+ pass
+ super(Service, self).stop()
diff --git a/nova/openstack/common/setup.py b/nova/openstack/common/setup.py
index 4e2a57717..e6f72f034 100644
--- a/nova/openstack/common/setup.py
+++ b/nova/openstack/common/setup.py
@@ -117,8 +117,12 @@ def write_requirements():
def _run_shell_command(cmd):
- output = subprocess.Popen(["/bin/sh", "-c", cmd],
- stdout=subprocess.PIPE)
+ if os.name == 'nt':
+ output = subprocess.Popen(["cmd.exe", "/C", cmd],
+ stdout=subprocess.PIPE)
+ else:
+ output = subprocess.Popen(["/bin/sh", "-c", cmd],
+ stdout=subprocess.PIPE)
out = output.communicate()
if len(out) == 0:
return None
@@ -136,15 +140,17 @@ def _get_git_next_version_suffix(branch_name):
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
- if not milestonever:
- milestonever = ""
+ if milestonever:
+ first_half = "%s~%s" % (milestonever, datestamp)
+ else:
+ first_half = datestamp
+
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
- first_half = "%s~%s" % (milestonever, datestamp)
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))