diff options
author | Jenkins <jenkins@review.openstack.org> | 2012-09-19 15:00:02 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2012-09-19 15:00:02 +0000 |
commit | a484bfea8ff3f274bb71be11ff387aa671c1e798 (patch) | |
tree | 42fff1f656faff46ec05ac2a6a1ad5ad67b11fd2 /nova | |
parent | 737ca6e7ecdfa9ffd6744e1e1a54c53c4c897d83 (diff) | |
parent | b8b46cbd6c06cb4979fa2f443892a2a1d60cc7bb (diff) | |
download | nova-a484bfea8ff3f274bb71be11ff387aa671c1e798.tar.gz nova-a484bfea8ff3f274bb71be11ff387aa671c1e798.tar.xz nova-a484bfea8ff3f274bb71be11ff387aa671c1e798.zip |
Merge "xapi: fix create hypervisor pool"
Diffstat (limited to 'nova')
-rw-r--r-- | nova/compute/manager.py | 13 | ||||
-rw-r--r-- | nova/compute/rpcapi.py | 22 | ||||
-rw-r--r-- | nova/tests/compute/test_compute.py | 31 | ||||
-rw-r--r-- | nova/tests/compute/test_rpcapi.py | 5 | ||||
-rw-r--r-- | nova/tests/test_xenapi.py | 140 | ||||
-rw-r--r-- | nova/virt/xenapi/pool.py | 88 |
6 files changed, 235 insertions, 64 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py index cd756af65..2f977b26c 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -213,7 +213,7 @@ def _get_image_meta(context, image_ref): class ComputeManager(manager.SchedulerDependentManager): """Manages the running instances from creation to destruction.""" - RPC_API_VERSION = '2.1' + RPC_API_VERSION = '2.2' def __init__(self, compute_driver=None, *args, **kwargs): """Load configuration options and connect to the hypervisor.""" @@ -2812,11 +2812,12 @@ class ComputeManager(manager.SchedulerDependentManager): self._set_instance_error_state(context, instance_uuid) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) - def add_aggregate_host(self, context, aggregate_id, host): + def add_aggregate_host(self, context, aggregate_id, host, slave_info=None): """Notify hypervisor of change (for hypervisor pools).""" aggregate = self.db.aggregate_get(context, aggregate_id) try: - self.driver.add_to_aggregate(context, aggregate, host) + self.driver.add_to_aggregate(context, aggregate, host, + slave_info=slave_info) except exception.AggregateError: with excutils.save_and_reraise_exception(): self.driver.undo_aggregate_operation(context, @@ -2824,11 +2825,13 @@ class ComputeManager(manager.SchedulerDependentManager): aggregate.id, host) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) - def remove_aggregate_host(self, context, aggregate_id, host): + def remove_aggregate_host(self, context, aggregate_id, + host, slave_info=None): """Removes a host from a physical hypervisor pool.""" aggregate = self.db.aggregate_get(context, aggregate_id) try: - self.driver.remove_from_aggregate(context, aggregate, host) + self.driver.remove_from_aggregate(context, aggregate, host, + slave_info=slave_info) except (exception.AggregateError, exception.InvalidAggregateAction) as e: with excutils.save_and_reraise_exception(): diff --git a/nova/compute/rpcapi.py b/nova/compute/rpcapi.py index b53cc1e7a..2e3873c19 100644 --- a/nova/compute/rpcapi.py +++ b/nova/compute/rpcapi.py @@ -128,6 +128,8 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): 2.0 - Remove 1.x backwards compat 2.1 - Adds orig_sys_metadata to rebuild_instance() + 2.2 - Adds slave_info parameter to add_aggregate_host() and + remove_aggregate_host() ''' # @@ -145,7 +147,8 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): topic=FLAGS.compute_topic, default_version=self.BASE_RPC_API_VERSION) - def add_aggregate_host(self, ctxt, aggregate_id, host_param, host): + def add_aggregate_host(self, ctxt, aggregate_id, host_param, host, + slave_info=None): '''Add aggregate host. :param ctxt: request context @@ -154,9 +157,12 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): parameter for the remote method. :param host: This is the host to send the message to. ''' + self.cast(ctxt, self.make_msg('add_aggregate_host', - aggregate_id=aggregate_id, host=host_param), - topic=_compute_topic(self.topic, ctxt, host, None)) + aggregate_id=aggregate_id, host=host_param, + slave_info=slave_info), + topic=_compute_topic(self.topic, ctxt, host, None), + version='2.2') def add_fixed_ip_to_instance(self, ctxt, instance, network_id): instance_p = jsonutils.to_primitive(instance) @@ -355,7 +361,8 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): self.cast(ctxt, self.make_msg('refresh_provider_fw_rules'), _compute_topic(self.topic, ctxt, host, None)) - def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host): + def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host, + slave_info=None): '''Remove aggregate host. :param ctxt: request context @@ -364,9 +371,12 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy): parameter for the remote method. :param host: This is the host to send the message to. ''' + self.cast(ctxt, self.make_msg('remove_aggregate_host', - aggregate_id=aggregate_id, host=host_param), - topic=_compute_topic(self.topic, ctxt, host, None)) + aggregate_id=aggregate_id, host=host_param, + slave_info=slave_info), + topic=_compute_topic(self.topic, ctxt, host, None), + version='2.2') def remove_fixed_ip_from_instance(self, ctxt, instance, address): instance_p = jsonutils.to_primitive(instance) diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index de07d4d68..14edaa7a0 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -4656,7 +4656,7 @@ class ComputeAggrTestCase(BaseTestCase): self.aggr = db.aggregate_create(self.context, values) def test_add_aggregate_host(self): - def fake_driver_add_to_aggregate(context, aggregate, host): + def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore): fake_driver_add_to_aggregate.called = True return {"foo": "bar"} self.stubs.Set(self.compute.driver, "add_to_aggregate", @@ -4666,7 +4666,8 @@ class ComputeAggrTestCase(BaseTestCase): self.assertTrue(fake_driver_add_to_aggregate.called) def test_remove_aggregate_host(self): - def fake_driver_remove_from_aggregate(context, aggregate, host): + def fake_driver_remove_from_aggregate(context, aggregate, host, + **_ignore): fake_driver_remove_from_aggregate.called = True self.assertEqual("host", host, "host") return {"foo": "bar"} @@ -4676,6 +4677,32 @@ class ComputeAggrTestCase(BaseTestCase): self.compute.remove_aggregate_host(self.context, self.aggr.id, "host") self.assertTrue(fake_driver_remove_from_aggregate.called) + def test_add_aggregate_host_passes_slave_info_to_driver(self): + def driver_add_to_aggregate(context, aggregate, host, **kwargs): + self.assertEquals(self.context, context) + self.assertEquals(aggregate.id, self.aggr.id) + self.assertEquals(host, "the_host") + self.assertEquals("SLAVE_INFO", kwargs.get("slave_info")) + + self.stubs.Set(self.compute.driver, "add_to_aggregate", + driver_add_to_aggregate) + + self.compute.add_aggregate_host(self.context, self.aggr.id, + "the_host", slave_info="SLAVE_INFO") + + def test_remove_from_aggregate_passes_slave_info_to_driver(self): + def driver_remove_from_aggregate(context, aggregate, host, **kwargs): + self.assertEquals(self.context, context) + self.assertEquals(aggregate.id, self.aggr.id) + self.assertEquals(host, "the_host") + self.assertEquals("SLAVE_INFO", kwargs.get("slave_info")) + + self.stubs.Set(self.compute.driver, "remove_from_aggregate", + driver_remove_from_aggregate) + + self.compute.remove_aggregate_host(self.context, + self.aggr.id, "the_host", slave_info="SLAVE_INFO") + class ComputePolicyTestCase(BaseTestCase): diff --git a/nova/tests/compute/test_rpcapi.py b/nova/tests/compute/test_rpcapi.py index 5728bdf91..559a56a0a 100644 --- a/nova/tests/compute/test_rpcapi.py +++ b/nova/tests/compute/test_rpcapi.py @@ -99,7 +99,7 @@ class ComputeRpcAPITestCase(test.TestCase): def test_add_aggregate_host(self): self._test_compute_api('add_aggregate_host', 'cast', aggregate_id='id', - host_param='host', host='host') + host_param='host', host='host', slave_info={}, version='2.2') def test_add_fixed_ip_to_instance(self): self._test_compute_api('add_fixed_ip_to_instance', 'cast', @@ -249,7 +249,8 @@ class ComputeRpcAPITestCase(test.TestCase): def test_remove_aggregate_host(self): self._test_compute_api('remove_aggregate_host', 'cast', - aggregate_id='id', host_param='host', host='host') + aggregate_id='id', host_param='host', host='host', + slave_info={}, version='2.2') def test_remove_fixed_ip_from_instance(self): self._test_compute_api('remove_fixed_ip_from_instance', 'cast', diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index dced3ddd9..b543f3e0f 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -44,6 +44,7 @@ from nova.tests.xenapi import stubs from nova.virt.xenapi import agent from nova.virt.xenapi import driver as xenapi_conn from nova.virt.xenapi import fake as xenapi_fake +from nova.virt.xenapi import pool from nova.virt.xenapi import pool_states from nova.virt.xenapi import vm_utils from nova.virt.xenapi import vmops @@ -1870,15 +1871,44 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase): 'host': xenapi_fake.get_record('host', host_ref)['uuid']} - def test_add_to_aggregate_called(self): - def fake_add_to_aggregate(context, aggregate, host): - fake_add_to_aggregate.called = True + def test_pool_add_to_aggregate_called_by_driver(self): + + calls = [] + + def pool_add_to_aggregate(context, aggregate, host, slave_info=None): + self.assertEquals("CONTEXT", context) + self.assertEquals("AGGREGATE", aggregate) + self.assertEquals("HOST", host) + self.assertEquals("SLAVEINFO", slave_info) + calls.append(pool_add_to_aggregate) self.stubs.Set(self.conn._pool, "add_to_aggregate", - fake_add_to_aggregate) + pool_add_to_aggregate) + + self.conn.add_to_aggregate("CONTEXT", "AGGREGATE", "HOST", + slave_info="SLAVEINFO") - self.conn.add_to_aggregate(None, None, None) - self.assertTrue(fake_add_to_aggregate.called) + self.assertTrue(pool_add_to_aggregate in calls) + + def test_pool_remove_from_aggregate_called_by_driver(self): + + calls = [] + + def pool_remove_from_aggregate(context, aggregate, host, + slave_info=None): + self.assertEquals("CONTEXT", context) + self.assertEquals("AGGREGATE", aggregate) + self.assertEquals("HOST", host) + self.assertEquals("SLAVEINFO", slave_info) + calls.append(pool_remove_from_aggregate) + self.stubs.Set(self.conn._pool, + "remove_from_aggregate", + pool_remove_from_aggregate) + + self.conn.remove_from_aggregate("CONTEXT", "AGGREGATE", "HOST", + slave_info="SLAVEINFO") + + self.assertTrue(pool_remove_from_aggregate in calls) def test_add_to_aggregate_for_first_host_sets_metadata(self): def fake_init_pool(id, name): @@ -1900,11 +1930,11 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase): aggregate = self._aggregate_setup(hosts=['host', 'host2'], metadata=self.fake_metadata) self.conn._pool.add_to_aggregate(self.context, aggregate, "host2", - compute_uuid='fake_uuid', + dict(compute_uuid='fake_uuid', url='fake_url', user='fake_user', passwd='fake_pass', - xenhost_uuid='fake_uuid') + xenhost_uuid='fake_uuid')) self.assertTrue(fake_join_slave.called) def test_add_to_aggregate_first_host(self): @@ -2060,7 +2090,7 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase): def test_add_aggregate_host_raise_err(self): """Ensure the undo operation works correctly on add.""" - def fake_driver_add_to_aggregate(context, aggregate, host): + def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore): raise exception.AggregateError self.stubs.Set(self.compute.driver, "add_to_aggregate", fake_driver_add_to_aggregate) @@ -2078,6 +2108,98 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase): self.assertEqual(excepted.hosts, []) +class Aggregate(object): + def __init__(self, id=None, hosts=None): + self.id = id + self.hosts = hosts or [] + + +class MockComputeAPI(object): + def __init__(self): + self._mock_calls = [] + + def add_aggregate_host(self, ctxt, aggregate_id, + host_param, host, slave_info): + self._mock_calls.append(( + self.add_aggregate_host, ctxt, aggregate_id, + host_param, host, slave_info)) + + def remove_aggregate_host(self, ctxt, aggregate_id, host_param, + host, slave_info): + self._mock_calls.append(( + self.remove_aggregate_host, ctxt, aggregate_id, + host_param, host, slave_info)) + + +class StubDependencies(object): + """Stub dependencies for ResourcePool""" + + def __init__(self): + self.compute_rpcapi = MockComputeAPI() + + def _is_hv_pool(self, *_ignore): + return True + + def _get_metadata(self, *_ignore): + return { + pool_states.KEY: {}, + 'master_compute': 'master' + } + + def _create_slave_info(self, *ignore): + return "SLAVE_INFO" + + +class ResourcePoolWithStubs(StubDependencies, pool.ResourcePool): + """ A ResourcePool, use stub dependencies """ + + +class HypervisorPoolTestCase(test.TestCase): + + def test_slave_asks_master_to_add_slave_to_pool(self): + slave = ResourcePoolWithStubs() + aggregate = Aggregate(id=98, hosts=[]) + + slave.add_to_aggregate("CONTEXT", aggregate, "slave") + + self.assertIn( + (slave.compute_rpcapi.add_aggregate_host, + "CONTEXT", 98, "slave", "master", "SLAVE_INFO"), + slave.compute_rpcapi._mock_calls) + + def test_slave_asks_master_to_remove_slave_from_pool(self): + slave = ResourcePoolWithStubs() + aggregate = Aggregate(id=98, hosts=[]) + + slave.remove_from_aggregate("CONTEXT", aggregate, "slave") + + self.assertIn( + (slave.compute_rpcapi.remove_aggregate_host, + "CONTEXT", 98, "slave", "master", "SLAVE_INFO"), + slave.compute_rpcapi._mock_calls) + + +class SwapXapiHostTestCase(test.TestCase): + + def test_swapping(self): + self.assertEquals( + "http://otherserver:8765/somepath", + pool.swap_xapi_host( + "http://someserver:8765/somepath", 'otherserver')) + + def test_no_port(self): + self.assertEquals( + "http://otherserver/somepath", + pool.swap_xapi_host( + "http://someserver/somepath", 'otherserver')) + + def test_no_path(self): + self.assertEquals( + "http://otherserver", + pool.swap_xapi_host( + "http://someserver", 'otherserver')) + + class VmUtilsTestCase(test.TestCase): """Unit tests for xenapi utils.""" diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index d7204d372..71b21ce24 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -21,6 +21,7 @@ Management class for Pool-related functions (join, eject, etc). import urlparse +from nova.compute import rpcapi as compute_rpcapi from nova import db from nova import exception from nova import flags @@ -54,6 +55,13 @@ class ResourcePool(object): self._host_addr = host_rec['address'] self._host_uuid = host_rec['uuid'] self._session = session + self.compute_rpcapi = compute_rpcapi.ComputeAPI() + + def _is_hv_pool(self, context, aggregate_id): + return pool_states.is_hv_pool(context, aggregate_id) + + def _get_metadata(self, context, aggregate_id): + return db.aggregate_metadata_get(context, aggregate_id) def undo_aggregate_operation(self, context, op, aggregate_id, host, set_error): @@ -67,25 +75,25 @@ class ResourcePool(object): LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state ' 'during operation on %(host)s') % locals()) - def add_to_aggregate(self, context, aggregate, host, **kwargs): + def add_to_aggregate(self, context, aggregate, host, slave_info=None): """Add a compute host to an aggregate.""" - if not pool_states.is_hv_pool(context, aggregate.id): + if not self._is_hv_pool(context, aggregate.id): return invalid = {pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', pool_states.ERROR: 'aggregate in error'} - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] in invalid.keys()): raise exception.InvalidAggregateAction( action='add host', aggregate_id=aggregate.id, - reason=invalid[db.aggregate_metadata_get(context, + reason=invalid[self._get_metadata(context, aggregate.id) [pool_states.KEY]]) - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] == pool_states.CREATED): db.aggregate_metadata_add(context, aggregate.id, {pool_states.KEY: pool_states.CHANGING}) @@ -100,48 +108,50 @@ class ResourcePool(object): else: # the pool is already up and running, we need to figure out # whether we can serve the request from this host or not. - master_compute = db.aggregate_metadata_get(context, + master_compute = self._get_metadata(context, aggregate.id)['master_compute'] if master_compute == FLAGS.host and master_compute != host: # this is the master -> do a pool-join # To this aim, nova compute on the slave has to go down. # NOTE: it is assumed that ONLY nova compute is running now self._join_slave(aggregate.id, host, - kwargs.get('compute_uuid'), - kwargs.get('url'), kwargs.get('user'), - kwargs.get('passwd')) - metadata = {host: kwargs.get('xenhost_uuid'), } + slave_info.get('compute_uuid'), + slave_info.get('url'), slave_info.get('user'), + slave_info.get('passwd')) + metadata = {host: slave_info.get('xenhost_uuid'), } db.aggregate_metadata_add(context, aggregate.id, metadata) elif master_compute and master_compute != host: # send rpc cast to master, asking to add the following # host with specified credentials. - forward_request(context, "add_aggregate_host", master_compute, - aggregate.id, host, - self._host_addr, self._host_uuid) + slave_info = self._create_slave_info() - def remove_from_aggregate(self, context, aggregate, host, **kwargs): + self.compute_rpcapi.add_aggregate_host( + context, aggregate.id, host, master_compute, slave_info) + + def remove_from_aggregate(self, context, aggregate, host, slave_info=None): """Remove a compute host from an aggregate.""" - if not pool_states.is_hv_pool(context, aggregate.id): + slave_info = slave_info or dict() + if not self._is_hv_pool(context, aggregate.id): return invalid = {pool_states.CREATED: 'no hosts to remove', pool_states.CHANGING: 'setup in progress', pool_states.DISMISSED: 'aggregate deleted', } - if (db.aggregate_metadata_get(context, aggregate.id)[pool_states.KEY] + if (self._get_metadata(context, aggregate.id)[pool_states.KEY] in invalid.keys()): raise exception.InvalidAggregateAction( action='remove host', aggregate_id=aggregate.id, - reason=invalid[db.aggregate_metadata_get(context, + reason=invalid[self._get_metadata(context, aggregate.id)[pool_states.KEY]]) - master_compute = db.aggregate_metadata_get(context, + master_compute = self._get_metadata(context, aggregate.id)['master_compute'] if master_compute == FLAGS.host and master_compute != host: # this is the master -> instruct it to eject a host from the pool - host_uuid = db.aggregate_metadata_get(context, aggregate.id)[host] + host_uuid = self._get_metadata(context, aggregate.id)[host] self._eject_slave(aggregate.id, - kwargs.get('compute_uuid'), host_uuid) + slave_info.get('compute_uuid'), host_uuid) db.aggregate_metadata_delete(context, aggregate.id, host) elif master_compute == host: # Remove master from its own pool -> destroy pool only if the @@ -161,9 +171,10 @@ class ResourcePool(object): db.aggregate_metadata_delete(context, aggregate.id, key) elif master_compute and master_compute != host: # A master exists -> forward pool-eject request to master - forward_request(context, "remove_aggregate_host", master_compute, - aggregate.id, host, - self._host_addr, self._host_uuid) + slave_info = self._create_slave_info() + + self.compute_rpcapi.remove_aggregate_host( + context, aggregate.id, host, master_compute, slave_info) else: # this shouldn't have happened raise exception.AggregateError(aggregate_id=aggregate.id, @@ -232,24 +243,21 @@ class ResourcePool(object): action='remove_from_aggregate', reason=str(e.details)) + def _create_slave_info(self): + """XenServer specific info needed to join the hypervisor pool""" + # replace the address from the xenapi connection url + # because this might be 169.254.0.1, i.e. xenapi + # NOTE: password in clear is not great, but it'll do for now + sender_url = swap_xapi_host( + FLAGS.xenapi_connection_url, self._host_addr) -def forward_request(context, request_type, master, aggregate_id, - slave_compute, slave_address, slave_uuid): - """Casts add/remove requests to the pool master.""" - # replace the address from the xenapi connection url - # because this might be 169.254.0.1, i.e. xenapi - # NOTE: password in clear is not great, but it'll do for now - sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address) - rpc.cast(context, rpc.queue_get_for(context, FLAGS.compute_topic, master), - {"method": request_type, - "args": {"aggregate_id": aggregate_id, - "host": slave_compute, - "url": sender_url, - "user": FLAGS.xenapi_connection_username, - "passwd": FLAGS.xenapi_connection_password, - "compute_uuid": vm_utils.get_this_vm_uuid(), - "xenhost_uuid": slave_uuid, }, - }) + return { + "url": sender_url, + "user": FLAGS.xenapi_connection_username, + "passwd": FLAGS.xenapi_connection_password, + "compute_uuid": vm_utils.get_this_vm_uuid(), + "xenhost_uuid": self._host_uuid, + } def swap_xapi_host(url, host_addr): |