From 424de7eea2588a3f4143e5874aac01d0dd1917e6 Mon Sep 17 00:00:00 2001 From: Armando Migliaccio Date: Wed, 15 Feb 2012 21:17:06 +0000 Subject: blueprint host-aggregates: improvements and clean-up This changeset addresses a number of issues found during testing: - avoid name conflicts during aggregate creation (see db/* changes) - avoid masking of XenAPI.Failure if pool-join fails (see plugins/* changes) - preserve VM placement decisions made during scheduling (see xenapi/vmops.py) - ensure plugins are called on the right hosts in XS pools (see xenapi_conn.py) - stores master uuid in aggregate metadata for use in VM live migration and raise InvalidAggregateAction rather than AggregateError if we attempt to remove a master (see xenapi/pool.py and compute/manager.py) - clean-up of unit tests Change-Id: I881a94d87efe1e81bd4f86667e75f5cbee50ce91 --- nova/compute/manager.py | 17 ++++++++---- nova/db/api.py | 5 ++++ nova/db/sqlalchemy/api.py | 32 ++++++++++++++++++++-- nova/tests/test_db_api.py | 23 ++++++++++++++++ nova/tests/test_xenapi.py | 7 +++-- nova/tests/xenapi/stubs.py | 18 +++++------- nova/virt/xenapi/pool.py | 16 ++++++----- nova/virt/xenapi/vmops.py | 4 ++- nova/virt/xenapi_conn.py | 26 ++++++++++++++++-- .../xenserver/xenapi/etc/xapi.d/plugins/xenhost | 8 +++--- 10 files changed, 119 insertions(+), 37 deletions(-) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index ac9bebafe..5fb5ff9c6 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -2341,16 +2341,21 @@ class ComputeManager(manager.SchedulerDependentManager): try: self.driver.remove_from_aggregate(context, aggregate, host, **kwargs) - except exception.AggregateError: + except (exception.AggregateError, + exception.InvalidAggregateAction) as e: error = sys.exc_info() - self._undo_aggregate_operation(context, self.db.aggregate_host_add, - aggregate.id, host) + self._undo_aggregate_operation( + context, self.db.aggregate_host_add, + aggregate.id, host, + isinstance(e, 
exception.AggregateError)) raise error[0], error[1], error[2] - def _undo_aggregate_operation(self, context, op, aggregate_id, host): + def _undo_aggregate_operation(self, context, op, aggregate_id, + host, set_error=True): try: - status = {'operational_state': aggregate_states.ERROR} - self.db.aggregate_update(context, aggregate_id, status) + if set_error: + status = {'operational_state': aggregate_states.ERROR} + self.db.aggregate_update(context, aggregate_id, status) op(context, aggregate_id, host) except Exception: LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' diff --git a/nova/db/api.py b/nova/db/api.py index bd8057532..48fce9d57 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -1793,6 +1793,11 @@ def aggregate_get(context, aggregate_id, read_deleted='no'): return IMPL.aggregate_get(context, aggregate_id, read_deleted) +def aggregate_get_by_host(context, host, read_deleted='no'): + """Get a specific aggregate by host""" + return IMPL.aggregate_get_by_host(context, host, read_deleted) + + def aggregate_update(context, aggregate_id, values): """Update the attributes of an aggregates. 
If values contains a metadata key, it updates the aggregate metadata too.""" diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 026fe2f28..733e903f7 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -4271,12 +4271,24 @@ def _aggregate_get_query(context, model_class, id_field, id, @require_admin_context def aggregate_create(context, values, metadata=None): - try: + session = get_session() + aggregate = _aggregate_get_query(context, + models.Aggregate, + models.Aggregate.name, + values['name'], + session=session, + read_deleted='yes').first() + if not aggregate: aggregate = models.Aggregate() values.setdefault('operational_state', aggregate_states.CREATED) aggregate.update(values) - aggregate.save() - except exception.DBError: + aggregate.save(session=session) + elif aggregate.deleted: + aggregate.update({'deleted': False, + 'deleted_at': None, + 'availability_zone': values['availability_zone']}) + aggregate.save(session=session) + else: raise exception.AggregateNameExists(aggregate_name=values['name']) if metadata: aggregate_metadata_add(context, aggregate.id, metadata) @@ -4296,6 +4308,20 @@ def aggregate_get(context, aggregate_id, read_deleted='no'): return aggregate +@require_admin_context +def aggregate_get_by_host(context, host, read_deleted='no'): + aggregate_host = _aggregate_get_query(context, + models.AggregateHost, + models.AggregateHost.host, + host, + read_deleted='no').first() + + if not aggregate_host: + raise exception.AggregateHostNotFound(host=host) + + return aggregate_get(context, aggregate_host.aggregate_id, read_deleted) + + @require_admin_context def aggregate_update(context, aggregate_id, values): session = get_session() diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py index 4cb17d958..cd1adcfbf 100644 --- a/nova/tests/test_db_api.py +++ b/nova/tests/test_db_api.py @@ -338,6 +338,15 @@ class AggregateDBApiTestCase(test.TestCase): result = _create_aggregate(metadata=None) 
self.assertEqual(result['operational_state'], 'created') + def test_aggregate_create_avoid_name_conflict(self): + """Test we can avoid conflict on deleted aggregates.""" + r1 = _create_aggregate(metadata=None) + db.aggregate_delete(context.get_admin_context(), r1.id) + values = {'name': r1.name, 'availability_zone': 'new_zone'} + r2 = _create_aggregate(values=values) + self.assertEqual(r2.name, values['name']) + self.assertEqual(r2.availability_zone, values['availability_zone']) + def test_aggregate_create_raise_exist_exc(self): """Ensure aggregate names are distinct.""" _create_aggregate(metadata=None) @@ -383,6 +392,20 @@ class AggregateDBApiTestCase(test.TestCase): self.assertEqual(_get_fake_aggr_hosts(), expected.hosts) self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails) + def test_aggregate_get_by_host(self): + """Ensure we can get an aggregate by host.""" + ctxt = context.get_admin_context() + r1 = _create_aggregate_with_hosts(context=ctxt) + r2 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org') + self.assertEqual(r1.id, r2.id) + + def test_aggregate_get_by_host_not_found(self): + """Ensure AggregateHostNotFound is raised with unknown host.""" + ctxt = context.get_admin_context() + _create_aggregate_with_hosts(context=ctxt) + self.assertRaises(exception.AggregateHostNotFound, + db.aggregate_get_by_host, ctxt, 'unknown_host') + def test_aggregate_delete_raise_not_found(self): """Ensure AggregateNotFound is raised when deleting an aggregate.""" ctxt = context.get_admin_context() diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index b08bbd69c..312aaea37 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -1757,10 +1757,13 @@ class XenAPIAggregateTestCase(test.TestCase): 'Dom0IptablesFirewallDriver', host='host') xenapi_fake.reset() + host_ref = xenapi_fake.get_all('host')[0] stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) self.context = context.get_admin_context() self.conn = 
xenapi_conn.get_connection(False) - self.fake_metadata = {'master_compute': 'host'} + self.fake_metadata = {'master_compute': 'host', + 'host': xenapi_fake.get_record('host', + host_ref)['uuid']} def tearDown(self): super(XenAPIAggregateTestCase, self).tearDown() @@ -1871,7 +1874,7 @@ class XenAPIAggregateTestCase(test.TestCase): aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE, hosts=['host', 'host2'], metadata=self.fake_metadata) - self.assertRaises(exception.AggregateError, + self.assertRaises(exception.InvalidAggregateAction, self.conn._pool.remove_from_aggregate, self.context, aggregate, "host") diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index df444d515..01077be8d 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -202,6 +202,12 @@ class FakeSessionForVMTests(fake.SessionBase): vm['is_a_template'] = False vm['is_control_domain'] = False vm['domid'] = random.randrange(1, 1 << 16) + return vm + + def VM_start_on(self, _1, vm_ref, host_ref, _2, _3): + vm_rec = self.VM_start(_1, vm_ref, _2, _3) + host_rec = fake.get_record('host', host_ref) + vm_rec['resident_on'] = host_rec['uuid'] def VM_snapshot(self, session_ref, vm_ref, label): status = "Running" @@ -334,7 +340,7 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests): pass -class FakeSessionForMigrationTests(fake.SessionBase): +class FakeSessionForMigrationTests(FakeSessionForVMTests): """Stubs out a XenAPISession for Migration tests""" def __init__(self, uri): super(FakeSessionForMigrationTests, self).__init__(uri) @@ -342,16 +348,6 @@ class FakeSessionForMigrationTests(fake.SessionBase): def VDI_get_by_uuid(self, *args): return 'hurr' - def VM_start(self, _1, ref, _2, _3): - vm = fake.get_record('VM', ref) - if vm['power_state'] != 'Halted': - raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted', - vm['power_state']]) - vm['power_state'] = 'Running' - vm['is_a_template'] = False - vm['is_control_domain'] = False - 
vm['domid'] = random.randrange(1, 1 << 16) - def VM_set_name_label(self, *args): pass diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index 95f0f3467..01db91e31 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -61,11 +61,11 @@ class ResourcePool(object): if len(aggregate.hosts) == 1: # this is the first host of the pool -> make it master self._init_pool(aggregate.id, aggregate.name) - # save metadata so that we can find the master again: - # the password should be encrypted, really. + # save metadata so that we can find the master again values = { 'operational_state': aggregate_states.ACTIVE, - 'metadata': {'master_compute': host}, + 'metadata': {'master_compute': host, + host: self._host_uuid}, } db.aggregate_update(context, aggregate.id, values) else: @@ -85,7 +85,6 @@ class ResourcePool(object): elif master_compute and master_compute != host: # send rpc cast to master, asking to add the following # host with specified credentials. - # NOTE: password in clear is not great, but it'll do for now forward_request(context, "add_aggregate_host", master_compute, aggregate.id, host, self._host_addr, self._host_uuid) @@ -104,15 +103,17 @@ class ResourcePool(object): # master is on its own, otherwise raise fault. Destroying a # pool made only by master is fictional if len(aggregate.hosts) > 1: - raise exception.AggregateError( + # NOTE: this could be avoided by doing a master + # re-election, but this is simpler for now. 
+ raise exception.InvalidAggregateAction( aggregate_id=aggregate.id, action='remove_from_aggregate', reason=_('Unable to eject %(host)s ' 'from the pool; pool not empty') % locals()) self._clear_pool(aggregate.id) - db.aggregate_metadata_delete(context, - aggregate.id, 'master_compute') + for key in ['master_compute', host]: + db.aggregate_metadata_delete(context, aggregate.id, key) elif master_compute and master_compute != host: # A master exists -> forward pool-eject request to master forward_request(context, "remove_aggregate_host", master_compute, @@ -194,6 +195,7 @@ def forward_request(context, request_type, master, aggregate_id, """Casts add/remove requests to the pool master.""" # replace the address from the xenapi connection url # because this might be 169.254.0.1, i.e. xenapi + # NOTE: password in clear is not great, but it'll do for now sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master), {"method": request_type, diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 8ee34aaa7..78177920c 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -181,7 +181,9 @@ class VMOps(object): raise Exception(_('Attempted to power on non-existent instance' ' bad instance id %s') % instance.id) LOG.debug(_("Starting instance %s"), instance.name) - self._session.call_xenapi('VM.start', vm_ref, False, False) + self._session.call_xenapi('VM.start_on', vm_ref, + self._session.get_xenapi_host(), + False, False) def _create_disks(self, context, instance, image_meta): disk_image_type = VMHelper.determine_disk_image_type(image_meta) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index c9363bb74..77efb34e3 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -506,8 +506,10 @@ class XenAPISession(object): def __init__(self, url, user, pw): self.XenAPI = self.get_imported_xenapi() self._sessions = queue.Queue() + 
self.host_uuid = None exception = self.XenAPI.Failure(_("Unable to log in to XenAPI " - "(is the Dom0 disk full?)")) + "(is the Dom0 disk full?)")) + is_slave = False for i in xrange(FLAGS.xenapi_connection_concurrent): try: session = self._create_session(url) @@ -520,10 +522,21 @@ class XenAPISession(object): session = self.XenAPI.Session(pool.swap_xapi_host(url, master)) session.login_with_password(user, pw) + is_slave = True else: raise self._sessions.put(session) + if is_slave: + try: + aggr = db.aggregate_get_by_host(context.get_admin_context(), + FLAGS.host) + self.host_uuid = aggr.metadetails[FLAGS.host] + except exception.AggregateHostNotFound: + LOG.exception(_('Host is member of a pool, but DB ' + 'says otherwise')) + raise + def get_product_version(self): """Return a tuple of (major, minor, rev) for the host version""" host = self.get_xenapi_host() @@ -551,9 +564,12 @@ class XenAPISession(object): self._sessions.put(session) def get_xenapi_host(self): - """Return the xenapi host""" + """Return the xenapi host on which nova-compute runs on.""" with self._get_session() as session: - return session.xenapi.session.get_this_host(session.handle) + if self.host_uuid: + return session.xenapi.host.get_by_uuid(self.host_uuid) + else: + return session.xenapi.session.get_this_host(session.handle) def call_xenapi(self, method, *args): """Call the specified XenAPI method on a background thread.""" @@ -578,6 +594,10 @@ class XenAPISession(object): # _get_session() acquires a session too, it can result in a deadlock # if multiple greenthreads race with each other. 
See bug 924918 host = self.get_xenapi_host() + # NOTE(armando): pass the host uuid along with the args so that + # the plugin gets executed on the right host when using XS pools + if self.host_uuid: + args['host_uuid'] = self.host_uuid with self._get_session() as session: return tpool.execute(self._unwrap_plugin_exceptions, session.xenapi.Async.host.call_plugin, diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost index 9a7ad84af..8fcf9fee0 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost @@ -117,9 +117,9 @@ def _resume_compute(session, compute_ref, compute_uuid): logging.exception('Waited %d seconds for the slave to ' 'become available.' % (c * DEFAULT_SLEEP)) time.sleep(DEFAULT_SLEEP) - raise pluginlib.PluginError('Unrecoverable error: the host has ' - 'not come back for more than %d seconds' - % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1))) + raise pluginlib.PluginError('Unrecoverable error: the host has ' + 'not come back for more than %d seconds' + % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1))) def _get_host_uuid(): @@ -315,7 +315,7 @@ def host_data(self, arg_dict): """Runs the commands on the xenstore host to return the current status information. """ - host_uuid = _get_host_uuid() + host_uuid = arg_dict.get('host_uuid', _get_host_uuid()) cmd = "xe host-param-list uuid=%s" % host_uuid resp = _run_command(cmd) parsed_data = parse_response(resp) -- cgit