diff options
| author | Soren Hansen <soren@linux2go.dk> | 2011-08-09 22:20:53 +0100 |
|---|---|---|
| committer | Soren Hansen <soren@linux2go.dk> | 2011-08-09 22:20:53 +0100 |
| commit | adc4d2dc71b6dcdad4bca57925f89d7344a613e8 (patch) | |
| tree | fd996b679318d453e1e742b7c4d514e3675348e3 /nova/compute | |
| parent | 1d269ad0b9a8bc7d30ff1f91faa9afe465f87e98 (diff) | |
| parent | d6943d72525fd6a48bc9b3407bc90d9da7f99ad9 (diff) | |
Merge trunk
Diffstat (limited to 'nova/compute')
| -rw-r--r-- | nova/compute/api.py | 101 | ||||
| -rw-r--r-- | nova/compute/instance_types.py | 9 | ||||
| -rw-r--r-- | nova/compute/manager.py | 118 | ||||
| -rw-r--r-- | nova/compute/monitor.py | 435 |
4 files changed, 164 insertions, 499 deletions
diff --git a/nova/compute/api.py b/nova/compute/api.py index adc023a4d..09ba1ee76 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -22,6 +22,7 @@ import eventlet import re import time +from nova import block_device from nova import db from nova import exception from nova import flags @@ -32,7 +33,6 @@ from nova import quota from nova import rpc from nova import utils from nova import volume -from nova.api.ec2 import ec2utils from nova.compute import instance_types from nova.compute import power_state from nova.compute.utils import terminate_volumes @@ -218,7 +218,7 @@ class API(base.Base): if reservation_id is None: reservation_id = utils.generate_uid('r') - root_device_name = ec2utils.properties_root_device_name( + root_device_name = block_device.properties_root_device_name( image['properties']) base_options = { @@ -250,34 +250,64 @@ class API(base.Base): return (num_instances, base_options, image) - def _update_image_block_device_mapping(self, elevated_context, instance_id, + @staticmethod + def _ephemeral_size(instance_type, ephemeral_name): + num = block_device.ephemeral_num(ephemeral_name) + + # TODO(yamahata): ephemeralN where N > 0 + # Only ephemeral0 is allowed for now because InstanceTypes + # table only allows single local disk, local_gb. + # In order to enhance it, we need to add a new columns to + # instance_types table. + if num > 0: + return 0 + + return instance_type.get('local_gb') + + def _update_image_block_device_mapping(self, elevated_context, + instance_type, instance_id, mappings): """tell vm driver to create ephemeral/swap device at boot time by updating BlockDeviceMapping """ - for bdm in ec2utils.mappings_prepend_dev(mappings): + instance_type = (instance_type or + instance_types.get_default_instance_type()) + + for bdm in block_device.mappings_prepend_dev(mappings): LOG.debug(_("bdm %s"), bdm) virtual_name = bdm['virtual'] if virtual_name == 'ami' or virtual_name == 'root': continue - assert (virtual_name == 'swap' or - virtual_name.startswith('ephemeral')) + if not block_device.is_swap_or_ephemeral(virtual_name): + continue + + size = 0 + if virtual_name == 'swap': + size = instance_type.get('swap', 0) + elif block_device.is_ephemeral(virtual_name): + size = self._ephemeral_size(instance_type, virtual_name) + + if size == 0: + continue + values = { 'instance_id': instance_id, 'device_name': bdm['device'], - 'virtual_name': virtual_name, } + 'virtual_name': virtual_name, + 'volume_size': size} self.db.block_device_mapping_update_or_create(elevated_context, values) - def _update_block_device_mapping(self, elevated_context, instance_id, + def _update_block_device_mapping(self, elevated_context, + instance_type, instance_id, block_device_mapping): """tell vm driver to attach volume at boot time by updating BlockDeviceMapping """ + LOG.debug(_("block_device_mapping %s"), block_device_mapping) for bdm in block_device_mapping: - LOG.debug(_('bdm %s'), bdm) assert 'device_name' in bdm values = {'instance_id': instance_id} @@ -286,10 +316,18 @@ class API(base.Base): 'no_device'): values[key] = bdm.get(key) + virtual_name = bdm.get('virtual_name') + if (virtual_name is not None and + block_device.is_ephemeral(virtual_name)): + size = self._ephemeral_size(instance_type, virtual_name) + if size == 0: + continue + values['volume_size'] = size + # NOTE(yamahata): NoDevice eliminates devices defined in image # files by command line option. # (--block-device-mapping) - if bdm.get('virtual_name') == 'NoDevice': + if virtual_name == 'NoDevice': values['no_device'] = True for k in ('delete_on_termination', 'volume_id', 'snapshot_id', 'volume_id', 'volume_size', @@ -299,8 +337,8 @@ class API(base.Base): self.db.block_device_mapping_update_or_create(elevated_context, values) - def create_db_entry_for_new_instance(self, context, image, base_options, - security_group, block_device_mapping, num=1): + def create_db_entry_for_new_instance(self, context, instance_type, image, + base_options, security_group, block_device_mapping, num=1): """Create an entry in the DB for this new instance, including any related table updates (such as security group, etc). @@ -333,12 +371,12 @@ class API(base.Base): security_group_id) # BlockDeviceMapping table - self._update_image_block_device_mapping(elevated, instance_id, - image['properties'].get('mappings', [])) - self._update_block_device_mapping(elevated, instance_id, + self._update_image_block_device_mapping(elevated, instance_type, + instance_id, image['properties'].get('mappings', [])) + self._update_block_device_mapping(elevated, instance_type, instance_id, image['properties'].get('block_device_mapping', [])) # override via command line option - self._update_block_device_mapping(elevated, instance_id, + self._update_block_device_mapping(elevated, instance_type, instance_id, block_device_mapping) # Set sane defaults if not specified @@ -356,6 +394,7 @@ class API(base.Base): instance_type, zone_blob, availability_zone, injected_files, admin_password, + image, instance_id=None, num_instances=1): """Send the run_instance request to the schedulers for processing.""" pid = context.project_id @@ -369,6 +408,7 @@ class API(base.Base): filter_class = 'nova.scheduler.host_filter.InstanceTypeFilter' request_spec = { + 'image': image, 'instance_properties': base_options, 'instance_type': instance_type, 'filter': filter_class, @@ -411,6 +451,7 @@ class API(base.Base): instance_type, zone_blob, availability_zone, injected_files, admin_password, + image, num_instances=num_instances) return base_options['reservation_id'] @@ -449,7 +490,8 @@ class API(base.Base): instances = [] LOG.debug(_("Going to run %s instances..."), num_instances) for num in range(num_instances): - instance = self.create_db_entry_for_new_instance(context, image, + instance = self.create_db_entry_for_new_instance(context, + instance_type, image, base_options, security_group, block_device_mapping, num=num) instances.append(instance) @@ -459,6 +501,7 @@ class API(base.Base): instance_type, zone_blob, availability_zone, injected_files, admin_password, + image, instance_id=instance_id) return [dict(x.iteritems()) for x in instances] @@ -687,7 +730,7 @@ class API(base.Base): raise instances = None elif project_id or not context.is_admin: - if not context.project: + if not context.project_id: instances = self.db.instance_get_all_by_user( context, context.user_id) else: @@ -886,7 +929,7 @@ class API(base.Base): params = {'migration_id': migration_ref['id']} self._cast_compute_message('revert_resize', context, instance_ref['uuid'], - migration_ref['source_compute'], + migration_ref['dest_compute'], params=params) self.db.migration_update(context, migration_ref['id'], @@ -906,7 +949,7 @@ class API(base.Base): params = {'migration_id': migration_ref['id']} self._cast_compute_message('confirm_resize', context, instance_ref['uuid'], - migration_ref['dest_compute'], + migration_ref['source_compute'], params=params) self.db.migration_update(context, migration_ref['id'], @@ -938,25 +981,22 @@ class API(base.Base): LOG.debug(_("Old instance type %(current_instance_type_name)s, " " new instance type %(new_instance_type_name)s") % locals()) if not new_instance_type: - raise exception.ApiError(_("Requested flavor %(flavor_id)d " - "does not exist") % locals()) + raise exception.FlavorNotFound(flavor_id=flavor_id) current_memory_mb = current_instance_type['memory_mb'] new_memory_mb = new_instance_type['memory_mb'] if current_memory_mb > new_memory_mb: - raise exception.ApiError(_("Invalid flavor: cannot downsize" - "instances")) + raise exception.CannotResizeToSmallerSize() if (current_memory_mb == new_memory_mb) and flavor_id: - raise exception.ApiError(_("Invalid flavor: cannot use" - "the same flavor. ")) + raise exception.CannotResizeToSameSize() instance_ref = self._get_instance(context, instance_id, 'resize') self._cast_scheduler_message(context, {"method": "prep_resize", "args": {"topic": FLAGS.compute_topic, "instance_id": instance_ref['uuid'], - "flavor_id": new_instance_type['id']}}) + "instance_type_id": new_instance_type['id']}}) @scheduler_api.reroute_compute("add_fixed_ip") def add_fixed_ip(self, context, instance_id, network_id): @@ -994,7 +1034,12 @@ class API(base.Base): def set_host_enabled(self, context, host, enabled): """Sets the specified host's ability to accept new instances.""" return self._call_compute_message("set_host_enabled", context, - instance_id=None, host=host, params={"enabled": enabled}) + host=host, params={"enabled": enabled}) + + def host_power_action(self, context, host, action): + """Reboots, shuts down or powers up the host.""" + return self._call_compute_message("host_power_action", context, + host=host, params={"action": action}) @scheduler_api.reroute_compute("diagnostics") def get_diagnostics(self, context, instance_id): diff --git a/nova/compute/instance_types.py b/nova/compute/instance_types.py index c13a629a9..824416514 100644 --- a/nova/compute/instance_types.py +++ b/nova/compute/instance_types.py @@ -132,11 +132,8 @@ def get_instance_type_by_name(name): # flavors. def get_instance_type_by_flavor_id(flavor_id): """Retrieve instance type by flavor_id.""" - if flavor_id is None: - return get_default_instance_type() + ctxt = context.get_admin_context() try: - ctxt = context.get_admin_context() return db.instance_type_get_by_flavor_id(ctxt, flavor_id) - except exception.DBError, e: - LOG.exception(_('DB error: %s') % e) - raise exception.ApiError(_("Unknown flavor: %s") % flavor_id) + except ValueError: + raise exception.FlavorNotFound(flavor_id=flavor_id) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 173469bc3..d38213083 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -44,6 +44,8 @@ import functools from eventlet import greenthread +import nova.context +from nova import block_device from nova import exception from nova import flags import nova.image @@ -147,6 +149,31 @@ class ComputeManager(manager.SchedulerDependentManager): def init_host(self): """Initialization for a standalone compute service.""" self.driver.init_host(host=self.host) + context = nova.context.get_admin_context() + instances = self.db.instance_get_all_by_host(context, self.host) + for instance in instances: + inst_name = instance['name'] + db_state = instance['state'] + drv_state = self._update_state(context, instance['id']) + + expect_running = db_state == power_state.RUNNING \ + and drv_state != db_state + + LOG.debug(_('Current state of %(inst_name)s is %(drv_state)s, ' + 'state in DB is %(db_state)s.'), locals()) + + if (expect_running and FLAGS.resume_guests_state_on_host_boot)\ + or FLAGS.start_guests_on_host_boot: + LOG.info(_('Rebooting instance %(inst_name)s after ' + 'nova-compute restart.'), locals()) + self.reboot_instance(context, instance['id']) + elif drv_state == power_state.RUNNING: + # Hyper-V and VMWareAPI drivers will raise and exception + try: + self.driver.ensure_filtering_rules_for_instance(instance) + except NotImplementedError: + LOG.warning(_('Hypervisor driver does not ' + 'support firewall rules')) def _update_state(self, context, instance_id, state=None): """Update the state of an instance from the driver info.""" @@ -154,6 +181,7 @@ class ComputeManager(manager.SchedulerDependentManager): if state is None: try: + LOG.debug(_('Checking state of %s'), instance_ref['name']) info = self.driver.get_info(instance_ref['name']) except exception.NotFound: info = None @@ -164,6 +192,7 @@ class ComputeManager(manager.SchedulerDependentManager): state = power_state.FAILED self.db.instance_set_state(context, instance_id, state) + return state def _update_launched_at(self, context, instance_id, launched_at=None): """Update the launched_at parameter of the given instance.""" @@ -232,6 +261,8 @@ class ComputeManager(manager.SchedulerDependentManager): volume_api = volume.API() block_device_mapping = [] + swap = None + ephemerals = [] for bdm in self.db.block_device_mapping_get_all_by_instance( context, instance_id): LOG.debug(_("setting up bdm %s"), bdm) @@ -239,11 +270,18 @@ class ComputeManager(manager.SchedulerDependentManager): if bdm['no_device']: continue if bdm['virtual_name']: - # TODO(yamahata): - # block devices for swap and ephemeralN will be - # created by virt driver locally in compute node. - assert (bdm['virtual_name'] == 'swap' or - bdm['virtual_name'].startswith('ephemeral')) + virtual_name = bdm['virtual_name'] + device_name = bdm['device_name'] + assert block_device.is_swap_or_ephemeral(virtual_name) + if virtual_name == 'swap': + swap = {'device_name': device_name, + 'swap_size': bdm['volume_size']} + elif block_device.is_ephemeral(virtual_name): + eph = {'num': block_device.ephemeral_num(virtual_name), + 'virtual_name': virtual_name, + 'device_name': device_name, + 'size': bdm['volume_size']} + ephemerals.append(eph) continue if ((bdm['snapshot_id'] is not None) and @@ -279,7 +317,7 @@ class ComputeManager(manager.SchedulerDependentManager): 'mount_device': bdm['device_name']}) - return block_device_mapping + return (swap, ephemerals, block_device_mapping) def _run_instance(self, context, instance_id, **kwargs): """Launch a new instance with specified options.""" @@ -320,13 +358,21 @@ class ComputeManager(manager.SchedulerDependentManager): # all vif creation and network injection, maybe this is correct network_info = [] - bd_mapping = self._setup_block_device_mapping(context, instance_id) + (swap, ephemerals, + block_device_mapping) = self._setup_block_device_mapping( + context, instance_id) + block_device_info = { + 'root_device_name': instance['root_device_name'], + 'swap': swap, + 'ephemerals': ephemerals, + 'block_device_mapping': block_device_mapping} # TODO(vish) check to make sure the availability zone matches self._update_state(context, instance_id, power_state.BUILDING) try: - self.driver.spawn(instance, network_info, bd_mapping) + self.driver.spawn(context, instance, + network_info, block_device_info) except Exception as ex: # pylint: disable=W0702 msg = _("Instance '%(instance_id)s' failed to spawn. Is " "virtualization enabled in the BIOS? Details: " @@ -433,7 +479,7 @@ class ComputeManager(manager.SchedulerDependentManager): network_info = self.network_api.get_instance_nw_info(context, instance_ref) bd_mapping = self._setup_block_device_mapping(context, instance_id) - self.driver.spawn(instance_ref, network_info, bd_mapping) + self.driver.spawn(context, instance_ref, network_info, bd_mapping) self._update_image_ref(context, instance_id, image_ref) self._update_launched_at(context, instance_id) @@ -501,7 +547,7 @@ class ComputeManager(manager.SchedulerDependentManager): 'instance: %(instance_id)s (state: %(state)s ' 'expected: %(running)s)') % locals()) - self.driver.snapshot(instance_ref, image_id) + self.driver.snapshot(context, instance_ref, image_id) if image_type == 'snapshot': if rotation: @@ -660,7 +706,7 @@ class ComputeManager(manager.SchedulerDependentManager): _update_state = lambda result: self._update_state_callback( self, context, instance_id, result) network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.rescue(instance_ref, _update_state, network_info) + self.driver.rescue(context, instance_ref, _update_state, network_info) self._update_state(context, instance_id) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @@ -720,7 +766,8 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref['host']) rpc.cast(context, topic, {'method': 'finish_revert_resize', - 'args': {'migration_id': migration_ref['id']}, + 'args': {'instance_id': instance_ref['uuid'], + 'migration_id': migration_ref['id']}, }) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @@ -736,8 +783,8 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get_by_uuid(context, migration_ref.instance_uuid) - instance_type = self.db.instance_type_get_by_flavor_id(context, - migration_ref['old_flavor_id']) + instance_type = self.db.instance_type_get(context, + migration_ref['old_instance_type_id']) # Just roll back the record. There's no need to resize down since # the 'old' VM already has the preferred attributes @@ -747,7 +794,7 @@ class ComputeManager(manager.SchedulerDependentManager): local_gb=instance_type['local_gb'], instance_type_id=instance_type['id'])) - self.driver.revert_resize(instance_ref) + self.driver.revert_migration(instance_ref) self.db.migration_update(context, migration_id, {'status': 'reverted'}) usage_info = utils.usage_from_instance(instance_ref) @@ -758,7 +805,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) @checks_instance_lock - def prep_resize(self, context, instance_id, flavor_id): + def prep_resize(self, context, instance_id, instance_type_id): """Initiates the process of moving a running instance to another host. Possibly changes the RAM and disk size in the process. @@ -777,16 +824,16 @@ class ComputeManager(manager.SchedulerDependentManager): old_instance_type = self.db.instance_type_get(context, instance_ref['instance_type_id']) - new_instance_type = self.db.instance_type_get_by_flavor_id(context, - flavor_id) + new_instance_type = self.db.instance_type_get(context, + instance_type_id) migration_ref = self.db.migration_create(context, {'instance_uuid': instance_ref['uuid'], 'source_compute': instance_ref['host'], 'dest_compute': FLAGS.host, 'dest_host': self.driver.get_host_ip_addr(), - 'old_flavor_id': old_instance_type['flavorid'], - 'new_flavor_id': flavor_id, + 'old_instance_type_id': old_instance_type['id'], + 'new_instance_type_id': instance_type_id, 'status': 'pre-migrating'}) LOG.audit(_('instance %s: migrating'), instance_ref['uuid'], @@ -845,20 +892,27 @@ class ComputeManager(manager.SchedulerDependentManager): """ migration_ref = self.db.migration_get(context, migration_id) + + resize_instance = False instance_ref = self.db.instance_get_by_uuid(context, migration_ref.instance_uuid) - instance_type = self.db.instance_type_get_by_flavor_id(context, - migration_ref['new_flavor_id']) - self.db.instance_update(context, instance_ref.uuid, - dict(instance_type_id=instance_type['id'], - memory_mb=instance_type['memory_mb'], - vcpus=instance_type['vcpus'], - local_gb=instance_type['local_gb'])) + if migration_ref['old_instance_type_id'] != \ + migration_ref['new_instance_type_id']: + instance_type = self.db.instance_type_get(context, + migration_ref['new_instance_type_id']) + self.db.instance_update(context, instance_ref.uuid, + dict(instance_type_id=instance_type['id'], + memory_mb=instance_type['memory_mb'], + vcpus=instance_type['vcpus'], + local_gb=instance_type['local_gb'])) + resize_instance = True instance_ref = self.db.instance_get_by_uuid(context, instance_ref.uuid) + network_info = self._get_instance_nw_info(context, instance_ref) - self.driver.finish_resize(instance_ref, disk_info, network_info) + self.driver.finish_migration(context, instance_ref, disk_info, + network_info, resize_instance) self.db.migration_update(context, migration_id, {'status': 'finished', }) @@ -922,8 +976,12 @@ class ComputeManager(manager.SchedulerDependentManager): result)) @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) - def set_host_enabled(self, context, instance_id=None, host=None, - enabled=None): + def host_power_action(self, context, host=None, action=None): + """Reboots, shuts down or powers up the host.""" + return self.driver.host_power_action(host, action) + + @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id()) + def set_host_enabled(self, context, host=None, enabled=None): """Sets the specified host's ability to accept new instances.""" return self.driver.set_host_enabled(host, enabled) diff --git a/nova/compute/monitor.py b/nova/compute/monitor.py deleted file mode 100644 index 9d8e2a25d..000000000 --- a/nova/compute/monitor.py +++ /dev/null @@ -1,435 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Instance Monitoring: - - Optionally may be run on each compute node. Provides RRD - based statistics and graphs and makes them internally available - in the object store. -""" - -import datetime -import os -import time - -import boto -import boto.s3 -import rrdtool -from twisted.internet import task -from twisted.application import service - -from nova import flags -from nova import log as logging -from nova import utils -from nova.virt import connection as virt_connection - - -FLAGS = flags.FLAGS -flags.DEFINE_integer('monitoring_instances_delay', 5, - 'Sleep time between updates') -flags.DEFINE_integer('monitoring_instances_step', 300, - 'Interval of RRD updates') -flags.DEFINE_string('monitoring_rrd_path', '$state_path/monitor/instances', - 'Location of RRD files') - - -RRD_VALUES = { - 'cpu': [ - 'DS:cpu:GAUGE:600:0:100', - 'RRA:AVERAGE:0.5:1:800', - 'RRA:AVERAGE:0.5:6:800', - 'RRA:AVERAGE:0.5:24:800', - 'RRA:AVERAGE:0.5:288:800', - 'RRA:MAX:0.5:1:800', - 'RRA:MAX:0.5:6:800', - 'RRA:MAX:0.5:24:800', - 'RRA:MAX:0.5:288:800', - ], - 'net': [ - 'DS:rx:COUNTER:600:0:1250000', - 'DS:tx:COUNTER:600:0:1250000', - 'RRA:AVERAGE:0.5:1:800', - 'RRA:AVERAGE:0.5:6:800', - 'RRA:AVERAGE:0.5:24:800', - 'RRA:AVERAGE:0.5:288:800', - 'RRA:MAX:0.5:1:800', - 'RRA:MAX:0.5:6:800', - 'RRA:MAX:0.5:24:800', - 'RRA:MAX:0.5:288:800', - ], - 'disk': [ - 'DS:rd:COUNTER:600:U:U', - 'DS:wr:COUNTER:600:U:U', - 'RRA:AVERAGE:0.5:1:800', - 'RRA:AVERAGE:0.5:6:800', - 'RRA:AVERAGE:0.5:24:800', - 'RRA:AVERAGE:0.5:288:800', - 'RRA:MAX:0.5:1:800', - 'RRA:MAX:0.5:6:800', - 'RRA:MAX:0.5:24:800', - 'RRA:MAX:0.5:444:800', - ]} - - -utcnow = utils.utcnow - - -LOG = logging.getLogger('nova.compute.monitor') - - -def update_rrd(instance, name, data): - """ - Updates the specified RRD file. - """ - filename = os.path.join(instance.get_rrd_path(), '%s.rrd' % name) - - if not os.path.exists(filename): - init_rrd(instance, name) - - timestamp = int(time.mktime(utcnow().timetuple())) - rrdtool.update(filename, '%d:%s' % (timestamp, data)) - - -def init_rrd(instance, name): - """ - Initializes the specified RRD file. - """ - path = os.path.join(FLAGS.monitoring_rrd_path, instance.instance_id) - - if not os.path.exists(path): - os.makedirs(path) - - filename = os.path.join(path, '%s.rrd' % name) - - if not os.path.exists(filename): - rrdtool.create( - filename, - '--step', '%d' % FLAGS.monitoring_instances_step, - '--start', '0', - *RRD_VALUES[name]) - - -def graph_cpu(instance, duration): - """ - Creates a graph of cpu usage for the specified instance and duration. - """ - path = instance.get_rrd_path() - filename = os.path.join(path, 'cpu-%s.png' % duration) - - rrdtool.graph( - filename, - '--disable-rrdtool-tag', - '--imgformat', 'PNG', - '--width', '400', - '--height', '120', - '--start', 'now-%s' % duration, - '--vertical-label', '% cpu used', - '-l', '0', - '-u', '100', - 'DEF:cpu=%s:cpu:AVERAGE' % os.path.join(path, 'cpu.rrd'), - 'AREA:cpu#eacc00:% CPU',) - - store_graph(instance.instance_id, filename) - - -def graph_net(instance, duration): - """ - Creates a graph of network usage for the specified instance and duration. - """ - path = instance.get_rrd_path() - filename = os.path.join(path, 'net-%s.png' % duration) - - rrdtool.graph( - filename, - '--disable-rrdtool-tag', - '--imgformat', 'PNG', - '--width', '400', - '--height', '120', - '--start', 'now-%s' % duration, - '--vertical-label', 'bytes/s', - '--logarithmic', - '--units', 'si', - '--lower-limit', '1000', - '--rigid', - 'DEF:rx=%s:rx:AVERAGE' % os.path.join(path, 'net.rrd'), - 'DEF:tx=%s:tx:AVERAGE' % os.path.join(path, 'net.rrd'), - 'AREA:rx#00FF00:In traffic', - 'LINE1:tx#0000FF:Out traffic',) - - store_graph(instance.instance_id, filename) - - -def graph_disk(instance, duration): - """ - Creates a graph of disk usage for the specified duration. - """ - path = instance.get_rrd_path() - filename = os.path.join(path, 'disk-%s.png' % duration) - - rrdtool.graph( - filename, - '--disable-rrdtool-tag', - '--imgformat', 'PNG', - '--width', '400', - '--height', '120', - '--start', 'now-%s' % duration, - '--vertical-label', 'bytes/s', - '--logarithmic', - '--units', 'si', - '--lower-limit', '1000', - '--rigid', - 'DEF:rd=%s:rd:AVERAGE' % os.path.join(path, 'disk.rrd'), - 'DEF:wr=%s:wr:AVERAGE' % os.path.join(path, 'disk.rrd'), - 'AREA:rd#00FF00:Read', - 'LINE1:wr#0000FF:Write',) - - store_graph(instance.instance_id, filename) - - -def store_graph(instance_id, filename): - """ - Transmits the specified graph file to internal object store on cloud - controller. - """ - # TODO(devcamcar): Need to use an asynchronous method to make this - # connection. If boto has some separate method that generates - # the request it would like to make and another method to parse - # the response we can make our own client that does the actual - # request and hands it off to the response parser. - s3 = boto.s3.connection.S3Connection( - aws_access_key_id=FLAGS.aws_access_key_id, - aws_secret_access_key=FLAGS.aws_secret_access_key, - is_secure=False, - calling_format=boto.s3.connection.OrdinaryCallingFormat(), - port=FLAGS.s3_port, - host=FLAGS.s3_host) - bucket_name = '_%s.monitor' % instance_id - - # Object store isn't creating the bucket like it should currently - # when it is first requested, so have to catch and create manually. - try: - bucket = s3.get_bucket(bucket_name) - except Exception: - bucket = s3.create_bucket(bucket_name) - - key = boto.s3.Key(bucket) - key.key = os.path.basename(filename) - key.set_contents_from_filename(filename) - - -class Instance(object): - def __init__(self, conn, instance_id): - self.conn = conn - self.instance_id = instance_id - self.last_updated = datetime.datetime.min - self.cputime = 0 - self.cputime_last_updated = None - - init_rrd(self, 'cpu') - init_rrd(self, 'net') - init_rrd(self, 'disk') - - def needs_update(self): - """ - Indicates whether this instance is due to have its statistics updated. - """ - delta = utcnow() - self.last_updated - return delta.seconds >= FLAGS.monitoring_instances_step - - def update(self): - """ - Updates the instances statistics and stores the resulting graphs - in the internal object store on the cloud controller. - """ - LOG.debug(_('updating %s...'), self.instance_id) - - try: - data = self.fetch_cpu_stats() - if data is not None: - LOG.debug('CPU: %s', data) - update_rrd(self, 'cpu', data) - - data = self.fetch_net_stats() - LOG.debug('NET: %s', data) - update_rrd(self, 'net', data) - - data = self.fetch_disk_stats() - LOG.debug('DISK: %s', data) - update_rrd(self, 'disk', data) - - # TODO(devcamcar): Turn these into pool.ProcessPool.execute() calls - # and make the methods @defer.inlineCallbacks. - graph_cpu(self, '1d') - graph_cpu(self, '1w') - graph_cpu(self, '1m') - - graph_net(self, '1d') - graph_net(self, '1w') - graph_net(self, '1m') - - graph_disk(self, '1d') - graph_disk(self, '1w') - graph_disk(self, '1m') - except Exception: - LOG.exception(_('unexpected error during update')) - - self.last_updated = utcnow() - - def get_rrd_path(self): - """ - Returns the path to where RRD files are stored. - """ - return os.path.join(FLAGS.monitoring_rrd_path, self.instance_id) - - def fetch_cpu_stats(self): - """ - Returns cpu usage statistics for this instance. - """ - info = self.conn.get_info(self.instance_id) - - # Get the previous values. - cputime_last = self.cputime - cputime_last_updated = self.cputime_last_updated - - # Get the raw CPU time used in nanoseconds. - self.cputime = float(info['cpu_time']) - self.cputime_last_updated = utcnow() - - LOG.debug('CPU: %d', self.cputime) - - # Skip calculation on first pass. Need delta to get a meaningful value. - if cputime_last_updated is None: - return None - - # Calculate the number of seconds between samples. - d = self.cputime_last_updated - cputime_last_updated - t = d.days * 86400 + d.seconds - - LOG.debug('t = %d', t) - - # Calculate change over time in number of nanoseconds of CPU time used. - cputime_delta = self.cputime - cputime_last - - LOG.debug('cputime_delta = %s', cputime_delta) - - # Get the number of virtual cpus in this domain. - vcpus = int(info['num_cpu']) - - LOG.debug('vcpus = %d', vcpus) - - # Calculate CPU % used and cap at 100. - return min(cputime_delta / (t * vcpus * 1.0e9) * 100, 100) - - def fetch_disk_stats(self): - """ - Returns disk usage statistics for this instance. - """ - rd = 0 - wr = 0 - - disks = self.conn.get_disks(self.instance_id) - - # Aggregate the read and write totals. - for disk in disks: - try: - rd_req, rd_bytes, wr_req, wr_bytes, errs = \ - self.conn.block_stats(self.instance_id, disk) - rd += rd_bytes - wr += wr_bytes - except TypeError: - iid = self.instance_id - LOG.error(_('Cannot get blockstats for "%(disk)s"' - ' on "%(iid)s"') % locals()) - raise - - return '%d:%d' % (rd, wr) - - def fetch_net_stats(self): - """ - Returns network usage statistics for this instance. - """ - rx = 0 - tx = 0 - - interfaces = self.conn.get_interfaces(self.instance_id) - - # Aggregate the in and out totals. - for interface in interfaces: - try: - stats = self.conn.interface_stats(self.instance_id, interface) - rx += stats[0] - tx += stats[4] - except TypeError: - iid = self.instance_id - LOG.error(_('Cannot get ifstats for "%(interface)s"' - ' on "%(iid)s"') % locals()) - raise - - return '%d:%d' % (rx, tx) - - -class InstanceMonitor(object, service.Service): - """ - Monitors the running instances of the current machine. - """ - - def __init__(self): - """ - Initialize the monitoring loop. - """ - self._instances = {} - self._loop = task.LoopingCall(self.updateInstances) - - def startService(self): - self._instances = {} - self._loop.start(interval=FLAGS.monitoring_instances_delay) - service.Service.startService(self) - - def stopService(self): - self._loop.stop() - service.Service.stopService(self) - - def updateInstances(self): - """ - Update resource usage for all running instances. - """ - try: - conn = virt_connection.get_connection(read_only=True) - except Exception, exn: - LOG.exception(_('unexpected exception getting connection')) - time.sleep(FLAGS.monitoring_instances_delay) - return - - domain_ids = conn.list_instances() - try: - self.updateInstances_(conn, domain_ids) - except Exception, exn: - LOG.exception('updateInstances_') - - def updateInstances_(self, conn, domain_ids): - for domain_id in domain_ids: - if not domain_id in self._instances: - instance = Instance(conn, domain_id) - self._instances[domain_id] = instance - LOG.debug(_('Found instance: %s'), domain_id) - - for key in self._instances.keys(): - instance = self._instances[key] - if instance.needs_update(): - instance.update() |
