summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
authorSandy Walsh <sandy.walsh@rackspace.com>2011-01-05 22:23:23 -0400
committerSandy Walsh <sandy.walsh@rackspace.com>2011-01-05 22:23:23 -0400
commitd32633399622141e47cba44e25549e3d4e04077f (patch)
tree200e0bc0b532be88fdef0027b4cf845e79a95698 /nova/virt
parent401c3374c708d9f80d12eeea39360a26483c30da (diff)
parent275d06792fd5de40b82ef461e3d565c3d0ed3700 (diff)
Fixed trunk merge conflicts as spotted by dubs.
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/connection.py3
-rw-r--r--nova/virt/hyperv.py459
-rw-r--r--nova/virt/images.py45
-rw-r--r--nova/virt/xenapi/vmops.py212
-rw-r--r--nova/virt/xenapi/volume_utils.py12
-rw-r--r--nova/virt/xenapi_conn.py28
6 files changed, 717 insertions, 42 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 61e99944e..846423afe 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -26,6 +26,7 @@ from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi_conn
+from nova.virt import hyperv
FLAGS = flags.FLAGS
@@ -62,6 +63,8 @@ def get_connection(read_only=False):
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
conn = xenapi_conn.get_connection(read_only)
+ elif t == 'hyperv':
+ conn = hyperv.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py
new file mode 100644
index 000000000..4b9f6f946
--- /dev/null
+++ b/nova/virt/hyperv.py
@@ -0,0 +1,459 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Cloud.com, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to Hyper-V .
+Uses Windows Management Instrumentation (WMI) calls to interact with Hyper-V
+Hyper-V WMI usage:
+ http://msdn.microsoft.com/en-us/library/cc723875%28v=VS.85%29.aspx
+The Hyper-V object model briefly:
+ The physical computer and its hosted virtual machines are each represented
+ by the Msvm_ComputerSystem class.
+
+ Each virtual machine is associated with a
+ Msvm_VirtualSystemGlobalSettingData (vs_gs_data) instance and one or more
+ Msvm_VirtualSystemSettingData (vmsetting) instances. For each vmsetting
+ there is a series of Msvm_ResourceAllocationSettingData (rasd) objects.
+ The rasd objects describe the settings for each device in a VM.
+ Together, the vs_gs_data, vmsettings and rasds describe the configuration
+ of the virtual machine.
+
+ Creating new resources such as disks and nics involves cloning a default
+ rasd object and appropriately modifying the clone and calling the
+ AddVirtualSystemResources WMI method
+ Changing resources such as memory uses the ModifyVirtualSystemResources
+ WMI method
+
+Using the Python WMI library:
+ Tutorial:
+ http://timgolden.me.uk/python/wmi/tutorial.html
+ Hyper-V WMI objects can be retrieved simply by using the class name
+ of the WMI object and optionally specifying a column to filter the
+ result set. More complex filters can be formed using WQL (sql-like)
+ queries.
+ The parameters and return tuples of WMI method calls can gleaned by
+ examining the doc string. For example:
+ >>> vs_man_svc.ModifyVirtualSystemResources.__doc__
+ ModifyVirtualSystemResources (ComputerSystem, ResourceSettingData[])
+ => (Job, ReturnValue)'
+ When passing setting data (ResourceSettingData) to the WMI method,
+ an XML representation of the data is passed in using GetText_(1).
+ Available methods on a service can be determined using method.keys():
+ >>> vs_man_svc.methods.keys()
+ vmsettings and rasds for a vm can be retrieved using the 'associators'
+ method with the appropriate return class.
+ Long running WMI commands generally return a Job (an instance of
+ Msvm_ConcreteJob) whose state can be polled to determine when it finishes
+
+"""
+
+import os
+import logging
+import time
+
+from nova import exception
+from nova import flags
+from nova.auth import manager
+from nova.compute import power_state
+from nova.virt import images
+
+wmi = None
+
+
+FLAGS = flags.FLAGS
+
+
+HYPERV_POWER_STATE = {
+ 3: power_state.SHUTDOWN,
+ 2: power_state.RUNNING,
+ 32768: power_state.PAUSED,
+}
+
+
+REQ_POWER_STATE = {
+ 'Enabled': 2,
+ 'Disabled': 3,
+ 'Reboot': 10,
+ 'Reset': 11,
+ 'Paused': 32768,
+ 'Suspended': 32769
+}
+
+
+WMI_JOB_STATUS_STARTED = 4096
+WMI_JOB_STATE_RUNNING = 4
+WMI_JOB_STATE_COMPLETED = 7
+
+
+def get_connection(_):
+ global wmi
+ if wmi is None:
+ wmi = __import__('wmi')
+ return HyperVConnection()
+
+
+class HyperVConnection(object):
+ def __init__(self):
+ self._conn = wmi.WMI(moniker='//./root/virtualization')
+ self._cim_conn = wmi.WMI(moniker='//./root/cimv2')
+
+ def init_host(self):
+ #FIXME(chiradeep): implement this
+ logging.debug(_('In init host'))
+ pass
+
+ def list_instances(self):
+ """ Return the names of all the instances known to Hyper-V. """
+ vms = [v.ElementName \
+ for v in self._conn.Msvm_ComputerSystem(['ElementName'])]
+ return vms
+
+ def spawn(self, instance):
+ """ Create a new VM and start it."""
+ vm = self._lookup(instance.name)
+ if vm is not None:
+ raise exception.Duplicate(_('Attempt to create duplicate vm %s') %
+ instance.name)
+
+ user = manager.AuthManager().get_user(instance['user_id'])
+ project = manager.AuthManager().get_project(instance['project_id'])
+ #Fetch the file, assume it is a VHD file.
+ base_vhd_filename = os.path.join(FLAGS.instances_path,
+ instance.name)
+ vhdfile = "%s.vhd" % (base_vhd_filename)
+ images.fetch(instance['image_id'], vhdfile, user, project)
+
+ try:
+ self._create_vm(instance)
+
+ self._create_disk(instance['name'], vhdfile)
+ self._create_nic(instance['name'], instance['mac_address'])
+
+ logging.debug(_('Starting VM %s '), instance.name)
+ self._set_vm_state(instance['name'], 'Enabled')
+ logging.info(_('Started VM %s '), instance.name)
+ except Exception as exn:
+ logging.error(_('spawn vm failed: %s'), exn)
+ self.destroy(instance)
+
+ def _create_vm(self, instance):
+ """Create a VM but don't start it. """
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+
+ vs_gs_data = self._conn.Msvm_VirtualSystemGlobalSettingData.new()
+ vs_gs_data.ElementName = instance['name']
+ (job, ret_val) = vs_man_svc.DefineVirtualSystem(
+ [], None, vs_gs_data.GetText_(1))[1:]
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+
+ if not success:
+ raise Exception(_('Failed to create VM %s'), instance.name)
+
+ logging.debug(_('Created VM %s...'), instance.name)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ vmsetting = [s for s in vmsettings
+ if s.SettingType == 3][0] # avoid snapshots
+ memsetting = vmsetting.associators(
+ wmi_result_class='Msvm_MemorySettingData')[0]
+ #No Dynamic Memory, so reservation, limit and quantity are identical.
+ mem = long(str(instance['memory_mb']))
+ memsetting.VirtualQuantity = mem
+ memsetting.Reservation = mem
+ memsetting.Limit = mem
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [memsetting.GetText_(1)])
+ logging.debug(_('Set memory for vm %s...'), instance.name)
+ procsetting = vmsetting.associators(
+ wmi_result_class='Msvm_ProcessorSettingData')[0]
+ vcpus = long(instance['vcpus'])
+ procsetting.VirtualQuantity = vcpus
+ procsetting.Reservation = vcpus
+ procsetting.Limit = vcpus
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [procsetting.GetText_(1)])
+ logging.debug(_('Set vcpus for vm %s...'), instance.name)
+
+ def _create_disk(self, vm_name, vhdfile):
+ """Create a disk and attach it to the vm"""
+ logging.debug(_('Creating disk for %s by attaching disk file %s'),
+ vm_name, vhdfile)
+ #Find the IDE controller for the vm.
+ vms = self._conn.MSVM_ComputerSystem(ElementName=vm_name)
+ vm = vms[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ ctrller = [r for r in rasds
+ if r.ResourceSubType == 'Microsoft Emulated IDE Controller'\
+ and r.Address == "0"]
+ #Find the default disk drive object for the vm and clone it.
+ diskdflt = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Synthetic Disk Drive'\
+ AND InstanceID LIKE '%Default%'")[0]
+ diskdrive = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', diskdflt)
+ #Set the IDE ctrller as parent.
+ diskdrive.Parent = ctrller[0].path_()
+ diskdrive.Address = 0
+ #Add the cloned disk drive object to the vm.
+ new_resources = self._add_virt_resource(diskdrive, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add diskdrive to VM %s'),
+ vm_name)
+ diskdrive_path = new_resources[0]
+ logging.debug(_('New disk drive path is %s'), diskdrive_path)
+ #Find the default VHD disk object.
+ vhddefault = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Virtual Hard Disk' AND \
+ InstanceID LIKE '%Default%' ")[0]
+
+ #Clone the default and point it to the image file.
+ vhddisk = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', vhddefault)
+ #Set the new drive as the parent.
+ vhddisk.Parent = diskdrive_path
+ vhddisk.Connection = [vhdfile]
+
+ #Add the new vhd object as a virtual hard disk to the vm.
+ new_resources = self._add_virt_resource(vhddisk, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add vhd file to VM %s'),
+ vm_name)
+ logging.info(_('Created disk for %s'), vm_name)
+
+ def _create_nic(self, vm_name, mac):
+ """Create a (emulated) nic and attach it to the vm"""
+ logging.debug(_('Creating nic for %s '), vm_name)
+ #Find the vswitch that is connected to the physical nic.
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ extswitch = self._find_external_network()
+ vm = vms[0]
+ switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
+ #Find the default nic and clone it to create a new nic for the vm.
+ #Use Msvm_SyntheticEthernetPortSettingData for Windows or Linux with
+ #Linux Integration Components installed.
+ emulatednics_data = self._conn.Msvm_EmulatedEthernetPortSettingData()
+ default_nic_data = [n for n in emulatednics_data
+ if n.InstanceID.rfind('Default') > 0]
+ new_nic_data = self._clone_wmi_obj(
+ 'Msvm_EmulatedEthernetPortSettingData',
+ default_nic_data[0])
+ #Create a port on the vswitch.
+ (new_port, ret_val) = switch_svc.CreateSwitchPort(vm_name, vm_name,
+ "", extswitch.path_())
+ if ret_val != 0:
+ logging.error(_('Failed creating a port on the external vswitch'))
+ raise Exception(_('Failed creating port for %s'),
+ vm_name)
+ logging.debug(_("Created switch port %s on switch %s"),
+ vm_name, extswitch.path_())
+ #Connect the new nic to the new port.
+ new_nic_data.Connection = [new_port]
+ new_nic_data.ElementName = vm_name + ' nic'
+ new_nic_data.Address = ''.join(mac.split(':'))
+ new_nic_data.StaticMacAddress = 'TRUE'
+ #Add the new nic to the vm.
+ new_resources = self._add_virt_resource(new_nic_data, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add nic to VM %s'),
+ vm_name)
+ logging.info(_("Created nic for %s "), vm_name)
+
+ def _add_virt_resource(self, res_setting_data, target_vm):
+ """Add a new resource (disk/nic) to the VM"""
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ (job, new_resources, ret_val) = vs_man_svc.\
+ AddVirtualSystemResources([res_setting_data.GetText_(1)],
+ target_vm.path_())
+ success = True
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+ if success:
+ return new_resources
+ else:
+ return None
+
+ #TODO: use the reactor to poll instead of sleep
+ def _check_job_status(self, jobpath):
+ """Poll WMI job state for completion"""
+ #Jobs have a path of the form:
+ #\\WIN-P5IG7367DAG\root\virtualization:Msvm_ConcreteJob.InstanceID=
+ #"8A496B9C-AF4D-4E98-BD3C-1128CD85320D"
+ inst_id = jobpath.split('=')[1].strip('"')
+ jobs = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)
+ if len(jobs) == 0:
+ return False
+ job = jobs[0]
+ while job.JobState == WMI_JOB_STATE_RUNNING:
+ time.sleep(0.1)
+ job = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)[0]
+ if job.JobState != WMI_JOB_STATE_COMPLETED:
+ logging.debug(_("WMI job failed: %s"), job.ErrorSummaryDescription)
+ return False
+ logging.debug(_("WMI job succeeded: %s, Elapsed=%s "), job.Description,
+ job.ElapsedTime)
+ return True
+
+ def _find_external_network(self):
+ """Find the vswitch that is connected to the physical nic.
+ Assumes only one physical nic on the host
+ """
+ #If there are no physical nics connected to networks, return.
+ bound = self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')
+ if len(bound) == 0:
+ return None
+ return self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')[0]\
+ .associators(wmi_result_class='Msvm_SwitchLANEndpoint')[0]\
+ .associators(wmi_result_class='Msvm_SwitchPort')[0]\
+ .associators(wmi_result_class='Msvm_VirtualSwitch')[0]
+
+ def _clone_wmi_obj(self, wmi_class, wmi_obj):
+ """Clone a WMI object"""
+ cl = self._conn.__getattr__(wmi_class) # get the class
+ newinst = cl.new()
+ #Copy the properties from the original.
+ for prop in wmi_obj._properties:
+ newinst.Properties_.Item(prop).Value =\
+ wmi_obj.Properties_.Item(prop).Value
+ return newinst
+
+ def reboot(self, instance):
+ """Reboot the specified instance."""
+ vm = self._lookup(instance.name)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance.name)
+ self._set_vm_state(instance.name, 'Reboot')
+
+ def destroy(self, instance):
+ """Destroy the VM. Also destroy the associated VHD disk files"""
+ logging.debug(_("Got request to destroy vm %s"), instance.name)
+ vm = self._lookup(instance.name)
+ if vm is None:
+ return
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ #Stop the VM first.
+ self._set_vm_state(instance.name, 'Disabled')
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ disks = [r for r in rasds \
+ if r.ResourceSubType == 'Microsoft Virtual Hard Disk']
+ diskfiles = []
+ #Collect disk file information before destroying the VM.
+ for disk in disks:
+ diskfiles.extend([c for c in disk.Connection])
+ #Nuke the VM. Does not destroy disks.
+ (job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_())
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ if not success:
+ raise Exception(_('Failed to destroy vm %s') % instance.name)
+ #Delete associated vhd disk files.
+ for disk in diskfiles:
+ vhdfile = self._cim_conn.CIM_DataFile(Name=disk)
+ for vf in vhdfile:
+ vf.Delete()
+ logging.debug(_("Del: disk %s vm %s"), vhdfile, instance.name)
+
+ def get_info(self, instance_id):
+ """Get information about the VM"""
+ vm = self._lookup(instance_id)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance_id)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance_id)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ settings_paths = [v.path_() for v in vmsettings]
+ #See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx
+ summary_info = vs_man_svc.GetSummaryInformation(
+ [4, 100, 103, 105], settings_paths)[1]
+ info = summary_info[0]
+ logging.debug(_("Got Info for vm %s: state=%s, mem=%s, num_cpu=%s, \
+ cpu_time=%s"), instance_id,
+ str(HYPERV_POWER_STATE[info.EnabledState]),
+ str(info.MemoryUsage),
+ str(info.NumberOfProcessors),
+ str(info.UpTime))
+
+ return {'state': HYPERV_POWER_STATE[info.EnabledState],
+ 'max_mem': info.MemoryUsage,
+ 'mem': info.MemoryUsage,
+ 'num_cpu': info.NumberOfProcessors,
+ 'cpu_time': info.UpTime}
+
+ def _lookup(self, i):
+ vms = self._conn.Msvm_ComputerSystem(ElementName=i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception(_('duplicate name found: %s') % i)
+ else:
+ return vms[0].ElementName
+
+ def _set_vm_state(self, vm_name, req_state):
+ """Set the desired state of the VM"""
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ if len(vms) == 0:
+ return False
+ (job, ret_val) = vms[0].RequestStateChange(REQ_POWER_STATE[req_state])
+ success = False
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ elif ret_val == 32775:
+ #Invalid state for current operation. Typically means it is
+ #already in the state requested
+ success = True
+ if success:
+ logging.info(_("Successfully changed vm state of %s to %s"),
+ vm_name, req_state)
+ else:
+ logging.error(_("Failed to change vm state of %s to %s"),
+ vm_name, req_state)
+ raise Exception(_("Failed to change vm state of %s to %s"),
+ vm_name, req_state)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot attach volume to missing %s vm' %
+ instance_name)
+
+ def detach_volume(self, instance_name, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot detach volume from missing %s ' %
+ instance_name)
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 1c9b2e093..2d03da4b4 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -21,8 +21,12 @@
Handling of VM disk images.
"""
+import logging
import os.path
+import shutil
+import sys
import time
+import urllib2
import urlparse
from nova import flags
@@ -45,6 +49,25 @@ def fetch(image, path, user, project):
return f(image, path, user, project)
+def _fetch_image_no_curl(url, path, headers):
+ request = urllib2.Request(url)
+ for (k, v) in headers.iteritems():
+ request.add_header(k, v)
+
+ def urlretrieve(urlfile, fpath):
+ chunk = 1 * 1024 * 1024
+ f = open(fpath, "wb")
+ while 1:
+ data = urlfile.read(chunk)
+ if not data:
+ break
+ f.write(data)
+
+ urlopened = urllib2.urlopen(request)
+ urlretrieve(urlopened, path)
+ logging.debug(_("Finished retreving %s -- placed in %s"), url, path)
+
+
def _fetch_s3_image(image, path, user, project):
url = image_url(image)
@@ -61,18 +84,24 @@ def _fetch_s3_image(image, path, user, project):
url_path)
headers['Authorization'] = 'AWS %s:%s' % (access, signature)
- cmd = ['/usr/bin/curl', '--fail', '--silent', url]
- for (k, v) in headers.iteritems():
- cmd += ['-H', '"%s: %s"' % (k, v)]
+ if sys.platform.startswith('win'):
+ return _fetch_image_no_curl(url, path, headers)
+ else:
+ cmd = ['/usr/bin/curl', '--fail', '--silent', url]
+ for (k, v) in headers.iteritems():
+ cmd += ['-H', '%s: %s' % (k, v)]
- cmd += ['-o', path]
- cmd_out = ' '.join(cmd)
- return utils.execute(cmd_out)
+ cmd += ['-o', path]
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
- source = _image_path('%s/image' % image)
- return utils.execute('cp %s %s' % (source, path))
+ source = _image_path(os.path.join(image, 'image'))
+ if sys.platform.startswith('win'):
+ return shutil.copy(source, path)
+ else:
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 67c95a068..b6d620782 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -18,6 +19,7 @@
Management class for VM-related functions (spawn, reboot, etc).
"""
+import json
import logging
from nova import db
@@ -36,7 +38,6 @@ class VMOps(object):
"""
Management class for VM-related tasks
"""
-
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
self._session = session
@@ -120,6 +121,20 @@ class VMOps(object):
timer.f = _wait_for_boot
return timer.start(interval=0.5, now=True)
+ def _get_vm_opaque_ref(self, instance_or_vm):
+ """Refactored out the common code of many methods that receive either
+ a vm name or a vm instance, and want a vm instance in return.
+ """
+ try:
+ instance_name = instance_or_vm.name
+ vm = VMHelper.lookup(self._session, instance_name)
+ except AttributeError:
+ # A vm opaque ref was passed
+ vm = instance_or_vm
+ if vm is None:
+ raise Exception(_('Instance not present %s') % instance_name)
+ return vm
+
def snapshot(self, instance, name):
""" Create snapshot from a running VM instance
@@ -168,11 +183,7 @@ class VMOps(object):
def reboot(self, instance):
"""Reboot VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
self._session.wait_for_task(instance.id, task)
@@ -221,21 +232,13 @@ class VMOps(object):
def pause(self, instance, callback):
"""Pause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.pause', vm)
self._wait_with_callback(instance.id, task, callback)
def unpause(self, instance, callback):
"""Unpause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.unpause', vm)
self._wait_with_callback(instance.id, task, callback)
@@ -270,10 +273,7 @@ class VMOps(object):
def get_diagnostics(self, instance):
"""Return data about VM diagnostics"""
- vm = VMHelper.lookup(self._session, instance.name)
- if vm is None:
- raise exception.NotFound(_("Instance not found %s") %
- instance.name)
+ vm = self._get_vm_opaque_ref(instance)
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_diagnostics(self._session, rec)
@@ -281,3 +281,175 @@ class VMOps(object):
"""Return snapshot of console"""
# TODO: implement this to fix pylint!
return 'FAKE CONSOLE OUTPUT of instance'
+
+ def list_from_xenstore(self, vm, path):
+ """Runs the xenstore-ls command to get a listing of all records
+ from 'path' downward. Returns a dict with the sub-paths as keys,
+ and the value stored in those paths as values. If nothing is
+ found at that path, returns None.
+ """
+ ret = self._make_xenstore_call('list_records', vm, path)
+ return json.loads(ret)
+
+ def read_from_xenstore(self, vm, path):
+ """Returns the value stored in the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the read process.
+ """
+ try:
+ ret = self._make_xenstore_call('read_record', vm, path,
+ {'ignore_missing_path': 'True'})
+ except self.XenAPI.Failure, e:
+ return None
+ ret = json.loads(ret)
+ if ret == "None":
+ # Can't marshall None over RPC calls.
+ return None
+ return ret
+
+ def write_to_xenstore(self, vm, path, value):
+ """Writes the passed value to the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the write process.
+ """
+ return self._make_xenstore_call('write_record', vm, path,
+ {'value': json.dumps(value)})
+
+ def clear_xenstore(self, vm, path):
+ """Deletes the VM's xenstore record for the specified path.
+ If there is no such record, the request is ignored.
+ """
+ self._make_xenstore_call('delete_record', vm, path)
+
+ def _make_xenstore_call(self, method, vm, path, addl_args={}):
+ """Handles calls to the xenstore xenapi plugin."""
+ return self._make_plugin_call('xenstore.py', method=method, vm=vm,
+ path=path, addl_args=addl_args)
+
+ def _make_plugin_call(self, plugin, method, vm, path, addl_args={}):
+ """Abstracts out the process of calling a method of a xenapi plugin.
+ Any errors raised by the plugin will in turn raise a RuntimeError here.
+ """
+ vm = self._get_vm_opaque_ref(vm)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ args = {'dom_id': rec['domid'], 'path': path}
+ args.update(addl_args)
+ # If the 'testing_mode' attribute is set, add that to the args.
+ if getattr(self, 'testing_mode', False):
+ args['testing_mode'] = 'true'
+ try:
+ task = self._session.async_call_plugin(plugin, method, args)
+ ret = self._session.wait_for_task(0, task)
+ except self.XenAPI.Failure, e:
+ raise RuntimeError("%s" % e.details[-1])
+ return ret
+
+ def add_to_xenstore(self, vm, path, key, value):
+ """Adds the passed key/value pair to the xenstore record for
+ the given VM at the specified location. A XenAPIPlugin.PluginError
+ will be raised if any error is encountered in the write process.
+ """
+ current = self.read_from_xenstore(vm, path)
+ if not current:
+ # Nothing at that location
+ current = {key: value}
+ else:
+ current[key] = value
+ self.write_to_xenstore(vm, path, current)
+
+ def remove_from_xenstore(self, vm, path, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstoreirecord data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ current = self.list_from_xenstore(vm, path)
+ if not current:
+ return
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ keys.sort(lambda x, y: cmp(y.count('/'), x.count('/')))
+ for key in keys:
+ if path:
+ keypath = "%s/%s" % (path, key)
+ else:
+ keypath = key
+ self._make_xenstore_call('delete_record', vm, keypath)
+
+ ########################################################################
+ ###### The following methods interact with the xenstore parameter
+ ###### record, not the live xenstore. They were created before I
+ ###### knew the difference, and are left in here in case they prove
+ ###### to be useful. They all have '_param' added to their method
+ ###### names to distinguish them. (dabo)
+ ########################################################################
+ def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix):
+ """Returns a dict of all the keys in the xenstore parameter record
+ for the given instance that begin with the key_prefix.
+ """
+ data = self.read_from_param_xenstore(instance_or_vm)
+ badkeys = [k for k in data.keys()
+ if not k.startswith(key_prefix)]
+ for badkey in badkeys:
+ del data[badkey]
+ return data
+
+ def read_from_param_xenstore(self, instance_or_vm, keys=None):
+ """Returns the xenstore parameter record data for the specified VM
+ instance as a dict. Accepts an optional key or list of keys; if a
+ value for 'keys' is passed, the returned dict is filtered to only
+ return the values for those keys.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ data = self._session.call_xenapi_request('VM.get_xenstore_data',
+ (vm, ))
+ ret = {}
+ if keys is None:
+ keys = data.keys()
+ elif isinstance(keys, basestring):
+ keys = [keys]
+ for key in keys:
+ raw = data.get(key)
+ if raw:
+ ret[key] = json.loads(raw)
+ else:
+ ret[key] = raw
+ return ret
+
+ def add_to_param_xenstore(self, instance_or_vm, key, val):
+ """Takes a key/value pair and adds it to the xenstore parameter
+ record for the given vm instance. If the key exists in xenstore,
+ it is overwritten"""
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ self.remove_from_param_xenstore(instance_or_vm, key)
+ jsonval = json.dumps(val)
+ self._session.call_xenapi_request('VM.add_to_xenstore_data',
+ (vm, key, jsonval))
+
+ def write_to_param_xenstore(self, instance_or_vm, mapping):
+ """Takes a dict and writes each key/value pair to the xenstore
+ parameter record for the given vm instance. Any existing data for
+ those keys is overwritten.
+ """
+ for k, v in mapping.iteritems():
+ self.add_to_param_xenstore(instance_or_vm, k, v)
+
+ def remove_from_param_xenstore(self, instance_or_vm, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstore parameter record data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ for key in keys:
+ self._session.call_xenapi_request('VM.remove_from_xenstore_data',
+ (vm, key))
+
+ def clear_param_xenstore(self, instance_or_vm):
+ """Removes all data from the xenstore parameter record for this VM."""
+ self.write_to_param_xenstore(instance_or_vm, {})
+ ########################################################################
diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py
index 1ca813bcf..4bbc41b03 100644
--- a/nova/virt/xenapi/volume_utils.py
+++ b/nova/virt/xenapi/volume_utils.py
@@ -200,15 +200,19 @@ class VolumeHelper(HelperBase):
return -1
-def _get_volume_id(path):
+def _get_volume_id(path_or_id):
"""Retrieve the volume id from device_path"""
+ # If we have the ID and not a path, just return it.
+ if isinstance(path_or_id, int):
+ return path_or_id
# n must contain at least the volume_id
# /vol- is for remote volumes
# -vol- is for local volumes
# see compute/manager->setup_compute_volume
- volume_id = path[path.find('/vol-') + 1:]
- if volume_id == path:
- volume_id = path[path.find('-vol-') + 1:].replace('--', '-')
+ volume_id = path_or_id[path_or_id.find('/vol-') + 1:]
+ if volume_id == path_or_id:
+ volume_id = path_or_id[path_or_id.find('-vol-') + 1:]
+ volume_id = volume_id.replace('--', '-')
return volume_id
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index f17c8f39d..c48f5b7cb 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform.
The concurrency model for this class is as follows:
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
+All XenAPI calls are on a green thread (using eventlet's "tpool"
+thread pool). They are remote calls, and so may hang for the usual
+reasons.
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion.
-This combination of techniques means that we don't block the reactor thread at
+This combination of techniques means that we don't block the main thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
@@ -81,7 +82,7 @@ flags.DEFINE_string('xenapi_connection_password',
flags.DEFINE_float('xenapi_task_poll_interval',
0.5,
'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
+ '(Async.VM.start, etc). Used only if '
'connection_type=xenapi.')
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
5.0,
@@ -213,6 +214,14 @@ class XenAPISession(object):
f = f.__getattr__(m)
return tpool.execute(f, *args)
+ def call_xenapi_request(self, method, *args):
+ """Some interactions with dom0, such as interacting with xenstore's
+ param record, require using the xenapi_request method of the session
+ object. This wraps that call on a background thread.
+ """
+ f = self._session.xenapi_request
+ return tpool.execute(f, method, *args)
+
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread."""
return tpool.execute(self._unwrap_plugin_exceptions,
@@ -222,7 +231,6 @@ class XenAPISession(object):
def wait_for_task(self, id, task):
"""Return the result of the given task. The task is polled
until it completes."""
-
done = event.Event()
loop = utils.LoopingCall(self._poll_task, id, task, done)
loop.start(FLAGS.xenapi_task_poll_interval, now=True)
@@ -235,7 +243,7 @@ class XenAPISession(object):
return self.XenAPI.Session(url)
def _poll_task(self, id, task, done):
- """Poll the given XenAPI task, and fire the given Deferred if we
+ """Poll the given XenAPI task, and fire the given action if we
get a result."""
try:
name = self._session.xenapi.task.get_name_label(task)
@@ -290,7 +298,7 @@ class XenAPISession(object):
def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
+ """Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""
if not val:
return val