diff options
| field | value | date |
|---|---|---|
| author | Jenkins <jenkins@review.openstack.org> | 2012-02-22 03:22:30 +0000 |
| committer | Gerrit Code Review <review@openstack.org> | 2012-02-22 03:22:30 +0000 |
| commit | c4ff7ef07c50deccf3cb7877ecab2245724f3091 (patch) | |
| tree | 0a2266b6c9d162251b213fbb2dfe96ad47ec040f /nova/virt | |
| parent | 11d82fc7d4d17b6edf435633501ddf7a44d6adf5 (diff) | |
| parent | 424f32f04d9c6c97f684782b35e1c25fbf83ce05 (diff) | |
Merge "blueprint host-aggregates: xenapi implementation"
Diffstat (limited to 'nova/virt')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | nova/virt/driver.py | 7 |
| -rw-r--r-- | nova/virt/xenapi/fake.py | 11 |
| -rw-r--r-- | nova/virt/xenapi/pool.py | 214 |
| -rw-r--r-- | nova/virt/xenapi_conn.py | 27 |
4 files changed, 255 insertions, 4 deletions
```diff
diff --git a/nova/virt/driver.py b/nova/virt/driver.py
index 20c41ca9f..92e0d53dd 100644
--- a/nova/virt/driver.py
+++ b/nova/virt/driver.py
@@ -654,6 +654,13 @@ class ComputeDriver(object):
         related to other calls into the driver. The prime example is to clean
         the cache and remove images which are no longer of interest.
         """
+
+    def add_to_aggregate(self, context, aggregate, host, **kwargs):
+        """Add a compute host to an aggregate."""
+        raise NotImplementedError()
+
+    def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+        """Remove a compute host from an aggregate."""
         raise NotImplementedError()
 
     def get_volume_connector(self, instance):
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index d59e78f4f..73f2d863b 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -63,7 +63,7 @@ from nova import log as logging
 from nova import utils
 
 
-_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool',
+_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD',
             'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task']
 
 _db_content = {}
@@ -509,6 +509,15 @@ class SessionBase(object):
     def VM_clean_reboot(self, *args):
         return 'burp'
 
+    def pool_eject(self, session, host_ref):
+        pass
+
+    def pool_join(self, session, hostname, username, password):
+        pass
+
+    def pool_set_name_label(self, session, pool_ref, name):
+        pass
+
     def network_get_all_records_where(self, _1, filter):
         return self.xenapi.network.get_all_records()
 
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
new file mode 100644
index 000000000..95f0f3467
--- /dev/null
+++ b/nova/virt/xenapi/pool.py
@@ -0,0 +1,214 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Management class for Pool-related functions (join, eject, etc).
+"""
+
+import json
+import urlparse
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova.compute import aggregate_states
+from nova.openstack.common import cfg
+from nova.virt.xenapi import vm_utils
+
+LOG = logging.getLogger("nova.virt.xenapi.pool")
+
+xenapi_pool_opts = [
+    cfg.BoolOpt('use_join_force',
+                default=True,
+                help='To use for hosts with different CPUs'),
+    ]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(xenapi_pool_opts)
+
+
+class ResourcePool(object):
+    """
+    Implements resource pool operations.
+    """
+    def __init__(self, session):
+        self.XenAPI = session.get_imported_xenapi()
+        host_ref = session.get_xenapi_host()
+        host_rec = session.call_xenapi('host.get_record', host_ref)
+        self._host_name = host_rec['hostname']
+        self._host_addr = host_rec['address']
+        self._host_uuid = host_rec['uuid']
+        self._session = session
+
+    def add_to_aggregate(self, context, aggregate, host, **kwargs):
+        """Add a compute host to an aggregate."""
+        if len(aggregate.hosts) == 1:
+            # this is the first host of the pool -> make it master
+            self._init_pool(aggregate.id, aggregate.name)
+            # save metadata so that we can find the master again:
+            # the password should be encrypted, really.
+            values = {
+                'operational_state': aggregate_states.ACTIVE,
+                'metadata': {'master_compute': host},
+                }
+            db.aggregate_update(context, aggregate.id, values)
+        else:
+            # the pool is already up and running, we need to figure out
+            # whether we can serve the request from this host or not.
+            master_compute = aggregate.metadetails['master_compute']
+            if master_compute == FLAGS.host and master_compute != host:
+                # this is the master -> do a pool-join
+                # To this aim, nova compute on the slave has to go down.
+                # NOTE: it is assumed that ONLY nova compute is running now
+                self._join_slave(aggregate.id, host,
+                                 kwargs.get('compute_uuid'),
+                                 kwargs.get('url'), kwargs.get('user'),
+                                 kwargs.get('passwd'))
+                metadata = {host: kwargs.get('xenhost_uuid'), }
+                db.aggregate_metadata_add(context, aggregate.id, metadata)
+            elif master_compute and master_compute != host:
+                # send rpc cast to master, asking to add the following
+                # host with specified credentials.
+                # NOTE: password in clear is not great, but it'll do for now
+                forward_request(context, "add_aggregate_host", master_compute,
+                                aggregate.id, host,
+                                self._host_addr, self._host_uuid)
+
+    def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+        """Remove a compute host from an aggregate."""
+        master_compute = aggregate.metadetails.get('master_compute')
+        if master_compute == FLAGS.host and master_compute != host:
+            # this is the master -> instruct it to eject a host from the pool
+            host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host]
+            self._eject_slave(aggregate.id,
+                              kwargs.get('compute_uuid'), host_uuid)
+            db.aggregate_metadata_delete(context, aggregate.id, host)
+        elif master_compute == host:
+            # Remove master from its own pool -> destroy pool only if the
+            # master is on its own, otherwise raise fault. Destroying a
+            # pool made only by master is fictional
+            if len(aggregate.hosts) > 1:
+                raise exception.AggregateError(
+                                    aggregate_id=aggregate.id,
+                                    action='remove_from_aggregate',
+                                    reason=_('Unable to eject %(host)s '
+                                             'from the pool; pool not empty')
+                                             % locals())
+            self._clear_pool(aggregate.id)
+            db.aggregate_metadata_delete(context,
+                                         aggregate.id, 'master_compute')
+        elif master_compute and master_compute != host:
+            # A master exists -> forward pool-eject request to master
+            forward_request(context, "remove_aggregate_host", master_compute,
+                            aggregate.id, host,
+                            self._host_addr, self._host_uuid)
+        else:
+            # this shouldn't have happened
+            raise exception.AggregateError(aggregate_id=aggregate.id,
+                                   action='remove_from_aggregate',
+                                   reason=_('Unable to eject %(host)s '
+                                     'from the pool; No master found')
+                                     % locals())
+
+    def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd):
+        """Joins a slave into a XenServer resource pool."""
+        try:
+            args = {'compute_uuid': compute_uuid,
+                    'url': url,
+                    'user': user,
+                    'password': passwd,
+                    'force': json.dumps(FLAGS.use_join_force),
+                    'master_addr': self._host_addr,
+                    'master_user': FLAGS.xenapi_connection_username,
+                    'master_pass': FLAGS.xenapi_connection_password, }
+            task = self._session.async_call_plugin('xenhost',
+                                                   'host_join', args)
+            self._session.wait_for_task(task)
+        except self.XenAPI.Failure as e:
+            LOG.error(_("Pool-Join failed: %(e)s") % locals())
+            raise exception.AggregateError(aggregate_id=aggregate_id,
+                                           action='add_to_aggregate',
+                                           reason=_('Unable to join %(host)s '
+                                                    'in the pool') % locals())
+
+    def _eject_slave(self, aggregate_id, compute_uuid, host_uuid):
+        """Eject a slave from a XenServer resource pool."""
+        try:
+            # shutdown nova-compute; if there are other VMs running, e.g.
+            # guest instances, the eject will fail. That's a precaution
+            # to deal with the fact that the admin should evacuate the host
+            # first. The eject wipes out the host completely.
+            vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid)
+            self._session.call_xenapi("VM.clean_shutdown", vm_ref)
+
+            host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid)
+            self._session.call_xenapi("pool.eject", host_ref)
+        except self.XenAPI.Failure as e:
+            LOG.error(_("Pool-eject failed: %(e)s") % locals())
+            raise exception.AggregateError(aggregate_id=aggregate_id,
+                                           action='remove_from_aggregate',
+                                           reason=str(e.details))
+
+    def _init_pool(self, aggregate_id, aggregate_name):
+        """Set the name label of a XenServer pool."""
+        try:
+            pool_ref = self._session.call_xenapi("pool.get_all")[0]
+            self._session.call_xenapi("pool.set_name_label",
+                                      pool_ref, aggregate_name)
+        except self.XenAPI.Failure as e:
+            LOG.error(_("Unable to set up pool: %(e)s.") % locals())
+            raise exception.AggregateError(aggregate_id=aggregate_id,
+                                           action='add_to_aggregate',
+                                           reason=str(e.details))
+
+    def _clear_pool(self, aggregate_id):
+        """Clear the name label of a XenServer pool."""
+        try:
+            pool_ref = self._session.call_xenapi('pool.get_all')[0]
+            self._session.call_xenapi('pool.set_name_label', pool_ref, '')
+        except self.XenAPI.Failure as e:
+            LOG.error(_("Pool-set_name_label failed: %(e)s") % locals())
+            raise exception.AggregateError(aggregate_id=aggregate_id,
+                                           action='remove_from_aggregate',
+                                           reason=str(e.details))
+
+
+def forward_request(context, request_type, master, aggregate_id,
+                    slave_compute, slave_address, slave_uuid):
+    """Casts add/remove requests to the pool master."""
+    # replace the address from the xenapi connection url
+    # because this might be 169.254.0.1, i.e. xenapi
+    sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
+    rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
+             {"method": request_type,
+              "args": {"aggregate_id": aggregate_id,
+                       "host": slave_compute,
+                       "url": sender_url,
+                       "user": FLAGS.xenapi_connection_username,
+                       "passwd": FLAGS.xenapi_connection_password,
+                       "compute_uuid": vm_utils.get_this_vm_uuid(),
+                       "xenhost_uuid": slave_uuid, },
+             })
+
+
+def swap_xapi_host(url, host_addr):
+    """Replace the XenServer address present in 'url' with 'host_addr'."""
+    temp_url = urlparse.urlparse(url)
+    _, sep, port = temp_url.netloc.partition(':')
+    return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port))
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index 08542268c..c9363bb74 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -78,6 +78,7 @@ from nova import flags
 from nova import log as logging
 from nova.openstack.common import cfg
 from nova.virt import driver
+from nova.virt.xenapi import pool
 from nova.virt.xenapi import vm_utils
 from nova.virt.xenapi.vmops import VMOps
 from nova.virt.xenapi.volumeops import VolumeOps
@@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver):
         self._product_version = self._session.get_product_version()
         self._vmops = VMOps(self._session, self._product_version)
         self._initiator = None
+        self._pool = pool.ResourcePool(self._session)
 
     @property
     def host_state(self):
@@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver):
         """Sets the specified host's ability to accept new instances."""
         return self._vmops.set_host_enabled(host, enabled)
 
+    def add_to_aggregate(self, context, aggregate, host, **kwargs):
+        """Add a compute host to an aggregate."""
+        return self._pool.add_to_aggregate(context, aggregate, host, **kwargs)
+
+    def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+        """Remove a compute host from an aggregate."""
+        return self._pool.remove_from_aggregate(context,
+                                                aggregate, host, **kwargs)
+
 
 class XenAPISession(object):
     """The session to invoke XenAPI SDK calls"""
@@ -498,9 +509,19 @@ class XenAPISession(object):
         exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
                                           "(is the Dom0 disk full?)"))
         for i in xrange(FLAGS.xenapi_connection_concurrent):
-            session = self._create_session(url)
-            with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
-                session.login_with_password(user, pw)
+            try:
+                session = self._create_session(url)
+                with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
+                    session.login_with_password(user, pw)
+            except self.XenAPI.Failure, e:
+                # if user and pw of the master are different, we're doomed!
+                if e.details[0] == 'HOST_IS_SLAVE':
+                    master = e.details[1]
+                    session = self.XenAPI.Session(pool.swap_xapi_host(url,
+                                                                      master))
+                    session.login_with_password(user, pw)
+                else:
+                    raise
             self._sessions.put(session)
 
     def get_product_version(self):
```
