summaryrefslogtreecommitdiffstats
path: root/nova/scheduler/driver.py
blob: d5a1eedeac5f52877bee344291b6bdee9f28a212 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2010 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Scheduler base class that all Schedulers should inherit from
"""

import sys

from oslo.config import cfg

from nova.compute import flavors
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import utils as compute_utils
from nova.compute import vm_states
from nova.conductor import api as conductor_api
from nova import db
from nova import exception
from nova.image import glance
from nova import notifications
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common import timeutils
from nova import servicegroup

LOG = logging.getLogger(__name__)

scheduler_driver_opts = [
    cfg.StrOpt('scheduler_host_manager',
               default='nova.scheduler.host_manager.HostManager',
               help='The scheduler host manager class to use'),
    cfg.IntOpt('scheduler_max_attempts',
               default=3,
               help='Maximum number of attempts to schedule an instance'),
    ]

CONF = cfg.CONF
CONF.register_opts(scheduler_driver_opts)


def handle_schedule_error(context, ex, instance_uuid, request_spec):
    if not isinstance(ex, exception.NoValidHost):
        LOG.exception(_("Exception during scheduler.run_instance"))
    state = vm_states.ERROR.upper()
    LOG.warning(_('Setting instance to %s state.'), state,
                instance_uuid=instance_uuid)

    # update instance state and notify on the transition
    (old_ref, new_ref) = db.instance_update_and_get_original(context,
            instance_uuid, {'vm_state': vm_states.ERROR,
                            'task_state': None})
    notifications.send_update(context, old_ref, new_ref,
            service="scheduler")
    compute_utils.add_instance_fault_from_exc(context,
            conductor_api.LocalAPI(),
            new_ref, ex, sys.exc_info())

    properties = request_spec.get('instance_properties', {})
    payload = dict(request_spec=request_spec,
                   instance_properties=properties,
                   instance_id=instance_uuid,
                   state=vm_states.ERROR,
                   method='run_instance',
                   reason=ex)

    notifier.notify(context, notifier.publisher_id("scheduler"),
                    'scheduler.run_instance', notifier.ERROR, payload)


def instance_update_db(context, instance_uuid, extra_values=None):
    '''Clear the host and node - set the scheduled_at field of an Instance.

    :returns: An Instance with the updated fields set properly.
    '''
    now = timeutils.utcnow()
    values = {'host': None, 'node': None, 'scheduled_at': now}
    if extra_values:
        values.update(extra_values)

    return db.instance_update(context, instance_uuid, values)


def encode_instance(instance, local=True):
    """Encode locally created instance for return via RPC."""
    # TODO(comstud): I would love to be able to return the full
    # instance information here, but we'll need some modifications
    # to the RPC code to handle datetime conversions with the
    # json encoding/decoding.  We should be able to set a default
    # json handler somehow to do it.
    #
    # For now, I'll just return the instance ID and let the caller
    # do a DB lookup :-/
    if local:
        return dict(id=instance['id'], _is_precooked=False)
    else:
        inst = dict(instance)
        inst['_is_precooked'] = True
        return inst


class Scheduler(object):
    """The base class that all Scheduler classes should inherit from."""

    def __init__(self):
        self.host_manager = importutils.import_object(
                CONF.scheduler_host_manager)
        self.compute_rpcapi = compute_rpcapi.ComputeAPI()
        self.servicegroup_api = servicegroup.API()
        self.image_service = glance.get_default_image_service()

    def update_service_capabilities(self, service_name, host, capabilities):
        """Process a capability update from a service node."""
        self.host_manager.update_service_capabilities(service_name,
                host, capabilities)

    def hosts_up(self, context, topic):
        """Return the list of hosts that have a running service for topic."""

        services = db.service_get_all_by_topic(context, topic)
        return [service['host']
                for service in services
                if self.servicegroup_api.service_is_up(service)]

    def group_hosts(self, context, group):
        """Return the list of hosts that have VM's from the group."""

        # The system_metadata 'group' will be filtered
        members = db.instance_get_all_by_filters(context,
                {'deleted': False, 'group': group})
        return [member['host']
                for member in members
                if member.get('host') is not None]

    def schedule_prep_resize(self, context, image, request_spec,
                             filter_properties, instance, instance_type,
                             reservations):
        """Must override schedule_prep_resize method for scheduler to work."""
        msg = _("Driver must implement schedule_prep_resize")
        raise NotImplementedError(msg)

    def schedule_run_instance(self, context, request_spec,
                              admin_password, injected_files,
                              requested_networks, is_first_time,
                              filter_properties):
        """Must override schedule_run_instance method for scheduler to work."""
        msg = _("Driver must implement schedule_run_instance")
        raise NotImplementedError(msg)

    def select_hosts(self, context, request_spec, filter_properties):
        """Must override select_hosts method for scheduler to work."""
        msg = _("Driver must implement select_hosts")
        raise NotImplementedError(msg)

    def schedule_live_migration(self, context, instance, dest,
                                block_migration, disk_over_commit):
        """Live migration scheduling method.

        :param context:
        :param instance: instance dict
        :param dest: destination host
        :param block_migration: if true, block_migration.
        :param disk_over_commit: if True, consider real(not virtual)
                                 disk size.

        :return:
            The host where instance is running currently.
            Then scheduler send request that host.
        """
        # Check we can do live migration
        self._live_migration_src_check(context, instance)

        if dest is None:
            # Let scheduler select a dest host, retry next best until success
            # or no more valid hosts.
            ignore_hosts = [instance['host']]
            while dest is None:
                dest = self._live_migration_dest_check(context, instance, dest,
                                                       ignore_hosts)
                try:
                    self._live_migration_common_check(context, instance, dest)
                    migrate_data = self.compute_rpcapi.\
                        check_can_live_migrate_destination(context, instance,
                                                           dest,
                                                           block_migration,
                                                           disk_over_commit)
                except exception.Invalid:
                    ignore_hosts.append(dest)
                    dest = None
                    continue
        else:
            # Test the given dest host
            self._live_migration_dest_check(context, instance, dest)
            self._live_migration_common_check(context, instance, dest)
            migrate_data = self.compute_rpcapi.\
                check_can_live_migrate_destination(context, instance, dest,
                                                   block_migration,
                                                   disk_over_commit)

        # Perform migration
        src = instance['host']
        self.compute_rpcapi.live_migration(context, host=src,
                instance=instance, dest=dest,
                block_migration=block_migration,
                migrate_data=migrate_data)

    def _live_migration_src_check(self, context, instance_ref):
        """Live migration check routine (for src host).

        :param context: security context
        :param instance_ref: nova.db.sqlalchemy.models.Instance object

        """
        # TODO(johngar) why is this not in the API layer?
        # Checking instance is running.
        if instance_ref['power_state'] != power_state.RUNNING:
            raise exception.InstanceNotRunning(
                    instance_id=instance_ref['uuid'])

        # Checking src host exists and compute node
        src = instance_ref['host']
        try:
            service = db.service_get_by_compute_host(context, src)
        except exception.NotFound:
            raise exception.ComputeServiceUnavailable(host=src)

        # Checking src host is alive.
        if not self.servicegroup_api.service_is_up(service):
            raise exception.ComputeServiceUnavailable(host=src)

    def _live_migration_dest_check(self, context, instance_ref, dest,
                                   ignore_hosts=None):
        """Live migration check routine (for destination host).

        :param context: security context
        :param instance_ref: nova.db.sqlalchemy.models.Instance object
        :param dest: destination host
        :param ignore_hosts: hosts that should be avoided as dest host
        """

        # If dest is not specified, have scheduler pick one.
        if dest is None:
            instance_type = flavors.extract_flavor(instance_ref)
            if not instance_ref['image_ref']:
                image = None
            else:
                image = self.image_service.show(context,
                                                instance_ref['image_ref'])
            request_spec = {'instance_properties': instance_ref,
                            'instance_type': instance_type,
                            'instance_uuids': [instance_ref['uuid']],
                            'image': image}
            filter_properties = {'ignore_hosts': ignore_hosts}
            return self.select_hosts(context, request_spec,
                                     filter_properties)[0]

        # Checking whether The host where instance is running
        # and dest is not same.
        src = instance_ref['host']
        if dest == src:
            raise exception.UnableToMigrateToSelf(
                    instance_id=instance_ref['uuid'], host=dest)

        # Checking dest exists and compute node.
        try:
            dservice_ref = db.service_get_by_compute_host(context, dest)
        except exception.NotFound:
            raise exception.ComputeServiceUnavailable(host=dest)

        # Checking dest host is alive.
        if not self.servicegroup_api.service_is_up(dservice_ref):
            raise exception.ComputeServiceUnavailable(host=dest)

        # Check memory requirements
        self._assert_compute_node_has_enough_memory(context,
                                                   instance_ref, dest)

        return dest

    def _live_migration_common_check(self, context, instance_ref, dest):
        """Live migration common check routine.

        The following checks are based on
        http://wiki.libvirt.org/page/TodoPreMigrationChecks

        :param context: security context
        :param instance_ref: nova.db.sqlalchemy.models.Instance object
        :param dest: destination host
        """
        dservice_ref = self._get_compute_info(context, dest)
        src = instance_ref['host']
        oservice_ref = self._get_compute_info(context, src)

        # Checking hypervisor is same.
        orig_hypervisor = oservice_ref['hypervisor_type']
        dest_hypervisor = dservice_ref['hypervisor_type']
        if orig_hypervisor != dest_hypervisor:
            raise exception.InvalidHypervisorType()

        # Checking hypervisor version.
        orig_hypervisor = oservice_ref['hypervisor_version']
        dest_hypervisor = dservice_ref['hypervisor_version']
        if orig_hypervisor > dest_hypervisor:
            raise exception.DestinationHypervisorTooOld()

    def _assert_compute_node_has_enough_memory(self, context,
                                              instance_ref, dest):
        """Checks if destination host has enough memory for live migration.


        :param context: security context
        :param instance_ref: nova.db.sqlalchemy.models.Instance object
        :param dest: destination host

        """
        # Getting total available memory of host
        avail = self._get_compute_info(context, dest)['free_ram_mb']

        mem_inst = instance_ref['memory_mb']
        if not mem_inst or avail <= mem_inst:
            instance_uuid = instance_ref['uuid']
            reason = _("Unable to migrate %(instance_uuid)s to %(dest)s: "
                       "Lack of memory(host:%(avail)s <= "
                       "instance:%(mem_inst)s)")
            raise exception.MigrationPreCheckError(reason=reason %
                {'instance_uuid': instance_uuid, 'dest': dest, 'avail': avail,
                 'mem_inst': mem_inst})

    def _get_compute_info(self, context, host):
        """get compute node's information specified by key

        :param context: security context
        :param host: hostname(must be compute node)
        :param key: column name of compute_nodes
        :return: value specified by key

        """
        service_ref = db.service_get_by_compute_host(context, host)
        return service_ref['compute_node'][0]