From 861efe3aa7ce6af7b5c548e5a555625fa53a3d86 Mon Sep 17 00:00:00 2001 From: Joe Gordon Date: Tue, 24 Jul 2012 17:03:56 -0700 Subject: General host aggregates part 2 Partially implements blueprint general-host-aggregates: Scheduler Filter * Add AggregateInstanceExtraSpecsFilter * change db.aggregate_get_by_host to return a list of aggregates instead of the first aggregate * Add optional key filter to db.aggregate_get_by_host along with test * Add db.aggregate_metadata_get_by_host to get host aggregate metadata * add optional key filter to db.aggregate_metadata_get_by_host * Add AggregateTypeAffinityFilter Change-Id: I4a3061276cc3b0c5c695eaf973b773183b3c4f4b --- nova/db/api.py | 15 +++- nova/db/sqlalchemy/api.py | 31 ++++++--- .../filters/aggregate_instance_extra_specs.py | 51 ++++++++++++++ nova/scheduler/filters/type_filter.py | 16 +++++ nova/tests/scheduler/test_host_filters.py | 80 ++++++++++++++++++++++ nova/tests/test_db_api.py | 62 +++++++++++++++-- nova/tests/test_xenapi.py | 8 +-- nova/virt/xenapi/driver.py | 13 ++-- nova/virt/xenapi/host.py | 6 +- nova/virt/xenapi/pool.py | 9 +-- nova/virt/xenapi/pool_states.py | 7 ++ nova/virt/xenapi/vmops.py | 6 +- 12 files changed, 267 insertions(+), 37 deletions(-) create mode 100644 nova/scheduler/filters/aggregate_instance_extra_specs.py (limited to 'nova') diff --git a/nova/db/api.py b/nova/db/api.py index e6ebecbdf..83f4ca355 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -1828,9 +1828,18 @@ def aggregate_get(context, aggregate_id): return IMPL.aggregate_get(context, aggregate_id) -def aggregate_get_by_host(context, host): - """Get a specific aggregate by host""" - return IMPL.aggregate_get_by_host(context, host) +def aggregate_get_by_host(context, host, key=None): + """Get a list of aggregates that host belongs to""" + return IMPL.aggregate_get_by_host(context, host, key) + + +def aggregate_metadata_get_by_host(context, host, key=None): + """Get metadata for all aggregates that host belongs to. + + Returns a dictionary where each value is a set, this is to cover the case + where there two aggregates have different values for the same key. + Optional key filter""" + return IMPL.aggregate_metadata_get_by_host(context, host, key) def aggregate_update(context, aggregate_id, values): diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 2993aaaf3..325827089 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -19,6 +19,7 @@ """Implementation of SQLAlchemy backend.""" +from collections import defaultdict import copy import datetime import functools @@ -4926,16 +4927,30 @@ def aggregate_get(context, aggregate_id): @require_admin_context -def aggregate_get_by_host(context, host): - aggregate_host = _aggregate_get_query(context, - models.AggregateHost, - models.AggregateHost.host, - host).first() +def aggregate_get_by_host(context, host, key=None): + query = model_query(context, models.Aggregate).join( + "_hosts").filter(models.AggregateHost.host == host) + + if key: + query = query.join("_metadata").filter( + models.AggregateMetadata.key == key) + return query.all() - if not aggregate_host: - raise exception.AggregateHostNotFound(host=host) - return aggregate_get(context, aggregate_host.aggregate_id) +@require_admin_context +def aggregate_metadata_get_by_host(context, host, key=None): + query = model_query(context, models.Aggregate).join( + "_hosts").filter(models.AggregateHost.host == host).join( + "_metadata") + + if key: + query = query.filter(models.AggregateMetadata.key == key) + rows = query.all() + metadata = defaultdict(set) + for agg in rows: + for kv in agg._metadata: + metadata[kv['key']].add(kv['value']) + return metadata @require_admin_context diff --git a/nova/scheduler/filters/aggregate_instance_extra_specs.py b/nova/scheduler/filters/aggregate_instance_extra_specs.py new file mode 100644 index 000000000..5851909be --- /dev/null +++ b/nova/scheduler/filters/aggregate_instance_extra_specs.py @@ -0,0 +1,51 @@ +# Copyright (c) 2012 OpenStack, LLC. +# Copyright (c) 2012 Cloudscaling +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova import db +from nova import exception +from nova.openstack.common import log as logging +from nova.scheduler import filters +from nova import utils + + +LOG = logging.getLogger(__name__) + + +class AggregateInstanceExtraSpecsFilter(filters.BaseHostFilter): + """AggregateInstanceExtraSpecsFilter works with InstanceType records.""" + + def host_passes(self, host_state, filter_properties): + """Return a list of hosts that can create instance_type + + Check that the extra specs associated with the instance type match + the metadata provided by aggregates. If not present return False. + """ + instance_type = filter_properties.get('instance_type') + if 'extra_specs' not in instance_type: + return True + + context = filter_properties['context'].elevated() + metadata = db.aggregate_metadata_get_by_host(context, host_state.host) + + for key, value in instance_type['extra_specs'].iteritems(): + aggregate_value = metadata.get(key, None) + if not aggregate_value or value not in aggregate_value: + LOG.debug(_("%(host_state)s fails " + "AggregateInstanceExtraSpecsFilter requirements, " + "missing %(key)s,'%(value)s'=" + "'%(aggregate_value)s'"), locals()) + return False + return True diff --git a/nova/scheduler/filters/type_filter.py b/nova/scheduler/filters/type_filter.py index a8d842d52..3f7c631b7 100644 --- a/nova/scheduler/filters/type_filter.py +++ b/nova/scheduler/filters/type_filter.py @@ -37,3 +37,19 @@ class TypeAffinityFilter(filters.BaseHostFilter): instances_other_type = db.instance_get_all_by_host_and_not_type( context, host_state.host, instance_type['id']) return len(instances_other_type) == 0 + + +class AggregateTypeAffinityFilter(filters.BaseHostFilter): + """AggregateTypeAffinityFilter limits instance_type by aggregate + + return True if no instance_type key is set or if the aggregate metadata + key 'instance_type' has the instance_type name as a value + """ + + def host_passes(self, host_state, filter_properties): + instance_type = filter_properties.get('instance_type') + context = filter_properties['context'].elevated() + metadata = db.aggregate_metadata_get_by_host( + context, host_state.host, key='instance_type') + return (len(metadata) == 0 or + instance_type['name'] in metadata['instance_type']) diff --git a/nova/tests/scheduler/test_host_filters.py b/nova/tests/scheduler/test_host_filters.py index c4bdf4d61..57827ac19 100644 --- a/nova/tests/scheduler/test_host_filters.py +++ b/nova/tests/scheduler/test_host_filters.py @@ -19,6 +19,7 @@ import httplib import stubout from nova import context +from nova import db from nova import exception from nova import flags from nova.openstack.common import jsonutils @@ -292,6 +293,28 @@ class HostFiltersTestCase(test.TestCase): params={'host': 'fake_host', 'instance_type_id': 2}) self.assertFalse(filt_cls.host_passes(host, filter_properties)) + def test_aggregate_type_filter(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['AggregateTypeAffinityFilter']() + + filter_properties = {'context': self.context, + 'instance_type': {'name': 'fake1'}} + filter2_properties = {'context': self.context, + 'instance_type': {'name': 'fake2'}} + capabilities = {'enabled': True} + service = {'disabled': False} + host = fakes.FakeHostState('fake_host', 'compute', + {'capabilities': capabilities, + 'service': service}) + #True since no aggregates + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + #True since type matches aggregate, metadata + self._create_aggregate_with_host(name='fake_aggregate', + hosts=['fake_host'], metadata={'instance_type': 'fake1'}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + #False since type matches aggregate, metadata + self.assertFalse(filt_cls.host_passes(host, filter2_properties)) + def test_ram_filter_fails_on_memory(self): self._stub_service_is_up(True) filt_cls = self.class_map['RamFilter']() @@ -398,6 +421,63 @@ class HostFiltersTestCase(test.TestCase): self.assertFalse(filt_cls.host_passes(host, filter_properties)) + def test_aggregate_filter_passes_no_extra_specs(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['AggregateInstanceExtraSpecsFilter']() + capabilities = {'enabled': True, 'opt1': 1, 'opt2': 2} + + filter_properties = {'context': self.context, 'instance_type': + {'memory_mb': 1024}} + host = fakes.FakeHostState('host1', 'compute', + {'capabilities': capabilities}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + + def _create_aggregate_with_host(self, name='fake_aggregate', + metadata=None, + hosts=['host1']): + values = {'name': name, + 'availability_zone': 'fake_avail_zone', } + result = db.aggregate_create(self.context.elevated(), values, metadata) + for host in hosts: + db.aggregate_host_add(self.context.elevated(), result.id, host) + return result + + def test_aggregate_filter_passes_extra_specs(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['AggregateInstanceExtraSpecsFilter']() + extra_specs = {'opt1': '1', 'opt2': '2'} + self._create_aggregate_with_host(metadata={'opt1': '1'}) + self._create_aggregate_with_host(name='fake2', metadata={'opt2': '2'}) + + filter_properties = {'context': self.context, 'instance_type': + {'memory_mb': 1024, 'extra_specs': extra_specs}} + host = fakes.FakeHostState('host1', 'compute', {'free_ram_mb': 1024}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + + def test_aggregate_filter_fails_extra_specs(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['AggregateInstanceExtraSpecsFilter']() + extra_specs = {'opt1': 1, 'opt2': 3} + self._create_aggregate_with_host(metadata={'opt1': '1'}) + self._create_aggregate_with_host(name='fake2', metadata={'opt2': '2'}) + filter_properties = {'context': self.context, 'instance_type': + {'memory_mb': 1024, 'extra_specs': extra_specs}} + host = fakes.FakeHostState('host1', 'compute', {'free_ram_mb': 1024}) + self.assertFalse(filt_cls.host_passes(host, filter_properties)) + + def test_aggregate_filter_fails_extra_specs_deleted_host(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['AggregateInstanceExtraSpecsFilter']() + extra_specs = {'opt1': '1', 'opt2': '2'} + self._create_aggregate_with_host(metadata={'opt1': '1'}) + agg2 = self._create_aggregate_with_host(name='fake2', + metadata={'opt2': '2'}) + filter_properties = {'context': self.context, 'instance_type': + {'memory_mb': 1024, 'extra_specs': extra_specs}} + host = fakes.FakeHostState('host1', 'compute', {'free_ram_mb': 1024}) + db.aggregate_host_delete(self.context.elevated(), agg2.id, 'host1') + self.assertFalse(filt_cls.host_passes(host, filter_properties)) + def test_isolated_hosts_fails_isolated_on_non_isolated(self): self.flags(isolated_images=['isolated'], isolated_hosts=['isolated']) filt_cls = self.class_map['IsolatedHostsFilter']() diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index b2b1cf9e2..4a7adb001 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -515,18 +515,68 @@ class AggregateDBApiTestCase(test.TestCase): self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails) def test_aggregate_get_by_host(self): - """Ensure we can get an aggregate by host.""" + """Ensure we can get aggregates by host.""" ctxt = context.get_admin_context() - r1 = _create_aggregate_with_hosts(context=ctxt) - r2 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org') - self.assertEqual(r1.id, r2.id) + values = {'name': 'fake_aggregate2', + 'availability_zone': 'fake_avail_zone', } + a1 = _create_aggregate_with_hosts(context=ctxt) + a2 = _create_aggregate_with_hosts(context=ctxt, values=values) + r1 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org') + self.assertEqual([a1.id, a2.id], [x.id for x in r1]) + + def test_aggregate_get_by_host_with_key(self): + """Ensure we can get aggregates by host.""" + ctxt = context.get_admin_context() + values = {'name': 'fake_aggregate2', + 'availability_zone': 'fake_avail_zone', } + a1 = _create_aggregate_with_hosts(context=ctxt, + metadata={'goodkey': 'good'}) + a2 = _create_aggregate_with_hosts(context=ctxt, values=values) + # filter result by key + r1 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org', key='goodkey') + self.assertEqual([a1.id], [x.id for x in r1]) + + def test_aggregate_metdata_get_by_host(self): + """Ensure we can get aggregates by host.""" + ctxt = context.get_admin_context() + values = {'name': 'fake_aggregate2', + 'availability_zone': 'fake_avail_zone', } + values2 = {'name': 'fake_aggregate3', + 'availability_zone': 'fake_avail_zone', } + a1 = _create_aggregate_with_hosts(context=ctxt) + a2 = _create_aggregate_with_hosts(context=ctxt, values=values) + a3 = _create_aggregate_with_hosts(context=ctxt, values=values2, + hosts=['bar.openstack.org'], metadata={'badkey': 'bad'}) + r1 = db.aggregate_metadata_get_by_host(ctxt, 'foo.openstack.org') + self.assertEqual(r1['fake_key1'], set(['fake_value1'])) + self.assertFalse('badkey' in r1) + + def test_aggregate_metdata_get_by_host_with_key(self): + """Ensure we can get aggregates by host.""" + ctxt = context.get_admin_context() + values = {'name': 'fake_aggregate2', + 'availability_zone': 'fake_avail_zone', } + values2 = {'name': 'fake_aggregate3', + 'availability_zone': 'fake_avail_zone', } + a1 = _create_aggregate_with_hosts(context=ctxt) + a2 = _create_aggregate_with_hosts(context=ctxt, values=values) + a3 = _create_aggregate_with_hosts(context=ctxt, values=values2, + hosts=['foo.openstack.org'], metadata={'good': 'value'}) + r1 = db.aggregate_metadata_get_by_host(ctxt, 'foo.openstack.org', + key='good') + self.assertEqual(r1['good'], set(['value'])) + self.assertFalse('fake_key1' in r1) + # Delete metadata + db.aggregate_metadata_delete(ctxt, a3.id, 'good') + r2 = db.aggregate_metadata_get_by_host(ctxt, 'foo.openstack.org', + key='good') + self.assertFalse('good' in r2) def test_aggregate_get_by_host_not_found(self): """Ensure AggregateHostNotFound is raised with unknown host.""" ctxt = context.get_admin_context() _create_aggregate_with_hosts(context=ctxt) - self.assertRaises(exception.AggregateHostNotFound, - db.aggregate_get_by_host, ctxt, 'unknown_host') + self.assertEqual([], db.aggregate_get_by_host(ctxt, 'unknown_host')) def test_aggregate_delete_raise_not_found(self): """Ensure AggregateNotFound is raised when deleting an aggregate.""" diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 6362d8ff5..ec84ba077 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -2149,9 +2149,9 @@ class XenAPILiveMigrateTestCase(stubs.XenAPITestBase): def __init__(self): self.metadetails = {"host": "test_host_uuid"} - def fake_aggregate_get_by_host(context, host): + def fake_aggregate_get_by_host(context, host, key=None): self.assertEqual(FLAGS.host, host) - return fake_aggregate() + return [fake_aggregate()] self.stubs.Set(db, "aggregate_get_by_host", fake_aggregate_get_by_host) @@ -2163,9 +2163,9 @@ class XenAPILiveMigrateTestCase(stubs.XenAPITestBase): def __init__(self): self.metadetails = {"dest_other": "test_host_uuid"} - def fake_aggregate_get_by_host(context, host): + def fake_aggregate_get_by_host(context, host, key=None): self.assertEqual(FLAGS.host, host) - return fake_aggregate() + return [fake_aggregate()] self.stubs.Set(db, "aggregate_get_by_host", fake_aggregate_get_by_host) diff --git a/nova/virt/xenapi/driver.py b/nova/virt/xenapi/driver.py index ac4547166..28beffc80 100644 --- a/nova/virt/xenapi/driver.py +++ b/nova/virt/xenapi/driver.py @@ -54,11 +54,11 @@ from nova.openstack.common import log as logging from nova.virt import driver from nova.virt.xenapi import host from nova.virt.xenapi import pool +from nova.virt.xenapi import pool_states from nova.virt.xenapi import vm_utils from nova.virt.xenapi import vmops from nova.virt.xenapi import volumeops - LOG = logging.getLogger(__name__) xenapi_opts = [ @@ -621,13 +621,12 @@ class XenAPISession(object): def _get_host_uuid(self): if self.is_slave: - try: - aggr = db.aggregate_get_by_host(context.get_admin_context(), - FLAGS.host) - except exception.AggregateHostNotFound: - LOG.exception(_('Host is member of a pool, but DB ' + aggr = db.aggregate_get_by_host(context.get_admin_context(), + FLAGS.host, key=pool_states.POOL_FLAG)[0] + if not aggr: + LOG.error(_('Host is member of a pool, but DB ' 'says otherwise')) - raise + raise exception.AggregateHostNotFound() return aggr.metadetails[FLAGS.host] else: with self._get_session() as session: diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py index df6b4f85d..b45a9106c 100644 --- a/nova/virt/xenapi/host.py +++ b/nova/virt/xenapi/host.py @@ -28,6 +28,7 @@ from nova import db from nova import exception from nova import notifications from nova.openstack.common import jsonutils +from nova.virt.xenapi import pool_states from nova.virt.xenapi import vm_utils LOG = logging.getLogger(__name__) @@ -210,7 +211,10 @@ def _host_find(context, session, src, dst): # NOTE: this would be a lot simpler if nova-compute stored # FLAGS.host in the XenServer host's other-config map. # TODO(armando-migliaccio): improve according the note above - aggregate = db.aggregate_get_by_host(context, src) + aggregate = db.aggregate_get_by_host(context, src, + key=pool_states.POOL_FLAG)[0] + if not aggregate: + raise exception.AggregateHostNotFound(host=src) uuid = session.call_xenapi('host.get_record', dst)['uuid'] for compute_host, host_uuid in aggregate.metadetails.iteritems(): if host_uuid == uuid: diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index 05592f978..d7204d372 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -67,14 +67,9 @@ class ResourcePool(object): LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' 'during operation on %(host)s') % locals()) - def _is_hv_pool(self, context, aggregate_id): - """Checks if aggregate is a hypervisor_pool""" - metadata = db.aggregate_metadata_get(context, aggregate_id) - return pool_states.POOL_FLAG in metadata.keys() - def add_to_aggregate(self, context, aggregate, host, **kwargs): """Add a compute host to an aggregate.""" - if not self._is_hv_pool(context, aggregate.id): + if not pool_states.is_hv_pool(context, aggregate.id): return invalid = {pool_states.CHANGING: 'setup in progress', @@ -126,7 +121,7 @@ class ResourcePool(object): def remove_from_aggregate(self, context, aggregate, host, **kwargs): """Remove a compute host from an aggregate.""" - if not self._is_hv_pool(context, aggregate.id): + if not pool_states.is_hv_pool(context, aggregate.id): return invalid = {pool_states.CREATED: 'no hosts to remove', diff --git a/nova/virt/xenapi/pool_states.py b/nova/virt/xenapi/pool_states.py index 5b3765cfc..82a85ce14 100644 --- a/nova/virt/xenapi/pool_states.py +++ b/nova/virt/xenapi/pool_states.py @@ -36,6 +36,7 @@ an 'active' pool goes into an 'error' state. To recover from such a state, admin intervention is required. Currently an error state is irreversible, that is, in order to recover from it an pool must be deleted. """ +from nova import db CREATED = 'created' CHANGING = 'changing' @@ -46,3 +47,9 @@ DISMISSED = 'dismissed' # Metadata keys KEY = 'operational_state' POOL_FLAG = 'hypervisor_pool' + + +def is_hv_pool(context, aggregate_id): + """Checks if aggregate is a hypervisor_pool""" + metadata = db.aggregate_metadata_get(context, aggregate_id) + return POOL_FLAG in metadata.keys() diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 8d443f498..abac03d57 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -44,6 +44,7 @@ from nova import utils from nova.virt import driver from nova.virt.xenapi import agent from nova.virt.xenapi import firewall +from nova.virt.xenapi import pool_states from nova.virt.xenapi import vm_utils from nova.virt.xenapi import volume_utils @@ -1450,7 +1451,10 @@ class VMOps(object): network_info=network_info) def _get_host_uuid_from_aggregate(self, context, hostname): - current_aggregate = db.aggregate_get_by_host(context, FLAGS.host) + current_aggregate = db.aggregate_get_by_host(context, FLAGS.host, + key=pool_states.POOL_FLAG)[0] + if not current_aggregate: + raise exception.AggregateHostNotFound(host=FLAGS.host) try: return current_aggregate.metadetails[hostname] except KeyError: -- cgit