diff options
| author | Todd Willey <todd@ansolabs.com> | 2011-01-18 17:42:00 -0500 |
|---|---|---|
| committer | Todd Willey <todd@ansolabs.com> | 2011-01-18 17:42:00 -0500 |
| commit | 19aae256676e45a8147f25553398ca980b3e1bd4 (patch) | |
| tree | d8a78787a0a1d93d719c7615b6223e3fae3bec27 | |
| parent | d4e7eb818c9f4ec51fd3a88a0e92d557867511d4 (diff) | |
| parent | 5b496ed35c53869623e4a968ab9d310089f3bfb2 (diff) | |
Trunk merge.
31 files changed, 1529 insertions, 145 deletions
@@ -16,6 +16,8 @@ <jmckenty@gmail.com> <jmckenty@joshua-mckentys-macbook-pro.local> <jmckenty@gmail.com> <joshua.mckenty@nasa.gov> <justin@fathomdb.com> <justinsb@justinsb-desktop> +<masumotok@nttdata.co.jp> <root@openstack2-api> +<masumotok@nttdata.co.jp> Masumoto<masumotok@nttdata.co.jp> <mordred@inaugust.com> <mordred@hudson> <paul@openstack.org> <pvoccio@castor.local> <paul@openstack.org> <paul.voccio@rackspace.com> @@ -26,6 +26,7 @@ Josh Durgin <joshd@hq.newdream.net> Josh Kearney <josh.kearney@rackspace.com> Joshua McKenty <jmckenty@gmail.com> Justin Santa Barbara <justin@fathomdb.com> +Kei Masumoto <masumotok@nttdata.co.jp> Ken Pepple <ken.pepple@gmail.com> Koji Iida <iida.koji@lab.ntt.co.jp> Lorin Hochstein <lorin@isi.edu> @@ -34,6 +35,7 @@ Michael Gundlach <michael.gundlach@rackspace.com> Monsyne Dragon <mdragon@rackspace.com> Monty Taylor <mordred@inaugust.com> MORITA Kazutaka <morita.kazutaka@gmail.com> +Muneyuki Noguchi <noguchimn@nttdata.co.jp> Nachi Ueno <ueno.nachi@lab.ntt.co.jp> <openstack@lab.ntt.co.jp> <nati.ueno@gmail.com> <nova@u4> Paul Voccio <paul@openstack.org> Rick Clark <rick@openstack.org> diff --git a/bin/nova-manage b/bin/nova-manage index b5842b595..1ad3120b8 100755 --- a/bin/nova-manage +++ b/bin/nova-manage @@ -62,6 +62,7 @@ import time import IPy + # If ../nova/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -81,8 +82,9 @@ from nova import log as logging from nova import quota from nova import utils from nova.auth import manager +from nova import rpc from nova.cloudpipe import pipelib - +from nova.api.ec2 import cloud logging.basicConfig() FLAGS = flags.FLAGS @@ -465,6 +467,82 @@ class NetworkCommands(object): int(vpn_start), fixed_range_v6) +class InstanceCommands(object): + """Class for mangaging VM instances.""" + + def live_migration(self, ec2_id, dest): + """live_migration""" + + ctxt = context.get_admin_context() + instance_id = cloud.ec2_id_to_id(ec2_id) + + if FLAGS.connection_type != 'libvirt': + msg = _('Only KVM is supported for now. Sorry!') + raise exception.Error(msg) + + if FLAGS.volume_driver != 'nova.volume.driver.AOEDriver': + instance_ref = db.instance_get(ctxt, instance_id) + if len(instance_ref['volumes']) != 0: + msg = _(("""Volumes attached by ISCSIDriver""" + """ are not supported. Sorry!""")) + raise exception.Error(msg) + + rpc.call(ctxt, + FLAGS.scheduler_topic, + {"method": "live_migration", + "args": {"instance_id": instance_id, + "dest": dest, + "topic": FLAGS.compute_topic}}) + + msg = 'Migration of %s initiated. ' % ec2_id + msg += 'Check its progress using euca-describe-instances.' + print msg + + +class HostCommands(object): + """Class for mangaging host(physical nodes).""" + + def list(self): + """describe host list.""" + + # To supress msg: No handlers could be found for logger "amqplib" + logging.basicConfig() + + service_refs = db.service_get_all(context.get_admin_context()) + hosts = [h['host'] for h in service_refs] + hosts = list(set(hosts)) + for host in hosts: + print host + + def show(self, host): + """describe cpu/memory/hdd info for host.""" + + result = rpc.call(context.get_admin_context(), + FLAGS.scheduler_topic, + {"method": "show_host_resource", + "args": {"host": host}}) + + # Checking result msg format is necessary, that will have done + # when this feture is included in API. + if type(result) != dict: + print 'Unexpected error occurs' + elif not result['ret']: + print '%s' % result['msg'] + else: + cpu = result['phy_resource']['vcpus'] + mem = result['phy_resource']['memory_mb'] + hdd = result['phy_resource']['local_gb'] + + print 'HOST\t\tPROJECT\t\tcpu\tmem(mb)\tdisk(gb)' + print '%s\t\t\t%s\t%s\t%s' % (host, cpu, mem, hdd) + for p_id, val in result['usage'].items(): + print '%s\t%s\t\t%s\t%s\t%s' % (host, + p_id, + val['vcpus'], + val['memory_mb'], + val['local_gb']) + + class ServiceCommands(object): """Enable and disable running services""" @@ -527,6 +605,8 @@ CATEGORIES = [ ('vpn', VpnCommands), ('floating', FloatingIpCommands), ('network', NetworkCommands), + ('instance', InstanceCommands), + ('host', HostCommands), ('service', ServiceCommands), ('log', LogCommands)] diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index 57d41ed67..c94540793 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -729,7 +729,7 @@ class CloudController(object): ec2_id = None if (floating_ip_ref['fixed_ip'] and floating_ip_ref['fixed_ip']['instance']): - instance_id = floating_ip_ref['fixed_ip']['instance']['ec2_id'] + instance_id = floating_ip_ref['fixed_ip']['instance']['id'] ec2_id = id_to_ec2_id(instance_id) address_rv = {'public_ip': address, 'instance_id': ec2_id} diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 9c4a23d08..bc73f826b 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -41,6 +41,7 @@ import logging import socket import functools +from nova import db from nova import exception from nova import flags from nova import log as logging @@ -120,6 +121,35 @@ class ComputeManager(manager.Manager): """ self.driver.init_host() + def update_service(self, ctxt, host, binary): + """Insert compute node specific information to DB.""" + + try: + service_ref = db.service_get_by_args(ctxt, + host, + binary) + except exception.NotFound: + msg = _(("""Cannot insert compute manager specific info""" + """Because no service record found.""")) + raise exception.Invalid(msg) + + # Updating host information + vcpu = self.driver.get_vcpu_number() + memory_mb = self.driver.get_memory_mb() + local_gb = self.driver.get_local_gb() + hypervisor = self.driver.get_hypervisor_type() + version = self.driver.get_hypervisor_version() + cpu_info = self.driver.get_cpu_info() + + db.service_update(ctxt, + service_ref['id'], + {'vcpus': vcpu, + 'memory_mb': memory_mb, + 'local_gb': local_gb, + 'hypervisor_type': hypervisor, + 'hypervisor_version': version, + 'cpu_info': cpu_info}) + def _update_state(self, context, instance_id): """Update the state of an instance from the driver info.""" # FIXME(ja): include other fields from state? @@ -183,9 +213,10 @@ class ComputeManager(manager.Manager): raise exception.Error(_("Instance has already been created")) LOG.audit(_("instance %s: starting..."), instance_id, context=context) + self.db.instance_update(context, instance_id, - {'host': self.host}) + {'host': self.host, 'launched_on': self.host}) self.db.instance_set_state(context, instance_id, @@ -565,3 +596,88 @@ class ComputeManager(manager.Manager): self.volume_manager.remove_compute_volume(context, volume_id) self.db.volume_detached(context, volume_id) return True + + def compare_cpu(self, context, cpu_info): + """ Check the host cpu is compatible to a cpu given by xml.""" + return self.driver.compare_cpu(cpu_info) + + def pre_live_migration(self, context, instance_id, dest): + """Any preparation for live migration at dst host.""" + + # Getting instance info + instance_ref = db.instance_get(context, instance_id) + ec2_id = instance_ref['hostname'] + + # Getting fixed ips + fixed_ip = db.instance_get_fixed_address(context, instance_id) + if not fixed_ip: + msg = _('%s(%s) doesnt have fixed_ip') % (instance_id, ec2_id) + raise exception.NotFound(msg) + + # If any volume is mounted, prepare here. + if len(instance_ref['volumes']) == 0: + logging.info(_("%s has no volume.") % ec2_id) + else: + for v in instance_ref['volumes']: + self.volume_manager.setup_compute_volume(context, v['id']) + + # Bridge settings + # call this method prior to ensure_filtering_rules_for_instance, + # since bridge is not set up, ensure_filtering_rules_for instance + # fails. + self.network_manager.setup_compute_network(context, instance_id) + + # Creating filters to hypervisors and firewalls. + # An example is that nova-instance-instance-xxx, + # which is written to libvirt.xml( check "virsh nwfilter-list ) + # On destination host, this nwfilter is necessary. + # In addition, this method is creating filtering rule + # onto destination host. + self.driver.ensure_filtering_rules_for_instance(instance_ref) + + def live_migration(self, context, instance_id, dest): + """executes live migration.""" + + # Get instance for error handling. + instance_ref = db.instance_get(context, instance_id) + ec2_id = instance_ref['hostname'] + + try: + # Checking volume node is working correctly when any volumes + # are attached to instances. + if len(instance_ref['volumes']) != 0: + rpc.call(context, + FLAGS.volume_topic, + {"method": "check_for_export", + "args": {'instance_id': instance_id}}) + + # Asking dest host to preparing live migration. + compute_topic = db.queue_get_for(context, + FLAGS.compute_topic, + dest) + rpc.call(context, + compute_topic, + {"method": "pre_live_migration", + "args": {'instance_id': instance_id, + 'dest': dest}}) + + except Exception, e: + msg = _('Pre live migration for %s failed at %s') + logging.error(msg, ec2_id, dest) + db.instance_set_state(context, + instance_id, + power_state.RUNNING, + 'running') + + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'in-use'}) + + # e should be raised. just calling "raise" may raise NotFound. + raise e + + # Executing live migration + # live_migration might raises exceptions, but + # nothing must be recovered in this version. + self.driver.live_migration(context, instance_ref, dest) diff --git a/nova/db/api.py b/nova/db/api.py index a05c8159e..a42e06e3b 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -253,6 +253,10 @@ def floating_ip_get_by_address(context, address): return IMPL.floating_ip_get_by_address(context, address) +def floating_ip_update(context, address, values): + """update floating ip information.""" + return IMPL.floating_ip_update(context, address, values) + #################### @@ -405,6 +409,32 @@ def instance_add_security_group(context, instance_id, security_group_id): security_group_id) +def instance_get_all_by_host(context, hostname): + """Get instances by host""" + return IMPL.instance_get_all_by_host(context, hostname) + + +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + """Get instances.vcpus by host and project""" + return IMPL.instance_get_vcpu_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + """Get amount of memory by host and project """ + return IMPL.instance_get_memory_sum_by_host_and_project(context, + hostname, + proj_id) + + +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + """Get total amount of disk by host and project """ + return IMPL.instance_get_disk_sum_by_host_and_project(context, + hostname, + proj_id) + + def instance_action_create(context, values): """Create an instance action from the values dictionary.""" return IMPL.instance_action_create(context, values) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 4223aa0f7..95ed2d59e 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -495,6 +495,16 @@ def floating_ip_get_by_address(context, address, session=None): return result +@require_context +def floating_ip_update(context, address, values): + session = get_session() + with session.begin(): + floating_ip_ref = floating_ip_get_by_address(context, address, session) + for (key, value) in values.iteritems(): + floating_ip_ref[key] = value + floating_ip_ref.save(session=session) + + ################### @@ -858,6 +868,7 @@ def instance_update(context, instance_id, values): return instance_ref +@require_context def instance_add_security_group(context, instance_id, security_group_id): """Associate the given security group with the given instance""" session = get_session() @@ -871,6 +882,59 @@ def instance_add_security_group(context, instance_id, security_group_id): @require_context +def instance_get_all_by_host(context, hostname): + session = get_session() + if not session: + session = get_session() + + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(deleted=can_read_deleted(context)).\ + all() + if not result: + return [] + return result + + +@require_context +def _instance_get_sum_by_host_and_project(context, column, hostname, proj_id): + session = get_session() + + result = session.query(models.Instance).\ + filter_by(host=hostname).\ + filter_by(project_id=proj_id).\ + filter_by(deleted=can_read_deleted(context)).\ + value(column) + if not result: + return 0 + return result + + +@require_context +def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id): + return _instance_get_sum_by_host_and_project(context, + 'vcpus', + hostname, + proj_id) + + +@require_context +def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id): + return _instance_get_sum_by_host_and_project(context, + 'memory_mb', + hostname, + proj_id) + + +@require_context +def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id): + return _instance_get_sum_by_host_and_project(context, + 'local_gb', + hostname, + proj_id) + + +@require_context def instance_action_create(context, values): """Create an instance action from the values dictionary.""" action_ref = models.InstanceActions() diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 1d82cff18..94318c5a8 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -150,13 +150,32 @@ class Service(BASE, NovaBase): __tablename__ = 'services' id = Column(Integer, primary_key=True) - host = Column(String(255)) # , ForeignKey('hosts.id')) + #host_id = Column(Integer, ForeignKey('hosts.id'), nullable=True) + #host = relationship(Host, backref=backref('services')) + host = Column(String(255)) binary = Column(String(255)) topic = Column(String(255)) report_count = Column(Integer, nullable=False, default=0) disabled = Column(Boolean, default=False) availability_zone = Column(String(255), default='nova') + # The below items are compute node only. + # -1 or None is inserted for other service. + vcpus = Column(Integer, nullable=False, default=-1) + memory_mb = Column(Integer, nullable=False, default=-1) + local_gb = Column(Integer, nullable=False, default=-1) + hypervisor_type = Column(String(128)) + hypervisor_version = Column(Integer, nullable=False, default=-1) + # Note(masumotok): Expected Strings example: + # + # '{"arch":"x86_64", "model":"Nehalem", + # "topology":{"sockets":1, "threads":2, "cores":3}, + # features:[ "tdtscp", "xtpr"]}' + # + # Points are "json translatable" and it must have all + # dictionary keys above. + cpu_info = Column(String(512)) + class Certificate(BASE, NovaBase): """Represents a an x509 certificate""" @@ -231,6 +250,9 @@ class Instance(BASE, NovaBase): display_name = Column(String(255)) display_description = Column(String(255)) + # To remember on which host a instance booted. + # An instance may moved to other host by live migraiton. + launched_on = Column(String(255)) locked = Column(Boolean) # TODO(vish): see Ewan's email about state improvements, probably @@ -599,7 +621,7 @@ def register_models(): Volume, ExportDevice, IscsiTarget, FixedIp, FloatingIp, Network, SecurityGroup, SecurityGroupIngressRule, SecurityGroupInstanceAssociation, AuthToken, User, - Project, Certificate, ConsolePool, Console) # , Image, Host + Project, Certificate, ConsolePool, Console) # , Host, Image engine = create_engine(FLAGS.sql_connection, echo=False) for model in models: model.metadata.create_all(engine) diff --git a/nova/network/manager.py b/nova/network/manager.py index 2a043cc6b..932c77d31 100644 --- a/nova/network/manager.py +++ b/nova/network/manager.py @@ -159,7 +159,7 @@ class NetworkManager(manager.Manager): """Called when this host becomes the host for a network.""" raise NotImplementedError() - def setup_compute_network(self, context, instance_id): + def setup_compute_network(self, context, instance_id, network_ref=None): """Sets up matching network for compute hosts.""" raise NotImplementedError() @@ -320,7 +320,7 @@ class FlatManager(NetworkManager): self.db.fixed_ip_update(context, address, {'allocated': False}) self.db.fixed_ip_disassociate(context.elevated(), address) - def setup_compute_network(self, context, instance_id): + def setup_compute_network(self, context, instance_id, network_ref=None): """Network is created manually.""" pass @@ -395,9 +395,10 @@ class FlatDHCPManager(FlatManager): super(FlatDHCPManager, self).init_host() self.driver.metadata_forward() - def setup_compute_network(self, context, instance_id): + def setup_compute_network(self, context, instance_id, network_ref=None): """Sets up matching network for compute hosts.""" - network_ref = db.network_get_by_instance(context, instance_id) + if network_ref is None: + network_ref = db.network_get_by_instance(context, instance_id) self.driver.ensure_bridge(network_ref['bridge'], FLAGS.flat_interface) @@ -487,9 +488,10 @@ class VlanManager(NetworkManager): """Returns a fixed ip to the pool.""" self.db.fixed_ip_update(context, address, {'allocated': False}) - def setup_compute_network(self, context, instance_id): + def setup_compute_network(self, context, instance_id, network_ref=None): """Sets up matching network for compute hosts.""" - network_ref = db.network_get_by_instance(context, instance_id) + if network_ref is None: + network_ref = db.network_get_by_instance(context, instance_id) self.driver.ensure_vlan_bridge(network_ref['vlan'], network_ref['bridge']) diff --git a/nova/scheduler/driver.py b/nova/scheduler/driver.py index 66e46c1b9..65745093b 100644 --- a/nova/scheduler/driver.py +++ b/nova/scheduler/driver.py @@ -26,6 +26,9 @@ import datetime from nova import db from nova import exception from nova import flags +from nova import log as logging +from nova import rpc +from nova.compute import power_state FLAGS = flags.FLAGS flags.DEFINE_integer('service_down_time', 60, @@ -64,3 +67,183 @@ class Scheduler(object): def schedule(self, context, topic, *_args, **_kwargs): """Must override at least this method for scheduler to work.""" raise NotImplementedError(_("Must implement a fallback schedule")) + + def schedule_live_migration(self, context, instance_id, dest): + """ live migration method """ + + # Whether instance exists and running + instance_ref = db.instance_get(context, instance_id) + ec2_id = instance_ref['hostname'] + + # Checking instance. + self._live_migration_src_check(context, instance_ref) + + # Checking destination host. + self._live_migration_dest_check(context, instance_ref, dest) + + # Common checking. + self._live_migration_common_check(context, instance_ref, dest) + + # Changing instance_state. + db.instance_set_state(context, + instance_id, + power_state.PAUSED, + 'migrating') + + # Changing volume state + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'migrating'}) + + # Return value is necessary to send request to src + # Check _schedule() in detail. + src = instance_ref['host'] + return src + + def _live_migration_src_check(self, context, instance_ref): + """Live migration check routine (for src host)""" + + # Checking instance is running. + if power_state.RUNNING != instance_ref['state'] or \ + 'running' != instance_ref['state_description']: + msg = _('Instance(%s) is not running') + ec2_id = instance_ref['hostname'] + raise exception.Invalid(msg % ec2_id) + + # Checing volume node is running when any volumes are mounted + # to the instance. + if len(instance_ref['volumes']) != 0: + services = db.service_get_all_by_topic(context, 'volume') + if len(services) < 1 or not self.service_is_up(services[0]): + msg = _('volume node is not alive(time synchronize problem?)') + raise exception.Invalid(msg) + + # Checking src host is alive. + src = instance_ref['host'] + services = db.service_get_all_by_topic(context, 'compute') + services = [service for service in services if service.host == src] + if len(services) < 1 or not self.service_is_up(services[0]): + msg = _('%s is not alive(time synchronize problem?)') + raise exception.Invalid(msg % src) + + def _live_migration_dest_check(self, context, instance_ref, dest): + """Live migration check routine (for destination host)""" + + # Checking dest exists and compute node. + dservice_refs = db.service_get_all_by_host(context, dest) + if len(dservice_refs) <= 0: + msg = _('%s does not exists.') + raise exception.Invalid(msg % dest) + + dservice_ref = dservice_refs[0] + if dservice_ref['topic'] != 'compute': + msg = _('%s must be compute node') + raise exception.Invalid(msg % dest) + + # Checking dest host is alive. + if not self.service_is_up(dservice_ref): + msg = _('%s is not alive(time synchronize problem?)') + raise exception.Invalid(msg % dest) + + # Checking whether The host where instance is running + # and dest is not same. + src = instance_ref['host'] + if dest == src: + ec2_id = instance_ref['hostname'] + msg = _('%s is where %s is running now. choose other host.') + raise exception.Invalid(msg % (dest, ec2_id)) + + # Checking dst host still has enough capacities. + self.has_enough_resource(context, instance_ref, dest) + + def _live_migration_common_check(self, context, instance_ref, dest): + """ + Live migration check routine. + Below pre-checkings are followed by + http://wiki.libvirt.org/page/TodoPreMigrationChecks + + """ + + # Checking dest exists. + dservice_refs = db.service_get_all_by_host(context, dest) + if len(dservice_refs) <= 0: + msg = _('%s does not exists.') + raise exception.Invalid(msg % dest) + dservice_ref = dservice_refs[0] + + # Checking original host( where instance was launched at) exists. + orighost = instance_ref['launched_on'] + oservice_refs = db.service_get_all_by_host(context, orighost) + if len(oservice_refs) <= 0: + msg = _('%s(where instance was launched at) does not exists.') + raise exception.Invalid(msg % orighost) + oservice_ref = oservice_refs[0] + + # Checking hypervisor is same. + otype = oservice_ref['hypervisor_type'] + dtype = dservice_ref['hypervisor_type'] + if otype != dtype: + msg = _('Different hypervisor type(%s->%s)') + raise exception.Invalid(msg % (otype, dtype)) + + # Checkng hypervisor version. + oversion = oservice_ref['hypervisor_version'] + dversion = dservice_ref['hypervisor_version'] + if oversion > dversion: + msg = _('Older hypervisor version(%s->%s)') + raise exception.Invalid(msg % (oversion, dversion)) + + # Checking cpuinfo. + cpu_info = oservice_ref['cpu_info'] + try: + rpc.call(context, + db.queue_get_for(context, FLAGS.compute_topic, dest), + {"method": 'compare_cpu', + "args": {'cpu_info': cpu_info}}) + + except rpc.RemoteError, e: + msg = _(("""%s doesnt have compatibility to %s""" + """(where %s was launched at)""")) + ec2_id = instance_ref['hostname'] + src = instance_ref['host'] + logging.error(msg % (dest, src, ec2_id)) + raise e + + def has_enough_resource(self, context, instance_ref, dest): + """ Check if destination host has enough resource for live migration""" + + # Getting instance information + ec2_id = instance_ref['hostname'] + vcpus = instance_ref['vcpus'] + mem = instance_ref['memory_mb'] + hdd = instance_ref['local_gb'] + + # Gettin host information + service_refs = db.service_get_all_by_host(context, dest) + if len(service_refs) <= 0: + msg = _('%s does not exists.') + raise exception.Invalid(msg % dest) + service_ref = service_refs[0] + + total_cpu = int(service_ref['vcpus']) + total_mem = int(service_ref['memory_mb']) + total_hdd = int(service_ref['local_gb']) + + instances_ref = db.instance_get_all_by_host(context, dest) + for i_ref in instances_ref: + total_cpu -= int(i_ref['vcpus']) + total_mem -= int(i_ref['memory_mb']) + total_hdd -= int(i_ref['local_gb']) + + # Checking host has enough information + logging.debug('host(%s) remains vcpu:%s mem:%s hdd:%s,' % + (dest, total_cpu, total_mem, total_hdd)) + logging.debug('instance(%s) has vcpu:%s mem:%s hdd:%s,' % + (ec2_id, vcpus, mem, hdd)) + + if total_cpu <= vcpus or total_mem <= mem or total_hdd <= hdd: + msg = '%s doesnt have enough resource for %s' % (dest, ec2_id) + raise exception.NotEmpty(msg) + + logging.debug(_('%s has_enough_resource() for %s') % (dest, ec2_id)) diff --git a/nova/scheduler/manager.py b/nova/scheduler/manager.py index a4d6dd574..1cc767a03 100644 --- a/nova/scheduler/manager.py +++ b/nova/scheduler/manager.py @@ -29,6 +29,7 @@ from nova import log as logging from nova import manager from nova import rpc from nova import utils +from nova import exception LOG = logging.getLogger('nova.scheduler.manager') FLAGS = flags.FLAGS @@ -67,3 +68,50 @@ class SchedulerManager(manager.Manager): {"method": method, "args": kwargs}) LOG.debug(_("Casting to %s %s for %s"), topic, host, method) + + # NOTE (masumotok) : This method should be moved to nova.api.ec2.admin. + # Based on bear design summit discussion, + # just put this here for bexar release. + def show_host_resource(self, context, host, *args): + """ show the physical/usage resource given by hosts.""" + + services = db.service_get_all_by_host(context, host) + if len(services) == 0: + return {'ret': False, 'msg': 'No such Host'} + + compute = [s for s in services if s['topic'] == 'compute'] + if 0 == len(compute): + service_ref = services[0] + else: + service_ref = compute[0] + + # Getting physical resource information + h_resource = {'vcpus': service_ref['vcpus'], + 'memory_mb': service_ref['memory_mb'], + 'local_gb': service_ref['local_gb']} + + # Getting usage resource information + u_resource = {} + instances_ref = db.instance_get_all_by_host(context, + service_ref['host']) + + if 0 == len(instances_ref): + return {'ret': True, 'phy_resource': h_resource, 'usage': {}} + + project_ids = [i['project_id'] for i in instances_ref] + project_ids = list(set(project_ids)) + for p_id in project_ids: + vcpus = db.instance_get_vcpu_sum_by_host_and_project(context, + host, + p_id) + mem = db.instance_get_memory_sum_by_host_and_project(context, + host, + p_id) + hdd = db.instance_get_disk_sum_by_host_and_project(context, + host, + p_id) + u_resource[p_id] = {'vcpus': vcpus, + 'memory_mb': mem, + 'local_gb': hdd} + + return {'ret': True, 'phy_resource': h_resource, 'usage': u_resource} diff --git a/nova/service.py b/nova/service.py index 8b2a22ce0..a8d52e93b 100644 --- a/nova/service.py +++ b/nova/service.py @@ -80,6 +80,7 @@ class Service(object): self.manager.init_host() self.model_disconnected = False ctxt = context.get_admin_context() + try: service_ref = db.service_get_by_args(ctxt, self.host, @@ -88,6 +89,9 @@ class Service(object): except exception.NotFound: self._create_service_ref(ctxt) + if 'nova-compute' == self.binary: + self.manager.update_service(ctxt, self.host, self.binary) + conn1 = rpc.Connection.instance(new=True) conn2 = rpc.Connection.instance(new=True) if self.report_interval: diff --git a/nova/tests/glance/__init__.py b/nova/tests/glance/__init__.py new file mode 100644 index 000000000..ef9fa05a7 --- /dev/null +++ b/nova/tests/glance/__init__.py @@ -0,0 +1,20 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`glance` -- Stubs for Glance +================================= +""" diff --git a/nova/tests/glance/stubs.py b/nova/tests/glance/stubs.py new file mode 100644 index 000000000..f182b857a --- /dev/null +++ b/nova/tests/glance/stubs.py @@ -0,0 +1,37 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import StringIO + +import glance.client + + +def stubout_glance_client(stubs, cls): + """Stubs out glance.client.Client""" + stubs.Set(glance.client, 'Client', + lambda *args, **kwargs: cls(*args, **kwargs)) + + +class FakeGlance(object): + def __init__(self, host, port=None, use_ssl=False): + pass + + def get_image(self, image): + meta = { + 'size': 0, + } + image_file = StringIO.StringIO('') + return meta, image_file diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py index afdc89ba2..556fe561c 100644 --- a/nova/tests/test_virt.py +++ b/nova/tests/test_virt.py @@ -122,10 +122,10 @@ class LibvirtConnTestCase(test.TestCase): if rescue: check = (lambda t: t.find('./os/kernel').text.split('/')[1], - 'rescue-kernel') + 'kernel.rescue') check_list.append(check) check = (lambda t: t.find('./os/initrd').text.split('/')[1], - 'rescue-ramdisk') + 'ramdisk.rescue') check_list.append(check) else: if expect_kernel: @@ -161,13 +161,16 @@ class LibvirtConnTestCase(test.TestCase): if rescue: common_checks += [ (lambda t: t.findall('./devices/disk/source')[0].get( - 'file').split('/')[1], 'rescue-disk'), + 'file').split('/')[1], 'disk.rescue'), (lambda t: t.findall('./devices/disk/source')[1].get( 'file').split('/')[1], 'disk')] else: common_checks += [(lambda t: t.findall( './devices/disk/source')[0].get('file').split('/')[1], 'disk')] + common_checks += [(lambda t: t.findall( + './devices/disk/source')[1].get('file').split('/')[1], + 'disk.local')] for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems(): FLAGS.libvirt_type = libvirt_type diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index 261ee0fde..9f5b266f3 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -34,6 +34,7 @@ from nova.virt.xenapi import volume_utils from nova.virt.xenapi.vmops import SimpleDH from nova.tests.db import fakes as db_fakes from nova.tests.xenapi import stubs +from nova.tests.glance import stubs as glance_stubs FLAGS = flags.FLAGS @@ -108,18 +109,16 @@ class XenAPIVolumeTestCase(test.TestCase): conn = xenapi_conn.get_connection(False) volume = self._create_volume() instance = db.instance_create(self.values) - xenapi_fake.create_vm(instance.name, 'Running') + vm = xenapi_fake.create_vm(instance.name, 'Running') result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc') def check(): # check that the VM has a VBD attached to it - # Get XenAPI reference for the VM - vms = xenapi_fake.get_all('VM') # Get XenAPI record for VBD vbds = xenapi_fake.get_all('VBD') vbd = xenapi_fake.get_record('VBD', vbds[0]) vm_ref = vbd['VM'] - self.assertEqual(vm_ref, vms[0]) + self.assertEqual(vm_ref, vm) check() @@ -157,9 +156,14 @@ class XenAPIVMTestCase(test.TestCase): FLAGS.xenapi_connection_url = 'test_url' FLAGS.xenapi_connection_password = 'test_pass' xenapi_fake.reset() + xenapi_fake.create_local_srs() db_fakes.stub_out_db_instance_api(self.stubs) xenapi_fake.create_network('fake', FLAGS.flat_network_bridge) stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + stubs.stubout_get_this_vm_uuid(self.stubs) + stubs.stubout_stream_disk(self.stubs) + glance_stubs.stubout_glance_client(self.stubs, + glance_stubs.FakeGlance) self.conn = xenapi_conn.get_connection(False) def test_list_instances_0(self): @@ -207,40 +211,70 @@ class XenAPIVMTestCase(test.TestCase): check() - def test_spawn(self): - instance = self._create_instance() + def check_vm_record(self, conn): + instances = conn.list_instances() + self.assertEquals(instances, [1]) + + # Get Nova record for VM + vm_info = conn.get_info(1) + + # Get XenAPI record for VM + vms = [rec for ref, rec + in xenapi_fake.get_all_records('VM').iteritems() + if not rec['is_control_domain']] + vm = vms[0] + + # Check that m1.large above turned into the right thing. + instance_type = instance_types.INSTANCE_TYPES['m1.large'] + mem_kib = long(instance_type['memory_mb']) << 10 + mem_bytes = str(mem_kib << 10) + vcpus = instance_type['vcpus'] + self.assertEquals(vm_info['max_mem'], mem_kib) + self.assertEquals(vm_info['mem'], mem_kib) + self.assertEquals(vm['memory_static_max'], mem_bytes) + self.assertEquals(vm['memory_dynamic_max'], mem_bytes) + self.assertEquals(vm['memory_dynamic_min'], mem_bytes) + self.assertEquals(vm['VCPUs_max'], str(vcpus)) + self.assertEquals(vm['VCPUs_at_startup'], str(vcpus)) + + # Check that the VM is running according to Nova + self.assertEquals(vm_info['state'], power_state.RUNNING) + + # Check that the VM is running according to XenAPI. + self.assertEquals(vm['power_state'], 'Running') + + def _test_spawn(self, image_id, kernel_id, ramdisk_id): + stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) + values = {'name': 1, + 'id': 1, + 'project_id': self.project.id, + 'user_id': self.user.id, + 'image_id': image_id, + 'kernel_id': kernel_id, + 'ramdisk_id': ramdisk_id, + 'instance_type': 'm1.large', + 'mac_address': 'aa:bb:cc:dd:ee:ff', + } + conn = xenapi_conn.get_connection(False) + instance = db.instance_create(values) + conn.spawn(instance) + self.check_vm_record(conn) - def check(): - instances = self.conn.list_instances() - self.assertEquals(instances, [1]) - - # Get Nova record for VM - vm_info = self.conn.get_info(1) - - # Get XenAPI record for VM - vms = xenapi_fake.get_all('VM') - vm = xenapi_fake.get_record('VM', vms[0]) - - # Check that m1.large above turned into the right thing. - instance_type = instance_types.INSTANCE_TYPES['m1.large'] - mem_kib = long(instance_type['memory_mb']) << 10 - mem_bytes = str(mem_kib << 10) - vcpus = instance_type['vcpus'] - self.assertEquals(vm_info['max_mem'], mem_kib) - self.assertEquals(vm_info['mem'], mem_kib) - self.assertEquals(vm['memory_static_max'], mem_bytes) - self.assertEquals(vm['memory_dynamic_max'], mem_bytes) - self.assertEquals(vm['memory_dynamic_min'], mem_bytes) - self.assertEquals(vm['VCPUs_max'], str(vcpus)) - self.assertEquals(vm['VCPUs_at_startup'], str(vcpus)) - - # Check that the VM is running according to Nova - self.assertEquals(vm_info['state'], power_state.RUNNING) - - # Check that the VM is running according to XenAPI. - self.assertEquals(vm['power_state'], 'Running') + def test_spawn_raw_objectstore(self): + FLAGS.xenapi_image_service = 'objectstore' + self._test_spawn(1, None, None) - check() + def test_spawn_objectstore(self): + FLAGS.xenapi_image_service = 'objectstore' + self._test_spawn(1, 2, 3) + + def test_spawn_raw_glance(self): + FLAGS.xenapi_image_service = 'glance' + self._test_spawn(1, None, None) + + def test_spawn_glance(self): + FLAGS.xenapi_image_service = 'glance' + self._test_spawn(1, 2, 3) def tearDown(self): super(XenAPIVMTestCase, self).tearDown() diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 292bd9ba9..624995ada 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -115,6 +115,21 @@ def stub_out_get_target(stubs): stubs.Set(volume_utils, '_get_target', fake_get_target) +def stubout_get_this_vm_uuid(stubs): + def f(): + vms = [rec['uuid'] for ref, rec + in fake.get_all_records('VM').iteritems() + if rec['is_control_domain']] + return vms[0] + stubs.Set(vm_utils, 'get_this_vm_uuid', f) + + +def stubout_stream_disk(stubs): + def f(_1, _2, _3, _4): + pass + stubs.Set(vm_utils, '_stream_disk', f) + + class FakeSessionForVMTests(fake.SessionBase): """ Stubs out a XenAPISession for VM tests """ def __init__(self, uri): @@ -124,7 +139,10 @@ class FakeSessionForVMTests(fake.SessionBase): return self.xenapi.network.get_all_records() def host_call_plugin(self, _1, _2, _3, _4, _5): - return '' + sr_ref = fake.get_all('SR')[0] + vdi_ref = fake.create_vdi('', False, sr_ref, False) + vdi_rec = fake.get_record('VDI', vdi_ref) + return '<string>%s</string>' % vdi_rec['uuid'] def VM_start(self, _1, ref, _2, _3): vm = fake.get_record('VM', ref) @@ -159,10 +177,6 @@ class FakeSessionForVolumeTests(fake.SessionBase): def __init__(self, uri): super(FakeSessionForVolumeTests, self).__init__(uri) - def VBD_plug(self, _1, ref): - rec = fake.get_record('VBD', ref) - rec['currently-attached'] = True - def VDI_introduce(self, _1, uuid, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11): valid_vdi = False diff --git a/nova/virt/cpuinfo.xml.template b/nova/virt/cpuinfo.xml.template new file mode 100644 index 000000000..48842b29d --- /dev/null +++ b/nova/virt/cpuinfo.xml.template @@ -0,0 +1,9 @@ +<cpu> + <arch>$arch</arch> + <model>$model</model> + <vendor>$vendor</vendor> + <topology sockets="$topology.sockets" cores="$topology.cores" threads="$topology.threads"/> +#for $var in $features + <features name="$var" /> +#end for +</cpu> diff --git a/nova/virt/fake.py b/nova/virt/fake.py index a57a8f43b..037cf83f7 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -310,6 +310,86 @@ class FakeConnection(object): 'username': 'fakeuser', 'password': 'fakepassword'} + def refresh_security_group_rules(self, security_group_id): + """This method is called after a change to security groups. + + All security groups and their associated rules live in the datastore, + and calling this method should apply the updated rules to instances + running the specified security group. + + An error should be raised if the operation cannot complete. + + """ + return True + + def refresh_security_group_members(self, security_group_id): + """This method is called when a security group is added to an instance. + + This message is sent to the virtualization drivers on hosts that are + running an instance that belongs to a security group that has a rule + that references the security group identified by `security_group_id`. + It is the responsiblity of this method to make sure any rules + that authorize traffic flow with members of the security group are + updated and any new members can communicate, and any removed members + cannot. + + Scenario: + * we are running on host 'H0' and we have an instance 'i-0'. + * instance 'i-0' is a member of security group 'speaks-b' + * group 'speaks-b' has an ingress rule that authorizes group 'b' + * another host 'H1' runs an instance 'i-1' + * instance 'i-1' is a member of security group 'b' + + When 'i-1' launches or terminates we will recieve the message + to update members of group 'b', at which time we will make + any changes needed to the rules for instance 'i-0' to allow + or deny traffic coming from 'i-1', depending on if it is being + added or removed from the group. + + In this scenario, 'i-1' could just as easily have been running on our + host 'H0' and this method would still have been called. The point was + that this method isn't called on the host where instances of that + group are running (as is the case with + :method:`refresh_security_group_rules`) but is called where references + are made to authorizing those instances. + + An error should be raised if the operation cannot complete. + + """ + return True + + def get_cpu_info(self): + """This method is supported only libvirt. """ + return + + def get_vcpu_number(self): + """This method is supported only libvirt. """ + return -1 + + def get_memory_mb(self): + """This method is supported only libvirt..""" + return -1 + + def get_local_gb(self): + """This method is supported only libvirt..""" + return -1 + + def get_hypervisor_type(self): + """This method is supported only libvirt..""" + return + + def get_hypervisor_version(self): + """This method is supported only libvirt..""" + return -1 + + def compare_cpu(self, xml): + """This method is supported only libvirt..""" + raise NotImplementedError('This method is supported only libvirt.') + + def live_migration(self, context, instance_ref, dest): + """This method is supported only libvirt..""" + raise NotImplementedError('This method is supported only libvirt.') + class FakeInstance(object): diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template index de06a1eb0..8139c3620 100644 --- a/nova/virt/libvirt.xml.template +++ b/nova/virt/libvirt.xml.template @@ -18,10 +18,10 @@ #set $disk_prefix = 'vd' #set $disk_bus = 'virtio' <type>hvm</type> - #end if + #end if #if $getVar('rescue', False) - <kernel>${basepath}/rescue-kernel</kernel> - <initrd>${basepath}/rescue-ramdisk</initrd> + <kernel>${basepath}/kernel.rescue</kernel> + <initrd>${basepath}/ramdisk.rescue</initrd> #else #if $getVar('kernel', None) <kernel>${kernel}</kernel> @@ -47,7 +47,7 @@ #if $getVar('rescue', False) <disk type='file'> <driver type='${driver_type}'/> - <source file='${basepath}/rescue-disk'/> + <source file='${basepath}/disk.rescue'/> <target dev='${disk_prefix}a' bus='${disk_bus}'/> </disk> <disk type='file'> @@ -64,7 +64,7 @@ #if $getVar('local', False) <disk type='file'> <driver type='${driver_type}'/> - <source file='${basepath}/local'/> + <source file='${basepath}/disk.local'/> <target dev='${disk_prefix}b' bus='${disk_bus}'/> </disk> #end if diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index fa5dc502e..b189a5b31 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -36,8 +36,11 @@ Supports KVM, QEMU, UML, and XEN. """ +import json import os import shutil +import re +import time import random import subprocess import uuid @@ -80,6 +83,9 @@ flags.DEFINE_string('injected_network_template', flags.DEFINE_string('libvirt_xml_template', utils.abspath('virt/libvirt.xml.template'), 'Libvirt XML Template') +flags.DEFINE_string('cpuinfo_xml_template', + utils.abspath('virt/cpuinfo.xml.template'), + 'CpuInfo XML Template (used only live migration now)') flags.DEFINE_string('libvirt_type', 'kvm', 'Libvirt domain type (valid options are: ' @@ -88,6 +94,16 @@ flags.DEFINE_string('libvirt_uri', '', 'Override the default libvirt URI (which is dependent' ' on libvirt_type)') +flags.DEFINE_string('live_migration_uri', + "qemu+tcp://%s/system", + 'Define protocol used by live_migration feature') +flags.DEFINE_string('live_migration_flag', + "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER", + 'Define live migration behavior.') +flags.DEFINE_integer('live_migration_bandwidth', 0, + 'Define live migration behavior') +flags.DEFINE_string('live_migration_timeout_sec', 10, + 'Timeout second for pre_live_migration is completed.') flags.DEFINE_bool('allow_project_net_traffic', True, 'Whether to allow in project network traffic') @@ -146,6 +162,7 @@ class LibvirtConnection(object): self.libvirt_uri = self.get_uri() self.libvirt_xml = open(FLAGS.libvirt_xml_template).read() + self.cpuinfo_xml = open(FLAGS.cpuinfo_xml_template).read() self._wrapped_conn = None self.read_only = read_only @@ -350,7 +367,7 @@ class LibvirtConnection(object): rescue_images = {'image_id': FLAGS.rescue_image_id, 'kernel_id': FLAGS.rescue_kernel_id, 'ramdisk_id': FLAGS.rescue_ramdisk_id} - self._create_image(instance, xml, 'rescue-', rescue_images) + self._create_image(instance, xml, '.rescue', rescue_images) self._conn.createXML(xml, 0) timer = utils.LoopingCall(f=None) @@ -532,23 +549,23 @@ class LibvirtConnection(object): utils.execute('truncate %s -s %dG' % (target, local_gb)) # TODO(vish): should we format disk by default? - def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None): + def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None): # syntactic nicety - def basepath(fname='', prefix=prefix): + def basepath(fname='', suffix=suffix): return os.path.join(FLAGS.instances_path, inst['name'], - prefix + fname) + fname + suffix) # ensure directories exist and are writable - utils.execute('mkdir -p %s' % basepath(prefix='')) - utils.execute('chmod 0777 %s' % basepath(prefix='')) + utils.execute('mkdir -p %s' % basepath(suffix='')) + utils.execute('chmod 0777 %s' % basepath(suffix='')) LOG.info(_('instance %s: Creating image'), inst['name']) f = open(basepath('libvirt.xml'), 'w') f.write(libvirt_xml) f.close() - # NOTE(vish): No need add the prefix to console.log + # NOTE(vish): No need add the suffix to console.log os.close(os.open(basepath('console.log', ''), os.O_CREAT | os.O_WRONLY, 0660)) @@ -577,7 +594,7 @@ class LibvirtConnection(object): root_fname = disk_images['image_id'] size = FLAGS.minimum_root_size - if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-': + if inst['instance_type'] == 'm1.tiny' or suffix == '.rescue': size = None root_fname += "_sm" @@ -593,7 +610,7 @@ class LibvirtConnection(object): if type_data['local_gb']: self._cache_image(fn=self._create_local, - target=basepath('local'), + target=basepath('disk.local'), fname="local_%s" % type_data['local_gb'], cow=FLAGS.use_cow_images, local_gb=type_data['local_gb']) @@ -818,6 +835,74 @@ class LibvirtConnection(object): return interfaces + def get_vcpu_number(self): + """ Get vcpu number of physical computer. """ + return self._conn.getMaxVcpus(None) + + def get_memory_mb(self): + """Get the memory size of physical computer .""" + meminfo = open('/proc/meminfo').read().split() + idx = meminfo.index('MemTotal:') + # transforming kb to mb. + return int(meminfo[idx + 1]) / 1024 + + def get_local_gb(self): + """Get the hdd size of physical computer .""" + hddinfo = os.statvfs(FLAGS.instances_path) + return hddinfo.f_bsize * hddinfo.f_blocks / 1024 / 1024 / 1024 + + def get_hypervisor_type(self): + """ Get hypervisor type """ + return self._conn.getType() + + def get_hypervisor_version(self): + """ Get hypervisor version """ + return self._conn.getVersion() + + def get_cpu_info(self): + """ Get cpuinfo information """ + xmlstr = self._conn.getCapabilities() + xml = libxml2.parseDoc(xmlstr) + nodes = xml.xpathEval('//cpu') + if len(nodes) != 1: + msg = 'Unexpected xml format. tag "cpu" must be 1, but %d.' \ + % len(nodes) + msg += '\n' + xml.serialize() + raise exception.Invalid(_(msg)) + + arch = xml.xpathEval('//cpu/arch')[0].getContent() + model = xml.xpathEval('//cpu/model')[0].getContent() + vendor = xml.xpathEval('//cpu/vendor')[0].getContent() + + topology_node = xml.xpathEval('//cpu/topology')[0].get_properties() + topology = dict() + while topology_node != None: + name = topology_node.get_name() + topology[name] = topology_node.getContent() + topology_node = topology_node.get_next() + + keys = ['cores', 'sockets', 'threads'] + tkeys = topology.keys() + if list(set(tkeys)) != list(set(keys)): + msg = _('Invalid xml: topology(%s) must have %s') + raise exception.Invalid(msg % (str(topology), ', '.join(keys))) + + feature_nodes = xml.xpathEval('//cpu/feature') + features = list() + for nodes in feature_nodes: + feature_name = nodes.get_properties().getContent() + features.append(feature_name) + + template = ("""{"arch":"%s", "model":"%s", "vendor":"%s", """ + """"topology":{"cores":"%s", "threads":"%s", """ + """"sockets":"%s"}, "features":[%s]}""") + c = topology['cores'] + s = topology['sockets'] + t = topology['threads'] + f = ['"%s"' % x for x in features] + cpu_info = template % (arch, model, vendor, c, s, t, ', '.join(f)) + return cpu_info + def block_stats(self, instance_name, disk): """ Note that this function takes an instance name, not an Instance, so @@ -851,6 +936,208 @@ class LibvirtConnection(object): def refresh_provier_fw_rules(self): self.firewall_driver.refresh_provider_fw_rules() + def compare_cpu(self, cpu_info): + """ + Check the host cpu is compatible to a cpu given by xml. + "xml" must be a part of libvirt.openReadonly().getCapabilities(). + return values follows by virCPUCompareResult. + if 0 > return value, do live migration. + + 'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult' + """ + msg = _('Checking cpu_info: instance was launched this cpu.\n: %s ') + LOG.info(msg % cpu_info) + dic = json.loads(cpu_info) + xml = str(Template(self.cpuinfo_xml, searchList=dic)) + msg = _('to xml...\n: %s ') + LOG.info(msg % xml) + + url = 'http://libvirt.org/html/libvirt-libvirt.html' + url += '#virCPUCompareResult\n' + msg = 'CPU does not have compativility.\n' + msg += 'result:%d \n' + msg += 'Refer to %s' + msg = _(msg) + + # unknown character exists in xml, then libvirt complains + try: + ret = self._conn.compareCPU(xml, 0) + except libvirt.libvirtError, e: + LOG.error(msg % (ret, url)) + raise e + + if ret <= 0: + raise exception.Invalid(msg % (ret, url)) + + return + + def ensure_filtering_rules_for_instance(self, instance_ref): + """ Setting up inevitable filtering rules on compute node, + and waiting for its completion. + To migrate an instance, filtering rules to hypervisors + and firewalls are inevitable on destination host. + ( Waiting only for filterling rules to hypervisor, + since filtering rules to firewall rules can be set faster). + + Concretely, the below method must be called. + - setup_basic_filtering (for nova-basic, etc.) + - prepare_instance_filter(for nova-instance-instance-xxx, etc.) + + to_xml may have to be called since it defines PROJNET, PROJMASK. + but libvirt migrates those value through migrateToURI(), + so , no need to be called. + + Don't use thread for this method since migration should + not be started when setting-up filtering rules operations + are not completed.""" + + # Tf any instances never launch at destination host, + # basic-filtering must be set here. + self.nwfilter.setup_basic_filtering(instance_ref) + # setting up n)ova-instance-instance-xx mainly. + self.firewall_driver.prepare_instance_filter(instance_ref) + + # wait for completion + timeout_count = range(FLAGS.live_migration_timeout_sec * 2) + while len(timeout_count) != 0: + try: + filter_name = 'nova-instance-%s' % instance_ref.name + self._conn.nwfilterLookupByName(filter_name) + break + except libvirt.libvirtError: + timeout_count.pop() + if len(timeout_count) == 0: + ec2_id = instance_ref['hostname'] + msg = _('Timeout migrating for %s(%s)') + raise exception.Error(msg % (ec2_id, instance_ref.name)) + time.sleep(0.5) + + def live_migration(self, context, instance_ref, dest): + """ + Just spawning live_migration operation for + distributing high-load. + """ + greenthread.spawn(self._live_migration, context, instance_ref, dest) + + def _live_migration(self, context, instance_ref, dest): + """ Do live migration.""" + + # Do live migration. + try: + duri = FLAGS.live_migration_uri % dest + + flaglist = FLAGS.live_migration_flag.split(',') + flagvals = [getattr(libvirt, x.strip()) for x in flaglist] + logical_sum = reduce(lambda x, y: x | y, flagvals) + + bandwidth = FLAGS.live_migration_bandwidth + + if self.read_only: + tmpconn = self._connect(self.libvirt_uri, False) + dom = tmpconn.lookupByName(instance_ref.name) + dom.migrateToURI(duri, logical_sum, None, bandwidth) + tmpconn.close() + else: + dom = self._conn.lookupByName(instance_ref.name) + dom.migrateToURI(duri, logical_sum, None, bandwidth) + + except Exception, e: + id = instance_ref['id'] + db.instance_set_state(context, id, power_state.RUNNING, 'running') + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'in-use'}) + + raise e + + # Waiting for completion of live_migration. + timer = utils.LoopingCall(f=None) + + def wait_for_live_migration(): + + try: + state = self.get_info(instance_ref.name)['state'] + except exception.NotFound: + timer.stop() + self._post_live_migration(context, instance_ref, dest) + + timer.f = wait_for_live_migration + timer.start(interval=0.5, now=True) + + def _post_live_migration(self, context, instance_ref, dest): + """ + Post operations for live migration. + Mainly, database updating. + """ + LOG.info('post livemigration operation is started..') + # Detaching volumes. + # (not necessary in current version ) + + # Releasing vlan. + # (not necessary in current implementation?) + + # Releasing security group ingress rule. + if FLAGS.firewall_driver == \ + 'nova.virt.libvirt_conn.IptablesFirewallDriver': + try: + self.firewall_driver.unfilter_instance(instance_ref) + except KeyError, e: + pass + + # Database updating. + ec2_id = instance_ref['hostname'] + + instance_id = instance_ref['id'] + fixed_ip = db.instance_get_fixed_address(context, instance_id) + # Not return if fixed_ip is not found, otherwise, + # instance never be accessible.. + if None == fixed_ip: + logging.warn('fixed_ip is not found for %s ' % ec2_id) + db.fixed_ip_update(context, fixed_ip, {'host': dest}) + network_ref = db.fixed_ip_get_network(context, fixed_ip) + db.network_update(context, network_ref['id'], {'host': dest}) + + try: + floating_ip \ + = db.instance_get_floating_address(context, instance_id) + # Not return if floating_ip is not found, otherwise, + # instance never be accessible.. + if None == floating_ip: + logging.error('floating_ip is not found for %s ' % ec2_id) + else: + floating_ip_ref = db.floating_ip_get_by_address(context, + floating_ip) + db.floating_ip_update(context, + floating_ip_ref['address'], + {'host': dest}) + except exception.NotFound: + logging.debug('%s doesnt have floating_ip.. ' % ec2_id) + except: + msg = 'Live migration: Unexpected error:' + msg += '%s cannot inherit floating ip.. ' % ec2_id + logging.error(_(msg)) + + # Restore instance/volume state + db.instance_update(context, + instance_id, + {'state_description': 'running', + 'state': power_state.RUNNING, + 'host': dest}) + + for v in instance_ref['volumes']: + db.volume_update(context, + v['id'], + {'status': 'in-use'}) + + logging.info(_('Live migrating %s to %s finishes successfully') + % (ec2_id, dest)) + msg = _(("""Known error: the below error is nomally occurs.\n""" + """Just check if iinstance is successfully migrated.\n""" + """libvir: QEMU error : Domain not found: no domain """ + """with matching name..""")) + logging.info(msg) + class FirewallDriver(object): def prepare_instance_filter(self, instance): diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py index 96d8f5fc8..4bfaf4b57 100644 --- a/nova/virt/xenapi/fake.py +++ b/nova/virt/xenapi/fake.py @@ -76,6 +76,7 @@ def reset(): for c in _CLASSES: _db_content[c] = {} create_host('fake') + create_vm('fake', 'Running', is_a_template=False, is_control_domain=True) def create_host(name_label): @@ -136,14 +137,21 @@ def create_vdi(name_label, read_only, sr_ref, sharable): def create_vbd(vm_ref, vdi_ref): - vbd_rec = {'VM': vm_ref, 'VDI': vdi_ref} + vbd_rec = { + 'VM': vm_ref, + 'VDI': vdi_ref, + 'currently_attached': False, + } vbd_ref = _create_object('VBD', vbd_rec) after_VBD_create(vbd_ref, vbd_rec) return vbd_ref def after_VBD_create(vbd_ref, vbd_rec): - """Create backref from VM to VBD when VBD is created""" + """Create read-only fields and backref from VM to VBD when VBD is + created.""" + vbd_rec['currently_attached'] = False + vbd_rec['device'] = '' vm_ref = vbd_rec['VM'] vm_rec = _db_content['VM'][vm_ref] vm_rec['VBDs'] = [vbd_ref] @@ -152,9 +160,10 @@ def after_VBD_create(vbd_ref, vbd_rec): vbd_rec['vm_name_label'] = vm_name_label -def create_pbd(config, sr_ref, attached): +def create_pbd(config, host_ref, sr_ref, attached): return _create_object('PBD', { 'device-config': config, + 'host': host_ref, 'SR': sr_ref, 'currently-attached': attached, }) @@ -167,6 +176,33 @@ def create_task(name_label): }) +def create_local_srs(): + """Create an SR that looks like the one created on the local disk by + default by the XenServer installer. Do this one per host.""" + for host_ref in _db_content['host'].keys(): + _create_local_sr(host_ref) + + +def _create_local_sr(host_ref): + sr_ref = _create_object('SR', { + 'name_label': 'Local storage', + 'type': 'lvm', + 'content_type': 'user', + 'shared': False, + 'physical_size': str(1 << 30), + 'physical_utilisation': str(0), + 'virtual_allocation': str(0), + 'other_config': { + 'i18n-original-value-name_label': 'Local storage', + 'i18n-key': 'local-storage', + }, + 'VDIs': [] + }) + pbd_ref = create_pbd('', host_ref, sr_ref, True) + _db_content['SR'][sr_ref]['PBDs'] = [pbd_ref] + return sr_ref + + def _create_object(table, obj): ref = str(uuid.uuid4()) obj['uuid'] = str(uuid.uuid4()) @@ -179,9 +215,10 @@ def _create_sr(table, obj): # Forces fake to support iscsi only if sr_type != 'iscsi': raise Failure(['SR_UNKNOWN_DRIVER', sr_type]) + host_ref = _db_content['host'].keys()[0] sr_ref = _create_object(table, obj[2]) vdi_ref = create_vdi('', False, sr_ref, False) - pbd_ref = create_pbd('', sr_ref, True) + pbd_ref = create_pbd('', host_ref, sr_ref, True) _db_content['SR'][sr_ref]['VDIs'] = [vdi_ref] _db_content['SR'][sr_ref]['PBDs'] = [pbd_ref] _db_content['VDI'][vdi_ref]['SR'] = sr_ref @@ -233,6 +270,20 @@ class SessionBase(object): def __init__(self, uri): self._session = None + def VBD_plug(self, _1, ref): + rec = get_record('VBD', ref) + if rec['currently_attached']: + raise Failure(['DEVICE_ALREADY_ATTACHED', ref]) + rec['currently_attached'] = True + rec['device'] = rec['userdevice'] + + def VBD_unplug(self, _1, ref): + rec = get_record('VBD', ref) + if not rec['currently_attached']: + raise Failure(['DEVICE_ALREADY_DETACHED', ref]) + rec['currently_attached'] = False + rec['device'] = '' + def xenapi_request(self, methodname, params): if methodname.startswith('login'): self._login(methodname, params) @@ -289,6 +340,8 @@ class SessionBase(object): return lambda *params: self._getter(name, params) elif self._is_create(name): return lambda *params: self._create(name, params) + elif self._is_destroy(name): + return lambda *params: self._destroy(name, params) else: return None @@ -299,10 +352,16 @@ class SessionBase(object): bits[1].startswith(getter and 'get_' or 'set_')) def _is_create(self, name): + return self._is_method(name, 'create') + + def _is_destroy(self, name): + return self._is_method(name, 'destroy') + + def _is_method(self, name, meth): bits = name.split('.') return (len(bits) == 2 and bits[0] in _CLASSES and - bits[1] == 'create') + bits[1] == meth) def _getter(self, name, params): self._check_session(params) @@ -370,10 +429,9 @@ class SessionBase(object): _create_sr(cls, params) or _create_object(cls, params[1]) # Call hook to provide any fixups needed (ex. creating backrefs) - try: - globals()["after_%s_create" % cls](ref, params[1]) - except KeyError: - pass + after_hook = 'after_%s_create' % cls + if after_hook in globals(): + globals()[after_hook](ref, params[1]) obj = get_record(cls, ref) @@ -383,6 +441,15 @@ class SessionBase(object): return ref + def _destroy(self, name, params): + self._check_session(params) + self._check_arg_count(params, 2) + table, _ = name.split('.') + ref = params[1] + if ref not in _db_content[table]: + raise Failure(['HANDLE_INVALID', table, ref]) + del _db_content[table][ref] + def _async(self, name, params): task_ref = create_task(name) task = _db_content['task'][task_ref] @@ -420,7 +487,7 @@ class SessionBase(object): try: return result[0] except IndexError: - return None + raise Failure(['UUID_INVALID', v, result, recs, k]) return result diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index eb0393d2a..b80ff4dba 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -19,11 +19,14 @@ Helper methods for operations related to the management of VM records and their attributes like VDIs, VIFs, as well as their lookup functions. """ +import os import pickle +import re import urllib from xml.dom import minidom from eventlet import event +import glance.client from nova import exception from nova import flags from nova import log as logging @@ -47,17 +50,23 @@ XENAPI_POWER_STATE = { 'Crashed': power_state.CRASHED} +SECTOR_SIZE = 512 +MBR_SIZE_SECTORS = 63 +MBR_SIZE_BYTES = MBR_SIZE_SECTORS * SECTOR_SIZE +KERNEL_DIR = '/boot/guest' + + class ImageType: - """ - Enumeration class for distinguishing different image types - 0 - kernel/ramdisk image (goes on dom0's filesystem) - 1 - disk image (local SR, partitioned by objectstore plugin) - 2 - raw disk image (local SR, NOT partitioned by plugin) - """ + """ + Enumeration class for distinguishing different image types + 0 - kernel/ramdisk image (goes on dom0's filesystem) + 1 - disk image (local SR, partitioned by objectstore plugin) + 2 - raw disk image (local SR, NOT partitioned by plugin) + """ - KERNEL_RAMDISK = 0 - DISK = 1 - DISK_RAW = 2 + KERNEL_RAMDISK = 0 + DISK = 1 + DISK_RAW = 2 class VMHelper(HelperBase): @@ -207,6 +216,25 @@ class VMHelper(HelperBase): return vif_ref @classmethod + def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only): + """Create a VDI record and returns its reference.""" + vdi_ref = session.get_xenapi().VDI.create( + {'name_label': name_label, + 'name_description': '', + 'SR': sr_ref, + 'virtual_size': str(virtual_size), + 'type': 'User', + 'sharable': False, + 'read_only': read_only, + 'xenstore_data': {}, + 'other_config': {}, + 'sm_config': {}, + 'tags': []}) + LOG.debug(_('Created VDI %s (%s, %s, %s) on %s.'), vdi_ref, + name_label, virtual_size, read_only, sr_ref) + return vdi_ref + + @classmethod def create_snapshot(cls, session, instance_id, vm_ref, label): """ Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI, Snapshot VHD @@ -256,15 +284,71 @@ class VMHelper(HelperBase): def fetch_image(cls, session, instance_id, image, user, project, type): """ type is interpreted as an ImageType instance + Related flags: + xenapi_image_service = ['glance', 'objectstore'] + glance_address = 'address for glance services' + glance_port = 'port for glance services' """ - url = images.image_url(image) access = AuthManager().get_access_key(user, project) + + if FLAGS.xenapi_image_service == 'glance': + return cls._fetch_image_glance(session, instance_id, image, + access, type) + else: + return cls._fetch_image_objectstore(session, instance_id, image, + access, user.secret, type) + + @classmethod + def _fetch_image_glance(cls, session, instance_id, image, access, type): + sr = find_sr(session) + if sr is None: + raise exception.NotFound('Cannot find SR to write VDI to') + + c = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port) + + meta, image_file = c.get_image(image) + virtual_size = int(meta['size']) + vdi_size = virtual_size + LOG.debug(_("Size for image %s:%d"), image, virtual_size) + if type == ImageType.DISK: + # Make room for MBR. + vdi_size += MBR_SIZE_BYTES + + vdi = cls.create_vdi(session, sr, _('Glance image %s') % image, + vdi_size, False) + + with_vdi_attached_here(session, vdi, False, + lambda dev: + _stream_disk(dev, type, + virtual_size, image_file)) + if (type == ImageType.KERNEL_RAMDISK): + #we need to invoke a plugin for copying VDI's + #content into proper path + LOG.debug(_("Copying VDI %s to /boot/guest on dom0"), vdi) + fn = "copy_kernel_vdi" + args = {} + args['vdi-ref'] = vdi + #let the plugin copy the correct number of bytes + args['image-size'] = str(vdi_size) + task = session.async_call_plugin('glance', fn, args) + filename = session.wait_for_task(instance_id, task) + #remove the VDI as it is not needed anymore + session.get_xenapi().VDI.destroy(vdi) + LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi) + return filename + else: + return session.get_xenapi().VDI.get_uuid(vdi) + + @classmethod + def _fetch_image_objectstore(cls, session, instance_id, image, access, + secret, type): + url = images.image_url(image) LOG.debug(_("Asking xapi to fetch %s as %s"), url, access) fn = (type != ImageType.KERNEL_RAMDISK) and 'get_vdi' or 'get_kernel' args = {} args['src_url'] = url args['username'] = access - args['password'] = user.secret + args['password'] = secret args['add_partition'] = 'false' args['raw'] = 'false' if type != ImageType.KERNEL_RAMDISK: @@ -276,14 +360,21 @@ class VMHelper(HelperBase): return uuid @classmethod - def lookup_image(cls, session, vdi_ref): + def lookup_image(cls, session, instance_id, vdi_ref): + if FLAGS.xenapi_image_service == 'glance': + return cls._lookup_image_glance(session, vdi_ref) + else: + return cls._lookup_image_objectstore(session, instance_id, vdi_ref) + + @classmethod + def _lookup_image_objectstore(cls, session, instance_id, vdi_ref): LOG.debug(_("Looking up vdi %s for PV kernel"), vdi_ref) fn = "is_vdi_pv" args = {} args['vdi-ref'] = vdi_ref - #TODO: Call proper function in plugin task = session.async_call_plugin('objectstore', fn, args) - pv_str = session.wait_for_task(task) + pv_str = session.wait_for_task(instance_id, task) + pv = None if pv_str.lower() == 'true': pv = True elif pv_str.lower() == 'false': @@ -292,6 +383,23 @@ class VMHelper(HelperBase): return pv @classmethod + def _lookup_image_glance(cls, session, vdi_ref): + LOG.debug(_("Looking up vdi %s for PV kernel"), vdi_ref) + + def is_vdi_pv(dev): + LOG.debug(_("Running pygrub against %s"), dev) + output = os.popen('pygrub -qn /dev/%s' % dev) + for line in output.readlines(): + #try to find kernel string + m = re.search('(?<=kernel:)/.*(?:>)', line) + if m and m.group(0).find('xen') != -1: + LOG.debug(_("Found Xen kernel %s") % m.group(0)) + return True + LOG.debug(_("No Xen kernel found. Booting HVM.")) + return False + return with_vdi_attached_here(session, vdi_ref, True, is_vdi_pv) + + @classmethod def lookup(cls, session, i): """Look the instance i up, and returns it if available""" vms = session.get_xenapi().VM.get_by_name_label(i) @@ -464,3 +572,123 @@ def get_vdi_for_vm_safely(session, vm_ref): vdi_ref = vdi_refs[0] vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) return vdi_ref, vdi_rec + + +def find_sr(session): + host = session.get_xenapi_host() + srs = session.get_xenapi().SR.get_all() + for sr in srs: + sr_rec = session.get_xenapi().SR.get_record(sr) + if not ('i18n-key' in sr_rec['other_config'] and + sr_rec['other_config']['i18n-key'] == 'local-storage'): + continue + for pbd in sr_rec['PBDs']: + pbd_rec = session.get_xenapi().PBD.get_record(pbd) + if pbd_rec['host'] == host: + return sr + return None + + +def with_vdi_attached_here(session, vdi, read_only, f): + this_vm_ref = get_this_vm_ref(session) + vbd_rec = {} + vbd_rec['VM'] = this_vm_ref + vbd_rec['VDI'] = vdi + vbd_rec['userdevice'] = 'autodetect' + vbd_rec['bootable'] = False + vbd_rec['mode'] = read_only and 'RO' or 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + LOG.debug(_('Creating VBD for VDI %s ... '), vdi) + vbd = session.get_xenapi().VBD.create(vbd_rec) + LOG.debug(_('Creating VBD for VDI %s done.'), vdi) + try: + LOG.debug(_('Plugging VBD %s ... '), vbd) + session.get_xenapi().VBD.plug(vbd) + LOG.debug(_('Plugging VBD %s done.'), vbd) + return f(session.get_xenapi().VBD.get_device(vbd)) + finally: + LOG.debug(_('Destroying VBD for VDI %s ... '), vdi) + vbd_unplug_with_retry(session, vbd) + ignore_failure(session.get_xenapi().VBD.destroy, vbd) + LOG.debug(_('Destroying VBD for VDI %s done.'), vdi) + + +def vbd_unplug_with_retry(session, vbd): + """Call VBD.unplug on the given VBD, with a retry if we get + DEVICE_DETACH_REJECTED. For reasons which I don't understand, we're + seeing the device still in use, even when all processes using the device + should be dead.""" + while True: + try: + session.get_xenapi().VBD.unplug(vbd) + LOG.debug(_('VBD.unplug successful first time.')) + return + except VMHelper.XenAPI.Failure, e: + if (len(e.details) > 0 and + e.details[0] == 'DEVICE_DETACH_REJECTED'): + LOG.debug(_('VBD.unplug rejected: retrying...')) + time.sleep(1) + elif (len(e.details) > 0 and + e.details[0] == 'DEVICE_ALREADY_DETACHED'): + LOG.debug(_('VBD.unplug successful eventually.')) + return + else: + LOG.error(_('Ignoring XenAPI.Failure in VBD.unplug: %s'), + e) + return + + +def ignore_failure(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except VMHelper.XenAPI.Failure, e: + LOG.error(_('Ignoring XenAPI.Failure %s'), e) + return None + + +def get_this_vm_uuid(): + with file('/sys/hypervisor/uuid') as f: + return f.readline().strip() + + +def get_this_vm_ref(session): + return session.get_xenapi().VM.get_by_uuid(get_this_vm_uuid()) + + +def _stream_disk(dev, type, virtual_size, image_file): + offset = 0 + if type == ImageType.DISK: + offset = MBR_SIZE_BYTES + _write_partition(virtual_size, dev) + + with open('/dev/%s' % dev, 'wb') as f: + f.seek(offset) + for chunk in image_file: + f.write(chunk) + + +def _write_partition(virtual_size, dev): + dest = '/dev/%s' % dev + mbr_last = MBR_SIZE_SECTORS - 1 + primary_first = MBR_SIZE_SECTORS + primary_last = MBR_SIZE_SECTORS + (virtual_size / SECTOR_SIZE) - 1 + + LOG.debug(_('Writing partition table %d %d to %s...'), + primary_first, primary_last, dest) + + def execute(cmd, process_input=None, check_exit_code=True): + return utils.execute(cmd=cmd, + process_input=process_input, + check_exit_code=check_exit_code) + + execute('parted --script %s mklabel msdos' % dest) + execute('parted --script %s mkpart primary %ds %ds' % + (dest, primary_first, primary_last)) + + LOG.debug(_('Writing partition table %s done.'), dest) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 5e414bab4..6c2fd6a68 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -85,7 +85,8 @@ class VMOps(object): #Have a look at the VDI and see if it has a PV kernel pv_kernel = False if not instance.kernel_id: - pv_kernel = VMHelper.lookup_image(self._session, vdi_ref) + pv_kernel = VMHelper.lookup_image(self._session, instance.id, + vdi_ref) kernel = None if instance.kernel_id: kernel = VMHelper.fetch_image(self._session, instance.id, diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index c98310dbc..72ec6bddb 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -89,6 +89,9 @@ flags.DEFINE_float('xenapi_task_poll_interval', 'The interval used for polling of remote tasks ' '(Async.VM.start, etc). Used only if ' 'connection_type=xenapi.') +flags.DEFINE_string('xenapi_image_service', + 'glance', + 'Where to get VM images: glance or objectstore.') flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval', 5.0, 'The interval used for polling of coalescing vhds.' @@ -209,6 +212,36 @@ class XenAPIConnection(object): 'username': FLAGS.xenapi_connection_username, 'password': FLAGS.xenapi_connection_password} + def get_cpu_info(self): + """This method is supported only libvirt. """ + return + + def get_vcpu_number(self): + """This method is supported only libvirt. """ + return -1 + + def get_memory_mb(self): + """This method is supported only libvirt..""" + return -1 + + def get_local_gb(self): + """This method is supported only libvirt..""" + return -1 + + def get_hypervisor_type(self): + """This method is supported only libvirt..""" + return + + def get_hypervisor_version(self): + """This method is supported only libvirt..""" + return -1 + + def compare_cpu(self, xml): + raise NotImplementedError('This method is supported only libvirt.') + + def live_migration(self, context, instance_ref, dest): + raise NotImplementedError('This method is supported only libvirt.') + class XenAPISession(object): """The session to invoke XenAPI SDK calls""" diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 71fe18a40..7a2395dbd 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -47,7 +47,7 @@ flags.DEFINE_integer('iscsi_num_targets', 'Number of iscsi target ids per host') flags.DEFINE_string('iscsi_target_prefix', 'iqn.2010-10.org.openstack:', 'prefix for iscsi volumes') -flags.DEFINE_string('iscsi_ip_prefix', '127.0', +flags.DEFINE_string('iscsi_ip_prefix', '$my_ip', 'discover volumes on the ip that starts with this prefix') flags.DEFINE_string('rbd_pool', 'rbd', 'the rbd pool in which volumes are stored') @@ -122,7 +122,7 @@ class VolumeDriver(object): """Removes an export for a logical volume.""" raise NotImplementedError() - def discover_volume(self, volume): + def discover_volume(self, _context, volume): """Discover volume on a remote host.""" raise NotImplementedError() @@ -184,15 +184,35 @@ class AOEDriver(VolumeDriver): self._try_execute("sudo vblade-persist destroy %s %s" % (shelf_id, blade_id)) - def discover_volume(self, _volume): + def discover_volume(self, context, volume): """Discover volume on a remote host.""" self._execute("sudo aoe-discover") self._execute("sudo aoe-stat", check_exit_code=False) + shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context, + volume['id']) + return "/dev/etherd/e%s.%s" % (shelf_id, blade_id) def undiscover_volume(self, _volume): """Undiscover volume on a remote host.""" pass + def check_for_export(self, context, volume_id): + """Make sure whether volume is exported.""" + (shelf_id, + blade_id) = self.db.volume_get_shelf_and_blade(context, + volume_id) + (out, _err) = self._execute("sudo vblade-persist ls --no-header") + exists = False + for line in out.split('\n'): + param = line.split(' ') + if len(param) == 6 and param[0] == str(shelf_id) \ + and param[1] == str(blade_id) and param[-1] == "run": + exists = True + break + if not exists: + logging.warning(_("vblade process for e%s.%s isn't running.") + % (shelf_id, blade_id)) + class FakeAOEDriver(AOEDriver): """Logs calls instead of executing.""" @@ -276,7 +296,7 @@ class ISCSIDriver(VolumeDriver): iscsi_portal = location.split(",")[0] return (iscsi_name, iscsi_portal) - def discover_volume(self, volume): + def discover_volume(self, _context, volume): """Discover volume on a remote host.""" iscsi_name, iscsi_portal = self._get_name_and_portal(volume['name'], volume['host']) @@ -285,7 +305,8 @@ class ISCSIDriver(VolumeDriver): self._execute("sudo iscsiadm -m node -T %s -p %s --op update " "-n node.startup -v automatic" % (iscsi_name, iscsi_portal)) - return "/dev/iscsi/%s" % volume['name'] + return "/dev/disk/by-path/ip-%s-iscsi-%s-lun-0" % (iscsi_portal, + iscsi_name) def undiscover_volume(self, volume): """Undiscover volume on a remote host.""" @@ -364,7 +385,7 @@ class RBDDriver(VolumeDriver): """Removes an export for a logical volume""" pass - def discover_volume(self, volume): + def discover_volume(self, _context, volume): """Discover volume on a remote host""" return "rbd:%s/%s" % (FLAGS.rbd_pool, volume['name']) @@ -413,7 +434,7 @@ class SheepdogDriver(VolumeDriver): """Removes an export for a logical volume""" pass - def discover_volume(self, volume): + def discover_volume(self, _context, volume): """Discover volume on a remote host""" return "sheepdog:%s" % volume['name'] diff --git a/nova/volume/manager.py b/nova/volume/manager.py index 6348539c5..1735d79eb 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -138,7 +138,7 @@ class VolumeManager(manager.Manager): if volume_ref['host'] == self.host and FLAGS.use_local_volumes: path = self.driver.local_path(volume_ref) else: - path = self.driver.discover_volume(volume_ref) + path = self.driver.discover_volume(context, volume_ref) return path def remove_compute_volume(self, context, volume_id): @@ -149,3 +149,10 @@ class VolumeManager(manager.Manager): return True else: self.driver.undiscover_volume(volume_ref) + + def check_for_export(self, context, instance_id): + """Make sure whether volume is exported.""" + if FLAGS.volume_driver == 'nova.volume.driver.AOEDriver': + instance_ref = self.db.instance_get(instance_id) + for v in instance_ref['volumes']: + self.driver.check_for_export(context, v['id']) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance index cc34a1ec9..aadacce57 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/glance @@ -18,7 +18,7 @@ # under the License. # -# XenAPI plugin for putting images into glance +# XenAPI plugin for managing glance images # import base64 @@ -40,8 +40,36 @@ from pluginlib_nova import * configure_logging('glance') CHUNK_SIZE = 8192 +KERNEL_DIR = '/boot/guest' FILE_SR_PATH = '/var/run/sr-mount' +def copy_kernel_vdi(session,args): + vdi = exists(args, 'vdi-ref') + size = exists(args,'image-size') + #Use the uuid as a filename + vdi_uuid=session.xenapi.VDI.get_uuid(vdi) + copy_args={'vdi_uuid':vdi_uuid,'vdi_size':int(size)} + filename=with_vdi_in_dom0(session, vdi, False, + lambda dev: + _copy_kernel_vdi('/dev/%s' % dev,copy_args)) + return filename + +def _copy_kernel_vdi(dest,copy_args): + vdi_uuid=copy_args['vdi_uuid'] + vdi_size=copy_args['vdi_size'] + logging.debug("copying kernel/ramdisk file from %s to /boot/guest/%s",dest,vdi_uuid) + filename=KERNEL_DIR + '/' + vdi_uuid + #read data from /dev/ and write into a file on /boot/guest + of=open(filename,'wb') + f=open(dest,'rb') + #copy only vdi_size bytes + data=f.read(vdi_size) + of.write(data) + f.close() + of.close() + logging.debug("Done. Filename: %s",filename) + return filename + def put_vdis(session, args): params = pickle.loads(exists(args, 'params')) vdi_uuids = params["vdi_uuids"] @@ -128,4 +156,5 @@ def find_sr(session): if __name__ == '__main__': - XenAPIPlugin.dispatch({'put_vdis': put_vdis}) + XenAPIPlugin.dispatch({'put_vdis': put_vdis, + 'copy_kernel_vdi': copy_kernel_vdi}) @@ -34,6 +34,7 @@ if os.path.isdir('.bzr'): version_file.write(vcsversion) + class local_BuildDoc(BuildDoc): def run(self): for builder in ['html', 'man']: diff --git a/tools/iscsidev.sh b/tools/iscsidev.sh deleted file mode 100755 index 6f5b572df..000000000 --- a/tools/iscsidev.sh +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/sh - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# NOTE(vish): This script helps udev create common names for discovered iscsi -# volumes under /dev/iscsi. To use it, create /dev/iscsi and add -# a file to /etc/udev/rules.d like so: -# mkdir /dev/iscsi -# echo 'KERNEL=="sd*", BUS=="scsi", PROGRAM="/path/to/iscsidev.sh -# %b",SYMLINK+="iscsi/%c%n"' > /etc/udev/rules.d/55-openiscsi.rules - -BUS=${1} -HOST=${BUS%%:*} - -if [ ! -e /sys/class/iscsi_host ]; then - exit 1 -fi - -file="/sys/class/iscsi_host/host${HOST}/device/session*/iscsi_session*/session*/targetname" - -target_name=$(cat ${file}) - -if [ -z "${target_name}" ]; then - exit 1 -fi - -echo "${target_name##*:}" diff --git a/tools/pip-requires b/tools/pip-requires index 3aa76c24d..a6676a5e9 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -26,3 +26,4 @@ Twisted>=10.1.0 PasteDeploy paste netaddr +glance |
