summaryrefslogtreecommitdiffstats
path: root/nova/scheduler/manager.py
blob: d0ff30c36b760dd1d0907f40ea014c8bec94fff6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2010 OpenStack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Scheduler Service
"""

import functools

from nova.compute import vm_states
from nova import db
from nova import exception
from nova import flags
from nova import manager
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common.rpc import dispatcher as rpc_dispatcher
from nova import quota


LOG = logging.getLogger(__name__)

scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
        default='nova.scheduler.multi.MultiScheduler',
        help='Default driver to use for the scheduler')

FLAGS = flags.FLAGS
FLAGS.register_opt(scheduler_driver_opt)

QUOTAS = quota.QUOTAS


class SchedulerManager(manager.Manager):
    """Chooses a host to run instances on."""

    RPC_API_VERSION = '1.7'

    def __init__(self, scheduler_driver=None, *args, **kwargs):
        if not scheduler_driver:
            scheduler_driver = FLAGS.scheduler_driver
        self.driver = importutils.import_object(scheduler_driver)
        super(SchedulerManager, self).__init__(*args, **kwargs)

    def create_rpc_dispatcher(self):
        """Get the rpc dispatcher for this manager.

        Return a dispatcher which can call out to either SchedulerManager
        or _V2SchedulerManagerProxy depending on the RPC API version.
        """
        return rpc_dispatcher.RpcDispatcher([self,
                                              _V2SchedulerManagerProxy(self)])

    def __getattr__(self, key):
        """Converts all method calls to use the schedule method"""
        # NOTE(russellb) Because of what this is doing, we must be careful
        # when changing the API of the scheduler drivers, as that changes
        # the rpc API as well, and the version should be updated accordingly.
        # NOTE(markmc): This remains only for backwards compat support
        # and can be removed when we bump the major version
        return functools.partial(self._schedule, key)

    def get_host_list(self, context):
        """Get a list of hosts from the HostManager.

        Currently unused, but left for backwards compatibility.
        """
        raise rpc_common.RPCException(message=_('Deprecated in version 1.0'))

    def get_service_capabilities(self, context):
        """Get the normalized set of capabilities for this zone.

        Has been unused since pre-essex, but remains for rpc API 1.X
        completeness.
        """
        raise rpc_common.RPCException(message=_('Deprecated in version 1.0'))

    def update_service_capabilities(self, context, service_name=None,
            host=None, capabilities=None, **kwargs):
        """Process a capability update from a service node."""
        if capabilities is None:
            capabilities = {}
        self.driver.update_service_capabilities(service_name, host,
                capabilities)

    def create_volume(self, context, volume_id, snapshot_id, reservations):
        try:
            self.driver.schedule_create_volume(
                context, volume_id, snapshot_id, reservations)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_vm_state_and_notify('create_volume',
                                             {'vm_state': vm_states.ERROR},
                                             context, ex, {})

    def live_migration(self, context, dest,
                       block_migration=False, disk_over_commit=False,
                       instance=None, instance_id=None, topic=None):
        try:
            return self.driver.schedule_live_migration(
                context, dest,
                block_migration, disk_over_commit,
                instance, instance_id)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_vm_state_and_notify('live_migration',
                                             {'vm_state': vm_states.ERROR},
                                             context, ex, {})

    def _schedule(self, method, context, topic, *args, **kwargs):
        """Tries to call schedule_* method on the driver to retrieve host.
        Falls back to schedule(context, topic) if method doesn't exist.
        """
        driver_method_name = 'schedule_%s' % method
        try:
            driver_method = getattr(self.driver, driver_method_name)
            args = (context,) + args
        except AttributeError, e:
            LOG.warning(_("Driver Method %(driver_method_name)s missing: "
                       "%(e)s. Reverting to schedule()") % locals())
            driver_method = self.driver.schedule
            args = (context, topic, method) + args

        # Scheduler methods are responsible for casting.
        try:
            return driver_method(*args, **kwargs)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                request_spec = kwargs.get('request_spec', {})
                self._set_vm_state_and_notify(method,
                                             {'vm_state': vm_states.ERROR},
                                             context, ex, request_spec)

    def run_instance(self, context, request_spec, admin_password,
            injected_files, requested_networks, is_first_time,
            filter_properties, reservations=None, topic=None):
        """Tries to call schedule_run_instance on the driver.
        Sets instance vm_state to ERROR on exceptions
        """
        try:
            result = self.driver.schedule_run_instance(context,
                    request_spec, admin_password, injected_files,
                    requested_networks, is_first_time, filter_properties,
                    reservations)
            return result
        except exception.NoValidHost as ex:
            # don't re-raise
            self._set_vm_state_and_notify('run_instance',
                                         {'vm_state': vm_states.ERROR},
                                          context, ex, request_spec)
            if reservations:
                QUOTAS.rollback(context, reservations)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_vm_state_and_notify('run_instance',
                                             {'vm_state': vm_states.ERROR},
                                             context, ex, request_spec)
                if reservations:
                    QUOTAS.rollback(context, reservations)

    # FIXME(comstud): Remove 'update_db' in a future version.  It's only
    # here for rpcapi backwards compatibility.
    def prep_resize(self, context, image, request_spec, filter_properties,
                    update_db=None, instance=None, instance_uuid=None,
                    instance_type=None, instance_type_id=None,
                    reservations=None, topic=None):
        """Tries to call schedule_prep_resize on the driver.
        Sets instance vm_state to ACTIVE on NoHostFound
        Sets vm_state to ERROR on other exceptions
        """
        if not instance:
            instance = db.instance_get_by_uuid(context, instance_uuid)

        if not instance_type:
            instance_type = db.instance_type_get(context, instance_type_id)

        try:
            kwargs = {
                'context': context,
                'image': image,
                'request_spec': request_spec,
                'filter_properties': filter_properties,
                'instance': instance,
                'instance_type': instance_type,
                'reservations': reservations,
            }
            return self.driver.schedule_prep_resize(**kwargs)
        except exception.NoValidHost as ex:
            self._set_vm_state_and_notify('prep_resize',
                                         {'vm_state': vm_states.ACTIVE,
                                          'task_state': None},
                                         context, ex, request_spec)
            if reservations:
                QUOTAS.rollback(context, reservations)
        except Exception as ex:
            with excutils.save_and_reraise_exception():
                self._set_vm_state_and_notify('prep_resize',
                                             {'vm_state': vm_states.ERROR},
                                             context, ex, request_spec)
                if reservations:
                    QUOTAS.rollback(context, reservations)

    def _set_vm_state_and_notify(self, method, updates, context, ex,
                                 request_spec):
        """changes VM state and notifies"""
        # FIXME(comstud): Re-factor this somehow. Not sure this belongs in the
        # scheduler manager like this. We should make this easier.
        # run_instance only sends a request_spec, and an instance may or may
        # not have been created in the API (or scheduler) already. If it was
        # created, there's a 'uuid' set in the instance_properties of the
        # request_spec.
        # (littleidea): I refactored this a bit, and I agree
        # it should be easier :)
        # The refactoring could go further but trying to minimize changes
        # for essex timeframe

        LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())

        vm_state = updates['vm_state']
        properties = request_spec.get('instance_properties', {})
        # FIXME(comstud): We really need to move error handling closer
        # to where the errors occur so we can deal with errors on
        # individual instances when scheduling multiple.
        if 'instance_uuids' in request_spec:
            instance_uuid = request_spec['instance_uuids'][0]
        else:
            instance_uuid = properties.get('uuid', {})

        if instance_uuid:
            state = vm_state.upper()
            LOG.warning(_('Setting instance to %(state)s state.'), locals(),
                        instance_uuid=instance_uuid)

            # update instance state and notify on the transition
            (old_ref, new_ref) = db.instance_update_and_get_original(context,
                    instance_uuid, updates)
            notifications.send_update(context, old_ref, new_ref,
                    service="scheduler")

        payload = dict(request_spec=request_spec,
                       instance_properties=properties,
                       instance_id=instance_uuid,
                       state=vm_state,
                       method=method,
                       reason=ex)

        notifier.notify(context, notifier.publisher_id("scheduler"),
                        'scheduler.' + method, notifier.ERROR, payload)

    # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
    # Based on bexar design summit discussion,
    # just put this here for bexar release.
    def show_host_resources(self, context, host):
        """Shows the physical/usage resource given by hosts.

        :param context: security context
        :param host: hostname
        :returns:
            example format is below::

                {'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
                D: {'vcpus': 3, 'memory_mb': 2048, 'local_gb': 2048,
                    'vcpus_used': 12, 'memory_mb_used': 10240,
                    'local_gb_used': 64}

        """
        # Getting compute node info and related instances info
        compute_ref = db.service_get_all_compute_by_host(context, host)
        compute_ref = compute_ref[0]
        instance_refs = db.instance_get_all_by_host(context,
                                                    compute_ref['host'])

        # Getting total available/used resource
        compute_ref = compute_ref['compute_node'][0]
        resource = {'vcpus': compute_ref['vcpus'],
                    'memory_mb': compute_ref['memory_mb'],
                    'local_gb': compute_ref['local_gb'],
                    'vcpus_used': compute_ref['vcpus_used'],
                    'memory_mb_used': compute_ref['memory_mb_used'],
                    'local_gb_used': compute_ref['local_gb_used']}
        usage = dict()
        if not instance_refs:
            return {'resource': resource, 'usage': usage}

        # Getting usage resource per project
        project_ids = [i['project_id'] for i in instance_refs]
        project_ids = list(set(project_ids))
        for project_id in project_ids:
            vcpus = [i['vcpus'] for i in instance_refs
                     if i['project_id'] == project_id]

            mem = [i['memory_mb'] for i in instance_refs
                   if i['project_id'] == project_id]

            root = [i['root_gb'] for i in instance_refs
                    if i['project_id'] == project_id]

            ephemeral = [i['ephemeral_gb'] for i in instance_refs
                         if i['project_id'] == project_id]

            usage[project_id] = {'vcpus': sum(vcpus),
                                 'memory_mb': sum(mem),
                                 'root_gb': sum(root),
                                 'ephemeral_gb': sum(ephemeral)}

        return {'resource': resource, 'usage': usage}

    @manager.periodic_task
    def _expire_reservations(self, context):
        QUOTAS.expire(context)


class _V2SchedulerManagerProxy(object):

    RPC_API_VERSION = '2.0'

    # Notes:
    # - remove get_host_list()
    # - remove get_service_capabilities()
    # - add explicit live_migration() method
    # - remove __getattr__ magic which is replaced by schedule()

    def __init__(self, manager):
        self.manager = manager

    def create_volume(self, context, volume_id, snapshot_id, reservations):
        return self.manager.create_volume(
            context, volume_id, snapshot_id, reservations)

    # Remove instance_id, require instance
    # Remove topic
    # Make block_migration and disk_over_commit required
    def live_migration(self, context, instance, dest,
                       block_migration, disk_over_commit):
        return self.manager.live_migration(
            context, dest, instance=instance,
            block_migration=block_migration,
            disk_over_commit=disk_over_commit,
            instance_id=None)

    # Remove update_db
    # Remove instance_uuid, require instance
    # Remove instance_type_id, require instance_type
    # Remove topic
    # Make reservations required
    def prep_resize(self, context, image, request_spec, filter_properties,
                    instance, instance_type, reservations):
        return self.manager.prep_resize(
            context, image=image, request_spec=request_spec,
            filter_properties=filter_properties,
            instance=instance, instance_type=instance_type,
            reservations=reservations, topic=None,
            update_db=None, instance_uuid=None, instance_type_id=None)

    # Remove reservations and topic
    # Require instance_uuids in request_spec
    def run_instance(self, context, request_spec, admin_password,
            injected_files, requested_networks, is_first_time,
            filter_properties):
        return self.manager.run_instance(
            context, request_spec, admin_password, injected_files,
            requested_networks, is_first_time, filter_properties,
            reservations=None, topic=None)

    def show_host_resources(self, context, host):
        return self.manager.show_host_resources(context, host)

    # remove kwargs
    # require service_name, host and capabilities
    def update_service_capabilities(self, context, service_name,
                                    host, capabilities):
        return self.manager.update_service_capabilities(
            context, service_name=service_name, host=host,
            capabilities=capabilities)