diff options
-rw-r--r-- | nova/compute/__init__.py | 1 | ||||
-rw-r--r-- | nova/compute/aggregate_states.py | 26 | ||||
-rw-r--r-- | nova/compute/api.py | 126 | ||||
-rw-r--r-- | nova/compute/manager.py | 8 | ||||
-rw-r--r-- | nova/db/sqlalchemy/api.py | 13 | ||||
-rw-r--r-- | nova/exception.py | 5 | ||||
-rw-r--r-- | nova/tests/test_compute.py | 211 | ||||
-rw-r--r-- | nova/tests/test_db_api.py | 13 |
8 files changed, 393 insertions, 10 deletions
diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py index b94f971d1..c0cfe7c27 100644 --- a/nova/compute/__init__.py +++ b/nova/compute/__init__.py @@ -17,3 +17,4 @@ # under the License. from nova.compute.api import API +from nova.compute.api import AggregateAPI diff --git a/nova/compute/aggregate_states.py b/nova/compute/aggregate_states.py index bad7720e3..92e194027 100644 --- a/nova/compute/aggregate_states.py +++ b/nova/compute/aggregate_states.py @@ -17,12 +17,28 @@ """Possible states for host aggregates. -An aggregate may be 'building', in which case the admin has triggered its -creation, but the underlying hypervisor pool has not actually being created -yet. An aggregate may be 'active', in which case the underlying hypervisor -pool is up and running. An aggregate may be in 'error' in all other cases. +An aggregate may be 'created', in which case the admin has triggered its +creation, but the underlying hypervisor pool has not actually being set up +yet. An aggregate may be 'changing', meaning that the underlying hypervisor +pool is being setup. An aggregate may be 'active', in which case the underlying +hypervisor pool is up and running. An aggregate may be 'dismissed' when it has +no hosts and it has been deleted. An aggregate may be in 'error' in all other +cases. +A 'created' aggregate becomes 'changing' during the first request of +adding a host. During a 'changing' status no other requests will be accepted; +this is to allow the hypervisor layer to instantiate the underlying pool +without any potential race condition that may incur in master/slave-based +configurations. The aggregate goes into the 'active' state when the underlying +pool has been correctly instantiated. +All other operations (e.g. add/remove hosts) that succeed will keep the +aggregate in the 'active' state. If a number of continuous requests fail, +an 'active' aggregate goes into an 'error' state. To recover from such a state, +admin intervention is required. Currently an error state is irreversible, +that is, in order to recover from it an aggregate must be deleted. """ -BUILDING = 'building' +CREATED = 'created' +CHANGING = 'changing' ACTIVE = 'active' ERROR = 'error' +DISMISSED = 'dismissed' diff --git a/nova/compute/api.py b/nova/compute/api.py index 0e874358b..fcc1f1652 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -17,7 +17,8 @@ # License for the specific language governing permissions and limitations # under the License. -"""Handles all requests relating to instances (guest vms).""" +"""Handles all requests relating to compute resources (e.g. guest vms, +networking and storage of vms, and compute hosts on which they run).""" import functools import re @@ -27,6 +28,7 @@ import novaclient import webob.exc from nova import block_device +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import power_state from nova.compute import task_states @@ -1740,3 +1742,125 @@ class API(base.Base): uuids = [instance['uuid'] for instance in instances] return self.db.instance_fault_get_by_instance_uuids(context, uuids) + + +class AggregateAPI(base.Base): + """Sub-set of the Compute Manager API for managing host aggregates.""" + def __init__(self, **kwargs): + super(AggregateAPI, self).__init__(**kwargs) + + def create_aggregate(self, context, aggregate_name, availability_zone): + """Creates the model for the aggregate.""" + values = {"name": aggregate_name, + "availability_zone": availability_zone} + aggregate = self.db.aggregate_create(context, values) + return dict(aggregate.iteritems()) + + def get_aggregate(self, context, aggregate_id): + """Get an aggregate by id.""" + aggregate = self.db.aggregate_get(context, aggregate_id) + return self._get_aggregate_info(context, aggregate) + + def get_aggregate_list(self, context): + """Get all the aggregates for this zone.""" + aggregates = self.db.aggregate_get_all(context, read_deleted="no") + return [self._get_aggregate_info(context, a) for a in aggregates] + + def update_aggregate(self, context, aggregate_id, values): + """Update the properties of an aggregate.""" + aggregate = self.db.aggregate_update(context, aggregate_id, values) + return self._get_aggregate_info(context, aggregate) + + def update_aggregate_metadata(self, context, aggregate_id, metadata): + """Updates the aggregate metadata. + + If a key is set to None, it gets removed from the aggregate metadata. + """ + # As a first release of the host aggregates blueprint, this call is + # pretty dumb, in the sense that interacts only with the model. + # In later releasses, updating metadata may trigger virt actions like + # the setup of shared storage, or more generally changes to the + # underlying hypervisor pools. + for key in metadata.keys(): + if not metadata[key]: + try: + self.db.aggregate_metadata_delete(context, + aggregate_id, key) + metadata.pop(key) + except exception.AggregateMetadataNotFound, e: + LOG.warn(e.message) + self.db.aggregate_metadata_add(context, aggregate_id, metadata) + return self.get_aggregate(context, aggregate_id) + + def delete_aggregate(self, context, aggregate_id): + """Deletes the aggregate.""" + hosts = self.db.aggregate_host_get_all(context, aggregate_id, + read_deleted="no") + if len(hosts) > 0: + raise exception.InvalidAggregateAction(action='delete', + aggregate_id=aggregate_id, + reason='not empty') + values = {'operational_state': aggregate_states.DISMISSED} + self.db.aggregate_update(context, aggregate_id, values) + self.db.aggregate_delete(context, aggregate_id) + + def add_host_to_aggregate(self, context, aggregate_id, host): + """Adds the host to an aggregate.""" + # validates the host; ComputeHostNotFound is raised if invalid + service = self.db.service_get_all_compute_by_host(context, host)[0] + # add host, and reflects action in the aggregate operational state + aggregate = self.db.aggregate_get(context, aggregate_id) + if aggregate.operational_state in [aggregate_states.CREATED, + aggregate_states.ACTIVE]: + if service.availability_zone != aggregate.availability_zone: + raise exception.\ + InvalidAggregateAction(action='add host', + aggregate_id=aggregate_id, + reason='availibility zone mismatch') + self.db.aggregate_host_add(context, aggregate_id, host) + if aggregate.operational_state == aggregate_states.CREATED: + values = {'operational_state': aggregate_states.CHANGING} + self.db.aggregate_update(context, aggregate_id, values) + queue = self.db.queue_get_for(context, service.topic, host) + rpc.cast(context, queue, {"method": "add_aggregate_host", + "args": {"aggregate_id": aggregate_id, + "host": host}, }) + return self.get_aggregate(context, aggregate_id) + else: + invalid = {aggregate_states.CHANGING: 'setup in progress', + aggregate_states.DISMISSED: 'aggregate deleted', + aggregate_states.ERROR: 'aggregate in error', } + if aggregate.operational_state in invalid.keys(): + raise exception.\ + InvalidAggregateAction(action='add host', + aggregate_id=aggregate_id, + reason=invalid[aggregate.operational_state]) + + def remove_host_from_aggregate(self, context, aggregate_id, host): + """Removes host from the aggregate.""" + # validates the host; ComputeHostNotFound is raised if invalid + service = self.db.service_get_all_compute_by_host(context, host)[0] + aggregate = self.db.aggregate_get(context, aggregate_id) + if aggregate.operational_state in [aggregate_states.ACTIVE, + aggregate_states.ERROR]: + self.db.aggregate_host_delete(context, aggregate_id, host) + queue = self.db.queue_get_for(context, service.topic, host) + rpc.cast(context, queue, {"method": "remove_aggregate_host", + "args": {"aggregate_id": aggregate_id, + "host": host}, }) + return self.get_aggregate(context, aggregate_id) + elif aggregate.operational_state == aggregate_states.DISMISSED: + raise exception.InvalidAggregateAction(action='add host', + aggregate_id=aggregate_id, + reason='aggregate deleted') + + def _get_aggregate_info(self, context, aggregate): + """Builds a dictionary with aggregate props, metadata and hosts.""" + metadata = self.db.aggregate_metadata_get(context, aggregate.id) + hosts = self.db.aggregate_host_get_all(context, aggregate.id, + read_deleted="no") + + result = dict(aggregate.iteritems()) + result["metadata"] = metadata + result["hosts"] = hosts + return result diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 59e228e45..53d558368 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -2207,3 +2207,11 @@ class ComputeManager(manager.SchedulerDependentManager): instance_uuid, vm_state=vm_states.ERROR, task_state=None) + + def add_aggregate_host(self, context, aggregate_id, host): + """Adds a host to a physical hypervisor pool.""" + raise NotImplementedError() + + def remove_aggregate_host(self, context, aggregate_id, host): + """Removes a host from a physical hypervisor pool.""" + raise NotImplementedError() diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 43b386945..72ecd1f2d 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -4257,7 +4257,7 @@ def aggregate_create(context, values, metadata=None): try: aggregate = models.Aggregate() aggregate.update(values) - aggregate.operational_state = aggregate_states.BUILDING + aggregate.operational_state = aggregate_states.CREATED aggregate.save() except exception.DBError: raise exception.AggregateNameExists(aggregate_name=values['name']) @@ -4435,20 +4435,27 @@ def aggregate_host_delete(context, aggregate_id, host): @require_admin_context @require_aggregate_exists def aggregate_host_add(context, aggregate_id, host): + session = get_session() host_ref = _aggregate_get_query(context, models.AggregateHost, models.AggregateHost.aggregate_id, aggregate_id, - read_deleted='no').\ + session=session, + read_deleted='yes').\ filter_by(host=host).first() if not host_ref: try: host_ref = models.AggregateHost() values = {"host": host, "aggregate_id": aggregate_id, } host_ref.update(values) - host_ref.save() + host_ref.save(session=session) except exception.DBError: raise exception.AggregateHostConflict(host=host) + elif host_ref.deleted: + host_ref.update({'deleted': False, + 'deleted_at': None, + 'updated_at': literal_column('updated_at')}) + host_ref.save(session=session) else: raise exception.AggregateHostExists(host=host, aggregate_id=aggregate_id) diff --git a/nova/exception.py b/nova/exception.py index 147834066..d89f53653 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -261,6 +261,11 @@ class InvalidParameterValue(Invalid): message = _("%(err)s") +class InvalidAggregateAction(Invalid): + message = _("Cannot perform action '%(action)s' on aggregate " + "%(aggregate_id)s. Reason: %(reason)s.") + + class InstanceInvalidState(Invalid): message = _("Instance %(instance_uuid)s in %(attr)s %(state)s. Cannot " "%(method)s while the instance is in this state.") diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index f416e4a51..44938e5ba 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -31,6 +31,7 @@ import nova import nova.common.policy from nova import compute import nova.compute.api +from nova.compute import aggregate_states from nova.compute import instance_types from nova.compute import manager as compute_manager from nova.compute import power_state @@ -3011,6 +3012,216 @@ class ComputeAPITestCase(BaseTestCase): db.instance_destroy(self.context, instance['id']) +def fake_rpc_method(context, topic, msg, do_cast=True): + pass + + +def _create_service_entries(context, values={'avail_zone1': ['fake_host1', + 'fake_host2'], + 'avail_zone2': ['fake_host3'], }): + for avail_zone, hosts in values.iteritems(): + for host in hosts: + db.service_create(context, + {'host': host, + 'binary': 'nova-compute', + 'topic': 'compute', + 'report_count': 0, + 'availability_zone': avail_zone}) + return values + + +class ComputeAPIAggrTestCase(test.TestCase): + + def setUp(self): + super(ComputeAPIAggrTestCase, self).setUp() + self.api = compute.AggregateAPI() + self.context = context.get_admin_context() + self.stubs.Set(rpc, 'call', fake_rpc_method) + self.stubs.Set(rpc, 'cast', fake_rpc_method) + + def tearDown(self): + super(ComputeAPIAggrTestCase, self).tearDown() + + def test_update_aggregate_metadata(self): + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') + metadata = {'foo_key1': 'foo_value1', + 'foo_key2': 'foo_value2', } + aggr = self.api.update_aggregate_metadata(self.context, aggr['id'], + metadata) + metadata['foo_key1'] = None + expected = self.api.update_aggregate_metadata(self.context, + aggr['id'], metadata) + self.assertDictMatch(expected['metadata'], {'foo_key2': 'foo_value2'}) + + def test_delete_aggregate(self): + """Ensure we can delete an aggregate.""" + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') + self.api.delete_aggregate(self.context, aggr['id']) + expected = db.aggregate_get(self.context, aggr['id'], + read_deleted='yes') + self.assertNotEqual(aggr['operational_state'], + expected['operational_state']) + + def test_delete_non_empty_aggregate(self): + """Ensure InvalidAggregateAction is raised when non empty aggregate.""" + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') + _create_service_entries(self.context, + {'fake_availability_zone': ['fake_host']}) + self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host') + self.assertRaises(exception.InvalidAggregateAction, + self.api.delete_aggregate, self.context, aggr['id']) + + def test_add_host_to_aggregate(self): + """Ensure we can add a host to an aggregate.""" + values = _create_service_entries(self.context) + fake_zone = values.keys()[0] + fake_host = values[fake_zone][0] + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', fake_zone) + aggr = self.api.add_host_to_aggregate(self.context, + aggr['id'], fake_host) + self.assertEqual(aggr['operational_state'], aggregate_states.CHANGING) + + def test_add_host_to_aggregate_multiple(self): + """Ensure we can add multiple hosts to an aggregate.""" + values = _create_service_entries(self.context) + fake_zone = values.keys()[0] + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', fake_zone) + # let's mock the fact that the aggregate is active already! + status = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, aggr['id'], status) + for host in values[fake_zone]: + aggr = self.api.add_host_to_aggregate(self.context, + aggr['id'], host) + self.assertEqual(len(aggr['hosts']), len(values[fake_zone])) + self.assertEqual(aggr['operational_state'], + aggregate_states.ACTIVE) + + def test_add_host_to_aggregate_invalid_changing_status(self): + """Ensure InvalidAggregateAction is raised when adding host while + aggregate is not ready.""" + values = _create_service_entries(self.context) + fake_zone = values.keys()[0] + fake_host = values[fake_zone][0] + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', fake_zone) + aggr = self.api.add_host_to_aggregate(self.context, + aggr['id'], fake_host) + self.assertEqual(aggr['operational_state'], + aggregate_states.CHANGING) + self.assertRaises(exception.InvalidAggregateAction, + self.api.add_host_to_aggregate, self.context, + aggr['id'], fake_host) + + def test_add_host_to_aggregate_invalid_dismissed_status(self): + """Ensure InvalidAggregateAction is raised when aggregate is + deleted.""" + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', 'fake_zone') + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) + # let's mock the fact that the aggregate is dismissed! + status = {'operational_state': aggregate_states.DISMISSED} + db.aggregate_update(self.context, aggr['id'], status) + self.assertRaises(exception.InvalidAggregateAction, + self.api.add_host_to_aggregate, self.context, + aggr['id'], 'fake_host') + + def test_add_host_to_aggregate_invalid_error_status(self): + """Ensure InvalidAggregateAction is raised when aggregate is + in error.""" + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', 'fake_zone') + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) + # let's mock the fact that the aggregate is in error! + status = {'operational_state': aggregate_states.ERROR} + db.aggregate_update(self.context, aggr['id'], status) + self.assertRaises(exception.InvalidAggregateAction, + self.api.add_host_to_aggregate, self.context, + aggr['id'], 'fake_host') + + def test_add_host_to_aggregate_zones_mismatch(self): + """Ensure InvalidAggregateAction is raised when zones don't match.""" + _create_service_entries(self.context, {'fake_zoneX': ['fake_host']}) + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', 'fake_zoneY') + self.assertRaises(exception.InvalidAggregateAction, + self.api.add_host_to_aggregate, + self.context, aggr['id'], 'fake_host') + + def test_add_host_to_aggregate_raise_not_found(self): + """Ensure ComputeHostNotFound is raised when adding invalid host.""" + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') + self.assertRaises(exception.ComputeHostNotFound, + self.api.add_host_to_aggregate, + self.context, aggr['id'], 'invalid_host') + + def test_remove_host_from_aggregate_active(self): + """Ensure we can remove a host from an aggregate.""" + values = _create_service_entries(self.context) + fake_zone = values.keys()[0] + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', fake_zone) + # let's mock the fact that the aggregate is active already! + status = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, aggr['id'], status) + for host in values[fake_zone]: + aggr = self.api.add_host_to_aggregate(self.context, + aggr['id'], host) + expected = self.api.remove_host_from_aggregate(self.context, + aggr['id'], + values[fake_zone][0]) + self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts'])) + self.assertEqual(expected['operational_state'], + aggregate_states.ACTIVE) + + def test_remove_host_from_aggregate_error(self): + """Ensure we can remove a host from an aggregate even if in error.""" + values = _create_service_entries(self.context) + fake_zone = values.keys()[0] + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', fake_zone) + # let's mock the fact that the aggregate is ready! + status = {'operational_state': aggregate_states.ACTIVE} + db.aggregate_update(self.context, aggr['id'], status) + for host in values[fake_zone]: + aggr = self.api.add_host_to_aggregate(self.context, + aggr['id'], host) + # let's mock the fact that the aggregate is in error! + status = {'operational_state': aggregate_states.ERROR} + expected = self.api.remove_host_from_aggregate(self.context, + aggr['id'], + values[fake_zone][0]) + self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts'])) + self.assertEqual(expected['operational_state'], + aggregate_states.ACTIVE) + + def test_remove_host_from_aggregate_invalid_dismissed_status(self): + """Ensure InvalidAggregateAction is raised when aggregate is + deleted.""" + aggr = self.api.create_aggregate(self.context, + 'fake_aggregate', 'fake_zone') + _create_service_entries(self.context, {'fake_zone': ['fake_host']}) + # let's mock the fact that the aggregate is dismissed! + status = {'operational_state': aggregate_states.DISMISSED} + db.aggregate_update(self.context, aggr['id'], status) + self.assertRaises(exception.InvalidAggregateAction, + self.api.remove_host_from_aggregate, self.context, + aggr['id'], 'fake_host') + + def test_remove_host_from_aggregate_raise_not_found(self): + """Ensure ComputeHostNotFound is raised when removing invalid host.""" + aggr = self.api.create_aggregate(self.context, 'fake_aggregate', + 'fake_availability_zone') + self.assertRaises(exception.ComputeHostNotFound, + self.api.remove_host_from_aggregate, + self.context, aggr['id'], 'invalid_host') + + class ComputePolicyTestCase(BaseTestCase): def setUp(self): diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index a384181dd..121120dbf 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -304,7 +304,7 @@ class AggregateDBApiTestCase(test.TestCase): def test_aggregate_create(self): """Ensure aggregate can be created with no metadata.""" result = _create_aggregate(metadata=None) - self.assertEqual(result['operational_state'], 'building') + self.assertEqual(result['operational_state'], 'created') def test_aggregate_create_raise_exist_exc(self): """Ensure aggregate names are distinct.""" @@ -476,6 +476,17 @@ class AggregateDBApiTestCase(test.TestCase): expected = db.aggregate_host_get_all(ctxt, result.id) self.assertEqual(_get_fake_aggr_hosts(), expected) + def test_aggregate_host_add_deleted(self): + """Ensure we can add a host that was previously deleted.""" + ctxt = context.get_admin_context() + result = _create_aggregate_with_hosts(context=ctxt, metadata=None) + host = _get_fake_aggr_hosts()[0] + db.aggregate_host_delete(ctxt, result.id, host) + db.aggregate_host_add(ctxt, result.id, host) + expected = db.aggregate_host_get_all(ctxt, result.id, + read_deleted='no') + self.assertEqual(len(expected), 1) + def test_aggregate_host_add_duplicate_raise_conflict(self): """Ensure we cannot add host to distinct aggregates.""" ctxt = context.get_admin_context() |