summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-22 03:22:30 +0000
committerGerrit Code Review <review@openstack.org>2012-02-22 03:22:30 +0000
commitc4ff7ef07c50deccf3cb7877ecab2245724f3091 (patch)
tree0a2266b6c9d162251b213fbb2dfe96ad47ec040f /nova/virt
parent11d82fc7d4d17b6edf435633501ddf7a44d6adf5 (diff)
parent424f32f04d9c6c97f684782b35e1c25fbf83ce05 (diff)
Merge "blueprint host-aggregates: xenapi implementation"
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/driver.py7
-rw-r--r--nova/virt/xenapi/fake.py11
-rw-r--r--nova/virt/xenapi/pool.py214
-rw-r--r--nova/virt/xenapi_conn.py27
4 files changed, 255 insertions, 4 deletions
diff --git a/nova/virt/driver.py b/nova/virt/driver.py
index 20c41ca9f..92e0d53dd 100644
--- a/nova/virt/driver.py
+++ b/nova/virt/driver.py
@@ -654,6 +654,13 @@ class ComputeDriver(object):
related to other calls into the driver. The prime example is to clean
the cache and remove images which are no longer of interest.
"""
+
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ raise NotImplementedError()
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
raise NotImplementedError()
def get_volume_connector(self, instance):
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index d59e78f4f..73f2d863b 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -63,7 +63,7 @@ from nova import log as logging
from nova import utils
-_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool',
+_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD',
'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task']
_db_content = {}
@@ -509,6 +509,15 @@ class SessionBase(object):
def VM_clean_reboot(self, *args):
return 'burp'
+ def pool_eject(self, session, host_ref):
+ pass
+
+ def pool_join(self, session, hostname, username, password):
+ pass
+
+ def pool_set_name_label(self, session, pool_ref, name):
+ pass
+
def network_get_all_records_where(self, _1, filter):
return self.xenapi.network.get_all_records()
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
new file mode 100644
index 000000000..95f0f3467
--- /dev/null
+++ b/nova/virt/xenapi/pool.py
@@ -0,0 +1,214 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Pool-related functions (join, eject, etc).
+"""
+
+import json
+import urlparse
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova.compute import aggregate_states
+from nova.openstack.common import cfg
+from nova.virt.xenapi import vm_utils
+
+LOG = logging.getLogger("nova.virt.xenapi.pool")
+
+xenapi_pool_opts = [
+ cfg.BoolOpt('use_join_force',
+ default=True,
+ help='To use for hosts with different CPUs'),
+ ]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(xenapi_pool_opts)
+
+
+class ResourcePool(object):
+ """
+ Implements resource pool operations.
+ """
+ def __init__(self, session):
+ self.XenAPI = session.get_imported_xenapi()
+ host_ref = session.get_xenapi_host()
+ host_rec = session.call_xenapi('host.get_record', host_ref)
+ self._host_name = host_rec['hostname']
+ self._host_addr = host_rec['address']
+ self._host_uuid = host_rec['uuid']
+ self._session = session
+
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ if len(aggregate.hosts) == 1:
+ # this is the first host of the pool -> make it master
+ self._init_pool(aggregate.id, aggregate.name)
+ # save metadata so that we can find the master again:
+ # the password should be encrypted, really.
+ values = {
+ 'operational_state': aggregate_states.ACTIVE,
+ 'metadata': {'master_compute': host},
+ }
+ db.aggregate_update(context, aggregate.id, values)
+ else:
+ # the pool is already up and running, we need to figure out
+ # whether we can serve the request from this host or not.
+ master_compute = aggregate.metadetails['master_compute']
+ if master_compute == FLAGS.host and master_compute != host:
+ # this is the master -> do a pool-join
+ # To this aim, nova compute on the slave has to go down.
+ # NOTE: it is assumed that ONLY nova compute is running now
+ self._join_slave(aggregate.id, host,
+ kwargs.get('compute_uuid'),
+ kwargs.get('url'), kwargs.get('user'),
+ kwargs.get('passwd'))
+ metadata = {host: kwargs.get('xenhost_uuid'), }
+ db.aggregate_metadata_add(context, aggregate.id, metadata)
+ elif master_compute and master_compute != host:
+ # send rpc cast to master, asking to add the following
+ # host with specified credentials.
+ # NOTE: password in clear is not great, but it'll do for now
+ forward_request(context, "add_aggregate_host", master_compute,
+ aggregate.id, host,
+ self._host_addr, self._host_uuid)
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
+ master_compute = aggregate.metadetails.get('master_compute')
+ if master_compute == FLAGS.host and master_compute != host:
+ # this is the master -> instruct it to eject a host from the pool
+ host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host]
+ self._eject_slave(aggregate.id,
+ kwargs.get('compute_uuid'), host_uuid)
+ db.aggregate_metadata_delete(context, aggregate.id, host)
+ elif master_compute == host:
+ # Remove master from its own pool -> destroy pool only if the
+ # master is on its own, otherwise raise fault. Destroying a
+ # pool made only by master is fictional
+ if len(aggregate.hosts) > 1:
+ raise exception.AggregateError(
+ aggregate_id=aggregate.id,
+ action='remove_from_aggregate',
+ reason=_('Unable to eject %(host)s '
+ 'from the pool; pool not empty')
+ % locals())
+ self._clear_pool(aggregate.id)
+ db.aggregate_metadata_delete(context,
+ aggregate.id, 'master_compute')
+ elif master_compute and master_compute != host:
+ # A master exists -> forward pool-eject request to master
+ forward_request(context, "remove_aggregate_host", master_compute,
+ aggregate.id, host,
+ self._host_addr, self._host_uuid)
+ else:
+ # this shouldn't have happened
+ raise exception.AggregateError(aggregate_id=aggregate.id,
+ action='remove_from_aggregate',
+ reason=_('Unable to eject %(host)s '
+ 'from the pool; No master found')
+ % locals())
+
+ def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd):
+ """Joins a slave into a XenServer resource pool."""
+ try:
+ args = {'compute_uuid': compute_uuid,
+ 'url': url,
+ 'user': user,
+ 'password': passwd,
+ 'force': json.dumps(FLAGS.use_join_force),
+ 'master_addr': self._host_addr,
+ 'master_user': FLAGS.xenapi_connection_username,
+ 'master_pass': FLAGS.xenapi_connection_password, }
+ task = self._session.async_call_plugin('xenhost',
+ 'host_join', args)
+ self._session.wait_for_task(task)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-Join failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='add_to_aggregate',
+ reason=_('Unable to join %(host)s '
+ 'in the pool') % locals())
+
+ def _eject_slave(self, aggregate_id, compute_uuid, host_uuid):
+ """Eject a slave from a XenServer resource pool."""
+ try:
+ # shutdown nova-compute; if there are other VMs running, e.g.
+ # guest instances, the eject will fail. That's a precaution
+ # to deal with the fact that the admin should evacuate the host
+ # first. The eject wipes out the host completely.
+ vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid)
+ self._session.call_xenapi("VM.clean_shutdown", vm_ref)
+
+ host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid)
+ self._session.call_xenapi("pool.eject", host_ref)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-eject failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='remove_from_aggregate',
+ reason=str(e.details))
+
+ def _init_pool(self, aggregate_id, aggregate_name):
+ """Set the name label of a XenServer pool."""
+ try:
+ pool_ref = self._session.call_xenapi("pool.get_all")[0]
+ self._session.call_xenapi("pool.set_name_label",
+ pool_ref, aggregate_name)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Unable to set up pool: %(e)s.") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='add_to_aggregate',
+ reason=str(e.details))
+
+ def _clear_pool(self, aggregate_id):
+ """Clear the name label of a XenServer pool."""
+ try:
+ pool_ref = self._session.call_xenapi('pool.get_all')[0]
+ self._session.call_xenapi('pool.set_name_label', pool_ref, '')
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-set_name_label failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='remove_from_aggregate',
+ reason=str(e.details))
+
+
+def forward_request(context, request_type, master, aggregate_id,
+ slave_compute, slave_address, slave_uuid):
+ """Casts add/remove requests to the pool master."""
+ # replace the address from the xenapi connection url
+ # because this might be 169.254.0.1, i.e. xenapi
+ sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
+ rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
+ {"method": request_type,
+ "args": {"aggregate_id": aggregate_id,
+ "host": slave_compute,
+ "url": sender_url,
+ "user": FLAGS.xenapi_connection_username,
+ "passwd": FLAGS.xenapi_connection_password,
+ "compute_uuid": vm_utils.get_this_vm_uuid(),
+ "xenhost_uuid": slave_uuid, },
+ })
+
+
+def swap_xapi_host(url, host_addr):
+ """Replace the XenServer address present in 'url' with 'host_addr'."""
+ temp_url = urlparse.urlparse(url)
+ _, sep, port = temp_url.netloc.partition(':')
+ return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port))
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index 08542268c..c9363bb74 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -78,6 +78,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.virt import driver
+from nova.virt.xenapi import pool
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver):
self._product_version = self._session.get_product_version()
self._vmops = VMOps(self._session, self._product_version)
self._initiator = None
+ self._pool = pool.ResourcePool(self._session)
@property
def host_state(self):
@@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver):
"""Sets the specified host's ability to accept new instances."""
return self._vmops.set_host_enabled(host, enabled)
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ return self._pool.add_to_aggregate(context, aggregate, host, **kwargs)
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
+ return self._pool.remove_from_aggregate(context,
+ aggregate, host, **kwargs)
+
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls"""
@@ -498,9 +509,19 @@ class XenAPISession(object):
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)"))
for i in xrange(FLAGS.xenapi_connection_concurrent):
- session = self._create_session(url)
- with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
- session.login_with_password(user, pw)
+ try:
+ session = self._create_session(url)
+ with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
+ session.login_with_password(user, pw)
+ except self.XenAPI.Failure, e:
+ # if user and pw of the master are different, we're doomed!
+ if e.details[0] == 'HOST_IS_SLAVE':
+ master = e.details[1]
+ session = self.XenAPI.Session(pool.swap_xapi_host(url,
+ master))
+ session.login_with_password(user, pw)
+ else:
+ raise
self._sessions.put(session)
def get_product_version(self):