# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2012 Citrix Systems, Inc. # Copyright 2010 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ Management class for Pool-related functions (join, eject, etc). """ import urlparse from oslo.config import cfg from nova.compute import rpcapi as compute_rpcapi from nova import exception from nova.openstack.common import jsonutils from nova.openstack.common import log as logging from nova.virt.xenapi import pool_states from nova.virt.xenapi import vm_utils LOG = logging.getLogger(__name__) xenapi_pool_opts = [ cfg.BoolOpt('use_join_force', default=True, help='To use for hosts with different CPUs'), ] CONF = cfg.CONF CONF.register_opts(xenapi_pool_opts) CONF.import_opt('host', 'nova.netconf') class ResourcePool(object): """ Implements resource pool operations. """ def __init__(self, session, virtapi): host_ref = session.get_xenapi_host() host_rec = session.call_xenapi('host.get_record', host_ref) self._host_name = host_rec['hostname'] self._host_addr = host_rec['address'] self._host_uuid = host_rec['uuid'] self._session = session self._virtapi = virtapi self.compute_rpcapi = compute_rpcapi.ComputeAPI() def undo_aggregate_operation(self, context, op, aggregate, host, set_error): """Undo aggregate operation when pool error raised.""" try: if set_error: metadata = {pool_states.KEY: pool_states.ERROR} self._virtapi.aggregate_metadata_add(context, aggregate, metadata) op(context, aggregate, host) except Exception: LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' 'during operation on %(host)s'), {'aggregate_id': aggregate['id'], 'host': host}) def add_to_aggregate(self, context, aggregate, host, slave_info=None): """Add a compute host to an aggregate.""" if not pool_states.is_hv_pool(aggregate['metadetails']): return invalid = {pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', pool_states.ERROR: 'aggregate in error'} if (aggregate['metadetails'][pool_states.KEY] in invalid.keys()): raise exception.InvalidAggregateAction( action='add host', aggregate_id=aggregate['id'], reason=aggregate['metadetails'][pool_states.KEY]) if (aggregate['metadetails'][pool_states.KEY] == pool_states.CREATED): self._virtapi.aggregate_metadata_add(context, aggregate, {pool_states.KEY: pool_states.CHANGING}) if len(aggregate['hosts']) == 1: # this is the first host of the pool -> make it master self._init_pool(aggregate['id'], aggregate['name']) # save metadata so that we can find the master again metadata = {'master_compute': host, host: self._host_uuid, pool_states.KEY: pool_states.ACTIVE} self._virtapi.aggregate_metadata_add(context, aggregate, metadata) else: # the pool is already up and running, we need to figure out # whether we can serve the request from this host or not. master_compute = aggregate['metadetails']['master_compute'] if master_compute == CONF.host and master_compute != host: # this is the master -> do a pool-join # To this aim, nova compute on the slave has to go down. # NOTE: it is assumed that ONLY nova compute is running now self._join_slave(aggregate['id'], host, slave_info.get('compute_uuid'), slave_info.get('url'), slave_info.get('user'), slave_info.get('passwd')) metadata = {host: slave_info.get('xenhost_uuid'), } self._virtapi.aggregate_metadata_add(context, aggregate, metadata) elif master_compute and master_compute != host: # send rpc cast to master, asking to add the following # host with specified credentials. slave_info = self._create_slave_info() self.compute_rpcapi.add_aggregate_host( context, aggregate, host, master_compute, slave_info) def remove_from_aggregate(self, context, aggregate, host, slave_info=None): """Remove a compute host from an aggregate.""" slave_info = slave_info or dict() if not pool_states.is_hv_pool(aggregate['metadetails']): return invalid = {pool_states.CREATED: 'no hosts to remove', pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', } if aggregate['metadetails'][pool_states.KEY] in invalid.keys(): raise exception.InvalidAggregateAction( action='remove host', aggregate_id=aggregate['id'], reason=invalid[aggregate['metadetails'][pool_states.KEY]]) master_compute = aggregate['metadetails']['master_compute'] if master_compute == CONF.host and master_compute != host: # this is the master -> instruct it to eject a host from the pool host_uuid = aggregate['metadetails'][host] self._eject_slave(aggregate['id'], slave_info.get('compute_uuid'), host_uuid) self._virtapi.aggregate_metadata_delete(context, aggregate, host) elif master_compute == host: # Remove master from its own pool -> destroy pool only if the # master is on its own, otherwise raise fault. Destroying a # pool made only by master is fictional if len(aggregate['hosts']) > 1: # NOTE: this could be avoided by doing a master # re-election, but this is simpler for now. raise exception.InvalidAggregateAction( aggregate_id=aggregate['id'], action='remove_from_aggregate', reason=_('Unable to eject %s ' 'from the pool; pool not empty') % host) self._clear_pool(aggregate['id']) for key in ['master_compute', host]: self._virtapi.aggregate_metadata_delete(context, aggregate, key) elif master_compute and master_compute != host: # A master exists -> forward pool-eject request to master slave_info = self._create_slave_info() self.compute_rpcapi.remove_aggregate_host( context, aggregate['id'], host, master_compute, slave_info) else: # this shouldn't have happened raise exception.AggregateError(aggregate_id=aggregate['id'], action='remove_from_aggregate', reason=_('Unable to eject %s ' 'from the pool; No master found') % host) def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd): """Joins a slave into a XenServer resource pool.""" try: args = {'compute_uuid': compute_uuid, 'url': url, 'user': user, 'password': passwd, 'force': jsonutils.dumps(CONF.use_join_force), 'master_addr': self._host_addr, 'master_user': CONF.xenapi_connection_username, 'master_pass': CONF.xenapi_connection_password, } self._session.call_plugin('xenhost', 'host_join', args) except self._session.XenAPI.Failure as e: LOG.error(_("Pool-Join failed: %s"), e) raise exception.AggregateError(aggregate_id=aggregate_id, action='add_to_aggregate', reason=_('Unable to join %s ' 'in the pool') % host) def _eject_slave(self, aggregate_id, compute_uuid, host_uuid): """Eject a slave from a XenServer resource pool.""" try: # shutdown nova-compute; if there are other VMs running, e.g. # guest instances, the eject will fail. That's a precaution # to deal with the fact that the admin should evacuate the host # first. The eject wipes out the host completely. vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid) self._session.call_xenapi("VM.clean_shutdown", vm_ref) host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid) self._session.call_xenapi("pool.eject", host_ref) except self._session.XenAPI.Failure as e: LOG.error(_("Pool-eject failed: %s"), e) raise exception.AggregateError(aggregate_id=aggregate_id, action='remove_from_aggregate', reason=str(e.details)) def _init_pool(self, aggregate_id, aggregate_name): """Set the name label of a XenServer pool.""" try: pool_ref = self._session.call_xenapi("pool.get_all")[0] self._session.call_xenapi("pool.set_name_label", pool_ref, aggregate_name) except self._session.XenAPI.Failure as e: LOG.error(_("Unable to set up pool: %s."), e) raise exception.AggregateError(aggregate_id=aggregate_id, action='add_to_aggregate', reason=str(e.details)) def _clear_pool(self, aggregate_id): """Clear the name label of a XenServer pool.""" try: pool_ref = self._session.call_xenapi('pool.get_all')[0] self._session.call_xenapi('pool.set_name_label', pool_ref, '') except self._session.XenAPI.Failure as e: LOG.error(_("Pool-set_name_label failed: %s"), e) raise exception.AggregateError(aggregate_id=aggregate_id, action='remove_from_aggregate', reason=str(e.details)) def _create_slave_info(self): """XenServer specific info needed to join the hypervisor pool.""" # replace the address from the xenapi connection url # because this might be 169.254.0.1, i.e. xenapi # NOTE: password in clear is not great, but it'll do for now sender_url = swap_xapi_host( CONF.xenapi_connection_url, self._host_addr) return { "url": sender_url, "user": CONF.xenapi_connection_username, "passwd": CONF.xenapi_connection_password, "compute_uuid": vm_utils.get_this_vm_uuid(), "xenhost_uuid": self._host_uuid, } def swap_xapi_host(url, host_addr): """Replace the XenServer address present in 'url' with 'host_addr'.""" temp_url = urlparse.urlparse(url) _netloc, sep, port = temp_url.netloc.partition(':') return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port))