From 424f32f04d9c6c97f684782b35e1c25fbf83ce05 Mon Sep 17 00:00:00 2001 From: Armando Migliaccio Date: Wed, 1 Feb 2012 15:01:26 +0000 Subject: blueprint host-aggregates: xenapi implementation This commit introduces some clean-up/improvements on the current model and api for host aggregates. It also introduces a first version of the xenapi implementation. More precisely: - it lays out the structure of the virt driver, - it introduces compute and xenapi unit tests coverage, - it deals with join/eject of pool master and slaves, - it fixes xenapi_conn, when used in resource pool configurations More commits to follow (to ensure that VM placement, networking setup, performance metrics work just as well in cases where resource pools are present). However, these may be outside the scope of this blueprint and considered as ad-hoc bug fixes. Change-Id: Ib3cff71160264c5547e1c060d3fd566ad87337cb --- nova/api/openstack/compute/contrib/aggregates.py | 57 +++--- nova/compute/api.py | 19 +- nova/compute/manager.py | 36 +++- nova/db/sqlalchemy/api.py | 9 +- nova/db/sqlalchemy/models.py | 103 +++++++--- nova/exception.py | 5 + nova/tests/test_compute.py | 108 +++++++++-- nova/tests/test_db_api.py | 56 +++--- nova/tests/test_virt_drivers.py | 8 + nova/tests/test_xenapi.py | 147 ++++++++++++++ nova/virt/driver.py | 7 + nova/virt/xenapi/fake.py | 11 +- nova/virt/xenapi/pool.py | 214 +++++++++++++++++++++ nova/virt/xenapi_conn.py | 27 ++- .../xenserver/xenapi/etc/xapi.d/plugins/xenhost | 54 ++++++ 15 files changed, 749 insertions(+), 112 deletions(-) create mode 100644 nova/virt/xenapi/pool.py diff --git a/nova/api/openstack/compute/contrib/aggregates.py b/nova/api/openstack/compute/contrib/aggregates.py index 6206f42c4..ea90a57d7 100644 --- a/nova/api/openstack/compute/contrib/aggregates.py +++ b/nova/api/openstack/compute/contrib/aggregates.py @@ -63,17 +63,18 @@ class AggregateController(object): raise exc.HTTPBadRequest try: host_aggregate = body["aggregate"] - aggregate_name = 
host_aggregate["name"] - availability_zone = host_aggregate["availability_zone"] + name = host_aggregate["name"] + avail_zone = host_aggregate["availability_zone"] except KeyError: raise exc.HTTPBadRequest if len(host_aggregate) != 2: raise exc.HTTPBadRequest try: - aggregate = self.api.create_aggregate(context, aggregate_name, - availability_zone) + aggregate = self.api.create_aggregate(context, name, avail_zone) except exception.AggregateNameExists: + LOG.exception(_("Cannot create aggregate with name %(name)s and " + "availability zone %(avail_zone)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -84,6 +85,7 @@ class AggregateController(object): try: aggregate = self.api.get_aggregate(context, id) except exception.AggregateNotFound: + LOG.exception(_("Cannot show aggregate: %(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) @@ -91,7 +93,6 @@ class AggregateController(object): """Updates the name and/or availbility_zone of given aggregate.""" context = _get_context(req) authorize(context) - aggregate = id if len(body) != 1: raise exc.HTTPBadRequest @@ -108,8 +109,9 @@ class AggregateController(object): raise exc.HTTPBadRequest try: - aggregate = self.api.update_aggregate(context, aggregate, updates) + aggregate = self.api.update_aggregate(context, id, updates) except exception.AggregateNotFound: + LOG.exception(_("Cannot update aggregate: %(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) @@ -118,10 +120,10 @@ class AggregateController(object): """Removes an aggregate by id.""" context = _get_context(req) authorize(context) - aggregate_id = id try: - self.api.delete_aggregate(context, aggregate_id) + self.api.delete_aggregate(context, id) except exception.AggregateNotFound: + LOG.exception(_("Cannot delete aggregate: %(id)s") % locals()) raise exc.HTTPNotFound def action(self, req, id, body): @@ -144,19 +146,17 @@ class AggregateController(object): """Adds 
a host to the specified aggregate.""" context = _get_context(req) authorize(context) - aggregate = id try: - aggregate = self.api.add_host_to_aggregate(context, - aggregate, host) - except exception.AggregateNotFound: - raise exc.HTTPNotFound - except exception.ComputeHostNotFound: + aggregate = self.api.add_host_to_aggregate(context, id, host) + except (exception.AggregateNotFound, exception.ComputeHostNotFound): + LOG.exception(_("Cannot add host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound - except exception.AggregateHostConflict: - raise exc.HTTPConflict - except exception.AggregateHostExists: - raise exc.HTTPConflict - except exception.InvalidAggregateAction: + except (exception.AggregateHostConflict, + exception.AggregateHostExists, + exception.InvalidAggregateAction): + LOG.exception(_("Cannot add host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -165,15 +165,15 @@ class AggregateController(object): """Removes a host from the specified aggregate.""" context = _get_context(req) authorize(context) - aggregate = id try: - aggregate = self.api.remove_host_from_aggregate(context, - aggregate, host) - except exception.AggregateNotFound: - raise exc.HTTPNotFound - except exception.AggregateHostNotFound: + aggregate = self.api.remove_host_from_aggregate(context, id, host) + except (exception.AggregateNotFound, exception.AggregateHostNotFound): + LOG.exception(_("Cannot remove host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound except exception.InvalidAggregateAction: + LOG.exception(_("Cannot remove host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -181,20 +181,19 @@ class AggregateController(object): """Replaces the aggregate's existing metadata with new metadata.""" context = _get_context(req) authorize(context) - aggregate = id if len(body) != 1: raise exc.HTTPBadRequest - 
try: metadata = body["metadata"] except KeyError: raise exc.HTTPBadRequest - try: aggregate = self.api.update_aggregate_metadata(context, - aggregate, metadata) + id, metadata) except exception.AggregateNotFound: + LOG.exception(_("Cannot set metadata %(metadata)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) diff --git a/nova/compute/api.py b/nova/compute/api.py index edadf1978..b908a6587 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -1706,10 +1706,18 @@ class AggregateAPI(base.Base): def create_aggregate(self, context, aggregate_name, availability_zone): """Creates the model for the aggregate.""" - values = {"name": aggregate_name, - "availability_zone": availability_zone} - aggregate = self.db.aggregate_create(context, values) - return dict(aggregate.iteritems()) + zones = [s.availability_zone for s in + self.db.service_get_all_by_topic(context, + FLAGS.compute_topic)] + if availability_zone in zones: + values = {"name": aggregate_name, + "availability_zone": availability_zone} + aggregate = self.db.aggregate_create(context, values) + return dict(aggregate.iteritems()) + else: + raise exception.InvalidAggregateAction(action='create_aggregate', + aggregate_id="'N/A'", + reason='invalid zone') def get_aggregate(self, context, aggregate_id): """Get an aggregate by id.""" @@ -1805,7 +1813,8 @@ class AggregateAPI(base.Base): "host": host}, }) return self.get_aggregate(context, aggregate_id) else: - invalid = {aggregate_states.CHANGING: 'setup in progress', + invalid = {aggregate_states.CREATED: 'no hosts to remove', + aggregate_states.CHANGING: 'setup in progress', aggregate_states.DISMISSED: 'aggregate deleted', } if aggregate.operational_state in invalid.keys(): raise exception.InvalidAggregateAction( diff --git a/nova/compute/manager.py b/nova/compute/manager.py index f31a0d2e4..ac9bebafe 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -46,6 +46,7 @@ from eventlet 
import greenthread from nova import block_device import nova.context +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state from nova.compute import task_states @@ -2320,13 +2321,40 @@ class ComputeManager(manager.SchedulerDependentManager): LOG.error(msg % error) self._set_instance_error_state(context, instance_uuid) - def add_aggregate_host(self, context, aggregate_id, host): + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def add_aggregate_host(self, context, aggregate_id, host, **kwargs): """Adds a host to a physical hypervisor pool.""" - raise NotImplementedError() + aggregate = self.db.aggregate_get(context, aggregate_id) + try: + self.driver.add_to_aggregate(context, aggregate, host, **kwargs) + except exception.AggregateError: + error = sys.exc_info() + self._undo_aggregate_operation(context, + self.db.aggregate_host_delete, + aggregate.id, host) + raise error[0], error[1], error[2] - def remove_aggregate_host(self, context, aggregate_id, host): + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def remove_aggregate_host(self, context, aggregate_id, host, **kwargs): """Removes a host from a physical hypervisor pool.""" - raise NotImplementedError() + aggregate = self.db.aggregate_get(context, aggregate_id) + try: + self.driver.remove_from_aggregate(context, + aggregate, host, **kwargs) + except exception.AggregateError: + error = sys.exc_info() + self._undo_aggregate_operation(context, self.db.aggregate_host_add, + aggregate.id, host) + raise error[0], error[1], error[2] + + def _undo_aggregate_operation(self, context, op, aggregate_id, host): + try: + status = {'operational_state': aggregate_states.ERROR} + self.db.aggregate_update(context, aggregate_id, status) + op(context, aggregate_id, host) + except Exception: + LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' + 'during operation on %(host)s') % locals()) 
@manager.periodic_task( ticks_between_runs=FLAGS.image_cache_manager_interval) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 6fdba1642..026fe2f28 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -4273,8 +4273,8 @@ def _aggregate_get_query(context, model_class, id_field, id, def aggregate_create(context, values, metadata=None): try: aggregate = models.Aggregate() + values.setdefault('operational_state', aggregate_states.CREATED) aggregate.update(values) - aggregate.operational_state = aggregate_states.CREATED aggregate.save() except exception.DBError: raise exception.AggregateNameExists(aggregate_name=values['name']) @@ -4409,8 +4409,7 @@ def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False): meta_ref = aggregate_metadata_get_item(context, aggregate_id, meta_key, session) if meta_ref.deleted: - item.update({'deleted': False, 'deleted_at': None, - 'updated_at': literal_column('updated_at')}) + item.update({'deleted': False, 'deleted_at': None}) except exception.AggregateMetadataNotFound: meta_ref = models.AggregateMetadata() item.update({"key": meta_key, "aggregate_id": aggregate_id}) @@ -4469,9 +4468,7 @@ def aggregate_host_add(context, aggregate_id, host): except exception.DBError: raise exception.AggregateHostConflict(host=host) elif host_ref.deleted: - host_ref.update({'deleted': False, - 'deleted_at': None, - 'updated_at': literal_column('updated_at')}) + host_ref.update({'deleted': False, 'deleted_at': None}) host_ref.save(session=session) else: raise exception.AggregateHostExists(host=host, diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index c99d9489b..33438444f 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -885,26 +885,12 @@ class Zone(BASE, NovaBase): rpc_virtual_host = Column(String(255)) -class Aggregate(BASE, NovaBase): - """Represents a cluster of hosts that exists in this zone.""" - __tablename__ = 'aggregates' - id = 
Column(Integer, primary_key=True, autoincrement=True) - name = Column(String(255), unique=True) - operational_state = Column(String(255), nullable=False) - availability_zone = Column(String(255), nullable=False) - - class AggregateHost(BASE, NovaBase): """Represents a host that is member of an aggregate.""" __tablename__ = 'aggregate_hosts' id = Column(Integer, primary_key=True, autoincrement=True) host = Column(String(255), unique=True) aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False) - aggregate = relationship(Aggregate, backref=backref('aggregates'), - foreign_keys=aggregate_id, - primaryjoin='and_(' - 'AggregateHost.aggregate_id == Aggregate.id,' - 'AggregateHost.deleted == False)') class AggregateMetadata(BASE, NovaBase): @@ -914,11 +900,46 @@ class AggregateMetadata(BASE, NovaBase): key = Column(String(255), nullable=False) value = Column(String(255), nullable=False) aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False) - aggregate = relationship(Aggregate, backref="metadata", - foreign_keys=aggregate_id, - primaryjoin='and_(' - 'AggregateMetadata.aggregate_id == Aggregate.id,' - 'AggregateMetadata.deleted == False)') + + +class Aggregate(BASE, NovaBase): + """Represents a cluster of hosts that exists in this zone.""" + __tablename__ = 'aggregates' + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(255), unique=True) + operational_state = Column(String(255), nullable=False) + availability_zone = Column(String(255), nullable=False) + _hosts = relationship(AggregateHost, + secondary="aggregate_hosts", + primaryjoin='and_(' + 'Aggregate.id == AggregateHost.aggregate_id,' + 'AggregateHost.deleted == False,' + 'Aggregate.deleted == False)', + secondaryjoin='and_(' + 'AggregateHost.aggregate_id == Aggregate.id, ' + 'AggregateHost.deleted == False,' + 'Aggregate.deleted == False)', + backref='aggregates') + + _metadata = relationship(AggregateMetadata, + secondary="aggregate_metadata", 
+ primaryjoin='and_(' + 'Aggregate.id == AggregateMetadata.aggregate_id,' + 'AggregateMetadata.deleted == False,' + 'Aggregate.deleted == False)', + secondaryjoin='and_(' + 'AggregateMetadata.aggregate_id == Aggregate.id, ' + 'AggregateMetadata.deleted == False,' + 'Aggregate.deleted == False)', + backref='aggregates') + + @property + def hosts(self): + return [h.host for h in self._hosts] + + @property + def metadetails(self): + return dict([(m.key, m.value) for m in self._metadata]) class AgentBuild(BASE, NovaBase): @@ -997,15 +1018,41 @@ def register_models(): connection is lost and needs to be reestablished. """ from sqlalchemy import create_engine - models = (Service, Instance, InstanceActions, InstanceTypes, - Volume, IscsiTarget, FixedIp, FloatingIp, - Network, SecurityGroup, SecurityGroupIngressRule, - SecurityGroupInstanceAssociation, AuthToken, User, - Project, Certificate, ConsolePool, Console, Zone, - VolumeMetadata, VolumeTypes, VolumeTypeExtraSpecs, - AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration, - VirtualStorageArray, SMFlavors, SMBackendConf, SMVolume, - InstanceFault) + models = (AgentBuild, + Aggregate, + AggregateHost, + AggregateMetadata, + AuthToken, + Certificate, + Console, + ConsolePool, + FixedIp, + FloatingIp, + Instance, + InstanceActions, + InstanceFault, + InstanceMetadata, + InstanceTypeExtraSpecs, + InstanceTypes, + IscsiTarget, + Migration, + Network, + Project, + SecurityGroup, + SecurityGroupIngressRule, + SecurityGroupInstanceAssociation, + Service, + SMBackendConf, + SMFlavors, + SMVolume, + User, + VirtualStorageArray, + Volume, + VolumeMetadata, + VolumeTypeExtraSpecs, + VolumeTypes, + Zone, + ) engine = create_engine(FLAGS.sql_connection, echo=False) for model in models: model.metadata.create_all(engine) diff --git a/nova/exception.py b/nova/exception.py index 518a95a62..91a377c1e 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -929,6 +929,11 @@ class QuotaError(NovaException): message = _("Quota 
exceeded") + ": code=%(code)s" +class AggregateError(NovaException): + message = _("Aggregate %(aggregate_id)s: action '%(action)s' " + "caused an error: %(reason)s.") + + class AggregateNotFound(NotFound): message = _("Aggregate %(aggregate_id)s could not be found.") diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index d6d54227f..5ee74df8e 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -3153,6 +3153,8 @@ def _create_service_entries(context, values={'avail_zone1': ['fake_host1', class ComputeAPIAggrTestCase(test.TestCase): + """This is for unit coverage of aggregate-related methods + defined in nova.compute.api.""" def setUp(self): super(ComputeAPIAggrTestCase, self).setUp() @@ -3164,9 +3166,16 @@ class ComputeAPIAggrTestCase(test.TestCase): def tearDown(self): super(ComputeAPIAggrTestCase, self).tearDown() + def test_create_invalid_availability_zone(self): + """Ensure InvalidAggregateAction is raised with wrong avail_zone.""" + self.assertRaises(exception.InvalidAggregateAction, + self.api.create_aggregate, + self.context, 'fake_aggr', 'fake_avail_zone') + def test_update_aggregate_metadata(self): + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') metadata = {'foo_key1': 'foo_value1', 'foo_key2': 'foo_value2', } aggr = self.api.update_aggregate_metadata(self.context, aggr['id'], @@ -3178,8 +3187,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_delete_aggregate(self): """Ensure we can delete an aggregate.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.api.delete_aggregate(self.context, aggr['id']) expected = db.aggregate_get(self.context, aggr['id'], read_deleted='yes') @@ -3188,10 +3198,10 @@ class ComputeAPIAggrTestCase(test.TestCase): def 
test_delete_non_empty_aggregate(self): """Ensure InvalidAggregateAction is raised when non empty aggregate.""" - aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') _create_service_entries(self.context, {'fake_availability_zone': ['fake_host']}) + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host') self.assertRaises(exception.InvalidAggregateAction, self.api.delete_aggregate, self.context, aggr['id']) @@ -3242,9 +3252,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_invalid_dismissed_status(self): """Ensure InvalidAggregateAction is raised when aggregate is deleted.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is dismissed! status = {'operational_state': aggregate_states.DISMISSED} db.aggregate_update(self.context, aggr['id'], status) @@ -3255,9 +3265,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_invalid_error_status(self): """Ensure InvalidAggregateAction is raised when aggregate is in error.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is in error! 
status = {'operational_state': aggregate_states.ERROR} db.aggregate_update(self.context, aggr['id'], status) @@ -3267,17 +3277,19 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_zones_mismatch(self): """Ensure InvalidAggregateAction is raised when zones don't match.""" - _create_service_entries(self.context, {'fake_zoneX': ['fake_host']}) + _create_service_entries(self.context, {'fake_zoneX': ['fake_host1'], + 'fake_zoneY': ['fake_host2']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zoneY') self.assertRaises(exception.InvalidAggregateAction, self.api.add_host_to_aggregate, - self.context, aggr['id'], 'fake_host') + self.context, aggr['id'], 'fake_host1') def test_add_host_to_aggregate_raise_not_found(self): """Ensure ComputeHostNotFound is raised when adding invalid host.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.assertRaises(exception.ComputeHostNotFound, self.api.add_host_to_aggregate, self.context, aggr['id'], 'invalid_host') @@ -3325,9 +3337,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_invalid_dismissed_status(self): """Ensure InvalidAggregateAction is raised when aggregate is deleted.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is dismissed! 
status = {'operational_state': aggregate_states.DISMISSED} db.aggregate_update(self.context, aggr['id'], status) @@ -3338,9 +3350,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_invalid_changing_status(self): """Ensure InvalidAggregateAction is raised when aggregate is changing.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is changing! status = {'operational_state': aggregate_states.CHANGING} db.aggregate_update(self.context, aggr['id'], status) @@ -3350,13 +3362,85 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_raise_not_found(self): """Ensure ComputeHostNotFound is raised when removing invalid host.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.assertRaises(exception.ComputeHostNotFound, self.api.remove_host_from_aggregate, self.context, aggr['id'], 'invalid_host') +class ComputeAggrTestCase(BaseTestCase): + """This is for unit coverage of aggregate-related methods + defined in nova.compute.manager.""" + + def setUp(self): + super(ComputeAggrTestCase, self).setUp() + self.context = context.get_admin_context() + values = {'name': 'test_aggr', + 'availability_zone': 'test_zone', } + self.aggr = db.aggregate_create(self.context, values) + + def tearDown(self): + super(ComputeAggrTestCase, self).tearDown() + + def test_add_aggregate_host(self): + def fake_driver_add_to_aggregate(context, aggregate, host): + fake_driver_add_to_aggregate.called = True + return {"foo": "bar"} + self.stubs.Set(self.compute.driver, "add_to_aggregate", + fake_driver_add_to_aggregate) + + self.compute.add_aggregate_host(self.context, self.aggr.id, "host") + 
self.assertTrue(fake_driver_add_to_aggregate.called) + + def test_add_aggregate_host_raise_err(self): + """Ensure the undo operation works correctly on add.""" + def fake_driver_add_to_aggregate(context, aggregate, host): + raise exception.AggregateError + self.stubs.Set(self.compute.driver, "add_to_aggregate", + fake_driver_add_to_aggregate) + + state = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, self.aggr.id, state) + db.aggregate_host_add(self.context, self.aggr.id, 'fake_host') + + self.assertRaises(exception.AggregateError, + self.compute.add_aggregate_host, + self.context, self.aggr.id, "fake_host") + excepted = db.aggregate_get(self.context, self.aggr.id) + self.assertEqual(excepted.operational_state, aggregate_states.ERROR) + self.assertEqual(excepted.hosts, []) + + def test_remove_aggregate_host(self): + def fake_driver_remove_from_aggregate(context, aggregate, host): + fake_driver_remove_from_aggregate.called = True + self.assertEqual("host", host, "host") + return {"foo": "bar"} + self.stubs.Set(self.compute.driver, "remove_from_aggregate", + fake_driver_remove_from_aggregate) + + self.compute.remove_aggregate_host(self.context, self.aggr.id, "host") + self.assertTrue(fake_driver_remove_from_aggregate.called) + + def test_remove_aggregate_host_raise_err(self): + """Ensure the undo operation works correctly on remove.""" + def fake_driver_remove_from_aggregate(context, aggregate, host): + raise exception.AggregateError + self.stubs.Set(self.compute.driver, "remove_from_aggregate", + fake_driver_remove_from_aggregate) + + state = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, self.aggr.id, state) + + self.assertRaises(exception.AggregateError, + self.compute.remove_aggregate_host, + self.context, self.aggr.id, "fake_host") + excepted = db.aggregate_get(self.context, self.aggr.id) + self.assertEqual(excepted.operational_state, aggregate_states.ERROR) + 
self.assertEqual(excepted.hosts, ['fake_host']) + + class ComputePolicyTestCase(BaseTestCase): def setUp(self): diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index 3353ea737..4cb17d958 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -267,6 +267,30 @@ class DbApiTestCase(test.TestCase): expected = {uuids[0]: [], uuids[1]: []} self.assertEqual(expected, instance_faults) + def test_dns_registration(self): + domain1 = 'test.domain.one' + domain2 = 'test.domain.two' + testzone = 'testzone' + ctxt = context.get_admin_context() + + db.dnsdomain_register_for_zone(ctxt, domain1, testzone) + domain_ref = db.dnsdomain_get(ctxt, domain1) + zone = domain_ref.availability_zone + scope = domain_ref.scope + self.assertEqual(scope, 'private') + self.assertEqual(zone, testzone) + + db.dnsdomain_register_for_project(ctxt, domain2, + self.project_id) + domain_ref = db.dnsdomain_get(ctxt, domain2) + project = domain_ref.project_id + scope = domain_ref.scope + self.assertEqual(project, self.project_id) + self.assertEqual(scope, 'public') + + db.dnsdomain_unregister(ctxt, domain1) + db.dnsdomain_unregister(ctxt, domain2) + def _get_fake_aggr_values(): return {'name': 'fake_aggregate', @@ -351,6 +375,14 @@ class AggregateDBApiTestCase(test.TestCase): db.aggregate_create, self.context, _get_fake_aggr_values()) + def test_aggregate_get(self): + """Ensure we can get aggregate with all its relations.""" + ctxt = context.get_admin_context() + result = _create_aggregate_with_hosts(context=ctxt) + expected = db.aggregate_get(ctxt, result.id) + self.assertEqual(_get_fake_aggr_hosts(), expected.hosts) + self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails) + def test_aggregate_delete_raise_not_found(self): """Ensure AggregateNotFound is raised when deleting an aggregate.""" ctxt = context.get_admin_context() @@ -541,30 +573,6 @@ class AggregateDBApiTestCase(test.TestCase): db.aggregate_host_delete, ctxt, result.id, 
_get_fake_aggr_hosts()[0]) - def test_dns_registration(self): - domain1 = 'test.domain.one' - domain2 = 'test.domain.two' - testzone = 'testzone' - ctxt = context.get_admin_context() - - db.dnsdomain_register_for_zone(ctxt, domain1, testzone) - domain_ref = db.dnsdomain_get(ctxt, domain1) - zone = domain_ref.availability_zone - scope = domain_ref.scope - self.assertEqual(scope, 'private') - self.assertEqual(zone, testzone) - - db.dnsdomain_register_for_project(ctxt, domain2, - self.project_id) - domain_ref = db.dnsdomain_get(ctxt, domain2) - project = domain_ref.project_id - scope = domain_ref.scope - self.assertEqual(project, self.project_id) - self.assertEqual(scope, 'public') - - db.dnsdomain_unregister(ctxt, domain1) - db.dnsdomain_unregister(ctxt, domain2) - class CapacityTestCase(test.TestCase): def setUp(self): diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py index 41c5d118e..70d54db9a 100644 --- a/nova/tests/test_virt_drivers.py +++ b/nova/tests/test_virt_drivers.py @@ -401,6 +401,14 @@ class _VirtDriverTestCase(test.TestCase): def test_host_power_action_startup(self): self.connection.host_power_action('a useless argument?', 'startup') + @catch_notimplementederror + def test_add_to_aggregate(self): + self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host') + + @catch_notimplementederror + def test_remove_from_aggregate(self): + self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host') + class AbstractDriverTestCase(_VirtDriverTestCase): def setUp(self): diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index d72365e5c..b08bbd69c 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -31,6 +31,7 @@ from nova import flags from nova import log as logging from nova import test from nova import utils +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state from nova import exception @@ -1741,3 +1742,149 @@ class 
XenAPISRSelectionTestCase(test.TestCase): expected = helper.safe_find_sr(session) self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref), expected) + + +class XenAPIAggregateTestCase(test.TestCase): + """Unit tests for aggregate operations.""" + def setUp(self): + super(XenAPIAggregateTestCase, self).setUp() + self.stubs = stubout.StubOutForTesting() + self.flags(xenapi_connection_url='http://test_url', + xenapi_connection_username='test_user', + xenapi_connection_password='test_pass', + instance_name_template='%d', + firewall_driver='nova.virt.xenapi.firewall.' + 'Dom0IptablesFirewallDriver', + host='host') + xenapi_fake.reset() + stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + self.context = context.get_admin_context() + self.conn = xenapi_conn.get_connection(False) + self.fake_metadata = {'master_compute': 'host'} + + def tearDown(self): + super(XenAPIAggregateTestCase, self).tearDown() + self.stubs.UnsetAll() + + def test_add_to_aggregate_called(self): + def fake_add_to_aggregate(context, aggregate, host): + fake_add_to_aggregate.called = True + self.stubs.Set(self.conn._pool, + "add_to_aggregate", + fake_add_to_aggregate) + + self.conn.add_to_aggregate(None, None, None) + self.assertTrue(fake_add_to_aggregate.called) + + def test_add_to_aggregate_for_first_host_sets_metadata(self): + def fake_init_pool(id, name): + fake_init_pool.called = True + self.stubs.Set(self.conn._pool, "_init_pool", fake_init_pool) + + aggregate = self._aggregate_setup() + self.conn._pool.add_to_aggregate(self.context, aggregate, "host") + result = db.aggregate_get(self.context, aggregate.id) + self.assertTrue(fake_init_pool.called) + self.assertDictMatch(self.fake_metadata, result.metadetails) + self.assertEqual(aggregate_states.ACTIVE, result.operational_state) + + def test_join_slave(self): + """Ensure join_slave gets called when the request gets to master.""" + def fake_join_slave(id, compute_uuid, host, url, user, password): + fake_join_slave.called 
= True + self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave) + + aggregate = self._aggregate_setup(hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.conn._pool.add_to_aggregate(self.context, aggregate, "host2", + compute_uuid='fake_uuid', + url='fake_url', + user='fake_user', + passwd='fake_pass', + xenhost_uuid='fake_uuid') + self.assertTrue(fake_join_slave.called) + + def test_add_to_aggregate_first_host(self): + def fake_pool_set_name_label(self, session, pool_ref, name): + fake_pool_set_name_label.called = True + self.stubs.Set(xenapi_fake.SessionBase, "pool_set_name_label", + fake_pool_set_name_label) + self.conn._session.call_xenapi("pool.create", {"name": "asdf"}) + + values = {"name": 'fake_aggregate', + "availability_zone": 'fake_zone'} + result = db.aggregate_create(self.context, values) + db.aggregate_host_add(self.context, result.id, "host") + aggregate = db.aggregate_get(self.context, result.id) + self.assertEqual(["host"], aggregate.hosts) + self.assertEqual({}, aggregate.metadetails) + + self.conn._pool.add_to_aggregate(self.context, aggregate, "host") + self.assertTrue(fake_pool_set_name_label.called) + + def test_remove_from_aggregate_called(self): + def fake_remove_from_aggregate(context, aggregate, host): + fake_remove_from_aggregate.called = True + self.stubs.Set(self.conn._pool, + "remove_from_aggregate", + fake_remove_from_aggregate) + + self.conn.remove_from_aggregate(None, None, None) + self.assertTrue(fake_remove_from_aggregate.called) + + def test_remove_from_empty_aggregate(self): + values = {"name": 'fake_aggregate', + "availability_zone": 'fake_zone'} + result = db.aggregate_create(self.context, values) + self.assertRaises(exception.AggregateError, + self.conn._pool.remove_from_aggregate, + None, result, "test_host") + + def test_remove_slave(self): + """Ensure eject slave gets called.""" + def fake_eject_slave(id, compute_uuid, host_uuid): + fake_eject_slave.called = True + self.stubs.Set(self.conn._pool, 
"_eject_slave", fake_eject_slave) + + self.fake_metadata['host2'] = 'fake_host2_uuid' + aggregate = self._aggregate_setup(hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2") + self.assertTrue(fake_eject_slave.called) + + def test_remove_master_solo(self): + """Ensure metadata are cleared after removal.""" + def fake_clear_pool(id): + fake_clear_pool.called = True + self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool) + + aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE, + metadata=self.fake_metadata) + self.conn._pool.remove_from_aggregate(self.context, aggregate, "host") + result = db.aggregate_get(self.context, aggregate.id) + self.assertTrue(fake_clear_pool.called) + self.assertDictMatch({}, result.metadetails) + self.assertEqual(aggregate_states.ACTIVE, result.operational_state) + + def test_remote_master_non_empty_pool(self): + """Ensure AggregateError is raised if removing the master.""" + aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE, + hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.assertRaises(exception.AggregateError, + self.conn._pool.remove_from_aggregate, + self.context, aggregate, "host") + + def _aggregate_setup(self, aggr_name='fake_aggregate', + aggr_zone='fake_zone', + aggr_state=aggregate_states.CREATED, + hosts=['host'], metadata=None): + values = {"name": aggr_name, + "availability_zone": aggr_zone, + "operational_state": aggr_state, } + result = db.aggregate_create(self.context, values) + for host in hosts: + db.aggregate_host_add(self.context, result.id, host) + if metadata: + db.aggregate_metadata_add(self.context, result.id, metadata) + return db.aggregate_get(self.context, result.id) diff --git a/nova/virt/driver.py b/nova/virt/driver.py index 20c41ca9f..92e0d53dd 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -654,6 +654,13 @@ class ComputeDriver(object): related to other calls 
into the driver. The prime example is to clean the cache and remove images which are no longer of interest. """ + + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + raise NotImplementedError() + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" raise NotImplementedError() def get_volume_connector(self, instance): diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py index d59e78f4f..73f2d863b 100644 --- a/nova/virt/xenapi/fake.py +++ b/nova/virt/xenapi/fake.py @@ -63,7 +63,7 @@ from nova import log as logging from nova import utils -_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool', +_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD', 'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task'] _db_content = {} @@ -509,6 +509,15 @@ class SessionBase(object): def VM_clean_reboot(self, *args): return 'burp' + def pool_eject(self, session, host_ref): + pass + + def pool_join(self, session, hostname, username, password): + pass + + def pool_set_name_label(self, session, pool_ref, name): + pass + def network_get_all_records_where(self, _1, filter): return self.xenapi.network.get_all_records() diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py new file mode 100644 index 000000000..95f0f3467 --- /dev/null +++ b/nova/virt/xenapi/pool.py @@ -0,0 +1,214 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for Pool-related functions (join, eject, etc). +""" + +import json +import urlparse + +from nova import db +from nova import exception +from nova import flags +from nova import log as logging +from nova import rpc +from nova.compute import aggregate_states +from nova.openstack.common import cfg +from nova.virt.xenapi import vm_utils + +LOG = logging.getLogger("nova.virt.xenapi.pool") + +xenapi_pool_opts = [ + cfg.BoolOpt('use_join_force', + default=True, + help='To use for hosts with different CPUs'), + ] + +FLAGS = flags.FLAGS +FLAGS.register_opts(xenapi_pool_opts) + + +class ResourcePool(object): + """ + Implements resource pool operations. + """ + def __init__(self, session): + self.XenAPI = session.get_imported_xenapi() + host_ref = session.get_xenapi_host() + host_rec = session.call_xenapi('host.get_record', host_ref) + self._host_name = host_rec['hostname'] + self._host_addr = host_rec['address'] + self._host_uuid = host_rec['uuid'] + self._session = session + + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + if len(aggregate.hosts) == 1: + # this is the first host of the pool -> make it master + self._init_pool(aggregate.id, aggregate.name) + # save metadata so that we can find the master again: + # the password should be encrypted, really. 
+ values = { + 'operational_state': aggregate_states.ACTIVE, + 'metadata': {'master_compute': host}, + } + db.aggregate_update(context, aggregate.id, values) + else: + # the pool is already up and running, we need to figure out + # whether we can serve the request from this host or not. + master_compute = aggregate.metadetails['master_compute'] + if master_compute == FLAGS.host and master_compute != host: + # this is the master -> do a pool-join + # To this aim, nova compute on the slave has to go down. + # NOTE: it is assumed that ONLY nova compute is running now + self._join_slave(aggregate.id, host, + kwargs.get('compute_uuid'), + kwargs.get('url'), kwargs.get('user'), + kwargs.get('passwd')) + metadata = {host: kwargs.get('xenhost_uuid'), } + db.aggregate_metadata_add(context, aggregate.id, metadata) + elif master_compute and master_compute != host: + # send rpc cast to master, asking to add the following + # host with specified credentials. + # NOTE: password in clear is not great, but it'll do for now + forward_request(context, "add_aggregate_host", master_compute, + aggregate.id, host, + self._host_addr, self._host_uuid) + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" + master_compute = aggregate.metadetails.get('master_compute') + if master_compute == FLAGS.host and master_compute != host: + # this is the master -> instruct it to eject a host from the pool + host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host] + self._eject_slave(aggregate.id, + kwargs.get('compute_uuid'), host_uuid) + db.aggregate_metadata_delete(context, aggregate.id, host) + elif master_compute == host: + # Remove master from its own pool -> destroy pool only if the + # master is on its own, otherwise raise fault. 
Destroying a + # pool made only by master is fictional + if len(aggregate.hosts) > 1: + raise exception.AggregateError( + aggregate_id=aggregate.id, + action='remove_from_aggregate', + reason=_('Unable to eject %(host)s ' + 'from the pool; pool not empty') + % locals()) + self._clear_pool(aggregate.id) + db.aggregate_metadata_delete(context, + aggregate.id, 'master_compute') + elif master_compute and master_compute != host: + # A master exists -> forward pool-eject request to master + forward_request(context, "remove_aggregate_host", master_compute, + aggregate.id, host, + self._host_addr, self._host_uuid) + else: + # this shouldn't have happened + raise exception.AggregateError(aggregate_id=aggregate.id, + action='remove_from_aggregate', + reason=_('Unable to eject %(host)s ' + 'from the pool; No master found') + % locals()) + + def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd): + """Joins a slave into a XenServer resource pool.""" + try: + args = {'compute_uuid': compute_uuid, + 'url': url, + 'user': user, + 'password': passwd, + 'force': json.dumps(FLAGS.use_join_force), + 'master_addr': self._host_addr, + 'master_user': FLAGS.xenapi_connection_username, + 'master_pass': FLAGS.xenapi_connection_password, } + task = self._session.async_call_plugin('xenhost', + 'host_join', args) + self._session.wait_for_task(task) + except self.XenAPI.Failure as e: + LOG.error(_("Pool-Join failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='add_to_aggregate', + reason=_('Unable to join %(host)s ' + 'in the pool') % locals()) + + def _eject_slave(self, aggregate_id, compute_uuid, host_uuid): + """Eject a slave from a XenServer resource pool.""" + try: + # shutdown nova-compute; if there are other VMs running, e.g. + # guest instances, the eject will fail. That's a precaution + # to deal with the fact that the admin should evacuate the host + # first. The eject wipes out the host completely. 
+ vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid) + self._session.call_xenapi("VM.clean_shutdown", vm_ref) + + host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid) + self._session.call_xenapi("pool.eject", host_ref) + except self.XenAPI.Failure as e: + LOG.error(_("Pool-eject failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='remove_from_aggregate', + reason=str(e.details)) + + def _init_pool(self, aggregate_id, aggregate_name): + """Set the name label of a XenServer pool.""" + try: + pool_ref = self._session.call_xenapi("pool.get_all")[0] + self._session.call_xenapi("pool.set_name_label", + pool_ref, aggregate_name) + except self.XenAPI.Failure as e: + LOG.error(_("Unable to set up pool: %(e)s.") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='add_to_aggregate', + reason=str(e.details)) + + def _clear_pool(self, aggregate_id): + """Clear the name label of a XenServer pool.""" + try: + pool_ref = self._session.call_xenapi('pool.get_all')[0] + self._session.call_xenapi('pool.set_name_label', pool_ref, '') + except self.XenAPI.Failure as e: + LOG.error(_("Pool-set_name_label failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='remove_from_aggregate', + reason=str(e.details)) + + +def forward_request(context, request_type, master, aggregate_id, + slave_compute, slave_address, slave_uuid): + """Casts add/remove requests to the pool master.""" + # replace the address from the xenapi connection url + # because this might be 169.254.0.1, i.e. 
xenapi + sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) + rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master), + {"method": request_type, + "args": {"aggregate_id": aggregate_id, + "host": slave_compute, + "url": sender_url, + "user": FLAGS.xenapi_connection_username, + "passwd": FLAGS.xenapi_connection_password, + "compute_uuid": vm_utils.get_this_vm_uuid(), + "xenhost_uuid": slave_uuid, }, + }) + + +def swap_xapi_host(url, host_addr): + """Replace the XenServer address present in 'url' with 'host_addr'.""" + temp_url = urlparse.urlparse(url) + _, sep, port = temp_url.netloc.partition(':') + return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port)) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 08542268c..c9363bb74 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -78,6 +78,7 @@ from nova import flags from nova import log as logging from nova.openstack.common import cfg from nova.virt import driver +from nova.virt.xenapi import pool from nova.virt.xenapi import vm_utils from nova.virt.xenapi.vmops import VMOps from nova.virt.xenapi.volumeops import VolumeOps @@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver): self._product_version = self._session.get_product_version() self._vmops = VMOps(self._session, self._product_version) self._initiator = None + self._pool = pool.ResourcePool(self._session) @property def host_state(self): @@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver): """Sets the specified host's ability to accept new instances.""" return self._vmops.set_host_enabled(host, enabled) + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + return self._pool.add_to_aggregate(context, aggregate, host, **kwargs) + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" + return 
self._pool.remove_from_aggregate(context, + aggregate, host, **kwargs) + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" @@ -498,9 +509,19 @@ class XenAPISession(object): exception = self.XenAPI.Failure(_("Unable to log in to XenAPI " "(is the Dom0 disk full?)")) for i in xrange(FLAGS.xenapi_connection_concurrent): - session = self._create_session(url) - with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): - session.login_with_password(user, pw) + try: + session = self._create_session(url) + with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): + session.login_with_password(user, pw) + except self.XenAPI.Failure, e: + # if user and pw of the master are different, we're doomed! + if e.details[0] == 'HOST_IS_SLAVE': + master = e.details[1] + session = self.XenAPI.Session(pool.swap_xapi_host(url, + master)) + session.login_with_password(user, pw) + else: + raise self._sessions.put(session) def get_product_version(self): diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost index 64938641f..9a7ad84af 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost @@ -33,6 +33,7 @@ import subprocess import tempfile import time +import XenAPI import XenAPIPlugin import pluginlib_nova as pluginlib @@ -41,6 +42,8 @@ pluginlib.configure_logging("xenhost") host_data_pattern = re.compile(r"\s*(\S+) \([^\)]+\) *: ?(.*)") config_file_path = "/usr/etc/xenhost.conf" +DEFAULT_TRIES = 23 +DEFAULT_SLEEP = 10 def jsonify(fnc): @@ -97,6 +100,28 @@ def _run_command_with_input(cmd, process_input): return output +def _resume_compute(session, compute_ref, compute_uuid): + """Resume compute node on slave host after pool join. 
This has to + happen regardless of the success or failure of the join operation.""" + try: + # session is valid if the join operation has failed + session.xenapi.VM.start(compute_ref, False, True) + except XenAPI.Failure, e: + # if session is invalid, e.g. xapi has restarted, then the pool + # join has been successful, wait for xapi to become alive again + for c in xrange(0, DEFAULT_TRIES): + try: + _run_command("xe vm-start uuid=%s" % compute_uuid) + return + except pluginlib.PluginError, e: + logging.exception('Waited %d seconds for the slave to ' + 'become available.' % (c * DEFAULT_SLEEP)) + time.sleep(DEFAULT_SLEEP) + raise pluginlib.PluginError('Unrecoverable error: the host has ' + 'not come back for more than %d seconds' + % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1))) + + def _get_host_uuid(): cmd = "xe host-list | grep uuid" resp = _run_command(cmd) @@ -257,6 +282,34 @@ def host_start(self, arg_dict): return _power_action("startup") +@jsonify +def host_join(self, arg_dict): + """Join a remote host into a pool whose master is the host + where the plugin is called from. 
The following constraints apply:
+
+    - The host must have no VMs running, except nova-compute, which will be
+      shut down (and restarted upon pool-join) automatically,
+    - The host must have no shared storage currently set up,
+    - The host must have the same license as the master,
+    - The host must have the same supplemental packs as the master."""
+    session = XenAPI.Session(arg_dict.get("url"))
+    session.login_with_password(arg_dict.get("user"),
+                                arg_dict.get("password"))
+    compute_ref = session.xenapi.VM.get_by_uuid(arg_dict.get('compute_uuid'))
+    session.xenapi.VM.clean_shutdown(compute_ref)
+    try:
+        if arg_dict.get("force"):
+            session.xenapi.pool.join_force(arg_dict.get("master_addr"),
+                                           arg_dict.get("master_user"),
+                                           arg_dict.get("master_pass"))
+        else:
+            session.xenapi.pool.join(arg_dict.get("master_addr"),
+                                     arg_dict.get("master_user"),
+                                     arg_dict.get("master_pass"))
+    finally:
+        _resume_compute(session, compute_ref, arg_dict.get("compute_uuid"))
+
+
 @jsonify
 def host_data(self, arg_dict):
     """Runs the commands on the xenstore host to return the current status
@@ -380,6 +433,7 @@ if __name__ == "__main__":
             "host_shutdown": host_shutdown,
             "host_reboot": host_reboot,
             "host_start": host_start,
+            "host_join": host_join,
             "get_config": get_config,
             "set_config": set_config,
             "iptables_config": iptables_config})
-- cgit