diff options
| author | Jenkins <jenkins@review.openstack.org> | 2012-08-16 18:59:35 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2012-08-16 18:59:35 +0000 |
| commit | 71724dc2855376d9d54f7b136a6ba1ee61d023ec (patch) | |
| tree | 1390bb7917cef13c2de54ec3186b9050653b8ae1 | |
| parent | 2f45636ead3aef36c276804372c382558ecf3fe1 (diff) | |
| parent | 8718f8e47d7d0504724495538eb320be3e209180 (diff) | |
Merge "Always create the run_instance records locally"
| -rw-r--r-- | nova/compute/api.py | 148 | ||||
| -rw-r--r-- | nova/compute/manager.py | 2 | ||||
| -rw-r--r-- | nova/scheduler/chance.py | 26 | ||||
| -rw-r--r-- | nova/scheduler/filter_scheduler.py | 114 | ||||
| -rw-r--r-- | nova/scheduler/manager.py | 12 | ||||
| -rw-r--r-- | nova/scheduler/rpcapi.py | 9 | ||||
| -rw-r--r-- | nova/tests/compute/test_compute.py | 38 | ||||
| -rw-r--r-- | nova/tests/scheduler/test_filter_scheduler.py | 8 | ||||
| -rw-r--r-- | nova/tests/scheduler/test_rpcapi.py | 14 |
9 files changed, 180 insertions, 191 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index 34f1657ec..e7f9368ea 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -373,8 +373,7 @@ class API(base.Base): access_ip_v4, access_ip_v6, requested_networks, config_drive, block_device_mapping, auto_disk_config, - reservation_id=None, create_instance_here=False, - scheduler_hints=None): + reservation_id=None, scheduler_hints=None): """Verify all the input parameters regardless of the provisioning strategy being performed and schedule the instance(s) for creation.""" @@ -495,45 +494,48 @@ class API(base.Base): LOG.debug(_("Going to run %s instances...") % num_instances) - if create_instance_here: - instance = self.create_db_entry_for_new_instance( - context, instance_type, image, base_options, - security_group, block_device_mapping, - quota_reservations) - - # Reservations committed; don't double-commit - quota_reservations = None - - # Tells scheduler we created the instance already. - base_options['uuid'] = instance['uuid'] - use_call = False - else: - # We need to wait for the scheduler to create the instance - # DB entries, because the instance *could* be # created in - # a child zone. - use_call = True - filter_properties = dict(scheduler_hints=scheduler_hints) if context.is_admin and forced_host: filter_properties['force_hosts'] = [forced_host] - # TODO(comstud): We should use rpc.multicall when we can - # retrieve the full instance dictionary from the scheduler. - # Otherwise, we could exceed the AMQP max message size limit. - # This would require the schedulers' schedule_run_instances - # methods to return an iterator vs a list. 
- instances = self._schedule_run_instance( - use_call, - context, base_options, - instance_type, - availability_zone, injected_files, - admin_password, image, - num_instances, requested_networks, - block_device_mapping, security_group, - filter_properties, quota_reservations) - - if create_instance_here: - return ([instance], reservation_id) + instances = [] + instance_uuids = [] + try: + for i in xrange(num_instances): + options = base_options.copy() + instance = self.create_db_entry_for_new_instance( + context, instance_type, image, options, + security_group, block_device_mapping) + instances.append(instance) + instance_uuids.append(instance['uuid']) + except Exception: + # Clean up as best we can. + with excutils.save_and_reraise_exception(): + try: + for instance_uuid in instance_uuids: + self.db.instance_destroy(context, + instance_uuid) + finally: + QUOTAS.rollback(context, quota_reservations) + + # Commit the reservations + QUOTAS.commit(context, quota_reservations) + + request_spec = { + 'image': jsonutils.to_primitive(image), + 'instance_properties': base_options, + 'instance_type': instance_type, + 'instance_uuids': instance_uuids, + 'block_device_mapping': block_device_mapping, + 'security_group': security_group, + } + + self.scheduler_rpcapi.run_instance(context, + request_spec=request_spec, + admin_password=admin_password, injected_files=injected_files, + requested_networks=requested_networks, is_first_time=True, + filter_properties=filter_properties) + return (instances, reservation_id) @staticmethod @@ -698,7 +700,7 @@ class API(base.Base): #NOTE(bcwaldon): No policy check since this is only used by scheduler and # the compute api. That should probably be cleaned up, though. 
def create_db_entry_for_new_instance(self, context, instance_type, image, - base_options, security_group, block_device_mapping, reservations): + base_options, security_group, block_device_mapping): """Create an entry in the DB for this new instance, including any related table updates (such as security group, etc). @@ -724,48 +726,8 @@ class API(base.Base): notifications.send_update_with_states(context, instance, None, vm_states.BUILDING, None, None, service="api") - # Commit the reservations - if reservations: - QUOTAS.commit(context, reservations) - return instance - def _schedule_run_instance(self, - use_call, - context, base_options, - instance_type, - availability_zone, injected_files, - admin_password, image, - num_instances, - requested_networks, - block_device_mapping, - security_group, - filter_properties, - quota_reservations): - """Send a run_instance request to the schedulers for processing.""" - - pid = context.project_id - uid = context.user_id - - LOG.debug(_("Sending create to scheduler for %(pid)s/%(uid)s's") % - locals()) - - request_spec = { - 'image': jsonutils.to_primitive(image), - 'instance_properties': base_options, - 'instance_type': instance_type, - 'num_instances': num_instances, - 'block_device_mapping': block_device_mapping, - 'security_group': security_group, - } - - return self.scheduler_rpcapi.run_instance(context, - request_spec=request_spec, - admin_password=admin_password, injected_files=injected_files, - requested_networks=requested_networks, is_first_time=True, - filter_properties=filter_properties, - reservations=quota_reservations, call=use_call) - def _check_create_policies(self, context, availability_zone, requested_networks, block_device_mapping): """Check policies for create().""" @@ -795,21 +757,13 @@ class API(base.Base): scheduler. The scheduler will determine where the instance(s) go and will handle creating the DB entries. 
- Returns a tuple of (instances, reservation_id) where instances - could be 'None' or a list of instance dicts depending on if - we waited for information from the scheduler or not. + Returns a tuple of (instances, reservation_id) """ self._check_create_policies(context, availability_zone, requested_networks, block_device_mapping) - # We can create the DB entry for the instance here if we're - # only going to create 1 instance. - # This speeds up API responses for builds - # as we don't need to wait for the scheduler. - create_instance_here = max_count == 1 or max_count is None - - (instances, reservation_id) = self._create_instance( + return self._create_instance( context, instance_type, image_href, kernel_id, ramdisk_id, min_count, max_count, @@ -820,24 +774,8 @@ class API(base.Base): access_ip_v4, access_ip_v6, requested_networks, config_drive, block_device_mapping, auto_disk_config, - create_instance_here=create_instance_here, scheduler_hints=scheduler_hints) - if create_instance_here or instances is None: - return (instances, reservation_id) - - inst_ret_list = [] - for instance in instances: - if instance.get('_is_precooked', False): - inst_ret_list.append(instance) - else: - # Scheduler only gives us the 'id'. 
We need to pull - # in the created instances from the DB - instance = self.db.instance_get(context, instance['id']) - inst_ret_list.append(dict(instance.iteritems())) - - return (inst_ret_list, reservation_id) - def trigger_provider_fw_rules_refresh(self, context): """Called when a rule is added/removed from a provider firewall""" @@ -1561,7 +1499,7 @@ class API(base.Base): request_spec = { 'instance_type': new_instance_type, - 'num_instances': 1, + 'instance_uuids': instance['uuid'], 'instance_properties': instance} filter_properties = {'ignore_hosts': []} diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 75e6f1f8a..c0f37d877 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -594,7 +594,7 @@ class ComputeManager(manager.SchedulerDependentManager): instance_uuid=instance_uuid) return - request_spec['num_instances'] = 1 + request_spec['instance_uuids'] = [instance_uuid] LOG.debug(_("Re-scheduling instance: attempt %d"), retry['num_attempts'], instance_uuid=instance_uuid) diff --git a/nova/scheduler/chance.py b/nova/scheduler/chance.py index 20955b642..9731a0a07 100644 --- a/nova/scheduler/chance.py +++ b/nova/scheduler/chance.py @@ -65,6 +65,32 @@ class ChanceScheduler(driver.Scheduler): requested_networks, is_first_time, filter_properties, reservations): """Create and run an instance or instances""" + if 'instance_uuids' not in request_spec: + return self._legacy_schedule_run_instance(context, request_spec, + admin_password, injected_files, requested_networks, + is_first_time, filter_properties, reservations) + instances = [] + instance_uuids = request_spec.get('instance_uuids') + for num, instance_uuid in enumerate(instance_uuids): + host = self._schedule(context, 'compute', request_spec, + filter_properties) + request_spec['instance_properties']['launch_index'] = num + updated_instance = driver.instance_update_db(context, + instance_uuid, host) + self.compute_rpcapi.run_instance(context, + instance=updated_instance, 
host=host, + requested_networks=requested_networks, + injected_files=injected_files, + admin_password=admin_password, is_first_time=is_first_time, + request_spec=request_spec, + filter_properties=filter_properties) + instances.append(driver.encode_instance(updated_instance)) + return instances + + def _legacy_schedule_run_instance(self, context, request_spec, + admin_password, injected_files, + requested_networks, is_first_time, + filter_properties, reservations): num_instances = request_spec.get('num_instances', 1) instances = [] for num in xrange(num_instances): diff --git a/nova/scheduler/filter_scheduler.py b/nova/scheduler/filter_scheduler.py index f34df1568..ccfbd495e 100644 --- a/nova/scheduler/filter_scheduler.py +++ b/nova/scheduler/filter_scheduler.py @@ -61,6 +61,56 @@ class FilterScheduler(driver.Scheduler): Returns a list of the instances created. """ + if 'instance_uuids' not in request_spec: + return self._legacy_schedule_run_instance(context, request_spec, + admin_password, injected_files, requested_networks, + is_first_time, filter_properties, reservations) + elevated = context.elevated() + instance_uuids = request_spec.get('instance_uuids') + num_instances = len(instance_uuids) + LOG.debug(_("Attempting to build %(num_instances)d instance(s)") % + locals()) + + payload = dict(request_spec=request_spec) + notifier.notify(context, notifier.publisher_id("scheduler"), + 'scheduler.run_instance.start', notifier.INFO, payload) + + weighted_hosts = self._schedule(context, "compute", request_spec, + filter_properties, instance_uuids) + + if not weighted_hosts: + raise exception.NoValidHost(reason="") + + # NOTE(comstud): Make sure we do not pass this through. It + # contains an instance of RpcContext that cannot be serialized. 
+ filter_properties.pop('context', None) + + for num, instance_uuid in enumerate(instance_uuids): + if not weighted_hosts: + break + weighted_host = weighted_hosts.pop(0) + + request_spec['instance_properties']['launch_index'] = num + + self._provision_resource(elevated, weighted_host, + request_spec, + filter_properties, + requested_networks, + injected_files, admin_password, + is_first_time, + instance_uuid=instance_uuid) + # scrub retry host list in case we're scheduling multiple + # instances: + retry = filter_properties.get('retry', {}) + retry['hosts'] = [] + + notifier.notify(context, notifier.publisher_id("scheduler"), + 'scheduler.run_instance.end', notifier.INFO, payload) + + def _legacy_schedule_run_instance(self, context, request_spec, + admin_password, injected_files, + requested_networks, is_first_time, + filter_properties, reservations): elevated = context.elevated() num_instances = request_spec.get('num_instances', 1) LOG.debug(_("Attempting to build %(num_instances)d instance(s)") % @@ -89,11 +139,12 @@ class FilterScheduler(driver.Scheduler): request_spec['instance_properties']['launch_index'] = num instance = self._provision_resource(elevated, weighted_host, - request_spec, reservations, + request_spec, filter_properties, requested_networks, injected_files, admin_password, - is_first_time) + is_first_time, + reservations=reservations) # scrub retry host list in case we're scheduling multiple # instances: retry = filter_properties.get('retry', {}) @@ -117,7 +168,7 @@ class FilterScheduler(driver.Scheduler): """ hosts = self._schedule(context, 'compute', request_spec, - filter_properties) + filter_properties, [instance['uuid']]) if not hosts: raise exception.NoValidHost(reason="") host = hosts.pop(0) @@ -127,24 +178,26 @@ class FilterScheduler(driver.Scheduler): instance_type, host.host_state.host, reservations) def _provision_resource(self, context, weighted_host, request_spec, - reservations, filter_properties, requested_networks, - 
injected_files, admin_password, is_first_time): + filter_properties, requested_networks, injected_files, + admin_password, is_first_time, reservations=None, + instance_uuid=None): """Create the requested resource in this Zone.""" - instance = self.create_instance_db_entry(context, request_spec, - reservations) - + if reservations: + instance = self.create_instance_db_entry(context, request_spec, + reservations) + instance_uuid = instance['uuid'] # Add a retry entry for the selected compute host: self._add_retry_host(filter_properties, weighted_host.host_state.host) payload = dict(request_spec=request_spec, weighted_host=weighted_host.to_dict(), - instance_id=instance['uuid']) + instance_id=instance_uuid) notifier.notify(context, notifier.publisher_id("scheduler"), 'scheduler.run_instance.scheduled', notifier.INFO, payload) - updated_instance = driver.instance_update_db(context, instance['uuid'], - weighted_host.host_state.host) + updated_instance = driver.instance_update_db(context, + instance_uuid, weighted_host.host_state.host) self.compute_rpcapi.run_instance(context, instance=updated_instance, host=weighted_host.host_state.host, @@ -153,14 +206,15 @@ class FilterScheduler(driver.Scheduler): injected_files=injected_files, admin_password=admin_password, is_first_time=is_first_time) - inst = driver.encode_instance(updated_instance, local=True) + if reservations: + inst = driver.encode_instance(updated_instance, local=True) - # So if another instance is created, create_instance_db_entry will - # actually create a new entry, instead of assume it's been created - # already - del request_spec['instance_properties']['uuid'] + # So if another instance is created, create_instance_db_entry will + # actually create a new entry, instead of assume it's been created + # already + del request_spec['instance_properties']['uuid'] - return inst + return inst def _add_retry_host(self, filter_properties, host): """Add a retry entry for the selected compute host. 
In the event that @@ -212,11 +266,13 @@ class FilterScheduler(driver.Scheduler): filter_properties['retry'] = retry if retry['num_attempts'] > max_attempts: - uuid = instance_properties.get('uuid', None) - msg = _("Exceeded max scheduling attempts %d ") % max_attempts - raise exception.NoValidHost(msg, instance_uuid=uuid) + instance_uuid = instance_properties.get('uuid') + msg = _("Exceeded max scheduling attempts %(max_attempts)d for " + "instance %(instance_uuid)s") % locals() + raise exception.NoValidHost(reason=msg) - def _schedule(self, context, topic, request_spec, filter_properties): + def _schedule(self, context, topic, request_spec, filter_properties, + instance_uuids=None): """Returns a list of hosts that meet the required specs, ordered by their fitness. """ @@ -231,8 +287,13 @@ class FilterScheduler(driver.Scheduler): cost_functions = self.get_cost_functions() config_options = self._get_configuration_options() - # check retry policy: - self._populate_retry(filter_properties, instance_properties) + # check retry policy. Rather ugly use of instance_uuids[0]... + # but if we've exceeded max retries... then we really only + # have a single instance. + properties = instance_properties.copy() + if instance_uuids: + properties['uuid'] = instance_uuids[0] + self._populate_retry(filter_properties, properties) filter_properties.update({'context': context, 'request_spec': request_spec, @@ -256,8 +317,11 @@ class FilterScheduler(driver.Scheduler): # are being scanned in a filter or weighing function. hosts = unfiltered_hosts_dict.itervalues() - num_instances = request_spec.get('num_instances', 1) selected_hosts = [] + if instance_uuids: + num_instances = len(instance_uuids) + else: + num_instances = request_spec.get('num_instances', 1) for num in xrange(num_instances): # Filter local hosts based on requirements ... 
hosts = self.host_manager.filter_hosts(hosts, @@ -285,7 +349,7 @@ class FilterScheduler(driver.Scheduler): instance_properties) selected_hosts.sort(key=operator.attrgetter('weight')) - return selected_hosts[:num_instances] + return selected_hosts def get_cost_functions(self, topic=None): """Returns a list of tuples containing weights and cost functions to diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index 41a9cf746..2bc013704 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -53,7 +53,7 @@ QUOTAS = quota.QUOTAS class SchedulerManager(manager.Manager): """Chooses a host to run instances on.""" - RPC_API_VERSION = '1.5' + RPC_API_VERSION = '1.6' def __init__(self, scheduler_driver=None, *args, **kwargs): if not scheduler_driver: @@ -117,7 +117,7 @@ class SchedulerManager(manager.Manager): def run_instance(self, context, request_spec, admin_password, injected_files, requested_networks, is_first_time, - filter_properties, reservations, topic=None): + filter_properties, reservations=None, topic=None): """Tries to call schedule_run_instance on the driver. Sets instance vm_state to ERROR on exceptions """ @@ -202,7 +202,13 @@ class SchedulerManager(manager.Manager): vm_state = updates['vm_state'] properties = request_spec.get('instance_properties', {}) - instance_uuid = properties.get('uuid', {}) + # FIXME(comstud): We really need to move error handling closer + # to where the errors occur so we can deal with errors on + # individual instances when scheduling multiple. 
+ if 'instance_uuids' in request_spec: + instance_uuid = request_spec['instance_uuids'][0] + else: + instance_uuid = properties.get('uuid', {}) if instance_uuid: state = vm_state.upper() diff --git a/nova/scheduler/rpcapi.py b/nova/scheduler/rpcapi.py index 6f9d00ad2..84d0286a2 100644 --- a/nova/scheduler/rpcapi.py +++ b/nova/scheduler/rpcapi.py @@ -40,6 +40,7 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy): 1.3 - Remove instance_id, add instance to live_migration 1.4 - Remove update_db from prep_resize 1.5 - Add reservations argument to prep_resize() + 1.6 - Remove reservations argument to run_instance() ''' BASE_RPC_API_VERSION = '1.0' @@ -50,15 +51,13 @@ class SchedulerAPI(nova.openstack.common.rpc.proxy.RpcProxy): def run_instance(self, ctxt, request_spec, admin_password, injected_files, requested_networks, is_first_time, - filter_properties, reservations, call=True): - rpc_method = self.call if call else self.cast - return rpc_method(ctxt, self.make_msg('run_instance', + filter_properties): + return self.cast(ctxt, self.make_msg('run_instance', request_spec=request_spec, admin_password=admin_password, injected_files=injected_files, requested_networks=requested_networks, is_first_time=is_first_time, - filter_properties=filter_properties, - reservations=reservations), version='1.2') + filter_properties=filter_properties), version='1.6') def prep_resize(self, ctxt, instance, instance_type, image, request_spec, filter_properties, reservations): diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index 1ebe889c1..8a46d51ec 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -68,38 +68,6 @@ flags.DECLARE('live_migration_retry_count', 'nova.compute.manager') FAKE_IMAGE_REF = 'fake-image-ref' -orig_rpc_call = rpc.call -orig_rpc_cast = rpc.cast - - -def rpc_call_wrapper(context, topic, msg, do_cast=True): - """Stub out the scheduler creating the instance entry""" - if (topic == 
FLAGS.scheduler_topic and - msg['method'] == 'run_instance'): - request_spec = msg['args']['request_spec'] - reservations = msg['args'].get('reservations') - scheduler = scheduler_driver.Scheduler - num_instances = request_spec.get('num_instances', 1) - instances = [] - for num in xrange(num_instances): - request_spec['instance_properties']['launch_index'] = num - instance = scheduler().create_instance_db_entry( - context, request_spec, reservations) - encoded = scheduler_driver.encode_instance(instance) - instances.append(encoded) - return instances - else: - if do_cast: - orig_rpc_cast(context, topic, msg) - else: - return orig_rpc_call(context, topic, msg) - - -def rpc_cast_wrapper(context, topic, msg): - """Stub out the scheduler creating the instance entry in - the reservation_id case. - """ - rpc_call_wrapper(context, topic, msg, do_cast=True) def nop_report_driver_status(self): @@ -136,8 +104,6 @@ class BaseTestCase(test.TestCase): fake_image.stub_out_image_service(self.stubs) self.stubs.Set(fake_image._FakeImageService, 'show', fake_show) - self.stubs.Set(rpc, 'call', rpc_call_wrapper) - self.stubs.Set(rpc, 'cast', rpc_cast_wrapper) fake_rpcapi = FakeSchedulerAPI() self.stubs.Set(self.compute, 'scheduler_rpcapi', fake_rpcapi) @@ -4774,10 +4740,10 @@ class ComputeReschedulingTestCase(BaseTestCase): def test_reschedule_success(self): retry = dict(num_attempts=1) filter_properties = dict(retry=retry) - request_spec = {'num_instances': 42} + request_spec = {'instance_uuids': ['foo', 'bar']} self.assertTrue(self._reschedule(filter_properties=filter_properties, request_spec=request_spec)) - self.assertEqual(1, request_spec['num_instances']) + self.assertEqual(1, len(request_spec['instance_uuids'])) class ThatsNoOrdinaryRabbitException(Exception): diff --git a/nova/tests/scheduler/test_filter_scheduler.py b/nova/tests/scheduler/test_filter_scheduler.py index 1ea57b4cf..64bf5415f 100644 --- a/nova/tests/scheduler/test_filter_scheduler.py +++ 
b/nova/tests/scheduler/test_filter_scheduler.py @@ -119,13 +119,13 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): # instance 1 self.driver._provision_resource( ctxt, 'host1', - mox.Func(_has_launch_index(0)), None, - {}, None, None, None, None).AndReturn(instance1) + mox.Func(_has_launch_index(0)), {}, + None, None, None, None, reservations=None).AndReturn(instance1) # instance 2 self.driver._provision_resource( ctxt, 'host2', - mox.Func(_has_launch_index(1)), None, - {}, None, None, None, None).AndReturn(instance2) + mox.Func(_has_launch_index(1)), {}, + None, None, None, None, reservations=None).AndReturn(instance2) self.mox.ReplayAll() self.driver.schedule_run_instance(context_fake, request_spec, diff --git a/nova/tests/scheduler/test_rpcapi.py b/nova/tests/scheduler/test_rpcapi.py index ce1fd4efe..a2fee64b2 100644 --- a/nova/tests/scheduler/test_rpcapi.py +++ b/nova/tests/scheduler/test_rpcapi.py @@ -43,8 +43,6 @@ class SchedulerRpcAPITestCase(test.TestCase): expected_version = kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION) expected_msg = rpcapi.make_msg(method, **kwargs) expected_msg['version'] = expected_version - if rpc_method == 'cast' and method == 'run_instance': - kwargs['call'] = False self.fake_args = None self.fake_kwargs = None @@ -64,21 +62,13 @@ class SchedulerRpcAPITestCase(test.TestCase): for arg, expected_arg in zip(self.fake_args, expected_args): self.assertEqual(arg, expected_arg) - def test_run_instance_call(self): - self._test_scheduler_api('run_instance', rpc_method='call', - request_spec='fake_request_spec', - admin_password='pw', injected_files='fake_injected_files', - requested_networks='fake_requested_networks', - is_first_time=True, filter_properties='fake_filter_properties', - reservations=None, version='1.2') - - def test_run_instance_cast(self): + def test_run_instance(self): self._test_scheduler_api('run_instance', rpc_method='cast', request_spec='fake_request_spec', admin_password='pw', 
injected_files='fake_injected_files', requested_networks='fake_requested_networks', is_first_time=True, filter_properties='fake_filter_properties', - reservations=None, version='1.2') + version='1.6') def test_prep_resize(self): self._test_scheduler_api('prep_resize', rpc_method='cast', |
