summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/compute/__init__.py1
-rw-r--r--nova/compute/aggregate_states.py26
-rw-r--r--nova/compute/api.py126
-rw-r--r--nova/compute/manager.py8
-rw-r--r--nova/db/sqlalchemy/api.py13
-rw-r--r--nova/exception.py5
-rw-r--r--nova/tests/test_compute.py211
-rw-r--r--nova/tests/test_db_api.py13
8 files changed, 393 insertions, 10 deletions
diff --git a/nova/compute/__init__.py b/nova/compute/__init__.py
index b94f971d1..c0cfe7c27 100644
--- a/nova/compute/__init__.py
+++ b/nova/compute/__init__.py
@@ -17,3 +17,4 @@
# under the License.
from nova.compute.api import API
+from nova.compute.api import AggregateAPI
diff --git a/nova/compute/aggregate_states.py b/nova/compute/aggregate_states.py
index bad7720e3..92e194027 100644
--- a/nova/compute/aggregate_states.py
+++ b/nova/compute/aggregate_states.py
@@ -17,12 +17,28 @@
"""Possible states for host aggregates.
-An aggregate may be 'building', in which case the admin has triggered its
-creation, but the underlying hypervisor pool has not actually being created
-yet. An aggregate may be 'active', in which case the underlying hypervisor
-pool is up and running. An aggregate may be in 'error' in all other cases.
+An aggregate may be 'created', in which case the admin has triggered its
+creation, but the underlying hypervisor pool has not actually being set up
+yet. An aggregate may be 'changing', meaning that the underlying hypervisor
+pool is being setup. An aggregate may be 'active', in which case the underlying
+hypervisor pool is up and running. An aggregate may be 'dismissed' when it has
+no hosts and it has been deleted. An aggregate may be in 'error' in all other
+cases.
+A 'created' aggregate becomes 'changing' during the first request of
+adding a host. During a 'changing' status no other requests will be accepted;
+this is to allow the hypervisor layer to instantiate the underlying pool
+without any potential race condition that may incur in master/slave-based
+configurations. The aggregate goes into the 'active' state when the underlying
+pool has been correctly instantiated.
+All other operations (e.g. add/remove hosts) that succeed will keep the
+aggregate in the 'active' state. If a number of continuous requests fail,
+an 'active' aggregate goes into an 'error' state. To recover from such a state,
+admin intervention is required. Currently an error state is irreversible,
+that is, in order to recover from it an aggregate must be deleted.
"""
-BUILDING = 'building'
+CREATED = 'created'
+CHANGING = 'changing'
ACTIVE = 'active'
ERROR = 'error'
+DISMISSED = 'dismissed'
diff --git a/nova/compute/api.py b/nova/compute/api.py
index 0e874358b..fcc1f1652 100644
--- a/nova/compute/api.py
+++ b/nova/compute/api.py
@@ -17,7 +17,8 @@
# License for the specific language governing permissions and limitations
# under the License.
-"""Handles all requests relating to instances (guest vms)."""
+"""Handles all requests relating to compute resources (e.g. guest vms,
+networking and storage of vms, and compute hosts on which they run)."""
import functools
import re
@@ -27,6 +28,7 @@ import novaclient
import webob.exc
from nova import block_device
+from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import task_states
@@ -1740,3 +1742,125 @@ class API(base.Base):
uuids = [instance['uuid'] for instance in instances]
return self.db.instance_fault_get_by_instance_uuids(context, uuids)
+
+
+class AggregateAPI(base.Base):
+ """Sub-set of the Compute Manager API for managing host aggregates."""
+ def __init__(self, **kwargs):
+ super(AggregateAPI, self).__init__(**kwargs)
+
+ def create_aggregate(self, context, aggregate_name, availability_zone):
+ """Creates the model for the aggregate."""
+ values = {"name": aggregate_name,
+ "availability_zone": availability_zone}
+ aggregate = self.db.aggregate_create(context, values)
+ return dict(aggregate.iteritems())
+
+ def get_aggregate(self, context, aggregate_id):
+ """Get an aggregate by id."""
+ aggregate = self.db.aggregate_get(context, aggregate_id)
+ return self._get_aggregate_info(context, aggregate)
+
+ def get_aggregate_list(self, context):
+ """Get all the aggregates for this zone."""
+ aggregates = self.db.aggregate_get_all(context, read_deleted="no")
+ return [self._get_aggregate_info(context, a) for a in aggregates]
+
+ def update_aggregate(self, context, aggregate_id, values):
+ """Update the properties of an aggregate."""
+ aggregate = self.db.aggregate_update(context, aggregate_id, values)
+ return self._get_aggregate_info(context, aggregate)
+
+ def update_aggregate_metadata(self, context, aggregate_id, metadata):
+ """Updates the aggregate metadata.
+
+ If a key is set to None, it gets removed from the aggregate metadata.
+ """
+ # As a first release of the host aggregates blueprint, this call is
+ # pretty dumb, in the sense that interacts only with the model.
+ # In later releasses, updating metadata may trigger virt actions like
+ # the setup of shared storage, or more generally changes to the
+ # underlying hypervisor pools.
+ for key in metadata.keys():
+ if not metadata[key]:
+ try:
+ self.db.aggregate_metadata_delete(context,
+ aggregate_id, key)
+ metadata.pop(key)
+ except exception.AggregateMetadataNotFound, e:
+ LOG.warn(e.message)
+ self.db.aggregate_metadata_add(context, aggregate_id, metadata)
+ return self.get_aggregate(context, aggregate_id)
+
+ def delete_aggregate(self, context, aggregate_id):
+ """Deletes the aggregate."""
+ hosts = self.db.aggregate_host_get_all(context, aggregate_id,
+ read_deleted="no")
+ if len(hosts) > 0:
+ raise exception.InvalidAggregateAction(action='delete',
+ aggregate_id=aggregate_id,
+ reason='not empty')
+ values = {'operational_state': aggregate_states.DISMISSED}
+ self.db.aggregate_update(context, aggregate_id, values)
+ self.db.aggregate_delete(context, aggregate_id)
+
+ def add_host_to_aggregate(self, context, aggregate_id, host):
+ """Adds the host to an aggregate."""
+ # validates the host; ComputeHostNotFound is raised if invalid
+ service = self.db.service_get_all_compute_by_host(context, host)[0]
+ # add host, and reflects action in the aggregate operational state
+ aggregate = self.db.aggregate_get(context, aggregate_id)
+ if aggregate.operational_state in [aggregate_states.CREATED,
+ aggregate_states.ACTIVE]:
+ if service.availability_zone != aggregate.availability_zone:
+ raise exception.\
+ InvalidAggregateAction(action='add host',
+ aggregate_id=aggregate_id,
+ reason='availibility zone mismatch')
+ self.db.aggregate_host_add(context, aggregate_id, host)
+ if aggregate.operational_state == aggregate_states.CREATED:
+ values = {'operational_state': aggregate_states.CHANGING}
+ self.db.aggregate_update(context, aggregate_id, values)
+ queue = self.db.queue_get_for(context, service.topic, host)
+ rpc.cast(context, queue, {"method": "add_aggregate_host",
+ "args": {"aggregate_id": aggregate_id,
+ "host": host}, })
+ return self.get_aggregate(context, aggregate_id)
+ else:
+ invalid = {aggregate_states.CHANGING: 'setup in progress',
+ aggregate_states.DISMISSED: 'aggregate deleted',
+ aggregate_states.ERROR: 'aggregate in error', }
+ if aggregate.operational_state in invalid.keys():
+ raise exception.\
+ InvalidAggregateAction(action='add host',
+ aggregate_id=aggregate_id,
+ reason=invalid[aggregate.operational_state])
+
+ def remove_host_from_aggregate(self, context, aggregate_id, host):
+ """Removes host from the aggregate."""
+ # validates the host; ComputeHostNotFound is raised if invalid
+ service = self.db.service_get_all_compute_by_host(context, host)[0]
+ aggregate = self.db.aggregate_get(context, aggregate_id)
+ if aggregate.operational_state in [aggregate_states.ACTIVE,
+ aggregate_states.ERROR]:
+ self.db.aggregate_host_delete(context, aggregate_id, host)
+ queue = self.db.queue_get_for(context, service.topic, host)
+ rpc.cast(context, queue, {"method": "remove_aggregate_host",
+ "args": {"aggregate_id": aggregate_id,
+ "host": host}, })
+ return self.get_aggregate(context, aggregate_id)
+ elif aggregate.operational_state == aggregate_states.DISMISSED:
+ raise exception.InvalidAggregateAction(action='add host',
+ aggregate_id=aggregate_id,
+ reason='aggregate deleted')
+
+ def _get_aggregate_info(self, context, aggregate):
+ """Builds a dictionary with aggregate props, metadata and hosts."""
+ metadata = self.db.aggregate_metadata_get(context, aggregate.id)
+ hosts = self.db.aggregate_host_get_all(context, aggregate.id,
+ read_deleted="no")
+
+ result = dict(aggregate.iteritems())
+ result["metadata"] = metadata
+ result["hosts"] = hosts
+ return result
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index 59e228e45..53d558368 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -2207,3 +2207,11 @@ class ComputeManager(manager.SchedulerDependentManager):
instance_uuid,
vm_state=vm_states.ERROR,
task_state=None)
+
+ def add_aggregate_host(self, context, aggregate_id, host):
+ """Adds a host to a physical hypervisor pool."""
+ raise NotImplementedError()
+
+ def remove_aggregate_host(self, context, aggregate_id, host):
+ """Removes a host from a physical hypervisor pool."""
+ raise NotImplementedError()
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 43b386945..72ecd1f2d 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -4257,7 +4257,7 @@ def aggregate_create(context, values, metadata=None):
try:
aggregate = models.Aggregate()
aggregate.update(values)
- aggregate.operational_state = aggregate_states.BUILDING
+ aggregate.operational_state = aggregate_states.CREATED
aggregate.save()
except exception.DBError:
raise exception.AggregateNameExists(aggregate_name=values['name'])
@@ -4435,20 +4435,27 @@ def aggregate_host_delete(context, aggregate_id, host):
@require_admin_context
@require_aggregate_exists
def aggregate_host_add(context, aggregate_id, host):
+ session = get_session()
host_ref = _aggregate_get_query(context,
models.AggregateHost,
models.AggregateHost.aggregate_id,
aggregate_id,
- read_deleted='no').\
+ session=session,
+ read_deleted='yes').\
filter_by(host=host).first()
if not host_ref:
try:
host_ref = models.AggregateHost()
values = {"host": host, "aggregate_id": aggregate_id, }
host_ref.update(values)
- host_ref.save()
+ host_ref.save(session=session)
except exception.DBError:
raise exception.AggregateHostConflict(host=host)
+ elif host_ref.deleted:
+ host_ref.update({'deleted': False,
+ 'deleted_at': None,
+ 'updated_at': literal_column('updated_at')})
+ host_ref.save(session=session)
else:
raise exception.AggregateHostExists(host=host,
aggregate_id=aggregate_id)
diff --git a/nova/exception.py b/nova/exception.py
index 147834066..d89f53653 100644
--- a/nova/exception.py
+++ b/nova/exception.py
@@ -261,6 +261,11 @@ class InvalidParameterValue(Invalid):
message = _("%(err)s")
+class InvalidAggregateAction(Invalid):
+ message = _("Cannot perform action '%(action)s' on aggregate "
+ "%(aggregate_id)s. Reason: %(reason)s.")
+
+
class InstanceInvalidState(Invalid):
message = _("Instance %(instance_uuid)s in %(attr)s %(state)s. Cannot "
"%(method)s while the instance is in this state.")
diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py
index f416e4a51..44938e5ba 100644
--- a/nova/tests/test_compute.py
+++ b/nova/tests/test_compute.py
@@ -31,6 +31,7 @@ import nova
import nova.common.policy
from nova import compute
import nova.compute.api
+from nova.compute import aggregate_states
from nova.compute import instance_types
from nova.compute import manager as compute_manager
from nova.compute import power_state
@@ -3011,6 +3012,216 @@ class ComputeAPITestCase(BaseTestCase):
db.instance_destroy(self.context, instance['id'])
+def fake_rpc_method(context, topic, msg, do_cast=True):
+ pass
+
+
+def _create_service_entries(context, values={'avail_zone1': ['fake_host1',
+ 'fake_host2'],
+ 'avail_zone2': ['fake_host3'], }):
+ for avail_zone, hosts in values.iteritems():
+ for host in hosts:
+ db.service_create(context,
+ {'host': host,
+ 'binary': 'nova-compute',
+ 'topic': 'compute',
+ 'report_count': 0,
+ 'availability_zone': avail_zone})
+ return values
+
+
+class ComputeAPIAggrTestCase(test.TestCase):
+
+ def setUp(self):
+ super(ComputeAPIAggrTestCase, self).setUp()
+ self.api = compute.AggregateAPI()
+ self.context = context.get_admin_context()
+ self.stubs.Set(rpc, 'call', fake_rpc_method)
+ self.stubs.Set(rpc, 'cast', fake_rpc_method)
+
+ def tearDown(self):
+ super(ComputeAPIAggrTestCase, self).tearDown()
+
+ def test_update_aggregate_metadata(self):
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
+ metadata = {'foo_key1': 'foo_value1',
+ 'foo_key2': 'foo_value2', }
+ aggr = self.api.update_aggregate_metadata(self.context, aggr['id'],
+ metadata)
+ metadata['foo_key1'] = None
+ expected = self.api.update_aggregate_metadata(self.context,
+ aggr['id'], metadata)
+ self.assertDictMatch(expected['metadata'], {'foo_key2': 'foo_value2'})
+
+ def test_delete_aggregate(self):
+ """Ensure we can delete an aggregate."""
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
+ self.api.delete_aggregate(self.context, aggr['id'])
+ expected = db.aggregate_get(self.context, aggr['id'],
+ read_deleted='yes')
+ self.assertNotEqual(aggr['operational_state'],
+ expected['operational_state'])
+
+ def test_delete_non_empty_aggregate(self):
+ """Ensure InvalidAggregateAction is raised when non empty aggregate."""
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
+ _create_service_entries(self.context,
+ {'fake_availability_zone': ['fake_host']})
+ self.api.add_host_to_aggregate(self.context, aggr['id'], 'fake_host')
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.delete_aggregate, self.context, aggr['id'])
+
+ def test_add_host_to_aggregate(self):
+ """Ensure we can add a host to an aggregate."""
+ values = _create_service_entries(self.context)
+ fake_zone = values.keys()[0]
+ fake_host = values[fake_zone][0]
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', fake_zone)
+ aggr = self.api.add_host_to_aggregate(self.context,
+ aggr['id'], fake_host)
+ self.assertEqual(aggr['operational_state'], aggregate_states.CHANGING)
+
+ def test_add_host_to_aggregate_multiple(self):
+ """Ensure we can add multiple hosts to an aggregate."""
+ values = _create_service_entries(self.context)
+ fake_zone = values.keys()[0]
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', fake_zone)
+ # let's mock the fact that the aggregate is active already!
+ status = {'operational_state': aggregate_states.ACTIVE}
+ db.aggregate_update(self.context, aggr['id'], status)
+ for host in values[fake_zone]:
+ aggr = self.api.add_host_to_aggregate(self.context,
+ aggr['id'], host)
+ self.assertEqual(len(aggr['hosts']), len(values[fake_zone]))
+ self.assertEqual(aggr['operational_state'],
+ aggregate_states.ACTIVE)
+
+ def test_add_host_to_aggregate_invalid_changing_status(self):
+ """Ensure InvalidAggregateAction is raised when adding host while
+ aggregate is not ready."""
+ values = _create_service_entries(self.context)
+ fake_zone = values.keys()[0]
+ fake_host = values[fake_zone][0]
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', fake_zone)
+ aggr = self.api.add_host_to_aggregate(self.context,
+ aggr['id'], fake_host)
+ self.assertEqual(aggr['operational_state'],
+ aggregate_states.CHANGING)
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.add_host_to_aggregate, self.context,
+ aggr['id'], fake_host)
+
+ def test_add_host_to_aggregate_invalid_dismissed_status(self):
+ """Ensure InvalidAggregateAction is raised when aggregate is
+ deleted."""
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', 'fake_zone')
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
+ # let's mock the fact that the aggregate is dismissed!
+ status = {'operational_state': aggregate_states.DISMISSED}
+ db.aggregate_update(self.context, aggr['id'], status)
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.add_host_to_aggregate, self.context,
+ aggr['id'], 'fake_host')
+
+ def test_add_host_to_aggregate_invalid_error_status(self):
+ """Ensure InvalidAggregateAction is raised when aggregate is
+ in error."""
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', 'fake_zone')
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
+ # let's mock the fact that the aggregate is in error!
+ status = {'operational_state': aggregate_states.ERROR}
+ db.aggregate_update(self.context, aggr['id'], status)
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.add_host_to_aggregate, self.context,
+ aggr['id'], 'fake_host')
+
+ def test_add_host_to_aggregate_zones_mismatch(self):
+ """Ensure InvalidAggregateAction is raised when zones don't match."""
+ _create_service_entries(self.context, {'fake_zoneX': ['fake_host']})
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', 'fake_zoneY')
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.add_host_to_aggregate,
+ self.context, aggr['id'], 'fake_host')
+
+ def test_add_host_to_aggregate_raise_not_found(self):
+ """Ensure ComputeHostNotFound is raised when adding invalid host."""
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
+ self.assertRaises(exception.ComputeHostNotFound,
+ self.api.add_host_to_aggregate,
+ self.context, aggr['id'], 'invalid_host')
+
+ def test_remove_host_from_aggregate_active(self):
+ """Ensure we can remove a host from an aggregate."""
+ values = _create_service_entries(self.context)
+ fake_zone = values.keys()[0]
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', fake_zone)
+ # let's mock the fact that the aggregate is active already!
+ status = {'operational_state': aggregate_states.ACTIVE}
+ db.aggregate_update(self.context, aggr['id'], status)
+ for host in values[fake_zone]:
+ aggr = self.api.add_host_to_aggregate(self.context,
+ aggr['id'], host)
+ expected = self.api.remove_host_from_aggregate(self.context,
+ aggr['id'],
+ values[fake_zone][0])
+ self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
+ self.assertEqual(expected['operational_state'],
+ aggregate_states.ACTIVE)
+
+ def test_remove_host_from_aggregate_error(self):
+ """Ensure we can remove a host from an aggregate even if in error."""
+ values = _create_service_entries(self.context)
+ fake_zone = values.keys()[0]
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', fake_zone)
+ # let's mock the fact that the aggregate is ready!
+ status = {'operational_state': aggregate_states.ACTIVE}
+ db.aggregate_update(self.context, aggr['id'], status)
+ for host in values[fake_zone]:
+ aggr = self.api.add_host_to_aggregate(self.context,
+ aggr['id'], host)
+ # let's mock the fact that the aggregate is in error!
+ status = {'operational_state': aggregate_states.ERROR}
+ expected = self.api.remove_host_from_aggregate(self.context,
+ aggr['id'],
+ values[fake_zone][0])
+ self.assertEqual(len(aggr['hosts']) - 1, len(expected['hosts']))
+ self.assertEqual(expected['operational_state'],
+ aggregate_states.ACTIVE)
+
+ def test_remove_host_from_aggregate_invalid_dismissed_status(self):
+ """Ensure InvalidAggregateAction is raised when aggregate is
+ deleted."""
+ aggr = self.api.create_aggregate(self.context,
+ 'fake_aggregate', 'fake_zone')
+ _create_service_entries(self.context, {'fake_zone': ['fake_host']})
+ # let's mock the fact that the aggregate is dismissed!
+ status = {'operational_state': aggregate_states.DISMISSED}
+ db.aggregate_update(self.context, aggr['id'], status)
+ self.assertRaises(exception.InvalidAggregateAction,
+ self.api.remove_host_from_aggregate, self.context,
+ aggr['id'], 'fake_host')
+
+ def test_remove_host_from_aggregate_raise_not_found(self):
+ """Ensure ComputeHostNotFound is raised when removing invalid host."""
+ aggr = self.api.create_aggregate(self.context, 'fake_aggregate',
+ 'fake_availability_zone')
+ self.assertRaises(exception.ComputeHostNotFound,
+ self.api.remove_host_from_aggregate,
+ self.context, aggr['id'], 'invalid_host')
+
+
class ComputePolicyTestCase(BaseTestCase):
def setUp(self):
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index a384181dd..121120dbf 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -304,7 +304,7 @@ class AggregateDBApiTestCase(test.TestCase):
def test_aggregate_create(self):
"""Ensure aggregate can be created with no metadata."""
result = _create_aggregate(metadata=None)
- self.assertEqual(result['operational_state'], 'building')
+ self.assertEqual(result['operational_state'], 'created')
def test_aggregate_create_raise_exist_exc(self):
"""Ensure aggregate names are distinct."""
@@ -476,6 +476,17 @@ class AggregateDBApiTestCase(test.TestCase):
expected = db.aggregate_host_get_all(ctxt, result.id)
self.assertEqual(_get_fake_aggr_hosts(), expected)
+ def test_aggregate_host_add_deleted(self):
+ """Ensure we can add a host that was previously deleted."""
+ ctxt = context.get_admin_context()
+ result = _create_aggregate_with_hosts(context=ctxt, metadata=None)
+ host = _get_fake_aggr_hosts()[0]
+ db.aggregate_host_delete(ctxt, result.id, host)
+ db.aggregate_host_add(ctxt, result.id, host)
+ expected = db.aggregate_host_get_all(ctxt, result.id,
+ read_deleted='no')
+ self.assertEqual(len(expected), 1)
+
def test_aggregate_host_add_duplicate_raise_conflict(self):
"""Ensure we cannot add host to distinct aggregates."""
ctxt = context.get_admin_context()