diff options
| author | masumotok <masumotok@nttdata.co.jp> | 2010-12-31 04:03:37 +0900 |
|---|---|---|
| committer | masumotok <masumotok@nttdata.co.jp> | 2010-12-31 04:03:37 +0900 |
| commit | bf7bc8725fcc26cc5074ea1628bcba6ac6093768 (patch) | |
| tree | dd845322f758c2353f295afa44d4e6455564ffff /nova/virt | |
| parent | 85acbbe916df8b2d18f0dc3a0b8cad9fcfdd6907 (diff) | |
| parent | bd6a3cb1acb68ac2252c1bafc531b3b12f9746d8 (diff) | |
merge recent revision(version of 2010/12/28)
Change:
1. Use greenthread instead of defer at nova.virt.libvirt_conn.live_migration.
2. Move nova.scheduler.manager.live_migration to nova.scheduler.driver
3. Move nova.scheduler.manager.has_enough_resource to nova.scheduler.driver
4. Any check routine in nova-manage.instance.live_migration is moved to
nova.scheduler.driver.schedule_live_migration.
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/connection.py | 6 | ||||
| -rw-r--r-- | nova/virt/fake.py | 32 | ||||
| -rw-r--r-- | nova/virt/images.py | 11 | ||||
| -rw-r--r-- | nova/virt/libvirt.qemu.xml.template | 33 | ||||
| -rw-r--r-- | nova/virt/libvirt.rescue.qemu.xml.template | 37 | ||||
| -rw-r--r-- | nova/virt/libvirt.rescue.uml.xml.template | 26 | ||||
| -rw-r--r-- | nova/virt/libvirt.rescue.xen.xml.template | 34 | ||||
| -rw-r--r-- | nova/virt/libvirt.uml.xml.template | 26 | ||||
| -rw-r--r-- | nova/virt/libvirt.xen.xml.template | 30 | ||||
| -rw-r--r-- | nova/virt/libvirt.xml.template | 79 | ||||
| -rw-r--r-- | nova/virt/libvirt_conn.py | 509 | ||||
| -rw-r--r-- | nova/virt/xenapi.py | 442 | ||||
| -rw-r--r-- | nova/virt/xenapi/__init__.py | 30 | ||||
| -rw-r--r-- | nova/virt/xenapi/fake.py | 388 | ||||
| -rw-r--r-- | nova/virt/xenapi/network_utils.py | 43 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 271 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 196 | ||||
| -rw-r--r-- | nova/virt/xenapi/volume_utils.py | 268 | ||||
| -rw-r--r-- | nova/virt/xenapi/volumeops.py | 126 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 285 |
20 files changed, 1984 insertions, 888 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py index 11f0fa8ce..61e99944e 100644 --- a/nova/virt/connection.py +++ b/nova/virt/connection.py @@ -25,7 +25,7 @@ import sys from nova import flags from nova.virt import fake from nova.virt import libvirt_conn -from nova.virt import xenapi +from nova.virt import xenapi_conn FLAGS = flags.FLAGS @@ -61,11 +61,11 @@ def get_connection(read_only=False): elif t == 'libvirt': conn = libvirt_conn.get_connection(read_only) elif t == 'xenapi': - conn = xenapi.get_connection(read_only) + conn = xenapi_conn.get_connection(read_only) else: raise Exception('Unknown connection type "%s"' % t) if conn is None: - logging.error('Failed to open connection to the hypervisor') + logging.error(_('Failed to open connection to the hypervisor')) sys.exit(1) return conn diff --git a/nova/virt/fake.py b/nova/virt/fake.py index f855523d3..238acf798 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -25,8 +25,6 @@ semantics of real hypervisor connections. """ -from twisted.internet import defer - from nova import exception from nova.compute import power_state @@ -78,6 +76,12 @@ class FakeConnection(object): cls._instance = cls() return cls._instance + def init_host(self): + """ + Initialize anything that is necessary for the driver to function + """ + return + def list_instances(self): """ Return the names of all the instances known to the virtualization @@ -107,7 +111,6 @@ class FakeConnection(object): fake_instance = FakeInstance() self.instances[instance.name] = fake_instance fake_instance._state = power_state.RUNNING - return defer.succeed(None) def reboot(self, instance): """ @@ -119,19 +122,31 @@ class FakeConnection(object): The work will be done asynchronously. This function returns a Deferred that allows the caller to detect when it is complete. """ - return defer.succeed(None) + pass def rescue(self, instance): """ Rescue the specified instance. """ - return defer.succeed(None) + pass def unrescue(self, instance): """ Unrescue the specified instance. """ - return defer.succeed(None) + pass + + def pause(self, instance, callback): + """ + Pause the specified instance. + """ + pass + + def unpause(self, instance, callback): + """ + Unpause the specified instance. + """ + pass def destroy(self, instance): """ @@ -144,7 +159,6 @@ class FakeConnection(object): Deferred that allows the caller to detect when it is complete. """ del self.instances[instance.name] - return defer.succeed(None) def attach_volume(self, instance_name, device_path, mountpoint): """Attach the disk at device_path to the instance at mountpoint""" @@ -167,7 +181,8 @@ class FakeConnection(object): knowledge of the instance """ if instance_name not in self.instances: - raise exception.NotFound("Instance %s Not Found" % instance_name) + raise exception.NotFound(_("Instance %s Not Found") + % instance_name) i = self.instances[instance_name] return {'state': i._state, 'max_mem': 0, @@ -247,5 +262,6 @@ class FakeConnection(object): class FakeInstance(object): + def __init__(self): self._state = power_state.NOSTATE diff --git a/nova/virt/images.py b/nova/virt/images.py index 981aa5cf3..1c9b2e093 100644 --- a/nova/virt/images.py +++ b/nova/virt/images.py @@ -26,7 +26,7 @@ import time import urlparse from nova import flags -from nova import process +from nova import utils from nova.auth import manager from nova.auth import signer from nova.objectstore import image @@ -50,7 +50,7 @@ def _fetch_s3_image(image, path, user, project): # This should probably move somewhere else, like e.g. a download_as # method on User objects and at the same time get rewritten to use - # twisted web client. + # a web client. headers = {} headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) @@ -63,15 +63,16 @@ def _fetch_s3_image(image, path, user, project): cmd = ['/usr/bin/curl', '--fail', '--silent', url] for (k, v) in headers.iteritems(): - cmd += ['-H', '%s: %s' % (k, v)] + cmd += ['-H', '"%s: %s"' % (k, v)] cmd += ['-o', path] - return process.SharedPool().execute(executable=cmd[0], args=cmd[1:]) + cmd_out = ' '.join(cmd) + return utils.execute(cmd_out) def _fetch_local_image(image, path, user, project): source = _image_path('%s/image' % image) - return process.simple_execute('cp %s %s' % (source, path)) + return utils.execute('cp %s %s' % (source, path)) def _image_path(path): diff --git a/nova/virt/libvirt.qemu.xml.template b/nova/virt/libvirt.qemu.xml.template deleted file mode 100644 index 2538b1ade..000000000 --- a/nova/virt/libvirt.qemu.xml.template +++ /dev/null @@ -1,33 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <os> - <type>hvm</type> - <kernel>%(basepath)s/kernel</kernel> - <initrd>%(basepath)s/ramdisk</initrd> - <cmdline>root=/dev/vda1 console=ttyS0</cmdline> - </os> - <features> - <acpi/> - </features> - <memory>%(memory_kb)s</memory> - <vcpu>%(vcpus)s</vcpu> - <devices> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='vda' bus='virtio'/> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - <!-- <model type='virtio'/> CANT RUN virtio network right now --> - <filterref filter="nova-instance-%(name)s"> - <parameter name="IP" value="%(ip_address)s" /> - <parameter name="DHCPSERVER" value="%(dhcp_server)s" /> - </filterref> - </interface> - <serial type="file"> - <source path='%(basepath)s/console.log'/> - <target port='1'/> - </serial> - </devices> -</domain> diff --git a/nova/virt/libvirt.rescue.qemu.xml.template b/nova/virt/libvirt.rescue.qemu.xml.template deleted file mode 100644 index c0ffbdcee..000000000 --- a/nova/virt/libvirt.rescue.qemu.xml.template +++ /dev/null @@ -1,37 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <os> - <type>hvm</type> - <kernel>%(basepath)s/rescue-kernel</kernel> - <initrd>%(basepath)s/rescue-ramdisk</initrd> - <cmdline>root=/dev/vda1 console=ttyS0</cmdline> - </os> - <features> - <acpi/> - </features> - <memory>%(memory_kb)s</memory> - <vcpu>%(vcpus)s</vcpu> - <devices> - <disk type='file'> - <source file='%(basepath)s/rescue-disk'/> - <target dev='vda' bus='virtio'/> - </disk> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='vdb' bus='virtio'/> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - <!-- <model type='virtio'/> CANT RUN virtio network right now --> - <filterref filter="nova-instance-%(name)s"> - <parameter name="IP" value="%(ip_address)s" /> - <parameter name="DHCPSERVER" value="%(dhcp_server)s" /> - </filterref> - </interface> - <serial type="file"> - <source path='%(basepath)s/console.log'/> - <target port='1'/> - </serial> - </devices> -</domain> diff --git a/nova/virt/libvirt.rescue.uml.xml.template b/nova/virt/libvirt.rescue.uml.xml.template deleted file mode 100644 index 836f47532..000000000 --- a/nova/virt/libvirt.rescue.uml.xml.template +++ /dev/null @@ -1,26 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <memory>%(memory_kb)s</memory> - <os> - <type>%(type)s</type> - <kernel>/usr/bin/linux</kernel> - <root>/dev/ubda1</root> - </os> - <devices> - <disk type='file'> - <source file='%(basepath)s/rescue-disk'/> - <target dev='ubd0' bus='uml'/> - </disk> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='ubd1' bus='uml'/> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - </interface> - <console type="file"> - <source path='%(basepath)s/console.log'/> - </console> - </devices> -</domain> diff --git a/nova/virt/libvirt.rescue.xen.xml.template b/nova/virt/libvirt.rescue.xen.xml.template deleted file mode 100644 index 3b8d27237..000000000 --- a/nova/virt/libvirt.rescue.xen.xml.template +++ /dev/null @@ -1,34 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <os> - <type>linux</type> - <kernel>%(basepath)s/kernel</kernel> - <initrd>%(basepath)s/ramdisk</initrd> - <root>/dev/xvda1</root> - <cmdline>ro</cmdline> - </os> - <features> - <acpi/> - </features> - <memory>%(memory_kb)s</memory> - <vcpu>%(vcpus)s</vcpu> - <devices> - <disk type='file'> - <source file='%(basepath)s/rescue-disk'/> - <target dev='sda' /> - </disk> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='sdb' /> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - </interface> - <console type="file"> - <source path='%(basepath)s/console.log'/> - <target port='1'/> - </console> - </devices> -</domain> - diff --git a/nova/virt/libvirt.uml.xml.template b/nova/virt/libvirt.uml.xml.template deleted file mode 100644 index bb8b47911..000000000 --- a/nova/virt/libvirt.uml.xml.template +++ /dev/null @@ -1,26 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <memory>%(memory_kb)s</memory> - <os> - <type>%(type)s</type> - <kernel>/usr/bin/linux</kernel> - <root>/dev/ubda1</root> - </os> - <devices> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='ubd0' bus='uml'/> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - <filterref filter="nova-instance-%(name)s"> - <parameter name="IP" value="%(ip_address)s" /> - <parameter name="DHCPSERVER" value="%(dhcp_server)s" /> - </filterref> - </interface> - <console type="file"> - <source path='%(basepath)s/console.log'/> - </console> - </devices> -</domain> diff --git a/nova/virt/libvirt.xen.xml.template b/nova/virt/libvirt.xen.xml.template deleted file mode 100644 index 9677902c6..000000000 --- a/nova/virt/libvirt.xen.xml.template +++ /dev/null @@ -1,30 +0,0 @@ -<domain type='%(type)s'> - <name>%(name)s</name> - <os> - <type>linux</type> - <kernel>%(basepath)s/kernel</kernel> - <initrd>%(basepath)s/ramdisk</initrd> - <root>/dev/xvda1</root> - <cmdline>ro</cmdline> - </os> - <features> - <acpi/> - </features> - <memory>%(memory_kb)s</memory> - <vcpu>%(vcpus)s</vcpu> - <devices> - <disk type='file'> - <source file='%(basepath)s/disk'/> - <target dev='sda' /> - </disk> - <interface type='bridge'> - <source bridge='%(bridge_name)s'/> - <mac address='%(mac_address)s'/> - </interface> - <console type="file"> - <source path='%(basepath)s/console.log'/> - <target port='1'/> - </console> - </devices> -</domain> - diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template new file mode 100644 index 000000000..3fb2243da --- /dev/null +++ b/nova/virt/libvirt.xml.template @@ -0,0 +1,79 @@ +<domain type='${type}'> + <name>${name}</name> + <memory>${memory_kb}</memory> + <os> +#if $type == 'uml' + #set $disk_prefix = 'ubd' + #set $disk_bus = 'uml' + <type>uml</type> + <kernel>/usr/bin/linux</kernel> + <root>/dev/ubda1</root> +#else + #if $type == 'xen' + #set $disk_prefix = 'sd' + #set $disk_bus = 'scsi' + <type>linux</type> + <root>/dev/xvda1</root> + #else + #set $disk_prefix = 'vd' + #set $disk_bus = 'virtio' + <type>hvm</type> + #end if + #if $getVar('rescue', False) + <kernel>${basepath}/rescue-kernel</kernel> + <initrd>${basepath}/rescue-ramdisk</initrd> + #else + #if $getVar('kernel', None) + <kernel>${kernel}</kernel> + #if $type == 'xen' + <cmdline>ro</cmdline> + #else + <cmdline>root=/dev/vda1 console=ttyS0</cmdline> + #end if + #if $getVar('ramdisk', None) + <initrd>${ramdisk}</initrd> + #end if + #else + <boot dev="hd" /> + #end if + #end if +#end if + </os> + <features> + <acpi/> + </features> + <vcpu>${vcpus}</vcpu> + <devices> +#if $getVar('rescue', False) + <disk type='file'> + <source file='${basepath}/rescue-disk'/> + <target dev='${disk_prefix}a' bus='${disk_bus}'/> + </disk> + <disk type='file'> + <source file='${basepath}/disk'/> + <target dev='${disk_prefix}b' bus='${disk_bus}'/> + </disk> +#else + <disk type='file'> + <source file='${basepath}/disk'/> + <target dev='${disk_prefix}a' bus='${disk_bus}'/> + </disk> +#end if + <interface type='bridge'> + <source bridge='${bridge_name}'/> + <mac address='${mac_address}'/> + <!-- <model type='virtio'/> CANT RUN virtio network right now --> + <filterref filter="nova-instance-${name}"> + <parameter name="IP" value="${ip_address}" /> + <parameter name="DHCPSERVER" value="${dhcp_server}" /> +#if $getVar('extra_params', False) + ${extra_params} +#end if + </filterref> + </interface> + <serial type="file"> + <source path='${basepath}/console.log'/> + <target port='1'/> + </serial> + </devices> +</domain> diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index 6450db8bd..8c2d52bb6 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -27,12 +27,7 @@ Supports KVM, QEMU, UML, and XEN. :libvirt_type: Libvirt domain type. Can be kvm, qemu, uml, xen (default: kvm). :libvirt_uri: Override for the default libvirt URI (depends on libvirt_type). -:libvirt_xml_template: Libvirt XML Template (QEmu/KVM). -:libvirt_xen_xml_template: Libvirt XML Template (Xen). -:libvirt_uml_xml_template: Libvirt XML Template (User Mode Linux). -:libvirt_rescue_xml_template: XML template for rescue mode (KVM & QEMU). -:libvirt_rescue_xen_xml_template: XML templage for rescue mode (XEN). -:libvirt_rescue_uml_xml_template: XML template for rescue mode (UML). +:libvirt_xml_template: Libvirt XML Template. :rescue_image_id: Rescue ami image (default: ami-rescue). :rescue_kernel_id: Rescue aki image (default: aki-rescue). :rescue_ramdisk_id: Rescue ari image (default: ari-rescue). @@ -46,16 +41,16 @@ import os import shutil import re +from eventlet import greenthread +from eventlet import event +from eventlet import tpool + import IPy -from twisted.internet import defer -from twisted.internet import task -from twisted.internet import threads from nova import context from nova import db from nova import exception from nova import flags -from nova import process from nova import utils #from nova.api import context from nova.auth import manager @@ -64,36 +59,20 @@ from nova.compute import instance_types from nova.compute import power_state from nova.virt import images +from Cheetah.Template import Template + libvirt = None libxml2 = None FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_rescue_xml_template', - utils.abspath('virt/libvirt.rescue.qemu.xml.template'), - 'Libvirt RESCUE XML Template for QEmu/KVM') -flags.DEFINE_string('libvirt_rescue_xen_xml_template', - utils.abspath('virt/libvirt.rescue.xen.xml.template'), - 'Libvirt RESCUE XML Template for xen') -flags.DEFINE_string('libvirt_rescue_uml_xml_template', - utils.abspath('virt/libvirt.rescue.uml.xml.template'), - 'Libvirt RESCUE XML Template for user-mode-linux') # TODO(vish): These flags should probably go into a shared location flags.DEFINE_string('rescue_image_id', 'ami-rescue', 'Rescue ami image') flags.DEFINE_string('rescue_kernel_id', 'aki-rescue', 'Rescue aki image') flags.DEFINE_string('rescue_ramdisk_id', 'ari-rescue', 'Rescue ari image') flags.DEFINE_string('libvirt_xml_template', - utils.abspath('virt/libvirt.qemu.xml.template'), - 'Libvirt XML Template for QEmu/KVM') -flags.DEFINE_string('libvirt_xen_xml_template', - utils.abspath('virt/libvirt.xen.xml.template'), - 'Libvirt XML Template for Xen') -flags.DEFINE_string('libvirt_uml_xml_template', - utils.abspath('virt/libvirt.uml.xml.template'), - 'Libvirt XML Template for user-mode-linux') -flags.DEFINE_string('injected_network_template', - utils.abspath('virt/interfaces.template'), - 'Template file for injected network') + utils.abspath('virt/libvirt.xml.template'), + 'Libvirt XML Template') flags.DEFINE_string('libvirt_type', 'kvm', 'Libvirt domain type (valid options are: ' @@ -122,21 +101,27 @@ def get_connection(read_only): return LibvirtConnection(read_only) +def _get_net_and_mask(cidr): + net = IPy.IP(cidr) + return str(net.net()), str(net.netmask()) + + class LibvirtConnection(object): + def __init__(self, read_only): - (self.libvirt_uri, - template_file, - rescue_file) = self.get_uri_and_templates() + self.libvirt_uri = self.get_uri() - self.libvirt_xml = open(template_file).read() - self.rescue_xml = open(rescue_file).read() + self.libvirt_xml = open(FLAGS.libvirt_xml_template).read() self._wrapped_conn = None self.read_only = read_only + def init_host(self): + NWFilterFirewall(self._conn).setup_base_nwfilters() + @property def _conn(self): if not self._wrapped_conn or not self._test_connection(): - logging.debug('Connecting to libvirt: %s' % self.libvirt_uri) + logging.debug(_('Connecting to libvirt: %s') % self.libvirt_uri) self._wrapped_conn = self._connect(self.libvirt_uri, self.read_only) return self._wrapped_conn @@ -148,24 +133,18 @@ class LibvirtConnection(object): except libvirt.libvirtError as e: if e.get_error_code() == libvirt.VIR_ERR_SYSTEM_ERROR and \ e.get_error_domain() == libvirt.VIR_FROM_REMOTE: - logging.debug('Connection to libvirt broke') + logging.debug(_('Connection to libvirt broke')) return False raise - def get_uri_and_templates(self): + def get_uri(self): if FLAGS.libvirt_type == 'uml': uri = FLAGS.libvirt_uri or 'uml:///system' - template_file = FLAGS.libvirt_uml_xml_template - rescue_file = FLAGS.libvirt_rescue_uml_xml_template elif FLAGS.libvirt_type == 'xen': uri = FLAGS.libvirt_uri or 'xen:///' - template_file = FLAGS.libvirt_xen_xml_template - rescue_file = FLAGS.libvirt_rescue_xen_xml_template else: uri = FLAGS.libvirt_uri or 'qemu:///system' - template_file = FLAGS.libvirt_xml_template - rescue_file = FLAGS.libvirt_rescue_xml_template - return uri, template_file, rescue_file + return uri def _connect(self, uri, read_only): auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], @@ -188,14 +167,12 @@ class LibvirtConnection(object): except Exception as _err: pass # If the instance is already terminated, we're still happy - d = defer.Deferred() - if cleanup: - d.addCallback(lambda _: self._cleanup(instance)) - # FIXME: What does this comment mean? - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, + + done = event.Event() + + # We'll save this for when we do shutdown, # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_shutdown(): try: @@ -204,26 +181,35 @@ class LibvirtConnection(object): instance['id'], state) if state == power_state.SHUTDOWN: timer.stop() - d.callback(None) except Exception: db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d + timer_done = timer.start(interval=0.5, now=True) + + # NOTE(termie): this is strictly superfluous (we could put the + # cleanup code in the timer), but this emulates the + # previous model so I am keeping it around until + # everything has been vetted a bit + def _wait_for_timer(): + timer_done.wait() + if cleanup: + self._cleanup(instance) + done.send() + + greenthread.spawn(_wait_for_timer) + return done def _cleanup(self, instance): target = os.path.join(FLAGS.instances_path, instance['name']) - logging.info('instance %s: deleting instance files %s', + logging.info(_('instance %s: deleting instance files %s'), instance['name'], target) if os.path.exists(target): shutil.rmtree(target) - @defer.inlineCallbacks @exception.wrap_exception def attach_volume(self, instance_name, device_path, mountpoint): virt_dom = self._conn.lookupByName(instance_name) @@ -234,7 +220,6 @@ class LibvirtConnection(object): <target dev='%s' bus='virtio'/> </disk>""" % (device_path, mount_device) virt_dom.attachDevice(xml) - yield def _get_disk_xml(self, xml, device): """Returns the xml for the disk mounted at device""" @@ -256,26 +241,21 @@ class LibvirtConnection(object): if doc != None: doc.freeDoc() - @defer.inlineCallbacks @exception.wrap_exception def detach_volume(self, instance_name, mountpoint): virt_dom = self._conn.lookupByName(instance_name) mount_device = mountpoint.rpartition("/")[2] xml = self._get_disk_xml(virt_dom.XMLDesc(0), mount_device) if not xml: - raise exception.NotFound("No disk at %s" % mount_device) + raise exception.NotFound(_("No disk at %s") % mount_device) virt_dom.detachDevice(xml) - yield - @defer.inlineCallbacks @exception.wrap_exception def reboot(self, instance): - yield self.destroy(instance, False) + self.destroy(instance, False) xml = self.to_xml(instance) - yield self._conn.createXML(xml, 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) + self._conn.createXML(xml, 0) + timer = utils.LoopingCall(f=None) def _wait_for_reboot(): try: @@ -283,64 +263,62 @@ class LibvirtConnection(object): db.instance_set_state(context.get_admin_context(), instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: rebooted', instance['name']) + logging.debug(_('instance %s: rebooted'), instance['name']) timer.stop() - d.callback(None) except Exception, exn: - logging.error('_wait_for_reboot failed: %s', exn) + logging.error(_('_wait_for_reboot failed: %s'), exn) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d + return timer.start(interval=0.5, now=True) + + @exception.wrap_exception + def pause(self, instance, callback): + raise exception.APIError("pause not supported for libvirt.") + + @exception.wrap_exception + def unpause(self, instance, callback): + raise exception.APIError("unpause not supported for libvirt.") - @defer.inlineCallbacks @exception.wrap_exception def rescue(self, instance): - yield self.destroy(instance, False) + self.destroy(instance, False) xml = self.to_xml(instance, rescue=True) rescue_images = {'image_id': FLAGS.rescue_image_id, 'kernel_id': FLAGS.rescue_kernel_id, 'ramdisk_id': FLAGS.rescue_ramdisk_id} - yield self._create_image(instance, xml, 'rescue-', rescue_images) - yield self._conn.createXML(xml, 0) + self._create_image(instance, xml, 'rescue-', rescue_images) + self._conn.createXML(xml, 0) - d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_rescue(): try: state = self.get_info(instance['name'])['state'] db.instance_set_state(None, instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: rescued', instance['name']) + logging.debug(_('instance %s: rescued'), instance['name']) timer.stop() - d.callback(None) except Exception, exn: - logging.error('_wait_for_rescue failed: %s', exn) + logging.error(_('_wait_for_rescue failed: %s'), exn) db.instance_set_state(None, instance['id'], power_state.SHUTDOWN) timer.stop() - d.callback(None) timer.f = _wait_for_rescue - timer.start(interval=0.5, now=True) - yield d + return timer.start(interval=0.5, now=True) - @defer.inlineCallbacks @exception.wrap_exception def unrescue(self, instance): # NOTE(vish): Because reboot destroys and recreates an instance using # the normal xml file, we can just call reboot here - yield self.reboot(instance) + self.reboot(instance) - @defer.inlineCallbacks @exception.wrap_exception def spawn(self, instance): xml = self.to_xml(instance) @@ -348,14 +326,12 @@ class LibvirtConnection(object): instance['id'], power_state.NOSTATE, 'launching') - yield NWFilterFirewall(self._conn).\ - setup_nwfilters_for_instance(instance) - yield self._create_image(instance, xml) - yield self._conn.createXML(xml, 0) - logging.debug("instance %s: is running", instance['name']) + NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance) + self._create_image(instance, xml) + self._conn.createXML(xml, 0) + logging.debug(_("instance %s: is running"), instance['name']) - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) + timer = utils.LoopingCall(f=None) def _wait_for_boot(): try: @@ -363,36 +339,33 @@ class LibvirtConnection(object): db.instance_set_state(context.get_admin_context(), instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: booted', instance['name']) + logging.debug(_('instance %s: booted'), instance['name']) timer.stop() - local_d.callback(None) except: - logging.exception('instance %s: failed to boot', + logging.exception(_('instance %s: failed to boot'), instance['name']) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) timer.stop() - local_d.callback(None) + timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - yield local_d + return timer.start(interval=0.5, now=True) def _flush_xen_console(self, virsh_output): logging.info('virsh said: %r' % (virsh_output,)) virsh_output = virsh_output[0].strip() if virsh_output.startswith('/dev/'): - logging.info('cool, it\'s a device') - d = process.simple_execute("sudo dd if=%s iflag=nonblock" % - virsh_output, check_exit_code=False) - d.addCallback(lambda r: r[0]) - return d + logging.info(_('cool, it\'s a device')) + out, err = utils.execute("sudo dd if=%s iflag=nonblock" % + virsh_output, check_exit_code=False) + return out else: return '' def _append_to_file(self, data, fpath): - logging.info('data: %r, fpath: %r' % (data, fpath)) + logging.info(_('data: %r, fpath: %r') % (data, fpath)) fp = open(fpath, 'a+') fp.write(data) return fpath @@ -407,21 +380,20 @@ class LibvirtConnection(object): def get_console_output(self, instance): console_log = os.path.join(FLAGS.instances_path, instance['name'], 'console.log') - d = process.simple_execute('sudo chown %d %s' % (os.getuid(), - console_log)) + + utils.execute('sudo chown %d %s' % (os.getuid(), console_log)) + if FLAGS.libvirt_type == 'xen': - # Xen is spethial - d.addCallback(lambda _: - process.simple_execute("virsh ttyconsole %s" % - instance['name'])) - d.addCallback(self._flush_xen_console) - d.addCallback(self._append_to_file, console_log) + # Xen is special + virsh_output = utils.execute("virsh ttyconsole %s" % + instance['name']) + data = self._flush_xen_console(virsh_output) + fpath = self._append_to_file(data, console_log) else: - d.addCallback(lambda _: defer.succeed(console_log)) - d.addCallback(self._dump_file) - return d + fpath = console_log + + return self._dump_file(fpath) - @defer.inlineCallbacks def _create_image(self, inst, libvirt_xml, prefix='', disk_images=None): # syntactic nicety basepath = lambda fname = '', prefix = prefix: os.path.join( @@ -430,12 +402,12 @@ class LibvirtConnection(object): prefix + fname) # ensure directories exist and are writable - yield process.simple_execute('mkdir -p %s' % basepath(prefix='')) - yield process.simple_execute('chmod 0777 %s' % basepath(prefix='')) + utils.execute('mkdir -p %s' % basepath(prefix='')) + utils.execute('chmod 0777 %s' % basepath(prefix='')) # TODO(termie): these are blocking calls, it would be great # if they weren't. - logging.info('instance %s: Creating image', inst['name']) + logging.info(_('instance %s: Creating image'), inst['name']) f = open(basepath('libvirt.xml'), 'w') f.write(libvirt_xml) f.close() @@ -452,19 +424,29 @@ class LibvirtConnection(object): 'kernel_id': inst['kernel_id'], 'ramdisk_id': inst['ramdisk_id']} if not os.path.exists(basepath('disk')): - yield images.fetch(inst.image_id, basepath('disk-raw'), user, - project) - if not os.path.exists(basepath('kernel')): - yield images.fetch(inst.kernel_id, basepath('kernel'), user, - project) - if not os.path.exists(basepath('ramdisk')): - yield images.fetch(inst.ramdisk_id, basepath('ramdisk'), user, - project) - - execute = lambda cmd, process_input = None, check_exit_code = True: \ - process.simple_execute(cmd=cmd, - process_input=process_input, - check_exit_code=check_exit_code) + images.fetch(inst.image_id, basepath('disk-raw'), user, + project) + + if inst['kernel_id']: + if not os.path.exists(basepath('kernel')): + images.fetch(inst['kernel_id'], basepath('kernel'), + user, project) + if inst['ramdisk_id']: + if not os.path.exists(basepath('ramdisk')): + images.fetch(inst['ramdisk_id'], basepath('ramdisk'), + user, project) + + def execute(cmd, process_input=None, check_exit_code=True): + return utils.execute(cmd=cmd, + process_input=process_input, + check_exit_code=check_exit_code) + + # For now, we assume that if we're not using a kernel, we're using a + # partitioned disk image where the target partition is the first + # partition + target_partition = None + if not inst['kernel_id']: + target_partition = "1" key = str(inst['key_data']) net = None @@ -481,16 +463,24 @@ class LibvirtConnection(object): 'dns': network_ref['dns']} if key or net: if key: - logging.info('instance %s: injecting key into image %s', + logging.info(_('instance %s: injecting key into image %s'), inst['name'], inst.image_id) if net: - logging.info('instance %s: injecting net into image %s', - inst['name'], inst.image_id) - yield disk.inject_data(basepath('disk-raw'), key, net, - execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute('rm -f %s' % basepath('disk')) + logging.info(_('instance %s: injecting net into image %s'), + inst['name'], inst.image_id) + try: + disk.inject_data(basepath('disk-raw'), key, net, + partition=target_partition, + execute=execute) + except Exception as e: + # This could be a windows image, or a vmdk format disk + logging.warn(_('instance %s: ignoring error injecting data' + ' into image %s (%s)'), + inst['name'], inst.image_id, e) + + if inst['kernel_id']: + if os.path.exists(basepath('disk')): + utils.execute('rm -f %s' % basepath('disk')) local_bytes = (instance_types.INSTANCE_TYPES[inst.instance_type] ['local_gb'] @@ -499,18 +489,23 @@ class LibvirtConnection(object): resize = True if inst['instance_type'] == 'm1.tiny' or prefix == 'rescue-': resize = False - yield disk.partition(basepath('disk-raw'), basepath('disk'), - local_bytes, resize, execute=execute) + + if inst['kernel_id']: + disk.partition(basepath('disk-raw'), basepath('disk'), + local_bytes, resize, execute=execute) + else: + os.rename(basepath('disk-raw'), basepath('disk')) + disk.extend(basepath('disk'), local_bytes, execute=execute) if FLAGS.libvirt_type == 'uml': - yield process.simple_execute('sudo chown root %s' % - basepath('disk')) + utils.execute('sudo chown root %s' % basepath('disk')) def to_xml(self, instance, rescue=False): # TODO(termie): cache? - logging.debug('instance %s: starting toXML method', instance['name']) - network = db.project_get_network(context.get_admin_context(), - instance['project_id']) + logging.debug(_('instance %s: starting toXML method'), + instance['name']) + network = db.network_get_by_instance(context.get_admin_context(), + instance['id']) # FIXME(vish): stick this in db instance_type = instance['instance_type'] instance_type = instance_types.INSTANCE_TYPES[instance_type] @@ -518,6 +513,15 @@ class LibvirtConnection(object): instance['id']) # Assume that the gateway also acts as the dhcp server. dhcp_server = network['gateway'] + + if FLAGS.allow_project_net_traffic: + net, mask = _get_net_and_mask(network['cidr']) + extra_params = ("<parameter name=\"PROJNET\" value=\"%s\" />\n" + "<parameter name=\"PROJMASK\" value=\"%s\" />\n" + ) % (net, mask) + else: + extra_params = "\n" + xml_info = {'type': FLAGS.libvirt_type, 'name': instance['name'], 'basepath': os.path.join(FLAGS.instances_path, @@ -527,20 +531,30 @@ class LibvirtConnection(object): 'bridge_name': network['bridge'], 'mac_address': instance['mac_address'], 'ip_address': ip_address, - 'dhcp_server': dhcp_server} - if rescue: - libvirt_xml = self.rescue_xml % xml_info - else: - libvirt_xml = self.libvirt_xml % xml_info - logging.debug('instance %s: finished toXML method', instance['name']) + 'dhcp_server': dhcp_server, + 'extra_params': extra_params, + 'rescue': rescue} + if not rescue: + if instance['kernel_id']: + xml_info['kernel'] = xml_info['basepath'] + "/kernel" + + if instance['ramdisk_id']: + xml_info['ramdisk'] = xml_info['basepath'] + "/ramdisk" + + xml_info['disk'] = xml_info['basepath'] + "/disk" - return libvirt_xml + xml = str(Template(self.libvirt_xml, searchList=[xml_info])) + logging.debug(_('instance %s: finished toXML method'), + instance['name']) + + return xml def get_info(self, instance_name): try: virt_dom = self._conn.lookupByName(instance_name) except: - raise exception.NotFound("Instance %s not found" % instance_name) + raise exception.NotFound(_("Instance %s not found") + % instance_name) (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() return {'state': state, 'max_mem': max_mem, @@ -644,19 +658,18 @@ class LibvirtConnection(object): """ Get hypervisor version """ return self._conn.getVersion() - def get_cpu_xml(self): + def get_cpu_xml(self): """ Get cpuinfo information """ xmlstr = self._conn.getCapabilities() xml = libxml2.parseDoc(xmlstr) nodes = xml.xpathEval('//cpu') if 1 != len(nodes): msg = 'Unexpected xml format. tag "cpu" must be 1, but %d.' % len(nodes) - msg += '\n'+xml.serialize() - raise exception.Invalid(msg) + msg += '\n' + xml.serialize() + raise exception.Invalid(_(msg)) cpuxmlstr = re.sub("\n|[ ]+", ' ', nodes[0].serialize()) return cpuxmlstr - def block_stats(self, instance_name, disk): """ Note that this function takes an instance name, not an Instance, so @@ -673,17 +686,15 @@ class LibvirtConnection(object): domain = self._conn.lookupByName(instance_name) return domain.interfaceStats(interface) - def refresh_security_group(self, security_group_id): fw = NWFilterFirewall(self._conn) fw.ensure_security_group_filter(security_group_id) - def setup_nwfilters_for_instance(self, instance): + """ See same method of NWFilterFirewall class """ nwfilter = NWFilterFirewall(self._conn) return nwfilter.setup_nwfilters_for_instance(instance) - def nwfilter_for_instance_exists(self, instance_ref): try: filter = 'nova-instance-%s' % instance_ref.name @@ -692,7 +703,6 @@ class LibvirtConnection(object): except libvirt.libvirtError: return False - def compareCPU(self, xml): """ Check the host cpu is compatible to a cpu given by xml. @@ -702,86 +712,95 @@ class LibvirtConnection(object): 'http://libvirt.org/html/libvirt-libvirt.html#virCPUCompareResult' """ - return self._conn.compareCPU(xml,0) + return self._conn.compareCPU(xml, 0) + def live_migration(self, context, instance_ref, dest): + """ + Just spawning live_migration operation for + distributing high-load. + """ + greenthread.spawn(self._live_migration, context, instance_ref, dest) + + def _live_migration(self, context, instance_ref, dest): + """ Do live migration.""" + + # Do live migration. + try: + uri = FLAGS.live_migration_uri % dest + out, err = utils.execute("sudo virsh migrate --live %s %s" + % (instance_ref.name, uri)) + except exception.ProcessExecutionError: + id = instance_ref['id'] + db.instance_set_state(context, id, power_state.RUNNING, 'running') + raise - def live_migration(self, instance_ref, dest): - uri = FLAGS.live_migration_uri % dest - out, err = utils.execute("sudo virsh migrate --live %s %s" - % (instance_ref.name, uri)) + # Waiting for completion of live_migration. + timer = utils.LoopingCall(f=None) - # wait for completion of live_migration - d = defer.Deferred() - d.addCallback(lambda _: self._post_live_migration(instance_ref, dest)) - timer = task.LoopingCall(f=None) + def wait_for_live_migration(): - def _wait_for_live_migration(): try: state = self.get_info(instance_ref.name)['state'] - #except libvirt.libvirtError, e: except exception.NotFound: timer.stop() - d.callback(None) - timer.f = _wait_for_live_migration - timer.start(interval=0.5, now=True) - return d + self._post_live_migration(context, instance_ref, dest) - def _post_live_migration(self, instance_ref, dest): + timer.f = wait_for_live_migration + timer.start(interval=0.5, now=True) - # 1. detaching volumes + def _post_live_migration(self, context, instance_ref, dest): + """ + Post operations for live migration. + Mainly, database updating. + """ + # Detaching volumes. # (not necessary in current version ) - #try : - # ec2_id = instance_ref['ec2_id'] - # volumes = db.volume_get_by_ec2_id(context, ec2_id) - # for volume in volumes : - # self.detach_volume(context, instance_id, volume.id) - #except exception.NotFound: - # logging.debug('%s doesnt mount any volumes.. ' % ec2_id) - - # 2. releasing vlan + + # Releasing vlan. # (not necessary in current implementation?) - # 3. releasing security group ingress rule + # Releasing security group ingress rule. # (not necessary in current implementation?) - # 4. database updating + # Database updating. ec2_id = instance_ref['hostname'] - ctxt = context.get_admin_context() instance_id = instance_ref['id'] - fixed_ip = db.instance_get_fixed_address(ctxt, instance_id) - # not return if fixed_ip is not found, otherwise, + fixed_ip = db.instance_get_fixed_address(context, instance_id) + # Not return if fixed_ip is not found, otherwise, # instance never be accessible.. if None == fixed_ip: - logging.error('fixed_ip is not found for %s ' % ec2_id) - db.fixed_ip_update(ctxt, fixed_ip, {'host': dest}) - network_ref = db.fixed_ip_get_network(ctxt, fixed_ip) - db.network_update(ctxt, network_ref['id'], {'host': dest}) + logging.warn('fixed_ip is not found for %s ' % ec2_id) + db.fixed_ip_update(context, fixed_ip, {'host': dest}) + network_ref = db.fixed_ip_get_network(context, fixed_ip) + db.network_update(context, network_ref['id'], {'host': dest}) try: - floating_ip = db.instance_get_floating_address(ctxt, instance_id) - # not return if floating_ip is not found, otherwise, + floating_ip = db.instance_get_floating_address(context, instance_id) + # Not return if floating_ip is not found, otherwise, # instance never be accessible.. if None == floating_ip: logging.error('floating_ip is not found for %s ' % ec2_id) - floating_ip_ref = db.floating_ip_get_by_address(ctxt, floating_ip) - db.floating_ip_update(ctxt, - floating_ip_ref['address'], - {'host': dest}) + else: + floating_ip_ref = db.floating_ip_get_by_address(context, + floating_ip) + db.floating_ip_update(context, + floating_ip_ref['address'], + {'host': dest}) except exception.NotFound: logging.debug('%s doesnt have floating_ip.. ' % ec2_id) except: msg = 'Live migration: Unexpected error:' msg += '%s cannot inherit floating ip.. ' % ec2_id - logging.error(msg) + logging.error(_(msg)) - db.instance_update(ctxt, + db.instance_update(context, instance_id, {'state_description': 'running', 'state': power_state.RUNNING, 'host': dest}) - logging.info('Live migrating %s to %s finishes successfully' + logging.info(_('Live migrating %s to %s finishes successfully') % (ec2_id, dest)) @@ -858,6 +877,14 @@ class NWFilterFirewall(object): </rule> </filter>''' + nova_vpn_filter = '''<filter name='nova-vpn' chain='root'> + <uuid>2086015e-cf03-11df-8c5d-080027c27973</uuid> + <filterref filter='allow-dhcp-server'/> + <filterref filter='nova-allow-dhcp-server'/> + <filterref filter='nova-base-ipv4'/> + <filterref filter='nova-base-ipv6'/> + </filter>''' + def nova_base_ipv4_filter(self): retval = "<filter name='nova-base-ipv4' chain='ipv4'>" for protocol in ['tcp', 'udp', 'icmp']: @@ -882,27 +909,31 @@ class NWFilterFirewall(object): retval += '</filter>' return retval - def nova_project_filter(self, project, net, mask): - retval = "<filter name='nova-project-%s' chain='ipv4'>" % project + def nova_project_filter(self): + retval = "<filter name='nova-project' chain='ipv4'>" for protocol in ['tcp', 'udp', 'icmp']: retval += """<rule action='accept' direction='in' priority='200'> - <%s srcipaddr='%s' srcipmask='%s' /> - </rule>""" % (protocol, net, mask) + <%s srcipaddr='$PROJNET' srcipmask='$PROJMASK' /> + </rule>""" % protocol retval += '</filter>' return retval def _define_filter(self, xml): if callable(xml): xml = xml() - d = threads.deferToThread(self._conn.nwfilterDefineXML, xml) - return d - @staticmethod - def _get_net_and_mask(cidr): - net = IPy.IP(cidr) - return str(net.net()), str(net.netmask()) + # execute in a native thread and block current greenthread until done + tpool.execute(self._conn.nwfilterDefineXML, xml) + + def setup_base_nwfilters(self): + self._define_filter(self.nova_base_ipv4_filter) + self._define_filter(self.nova_base_ipv6_filter) + self._define_filter(self.nova_dhcp_filter) + self._define_filter(self.nova_base_filter) + self._define_filter(self.nova_vpn_filter) + if FLAGS.allow_project_net_traffic: + self._define_filter(self.nova_project_filter) - @defer.inlineCallbacks def setup_nwfilters_for_instance(self, instance): """ Creates an NWFilter for the given instance. In the process, @@ -910,35 +941,25 @@ class NWFilterFirewall(object): the base filter are all in place. """ - yield self._define_filter(self.nova_base_ipv4_filter) - yield self._define_filter(self.nova_base_ipv6_filter) - yield self._define_filter(self.nova_dhcp_filter) - yield self._define_filter(self.nova_base_filter) + nwfilter_xml = ("<filter name='nova-instance-%s' chain='root'>\n" + ) % instance['name'] - nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \ - " <filterref filter='nova-base' />\n" % \ - instance['name'] + if instance['image_id'] == FLAGS.vpn_image_id: + nwfilter_xml += " <filterref filter='nova-vpn' />\n" + else: + nwfilter_xml += " <filterref filter='nova-base' />\n" if FLAGS.allow_project_net_traffic: - network_ref = db.project_get_network(context.get_admin_context(), - instance['project_id']) - net, mask = self._get_net_and_mask(network_ref['cidr']) - project_filter = self.nova_project_filter(instance['project_id'], - net, mask) - yield self._define_filter(project_filter) - - nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \ - instance['project_id'] + nwfilter_xml += " <filterref filter='nova-project' />\n" for security_group in instance.security_groups: - yield self.ensure_security_group_filter(security_group['id']) + self.ensure_security_group_filter(security_group['id']) - nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \ - security_group['id'] + nwfilter_xml += (" <filterref filter='nova-secgroup-%d' />\n" + ) % security_group['id'] nwfilter_xml += "</filter>" - yield self._define_filter(nwfilter_xml) - return + self._define_filter(nwfilter_xml) def ensure_security_group_filter(self, security_group_id): return self._define_filter( @@ -951,7 +972,7 @@ class NWFilterFirewall(object): for rule in security_group.rules: rule_xml += "<rule action='accept' direction='in' priority='300'>" if rule.cidr: - net, mask = self._get_net_and_mask(rule.cidr) + net, mask = _get_net_and_mask(rule.cidr) rule_xml += "<%s srcipaddr='%s' srcipmask='%s' " % \ (rule.protocol, net, mask) if rule.protocol in ['tcp', 'udp']: diff --git a/nova/virt/xenapi.py b/nova/virt/xenapi.py deleted file mode 100644 index de3d68582..000000000 --- a/nova/virt/xenapi.py +++ /dev/null @@ -1,442 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2010 Citrix Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -A connection to XenServer or Xen Cloud Platform. - -The concurrency model for this class is as follows: - -All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator -deferredToThread). They are remote calls, and so may hang for the usual -reasons. They should not be allowed to block the reactor thread. - -All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. Polling is handled using reactor.callLater. - -This combination of techniques means that we don't block the reactor thread at -all, and at the same time we don't hold lots of threads waiting for -long-running operations. - -FIXME: get_info currently doesn't conform to these rules, and will block the -reactor thread if the VM.get_by_name_label or VM.get_record calls block. - -**Related Flags** - -:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. -:xenapi_connection_username: Username for connection to XenServer/Xen Cloud - Platform (default: root). -:xenapi_connection_password: Password for connection to XenServer/Xen Cloud - Platform. -:xenapi_task_poll_interval: The interval (seconds) used for polling of - remote tasks (Async.VM.start, etc) - (default: 0.5). - -""" - -import logging -import xmlrpclib - -from twisted.internet import defer -from twisted.internet import reactor - -from nova import db -from nova import flags -from nova import utils -from nova.auth.manager import AuthManager -from nova.compute import instance_types -from nova.compute import power_state -from nova.virt import images - -XenAPI = None - - -FLAGS = flags.FLAGS -flags.DEFINE_string('xenapi_connection_url', - None, - 'URL for connection to XenServer/Xen Cloud Platform.' - ' Required if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_username', - 'root', - 'Username for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_string('xenapi_connection_password', - None, - 'Password for connection to XenServer/Xen Cloud Platform.' - ' Used only if connection_type=xenapi.') -flags.DEFINE_float('xenapi_task_poll_interval', - 0.5, - 'The interval used for polling of remote tasks ' - '(Async.VM.start, etc). Used only if ' - 'connection_type=xenapi.') - - -XENAPI_POWER_STATE = { - 'Halted': power_state.SHUTDOWN, - 'Running': power_state.RUNNING, - 'Paused': power_state.PAUSED, - 'Suspended': power_state.SHUTDOWN, # FIXME - 'Crashed': power_state.CRASHED} - - -def get_connection(_): - """Note that XenAPI doesn't have a read-only connection mode, so - the read_only parameter is ignored.""" - # This is loaded late so that there's no need to install this - # library when not using XenAPI. - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') - url = FLAGS.xenapi_connection_url - username = FLAGS.xenapi_connection_username - password = FLAGS.xenapi_connection_password - if not url or password is None: - raise Exception('Must specify xenapi_connection_url, ' - 'xenapi_connection_username (optionally), and ' - 'xenapi_connection_password to use ' - 'connection_type=xenapi') - return XenAPIConnection(url, username, password) - - -class XenAPIConnection(object): - def __init__(self, url, user, pw): - self._conn = XenAPI.Session(url) - self._conn.login_with_password(user, pw) - - def list_instances(self): - return [self._conn.xenapi.VM.get_name_label(vm) \ - for vm in self._conn.xenapi.VM.get_all()] - - @defer.inlineCallbacks - def spawn(self, instance): - vm = yield self._lookup(instance.name) - if vm is not None: - raise Exception('Attempted to create non-unique name %s' % - instance.name) - - network = db.project_get_network(None, instance.project_id) - network_ref = \ - yield self._find_network_with_bridge(network.bridge) - - user = AuthManager().get_user(instance.user_id) - project = AuthManager().get_project(instance.project_id) - vdi_uuid = yield self._fetch_image( - instance.image_id, user, project, True) - kernel = yield self._fetch_image( - instance.kernel_id, user, project, False) - ramdisk = yield self._fetch_image( - instance.ramdisk_id, user, project, False) - vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid) - - vm_ref = yield self._create_vm(instance, kernel, ramdisk) - yield self._create_vbd(vm_ref, vdi_ref, 0, True) - if network_ref: - yield self._create_vif(vm_ref, network_ref, instance.mac_address) - logging.debug('Starting VM %s...', vm_ref) - yield self._call_xenapi('VM.start', vm_ref, False, False) - logging.info('Spawning VM %s created %s.', instance.name, vm_ref) - - @defer.inlineCallbacks - def _create_vm(self, instance, kernel, ramdisk): - """Create a VM record. Returns a Deferred that gives the new - VM reference.""" - - instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] - mem = str(long(instance_type['memory_mb']) * 1024 * 1024) - vcpus = str(instance_type['vcpus']) - rec = { - 'name_label': instance.name, - 'name_description': '', - 'is_a_template': False, - 'memory_static_min': '0', - 'memory_static_max': mem, - 'memory_dynamic_min': mem, - 'memory_dynamic_max': mem, - 'VCPUs_at_startup': vcpus, - 'VCPUs_max': vcpus, - 'VCPUs_params': {}, - 'actions_after_shutdown': 'destroy', - 'actions_after_reboot': 'restart', - 'actions_after_crash': 'destroy', - 'PV_bootloader': '', - 'PV_kernel': kernel, - 'PV_ramdisk': ramdisk, - 'PV_args': 'root=/dev/xvda1', - 'PV_bootloader_args': '', - 'PV_legacy_args': '', - 'HVM_boot_policy': '', - 'HVM_boot_params': {}, - 'platform': {}, - 'PCI_bus': '', - 'recommendations': '', - 'affinity': '', - 'user_version': '0', - 'other_config': {}, - } - logging.debug('Created VM %s...', instance.name) - vm_ref = yield self._call_xenapi('VM.create', rec) - logging.debug('Created VM %s as %s.', instance.name, vm_ref) - defer.returnValue(vm_ref) - - @defer.inlineCallbacks - def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable): - """Create a VBD record. Returns a Deferred that gives the new - VBD reference.""" - - vbd_rec = {} - vbd_rec['VM'] = vm_ref - vbd_rec['VDI'] = vdi_ref - vbd_rec['userdevice'] = str(userdevice) - vbd_rec['bootable'] = bootable - vbd_rec['mode'] = 'RW' - vbd_rec['type'] = 'disk' - vbd_rec['unpluggable'] = True - vbd_rec['empty'] = False - vbd_rec['other_config'] = {} - vbd_rec['qos_algorithm_type'] = '' - vbd_rec['qos_algorithm_params'] = {} - vbd_rec['qos_supported_algorithms'] = [] - logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) - vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec) - logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, - vdi_ref) - defer.returnValue(vbd_ref) - - @defer.inlineCallbacks - def _create_vif(self, vm_ref, network_ref, mac_address): - """Create a VIF record. Returns a Deferred that gives the new - VIF reference.""" - - vif_rec = {} - vif_rec['device'] = '0' - vif_rec['network'] = network_ref - vif_rec['VM'] = vm_ref - vif_rec['MAC'] = mac_address - vif_rec['MTU'] = '1500' - vif_rec['other_config'] = {} - vif_rec['qos_algorithm_type'] = '' - vif_rec['qos_algorithm_params'] = {} - logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, - network_ref) - vif_ref = yield self._call_xenapi('VIF.create', vif_rec) - logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, - vm_ref, network_ref) - defer.returnValue(vif_ref) - - @defer.inlineCallbacks - def _find_network_with_bridge(self, bridge): - expr = 'field "bridge" = "%s"' % bridge - networks = yield self._call_xenapi('network.get_all_records_where', - expr) - if len(networks) == 1: - defer.returnValue(networks.keys()[0]) - elif len(networks) > 1: - raise Exception('Found non-unique network for bridge %s' % bridge) - else: - raise Exception('Found no network for bridge %s' % bridge) - - @defer.inlineCallbacks - def _fetch_image(self, image, user, project, use_sr): - """use_sr: True to put the image as a VDI in an SR, False to place - it on dom0's filesystem. The former is for VM disks, the latter for - its kernel and ramdisk (if external kernels are being used). - Returns a Deferred that gives the new VDI UUID.""" - - url = images.image_url(image) - access = AuthManager().get_access_key(user, project) - logging.debug("Asking xapi to fetch %s as %s" % (url, access)) - fn = use_sr and 'get_vdi' or 'get_kernel' - args = {} - args['src_url'] = url - args['username'] = access - args['password'] = user.secret - if use_sr: - args['add_partition'] = 'true' - task = yield self._async_call_plugin('objectstore', fn, args) - uuid = yield self._wait_for_task(task) - defer.returnValue(uuid) - - @defer.inlineCallbacks - def reboot(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - raise Exception('instance not present %s' % instance.name) - task = yield self._call_xenapi('Async.VM.clean_reboot', vm) - yield self._wait_for_task(task) - - @defer.inlineCallbacks - def destroy(self, instance): - vm = yield self._lookup(instance.name) - if vm is None: - # Don't complain, just return. This lets us clean up instances - # that have already disappeared from the underlying platform. - defer.returnValue(None) - # Get the VDIs related to the VM - vdis = yield self._lookup_vm_vdis(vm) - try: - task = yield self._call_xenapi('Async.VM.hard_shutdown', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - # Disk clean-up - if vdis: - for vdi in vdis: - try: - task = yield self._call_xenapi('Async.VDI.destroy', vdi) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - try: - task = yield self._call_xenapi('Async.VM.destroy', vm) - yield self._wait_for_task(task) - except Exception, exc: - logging.warn(exc) - - def get_info(self, instance_id): - vm = self._lookup_blocking(instance_id) - if vm is None: - raise Exception('instance not present %s' % instance_id) - rec = self._conn.xenapi.VM.get_record(vm) - return {'state': XENAPI_POWER_STATE[rec['power_state']], - 'max_mem': long(rec['memory_static_max']) >> 10, - 'mem': long(rec['memory_dynamic_max']) >> 10, - 'num_cpu': rec['VCPUs_max'], - 'cpu_time': 0} - - def get_console_output(self, instance): - return 'FAKE CONSOLE OUTPUT' - - @utils.deferredToThread - def _lookup(self, i): - return self._lookup_blocking(i) - - def _lookup_blocking(self, i): - vms = self._conn.xenapi.VM.get_by_name_label(i) - n = len(vms) - if n == 0: - return None - elif n > 1: - raise Exception('duplicate name found: %s' % i) - else: - return vms[0] - - @utils.deferredToThread - def _lookup_vm_vdis(self, vm): - return self._lookup_vm_vdis_blocking(vm) - - def _lookup_vm_vdis_blocking(self, vm): - # Firstly we get the VBDs, then the VDIs. - # TODO: do we leave the read-only devices? - vbds = self._conn.xenapi.VM.get_VBDs(vm) - vdis = [] - if vbds: - for vbd in vbds: - try: - vdi = self._conn.xenapi.VBD.get_VDI(vbd) - # Test valid VDI - record = self._conn.xenapi.VDI.get_record(vdi) - except Exception, exc: - logging.warn(exc) - else: - vdis.append(vdi) - if len(vdis) > 0: - return vdis - else: - return None - - def _wait_for_task(self, task): - """Return a Deferred that will give the result of the given task. - The task is polled until it completes.""" - d = defer.Deferred() - reactor.callLater(0, self._poll_task, task, d) - return d - - @utils.deferredToThread - def _poll_task(self, task, deferred): - """Poll the given XenAPI task, and fire the given Deferred if we - get a result.""" - try: - #logging.debug('Polling task %s...', task) - status = self._conn.xenapi.task.get_status(task) - if status == 'pending': - reactor.callLater(FLAGS.xenapi_task_poll_interval, - self._poll_task, task, deferred) - elif status == 'success': - result = self._conn.xenapi.task.get_result(task) - logging.info('Task %s status: success. %s', task, result) - deferred.callback(_parse_xmlrpc_value(result)) - else: - error_info = self._conn.xenapi.task.get_error_info(task) - logging.warn('Task %s status: %s. %s', task, status, - error_info) - deferred.errback(XenAPI.Failure(error_info)) - #logging.debug('Polling task %s done.', task) - except Exception, exc: - logging.warn(exc) - deferred.errback(exc) - - @utils.deferredToThread - def _call_xenapi(self, method, *args): - """Call the specified XenAPI method on a background thread. Returns - a Deferred for the result.""" - f = self._conn.xenapi - for m in method.split('.'): - f = f.__getattr__(m) - return f(*args) - - @utils.deferredToThread - def _async_call_plugin(self, plugin, fn, args): - """Call Async.host.call_plugin on a background thread. Returns a - Deferred with the task reference.""" - return _unwrap_plugin_exceptions( - self._conn.xenapi.Async.host.call_plugin, - self._get_xenapi_host(), plugin, fn, args) - - def _get_xenapi_host(self): - return self._conn.xenapi.session.get_this_host(self._conn.handle) - - -def _unwrap_plugin_exceptions(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except XenAPI.Failure, exc: - logging.debug("Got exception: %s", exc) - if (len(exc.details) == 4 and - exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and - exc.details[2] == 'Failure'): - params = None - try: - params = eval(exc.details[3]) - except: - raise exc - raise XenAPI.Failure(params) - else: - raise - except xmlrpclib.ProtocolError, exc: - logging.debug("Got exception: %s", exc) - raise - - -def _parse_xmlrpc_value(val): - """Parse the given value as if it were an XML-RPC value. This is - sometimes used as the format for the task.result field.""" - if not val: - return val - x = xmlrpclib.loads( - '<?xml version="1.0"?><methodResponse><params><param>' + - val + - '</param></params></methodResponse>') - return x[0][0] diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py new file mode 100644 index 000000000..c75162f08 --- /dev/null +++ b/nova/virt/xenapi/__init__.py @@ -0,0 +1,30 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`xenapi` -- Nova support for XenServer and XCP through XenAPI +================================================================== +""" + + +class HelperBase(object): + """ + The base for helper classes. This adds the XenAPI class attribute + """ + XenAPI = None + + def __init__(self): + return diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py new file mode 100644 index 000000000..7a6c9ee71 --- /dev/null +++ b/nova/virt/xenapi/fake.py @@ -0,0 +1,388 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +#============================================================================ +# +# Parts of this file are based upon xmlrpclib.py, the XML-RPC client +# interface included in the Python distribution. +# +# Copyright (c) 1999-2002 by Secret Labs AB +# Copyright (c) 1999-2002 by Fredrik Lundh +# +# By obtaining, using, and/or copying this software and/or its +# associated documentation, you agree that you have read, understood, +# and will comply with the following terms and conditions: +# +# Permission to use, copy, modify, and distribute this software and +# its associated documentation for any purpose and without fee is +# hereby granted, provided that the above copyright notice appears in +# all copies, and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of +# Secret Labs AB or the author not be used in advertising or publicity +# pertaining to distribution of the software without specific, written +# prior permission. +# +# SECRET LABS AB AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD +# TO THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANT- +# ABILITY AND FITNESS. IN NO EVENT SHALL SECRET LABS AB OR THE AUTHOR +# BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# -------------------------------------------------------------------- + + +""" +A fake XenAPI SDK. +""" + + +import datetime +import logging +import uuid + +from nova import exception + + +_CLASSES = ['host', 'network', 'session', 'SR', 'VBD',\ + 'PBD', 'VDI', 'VIF', 'VM', 'task'] + +_db_content = {} + + +def reset(): + for c in _CLASSES: + _db_content[c] = {} + create_host('fake') + + +def create_host(name_label): + return _create_object('host', { + 'name_label': name_label, + }) + + +def create_network(name_label, bridge): + return _create_object('network', { + 'name_label': name_label, + 'bridge': bridge, + }) + + +def create_vm(name_label, status, + is_a_template=False, is_control_domain=False): + return _create_object('VM', { + 'name_label': name_label, + 'power-state': status, + 'is_a_template': is_a_template, + 'is_control_domain': is_control_domain, + }) + + +def create_vdi(name_label, read_only, sr_ref, sharable): + return _create_object('VDI', { + 'name_label': name_label, + 'read_only': read_only, + 'SR': sr_ref, + 'type': '', + 'name_description': '', + 'sharable': sharable, + 'other_config': {}, + 'location': '', + 'xenstore_data': '', + 'sm_config': {}, + 'VBDs': {}, + }) + + +def create_pbd(config, sr_ref, attached): + return _create_object('PBD', { + 'device-config': config, + 'SR': sr_ref, + 'currently-attached': attached, + }) + + +def create_task(name_label): + return _create_object('task', { + 'name_label': name_label, + 'status': 'pending', + }) + + +def _create_object(table, obj): + ref = str(uuid.uuid4()) + obj['uuid'] = str(uuid.uuid4()) + _db_content[table][ref] = obj + return ref + + +def _create_sr(table, obj): + sr_type = obj[6] + # Forces fake to support iscsi only + if sr_type != 'iscsi': + raise Failure(['SR_UNKNOWN_DRIVER', sr_type]) + sr_ref = _create_object(table, obj[2]) + vdi_ref = create_vdi('', False, sr_ref, False) + pbd_ref = create_pbd('', sr_ref, True) + _db_content['SR'][sr_ref]['VDIs'] = [vdi_ref] + _db_content['SR'][sr_ref]['PBDs'] = [pbd_ref] + _db_content['VDI'][vdi_ref]['SR'] = sr_ref + _db_content['PBD'][pbd_ref]['SR'] = sr_ref + return sr_ref + + +def get_all(table): + return _db_content[table].keys() + + +def get_all_records(table): + return _db_content[table] + + +def get_record(table, ref): + if ref in _db_content[table]: + return _db_content[table].get(ref) + else: + raise Failure(['HANDLE_INVALID', table, ref]) + + +def check_for_session_leaks(): + if len(_db_content['session']) > 0: + raise exception.Error('Sessions have leaked: %s' % + _db_content['session']) + + +class Failure(Exception): + def __init__(self, details): + self.details = details + + def __str__(self): + try: + return str(self.details) + except Exception, exc: + return "XenAPI Fake Failure: %s" % str(self.details) + + def _details_map(self): + return dict([(str(i), self.details[i]) + for i in range(len(self.details))]) + + +class SessionBase(object): + """ + Base class for Fake Sessions + """ + + def __init__(self, uri): + self._session = None + + def xenapi_request(self, methodname, params): + if methodname.startswith('login'): + self._login(methodname, params) + return None + elif methodname == 'logout' or methodname == 'session.logout': + self._logout() + return None + else: + full_params = (self._session,) + params + meth = getattr(self, methodname, None) + if meth is None: + logging.warn('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s' % + methodname) + return meth(*full_params) + + def _login(self, method, params): + self._session = str(uuid.uuid4()) + _db_content['session'][self._session] = { + 'uuid': str(uuid.uuid4()), + 'this_host': _db_content['host'].keys()[0], + } + + def _logout(self): + s = self._session + self._session = None + if s not in _db_content['session']: + raise exception.Error( + "Logging out a session that is invalid or already logged " + "out: %s" % s) + del _db_content['session'][s] + + def __getattr__(self, name): + if name == 'handle': + return self._session + elif name == 'xenapi': + return _Dispatcher(self.xenapi_request, None) + elif name.startswith('login') or name.startswith('slave_local'): + return lambda *params: self._login(name, params) + elif name.startswith('Async'): + return lambda *params: self._async(name, params) + elif '.' in name: + impl = getattr(self, name.replace('.', '_')) + if impl is not None: + def callit(*params): + logging.warn('Calling %s %s', name, impl) + self._check_session(params) + return impl(*params) + return callit + if self._is_gettersetter(name, True): + logging.warn('Calling getter %s', name) + return lambda *params: self._getter(name, params) + elif self._is_create(name): + return lambda *params: self._create(name, params) + else: + return None + + def _is_gettersetter(self, name, getter): + bits = name.split('.') + return (len(bits) == 2 and + bits[0] in _CLASSES and + bits[1].startswith(getter and 'get_' or 'set_')) + + def _is_create(self, name): + bits = name.split('.') + return (len(bits) == 2 and + bits[0] in _CLASSES and + bits[1] == 'create') + + def _getter(self, name, params): + self._check_session(params) + (cls, func) = name.split('.') + + if func == 'get_all': + self._check_arg_count(params, 1) + return get_all(cls) + + if func == 'get_all_records': + self._check_arg_count(params, 1) + return get_all_records(cls) + + if func == 'get_record': + self._check_arg_count(params, 2) + return get_record(cls, params[1]) + + if (func == 'get_by_name_label' or + func == 'get_by_uuid'): + self._check_arg_count(params, 2) + return self._get_by_field( + _db_content[cls], func[len('get_by_'):], params[1]) + + if len(params) == 2: + field = func[len('get_'):] + ref = params[1] + + if (ref in _db_content[cls] and + field in _db_content[cls][ref]): + return _db_content[cls][ref][field] + + logging.error('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s or it has ' + 'been called with the wrong number of arguments' % name) + + def _setter(self, name, params): + self._check_session(params) + (cls, func) = name.split('.') + + if len(params) == 3: + field = func[len('set_'):] + ref = params[1] + val = params[2] + + if (ref in _db_content[cls] and + field in _db_content[cls][ref]): + _db_content[cls][ref][field] = val + + logging.warn('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s or it has ' + 'been called with the wrong number of arguments or the database ' + 'is missing that field' % name) + + def _create(self, name, params): + self._check_session(params) + is_sr_create = name == 'SR.create' + # Storage Repositories have a different API + expected = is_sr_create and 10 or 2 + self._check_arg_count(params, expected) + (cls, _) = name.split('.') + ref = is_sr_create and \ + _create_sr(cls, params) or _create_object(cls, params[1]) + obj = get_record(cls, ref) + + # Add RO fields + if cls == 'VM': + obj['power_state'] = 'Halted' + + return ref + + def _async(self, name, params): + task_ref = create_task(name) + task = _db_content['task'][task_ref] + func = name[len('Async.'):] + try: + task['result'] = self.xenapi_request(func, params[1:]) + task['status'] = 'success' + except Failure, exc: + task['error_info'] = exc.details + task['status'] = 'failed' + task['finished'] = datetime.datetime.now() + return task_ref + + def _check_session(self, params): + if (self._session is None or + self._session not in _db_content['session']): + raise Failure(['HANDLE_INVALID', 'session', self._session]) + if len(params) == 0 or params[0] != self._session: + logging.warn('Raising NotImplemented') + raise NotImplementedError('Call to XenAPI without using .xenapi') + + def _check_arg_count(self, params, expected): + actual = len(params) + if actual != expected: + raise Failure(['MESSAGE_PARAMETER_COUNT_MISMATCH', + expected, actual]) + + def _get_by_field(self, recs, k, v): + result = [] + for ref, rec in recs.iteritems(): + if rec.get(k) == v: + result.append(ref) + return result + + +# Based upon _Method from xmlrpclib. +class _Dispatcher: + def __init__(self, send, name): + self.__send = send + self.__name = name + + def __repr__(self): + if self.__name: + return '<xenapi.fake._Dispatcher for %s>' % self.__name + else: + return '<xenapi.fake._Dispatcher>' + + def __getattr__(self, name): + if self.__name is None: + return _Dispatcher(self.__send, name) + else: + return _Dispatcher(self.__send, "%s.%s" % (self.__name, name)) + + def __call__(self, *args): + return self.__send(self.__name, args) diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py new file mode 100644 index 000000000..c0406d8f0 --- /dev/null +++ b/nova/virt/xenapi/network_utils.py @@ -0,0 +1,43 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of network +records and their attributes like bridges, PIFs, QoS, as well as +their lookup functions. +""" + + +from nova.virt.xenapi import HelperBase + + +class NetworkHelper(HelperBase): + """ + The class that wraps the helper methods together. + """ + + @classmethod + def find_network_with_bridge(cls, session, bridge): + """Return the network on which the bridge is attached, if found.""" + expr = 'field "bridge" = "%s"' % bridge + networks = session.call_xenapi('network.get_all_records_where', expr) + if len(networks) == 1: + return networks.keys()[0] + elif len(networks) > 1: + raise Exception(_('Found non-unique network' + ' for bridge %s') % bridge) + else: + raise Exception(_('Found no network for bridge %s') % bridge) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py new file mode 100644 index 000000000..89e02c917 --- /dev/null +++ b/nova/virt/xenapi/vm_utils.py @@ -0,0 +1,271 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of VM records and +their attributes like VDIs, VIFs, as well as their lookup functions. +""" + +import logging +import urllib +from xml.dom import minidom + +from nova import exception +from nova import flags +from nova.auth.manager import AuthManager +from nova.compute import instance_types +from nova.compute import power_state +from nova.virt import images +from nova.virt.xenapi import HelperBase +from nova.virt.xenapi.volume_utils import StorageError + + +FLAGS = flags.FLAGS + +XENAPI_POWER_STATE = { + 'Halted': power_state.SHUTDOWN, + 'Running': power_state.RUNNING, + 'Paused': power_state.PAUSED, + 'Suspended': power_state.SHUTDOWN, # FIXME + 'Crashed': power_state.CRASHED} + + +class VMHelper(HelperBase): + """ + The class that wraps the helper methods together. + """ + + @classmethod + def create_vm(cls, session, instance, kernel, ramdisk): + """Create a VM record. Returns a Deferred that gives the new + VM reference.""" + instance_type = instance_types.INSTANCE_TYPES[instance.instance_type] + mem = str(long(instance_type['memory_mb']) * 1024 * 1024) + vcpus = str(instance_type['vcpus']) + rec = { + 'name_label': instance.name, + 'name_description': '', + 'is_a_template': False, + 'memory_static_min': '0', + 'memory_static_max': mem, + 'memory_dynamic_min': mem, + 'memory_dynamic_max': mem, + 'VCPUs_at_startup': vcpus, + 'VCPUs_max': vcpus, + 'VCPUs_params': {}, + 'actions_after_shutdown': 'destroy', + 'actions_after_reboot': 'restart', + 'actions_after_crash': 'destroy', + 'PV_bootloader': '', + 'PV_kernel': kernel, + 'PV_ramdisk': ramdisk, + 'PV_args': 'root=/dev/xvda1', + 'PV_bootloader_args': '', + 'PV_legacy_args': '', + 'HVM_boot_policy': '', + 'HVM_boot_params': {}, + 'platform': {}, + 'PCI_bus': '', + 'recommendations': '', + 'affinity': '', + 'user_version': '0', + 'other_config': {}, + } + logging.debug(_('Created VM %s...'), instance.name) + vm_ref = session.call_xenapi('VM.create', rec) + logging.debug(_('Created VM %s as %s.'), instance.name, vm_ref) + return vm_ref + + @classmethod + def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable): + """Create a VBD record. Returns a Deferred that gives the new + VBD reference.""" + vbd_rec = {} + vbd_rec['VM'] = vm_ref + vbd_rec['VDI'] = vdi_ref + vbd_rec['userdevice'] = str(userdevice) + vbd_rec['bootable'] = bootable + vbd_rec['mode'] = 'RW' + vbd_rec['type'] = 'disk' + vbd_rec['unpluggable'] = True + vbd_rec['empty'] = False + vbd_rec['other_config'] = {} + vbd_rec['qos_algorithm_type'] = '' + vbd_rec['qos_algorithm_params'] = {} + vbd_rec['qos_supported_algorithms'] = [] + logging.debug(_('Creating VBD for VM %s, VDI %s ... '), + vm_ref, vdi_ref) + vbd_ref = session.call_xenapi('VBD.create', vbd_rec) + logging.debug(_('Created VBD %s for VM %s, VDI %s.'), vbd_ref, vm_ref, + vdi_ref) + return vbd_ref + + @classmethod + def find_vbd_by_number(cls, session, vm_ref, number): + """Get the VBD reference from the device number""" + vbds = session.get_xenapi().VM.get_VBDs(vm_ref) + if vbds: + for vbd in vbds: + try: + vbd_rec = session.get_xenapi().VBD.get_record(vbd) + if vbd_rec['userdevice'] == str(number): + return vbd + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('VBD not found in instance %s') % vm_ref) + + @classmethod + def unplug_vbd(cls, session, vbd_ref): + """Unplug VBD from VM""" + try: + vbd_ref = session.call_xenapi('VBD.unplug', vbd_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + if exc.details[0] != 'DEVICE_ALREADY_DETACHED': + raise StorageError(_('Unable to unplug VBD %s') % vbd_ref) + + @classmethod + def destroy_vbd(cls, session, vbd_ref): + """Destroy VBD from host database""" + try: + task = session.call_xenapi('Async.VBD.destroy', vbd_ref) + #FIXME(armando): find a solution to missing instance_id + #with Josh Kearney + session.wait_for_task(0, task) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to destroy VBD %s') % vbd_ref) + + @classmethod + def create_vif(cls, session, vm_ref, network_ref, mac_address): + """Create a VIF record. Returns a Deferred that gives the new + VIF reference.""" + vif_rec = {} + vif_rec['device'] = '0' + vif_rec['network'] = network_ref + vif_rec['VM'] = vm_ref + vif_rec['MAC'] = mac_address + vif_rec['MTU'] = '1500' + vif_rec['other_config'] = {} + vif_rec['qos_algorithm_type'] = '' + vif_rec['qos_algorithm_params'] = {} + logging.debug(_('Creating VIF for VM %s, network %s.'), vm_ref, + network_ref) + vif_ref = session.call_xenapi('VIF.create', vif_rec) + logging.debug(_('Created VIF %s for VM %s, network %s.'), vif_ref, + vm_ref, network_ref) + return vif_ref + + @classmethod + def fetch_image(cls, session, image, user, project, use_sr): + """use_sr: True to put the image as a VDI in an SR, False to place + it on dom0's filesystem. The former is for VM disks, the latter for + its kernel and ramdisk (if external kernels are being used). + Returns a Deferred that gives the new VDI UUID.""" + + url = images.image_url(image) + access = AuthManager().get_access_key(user, project) + logging.debug(_("Asking xapi to fetch %s as %s"), url, access) + fn = use_sr and 'get_vdi' or 'get_kernel' + args = {} + args['src_url'] = url + args['username'] = access + args['password'] = user.secret + if use_sr: + args['add_partition'] = 'true' + task = session.async_call_plugin('objectstore', fn, args) + #FIXME(armando): find a solution to missing instance_id + #with Josh Kearney + uuid = session.wait_for_task(0, task) + return uuid + + @classmethod + def lookup(cls, session, i): + """Look the instance i up, and returns it if available""" + vms = session.get_xenapi().VM.get_by_name_label(i) + n = len(vms) + if n == 0: + return None + elif n > 1: + raise exception.Duplicate(_('duplicate name found: %s') % i) + else: + return vms[0] + + @classmethod + def lookup_vm_vdis(cls, session, vm): + """Look for the VDIs that are attached to the VM""" + # Firstly we get the VBDs, then the VDIs. + # TODO(Armando): do we leave the read-only devices? + vbds = session.get_xenapi().VM.get_VBDs(vm) + vdis = [] + if vbds: + for vbd in vbds: + try: + vdi = session.get_xenapi().VBD.get_VDI(vbd) + # Test valid VDI + record = session.get_xenapi().VDI.get_record(vdi) + logging.debug(_('VDI %s is still available'), + record['uuid']) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + else: + vdis.append(vdi) + if len(vdis) > 0: + return vdis + else: + return None + + @classmethod + def compile_info(cls, record): + """Fill record with VM status information""" + return {'state': XENAPI_POWER_STATE[record['power_state']], + 'max_mem': long(record['memory_static_max']) >> 10, + 'mem': long(record['memory_dynamic_max']) >> 10, + 'num_cpu': record['VCPUs_max'], + 'cpu_time': 0} + + @classmethod + def compile_diagnostics(cls, session, record): + """Compile VM diagnostics data""" + try: + host = session.get_xenapi_host() + host_ip = session.get_xenapi().host.get_record(host)["address"] + diags = {} + xml = get_rrd(host_ip, record["uuid"]) + if xml: + rrd = minidom.parseString(xml) + for i, node in enumerate(rrd.firstChild.childNodes): + # We don't want all of the extra garbage + if i >= 3 and i <= 11: + ref = node.childNodes + # Name and Value + diags[ref[0].firstChild.data] = ref[6].firstChild.data + return diags + except cls.XenAPI.Failure as e: + return {"Unable to retrieve diagnostics": e} + + +def get_rrd(host, uuid): + """Return the VM RRD XML as a string""" + try: + xml = urllib.urlopen("http://%s:%s@%s/vm_rrd?uuid=%s" % ( + FLAGS.xenapi_connection_username, + FLAGS.xenapi_connection_password, + host, + uuid)) + return xml.read() + except IOError: + return None diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py new file mode 100644 index 000000000..39bb0f1e8 --- /dev/null +++ b/nova/virt/xenapi/vmops.py @@ -0,0 +1,196 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for VM-related functions (spawn, reboot, etc). +""" + +import logging + +from nova import db +from nova import context +from nova import exception +from nova import utils + +from nova.auth.manager import AuthManager +from nova.compute import power_state +from nova.virt.xenapi.network_utils import NetworkHelper +from nova.virt.xenapi.vm_utils import VMHelper + + +class VMOps(object): + """ + Management class for VM-related tasks + """ + + def __init__(self, session): + self.XenAPI = session.get_imported_xenapi() + self._session = session + VMHelper.XenAPI = self.XenAPI + + def list_instances(self): + """List VM instances""" + vms = [] + for vm in self._session.get_xenapi().VM.get_all(): + rec = self._session.get_xenapi().VM.get_record(vm) + if not rec["is_a_template"] and not rec["is_control_domain"]: + vms.append(rec["name_label"]) + return vms + + def spawn(self, instance): + """Create VM instance""" + vm = VMHelper.lookup(self._session, instance.name) + if vm is not None: + raise exception.Duplicate(_('Attempted to create' + ' non-unique name %s') % instance.name) + + bridge = db.network_get_by_instance(context.get_admin_context(), + instance['id'])['bridge'] + network_ref = \ + NetworkHelper.find_network_with_bridge(self._session, bridge) + + user = AuthManager().get_user(instance.user_id) + project = AuthManager().get_project(instance.project_id) + vdi_uuid = VMHelper.fetch_image( + self._session, instance.image_id, user, project, True) + kernel = VMHelper.fetch_image( + self._session, instance.kernel_id, user, project, False) + ramdisk = VMHelper.fetch_image( + self._session, instance.ramdisk_id, user, project, False) + vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid) + vm_ref = VMHelper.create_vm( + self._session, instance, kernel, ramdisk) + VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True) + if network_ref: + VMHelper.create_vif(self._session, vm_ref, + network_ref, instance.mac_address) + logging.debug(_('Starting VM %s...'), vm_ref) + self._session.call_xenapi('VM.start', vm_ref, False, False) + logging.info(_('Spawning VM %s created %s.'), instance.name, + vm_ref) + + # NOTE(armando): Do we really need to do this in virt? + timer = utils.LoopingCall(f=None) + + def _wait_for_boot(): + try: + state = self.get_info(instance['name'])['state'] + db.instance_set_state(context.get_admin_context(), + instance['id'], state) + if state == power_state.RUNNING: + logging.debug(_('Instance %s: booted'), instance['name']) + timer.stop() + except Exception, exc: + logging.warn(exc) + logging.exception(_('instance %s: failed to boot'), + instance['name']) + db.instance_set_state(context.get_admin_context(), + instance['id'], + power_state.SHUTDOWN) + timer.stop() + + timer.f = _wait_for_boot + return timer.start(interval=0.5, now=True) + + def reboot(self, instance): + """Reboot VM instance""" + instance_name = instance.name + vm = VMHelper.lookup(self._session, instance_name) + if vm is None: + raise exception.NotFound(_('instance not' + ' found %s') % instance_name) + task = self._session.call_xenapi('Async.VM.clean_reboot', vm) + self._session.wait_for_task(instance.id, task) + + def destroy(self, instance): + """Destroy VM instance""" + vm = VMHelper.lookup(self._session, instance.name) + if vm is None: + # Don't complain, just return. This lets us clean up instances + # that have already disappeared from the underlying platform. + return + # Get the VDIs related to the VM + vdis = VMHelper.lookup_vm_vdis(self._session, vm) + try: + task = self._session.call_xenapi('Async.VM.hard_shutdown', + vm) + self._session.wait_for_task(instance.id, task) + except XenAPI.Failure, exc: + logging.warn(exc) + # Disk clean-up + if vdis: + for vdi in vdis: + try: + task = self._session.call_xenapi('Async.VDI.destroy', vdi) + self._session.wait_for_task(instance.id, task) + except XenAPI.Failure, exc: + logging.warn(exc) + # VM Destroy + try: + task = self._session.call_xenapi('Async.VM.destroy', vm) + self._session.wait_for_task(instance.id, task) + except XenAPI.Failure, exc: + logging.warn(exc) + + def _wait_with_callback(self, instance_id, task, callback): + ret = None + try: + ret = self._session.wait_for_task(instance_id, task) + except XenAPI.Failure, exc: + logging.warn(exc) + callback(ret) + + def pause(self, instance, callback): + """Pause VM instance""" + instance_name = instance.name + vm = VMHelper.lookup(self._session, instance_name) + if vm is None: + raise exception.NotFound(_('Instance not' + ' found %s') % instance_name) + task = self._session.call_xenapi('Async.VM.pause', vm) + self._wait_with_callback(instance.id, task, callback) + + def unpause(self, instance, callback): + """Unpause VM instance""" + instance_name = instance.name + vm = VMHelper.lookup(self._session, instance_name) + if vm is None: + raise exception.NotFound(_('Instance not' + ' found %s') % instance_name) + task = self._session.call_xenapi('Async.VM.unpause', vm) + self._wait_with_callback(instance.id, task, callback) + + def get_info(self, instance_id): + """Return data about VM instance""" + vm = VMHelper.lookup(self._session, instance_id) + if vm is None: + raise exception.NotFound(_('Instance not' + ' found %s') % instance_id) + rec = self._session.get_xenapi().VM.get_record(vm) + return VMHelper.compile_info(rec) + + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + vm = VMHelper.lookup(self._session, instance_id) + if vm is None: + raise exception.NotFound(_("Instance not found %s") % instance_id) + rec = self._session.get_xenapi().VM.get_record(vm) + return VMHelper.compile_diagnostics(self._session, rec) + + def get_console_output(self, instance): + """Return snapshot of console""" + # TODO: implement this to fix pylint! + return 'FAKE CONSOLE OUTPUT of instance' diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py new file mode 100644 index 000000000..a0c0a67d4 --- /dev/null +++ b/nova/virt/xenapi/volume_utils.py @@ -0,0 +1,268 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of volumes, +and storage repositories +""" + +import re +import string +import logging + +from nova import db +from nova import context +from nova import exception +from nova import flags +from nova import utils +from nova.virt.xenapi import HelperBase + +FLAGS = flags.FLAGS + + +class StorageError(Exception): + """To raise errors related to SR, VDI, PBD, and VBD commands""" + + def __init__(self, message=None): + super(StorageError, self).__init__(message) + + +class VolumeHelper(HelperBase): + """ + The class that wraps the helper methods together. + """ + + @classmethod + def create_iscsi_storage(cls, session, info, label, description): + """ + Create an iSCSI storage repository that will be used to mount + the volume for the specified instance + """ + sr_ref = session.get_xenapi().SR.get_by_name_label(label) + if len(sr_ref) == 0: + logging.debug('Introducing %s...', label) + record = {} + if 'chapuser' in info and 'chappassword' in info: + record = {'target': info['targetHost'], + 'port': info['targetPort'], + 'targetIQN': info['targetIQN'], + 'chapuser': info['chapuser'], + 'chappassword': info['chappassword'] + } + else: + record = {'target': info['targetHost'], + 'port': info['targetPort'], + 'targetIQN': info['targetIQN'] + } + try: + sr_ref = session.get_xenapi().SR.create( + session.get_xenapi_host(), + record, + '0', label, description, 'iscsi', '', False, {}) + logging.debug('Introduced %s as %s.', label, sr_ref) + return sr_ref + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to create Storage Repository')) + else: + return sr_ref[0] + + @classmethod + def find_sr_from_vbd(cls, session, vbd_ref): + """Find the SR reference from the VBD reference""" + try: + vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref) + sr_ref = session.get_xenapi().VDI.get_SR(vdi_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref) + return sr_ref + + @classmethod + def destroy_iscsi_storage(cls, session, sr_ref): + """Forget the SR whilst preserving the state of the disk""" + logging.debug("Forgetting SR %s ... ", sr_ref) + pbds = [] + try: + pbds = session.get_xenapi().SR.get_PBDs(sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when getting PBDs for %s', + exc, sr_ref) + for pbd in pbds: + try: + session.get_xenapi().PBD.unplug(pbd) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when unplugging PBD %s', + exc, pbd) + try: + session.get_xenapi().SR.forget(sr_ref) + logging.debug("Forgetting SR %s done.", sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when forgetting SR %s', + exc, sr_ref) + + @classmethod + def introduce_vdi(cls, session, sr_ref): + """Introduce VDI in the host""" + try: + vdis = session.get_xenapi().SR.get_VDIs(sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref) + try: + vdi_rec = session.get_xenapi().VDI.get_record(vdis[0]) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to get record' + ' of VDI %s on') % vdis[0]) + else: + try: + return session.get_xenapi().VDI.introduce( + vdi_rec['uuid'], + vdi_rec['name_label'], + vdi_rec['name_description'], + vdi_rec['SR'], + vdi_rec['type'], + vdi_rec['sharable'], + vdi_rec['read_only'], + vdi_rec['other_config'], + vdi_rec['location'], + vdi_rec['xenstore_data'], + vdi_rec['sm_config']) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to introduce VDI for SR %s') + % sr_ref) + + @classmethod + def parse_volume_info(cls, device_path, mountpoint): + """ + Parse device_path and mountpoint as they can be used by XenAPI. + In particular, the mountpoint (e.g. /dev/sdc) must be translated + into a numeric literal. + FIXME(armando): + As for device_path, currently cannot be used as it is, + because it does not contain target information. As for interim + solution, target details are passed either via Flags or obtained + by iscsiadm. Long-term solution is to add a few more fields to the + db in the iscsi_target table with the necessary info and modify + the iscsi driver to set them. + """ + device_number = VolumeHelper.mountpoint_to_number(mountpoint) + volume_id = _get_volume_id(device_path) + (iscsi_name, iscsi_portal) = _get_target(volume_id) + target_host = _get_target_host(iscsi_portal) + target_port = _get_target_port(iscsi_portal) + target_iqn = _get_iqn(iscsi_name, volume_id) + logging.debug('(vol_id,number,host,port,iqn): (%s,%s,%s,%s)', + volume_id, + target_host, + target_port, + target_iqn) + if (device_number < 0) or \ + (volume_id is None) or \ + (target_host is None) or \ + (target_iqn is None): + raise StorageError(_('Unable to obtain target information %s, %s') + % (device_path, mountpoint)) + volume_info = {} + volume_info['deviceNumber'] = device_number + volume_info['volumeId'] = volume_id + volume_info['targetHost'] = target_host + volume_info['targetPort'] = target_port + volume_info['targetIQN'] = target_iqn + return volume_info + + @classmethod + def mountpoint_to_number(cls, mountpoint): + """Translate a mountpoint like /dev/sdc into a numeric""" + if mountpoint.startswith('/dev/'): + mountpoint = mountpoint[5:] + if re.match('^[hs]d[a-p]$', mountpoint): + return (ord(mountpoint[2:3]) - ord('a')) + elif re.match('^vd[a-p]$', mountpoint): + return (ord(mountpoint[2:3]) - ord('a')) + elif re.match('^[0-9]+$', mountpoint): + return string.atoi(mountpoint, 10) + else: + logging.warn('Mountpoint cannot be translated: %s', mountpoint) + return -1 + + +def _get_volume_id(path): + """Retrieve the volume id from device_path""" + # n must contain at least the volume_id + # /vol- is for remote volumes + # -vol- is for local volumes + # see compute/manager->setup_compute_volume + volume_id = path[path.find('/vol-') + 1:] + if volume_id == path: + volume_id = path[path.find('-vol-') + 1:].replace('--', '-') + return volume_id + + +def _get_target_host(iscsi_string): + """Retrieve target host""" + if iscsi_string: + return iscsi_string[0:iscsi_string.find(':')] + elif iscsi_string is None or FLAGS.target_host: + return FLAGS.target_host + + +def _get_target_port(iscsi_string): + """Retrieve target port""" + if iscsi_string: + return iscsi_string[iscsi_string.find(':') + 1:] + elif iscsi_string is None or FLAGS.target_port: + return FLAGS.target_port + + +def _get_iqn(iscsi_string, id): + """Retrieve target IQN""" + if iscsi_string: + return iscsi_string + elif iscsi_string is None or FLAGS.iqn_prefix: + volume_id = _get_volume_id(id) + return '%s:%s' % (FLAGS.iqn_prefix, volume_id) + + +def _get_target(volume_id): + """ + Gets iscsi name and portal from volume name and host. + For this method to work the following are needed: + 1) volume_ref['host'] must resolve to something rather than loopback + 2) ietd must bind only to the address as resolved above + If any of the two conditions are not met, fall back on Flags. + """ + volume_ref = db.volume_get_by_ec2_id(context.get_admin_context(), + volume_id) + result = (None, None) + try: + (r, _e) = utils.execute("sudo iscsiadm -m discovery -t " + "sendtargets -p %s" % + volume_ref['host']) + except exception.ProcessExecutionError, exc: + logging.warn(exc) + else: + targets = r.splitlines() + if len(_e) == 0 and len(targets) == 1: + for target in targets: + if volume_id in target: + (location, _sep, iscsi_name) = target.partition(" ") + break + iscsi_portal = location.split(",")[0] + result = (iscsi_name, iscsi_portal) + return result diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py new file mode 100644 index 000000000..fdeb2506c --- /dev/null +++ b/nova/virt/xenapi/volumeops.py @@ -0,0 +1,126 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Management class for Storage-related functions (attach, detach, etc). +""" +import logging + +from nova import exception +from nova.virt.xenapi.vm_utils import VMHelper +from nova.virt.xenapi.volume_utils import VolumeHelper +from nova.virt.xenapi.volume_utils import StorageError + + +class VolumeOps(object): + """ + Management class for Volume-related tasks + """ + + def __init__(self, session): + self.XenAPI = session.get_imported_xenapi() + self._session = session + # Load XenAPI module in the helper classes respectively + VolumeHelper.XenAPI = self.XenAPI + VMHelper.XenAPI = self.XenAPI + + def attach_volume(self, instance_name, device_path, mountpoint): + """Attach volume storage to VM instance""" + # Before we start, check that the VM exists + vm_ref = VMHelper.lookup(self._session, instance_name) + if vm_ref is None: + raise exception.NotFound(_('Instance %s not found') + % instance_name) + # NOTE: No Resource Pool concept so far + logging.debug(_("Attach_volume: %s, %s, %s"), + instance_name, device_path, mountpoint) + # Create the iSCSI SR, and the PDB through which hosts access SRs. + # But first, retrieve target info, like Host, IQN, LUN and SCSIID + vol_rec = VolumeHelper.parse_volume_info(device_path, mountpoint) + label = 'SR-%s' % vol_rec['volumeId'] + description = 'Disk-for:%s' % instance_name + # Create SR + sr_ref = VolumeHelper.create_iscsi_storage(self._session, + vol_rec, + label, + description) + # Introduce VDI and attach VBD to VM + try: + vdi_ref = VolumeHelper.introduce_vdi(self._session, sr_ref) + except StorageError, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + raise Exception(_('Unable to create VDI on SR %s for instance %s') + % (sr_ref, + instance_name)) + else: + try: + vbd_ref = VMHelper.create_vbd(self._session, + vm_ref, vdi_ref, + vol_rec['deviceNumber'], + False) + except self.XenAPI.Failure, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + raise Exception(_('Unable to use SR %s for instance %s') + % (sr_ref, + instance_name)) + else: + try: + task = self._session.call_xenapi('Async.VBD.plug', + vbd_ref) + self._session.wait_for_task(vol_rec['deviceNumber'], task) + except self.XenAPI.Failure, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, + sr_ref) + raise Exception(_('Unable to attach volume to instance %s') + % instance_name) + logging.info(_('Mountpoint %s attached to instance %s'), + mountpoint, instance_name) + + def detach_volume(self, instance_name, mountpoint): + """Detach volume storage to VM instance""" + # Before we start, check that the VM exists + vm_ref = VMHelper.lookup(self._session, instance_name) + if vm_ref is None: + raise exception.NotFound(_('Instance %s not found') + % instance_name) + # Detach VBD from VM + logging.debug(_("Detach_volume: %s, %s"), instance_name, mountpoint) + device_number = VolumeHelper.mountpoint_to_number(mountpoint) + try: + vbd_ref = VMHelper.find_vbd_by_number(self._session, + vm_ref, device_number) + except StorageError, exc: + logging.warn(exc) + raise Exception(_('Unable to locate volume %s') % mountpoint) + else: + try: + sr_ref = VolumeHelper.find_sr_from_vbd(self._session, + vbd_ref) + VMHelper.unplug_vbd(self._session, vbd_ref) + except StorageError, exc: + logging.warn(exc) + raise Exception(_('Unable to detach volume %s') % mountpoint) + try: + VMHelper.destroy_vbd(self._session, vbd_ref) + except StorageError, exc: + logging.warn(exc) + # Forget SR + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + logging.info(_('Mountpoint %s detached from instance %s'), + mountpoint, instance_name) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py new file mode 100644 index 000000000..11c66c974 --- /dev/null +++ b/nova/virt/xenapi_conn.py @@ -0,0 +1,285 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A connection to XenServer or Xen Cloud Platform. + +The concurrency model for this class is as follows: + +All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator +deferredToThread). They are remote calls, and so may hang for the usual +reasons. They should not be allowed to block the reactor thread. + +All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. Polling is handled using reactor.callLater. + +This combination of techniques means that we don't block the reactor thread at +all, and at the same time we don't hold lots of threads waiting for +long-running operations. + +FIXME: get_info currently doesn't conform to these rules, and will block the +reactor thread if the VM.get_by_name_label or VM.get_record calls block. + +**Related Flags** + +:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. +:xenapi_connection_username: Username for connection to XenServer/Xen Cloud + Platform (default: root). +:xenapi_connection_password: Password for connection to XenServer/Xen Cloud + Platform. +:xenapi_task_poll_interval: The interval (seconds) used for polling of + remote tasks (Async.VM.start, etc) + (default: 0.5). +:target_host: the iSCSI Target Host IP address, i.e. the IP + address for the nova-volume host +:target_port: iSCSI Target Port, 3260 Default +:iqn_prefix: IQN Prefix, e.g. 'iqn.2010-10.org.openstack' +""" + +import logging +import sys +import xmlrpclib + +from eventlet import event +from eventlet import tpool + +from nova import context +from nova import db +from nova import utils +from nova import flags +from nova.virt.xenapi.vmops import VMOps +from nova.virt.xenapi.volumeops import VolumeOps + +FLAGS = flags.FLAGS + +flags.DEFINE_string('xenapi_connection_url', + None, + 'URL for connection to XenServer/Xen Cloud Platform.' + ' Required if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_username', + 'root', + 'Username for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_string('xenapi_connection_password', + None, + 'Password for connection to XenServer/Xen Cloud Platform.' + ' Used only if connection_type=xenapi.') +flags.DEFINE_float('xenapi_task_poll_interval', + 0.5, + 'The interval used for polling of remote tasks ' + '(Async.VM.start, etc). Used only if ' + 'connection_type=xenapi.') +flags.DEFINE_string('target_host', + None, + 'iSCSI Target Host') +flags.DEFINE_string('target_port', + '3260', + 'iSCSI Target Port, 3260 Default') +flags.DEFINE_string('iqn_prefix', + 'iqn.2010-10.org.openstack', + 'IQN Prefix') + + +def get_connection(_): + """Note that XenAPI doesn't have a read-only connection mode, so + the read_only parameter is ignored.""" + url = FLAGS.xenapi_connection_url + username = FLAGS.xenapi_connection_username + password = FLAGS.xenapi_connection_password + if not url or password is None: + raise Exception(_('Must specify xenapi_connection_url, ' + 'xenapi_connection_username (optionally), and ' + 'xenapi_connection_password to use ' + 'connection_type=xenapi')) + return XenAPIConnection(url, username, password) + + +class XenAPIConnection(object): + """A connection to XenServer or Xen Cloud Platform""" + + def __init__(self, url, user, pw): + session = XenAPISession(url, user, pw) + self._vmops = VMOps(session) + self._volumeops = VolumeOps(session) + + def init_host(self): + #FIXME(armando): implement this + #NOTE(armando): would we need a method + #to call when shutting down the host? + #e.g. to do session logout? + pass + + def list_instances(self): + """List VM instances""" + return self._vmops.list_instances() + + def spawn(self, instance): + """Create VM instance""" + self._vmops.spawn(instance) + + def reboot(self, instance): + """Reboot VM instance""" + self._vmops.reboot(instance) + + def destroy(self, instance): + """Destroy VM instance""" + self._vmops.destroy(instance) + + def pause(self, instance, callback): + """Pause VM instance""" + self._vmops.pause(instance, callback) + + def unpause(self, instance, callback): + """Unpause paused VM instance""" + self._vmops.unpause(instance, callback) + + def get_info(self, instance_id): + """Return data about VM instance""" + return self._vmops.get_info(instance_id) + + def get_diagnostics(self, instance_id): + """Return data about VM diagnostics""" + return self._vmops.get_diagnostics(instance_id) + + def get_console_output(self, instance): + """Return snapshot of console""" + return self._vmops.get_console_output(instance) + + def attach_volume(self, instance_name, device_path, mountpoint): + """Attach volume storage to VM instance""" + return self._volumeops.attach_volume(instance_name, + device_path, + mountpoint) + + def detach_volume(self, instance_name, mountpoint): + """Detach volume storage to VM instance""" + return self._volumeops.detach_volume(instance_name, mountpoint) + + +class XenAPISession(object): + """The session to invoke XenAPI SDK calls""" + + def __init__(self, url, user, pw): + self.XenAPI = self.get_imported_xenapi() + self._session = self._create_session(url) + self._session.login_with_password(user, pw) + + def get_imported_xenapi(self): + """Stubout point. This can be replaced with a mock xenapi module.""" + return __import__('XenAPI') + + def get_xenapi(self): + """Return the xenapi object""" + return self._session.xenapi + + def get_xenapi_host(self): + """Return the xenapi host""" + return self._session.xenapi.session.get_this_host(self._session.handle) + + def call_xenapi(self, method, *args): + """Call the specified XenAPI method on a background thread.""" + f = self._session.xenapi + for m in method.split('.'): + f = f.__getattr__(m) + return tpool.execute(f, *args) + + def async_call_plugin(self, plugin, fn, args): + """Call Async.host.call_plugin on a background thread.""" + return tpool.execute(self._unwrap_plugin_exceptions, + self._session.xenapi.Async.host.call_plugin, + self.get_xenapi_host(), plugin, fn, args) + + def wait_for_task(self, id, task): + """Return the result of the given task. The task is polled + until it completes.""" + + done = event.Event() + loop = utils.LoopingCall(self._poll_task, id, task, done) + loop.start(FLAGS.xenapi_task_poll_interval, now=True) + rv = done.wait() + loop.stop() + return rv + + def _create_session(self, url): + """Stubout point. This can be replaced with a mock session.""" + return self.XenAPI.Session(url) + + def _poll_task(self, id, task, done): + """Poll the given XenAPI task, and fire the given Deferred if we + get a result.""" + try: + name = self._session.xenapi.task.get_name_label(task) + status = self._session.xenapi.task.get_status(task) + action = dict( + id=int(id), + action=name, + error=None) + if status == "pending": + return + elif status == "success": + result = self._session.xenapi.task.get_result(task) + logging.info(_("Task [%s] %s status: success %s") % ( + name, + task, + result)) + done.send(_parse_xmlrpc_value(result)) + else: + error_info = self._session.xenapi.task.get_error_info(task) + action["error"] = str(error_info) + logging.warn(_("Task [%s] %s status: %s %s") % ( + name, + task, + status, + error_info)) + done.send_exception(self.XenAPI.Failure(error_info)) + db.instance_action_create(context.get_admin_context(), action) + except self.XenAPI.Failure, exc: + logging.warn(exc) + done.send_exception(*sys.exc_info()) + + def _unwrap_plugin_exceptions(self, func, *args, **kwargs): + """Parse exception details""" + try: + return func(*args, **kwargs) + except self.XenAPI.Failure, exc: + logging.debug(_("Got exception: %s"), exc) + if (len(exc.details) == 4 and + exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and + exc.details[2] == 'Failure'): + params = None + try: + params = eval(exc.details[3]) + except: + raise exc + raise self.XenAPI.Failure(params) + else: + raise + except xmlrpclib.ProtocolError, exc: + logging.debug(_("Got exception: %s"), exc) + raise + + +def _parse_xmlrpc_value(val): + """Parse the given value as if it were an XML-RPC value. This is + sometimes used as the format for the task.result field.""" + if not val: + return val + x = xmlrpclib.loads( + '<?xml version="1.0"?><methodResponse><params><param>' + + val + + '</param></params></methodResponse>') + return x[0][0] |
