summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-22 03:22:30 +0000
committerGerrit Code Review <review@openstack.org>2012-02-22 03:22:30 +0000
commitc4ff7ef07c50deccf3cb7877ecab2245724f3091 (patch)
tree0a2266b6c9d162251b213fbb2dfe96ad47ec040f
parent11d82fc7d4d17b6edf435633501ddf7a44d6adf5 (diff)
parent424f32f04d9c6c97f684782b35e1c25fbf83ce05 (diff)
Merge "blueprint host-aggregates: xenapi implementation"
-rw-r--r--nova/api/openstack/compute/contrib/aggregates.py57
-rw-r--r--nova/compute/api.py19
-rw-r--r--nova/compute/manager.py36
-rw-r--r--nova/db/sqlalchemy/api.py9
-rw-r--r--nova/db/sqlalchemy/models.py103
-rw-r--r--nova/exception.py5
-rw-r--r--nova/tests/test_compute.py108
-rw-r--r--nova/tests/test_db_api.py56
-rw-r--r--nova/tests/test_virt_drivers.py8
-rw-r--r--nova/tests/test_xenapi.py147
-rw-r--r--nova/virt/driver.py7
-rw-r--r--nova/virt/xenapi/fake.py11
-rw-r--r--nova/virt/xenapi/pool.py214
-rw-r--r--nova/virt/xenapi_conn.py27
-rw-r--r--plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost54
15 files changed, 749 insertions, 112 deletions
diff --git a/nova/api/openstack/compute/contrib/aggregates.py b/nova/api/openstack/compute/contrib/aggregates.py
index 6206f42c4..ea90a57d7 100644
--- a/nova/api/openstack/compute/contrib/aggregates.py
+++ b/nova/api/openstack/compute/contrib/aggregates.py
@@ -63,17 +63,18 @@ class AggregateController(object):
raise exc.HTTPBadRequest
try:
host_aggregate = body["aggregate"]
- aggregate_name = host_aggregate["name"]
- availability_zone = host_aggregate["availability_zone"]
+ name = host_aggregate["name"]
+ avail_zone = host_aggregate["availability_zone"]
except KeyError:
raise exc.HTTPBadRequest
if len(host_aggregate) != 2:
raise exc.HTTPBadRequest
try:
- aggregate = self.api.create_aggregate(context, aggregate_name,
- availability_zone)
+ aggregate = self.api.create_aggregate(context, name, avail_zone)
except exception.AggregateNameExists:
+ LOG.exception(_("Cannot create aggregate with name %(name)s and "
+ "availability zone %(avail_zone)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@@ -84,6 +85,7 @@ class AggregateController(object):
try:
aggregate = self.api.get_aggregate(context, id)
except exception.AggregateNotFound:
+ LOG.exception(_("Cannot show aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)
@@ -91,7 +93,6 @@ class AggregateController(object):
"""Updates the name and/or availbility_zone of given aggregate."""
context = _get_context(req)
authorize(context)
- aggregate = id
if len(body) != 1:
raise exc.HTTPBadRequest
@@ -108,8 +109,9 @@ class AggregateController(object):
raise exc.HTTPBadRequest
try:
- aggregate = self.api.update_aggregate(context, aggregate, updates)
+ aggregate = self.api.update_aggregate(context, id, updates)
except exception.AggregateNotFound:
+ LOG.exception(_("Cannot update aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)
@@ -118,10 +120,10 @@ class AggregateController(object):
"""Removes an aggregate by id."""
context = _get_context(req)
authorize(context)
- aggregate_id = id
try:
- self.api.delete_aggregate(context, aggregate_id)
+ self.api.delete_aggregate(context, id)
except exception.AggregateNotFound:
+ LOG.exception(_("Cannot delete aggregate: %(id)s") % locals())
raise exc.HTTPNotFound
def action(self, req, id, body):
@@ -144,19 +146,17 @@ class AggregateController(object):
"""Adds a host to the specified aggregate."""
context = _get_context(req)
authorize(context)
- aggregate = id
try:
- aggregate = self.api.add_host_to_aggregate(context,
- aggregate, host)
- except exception.AggregateNotFound:
- raise exc.HTTPNotFound
- except exception.ComputeHostNotFound:
+ aggregate = self.api.add_host_to_aggregate(context, id, host)
+ except (exception.AggregateNotFound, exception.ComputeHostNotFound):
+ LOG.exception(_("Cannot add host %(host)s in aggregate "
+ "%(id)s") % locals())
raise exc.HTTPNotFound
- except exception.AggregateHostConflict:
- raise exc.HTTPConflict
- except exception.AggregateHostExists:
- raise exc.HTTPConflict
- except exception.InvalidAggregateAction:
+ except (exception.AggregateHostConflict,
+ exception.AggregateHostExists,
+ exception.InvalidAggregateAction):
+ LOG.exception(_("Cannot add host %(host)s in aggregate "
+ "%(id)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@@ -165,15 +165,15 @@ class AggregateController(object):
"""Removes a host from the specified aggregate."""
context = _get_context(req)
authorize(context)
- aggregate = id
try:
- aggregate = self.api.remove_host_from_aggregate(context,
- aggregate, host)
- except exception.AggregateNotFound:
- raise exc.HTTPNotFound
- except exception.AggregateHostNotFound:
+ aggregate = self.api.remove_host_from_aggregate(context, id, host)
+ except (exception.AggregateNotFound, exception.AggregateHostNotFound):
+ LOG.exception(_("Cannot remove host %(host)s in aggregate "
+ "%(id)s") % locals())
raise exc.HTTPNotFound
except exception.InvalidAggregateAction:
+ LOG.exception(_("Cannot remove host %(host)s in aggregate "
+ "%(id)s") % locals())
raise exc.HTTPConflict
return self._marshall_aggregate(aggregate)
@@ -181,20 +181,19 @@ class AggregateController(object):
"""Replaces the aggregate's existing metadata with new metadata."""
context = _get_context(req)
authorize(context)
- aggregate = id
if len(body) != 1:
raise exc.HTTPBadRequest
-
try:
metadata = body["metadata"]
except KeyError:
raise exc.HTTPBadRequest
-
try:
aggregate = self.api.update_aggregate_metadata(context,
- aggregate, metadata)
+ id, metadata)
except exception.AggregateNotFound:
+ LOG.exception(_("Cannot set metadata %(metadata)s in aggregate "
+ "%(id)s") % locals())
raise exc.HTTPNotFound
return self._marshall_aggregate(aggregate)
diff --git a/nova/compute/api.py b/nova/compute/api.py
index edadf1978..b908a6587 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -1706,10 +1706,18 @@ class AggregateAPI(base.Base):
def create_aggregate(self, context, aggregate_name, availability_zone):
"""Creates the model for the aggregate."""
- values = {"name": aggregate_name,
- "availability_zone": availability_zone}
- aggregate = self.db.aggregate_create(context, values)
- return dict(aggregate.iteritems())
+ zones = [s.availability_zone for s in
+ self.db.service_get_all_by_topic(context,
+ FLAGS.compute_topic)]
+ if availability_zone in zones:
+ values = {"name": aggregate_name,
+ "availability_zone": availability_zone}
+ aggregate = self.db.aggregate_create(context, values)
+ return dict(aggregate.iteritems())
+ else:
+ raise exception.InvalidAggregateAction(action='create_aggregate',
+ aggregate_id="'N/A'",
+ reason='invalid zone')
def get_aggregate(self, context, aggregate_id):
"""Get an aggregate by id."""
@@ -1805,7 +1813,8 @@ class AggregateAPI(base.Base):
"host": host}, })
return self.get_aggregate(context, aggregate_id)
else:
- invalid = {aggregate_states.CHANGING: 'setup in progress',
+ invalid = {aggregate_states.CREATED: 'no hosts to remove',
+ aggregate_states.CHANGING: 'setup in progress',
aggregate_states.DISMISSED: 'aggregate deleted', }
if aggregate.operational_state in invalid.keys():
raise exception.InvalidAggregateAction(
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index f31a0d2e4..ac9bebafe 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -46,6 +46,7 @@ from eventlet import greenthread
from nova import block_device
import nova.context
+from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import task_states
@@ -2320,13 +2321,40 @@ class ComputeManager(manager.SchedulerDependentManager):
LOG.error(msg % error)
self._set_instance_error_state(context, instance_uuid)
- def add_aggregate_host(self, context, aggregate_id, host):
+ @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
+ def add_aggregate_host(self, context, aggregate_id, host, **kwargs):
"""Adds a host to a physical hypervisor pool."""
- raise NotImplementedError()
+ aggregate = self.db.aggregate_get(context, aggregate_id)
+ try:
+ self.driver.add_to_aggregate(context, aggregate, host, **kwargs)
+ except exception.AggregateError:
+ error = sys.exc_info()
+ self._undo_aggregate_operation(context,
+ self.db.aggregate_host_delete,
+ aggregate.id, host)
+ raise error[0], error[1], error[2]
- def remove_aggregate_host(self, context, aggregate_id, host):
+ @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
+ def remove_aggregate_host(self, context, aggregate_id, host, **kwargs):
"""Removes a host from a physical hypervisor pool."""
- raise NotImplementedError()
+ aggregate = self.db.aggregate_get(context, aggregate_id)
+ try:
+ self.driver.remove_from_aggregate(context,
+ aggregate, host, **kwargs)
+ except exception.AggregateError:
+ error = sys.exc_info()
+ self._undo_aggregate_operation(context, self.db.aggregate_host_add,
+ aggregate.id, host)
+ raise error[0], error[1], error[2]
+
+ def _undo_aggregate_operation(self, context, op, aggregate_id, host):
+ try:
+ status = {'operational_state': aggregate_states.ERROR}
+ self.db.aggregate_update(context, aggregate_id, status)
+ op(context, aggregate_id, host)
+ except Exception:
+ LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
+ 'during operation on %(host)s') % locals())
@manager.periodic_task(
ticks_between_runs=FLAGS.image_cache_manager_interval)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 6fdba1642..026fe2f28 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -4273,8 +4273,8 @@ def _aggregate_get_query(context, model_class, id_field, id,
def aggregate_create(context, values, metadata=None):
try:
aggregate = models.Aggregate()
+ values.setdefault('operational_state', aggregate_states.CREATED)
aggregate.update(values)
- aggregate.operational_state = aggregate_states.CREATED
aggregate.save()
except exception.DBError:
raise exception.AggregateNameExists(aggregate_name=values['name'])
@@ -4409,8 +4409,7 @@ def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False):
meta_ref = aggregate_metadata_get_item(context, aggregate_id,
meta_key, session)
if meta_ref.deleted:
- item.update({'deleted': False, 'deleted_at': None,
- 'updated_at': literal_column('updated_at')})
+ item.update({'deleted': False, 'deleted_at': None})
except exception.AggregateMetadataNotFound:
meta_ref = models.AggregateMetadata()
item.update({"key": meta_key, "aggregate_id": aggregate_id})
@@ -4469,9 +4468,7 @@ def aggregate_host_add(context, aggregate_id, host):
except exception.DBError:
raise exception.AggregateHostConflict(host=host)
elif host_ref.deleted:
- host_ref.update({'deleted': False,
- 'deleted_at': None,
- 'updated_at': literal_column('updated_at')})
+ host_ref.update({'deleted': False, 'deleted_at': None})
host_ref.save(session=session)
else:
raise exception.AggregateHostExists(host=host,
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index c99d9489b..33438444f 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -885,26 +885,12 @@ class Zone(BASE, NovaBase):
rpc_virtual_host = Column(String(255))
-class Aggregate(BASE, NovaBase):
- """Represents a cluster of hosts that exists in this zone."""
- __tablename__ = 'aggregates'
- id = Column(Integer, primary_key=True, autoincrement=True)
- name = Column(String(255), unique=True)
- operational_state = Column(String(255), nullable=False)
- availability_zone = Column(String(255), nullable=False)
-
-
class AggregateHost(BASE, NovaBase):
"""Represents a host that is member of an aggregate."""
__tablename__ = 'aggregate_hosts'
id = Column(Integer, primary_key=True, autoincrement=True)
host = Column(String(255), unique=True)
aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
- aggregate = relationship(Aggregate, backref=backref('aggregates'),
- foreign_keys=aggregate_id,
- primaryjoin='and_('
- 'AggregateHost.aggregate_id == Aggregate.id,'
- 'AggregateHost.deleted == False)')
class AggregateMetadata(BASE, NovaBase):
@@ -914,11 +900,46 @@ class AggregateMetadata(BASE, NovaBase):
key = Column(String(255), nullable=False)
value = Column(String(255), nullable=False)
aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False)
- aggregate = relationship(Aggregate, backref="metadata",
- foreign_keys=aggregate_id,
- primaryjoin='and_('
- 'AggregateMetadata.aggregate_id == Aggregate.id,'
- 'AggregateMetadata.deleted == False)')
+
+
+class Aggregate(BASE, NovaBase):
+ """Represents a cluster of hosts that exists in this zone."""
+ __tablename__ = 'aggregates'
+ id = Column(Integer, primary_key=True, autoincrement=True)
+ name = Column(String(255), unique=True)
+ operational_state = Column(String(255), nullable=False)
+ availability_zone = Column(String(255), nullable=False)
+ _hosts = relationship(AggregateHost,
+ secondary="aggregate_hosts",
+ primaryjoin='and_('
+ 'Aggregate.id == AggregateHost.aggregate_id,'
+ 'AggregateHost.deleted == False,'
+ 'Aggregate.deleted == False)',
+ secondaryjoin='and_('
+ 'AggregateHost.aggregate_id == Aggregate.id, '
+ 'AggregateHost.deleted == False,'
+ 'Aggregate.deleted == False)',
+ backref='aggregates')
+
+ _metadata = relationship(AggregateMetadata,
+ secondary="aggregate_metadata",
+ primaryjoin='and_('
+ 'Aggregate.id == AggregateMetadata.aggregate_id,'
+ 'AggregateMetadata.deleted == False,'
+ 'Aggregate.deleted == False)',
+ secondaryjoin='and_('
+ 'AggregateMetadata.aggregate_id == Aggregate.id, '
+ 'AggregateMetadata.deleted == False,'
+ 'Aggregate.deleted == False)',
+ backref='aggregates')
+
+ @property
+ def hosts(self):
+ return [h.host for h in self._hosts]
+
+ @property
+ def metadetails(self):
+ return dict([(m.key, m.value) for m in self._metadata])
class AgentBuild(BASE, NovaBase):
@@ -997,15 +1018,41 @@ def register_models():
connection is lost and needs to be reestablished.
"""
from sqlalchemy import create_engine
- models = (Service, Instance, InstanceActions, InstanceTypes,
- Volume, IscsiTarget, FixedIp, FloatingIp,
- Network, SecurityGroup, SecurityGroupIngressRule,
- SecurityGroupInstanceAssociation, AuthToken, User,
- Project, Certificate, ConsolePool, Console, Zone,
- VolumeMetadata, VolumeTypes, VolumeTypeExtraSpecs,
- AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration,
- VirtualStorageArray, SMFlavors, SMBackendConf, SMVolume,
- InstanceFault)
+ models = (AgentBuild,
+ Aggregate,
+ AggregateHost,
+ AggregateMetadata,
+ AuthToken,
+ Certificate,
+ Console,
+ ConsolePool,
+ FixedIp,
+ FloatingIp,
+ Instance,
+ InstanceActions,
+ InstanceFault,
+ InstanceMetadata,
+ InstanceTypeExtraSpecs,
+ InstanceTypes,
+ IscsiTarget,
+ Migration,
+ Network,
+ Project,
+ SecurityGroup,
+ SecurityGroupIngressRule,
+ SecurityGroupInstanceAssociation,
+ Service,
+ SMBackendConf,
+ SMFlavors,
+ SMVolume,
+ User,
+ VirtualStorageArray,
+ Volume,
+ VolumeMetadata,
+ VolumeTypeExtraSpecs,
+ VolumeTypes,
+ Zone,
+ )
engine = create_engine(FLAGS.sql_connection, echo=False)
for model in models:
model.metadata.create_all(engine)
diff --git a/nova/exception.py b/nova/exception.py
index 518a95a62..91a377c1e 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -929,6 +929,11 @@ class QuotaError(NovaException):
message = _("Quota exceeded") + ": code=%(code)s"
+class AggregateError(NovaException):
+ message = _("Aggregate %(aggregate_id)s: action '%(action)s' "
+ "caused an error: %(reason)s.")
+
+
class AggregateNotFound(NotFound):
message = _("Aggregate %(aggregate_id)s could not be found.")
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index d6d54227f..5ee74df8e 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -3153,6 +3153,8 @@ def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
class ComputeAPIAggrTestCase(test.TestCase):
+ """This is for unit coverage of aggregate-related methods
+ defined in nova.compute.api."""
def setUp(self):
super(ComputeAPIAggrTestCase, self).setUp()
@@ -3164,9 +3166,16 @@ class ComputeAPIAggrTestCase(test.TestCase):
def tearDown(self):
super(ComputeAPIAggrTestCase, self).tearDown()
+ def test_create_invalid_availability_zone(self):
+ """Ensure InvalidAggregateAction is raised with wrong avail_zone."""
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.create_aggregate,
+ self.context, 'fake_aggr', 'fake_avail_zone')
+
def test_update_aggregate_metadata(self):
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
- 'fake_availability_zone')
+ 'fake_zone')
metadata = {'foo_key1': 'foo_value1',
'foo_key2': 'foo_value2', }
aggr = self.api.update_aggregate_metadata(self.context, aggr['id'],
@@ -3178,8 +3187,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_delete_aggregate(self):
"""Ensure we can delete an aggregate."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
- 'fake_availability_zone')
+ 'fake_zone')
self.api.delete_aggregate(self.context, aggr['id'])
expected = db.aggregate_get(self.context, aggr['id'],
read_deleted='yes')
@@ -3188,10 +3198,10 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_delete_non_empty_aggregate(self):
"""Ensure InvalidAggregateAction is raised when non empty aggregate."""
- aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
- 'fake_availability_zone')
_create_service_entries(self.context,
{'fake_availability_zone': ['fake_host']})
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host')
self.assertRaises(exception.InvalidAggregateAction,
self.api.delete_aggregate, self.context, aggr['id'])
@@ -3242,9 +3252,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
- _create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
@@ -3255,9 +3265,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_invalid_error_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
in error."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
- _create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is in error!
status = {'operational_state': aggregate_states.ERROR}
db.aggregate_update(self.context, aggr['id'], status)
@@ -3267,17 +3277,19 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_add_host_to_aggregate_zones_mismatch(self):
"""Ensure InvalidAggregateAction is raised when zones don't match."""
- _create_service_entries(self.context, {'fake_zoneX': ['fake_host']})
+ _create_service_entries(self.context, {'fake_zoneX': ['fake_host1'],
+ 'fake_zoneY': ['fake_host2']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zoneY')
self.assertRaises(exception.InvalidAggregateAction,
self.api.add_host_to_aggregate,
- self.context, aggr['id'], 'fake_host')
+ self.context, aggr['id'], 'fake_host1')
def test_add_host_to_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when adding invalid host."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
- 'fake_availability_zone')
+ 'fake_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.add_host_to_aggregate,
self.context, aggr['id'], 'invalid_host')
@@ -3325,9 +3337,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_invalid_dismissed_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
deleted."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
- _create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is dismissed!
status = {'operational_state': aggregate_states.DISMISSED}
db.aggregate_update(self.context, aggr['id'], status)
@@ -3338,9 +3350,9 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_invalid_changing_status(self):
"""Ensure InvalidAggregateAction is raised when aggregate is
changing."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context,
'fake_aggregate', 'fake_zone')
- _create_service_entries(self.context, {'fake_zone': ['fake_host']})
# let's mock the fact that the aggregate is changing!
status = {'operational_state': aggregate_states.CHANGING}
db.aggregate_update(self.context, aggr['id'], status)
@@ -3350,13 +3362,85 @@ class ComputeAPIAggrTestCase(test.TestCase):
def test_remove_host_from_aggregate_raise_not_found(self):
"""Ensure ComputeHostNotFound is raised when removing invalid host."""
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
- 'fake_availability_zone')
+ 'fake_zone')
self.assertRaises(exception.ComputeHostNotFound,
self.api.remove_host_from_aggregate,
self.context, aggr['id'], 'invalid_host')
+class ComputeAggrTestCase(BaseTestCase):
+ """This is for unit coverage of aggregate-related methods
+ defined in nova.compute.manager."""
+
+ def setUp(self):
+ super(ComputeAggrTestCase, self).setUp()
+ self.context = context.get_admin_context()
+ values = {'name': 'test_aggr',
+ 'availability_zone': 'test_zone', }
+ self.aggr = db.aggregate_create(self.context, values)
+
+ def tearDown(self):
+ super(ComputeAggrTestCase, self).tearDown()
+
+ def test_add_aggregate_host(self):
+ def fake_driver_add_to_aggregate(context, aggregate, host):
+ fake_driver_add_to_aggregate.called = True
+ return {"foo": "bar"}
+ self.stubs.Set(self.compute.driver, "add_to_aggregate",
+ fake_driver_add_to_aggregate)
+
+ self.compute.add_aggregate_host(self.context, self.aggr.id, "host")
+ self.assertTrue(fake_driver_add_to_aggregate.called)
+
+ def test_add_aggregate_host_raise_err(self):
+ """Ensure the undo operation works correctly on add."""
+ def fake_driver_add_to_aggregate(context, aggregate, host):
+ raise exception.AggregateError
+ self.stubs.Set(self.compute.driver, "add_to_aggregate",
+ fake_driver_add_to_aggregate)
+
+ state = {'operational_state': aggregate_states.ACTIVE}
+ db.aggregate_update(self.context, self.aggr.id, state)
+ db.aggregate_host_add(self.context, self.aggr.id, 'fake_host')
+
+ self.assertRaises(exception.AggregateError,
+ self.compute.add_aggregate_host,
+ self.context, self.aggr.id, "fake_host")
+ excepted = db.aggregate_get(self.context, self.aggr.id)
+ self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
+ self.assertEqual(excepted.hosts, [])
+
+ def test_remove_aggregate_host(self):
+ def fake_driver_remove_from_aggregate(context, aggregate, host):
+ fake_driver_remove_from_aggregate.called = True
+ self.assertEqual("host", host, "host")
+ return {"foo": "bar"}
+ self.stubs.Set(self.compute.driver, "remove_from_aggregate",
+ fake_driver_remove_from_aggregate)
+
+ self.compute.remove_aggregate_host(self.context, self.aggr.id, "host")
+ self.assertTrue(fake_driver_remove_from_aggregate.called)
+
+ def test_remove_aggregate_host_raise_err(self):
+ """Ensure the undo operation works correctly on remove."""
+ def fake_driver_remove_from_aggregate(context, aggregate, host):
+ raise exception.AggregateError
+ self.stubs.Set(self.compute.driver, "remove_from_aggregate",
+ fake_driver_remove_from_aggregate)
+
+ state = {'operational_state': aggregate_states.ACTIVE}
+ db.aggregate_update(self.context, self.aggr.id, state)
+
+ self.assertRaises(exception.AggregateError,
+ self.compute.remove_aggregate_host,
+ self.context, self.aggr.id, "fake_host")
+ excepted = db.aggregate_get(self.context, self.aggr.id)
+ self.assertEqual(excepted.operational_state, aggregate_states.ERROR)
+ self.assertEqual(excepted.hosts, ['fake_host'])
+
+
class ComputePolicyTestCase(BaseTestCase):
def setUp(self):
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index 3353ea737..4cb17d958 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -267,6 +267,30 @@ class DbApiTestCase(test.TestCase):
expected = {uuids[0]: [], uuids[1]: []}
self.assertEqual(expected, instance_faults)
+ def test_dns_registration(self):
+ domain1 = 'test.domain.one'
+ domain2 = 'test.domain.two'
+ testzone = 'testzone'
+ ctxt = context.get_admin_context()
+
+ db.dnsdomain_register_for_zone(ctxt, domain1, testzone)
+ domain_ref = db.dnsdomain_get(ctxt, domain1)
+ zone = domain_ref.availability_zone
+ scope = domain_ref.scope
+ self.assertEqual(scope, 'private')
+ self.assertEqual(zone, testzone)
+
+ db.dnsdomain_register_for_project(ctxt, domain2,
+ self.project_id)
+ domain_ref = db.dnsdomain_get(ctxt, domain2)
+ project = domain_ref.project_id
+ scope = domain_ref.scope
+ self.assertEqual(project, self.project_id)
+ self.assertEqual(scope, 'public')
+
+ db.dnsdomain_unregister(ctxt, domain1)
+ db.dnsdomain_unregister(ctxt, domain2)
+
def _get_fake_aggr_values():
return {'name': 'fake_aggregate',
@@ -351,6 +375,14 @@ class AggregateDBApiTestCase(test.TestCase):
db.aggregate_create,
self.context, _get_fake_aggr_values())
+ def test_aggregate_get(self):
+ """Ensure we can get aggregate with all its relations."""
+ ctxt = context.get_admin_context()
+ result = _create_aggregate_with_hosts(context=ctxt)
+ expected = db.aggregate_get(ctxt, result.id)
+ self.assertEqual(_get_fake_aggr_hosts(), expected.hosts)
+ self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails)
+
def test_aggregate_delete_raise_not_found(self):
"""Ensure AggregateNotFound is raised when deleting an aggregate."""
ctxt = context.get_admin_context()
@@ -541,30 +573,6 @@ class AggregateDBApiTestCase(test.TestCase):
db.aggregate_host_delete,
ctxt, result.id, _get_fake_aggr_hosts()[0])
- def test_dns_registration(self):
- domain1 = 'test.domain.one'
- domain2 = 'test.domain.two'
- testzone = 'testzone'
- ctxt = context.get_admin_context()
-
- db.dnsdomain_register_for_zone(ctxt, domain1, testzone)
- domain_ref = db.dnsdomain_get(ctxt, domain1)
- zone = domain_ref.availability_zone
- scope = domain_ref.scope
- self.assertEqual(scope, 'private')
- self.assertEqual(zone, testzone)
-
- db.dnsdomain_register_for_project(ctxt, domain2,
- self.project_id)
- domain_ref = db.dnsdomain_get(ctxt, domain2)
- project = domain_ref.project_id
- scope = domain_ref.scope
- self.assertEqual(project, self.project_id)
- self.assertEqual(scope, 'public')
-
- db.dnsdomain_unregister(ctxt, domain1)
- db.dnsdomain_unregister(ctxt, domain2)
-
class CapacityTestCase(test.TestCase):
def setUp(self):
diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py
index 41c5d118e..70d54db9a 100644
--- a/nova/tests/test_virt_drivers.py
+++ b/nova/tests/test_virt_drivers.py
@@ -401,6 +401,14 @@ class _VirtDriverTestCase(test.TestCase):
def test_host_power_action_startup(self):
self.connection.host_power_action('a useless argument?', 'startup')
+ @catch_notimplementederror
+ def test_add_to_aggregate(self):
+ self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host')
+
+ @catch_notimplementederror
+ def test_remove_from_aggregate(self):
+ self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host')
+
class AbstractDriverTestCase(_VirtDriverTestCase):
def setUp(self):
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index d72365e5c..b08bbd69c 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -31,6 +31,7 @@ from nova import flags
from nova import log as logging
from nova import test
from nova import utils
+from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova import exception
@@ -1741,3 +1742,149 @@ class XenAPISRSelectionTestCase(test.TestCase):
expected = helper.safe_find_sr(session)
self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref),
expected)
+
+
+class XenAPIAggregateTestCase(test.TestCase):
+ """Unit tests for aggregate operations."""
+ def setUp(self):
+ super(XenAPIAggregateTestCase, self).setUp()
+ self.stubs = stubout.StubOutForTesting()
+ self.flags(xenapi_connection_url='http://test_url',
+ xenapi_connection_username='test_user',
+ xenapi_connection_password='test_pass',
+ instance_name_template='%d',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver',
+ host='host')
+ xenapi_fake.reset()
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
+ self.context = context.get_admin_context()
+ self.conn = xenapi_conn.get_connection(False)
+ self.fake_metadata = {'master_compute': 'host'}
+
+ def tearDown(self):
+ super(XenAPIAggregateTestCase, self).tearDown()
+ self.stubs.UnsetAll()
+
+ def test_add_to_aggregate_called(self):
+ def fake_add_to_aggregate(context, aggregate, host):
+ fake_add_to_aggregate.called = True
+ self.stubs.Set(self.conn._pool,
+ "add_to_aggregate",
+ fake_add_to_aggregate)
+
+ self.conn.add_to_aggregate(None, None, None)
+ self.assertTrue(fake_add_to_aggregate.called)
+
+ def test_add_to_aggregate_for_first_host_sets_metadata(self):
+ def fake_init_pool(id, name):
+ fake_init_pool.called = True
+ self.stubs.Set(self.conn._pool, "_init_pool", fake_init_pool)
+
+ aggregate = self._aggregate_setup()
+ self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
+ result = db.aggregate_get(self.context, aggregate.id)
+ self.assertTrue(fake_init_pool.called)
+ self.assertDictMatch(self.fake_metadata, result.metadetails)
+ self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
+
+ def test_join_slave(self):
+ """Ensure join_slave gets called when the request gets to master."""
+ def fake_join_slave(id, compute_uuid, host, url, user, password):
+ fake_join_slave.called = True
+ self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave)
+
+ aggregate = self._aggregate_setup(hosts=['host', 'host2'],
+ metadata=self.fake_metadata)
+ self.conn._pool.add_to_aggregate(self.context, aggregate, "host2",
+ compute_uuid='fake_uuid',
+ url='fake_url',
+ user='fake_user',
+ passwd='fake_pass',
+ xenhost_uuid='fake_uuid')
+ self.assertTrue(fake_join_slave.called)
+
+ def test_add_to_aggregate_first_host(self):
+ def fake_pool_set_name_label(self, session, pool_ref, name):
+ fake_pool_set_name_label.called = True
+ self.stubs.Set(xenapi_fake.SessionBase, "pool_set_name_label",
+ fake_pool_set_name_label)
+ self.conn._session.call_xenapi("pool.create", {"name": "asdf"})
+
+ values = {"name": 'fake_aggregate',
+ "availability_zone": 'fake_zone'}
+ result = db.aggregate_create(self.context, values)
+ db.aggregate_host_add(self.context, result.id, "host")
+ aggregate = db.aggregate_get(self.context, result.id)
+ self.assertEqual(["host"], aggregate.hosts)
+ self.assertEqual({}, aggregate.metadetails)
+
+ self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
+ self.assertTrue(fake_pool_set_name_label.called)
+
+ def test_remove_from_aggregate_called(self):
+ def fake_remove_from_aggregate(context, aggregate, host):
+ fake_remove_from_aggregate.called = True
+ self.stubs.Set(self.conn._pool,
+ "remove_from_aggregate",
+ fake_remove_from_aggregate)
+
+ self.conn.remove_from_aggregate(None, None, None)
+ self.assertTrue(fake_remove_from_aggregate.called)
+
+ def test_remove_from_empty_aggregate(self):
+ values = {"name": 'fake_aggregate',
+ "availability_zone": 'fake_zone'}
+ result = db.aggregate_create(self.context, values)
+ self.assertRaises(exception.AggregateError,
+ self.conn._pool.remove_from_aggregate,
+ None, result, "test_host")
+
+ def test_remove_slave(self):
+ """Ensure eject slave gets called."""
+ def fake_eject_slave(id, compute_uuid, host_uuid):
+ fake_eject_slave.called = True
+ self.stubs.Set(self.conn._pool, "_eject_slave", fake_eject_slave)
+
+ self.fake_metadata['host2'] = 'fake_host2_uuid'
+ aggregate = self._aggregate_setup(hosts=['host', 'host2'],
+ metadata=self.fake_metadata)
+ self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2")
+ self.assertTrue(fake_eject_slave.called)
+
+ def test_remove_master_solo(self):
+ """Ensure metadata are cleared after removal."""
+ def fake_clear_pool(id):
+ fake_clear_pool.called = True
+ self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool)
+
+ aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
+ metadata=self.fake_metadata)
+ self.conn._pool.remove_from_aggregate(self.context, aggregate, "host")
+ result = db.aggregate_get(self.context, aggregate.id)
+ self.assertTrue(fake_clear_pool.called)
+ self.assertDictMatch({}, result.metadetails)
+ self.assertEqual(aggregate_states.ACTIVE, result.operational_state)
+
+ def test_remote_master_non_empty_pool(self):
+ """Ensure AggregateError is raised if removing the master."""
+ aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
+ hosts=['host', 'host2'],
+ metadata=self.fake_metadata)
+ self.assertRaises(exception.AggregateError,
+ self.conn._pool.remove_from_aggregate,
+ self.context, aggregate, "host")
+
+ def _aggregate_setup(self, aggr_name='fake_aggregate',
+ aggr_zone='fake_zone',
+ aggr_state=aggregate_states.CREATED,
+ hosts=['host'], metadata=None):
+ values = {"name": aggr_name,
+ "availability_zone": aggr_zone,
+ "operational_state": aggr_state, }
+ result = db.aggregate_create(self.context, values)
+ for host in hosts:
+ db.aggregate_host_add(self.context, result.id, host)
+ if metadata:
+ db.aggregate_metadata_add(self.context, result.id, metadata)
+ return db.aggregate_get(self.context, result.id)
diff --git a/nova/virt/driver.py b/nova/virt/driver.py
index 20c41ca9f..92e0d53dd 100644
--- a/nova/virt/driver.py
+++ b/nova/virt/driver.py
@@ -654,6 +654,13 @@ class ComputeDriver(object):
related to other calls into the driver. The prime example is to clean
the cache and remove images which are no longer of interest.
"""
+
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ raise NotImplementedError()
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
raise NotImplementedError()
def get_volume_connector(self, instance):
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index d59e78f4f..73f2d863b 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -63,7 +63,7 @@ from nova import log as logging
from nova import utils
-_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool',
+_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD',
'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task']
_db_content = {}
@@ -509,6 +509,15 @@ class SessionBase(object):
def VM_clean_reboot(self, *args):
return 'burp'
+ def pool_eject(self, session, host_ref):
+ pass
+
+ def pool_join(self, session, hostname, username, password):
+ pass
+
+ def pool_set_name_label(self, session, pool_ref, name):
+ pass
+
def network_get_all_records_where(self, _1, filter):
return self.xenapi.network.get_all_records()
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
new file mode 100644
index 000000000..95f0f3467
--- /dev/null
+++ b/nova/virt/xenapi/pool.py
@@ -0,0 +1,214 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2012 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Management class for Pool-related functions (join, eject, etc).
+"""
+
+import json
+import urlparse
+
+from nova import db
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova import rpc
+from nova.compute import aggregate_states
+from nova.openstack.common import cfg
+from nova.virt.xenapi import vm_utils
+
+LOG = logging.getLogger("nova.virt.xenapi.pool")
+
+xenapi_pool_opts = [
+ cfg.BoolOpt('use_join_force',
+ default=True,
+ help='To use for hosts with different CPUs'),
+ ]
+
+FLAGS = flags.FLAGS
+FLAGS.register_opts(xenapi_pool_opts)
+
+
+class ResourcePool(object):
+ """
+ Implements resource pool operations.
+ """
+ def __init__(self, session):
+ self.XenAPI = session.get_imported_xenapi()
+ host_ref = session.get_xenapi_host()
+ host_rec = session.call_xenapi('host.get_record', host_ref)
+ self._host_name = host_rec['hostname']
+ self._host_addr = host_rec['address']
+ self._host_uuid = host_rec['uuid']
+ self._session = session
+
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ if len(aggregate.hosts) == 1:
+ # this is the first host of the pool -> make it master
+ self._init_pool(aggregate.id, aggregate.name)
+ # save metadata so that we can find the master again:
+ # the password should be encrypted, really.
+ values = {
+ 'operational_state': aggregate_states.ACTIVE,
+ 'metadata': {'master_compute': host},
+ }
+ db.aggregate_update(context, aggregate.id, values)
+ else:
+ # the pool is already up and running, we need to figure out
+ # whether we can serve the request from this host or not.
+ master_compute = aggregate.metadetails['master_compute']
+ if master_compute == FLAGS.host and master_compute != host:
+ # this is the master -> do a pool-join
+ # To this aim, nova compute on the slave has to go down.
+ # NOTE: it is assumed that ONLY nova compute is running now
+ self._join_slave(aggregate.id, host,
+ kwargs.get('compute_uuid'),
+ kwargs.get('url'), kwargs.get('user'),
+ kwargs.get('passwd'))
+ metadata = {host: kwargs.get('xenhost_uuid'), }
+ db.aggregate_metadata_add(context, aggregate.id, metadata)
+ elif master_compute and master_compute != host:
+ # send rpc cast to master, asking to add the following
+ # host with specified credentials.
+ # NOTE: password in clear is not great, but it'll do for now
+ forward_request(context, "add_aggregate_host", master_compute,
+ aggregate.id, host,
+ self._host_addr, self._host_uuid)
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
+ master_compute = aggregate.metadetails.get('master_compute')
+ if master_compute == FLAGS.host and master_compute != host:
+ # this is the master -> instruct it to eject a host from the pool
+ host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host]
+ self._eject_slave(aggregate.id,
+ kwargs.get('compute_uuid'), host_uuid)
+ db.aggregate_metadata_delete(context, aggregate.id, host)
+ elif master_compute == host:
+ # Remove master from its own pool -> destroy pool only if the
+ # master is on its own, otherwise raise fault. Destroying a
+ # pool made only by master is fictional
+ if len(aggregate.hosts) > 1:
+ raise exception.AggregateError(
+ aggregate_id=aggregate.id,
+ action='remove_from_aggregate',
+ reason=_('Unable to eject %(host)s '
+ 'from the pool; pool not empty')
+ % locals())
+ self._clear_pool(aggregate.id)
+ db.aggregate_metadata_delete(context,
+ aggregate.id, 'master_compute')
+ elif master_compute and master_compute != host:
+ # A master exists -> forward pool-eject request to master
+ forward_request(context, "remove_aggregate_host", master_compute,
+ aggregate.id, host,
+ self._host_addr, self._host_uuid)
+ else:
+ # this shouldn't have happened
+ raise exception.AggregateError(aggregate_id=aggregate.id,
+ action='remove_from_aggregate',
+ reason=_('Unable to eject %(host)s '
+ 'from the pool; No master found')
+ % locals())
+
+ def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd):
+ """Joins a slave into a XenServer resource pool."""
+ try:
+ args = {'compute_uuid': compute_uuid,
+ 'url': url,
+ 'user': user,
+ 'password': passwd,
+ 'force': json.dumps(FLAGS.use_join_force),
+ 'master_addr': self._host_addr,
+ 'master_user': FLAGS.xenapi_connection_username,
+ 'master_pass': FLAGS.xenapi_connection_password, }
+ task = self._session.async_call_plugin('xenhost',
+ 'host_join', args)
+ self._session.wait_for_task(task)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-Join failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='add_to_aggregate',
+ reason=_('Unable to join %(host)s '
+ 'in the pool') % locals())
+
+ def _eject_slave(self, aggregate_id, compute_uuid, host_uuid):
+ """Eject a slave from a XenServer resource pool."""
+ try:
+ # shutdown nova-compute; if there are other VMs running, e.g.
+ # guest instances, the eject will fail. That's a precaution
+ # to deal with the fact that the admin should evacuate the host
+ # first. The eject wipes out the host completely.
+ vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid)
+ self._session.call_xenapi("VM.clean_shutdown", vm_ref)
+
+ host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid)
+ self._session.call_xenapi("pool.eject", host_ref)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-eject failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='remove_from_aggregate',
+ reason=str(e.details))
+
+ def _init_pool(self, aggregate_id, aggregate_name):
+ """Set the name label of a XenServer pool."""
+ try:
+ pool_ref = self._session.call_xenapi("pool.get_all")[0]
+ self._session.call_xenapi("pool.set_name_label",
+ pool_ref, aggregate_name)
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Unable to set up pool: %(e)s.") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='add_to_aggregate',
+ reason=str(e.details))
+
+ def _clear_pool(self, aggregate_id):
+ """Clear the name label of a XenServer pool."""
+ try:
+ pool_ref = self._session.call_xenapi('pool.get_all')[0]
+ self._session.call_xenapi('pool.set_name_label', pool_ref, '')
+ except self.XenAPI.Failure as e:
+ LOG.error(_("Pool-set_name_label failed: %(e)s") % locals())
+ raise exception.AggregateError(aggregate_id=aggregate_id,
+ action='remove_from_aggregate',
+ reason=str(e.details))
+
+
+def forward_request(context, request_type, master, aggregate_id,
+ slave_compute, slave_address, slave_uuid):
+ """Casts add/remove requests to the pool master."""
+ # replace the address from the xenapi connection url
+ # because this might be 169.254.0.1, i.e. xenapi
+ sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
+ rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
+ {"method": request_type,
+ "args": {"aggregate_id": aggregate_id,
+ "host": slave_compute,
+ "url": sender_url,
+ "user": FLAGS.xenapi_connection_username,
+ "passwd": FLAGS.xenapi_connection_password,
+ "compute_uuid": vm_utils.get_this_vm_uuid(),
+ "xenhost_uuid": slave_uuid, },
+ })
+
+
+def swap_xapi_host(url, host_addr):
+ """Replace the XenServer address present in 'url' with 'host_addr'."""
+ temp_url = urlparse.urlparse(url)
+ _, sep, port = temp_url.netloc.partition(':')
+ return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port))
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index 08542268c..c9363bb74 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -78,6 +78,7 @@ from nova import flags
from nova import log as logging
from nova.openstack.common import cfg
from nova.virt import driver
+from nova.virt.xenapi import pool
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver):
self._product_version = self._session.get_product_version()
self._vmops = VMOps(self._session, self._product_version)
self._initiator = None
+ self._pool = pool.ResourcePool(self._session)
@property
def host_state(self):
@@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver):
"""Sets the specified host's ability to accept new instances."""
return self._vmops.set_host_enabled(host, enabled)
+ def add_to_aggregate(self, context, aggregate, host, **kwargs):
+ """Add a compute host to an aggregate."""
+ return self._pool.add_to_aggregate(context, aggregate, host, **kwargs)
+
+ def remove_from_aggregate(self, context, aggregate, host, **kwargs):
+ """Remove a compute host from an aggregate."""
+ return self._pool.remove_from_aggregate(context,
+ aggregate, host, **kwargs)
+
class XenAPISession(object):
"""The session to invoke XenAPI SDK calls"""
@@ -498,9 +509,19 @@ class XenAPISession(object):
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)"))
for i in xrange(FLAGS.xenapi_connection_concurrent):
- session = self._create_session(url)
- with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
- session.login_with_password(user, pw)
+ try:
+ session = self._create_session(url)
+ with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
+ session.login_with_password(user, pw)
+ except self.XenAPI.Failure, e:
+ # if user and pw of the master are different, we're doomed!
+ if e.details[0] == 'HOST_IS_SLAVE':
+ master = e.details[1]
+ session = self.XenAPI.Session(pool.swap_xapi_host(url,
+ master))
+ session.login_with_password(user, pw)
+ else:
+ raise
self._sessions.put(session)
def get_product_version(self):
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
index 64938641f..9a7ad84af 100644
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
@@ -33,6 +33,7 @@ import subprocess
import tempfile
import time
+import XenAPI
import XenAPIPlugin
import pluginlib_nova as pluginlib
@@ -41,6 +42,8 @@ pluginlib.configure_logging("xenhost")
host_data_pattern = re.compile(r"\s*(\S+) \([^\)]+\) *: ?(.*)")
config_file_path = "/usr/etc/xenhost.conf"
+DEFAULT_TRIES = 23
+DEFAULT_SLEEP = 10
def jsonify(fnc):
@@ -97,6 +100,28 @@ def _run_command_with_input(cmd, process_input):
return output
+def _resume_compute(session, compute_ref, compute_uuid):
+ """Resume compute node on slave host after pool join. This has to
+ happen regardless of the success or failure of the join operation."""
+ try:
+ # session is valid if the join operation has failed
+ session.xenapi.VM.start(compute_ref, False, True)
+ except XenAPI.Failure, e:
+ # if session is invalid, e.g. xapi has restarted, then the pool
+ # join has been successful, wait for xapi to become alive again
+ for c in xrange(0, DEFAULT_TRIES):
+ try:
+ _run_command("xe vm-start uuid=%s" % compute_uuid)
+ return
+ except pluginlib.PluginError, e:
+ logging.exception('Waited %d seconds for the slave to '
+ 'become available.' % (c * DEFAULT_SLEEP))
+ time.sleep(DEFAULT_SLEEP)
+ raise pluginlib.PluginError('Unrecoverable error: the host has '
+ 'not come back for more than %d seconds'
+ % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))
+
+
def _get_host_uuid():
cmd = "xe host-list | grep uuid"
resp = _run_command(cmd)
@@ -258,6 +283,34 @@ def host_start(self, arg_dict):
@jsonify
+def host_join(self, arg_dict):
+ """Join a remote host into a pool whose master is the host
+ where the plugin is called from. The following constraints apply:
+
+ - The host must have no VMs running, except nova-compute, which will be
+ shut down (and restarted upon pool-join) automatically,
+ - The host must have no shared storage currently set up,
+ - The host must have the same license of the master,
+ - The host must have the same supplemental packs as the master."""
+ session = XenAPI.Session(arg_dict.get("url"))
+ session.login_with_password(arg_dict.get("user"),
+ arg_dict.get("password"))
+ compute_ref = session.xenapi.VM.get_by_uuid(arg_dict.get('compute_uuid'))
+ session.xenapi.VM.clean_shutdown(compute_ref)
+ try:
+ if arg_dict.get("force"):
+ session.xenapi.pool.join(arg_dict.get("master_addr"),
+ arg_dict.get("master_user"),
+ arg_dict.get("master_pass"))
+ else:
+ session.xenapi.pool.join_force(arg_dict.get("master_addr"),
+ arg_dict.get("master_user"),
+ arg_dict.get("master_pass"))
+ finally:
+ _resume_compute(session, compute_ref, arg_dict.get("compute_uuid"))
+
+
+@jsonify
def host_data(self, arg_dict):
"""Runs the commands on the xenstore host to return the current status
information.
@@ -380,6 +433,7 @@ if __name__ == "__main__":
"host_shutdown": host_shutdown,
"host_reboot": host_reboot,
"host_start": host_start,
+ "host_join": host_join,
"get_config": get_config,
"set_config": set_config,
"iptables_config": iptables_config})