From b8b46cbd6c06cb4979fa2f443892a2a1d60cc7bb Mon Sep 17 00:00:00 2001 From: Mate Lakat Date: Tue, 11 Sep 2012 16:53:05 +0100 Subject: xapi: fix create hypervisor pool Fixes bug 1049099. Fixing problems with the rpc api when creating hypervisor pools with xenapi. Rpc calls were not using the compute_rpcapi approach, thus were not properly versioned. Apart from that, the slave parameters were not forwarded to the master. A new (2.2) version is introduced where the rpc calls have the slave_info payload. Added tests to cover the pool cases. Some trivial extract methods performed on pool, to decouple the pool functionality from its dependencies. Change-Id: Ie44a1c09ef204affc4a657c71557691e83b22c22 --- nova/virt/xenapi/pool.py | 88 ++++++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 40 deletions(-) (limited to 'nova/virt') diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index d7204d372..71b21ce24 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -21,6 +21,7 @@ Management class for Pool-related functions (join, eject, etc). import urlparse +from nova.compute import rpcapi as compute_rpcapi from nova import db from nova import exception from nova import flags @@ -54,6 +55,13 @@ class ResourcePool(object): self._host_addr = host_rec['address'] self._host_uuid = host_rec['uuid'] self._session = session + self.compute_rpcapi = compute_rpcapi.ComputeAPI() + + def _is_hv_pool(self, context, aggregate_id): + return pool_states.is_hv_pool(context, aggregate_id) + + def _get_metadata(self, context, aggregate_id): + return db.aggregate_metadata_get(context, aggregate_id) def undo_aggregate_operation(self, context, op, aggregate_id, host, set_error): @@ -67,25 +75,25 @@ class ResourcePool(object): LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' 'during operation on %(host)s') % locals()) - def add_to_aggregate(self, context, aggregate, host, **kwargs): + def add_to_aggregate(self, context, aggregate, host, slave_info=None): """Add a compute host to an aggregate.""" - if not pool_states.is_hv_pool(context, aggregate.id): + if not self._is_hv_pool(context, aggregate.id): return invalid = {pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', pool_states.ERROR: 'aggregate in error'} - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] in invalid.keys()): raise exception.InvalidAggregateAction( action='add host', aggregate_id=aggregate.id, - reason=invalid[db.aggregate_metadata_get(context, + reason=invalid[self._get_metadata(context, aggregate.id) [pool_states.KEY]]) - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] == pool_states.CREATED): db.aggregate_metadata_add(context, aggregate.id, {pool_states.KEY: pool_states.CHANGING}) @@ -100,48 +108,50 @@ class ResourcePool(object): else: # the pool is already up and running, we need to figure out # whether we can serve the request from this host or not. - master_compute = db.aggregate_metadata_get(context, + master_compute = self._get_metadata(context, aggregate.id)['master_compute'] if master_compute == FLAGS.host and master_compute != host: # this is the master -> do a pool-join # To this aim, nova compute on the slave has to go down. # NOTE: it is assumed that ONLY nova compute is running now self._join_slave(aggregate.id, host, - kwargs.get('compute_uuid'), - kwargs.get('url'), kwargs.get('user'), - kwargs.get('passwd')) - metadata = {host: kwargs.get('xenhost_uuid'), } + slave_info.get('compute_uuid'), + slave_info.get('url'), slave_info.get('user'), + slave_info.get('passwd')) + metadata = {host: slave_info.get('xenhost_uuid'), } db.aggregate_metadata_add(context, aggregate.id, metadata) elif master_compute and master_compute != host: # send rpc cast to master, asking to add the following # host with specified credentials. - forward_request(context, "add_aggregate_host", master_compute, - aggregate.id, host, - self._host_addr, self._host_uuid) + slave_info = self._create_slave_info() - def remove_from_aggregate(self, context, aggregate, host, **kwargs): + self.compute_rpcapi.add_aggregate_host( + context, aggregate.id, host, master_compute, slave_info) + + def remove_from_aggregate(self, context, aggregate, host, slave_info=None): """Remove a compute host from an aggregate.""" - if not pool_states.is_hv_pool(context, aggregate.id): + slave_info = slave_info or dict() + if not self._is_hv_pool(context, aggregate.id): return invalid = {pool_states.CREATED: 'no hosts to remove', pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', } - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] in invalid.keys()): raise exception.InvalidAggregateAction( action='remove host', aggregate_id=aggregate.id, - reason=invalid[db.aggregate_metadata_get(context, + reason=invalid[self._get_metadata(context, aggregate.id)[pool_states.KEY]]) - master_compute = db.aggregate_metadata_get(context, + master_compute = self._get_metadata(context, aggregate.id)['master_compute'] if master_compute == FLAGS.host and master_compute != host: # this is the master -> instruct it to eject a host from the pool - host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host] + host_uuid = self._get_metadata(context, aggregate.id)[host] self._eject_slave(aggregate.id, - kwargs.get('compute_uuid'), host_uuid) + slave_info.get('compute_uuid'), host_uuid) db.aggregate_metadata_delete(context, aggregate.id, host) elif master_compute == host: # Remove master from its own pool -> destroy pool only if the @@ -161,9 +171,10 @@ class ResourcePool(object): db.aggregate_metadata_delete(context, aggregate.id, key) elif master_compute and master_compute != host: # A master exists -> forward pool-eject request to master - forward_request(context, "remove_aggregate_host", master_compute, - aggregate.id, host, - self._host_addr, self._host_uuid) + slave_info = self._create_slave_info() + + self.compute_rpcapi.remove_aggregate_host( + context, aggregate.id, host, master_compute, slave_info) else: # this shouldn't have happened raise exception.AggregateError(aggregate_id=aggregate.id, @@ -232,24 +243,21 @@ class ResourcePool(object): action='remove_from_aggregate', reason=str(e.details)) + def _create_slave_info(self): + """XenServer specific info needed to join the hypervisor pool""" + # replace the address from the xenapi connection url + # because this might be 169.254.0.1, i.e. xenapi + # NOTE: password in clear is not great, but it'll do for now + sender_url = swap_xapi_host( + FLAGS.xenapi_connection_url, self._host_addr) -def forward_request(context, request_type, master, aggregate_id, - slave_compute, slave_address, slave_uuid): - """Casts add/remove requests to the pool master.""" - # replace the address from the xenapi connection url - # because this might be 169.254.0.1, i.e. xenapi - # NOTE: password in clear is not great, but it'll do for now - sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) - rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master), - {"method": request_type, - "args": {"aggregate_id": aggregate_id, - "host": slave_compute, - "url": sender_url, - "user": FLAGS.xenapi_connection_username, - "passwd": FLAGS.xenapi_connection_password, - "compute_uuid": vm_utils.get_this_vm_uuid(), - "xenhost_uuid": slave_uuid, }, - }) + return { + "url": sender_url, + "user": FLAGS.xenapi_connection_username, + "passwd": FLAGS.xenapi_connection_password, + "compute_uuid": vm_utils.get_this_vm_uuid(), + "xenhost_uuid": self._host_uuid, + } def swap_xapi_host(url, host_addr): -- cgit