diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-02-22 03:22:30 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-02-22 03:22:30 +0000 |
| commit | c4ff7ef07c50deccf3cb7877ecab2245724f3091 (patch) | |
| tree | 0a2266b6c9d162251b213fbb2dfe96ad47ec040f | |
| parent | 11d82fc7d4d17b6edf435633501ddf7a44d6adf5 (diff) | |
| parent | 424f32f04d9c6c97f684782b35e1c25fbf83ce05 (diff) | |
Merge "blueprint host-aggregates: xenapi implementation"
| -rw-r--r-- | nova/api/openstack/compute/contrib/aggregates.py | 57 | ||||
| -rw-r--r-- | nova/compute/api.py | 19 | ||||
| -rw-r--r-- | nova/compute/manager.py | 36 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/api.py | 9 | ||||
| -rw-r--r-- | nova/db/sqlalchemy/models.py | 103 | ||||
| -rw-r--r-- | nova/exception.py | 5 | ||||
| -rw-r--r-- | nova/tests/test_compute.py | 108 | ||||
| -rw-r--r-- | nova/tests/test_db_api.py | 56 | ||||
| -rw-r--r-- | nova/tests/test_virt_drivers.py | 8 | ||||
| -rw-r--r-- | nova/tests/test_xenapi.py | 147 | ||||
| -rw-r--r-- | nova/virt/driver.py | 7 | ||||
| -rw-r--r-- | nova/virt/xenapi/fake.py | 11 | ||||
| -rw-r--r-- | nova/virt/xenapi/pool.py | 214 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 27 | ||||
| -rw-r--r-- | plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost | 54 |
15 files changed, 749 insertions, 112 deletions
diff --git a/nova/api/openstack/compute/contrib/aggregates.py b/nova/api/openstack/compute/contrib/aggregates.py index 6206f42c4..ea90a57d7 100644 --- a/nova/api/openstack/compute/contrib/aggregates.py +++ b/nova/api/openstack/compute/contrib/aggregates.py @@ -63,17 +63,18 @@ class AggregateController(object): raise exc.HTTPBadRequest try: host_aggregate = body["aggregate"] - aggregate_name = host_aggregate["name"] - availability_zone = host_aggregate["availability_zone"] + name = host_aggregate["name"] + avail_zone = host_aggregate["availability_zone"] except KeyError: raise exc.HTTPBadRequest if len(host_aggregate) != 2: raise exc.HTTPBadRequest try: - aggregate = self.api.create_aggregate(context, aggregate_name, - availability_zone) + aggregate = self.api.create_aggregate(context, name, avail_zone) except exception.AggregateNameExists: + LOG.exception(_("Cannot create aggregate with name %(name)s and " + "availability zone %(avail_zone)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -84,6 +85,7 @@ class AggregateController(object): try: aggregate = self.api.get_aggregate(context, id) except exception.AggregateNotFound: + LOG.exception(_("Cannot show aggregate: %(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) @@ -91,7 +93,6 @@ class AggregateController(object): """Updates the name and/or availbility_zone of given aggregate.""" context = _get_context(req) authorize(context) - aggregate = id if len(body) != 1: raise exc.HTTPBadRequest @@ -108,8 +109,9 @@ class AggregateController(object): raise exc.HTTPBadRequest try: - aggregate = self.api.update_aggregate(context, aggregate, updates) + aggregate = self.api.update_aggregate(context, id, updates) except exception.AggregateNotFound: + LOG.exception(_("Cannot update aggregate: %(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) @@ -118,10 +120,10 @@ class AggregateController(object): """Removes an aggregate by id.""" context = _get_context(req) authorize(context) - aggregate_id = id try: - self.api.delete_aggregate(context, aggregate_id) + self.api.delete_aggregate(context, id) except exception.AggregateNotFound: + LOG.exception(_("Cannot delete aggregate: %(id)s") % locals()) raise exc.HTTPNotFound def action(self, req, id, body): @@ -144,19 +146,17 @@ class AggregateController(object): """Adds a host to the specified aggregate.""" context = _get_context(req) authorize(context) - aggregate = id try: - aggregate = self.api.add_host_to_aggregate(context, - aggregate, host) - except exception.AggregateNotFound: - raise exc.HTTPNotFound - except exception.ComputeHostNotFound: + aggregate = self.api.add_host_to_aggregate(context, id, host) + except (exception.AggregateNotFound, exception.ComputeHostNotFound): + LOG.exception(_("Cannot add host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound - except exception.AggregateHostConflict: - raise exc.HTTPConflict - except exception.AggregateHostExists: - raise exc.HTTPConflict - except exception.InvalidAggregateAction: + except (exception.AggregateHostConflict, + exception.AggregateHostExists, + exception.InvalidAggregateAction): + LOG.exception(_("Cannot add host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -165,15 +165,15 @@ class AggregateController(object): """Removes a host from the specified aggregate.""" context = _get_context(req) authorize(context) - aggregate = id try: - aggregate = self.api.remove_host_from_aggregate(context, - aggregate, host) - except exception.AggregateNotFound: - raise exc.HTTPNotFound - except exception.AggregateHostNotFound: + aggregate = self.api.remove_host_from_aggregate(context, id, host) + except (exception.AggregateNotFound, exception.AggregateHostNotFound): + LOG.exception(_("Cannot remove host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound except exception.InvalidAggregateAction: + LOG.exception(_("Cannot remove host %(host)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPConflict return self._marshall_aggregate(aggregate) @@ -181,20 +181,19 @@ class AggregateController(object): """Replaces the aggregate's existing metadata with new metadata.""" context = _get_context(req) authorize(context) - aggregate = id if len(body) != 1: raise exc.HTTPBadRequest - try: metadata = body["metadata"] except KeyError: raise exc.HTTPBadRequest - try: aggregate = self.api.update_aggregate_metadata(context, - aggregate, metadata) + id, metadata) except exception.AggregateNotFound: + LOG.exception(_("Cannot set metadata %(metadata)s in aggregate " + "%(id)s") % locals()) raise exc.HTTPNotFound return self._marshall_aggregate(aggregate) diff --git a/nova/compute/api.py b/nova/compute/api.py index edadf1978..b908a6587 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -1706,10 +1706,18 @@ class AggregateAPI(base.Base): def create_aggregate(self, context, aggregate_name, availability_zone): """Creates the model for the aggregate.""" - values = {"name": aggregate_name, - "availability_zone": availability_zone} - aggregate = self.db.aggregate_create(context, values) - return dict(aggregate.iteritems()) + zones = [s.availability_zone for s in + self.db.service_get_all_by_topic(context, + FLAGS.compute_topic)] + if availability_zone in zones: + values = {"name": aggregate_name, + "availability_zone": availability_zone} + aggregate = self.db.aggregate_create(context, values) + return dict(aggregate.iteritems()) + else: + raise exception.InvalidAggregateAction(action='create_aggregate', + aggregate_id="'N/A'", + reason='invalid zone') def get_aggregate(self, context, aggregate_id): """Get an aggregate by id.""" @@ -1805,7 +1813,8 @@ class AggregateAPI(base.Base): "host": host}, }) return self.get_aggregate(context, aggregate_id) else: - invalid = {aggregate_states.CHANGING: 'setup in progress', + invalid = {aggregate_states.CREATED: 'no hosts to remove', + aggregate_states.CHANGING: 'setup in progress', aggregate_states.DISMISSED: 'aggregate deleted', } if aggregate.operational_state in invalid.keys(): raise exception.InvalidAggregateAction( diff --git a/nova/compute/manager.py b/nova/compute/manager.py index f31a0d2e4..ac9bebafe 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -46,6 +46,7 @@ from eventlet import greenthread from nova import block_device import nova.context +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state from nova.compute import task_states @@ -2320,13 +2321,40 @@ class ComputeManager(manager.SchedulerDependentManager): LOG.error(msg % error) self._set_instance_error_state(context, instance_uuid) - def add_aggregate_host(self, context, aggregate_id, host): + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def add_aggregate_host(self, context, aggregate_id, host, **kwargs): """Adds a host to a physical hypervisor pool.""" - raise NotImplementedError() + aggregate = self.db.aggregate_get(context, aggregate_id) + try: + self.driver.add_to_aggregate(context, aggregate, host, **kwargs) + except exception.AggregateError: + error = sys.exc_info() + self._undo_aggregate_operation(context, + self.db.aggregate_host_delete, + aggregate.id, host) + raise error[0], error[1], error[2] - def remove_aggregate_host(self, context, aggregate_id, host): + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def remove_aggregate_host(self, context, aggregate_id, host, **kwargs): """Removes a host from a physical hypervisor pool.""" - raise NotImplementedError() + aggregate = self.db.aggregate_get(context, aggregate_id) + try: + self.driver.remove_from_aggregate(context, + aggregate, host, **kwargs) + except exception.AggregateError: + error = sys.exc_info() + self._undo_aggregate_operation(context, self.db.aggregate_host_add, + aggregate.id, host) + raise error[0], error[1], error[2] + + def _undo_aggregate_operation(self, context, op, aggregate_id, host): + try: + status = {'operational_state': aggregate_states.ERROR} + self.db.aggregate_update(context, aggregate_id, status) + op(context, aggregate_id, host) + except Exception: + LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' + 'during operation on %(host)s') % locals()) @manager.periodic_task( ticks_between_runs=FLAGS.image_cache_manager_interval) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 6fdba1642..026fe2f28 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -4273,8 +4273,8 @@ def _aggregate_get_query(context, model_class, id_field, id, def aggregate_create(context, values, metadata=None): try: aggregate = models.Aggregate() + values.setdefault('operational_state', aggregate_states.CREATED) aggregate.update(values) - aggregate.operational_state = aggregate_states.CREATED aggregate.save() except exception.DBError: raise exception.AggregateNameExists(aggregate_name=values['name']) @@ -4409,8 +4409,7 @@ def aggregate_metadata_add(context, aggregate_id, metadata, set_delete=False): meta_ref = aggregate_metadata_get_item(context, aggregate_id, meta_key, session) if meta_ref.deleted: - item.update({'deleted': False, 'deleted_at': None, - 'updated_at': literal_column('updated_at')}) + item.update({'deleted': False, 'deleted_at': None}) except exception.AggregateMetadataNotFound: meta_ref = models.AggregateMetadata() item.update({"key": meta_key, "aggregate_id": aggregate_id}) @@ -4469,9 +4468,7 @@ def aggregate_host_add(context, aggregate_id, host): except exception.DBError: raise exception.AggregateHostConflict(host=host) elif host_ref.deleted: - host_ref.update({'deleted': False, - 'deleted_at': None, - 'updated_at': literal_column('updated_at')}) + host_ref.update({'deleted': False, 'deleted_at': None}) host_ref.save(session=session) else: raise exception.AggregateHostExists(host=host, diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index c99d9489b..33438444f 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -885,26 +885,12 @@ class Zone(BASE, NovaBase): rpc_virtual_host = Column(String(255)) -class Aggregate(BASE, NovaBase): - """Represents a cluster of hosts that exists in this zone.""" - __tablename__ = 'aggregates' - id = Column(Integer, primary_key=True, autoincrement=True) - name = Column(String(255), unique=True) - operational_state = Column(String(255), nullable=False) - availability_zone = Column(String(255), nullable=False) - - class AggregateHost(BASE, NovaBase): """Represents a host that is member of an aggregate.""" __tablename__ = 'aggregate_hosts' id = Column(Integer, primary_key=True, autoincrement=True) host = Column(String(255), unique=True) aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False) - aggregate = relationship(Aggregate, backref=backref('aggregates'), - foreign_keys=aggregate_id, - primaryjoin='and_(' - 'AggregateHost.aggregate_id == Aggregate.id,' - 'AggregateHost.deleted == False)') class AggregateMetadata(BASE, NovaBase): @@ -914,11 +900,46 @@ class AggregateMetadata(BASE, NovaBase): key = Column(String(255), nullable=False) value = Column(String(255), nullable=False) aggregate_id = Column(Integer, ForeignKey('aggregates.id'), nullable=False) - aggregate = relationship(Aggregate, backref="metadata", - foreign_keys=aggregate_id, - primaryjoin='and_(' - 'AggregateMetadata.aggregate_id == Aggregate.id,' - 'AggregateMetadata.deleted == False)') + + +class Aggregate(BASE, NovaBase): + """Represents a cluster of hosts that exists in this zone.""" + __tablename__ = 'aggregates' + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String(255), unique=True) + operational_state = Column(String(255), nullable=False) + availability_zone = Column(String(255), nullable=False) + _hosts = relationship(AggregateHost, + secondary="aggregate_hosts", + primaryjoin='and_(' + 'Aggregate.id == AggregateHost.aggregate_id,' + 'AggregateHost.deleted == False,' + 'Aggregate.deleted == False)', + secondaryjoin='and_(' + 'AggregateHost.aggregate_id == Aggregate.id, ' + 'AggregateHost.deleted == False,' + 'Aggregate.deleted == False)', + backref='aggregates') + + _metadata = relationship(AggregateMetadata, + secondary="aggregate_metadata", + primaryjoin='and_(' + 'Aggregate.id == AggregateMetadata.aggregate_id,' + 'AggregateMetadata.deleted == False,' + 'Aggregate.deleted == False)', + secondaryjoin='and_(' + 'AggregateMetadata.aggregate_id == Aggregate.id, ' + 'AggregateMetadata.deleted == False,' + 'Aggregate.deleted == False)', + backref='aggregates') + + @property + def hosts(self): + return [h.host for h in self._hosts] + + @property + def metadetails(self): + return dict([(m.key, m.value) for m in self._metadata]) class AgentBuild(BASE, NovaBase): @@ -997,15 +1018,41 @@ def register_models(): connection is lost and needs to be reestablished. """ from sqlalchemy import create_engine - models = (Service, Instance, InstanceActions, InstanceTypes, - Volume, IscsiTarget, FixedIp, FloatingIp, - Network, SecurityGroup, SecurityGroupIngressRule, - SecurityGroupInstanceAssociation, AuthToken, User, - Project, Certificate, ConsolePool, Console, Zone, - VolumeMetadata, VolumeTypes, VolumeTypeExtraSpecs, - AgentBuild, InstanceMetadata, InstanceTypeExtraSpecs, Migration, - VirtualStorageArray, SMFlavors, SMBackendConf, SMVolume, - InstanceFault) + models = (AgentBuild, + Aggregate, + AggregateHost, + AggregateMetadata, + AuthToken, + Certificate, + Console, + ConsolePool, + FixedIp, + FloatingIp, + Instance, + InstanceActions, + InstanceFault, + InstanceMetadata, + InstanceTypeExtraSpecs, + InstanceTypes, + IscsiTarget, + Migration, + Network, + Project, + SecurityGroup, + SecurityGroupIngressRule, + SecurityGroupInstanceAssociation, + Service, + SMBackendConf, + SMFlavors, + SMVolume, + User, + VirtualStorageArray, + Volume, + VolumeMetadata, + VolumeTypeExtraSpecs, + VolumeTypes, + Zone, + ) engine = create_engine(FLAGS.sql_connection, echo=False) for model in models: model.metadata.create_all(engine) diff --git a/nova/exception.py b/nova/exception.py index 518a95a62..91a377c1e 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -929,6 +929,11 @@ class QuotaError(NovaException): message = _("Quota exceeded") + ": code=%(code)s" +class AggregateError(NovaException): + message = _("Aggregate %(aggregate_id)s: action '%(action)s' " + "caused an error: %(reason)s.") + + class AggregateNotFound(NotFound): message = _("Aggregate %(aggregate_id)s could not be found.") diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index d6d54227f..5ee74df8e 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -3153,6 +3153,8 @@ def _create_service_entries(context, values={'avail_zone1': ['fake_host1', class ComputeAPIAggrTestCase(test.TestCase): + """This is for unit coverage of aggregate-related methods + defined in nova.compute.api.""" def setUp(self): super(ComputeAPIAggrTestCase, self).setUp() @@ -3164,9 +3166,16 @@ class ComputeAPIAggrTestCase(test.TestCase): def tearDown(self): super(ComputeAPIAggrTestCase, self).tearDown() + def test_create_invalid_availability_zone(self): + """Ensure InvalidAggregateAction is raised with wrong avail_zone.""" + self.assertRaises(exception.InvalidAggregateAction, + self.api.create_aggregate, + self.context, 'fake_aggr', 'fake_avail_zone') + def test_update_aggregate_metadata(self): + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') metadata = {'foo_key1': 'foo_value1', 'foo_key2': 'foo_value2', } aggr = self.api.update_aggregate_metadata(self.context, aggr['id'], @@ -3178,8 +3187,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_delete_aggregate(self): """Ensure we can delete an aggregate.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.api.delete_aggregate(self.context, aggr['id']) expected = db.aggregate_get(self.context, aggr['id'], read_deleted='yes') @@ -3188,10 +3198,10 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_delete_non_empty_aggregate(self): """Ensure InvalidAggregateAction is raised when non empty aggregate.""" - aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') _create_service_entries(self.context, {'fake_availability_zone': ['fake_host']}) + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host') self.assertRaises(exception.InvalidAggregateAction, self.api.delete_aggregate, self.context, aggr['id']) @@ -3242,9 +3252,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_invalid_dismissed_status(self): """Ensure InvalidAggregateAction is raised when aggregate is deleted.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is dismissed! status = {'operational_state': aggregate_states.DISMISSED} db.aggregate_update(self.context, aggr['id'], status) @@ -3255,9 +3265,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_invalid_error_status(self): """Ensure InvalidAggregateAction is raised when aggregate is in error.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is in error! status = {'operational_state': aggregate_states.ERROR} db.aggregate_update(self.context, aggr['id'], status) @@ -3267,17 +3277,19 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_add_host_to_aggregate_zones_mismatch(self): """Ensure InvalidAggregateAction is raised when zones don't match.""" - _create_service_entries(self.context, {'fake_zoneX': ['fake_host']}) + _create_service_entries(self.context, {'fake_zoneX': ['fake_host1'], + 'fake_zoneY': ['fake_host2']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zoneY') self.assertRaises(exception.InvalidAggregateAction, self.api.add_host_to_aggregate, - self.context, aggr['id'], 'fake_host') + self.context, aggr['id'], 'fake_host1') def test_add_host_to_aggregate_raise_not_found(self): """Ensure ComputeHostNotFound is raised when adding invalid host.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.assertRaises(exception.ComputeHostNotFound, self.api.add_host_to_aggregate, self.context, aggr['id'], 'invalid_host') @@ -3325,9 +3337,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_invalid_dismissed_status(self): """Ensure InvalidAggregateAction is raised when aggregate is deleted.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is dismissed! status = {'operational_state': aggregate_states.DISMISSED} db.aggregate_update(self.context, aggr['id'], status) @@ -3338,9 +3350,9 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_invalid_changing_status(self): """Ensure InvalidAggregateAction is raised when aggregate is changing.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', 'fake_zone') - _create_service_entries(self.context, {'fake_zone': ['fake_host']}) # let's mock the fact that the aggregate is changing! status = {'operational_state': aggregate_states.CHANGING} db.aggregate_update(self.context, aggr['id'], status) @@ -3350,13 +3362,85 @@ class ComputeAPIAggrTestCase(test.TestCase): def test_remove_host_from_aggregate_raise_not_found(self): """Ensure ComputeHostNotFound is raised when removing invalid host.""" + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) aggr = self.api.create_aggregate(self.context, 'fake_aggregate', - 'fake_availability_zone') + 'fake_zone') self.assertRaises(exception.ComputeHostNotFound, self.api.remove_host_from_aggregate, self.context, aggr['id'], 'invalid_host') +class ComputeAggrTestCase(BaseTestCase): + """This is for unit coverage of aggregate-related methods + defined in nova.compute.manager.""" + + def setUp(self): + super(ComputeAggrTestCase, self).setUp() + self.context = context.get_admin_context() + values = {'name': 'test_aggr', + 'availability_zone': 'test_zone', } + self.aggr = db.aggregate_create(self.context, values) + + def tearDown(self): + super(ComputeAggrTestCase, self).tearDown() + + def test_add_aggregate_host(self): + def fake_driver_add_to_aggregate(context, aggregate, host): + fake_driver_add_to_aggregate.called = True + return {"foo": "bar"} + self.stubs.Set(self.compute.driver, "add_to_aggregate", + fake_driver_add_to_aggregate) + + self.compute.add_aggregate_host(self.context, self.aggr.id, "host") + self.assertTrue(fake_driver_add_to_aggregate.called) + + def test_add_aggregate_host_raise_err(self): + """Ensure the undo operation works correctly on add.""" + def fake_driver_add_to_aggregate(context, aggregate, host): + raise exception.AggregateError + self.stubs.Set(self.compute.driver, "add_to_aggregate", + fake_driver_add_to_aggregate) + + state = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, self.aggr.id, state) + db.aggregate_host_add(self.context, self.aggr.id, 'fake_host') + + self.assertRaises(exception.AggregateError, + self.compute.add_aggregate_host, + self.context, self.aggr.id, "fake_host") + excepted = db.aggregate_get(self.context, self.aggr.id) + self.assertEqual(excepted.operational_state, aggregate_states.ERROR) + self.assertEqual(excepted.hosts, []) + + def test_remove_aggregate_host(self): + def fake_driver_remove_from_aggregate(context, aggregate, host): + fake_driver_remove_from_aggregate.called = True + self.assertEqual("host", host, "host") + return {"foo": "bar"} + self.stubs.Set(self.compute.driver, "remove_from_aggregate", + fake_driver_remove_from_aggregate) + + self.compute.remove_aggregate_host(self.context, self.aggr.id, "host") + self.assertTrue(fake_driver_remove_from_aggregate.called) + + def test_remove_aggregate_host_raise_err(self): + """Ensure the undo operation works correctly on remove.""" + def fake_driver_remove_from_aggregate(context, aggregate, host): + raise exception.AggregateError + self.stubs.Set(self.compute.driver, "remove_from_aggregate", + fake_driver_remove_from_aggregate) + + state = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, self.aggr.id, state) + + self.assertRaises(exception.AggregateError, + self.compute.remove_aggregate_host, + self.context, self.aggr.id, "fake_host") + excepted = db.aggregate_get(self.context, self.aggr.id) + self.assertEqual(excepted.operational_state, aggregate_states.ERROR) + self.assertEqual(excepted.hosts, ['fake_host']) + + class ComputePolicyTestCase(BaseTestCase): def setUp(self): diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index 3353ea737..4cb17d958 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -267,6 +267,30 @@ class DbApiTestCase(test.TestCase): expected = {uuids[0]: [], uuids[1]: []} self.assertEqual(expected, instance_faults) + def test_dns_registration(self): + domain1 = 'test.domain.one' + domain2 = 'test.domain.two' + testzone = 'testzone' + ctxt = context.get_admin_context() + + db.dnsdomain_register_for_zone(ctxt, domain1, testzone) + domain_ref = db.dnsdomain_get(ctxt, domain1) + zone = domain_ref.availability_zone + scope = domain_ref.scope + self.assertEqual(scope, 'private') + self.assertEqual(zone, testzone) + + db.dnsdomain_register_for_project(ctxt, domain2, + self.project_id) + domain_ref = db.dnsdomain_get(ctxt, domain2) + project = domain_ref.project_id + scope = domain_ref.scope + self.assertEqual(project, self.project_id) + self.assertEqual(scope, 'public') + + db.dnsdomain_unregister(ctxt, domain1) + db.dnsdomain_unregister(ctxt, domain2) + def _get_fake_aggr_values(): return {'name': 'fake_aggregate', @@ -351,6 +375,14 @@ class AggregateDBApiTestCase(test.TestCase): db.aggregate_create, self.context, _get_fake_aggr_values()) + def test_aggregate_get(self): + """Ensure we can get aggregate with all its relations.""" + ctxt = context.get_admin_context() + result = _create_aggregate_with_hosts(context=ctxt) + expected = db.aggregate_get(ctxt, result.id) + self.assertEqual(_get_fake_aggr_hosts(), expected.hosts) + self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails) + def test_aggregate_delete_raise_not_found(self): """Ensure AggregateNotFound is raised when deleting an aggregate.""" ctxt = context.get_admin_context() @@ -541,30 +573,6 @@ class AggregateDBApiTestCase(test.TestCase): db.aggregate_host_delete, ctxt, result.id, _get_fake_aggr_hosts()[0]) - def test_dns_registration(self): - domain1 = 'test.domain.one' - domain2 = 'test.domain.two' - testzone = 'testzone' - ctxt = context.get_admin_context() - - db.dnsdomain_register_for_zone(ctxt, domain1, testzone) - domain_ref = db.dnsdomain_get(ctxt, domain1) - zone = domain_ref.availability_zone - scope = domain_ref.scope - self.assertEqual(scope, 'private') - self.assertEqual(zone, testzone) - - db.dnsdomain_register_for_project(ctxt, domain2, - self.project_id) - domain_ref = db.dnsdomain_get(ctxt, domain2) - project = domain_ref.project_id - scope = domain_ref.scope - self.assertEqual(project, self.project_id) - self.assertEqual(scope, 'public') - - db.dnsdomain_unregister(ctxt, domain1) - db.dnsdomain_unregister(ctxt, domain2) - class CapacityTestCase(test.TestCase): def setUp(self): diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py index 41c5d118e..70d54db9a 100644 --- a/nova/tests/test_virt_drivers.py +++ b/nova/tests/test_virt_drivers.py @@ -401,6 +401,14 @@ class _VirtDriverTestCase(test.TestCase): def test_host_power_action_startup(self): self.connection.host_power_action('a useless argument?', 'startup') + @catch_notimplementederror + def test_add_to_aggregate(self): + self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host') + + @catch_notimplementederror + def test_remove_from_aggregate(self): + self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host') + class AbstractDriverTestCase(_VirtDriverTestCase): def setUp(self): diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index d72365e5c..b08bbd69c 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -31,6 +31,7 @@ from nova import flags from nova import log as logging from nova import test from nova import utils +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state from nova import exception @@ -1741,3 +1742,149 @@ class XenAPISRSelectionTestCase(test.TestCase): expected = helper.safe_find_sr(session) self.assertEqual(session.call_xenapi('pool.get_default_SR', pool_ref), expected) + + +class XenAPIAggregateTestCase(test.TestCase): + """Unit tests for aggregate operations.""" + def setUp(self): + super(XenAPIAggregateTestCase, self).setUp() + self.stubs = stubout.StubOutForTesting() + self.flags(xenapi_connection_url='http://test_url', + xenapi_connection_username='test_user', + xenapi_connection_password='test_pass', + instance_name_template='%d', + firewall_driver='nova.virt.xenapi.firewall.' + 'Dom0IptablesFirewallDriver', + host='host') + xenapi_fake.reset() + stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + self.context = context.get_admin_context() + self.conn = xenapi_conn.get_connection(False) + self.fake_metadata = {'master_compute': 'host'} + + def tearDown(self): + super(XenAPIAggregateTestCase, self).tearDown() + self.stubs.UnsetAll() + + def test_add_to_aggregate_called(self): + def fake_add_to_aggregate(context, aggregate, host): + fake_add_to_aggregate.called = True + self.stubs.Set(self.conn._pool, + "add_to_aggregate", + fake_add_to_aggregate) + + self.conn.add_to_aggregate(None, None, None) + self.assertTrue(fake_add_to_aggregate.called) + + def test_add_to_aggregate_for_first_host_sets_metadata(self): + def fake_init_pool(id, name): + fake_init_pool.called = True + self.stubs.Set(self.conn._pool, "_init_pool", fake_init_pool) + + aggregate = self._aggregate_setup() + self.conn._pool.add_to_aggregate(self.context, aggregate, "host") + result = db.aggregate_get(self.context, aggregate.id) + self.assertTrue(fake_init_pool.called) + self.assertDictMatch(self.fake_metadata, result.metadetails) + self.assertEqual(aggregate_states.ACTIVE, result.operational_state) + + def test_join_slave(self): + """Ensure join_slave gets called when the request gets to master.""" + def fake_join_slave(id, compute_uuid, host, url, user, password): + fake_join_slave.called = True + self.stubs.Set(self.conn._pool, "_join_slave", fake_join_slave) + + aggregate = self._aggregate_setup(hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.conn._pool.add_to_aggregate(self.context, aggregate, "host2", + compute_uuid='fake_uuid', + url='fake_url', + user='fake_user', + passwd='fake_pass', + xenhost_uuid='fake_uuid') + self.assertTrue(fake_join_slave.called) + + def test_add_to_aggregate_first_host(self): + def fake_pool_set_name_label(self, session, pool_ref, name): + fake_pool_set_name_label.called = True + self.stubs.Set(xenapi_fake.SessionBase, "pool_set_name_label", + fake_pool_set_name_label) + self.conn._session.call_xenapi("pool.create", {"name": "asdf"}) + + values = {"name": 'fake_aggregate', + "availability_zone": 'fake_zone'} + result = db.aggregate_create(self.context, values) + db.aggregate_host_add(self.context, result.id, "host") + aggregate = db.aggregate_get(self.context, result.id) + self.assertEqual(["host"], aggregate.hosts) + self.assertEqual({}, aggregate.metadetails) + + self.conn._pool.add_to_aggregate(self.context, aggregate, "host") + self.assertTrue(fake_pool_set_name_label.called) + + def test_remove_from_aggregate_called(self): + def fake_remove_from_aggregate(context, aggregate, host): + fake_remove_from_aggregate.called = True + self.stubs.Set(self.conn._pool, + "remove_from_aggregate", + fake_remove_from_aggregate) + + self.conn.remove_from_aggregate(None, None, None) + self.assertTrue(fake_remove_from_aggregate.called) + + def test_remove_from_empty_aggregate(self): + values = {"name": 'fake_aggregate', + "availability_zone": 'fake_zone'} + result = db.aggregate_create(self.context, values) + self.assertRaises(exception.AggregateError, + self.conn._pool.remove_from_aggregate, + None, result, "test_host") + + def test_remove_slave(self): + """Ensure eject slave gets called.""" + def fake_eject_slave(id, compute_uuid, host_uuid): + fake_eject_slave.called = True + self.stubs.Set(self.conn._pool, "_eject_slave", fake_eject_slave) + + self.fake_metadata['host2'] = 'fake_host2_uuid' + aggregate = self._aggregate_setup(hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.conn._pool.remove_from_aggregate(self.context, aggregate, "host2") + self.assertTrue(fake_eject_slave.called) + + def test_remove_master_solo(self): + """Ensure metadata are cleared after removal.""" + def fake_clear_pool(id): + fake_clear_pool.called = True + self.stubs.Set(self.conn._pool, "_clear_pool", fake_clear_pool) + + aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE, + metadata=self.fake_metadata) + self.conn._pool.remove_from_aggregate(self.context, aggregate, "host") + result = db.aggregate_get(self.context, aggregate.id) + self.assertTrue(fake_clear_pool.called) + self.assertDictMatch({}, result.metadetails) + self.assertEqual(aggregate_states.ACTIVE, result.operational_state) + + def test_remote_master_non_empty_pool(self): + """Ensure AggregateError is raised if removing the master.""" + aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE, + hosts=['host', 'host2'], + metadata=self.fake_metadata) + self.assertRaises(exception.AggregateError, + self.conn._pool.remove_from_aggregate, + self.context, aggregate, "host") + + def _aggregate_setup(self, aggr_name='fake_aggregate', + aggr_zone='fake_zone', + aggr_state=aggregate_states.CREATED, + hosts=['host'], metadata=None): + values = {"name": aggr_name, + "availability_zone": aggr_zone, + "operational_state": aggr_state, } + result = db.aggregate_create(self.context, values) + for host in hosts: + db.aggregate_host_add(self.context, result.id, host) + if metadata: + db.aggregate_metadata_add(self.context, result.id, metadata) + return db.aggregate_get(self.context, result.id) diff --git a/nova/virt/driver.py b/nova/virt/driver.py index 20c41ca9f..92e0d53dd 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -654,6 +654,13 @@ class ComputeDriver(object): related to other calls into the driver. The prime example is to clean the cache and remove images which are no longer of interest. """ + + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + raise NotImplementedError() + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" raise NotImplementedError() def get_volume_connector(self, instance): diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py index d59e78f4f..73f2d863b 100644 --- a/nova/virt/xenapi/fake.py +++ b/nova/virt/xenapi/fake.py @@ -63,7 +63,7 @@ from nova import log as logging from nova import utils -_CLASSES = ['host', 'network', 'session', 'SR', 'VBD', 'pool', +_CLASSES = ['host', 'network', 'session', 'pool', 'SR', 'VBD', 'PBD', 'VDI', 'VIF', 'PIF', 'VM', 'VLAN', 'task'] _db_content = {} @@ -509,6 +509,15 @@ class SessionBase(object): def VM_clean_reboot(self, *args): return 'burp' + def pool_eject(self, session, host_ref): + pass + + def pool_join(self, session, hostname, username, password): + pass + + def pool_set_name_label(self, session, pool_ref, name): + pass + def network_get_all_records_where(self, _1, filter): return self.xenapi.network.get_all_records() diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py new file mode 100644 index 000000000..95f0f3467 --- /dev/null +++ b/nova/virt/xenapi/pool.py @@ -0,0 +1,214 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2012 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for Pool-related functions (join, eject, etc). +""" + +import json +import urlparse + +from nova import db +from nova import exception +from nova import flags +from nova import log as logging +from nova import rpc +from nova.compute import aggregate_states +from nova.openstack.common import cfg +from nova.virt.xenapi import vm_utils + +LOG = logging.getLogger("nova.virt.xenapi.pool") + +xenapi_pool_opts = [ + cfg.BoolOpt('use_join_force', + default=True, + help='To use for hosts with different CPUs'), + ] + +FLAGS = flags.FLAGS +FLAGS.register_opts(xenapi_pool_opts) + + +class ResourcePool(object): + """ + Implements resource pool operations. + """ + def __init__(self, session): + self.XenAPI = session.get_imported_xenapi() + host_ref = session.get_xenapi_host() + host_rec = session.call_xenapi('host.get_record', host_ref) + self._host_name = host_rec['hostname'] + self._host_addr = host_rec['address'] + self._host_uuid = host_rec['uuid'] + self._session = session + + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + if len(aggregate.hosts) == 1: + # this is the first host of the pool -> make it master + self._init_pool(aggregate.id, aggregate.name) + # save metadata so that we can find the master again: + # the password should be encrypted, really. + values = { + 'operational_state': aggregate_states.ACTIVE, + 'metadata': {'master_compute': host}, + } + db.aggregate_update(context, aggregate.id, values) + else: + # the pool is already up and running, we need to figure out + # whether we can serve the request from this host or not. + master_compute = aggregate.metadetails['master_compute'] + if master_compute == FLAGS.host and master_compute != host: + # this is the master -> do a pool-join + # To this aim, nova compute on the slave has to go down. + # NOTE: it is assumed that ONLY nova compute is running now + self._join_slave(aggregate.id, host, + kwargs.get('compute_uuid'), + kwargs.get('url'), kwargs.get('user'), + kwargs.get('passwd')) + metadata = {host: kwargs.get('xenhost_uuid'), } + db.aggregate_metadata_add(context, aggregate.id, metadata) + elif master_compute and master_compute != host: + # send rpc cast to master, asking to add the following + # host with specified credentials. + # NOTE: password in clear is not great, but it'll do for now + forward_request(context, "add_aggregate_host", master_compute, + aggregate.id, host, + self._host_addr, self._host_uuid) + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" + master_compute = aggregate.metadetails.get('master_compute') + if master_compute == FLAGS.host and master_compute != host: + # this is the master -> instruct it to eject a host from the pool + host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host] + self._eject_slave(aggregate.id, + kwargs.get('compute_uuid'), host_uuid) + db.aggregate_metadata_delete(context, aggregate.id, host) + elif master_compute == host: + # Remove master from its own pool -> destroy pool only if the + # master is on its own, otherwise raise fault. Destroying a + # pool made only by master is fictional + if len(aggregate.hosts) > 1: + raise exception.AggregateError( + aggregate_id=aggregate.id, + action='remove_from_aggregate', + reason=_('Unable to eject %(host)s ' + 'from the pool; pool not empty') + % locals()) + self._clear_pool(aggregate.id) + db.aggregate_metadata_delete(context, + aggregate.id, 'master_compute') + elif master_compute and master_compute != host: + # A master exists -> forward pool-eject request to master + forward_request(context, "remove_aggregate_host", master_compute, + aggregate.id, host, + self._host_addr, self._host_uuid) + else: + # this shouldn't have happened + raise exception.AggregateError(aggregate_id=aggregate.id, + action='remove_from_aggregate', + reason=_('Unable to eject %(host)s ' + 'from the pool; No master found') + % locals()) + + def _join_slave(self, aggregate_id, host, compute_uuid, url, user, passwd): + """Joins a slave into a XenServer resource pool.""" + try: + args = {'compute_uuid': compute_uuid, + 'url': url, + 'user': user, + 'password': passwd, + 'force': json.dumps(FLAGS.use_join_force), + 'master_addr': self._host_addr, + 'master_user': FLAGS.xenapi_connection_username, + 'master_pass': FLAGS.xenapi_connection_password, } + task = self._session.async_call_plugin('xenhost', + 'host_join', args) + self._session.wait_for_task(task) + except self.XenAPI.Failure as e: + LOG.error(_("Pool-Join failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='add_to_aggregate', + reason=_('Unable to join %(host)s ' + 'in the pool') % locals()) + + def _eject_slave(self, aggregate_id, compute_uuid, host_uuid): + """Eject a slave from a XenServer resource pool.""" + try: + # shutdown nova-compute; if there are other VMs running, e.g. + # guest instances, the eject will fail. That's a precaution + # to deal with the fact that the admin should evacuate the host + # first. The eject wipes out the host completely. + vm_ref = self._session.call_xenapi('VM.get_by_uuid', compute_uuid) + self._session.call_xenapi("VM.clean_shutdown", vm_ref) + + host_ref = self._session.call_xenapi('host.get_by_uuid', host_uuid) + self._session.call_xenapi("pool.eject", host_ref) + except self.XenAPI.Failure as e: + LOG.error(_("Pool-eject failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='remove_from_aggregate', + reason=str(e.details)) + + def _init_pool(self, aggregate_id, aggregate_name): + """Set the name label of a XenServer pool.""" + try: + pool_ref = self._session.call_xenapi("pool.get_all")[0] + self._session.call_xenapi("pool.set_name_label", + pool_ref, aggregate_name) + except self.XenAPI.Failure as e: + LOG.error(_("Unable to set up pool: %(e)s.") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='add_to_aggregate', + reason=str(e.details)) + + def _clear_pool(self, aggregate_id): + """Clear the name label of a XenServer pool.""" + try: + pool_ref = self._session.call_xenapi('pool.get_all')[0] + self._session.call_xenapi('pool.set_name_label', pool_ref, '') + except self.XenAPI.Failure as e: + LOG.error(_("Pool-set_name_label failed: %(e)s") % locals()) + raise exception.AggregateError(aggregate_id=aggregate_id, + action='remove_from_aggregate', + reason=str(e.details)) + + +def forward_request(context, request_type, master, aggregate_id, + slave_compute, slave_address, slave_uuid): + """Casts add/remove requests to the pool master.""" + # replace the address from the xenapi connection url + # because this might be 169.254.0.1, i.e. xenapi + sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) + rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master), + {"method": request_type, + "args": {"aggregate_id": aggregate_id, + "host": slave_compute, + "url": sender_url, + "user": FLAGS.xenapi_connection_username, + "passwd": FLAGS.xenapi_connection_password, + "compute_uuid": vm_utils.get_this_vm_uuid(), + "xenhost_uuid": slave_uuid, }, + }) + + +def swap_xapi_host(url, host_addr): + """Replace the XenServer address present in 'url' with 'host_addr'.""" + temp_url = urlparse.urlparse(url) + _, sep, port = temp_url.netloc.partition(':') + return url.replace(temp_url.netloc, '%s%s%s' % (host_addr, sep, port)) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 08542268c..c9363bb74 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -78,6 +78,7 @@ from nova import flags from nova import log as logging from nova.openstack.common import cfg from nova.virt import driver +from nova.virt.xenapi import pool from nova.virt.xenapi import vm_utils from nova.virt.xenapi.vmops import VMOps from nova.virt.xenapi.volumeops import VolumeOps @@ -180,6 +181,7 @@ class XenAPIConnection(driver.ComputeDriver): self._product_version = self._session.get_product_version() self._vmops = VMOps(self._session, self._product_version) self._initiator = None + self._pool = pool.ResourcePool(self._session) @property def host_state(self): @@ -488,6 +490,15 @@ class XenAPIConnection(driver.ComputeDriver): """Sets the specified host's ability to accept new instances.""" return self._vmops.set_host_enabled(host, enabled) + def add_to_aggregate(self, context, aggregate, host, **kwargs): + """Add a compute host to an aggregate.""" + return self._pool.add_to_aggregate(context, aggregate, host, **kwargs) + + def remove_from_aggregate(self, context, aggregate, host, **kwargs): + """Remove a compute host from an aggregate.""" + return self._pool.remove_from_aggregate(context, + aggregate, host, **kwargs) + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" @@ -498,9 +509,19 @@ class XenAPISession(object): exception = self.XenAPI.Failure(_("Unable to log in to XenAPI " "(is the Dom0 disk full?)")) for i in xrange(FLAGS.xenapi_connection_concurrent): - session = self._create_session(url) - with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): - session.login_with_password(user, pw) + try: + session = self._create_session(url) + with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): + session.login_with_password(user, pw) + except self.XenAPI.Failure, e: + # if user and pw of the master are different, we're doomed! + if e.details[0] == 'HOST_IS_SLAVE': + master = e.details[1] + session = self.XenAPI.Session(pool.swap_xapi_host(url, + master)) + session.login_with_password(user, pw) + else: + raise self._sessions.put(session) def get_product_version(self): diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost index 64938641f..9a7ad84af 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost @@ -33,6 +33,7 @@ import subprocess import tempfile import time +import XenAPI import XenAPIPlugin import pluginlib_nova as pluginlib @@ -41,6 +42,8 @@ pluginlib.configure_logging("xenhost") host_data_pattern = re.compile(r"\s*(\S+) \([^\)]+\) *: ?(.*)") config_file_path = "/usr/etc/xenhost.conf" +DEFAULT_TRIES = 23 +DEFAULT_SLEEP = 10 def jsonify(fnc): @@ -97,6 +100,28 @@ def _run_command_with_input(cmd, process_input): return output +def _resume_compute(session, compute_ref, compute_uuid): + """Resume compute node on slave host after pool join. This has to + happen regardless of the success or failure of the join operation.""" + try: + # session is valid if the join operation has failed + session.xenapi.VM.start(compute_ref, False, True) + except XenAPI.Failure, e: + # if session is invalid, e.g. xapi has restarted, then the pool + # join has been successful, wait for xapi to become alive again + for c in xrange(0, DEFAULT_TRIES): + try: + _run_command("xe vm-start uuid=%s" % compute_uuid) + return + except pluginlib.PluginError, e: + logging.exception('Waited %d seconds for the slave to ' + 'become available.' % (c * DEFAULT_SLEEP)) + time.sleep(DEFAULT_SLEEP) + raise pluginlib.PluginError('Unrecoverable error: the host has ' + 'not come back for more than %d seconds' + % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1))) + + def _get_host_uuid(): cmd = "xe host-list | grep uuid" resp = _run_command(cmd) @@ -258,6 +283,34 @@ def host_start(self, arg_dict): @jsonify +def host_join(self, arg_dict): + """Join a remote host into a pool whose master is the host + where the plugin is called from. The following constraints apply: + + - The host must have no VMs running, except nova-compute, which will be + shut down (and restarted upon pool-join) automatically, + - The host must have no shared storage currently set up, + - The host must have the same license of the master, + - The host must have the same supplemental packs as the master.""" + session = XenAPI.Session(arg_dict.get("url")) + session.login_with_password(arg_dict.get("user"), + arg_dict.get("password")) + compute_ref = session.xenapi.VM.get_by_uuid(arg_dict.get('compute_uuid')) + session.xenapi.VM.clean_shutdown(compute_ref) + try: + if arg_dict.get("force"): + session.xenapi.pool.join(arg_dict.get("master_addr"), + arg_dict.get("master_user"), + arg_dict.get("master_pass")) + else: + session.xenapi.pool.join_force(arg_dict.get("master_addr"), + arg_dict.get("master_user"), + arg_dict.get("master_pass")) + finally: + _resume_compute(session, compute_ref, arg_dict.get("compute_uuid")) + + +@jsonify def host_data(self, arg_dict): """Runs the commands on the xenstore host to return the current status information. @@ -380,6 +433,7 @@ if __name__ == "__main__": "host_shutdown": host_shutdown, "host_reboot": host_reboot, "host_start": host_start, + "host_join": host_join, "get_config": get_config, "set_config": set_config, "iptables_config": iptables_config}) |
