diff options
| author | Russell Bryant <rbryant@redhat.com> | 2012-11-13 14:24:25 -0500 |
|---|---|---|
| committer | Russell Bryant <rbryant@redhat.com> | 2012-11-13 14:25:31 -0500 |
| commit | b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d (patch) | |
| tree | 915db75cdef90a31a90785059f22d8e8f6e6204b | |
| parent | 083bccd34bc35dc043f17ff6ade66c64dd4bead2 (diff) | |
| download | nova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.tar.gz nova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.tar.xz nova-b2facb4f4ff2ee75366ec9e51dd59eede5bbb47d.zip | |
Sync latest code from oslo-incubator.
Change-Id: I463a6d934d17f3374763472cee7291234238c50d
| -rw-r--r-- | nova/openstack/common/cfg.py | 6 | ||||
| -rw-r--r-- | nova/openstack/common/gettextutils.py | 2 | ||||
| -rw-r--r-- | nova/openstack/common/lockutils.py | 1 | ||||
| -rw-r--r-- | nova/openstack/common/notifier/rabbit_notifier.py | 31 | ||||
| -rw-r--r-- | nova/openstack/common/notifier/rpc_notifier.py | 46 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 14 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 88 | ||||
| -rw-r--r-- | nova/openstack/common/rpc/service.py | 70 | ||||
| -rw-r--r-- | nova/openstack/common/setup.py | 16 |
9 files changed, 178 insertions, 96 deletions
diff --git a/nova/openstack/common/cfg.py b/nova/openstack/common/cfg.py index 36e5e0ab0..8775a5f8a 100644 --- a/nova/openstack/common/cfg.py +++ b/nova/openstack/common/cfg.py @@ -236,10 +236,10 @@ log files: This module also contains a global instance of the CommonConfigOpts class in order to support a common usage pattern in OpenStack: - from openstack.common import cfg + from nova.openstack.common import cfg opts = [ - cfg.StrOpt('bind_host' default='0.0.0.0'), + cfg.StrOpt('bind_host', default='0.0.0.0'), cfg.IntOpt('bind_port', default=9292), ] @@ -1507,7 +1507,7 @@ class ConfigOpts(collections.Mapping): if ('default' in info or 'override' in info): continue - if self._get(opt.name, group) is None: + if self._get(opt.dest, group) is None: raise RequiredOptError(opt.name, group) def _parse_cli_opts(self, args): diff --git a/nova/openstack/common/gettextutils.py b/nova/openstack/common/gettextutils.py index 235350cc4..d52309e62 100644 --- a/nova/openstack/common/gettextutils.py +++ b/nova/openstack/common/gettextutils.py @@ -20,7 +20,7 @@ gettext for openstack-common modules. Usual usage in an openstack.common module: - from openstack.common.gettextutils import _ + from nova.openstack.common.gettextutils import _ """ import gettext diff --git a/nova/openstack/common/lockutils.py b/nova/openstack/common/lockutils.py index 2840ce6f7..ba390dc69 100644 --- a/nova/openstack/common/lockutils.py +++ b/nova/openstack/common/lockutils.py @@ -24,7 +24,6 @@ import tempfile import time import weakref -from eventlet import greenthread from eventlet import semaphore from nova.openstack.common import cfg diff --git a/nova/openstack/common/notifier/rabbit_notifier.py b/nova/openstack/common/notifier/rabbit_notifier.py index c7b3f54fe..11067fb0a 100644 --- a/nova/openstack/common/notifier/rabbit_notifier.py +++ b/nova/openstack/common/notifier/rabbit_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2012 Red Hat, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -14,33 +14,16 @@ # under the License. -from nova.openstack.common import cfg -from nova.openstack.common import context as req_context from nova.openstack.common.gettextutils import _ from nova.openstack.common import log as logging -from nova.openstack.common import rpc +from nova.openstack.common.notifier import rpc_notifier LOG = logging.getLogger(__name__) -notification_topic_opt = cfg.ListOpt( - 'notification_topics', default=['notifications', ], - help='AMQP topic used for openstack notifications') - -CONF = cfg.CONF -CONF.register_opt(notification_topic_opt) - def notify(context, message): - """Sends a notification to the RabbitMQ""" - if not context: - context = req_context.get_admin_context() - priority = message.get('priority', - CONF.default_notification_level) - priority = priority.lower() - for topic in CONF.notification_topics: - topic = '%s.%s' % (topic, priority) - try: - rpc.notify(context, topic, message) - except Exception, e: - LOG.exception(_("Could not send notification to %(topic)s. " - "Payload=%(message)s"), locals()) + """Deprecated in Grizzly. Please use rpc_notifier instead.""" + + LOG.deprecated(_("The rabbit_notifier is now deprecated." + " Please use rpc_notifier instead.")) + rpc_notifier.notify(context, message) diff --git a/nova/openstack/common/notifier/rpc_notifier.py b/nova/openstack/common/notifier/rpc_notifier.py new file mode 100644 index 000000000..aa9e8860e --- /dev/null +++ b/nova/openstack/common/notifier/rpc_notifier.py @@ -0,0 +1,46 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from nova.openstack.common import cfg +from nova.openstack.common import context as req_context +from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging +from nova.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'notification_topics', default=['notifications', ], + help='AMQP topic used for openstack notifications') + +CONF = cfg.CONF +CONF.register_opt(notification_topic_opt) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.notification_topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message) + except Exception, e: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index 46295d90f..bb0ade27c 100644 --- a/nova/openstack/common/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -409,18 +409,18 @@ class Connection(object): hostname, port = network_utils.parse_host_port( adr, default_port=self.conf.rabbit_port) - params = {} + params = { + 'hostname': hostname, + 'port': port, + 'userid': self.conf.rabbit_userid, + 'password': self.conf.rabbit_password, + 'virtual_host': self.conf.rabbit_virtual_host, + } for sp_key, value in server_params.iteritems(): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value - params.setdefault('hostname', hostname) - params.setdefault('port', port) - params.setdefault('userid', self.conf.rabbit_userid) - params.setdefault('password', self.conf.rabbit_password) - params.setdefault('virtual_host', self.conf.rabbit_virtual_host) - if self.conf.fake_rabbit: params['transport'] = 'memory' if self.conf.rabbit_use_ssl: diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index 70a03c5bf..b87050753 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -50,24 +50,6 @@ qpid_opts = [ cfg.StrOpt('qpid_sasl_mechanisms', default='', help='Space separated list of SASL mechanisms to use for auth'), - cfg.BoolOpt('qpid_reconnect', - default=True, - help='Automatically reconnect'), - cfg.IntOpt('qpid_reconnect_timeout', - default=0, - help='Reconnection timeout in seconds'), - cfg.IntOpt('qpid_reconnect_limit', - default=0, - help='Max reconnections before giving up'), - cfg.IntOpt('qpid_reconnect_interval_min', - default=0, - help='Minimum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval_max', - default=0, - help='Maximum seconds between reconnection attempts'), - cfg.IntOpt('qpid_reconnect_interval', - default=0, - help='Equivalent to setting max and min to the same value'), cfg.IntOpt('qpid_heartbeat', default=60, help='Seconds between connection keepalive heartbeats'), @@ -294,50 +276,36 @@ class Connection(object): self.consumer_thread = None self.conf = conf - if server_params is None: - server_params = {} - - default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) - - params = server_params - for key in default_params.keys(): - params.setdefault(key, default_params[key]) + params = { + 'hostname': self.conf.qpid_hostname, + 'port': self.conf.qpid_port, + 'username': self.conf.qpid_username, + 'password': self.conf.qpid_password, + } + params.update(server_params or {}) self.broker = params['hostname'] + ":" + str(params['port']) + self.username = params['username'] + self.password = params['password'] + self.connection_create() + self.reconnect() + + def connection_create(self): # Create the connection - this does not open the connection self.connection = qpid.messaging.Connection(self.broker) # Check if flags are set and if so set them for the connection # before we call open - self.connection.username = params['username'] - self.connection.password = params['password'] + self.connection.username = self.username + self.connection.password = self.password + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms - self.connection.reconnect = self.conf.qpid_reconnect - if self.conf.qpid_reconnect_timeout: - self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) - if self.conf.qpid_reconnect_limit: - self.connection.reconnect_limit = self.conf.qpid_reconnect_limit - if self.conf.qpid_reconnect_interval_max: - self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) - if self.conf.qpid_reconnect_interval_min: - self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) - if self.conf.qpid_reconnect_interval: - self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + # Reconnection is done by self.reconnect() + self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay - # Open is part of reconnect - - # NOTE(WGH) not sure we need this with the reconnect flags - self.reconnect() - def _register_consumer(self, consumer): self.consumers[str(consumer.get_receiver())] = consumer @@ -352,12 +320,18 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + delay = 1 while True: try: + self.connection_create() self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s'), e) - time.sleep(self.conf.qpid_reconnect_interval or 1) + msg_dict = dict(e=e, delay=delay) + msg = _("Unable to connect to AMQP server: %(e)s. " + "Sleeping %(delay)s seconds") % msg_dict + LOG.error(msg) + time.sleep(delay) + delay = min(2 * delay, 60) else: break @@ -365,10 +339,14 @@ class Connection(object): self.session = self.connection.session() - for consumer in self.consumers.itervalues(): - consumer.reconnect(self.session) - if self.consumers: + consumers = self.consumers + self.consumers = {} + + for consumer in consumers.itervalues(): + consumer.reconnect(self.session) + self._register_consumer(consumer) + LOG.debug(_("Re-established AMQP queues")) def ensure(self, error_callback, method, *args, **kwargs): diff --git a/nova/openstack/common/rpc/service.py b/nova/openstack/common/rpc/service.py new file mode 100644 index 000000000..15508e432 --- /dev/null +++ b/nova/openstack/common/rpc/service.py @@ -0,0 +1,70 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging +from nova.openstack.common import rpc +from nova.openstack.common.rpc import dispatcher as rpc_dispatcher +from nova.openstack.common import service + + +LOG = logging.getLogger(__name__) + + +class Service(service.Service): + """Service object for binaries running on hosts. + + A service enables rpc by listening to queues based on topic and host.""" + def __init__(self, host, topic, manager=None): + super(Service, self).__init__() + self.host = host + self.topic = topic + if manager is None: + self.manager = self + else: + self.manager = manager + + def start(self): + super(Service, self).start() + + self.conn = rpc.create_connection(new=True) + LOG.debug(_("Creating Consumer connection for Service %s") % + self.topic) + + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) + + # Share this same connection for these Consumers + self.conn.create_consumer(self.topic, dispatcher, fanout=False) + + node_topic = '%s.%s' % (self.topic, self.host) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) + + self.conn.create_consumer(self.topic, dispatcher, fanout=True) + + # Consume from all consumers in a thread + self.conn.consume_in_thread() + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.conn.close() + except Exception: + pass + super(Service, self).stop() diff --git a/nova/openstack/common/setup.py b/nova/openstack/common/setup.py index 4e2a57717..e6f72f034 100644 --- a/nova/openstack/common/setup.py +++ b/nova/openstack/common/setup.py @@ -117,8 +117,12 @@ def write_requirements(): def _run_shell_command(cmd): - output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE) + if os.name == 'nt': + output = subprocess.Popen(["cmd.exe", "/C", cmd], + stdout=subprocess.PIPE) + else: + output = subprocess.Popen(["/bin/sh", "-c", cmd], + stdout=subprocess.PIPE) out = output.communicate() if len(out) == 0: return None @@ -136,15 +140,17 @@ def _get_git_next_version_suffix(branch_name): _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*") milestone_cmd = "git show meta/openstack/release:%s" % branch_name milestonever = _run_shell_command(milestone_cmd) - if not milestonever: - milestonever = "" + if milestonever: + first_half = "%s~%s" % (milestonever, datestamp) + else: + first_half = datestamp + post_version = _get_git_post_version() # post version should look like: # 0.1.1.4.gcc9e28a # where the bit after the last . is the short sha, and the bit between # the last and second to last is the revno count (revno, sha) = post_version.split(".")[-2:] - first_half = "%s~%s" % (milestonever, datestamp) second_half = "%s%s.%s" % (revno_prefix, revno, sha) return ".".join((first_half, second_half)) |
