summaryrefslogtreecommitdiffstats
path: root/nova/virt
diff options
context:
space:
mode:
Diffstat (limited to 'nova/virt')
-rw-r--r--nova/virt/connection.py8
-rw-r--r--nova/virt/hyperv.py462
-rw-r--r--nova/virt/images.py47
-rw-r--r--nova/virt/libvirt_conn.py442
-rw-r--r--nova/virt/xenapi/fake.py24
-rw-r--r--nova/virt/xenapi/vm_utils.py68
-rw-r--r--nova/virt/xenapi/vmops.py240
-rw-r--r--nova/virt/xenapi/volume_utils.py56
-rw-r--r--nova/virt/xenapi/volumeops.py31
-rw-r--r--nova/virt/xenapi_conn.py44
10 files changed, 1183 insertions, 239 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py
index 61e99944e..13181b730 100644
--- a/nova/virt/connection.py
+++ b/nova/virt/connection.py
@@ -19,15 +19,17 @@
"""Abstraction of the underlying virtualization API."""
-import logging
import sys
from nova import flags
+from nova import log as logging
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi_conn
+from nova.virt import hyperv
+LOG = logging.getLogger("nova.virt.connection")
FLAGS = flags.FLAGS
@@ -62,10 +64,12 @@ def get_connection(read_only=False):
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
conn = xenapi_conn.get_connection(read_only)
+ elif t == 'hyperv':
+ conn = hyperv.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)
if conn is None:
- logging.error(_('Failed to open connection to the hypervisor'))
+ LOG.error(_('Failed to open connection to the hypervisor'))
sys.exit(1)
return conn
diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py
new file mode 100644
index 000000000..d71387ac0
--- /dev/null
+++ b/nova/virt/hyperv.py
@@ -0,0 +1,462 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2010 Cloud.com, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+A connection to Hyper-V .
+Uses Windows Management Instrumentation (WMI) calls to interact with Hyper-V
+Hyper-V WMI usage:
+ http://msdn.microsoft.com/en-us/library/cc723875%28v=VS.85%29.aspx
+The Hyper-V object model briefly:
+ The physical computer and its hosted virtual machines are each represented
+ by the Msvm_ComputerSystem class.
+
+ Each virtual machine is associated with a
+ Msvm_VirtualSystemGlobalSettingData (vs_gs_data) instance and one or more
+ Msvm_VirtualSystemSettingData (vmsetting) instances. For each vmsetting
+ there is a series of Msvm_ResourceAllocationSettingData (rasd) objects.
+ The rasd objects describe the settings for each device in a VM.
+ Together, the vs_gs_data, vmsettings and rasds describe the configuration
+ of the virtual machine.
+
+ Creating new resources such as disks and nics involves cloning a default
+ rasd object and appropriately modifying the clone and calling the
+ AddVirtualSystemResources WMI method
+ Changing resources such as memory uses the ModifyVirtualSystemResources
+ WMI method
+
+Using the Python WMI library:
+ Tutorial:
+ http://timgolden.me.uk/python/wmi/tutorial.html
+ Hyper-V WMI objects can be retrieved simply by using the class name
+ of the WMI object and optionally specifying a column to filter the
+ result set. More complex filters can be formed using WQL (sql-like)
+ queries.
+ The parameters and return tuples of WMI method calls can gleaned by
+ examining the doc string. For example:
+ >>> vs_man_svc.ModifyVirtualSystemResources.__doc__
+ ModifyVirtualSystemResources (ComputerSystem, ResourceSettingData[])
+ => (Job, ReturnValue)'
+ When passing setting data (ResourceSettingData) to the WMI method,
+ an XML representation of the data is passed in using GetText_(1).
+ Available methods on a service can be determined using method.keys():
+ >>> vs_man_svc.methods.keys()
+ vmsettings and rasds for a vm can be retrieved using the 'associators'
+ method with the appropriate return class.
+ Long running WMI commands generally return a Job (an instance of
+ Msvm_ConcreteJob) whose state can be polled to determine when it finishes
+
+"""
+
+import os
+import time
+
+from nova import exception
+from nova import flags
+from nova import log as logging
+from nova.auth import manager
+from nova.compute import power_state
+from nova.virt import images
+
+wmi = None
+
+
+FLAGS = flags.FLAGS
+
+
+LOG = logging.getLogger('nova.virt.hyperv')
+
+
+HYPERV_POWER_STATE = {
+ 3: power_state.SHUTDOWN,
+ 2: power_state.RUNNING,
+ 32768: power_state.PAUSED,
+}
+
+
+REQ_POWER_STATE = {
+ 'Enabled': 2,
+ 'Disabled': 3,
+ 'Reboot': 10,
+ 'Reset': 11,
+ 'Paused': 32768,
+ 'Suspended': 32769
+}
+
+
+WMI_JOB_STATUS_STARTED = 4096
+WMI_JOB_STATE_RUNNING = 4
+WMI_JOB_STATE_COMPLETED = 7
+
+
+def get_connection(_):
+ global wmi
+ if wmi is None:
+ wmi = __import__('wmi')
+ return HyperVConnection()
+
+
+class HyperVConnection(object):
+ def __init__(self):
+ self._conn = wmi.WMI(moniker='//./root/virtualization')
+ self._cim_conn = wmi.WMI(moniker='//./root/cimv2')
+
+ def init_host(self):
+ #FIXME(chiradeep): implement this
+ LOG.debug(_('In init host'))
+ pass
+
+ def list_instances(self):
+ """ Return the names of all the instances known to Hyper-V. """
+ vms = [v.ElementName \
+ for v in self._conn.Msvm_ComputerSystem(['ElementName'])]
+ return vms
+
+ def spawn(self, instance):
+ """ Create a new VM and start it."""
+ vm = self._lookup(instance.name)
+ if vm is not None:
+ raise exception.Duplicate(_('Attempt to create duplicate vm %s') %
+ instance.name)
+
+ user = manager.AuthManager().get_user(instance['user_id'])
+ project = manager.AuthManager().get_project(instance['project_id'])
+ #Fetch the file, assume it is a VHD file.
+ base_vhd_filename = os.path.join(FLAGS.instances_path,
+ instance.name)
+ vhdfile = "%s.vhd" % (base_vhd_filename)
+ images.fetch(instance['image_id'], vhdfile, user, project)
+
+ try:
+ self._create_vm(instance)
+
+ self._create_disk(instance['name'], vhdfile)
+ self._create_nic(instance['name'], instance['mac_address'])
+
+ LOG.debug(_('Starting VM %s '), instance.name)
+ self._set_vm_state(instance['name'], 'Enabled')
+ LOG.info(_('Started VM %s '), instance.name)
+ except Exception as exn:
+ LOG.exception(_('spawn vm failed: %s'), exn)
+ self.destroy(instance)
+
+ def _create_vm(self, instance):
+ """Create a VM but don't start it. """
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+
+ vs_gs_data = self._conn.Msvm_VirtualSystemGlobalSettingData.new()
+ vs_gs_data.ElementName = instance['name']
+ (job, ret_val) = vs_man_svc.DefineVirtualSystem(
+ [], None, vs_gs_data.GetText_(1))[1:]
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+
+ if not success:
+ raise Exception(_('Failed to create VM %s'), instance.name)
+
+ LOG.debug(_('Created VM %s...'), instance.name)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ vmsetting = [s for s in vmsettings
+ if s.SettingType == 3][0] # avoid snapshots
+ memsetting = vmsetting.associators(
+ wmi_result_class='Msvm_MemorySettingData')[0]
+ #No Dynamic Memory, so reservation, limit and quantity are identical.
+ mem = long(str(instance['memory_mb']))
+ memsetting.VirtualQuantity = mem
+ memsetting.Reservation = mem
+ memsetting.Limit = mem
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [memsetting.GetText_(1)])
+ LOG.debug(_('Set memory for vm %s...'), instance.name)
+ procsetting = vmsetting.associators(
+ wmi_result_class='Msvm_ProcessorSettingData')[0]
+ vcpus = long(instance['vcpus'])
+ procsetting.VirtualQuantity = vcpus
+ procsetting.Reservation = vcpus
+ procsetting.Limit = vcpus
+
+ (job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
+ vm.path_(), [procsetting.GetText_(1)])
+ LOG.debug(_('Set vcpus for vm %s...'), instance.name)
+
+ def _create_disk(self, vm_name, vhdfile):
+ """Create a disk and attach it to the vm"""
+ LOG.debug(_('Creating disk for %s by attaching disk file %s'),
+ vm_name, vhdfile)
+ #Find the IDE controller for the vm.
+ vms = self._conn.MSVM_ComputerSystem(ElementName=vm_name)
+ vm = vms[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ ctrller = [r for r in rasds
+ if r.ResourceSubType == 'Microsoft Emulated IDE Controller'\
+ and r.Address == "0"]
+ #Find the default disk drive object for the vm and clone it.
+ diskdflt = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Synthetic Disk Drive'\
+ AND InstanceID LIKE '%Default%'")[0]
+ diskdrive = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', diskdflt)
+ #Set the IDE ctrller as parent.
+ diskdrive.Parent = ctrller[0].path_()
+ diskdrive.Address = 0
+ #Add the cloned disk drive object to the vm.
+ new_resources = self._add_virt_resource(diskdrive, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add diskdrive to VM %s'),
+ vm_name)
+ diskdrive_path = new_resources[0]
+ LOG.debug(_('New disk drive path is %s'), diskdrive_path)
+ #Find the default VHD disk object.
+ vhddefault = self._conn.query(
+ "SELECT * FROM Msvm_ResourceAllocationSettingData \
+ WHERE ResourceSubType LIKE 'Microsoft Virtual Hard Disk' AND \
+ InstanceID LIKE '%Default%' ")[0]
+
+ #Clone the default and point it to the image file.
+ vhddisk = self._clone_wmi_obj(
+ 'Msvm_ResourceAllocationSettingData', vhddefault)
+ #Set the new drive as the parent.
+ vhddisk.Parent = diskdrive_path
+ vhddisk.Connection = [vhdfile]
+
+ #Add the new vhd object as a virtual hard disk to the vm.
+ new_resources = self._add_virt_resource(vhddisk, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add vhd file to VM %s'),
+ vm_name)
+ LOG.info(_('Created disk for %s'), vm_name)
+
+ def _create_nic(self, vm_name, mac):
+ """Create a (emulated) nic and attach it to the vm"""
+ LOG.debug(_('Creating nic for %s '), vm_name)
+ #Find the vswitch that is connected to the physical nic.
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ extswitch = self._find_external_network()
+ vm = vms[0]
+ switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
+ #Find the default nic and clone it to create a new nic for the vm.
+ #Use Msvm_SyntheticEthernetPortSettingData for Windows or Linux with
+ #Linux Integration Components installed.
+ emulatednics_data = self._conn.Msvm_EmulatedEthernetPortSettingData()
+ default_nic_data = [n for n in emulatednics_data
+ if n.InstanceID.rfind('Default') > 0]
+ new_nic_data = self._clone_wmi_obj(
+ 'Msvm_EmulatedEthernetPortSettingData',
+ default_nic_data[0])
+ #Create a port on the vswitch.
+ (new_port, ret_val) = switch_svc.CreateSwitchPort(vm_name, vm_name,
+ "", extswitch.path_())
+ if ret_val != 0:
+ LOG.error(_('Failed creating a port on the external vswitch'))
+ raise Exception(_('Failed creating port for %s'),
+ vm_name)
+ LOG.debug(_("Created switch port %s on switch %s"),
+ vm_name, extswitch.path_())
+ #Connect the new nic to the new port.
+ new_nic_data.Connection = [new_port]
+ new_nic_data.ElementName = vm_name + ' nic'
+ new_nic_data.Address = ''.join(mac.split(':'))
+ new_nic_data.StaticMacAddress = 'TRUE'
+ #Add the new nic to the vm.
+ new_resources = self._add_virt_resource(new_nic_data, vm)
+ if new_resources is None:
+ raise Exception(_('Failed to add nic to VM %s'),
+ vm_name)
+ LOG.info(_("Created nic for %s "), vm_name)
+
+ def _add_virt_resource(self, res_setting_data, target_vm):
+ """Add a new resource (disk/nic) to the VM"""
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ (job, new_resources, ret_val) = vs_man_svc.\
+ AddVirtualSystemResources([res_setting_data.GetText_(1)],
+ target_vm.path_())
+ success = True
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ else:
+ success = (ret_val == 0)
+ if success:
+ return new_resources
+ else:
+ return None
+
+ #TODO: use the reactor to poll instead of sleep
+ def _check_job_status(self, jobpath):
+ """Poll WMI job state for completion"""
+ #Jobs have a path of the form:
+ #\\WIN-P5IG7367DAG\root\virtualization:Msvm_ConcreteJob.InstanceID=
+ #"8A496B9C-AF4D-4E98-BD3C-1128CD85320D"
+ inst_id = jobpath.split('=')[1].strip('"')
+ jobs = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)
+ if len(jobs) == 0:
+ return False
+ job = jobs[0]
+ while job.JobState == WMI_JOB_STATE_RUNNING:
+ time.sleep(0.1)
+ job = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)[0]
+ if job.JobState != WMI_JOB_STATE_COMPLETED:
+ LOG.debug(_("WMI job failed: %s"), job.ErrorSummaryDescription)
+ return False
+ LOG.debug(_("WMI job succeeded: %s, Elapsed=%s "), job.Description,
+ job.ElapsedTime)
+ return True
+
+ def _find_external_network(self):
+ """Find the vswitch that is connected to the physical nic.
+ Assumes only one physical nic on the host
+ """
+ #If there are no physical nics connected to networks, return.
+ bound = self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')
+ if len(bound) == 0:
+ return None
+ return self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')[0]\
+ .associators(wmi_result_class='Msvm_SwitchLANEndpoint')[0]\
+ .associators(wmi_result_class='Msvm_SwitchPort')[0]\
+ .associators(wmi_result_class='Msvm_VirtualSwitch')[0]
+
+ def _clone_wmi_obj(self, wmi_class, wmi_obj):
+ """Clone a WMI object"""
+ cl = self._conn.__getattr__(wmi_class) # get the class
+ newinst = cl.new()
+ #Copy the properties from the original.
+ for prop in wmi_obj._properties:
+ newinst.Properties_.Item(prop).Value =\
+ wmi_obj.Properties_.Item(prop).Value
+ return newinst
+
+ def reboot(self, instance):
+ """Reboot the specified instance."""
+ vm = self._lookup(instance.name)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance.name)
+ self._set_vm_state(instance.name, 'Reboot')
+
+ def destroy(self, instance):
+ """Destroy the VM. Also destroy the associated VHD disk files"""
+ LOG.debug(_("Got request to destroy vm %s"), instance.name)
+ vm = self._lookup(instance.name)
+ if vm is None:
+ return
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ #Stop the VM first.
+ self._set_vm_state(instance.name, 'Disabled')
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ rasds = vmsettings[0].associators(
+ wmi_result_class='MSVM_ResourceAllocationSettingData')
+ disks = [r for r in rasds \
+ if r.ResourceSubType == 'Microsoft Virtual Hard Disk']
+ diskfiles = []
+ #Collect disk file information before destroying the VM.
+ for disk in disks:
+ diskfiles.extend([c for c in disk.Connection])
+ #Nuke the VM. Does not destroy disks.
+ (job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_())
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ if not success:
+ raise Exception(_('Failed to destroy vm %s') % instance.name)
+ #Delete associated vhd disk files.
+ for disk in diskfiles:
+ vhdfile = self._cim_conn.CIM_DataFile(Name=disk)
+ for vf in vhdfile:
+ vf.Delete()
+ LOG.debug(_("Del: disk %s vm %s"), vhdfile, instance.name)
+
+ def get_info(self, instance_id):
+ """Get information about the VM"""
+ vm = self._lookup(instance_id)
+ if vm is None:
+ raise exception.NotFound('instance not present %s' % instance_id)
+ vm = self._conn.Msvm_ComputerSystem(ElementName=instance_id)[0]
+ vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
+ vmsettings = vm.associators(
+ wmi_result_class='Msvm_VirtualSystemSettingData')
+ settings_paths = [v.path_() for v in vmsettings]
+ #See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx
+ summary_info = vs_man_svc.GetSummaryInformation(
+ [4, 100, 103, 105], settings_paths)[1]
+ info = summary_info[0]
+ LOG.debug(_("Got Info for vm %s: state=%s, mem=%s, num_cpu=%s, \
+ cpu_time=%s"), instance_id,
+ str(HYPERV_POWER_STATE[info.EnabledState]),
+ str(info.MemoryUsage),
+ str(info.NumberOfProcessors),
+ str(info.UpTime))
+
+ return {'state': HYPERV_POWER_STATE[info.EnabledState],
+ 'max_mem': info.MemoryUsage,
+ 'mem': info.MemoryUsage,
+ 'num_cpu': info.NumberOfProcessors,
+ 'cpu_time': info.UpTime}
+
+ def _lookup(self, i):
+ vms = self._conn.Msvm_ComputerSystem(ElementName=i)
+ n = len(vms)
+ if n == 0:
+ return None
+ elif n > 1:
+ raise Exception(_('duplicate name found: %s') % i)
+ else:
+ return vms[0].ElementName
+
+ def _set_vm_state(self, vm_name, req_state):
+ """Set the desired state of the VM"""
+ vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
+ if len(vms) == 0:
+ return False
+ (job, ret_val) = vms[0].RequestStateChange(REQ_POWER_STATE[req_state])
+ success = False
+ if ret_val == WMI_JOB_STATUS_STARTED:
+ success = self._check_job_status(job)
+ elif ret_val == 0:
+ success = True
+ elif ret_val == 32775:
+ #Invalid state for current operation. Typically means it is
+ #already in the state requested
+ success = True
+ if success:
+ LOG.info(_("Successfully changed vm state of %s to %s"), vm_name,
+ req_state)
+ else:
+ LOG.error(_("Failed to change vm state of %s to %s"), vm_name,
+ req_state)
+ raise Exception(_("Failed to change vm state of %s to %s"),
+ vm_name, req_state)
+
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot attach volume to missing %s vm' %
+ instance_name)
+
+ def detach_volume(self, instance_name, mountpoint):
+ vm = self._lookup(instance_name)
+ if vm is None:
+ raise exception.NotFound('Cannot detach volume from missing %s ' %
+ instance_name)
diff --git a/nova/virt/images.py b/nova/virt/images.py
index 1c9b2e093..ecf0e5efb 100644
--- a/nova/virt/images.py
+++ b/nova/virt/images.py
@@ -22,10 +22,14 @@ Handling of VM disk images.
"""
import os.path
+import shutil
+import sys
import time
+import urllib2
import urlparse
from nova import flags
+from nova import log as logging
from nova import utils
from nova.auth import manager
from nova.auth import signer
@@ -36,6 +40,8 @@ FLAGS = flags.FLAGS
flags.DEFINE_bool('use_s3', True,
'whether to get images from s3 or use local copy')
+LOG = logging.getLogger('nova.virt.images')
+
def fetch(image, path, user, project):
if FLAGS.use_s3:
@@ -45,6 +51,25 @@ def fetch(image, path, user, project):
return f(image, path, user, project)
+def _fetch_image_no_curl(url, path, headers):
+ request = urllib2.Request(url)
+ for (k, v) in headers.iteritems():
+ request.add_header(k, v)
+
+ def urlretrieve(urlfile, fpath):
+ chunk = 1 * 1024 * 1024
+ f = open(fpath, "wb")
+ while 1:
+ data = urlfile.read(chunk)
+ if not data:
+ break
+ f.write(data)
+
+ urlopened = urllib2.urlopen(request)
+ urlretrieve(urlopened, path)
+ LOG.debug(_("Finished retreving %s -- placed in %s"), url, path)
+
+
def _fetch_s3_image(image, path, user, project):
url = image_url(image)
@@ -61,18 +86,24 @@ def _fetch_s3_image(image, path, user, project):
url_path)
headers['Authorization'] = 'AWS %s:%s' % (access, signature)
- cmd = ['/usr/bin/curl', '--fail', '--silent', url]
- for (k, v) in headers.iteritems():
- cmd += ['-H', '"%s: %s"' % (k, v)]
+ if sys.platform.startswith('win'):
+ return _fetch_image_no_curl(url, path, headers)
+ else:
+ cmd = ['/usr/bin/curl', '--fail', '--silent', url]
+ for (k, v) in headers.iteritems():
+ cmd += ['-H', '\'%s: %s\'' % (k, v)]
- cmd += ['-o', path]
- cmd_out = ' '.join(cmd)
- return utils.execute(cmd_out)
+ cmd += ['-o', path]
+ cmd_out = ' '.join(cmd)
+ return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
- source = _image_path('%s/image' % image)
- return utils.execute('cp %s %s' % (source, path))
+ source = _image_path(os.path.join(image, 'image'))
+ if sys.platform.startswith('win'):
+ return shutil.copy(source, path)
+ else:
+ return utils.execute('cp %s %s' % (source, path))
def _image_path(path):
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index d83c57741..dc31d8357 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -36,7 +36,6 @@ Supports KVM, QEMU, UML, and XEN.
"""
-import logging
import os
import shutil
import random
@@ -55,6 +54,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
+from nova import log as logging
from nova import utils
#from nova.api import context
from nova.auth import manager
@@ -67,6 +67,7 @@ libvirt = None
libxml2 = None
Template = None
+LOG = logging.getLogger('nova.virt.libvirt_conn')
FLAGS = flags.FLAGS
# TODO(vish): These flags should probably go into a shared location
@@ -93,6 +94,9 @@ flags.DEFINE_bool('allow_project_net_traffic',
flags.DEFINE_string('ajaxterm_portrange',
'10000-12000',
'Range of ports that ajaxterm should randomly try to bind')
+flags.DEFINE_string('firewall_driver',
+ 'nova.virt.libvirt_conn.IptablesFirewallDriver',
+ 'Firewall driver (defaults to iptables)')
def get_connection(read_only):
@@ -132,16 +136,24 @@ class LibvirtConnection(object):
self._wrapped_conn = None
self.read_only = read_only
+ self.nwfilter = NWFilterFirewall(self._get_connection)
+
+ if not FLAGS.firewall_driver:
+ self.firewall_driver = self.nwfilter
+ self.nwfilter.handle_security_groups = True
+ else:
+ self.firewall_driver = utils.import_object(FLAGS.firewall_driver)
+
def init_host(self):
- NWFilterFirewall(self._conn).setup_base_nwfilters()
+ pass
- @property
- def _conn(self):
+ def _get_connection(self):
if not self._wrapped_conn or not self._test_connection():
- logging.debug(_('Connecting to libvirt: %s') % self.libvirt_uri)
+ LOG.debug(_('Connecting to libvirt: %s'), self.libvirt_uri)
self._wrapped_conn = self._connect(self.libvirt_uri,
self.read_only)
return self._wrapped_conn
+ _conn = property(_get_connection)
def _test_connection(self):
try:
@@ -150,7 +162,7 @@ class LibvirtConnection(object):
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_SYSTEM_ERROR and \
e.get_error_domain() == libvirt.VIR_FROM_REMOTE:
- logging.debug(_('Connection to libvirt broke'))
+ LOG.debug(_('Connection to libvirt broke'))
return False
raise
@@ -222,8 +234,8 @@ class LibvirtConnection(object):
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
- logging.info(_('instance %s: deleting instance files %s'),
- instance['name'], target)
+ LOG.info(_('instance %s: deleting instance files %s'),
+ instance['name'], target)
if os.path.exists(target):
shutil.rmtree(target)
@@ -287,10 +299,10 @@ class LibvirtConnection(object):
db.instance_set_state(context.get_admin_context(),
instance['id'], state)
if state == power_state.RUNNING:
- logging.debug(_('instance %s: rebooted'), instance['name'])
+ LOG.debug(_('instance %s: rebooted'), instance['name'])
timer.stop()
except Exception, exn:
- logging.error(_('_wait_for_reboot failed: %s'), exn)
+ LOG.exception(_('_wait_for_reboot failed: %s'), exn)
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
@@ -333,10 +345,10 @@ class LibvirtConnection(object):
state = self.get_info(instance['name'])['state']
db.instance_set_state(None, instance['id'], state)
if state == power_state.RUNNING:
- logging.debug(_('instance %s: rescued'), instance['name'])
+ LOG.debug(_('instance %s: rescued'), instance['name'])
timer.stop()
except Exception, exn:
- logging.error(_('_wait_for_rescue failed: %s'), exn)
+ LOG.exception(_('_wait_for_rescue failed: %s'), exn)
db.instance_set_state(None,
instance['id'],
power_state.SHUTDOWN)
@@ -358,10 +370,13 @@ class LibvirtConnection(object):
instance['id'],
power_state.NOSTATE,
'launching')
- NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance)
+
+ self.nwfilter.setup_basic_filtering(instance)
+ self.firewall_driver.prepare_instance_filter(instance)
self._create_image(instance, xml)
self._conn.createXML(xml, 0)
- logging.debug(_("instance %s: is running"), instance['name'])
+ LOG.debug(_("instance %s: is running"), instance['name'])
+ self.firewall_driver.apply_instance_filter(instance)
timer = utils.LoopingCall(f=None)
@@ -371,11 +386,11 @@ class LibvirtConnection(object):
db.instance_set_state(context.get_admin_context(),
instance['id'], state)
if state == power_state.RUNNING:
- logging.debug(_('instance %s: booted'), instance['name'])
+ LOG.debug(_('instance %s: booted'), instance['name'])
timer.stop()
except:
- logging.exception(_('instance %s: failed to boot'),
- instance['name'])
+ LOG.exception(_('instance %s: failed to boot'),
+ instance['name'])
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
@@ -385,11 +400,11 @@ class LibvirtConnection(object):
return timer.start(interval=0.5, now=True)
def _flush_xen_console(self, virsh_output):
- logging.info('virsh said: %r' % (virsh_output,))
+ LOG.info(_('virsh said: %r'), virsh_output)
virsh_output = virsh_output[0].strip()
if virsh_output.startswith('/dev/'):
- logging.info(_('cool, it\'s a device'))
+ LOG.info(_('cool, it\'s a device'))
out, err = utils.execute("sudo dd if=%s iflag=nonblock" %
virsh_output, check_exit_code=False)
return out
@@ -397,7 +412,7 @@ class LibvirtConnection(object):
return ''
def _append_to_file(self, data, fpath):
- logging.info(_('data: %r, fpath: %r') % (data, fpath))
+ LOG.info(_('data: %r, fpath: %r'), data, fpath)
fp = open(fpath, 'a+')
fp.write(data)
return fpath
@@ -405,7 +420,7 @@ class LibvirtConnection(object):
def _dump_file(self, fpath):
fp = open(fpath, 'r+')
contents = fp.read()
- logging.info('Contents: %r' % (contents,))
+ LOG.info(_('Contents of file %s: %r'), fpath, contents)
return contents
@exception.wrap_exception
@@ -475,7 +490,7 @@ class LibvirtConnection(object):
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
- logging.info(_('instance %s: Creating image'), inst['name'])
+ LOG.info(_('instance %s: Creating image'), inst['name'])
f = open(basepath('libvirt.xml'), 'w')
f.write(libvirt_xml)
f.close()
@@ -531,10 +546,10 @@ class LibvirtConnection(object):
'dns': network_ref['dns']}
if key or net:
if key:
- logging.info(_('instance %s: injecting key into image %s'),
+ LOG.info(_('instance %s: injecting key into image %s'),
inst['name'], inst.image_id)
if net:
- logging.info(_('instance %s: injecting net into image %s'),
+ LOG.info(_('instance %s: injecting net into image %s'),
inst['name'], inst.image_id)
try:
disk.inject_data(basepath('disk-raw'), key, net,
@@ -542,9 +557,9 @@ class LibvirtConnection(object):
execute=execute)
except Exception as e:
# This could be a windows image, or a vmdk format disk
- logging.warn(_('instance %s: ignoring error injecting data'
- ' into image %s (%s)'),
- inst['name'], inst.image_id, e)
+ LOG.warn(_('instance %s: ignoring error injecting data'
+ ' into image %s (%s)'),
+ inst['name'], inst.image_id, e)
if inst['kernel_id']:
if os.path.exists(basepath('disk')):
@@ -570,8 +585,10 @@ class LibvirtConnection(object):
def to_xml(self, instance, rescue=False):
# TODO(termie): cache?
- logging.debug(_('instance %s: starting toXML method'),
- instance['name'])
+ LOG.debug(_('instance %s: starting toXML method'), instance['name'])
+ network = db.project_get_network(context.get_admin_context(),
+ instance['project_id'])
+ LOG.debug(_('instance %s: starting toXML method'), instance['name'])
network = db.network_get_by_instance(context.get_admin_context(),
instance['id'])
# FIXME(vish): stick this in db
@@ -613,7 +630,7 @@ class LibvirtConnection(object):
xml_info['disk'] = xml_info['basepath'] + "/disk"
xml = str(Template(self.libvirt_xml, searchList=[xml_info]))
- logging.debug(_('instance %s: finished toXML method'),
+ LOG.debug(_('instance %s: finished toXML method'),
instance['name'])
return xml
@@ -734,18 +751,55 @@ class LibvirtConnection(object):
domain = self._conn.lookupByName(instance_name)
return domain.interfaceStats(interface)
- def refresh_security_group(self, security_group_id):
- fw = NWFilterFirewall(self._conn)
- fw.ensure_security_group_filter(security_group_id)
+ def refresh_security_group_rules(self, security_group_id):
+ self.firewall_driver.refresh_security_group_rules(security_group_id)
+
+ def refresh_security_group_members(self, security_group_id):
+ self.firewall_driver.refresh_security_group_members(security_group_id)
+
+
+class FirewallDriver(object):
+ def prepare_instance_filter(self, instance):
+ """Prepare filters for the instance.
+
+ At this point, the instance isn't running yet."""
+ raise NotImplementedError()
+
+ def apply_instance_filter(self, instance):
+ """Apply instance filter.
+
+ Once this method returns, the instance should be firewalled
+ appropriately. This method should as far as possible be a
+ no-op. It's vastly preferred to get everything set up in
+ prepare_instance_filter.
+ """
+ raise NotImplementedError()
+
+ def refresh_security_group_rules(self, security_group_id):
+ """Refresh security group rules from data store
+
+ Gets called when a rule has been added to or removed from
+ the security group."""
+ raise NotImplementedError()
+
+ def refresh_security_group_members(self, security_group_id):
+ """Refresh security group members from data store
+ Gets called when an instance gets added to or removed from
+ the security group."""
+ raise NotImplementedError()
-class NWFilterFirewall(object):
+
+class NWFilterFirewall(FirewallDriver):
"""
This class implements a network filtering mechanism versatile
enough for EC2 style Security Group filtering by leveraging
libvirt's nwfilter.
First, all instances get a filter ("nova-base-filter") applied.
+ This filter provides some basic security such as protection against
+ MAC spoofing, IP spoofing, and ARP spoofing.
+
This filter drops all incoming ipv4 and ipv6 connections.
Outgoing connections are never blocked.
@@ -779,38 +833,79 @@ class NWFilterFirewall(object):
(*) This sentence brought to you by the redundancy department of
redundancy.
+
"""
def __init__(self, get_connection):
- self._conn = get_connection
-
- nova_base_filter = '''<filter name='nova-base' chain='root'>
- <uuid>26717364-50cf-42d1-8185-29bf893ab110</uuid>
- <filterref filter='no-mac-spoofing'/>
- <filterref filter='no-ip-spoofing'/>
- <filterref filter='no-arp-spoofing'/>
- <filterref filter='allow-dhcp-server'/>
- <filterref filter='nova-allow-dhcp-server'/>
- <filterref filter='nova-base-ipv4'/>
- <filterref filter='nova-base-ipv6'/>
- </filter>'''
-
- nova_dhcp_filter = '''<filter name='nova-allow-dhcp-server' chain='ipv4'>
- <uuid>891e4787-e5c0-d59b-cbd6-41bc3c6b36fc</uuid>
- <rule action='accept' direction='out'
- priority='100'>
- <udp srcipaddr='0.0.0.0'
- dstipaddr='255.255.255.255'
- srcportstart='68'
- dstportstart='67'/>
- </rule>
- <rule action='accept' direction='in'
- priority='100'>
- <udp srcipaddr='$DHCPSERVER'
- srcportstart='67'
- dstportstart='68'/>
- </rule>
- </filter>'''
+ self._libvirt_get_connection = get_connection
+ self.static_filters_configured = False
+ self.handle_security_groups = False
+
+ def _get_connection(self):
+ return self._libvirt_get_connection()
+ _conn = property(_get_connection)
+
+ def nova_dhcp_filter(self):
+ """The standard allow-dhcp-server filter is an <ip> one, so it uses
+ ebtables to allow traffic through. Without a corresponding rule in
+ iptables, it'll get blocked anyway."""
+
+ return '''<filter name='nova-allow-dhcp-server' chain='ipv4'>
+ <uuid>891e4787-e5c0-d59b-cbd6-41bc3c6b36fc</uuid>
+ <rule action='accept' direction='out'
+ priority='100'>
+ <udp srcipaddr='0.0.0.0'
+ dstipaddr='255.255.255.255'
+ srcportstart='68'
+ dstportstart='67'/>
+ </rule>
+ <rule action='accept' direction='in'
+ priority='100'>
+ <udp srcipaddr='$DHCPSERVER'
+ srcportstart='67'
+ dstportstart='68'/>
+ </rule>
+ </filter>'''
+
+ def setup_basic_filtering(self, instance):
+ """Set up basic filtering (MAC, IP, and ARP spoofing protection)"""
+ logging.info('called setup_basic_filtering in nwfilter')
+
+ if self.handle_security_groups:
+ # No point in setting up a filter set that we'll be overriding
+ # anyway.
+ return
+
+ logging.info('ensuring static filters')
+ self._ensure_static_filters()
+
+ instance_filter_name = self._instance_filter_name(instance)
+ self._define_filter(self._filter_container(instance_filter_name,
+ ['nova-base']))
+
+ def _ensure_static_filters(self):
+ if self.static_filters_configured:
+ return
+
+ self._define_filter(self._filter_container('nova-base',
+ ['no-mac-spoofing',
+ 'no-ip-spoofing',
+ 'no-arp-spoofing',
+ 'allow-dhcp-server']))
+ self._define_filter(self.nova_base_ipv4_filter)
+ self._define_filter(self.nova_base_ipv6_filter)
+ self._define_filter(self.nova_dhcp_filter)
+ self._define_filter(self.nova_vpn_filter)
+ if FLAGS.allow_project_net_traffic:
+ self._define_filter(self.nova_project_filter)
+
+ self.static_filters_configured = True
+
+ def _filter_container(self, name, filters):
+ xml = '''<filter name='%s' chain='root'>%s</filter>''' % (
+ name,
+ ''.join(["<filterref filter='%s'/>" % (f,) for f in filters]))
+ return xml
nova_vpn_filter = '''<filter name='nova-vpn' chain='root'>
<uuid>2086015e-cf03-11df-8c5d-080027c27973</uuid>
@@ -824,7 +919,7 @@ class NWFilterFirewall(object):
retval = "<filter name='nova-base-ipv4' chain='ipv4'>"
for protocol in ['tcp', 'udp', 'icmp']:
for direction, action, priority in [('out', 'accept', 399),
- ('inout', 'drop', 400)]:
+ ('in', 'drop', 400)]:
retval += """<rule action='%s' direction='%s' priority='%d'>
<%s />
</rule>""" % (action, direction,
@@ -836,7 +931,7 @@ class NWFilterFirewall(object):
retval = "<filter name='nova-base-ipv6' chain='ipv6'>"
for protocol in ['tcp', 'udp', 'icmp']:
for direction, action, priority in [('out', 'accept', 399),
- ('inout', 'drop', 400)]:
+ ('in', 'drop', 400)]:
retval += """<rule action='%s' direction='%s' priority='%d'>
<%s-ipv6 />
</rule>""" % (action, direction,
@@ -860,43 +955,49 @@ class NWFilterFirewall(object):
# execute in a native thread and block current greenthread until done
tpool.execute(self._conn.nwfilterDefineXML, xml)
- def setup_base_nwfilters(self):
- self._define_filter(self.nova_base_ipv4_filter)
- self._define_filter(self.nova_base_ipv6_filter)
- self._define_filter(self.nova_dhcp_filter)
- self._define_filter(self.nova_base_filter)
- self._define_filter(self.nova_vpn_filter)
- if FLAGS.allow_project_net_traffic:
- self._define_filter(self.nova_project_filter)
-
- def setup_nwfilters_for_instance(self, instance):
+ def prepare_instance_filter(self, instance):
"""
Creates an NWFilter for the given instance. In the process,
it makes sure the filters for the security groups as well as
the base filter are all in place.
"""
- nwfilter_xml = ("<filter name='nova-instance-%s' "
- "chain='root'>\n") % instance['name']
-
if instance['image_id'] == FLAGS.vpn_image_id:
- nwfilter_xml += " <filterref filter='nova-vpn' />\n"
+ base_filter = 'nova-vpn'
else:
- nwfilter_xml += " <filterref filter='nova-base' />\n"
+ base_filter = 'nova-base'
+
+ instance_filter_name = self._instance_filter_name(instance)
+ instance_secgroup_filter_name = '%s-secgroup' % (instance_filter_name,)
+ instance_filter_children = [base_filter, instance_secgroup_filter_name]
+ instance_secgroup_filter_children = ['nova-base-ipv4',
+ 'nova-base-ipv6',
+ 'nova-allow-dhcp-server']
+
+ ctxt = context.get_admin_context()
if FLAGS.allow_project_net_traffic:
- nwfilter_xml += " <filterref filter='nova-project' />\n"
+ instance_filter_children += ['nova-project']
+
+ for security_group in db.security_group_get_by_instance(ctxt,
+ instance['id']):
+
+ self.refresh_security_group_rules(security_group['id'])
+
+ instance_secgroup_filter_children += [('nova-secgroup-%s' %
+ security_group['id'])]
- for security_group in instance.security_groups:
- self.ensure_security_group_filter(security_group['id'])
+ self._define_filter(
+ self._filter_container(instance_secgroup_filter_name,
+ instance_secgroup_filter_children))
- nwfilter_xml += (" <filterref filter='nova-secgroup-%d' "
- "/>\n") % security_group['id']
- nwfilter_xml += "</filter>"
+ self._define_filter(
+ self._filter_container(instance_filter_name,
+ instance_filter_children))
- self._define_filter(nwfilter_xml)
+ return
- def ensure_security_group_filter(self, security_group_id):
+ def refresh_security_group_rules(self, security_group_id):
return self._define_filter(
self.security_group_to_nwfilter_xml(security_group_id))
@@ -914,9 +1015,9 @@ class NWFilterFirewall(object):
rule_xml += "dstportstart='%s' dstportend='%s' " % \
(rule.from_port, rule.to_port)
elif rule.protocol == 'icmp':
- logging.info('rule.protocol: %r, rule.from_port: %r, '
- 'rule.to_port: %r' %
- (rule.protocol, rule.from_port, rule.to_port))
+ LOG.info('rule.protocol: %r, rule.from_port: %r, '
+ 'rule.to_port: %r', rule.protocol,
+ rule.from_port, rule.to_port)
if rule.from_port != -1:
rule_xml += "type='%s' " % rule.from_port
if rule.to_port != -1:
@@ -927,3 +1028,162 @@ class NWFilterFirewall(object):
xml = "<filter name='nova-secgroup-%s' chain='ipv4'>%s</filter>" % \
(security_group_id, rule_xml,)
return xml
+
+ def _instance_filter_name(self, instance):
+ return 'nova-instance-%s' % instance['name']
+
+
+class IptablesFirewallDriver(FirewallDriver):
+ def __init__(self, execute=None):
+ self.execute = execute or utils.execute
+ self.instances = set()
+
+ def apply_instance_filter(self, instance):
+ """No-op. Everything is done in prepare_instance_filter"""
+ pass
+
+ def remove_instance(self, instance):
+ self.instances.remove(instance)
+
+ def add_instance(self, instance):
+ self.instances.add(instance)
+
+ def prepare_instance_filter(self, instance):
+ self.add_instance(instance)
+ self.apply_ruleset()
+
+ def apply_ruleset(self):
+ current_filter, _ = self.execute('sudo iptables-save -t filter')
+ current_lines = current_filter.split('\n')
+ new_filter = self.modify_rules(current_lines)
+ self.execute('sudo iptables-restore',
+ process_input='\n'.join(new_filter))
+
+ def modify_rules(self, current_lines):
+ ctxt = context.get_admin_context()
+ # Remove any trace of nova rules.
+ new_filter = filter(lambda l: 'nova-' not in l, current_lines)
+
+ seen_chains = False
+ for rules_index in range(len(new_filter)):
+ if not seen_chains:
+ if new_filter[rules_index].startswith(':'):
+ seen_chains = True
+ elif seen_chains == 1:
+ if not new_filter[rules_index].startswith(':'):
+ break
+
+ our_chains = [':nova-ipv4-fallback - [0:0]']
+ our_rules = ['-A nova-ipv4-fallback -j DROP']
+
+ our_chains += [':nova-local - [0:0]']
+ our_rules += ['-A FORWARD -j nova-local']
+
+ security_groups = set()
+ # Add our chains
+ # First, we add instance chains and rules
+ for instance in self.instances:
+ chain_name = self._instance_chain_name(instance)
+ ip_address = self._ip_for_instance(instance)
+
+ our_chains += [':%s - [0:0]' % chain_name]
+
+ # Jump to the per-instance chain
+ our_rules += ['-A nova-local -d %s -j %s' % (ip_address,
+ chain_name)]
+
+ # Always drop invalid packets
+ our_rules += ['-A %s -m state --state '
+ 'INVALID -j DROP' % (chain_name,)]
+
+ # Allow established connections
+ our_rules += ['-A %s -m state --state '
+ 'ESTABLISHED,RELATED -j ACCEPT' % (chain_name,)]
+
+ # Jump to each security group chain in turn
+ for security_group in \
+ db.security_group_get_by_instance(ctxt,
+ instance['id']):
+ security_groups.add(security_group)
+
+ sg_chain_name = self._security_group_chain_name(security_group)
+
+ our_rules += ['-A %s -j %s' % (chain_name, sg_chain_name)]
+
+ # Allow DHCP responses
+ dhcp_server = self._dhcp_server_for_instance(instance)
+ our_rules += ['-A %s -s %s -p udp --sport 67 --dport 68' %
+ (chain_name, dhcp_server)]
+
+ # If nothing matches, jump to the fallback chain
+ our_rules += ['-A %s -j nova-ipv4-fallback' % (chain_name,)]
+
+ # then, security group chains and rules
+ for security_group in security_groups:
+ chain_name = self._security_group_chain_name(security_group)
+ our_chains += [':%s - [0:0]' % chain_name]
+
+ rules = \
+ db.security_group_rule_get_by_security_group(ctxt,
+ security_group['id'])
+
+ for rule in rules:
+ logging.info('%r', rule)
+ args = ['-A', chain_name, '-p', rule.protocol]
+
+ if rule.cidr:
+ args += ['-s', rule.cidr]
+ else:
+ # Eventually, a mechanism to grant access for security
+ # groups will turn up here. It'll use ipsets.
+ continue
+
+ if rule.protocol in ['udp', 'tcp']:
+ if rule.from_port == rule.to_port:
+ args += ['--dport', '%s' % (rule.from_port,)]
+ else:
+ args += ['-m', 'multiport',
+ '--dports', '%s:%s' % (rule.from_port,
+ rule.to_port)]
+ elif rule.protocol == 'icmp':
+ icmp_type = rule.from_port
+ icmp_code = rule.to_port
+
+ if icmp_type == '-1':
+ icmp_type_arg = None
+ else:
+ icmp_type_arg = '%s' % icmp_type
+ if not icmp_code == '-1':
+ icmp_type_arg += '/%s' % icmp_code
+
+ if icmp_type_arg:
+ args += ['-m', 'icmp', '--icmp_type', icmp_type_arg]
+
+ args += ['-j ACCEPT']
+ our_rules += [' '.join(args)]
+
+ new_filter[rules_index:rules_index] = our_rules
+ new_filter[rules_index:rules_index] = our_chains
+ logging.info('new_filter: %s', '\n'.join(new_filter))
+ return new_filter
+
+ def refresh_security_group_members(self, security_group):
+ pass
+
+ def refresh_security_group_rules(self, security_group):
+ self.apply_ruleset()
+
+ def _security_group_chain_name(self, security_group):
+ return 'nova-sg-%s' % (security_group['id'],)
+
+ def _instance_chain_name(self, instance):
+ return 'nova-inst-%s' % (instance['id'],)
+
+ def _ip_for_instance(self, instance):
+ return db.instance_get_fixed_address(context.get_admin_context(),
+ instance['id'])
+
+ def _dhcp_server_for_instance(self, instance):
+ network = db.project_get_network(context.get_admin_context(),
+ instance['project_id'])
+ return network['gateway']
diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py
index aa4026f97..96d8f5fc8 100644
--- a/nova/virt/xenapi/fake.py
+++ b/nova/virt/xenapi/fake.py
@@ -52,12 +52,12 @@ A fake XenAPI SDK.
import datetime
-import logging
import uuid
from pprint import pformat
from nova import exception
+from nova import log as logging
_CLASSES = ['host', 'network', 'session', 'SR', 'VBD',\
@@ -65,9 +65,11 @@ _CLASSES = ['host', 'network', 'session', 'SR', 'VBD',\
_db_content = {}
+LOG = logging.getLogger("nova.virt.xenapi.fake")
+
def log_db_contents(msg=None):
- logging.debug(_("%s: _db_content => %s"), msg or "", pformat(_db_content))
+ LOG.debug(_("%s: _db_content => %s"), msg or "", pformat(_db_content))
def reset():
@@ -242,9 +244,9 @@ class SessionBase(object):
full_params = (self._session,) + params
meth = getattr(self, methodname, None)
if meth is None:
- logging.warn('Raising NotImplemented')
+ LOG.debug(_('Raising NotImplemented'))
raise NotImplementedError(
- 'xenapi.fake does not have an implementation for %s' %
+ _('xenapi.fake does not have an implementation for %s') %
methodname)
return meth(*full_params)
@@ -278,12 +280,12 @@ class SessionBase(object):
if impl is not None:
def callit(*params):
- logging.warn('Calling %s %s', name, impl)
+ LOG.debug(_('Calling %s %s'), name, impl)
self._check_session(params)
return impl(*params)
return callit
if self._is_gettersetter(name, True):
- logging.warn('Calling getter %s', name)
+ LOG.debug(_('Calling getter %s'), name)
return lambda *params: self._getter(name, params)
elif self._is_create(name):
return lambda *params: self._create(name, params)
@@ -333,10 +335,10 @@ class SessionBase(object):
field in _db_content[cls][ref]):
return _db_content[cls][ref][field]
- logging.error('Raising NotImplemented')
+ LOG.debuug(_('Raising NotImplemented'))
raise NotImplementedError(
- 'xenapi.fake does not have an implementation for %s or it has '
- 'been called with the wrong number of arguments' % name)
+ _('xenapi.fake does not have an implementation for %s or it has '
+ 'been called with the wrong number of arguments') % name)
def _setter(self, name, params):
self._check_session(params)
@@ -351,7 +353,7 @@ class SessionBase(object):
field in _db_content[cls][ref]):
_db_content[cls][ref][field] = val
- logging.warn('Raising NotImplemented')
+ LOG.debug(_('Raising NotImplemented'))
raise NotImplementedError(
'xenapi.fake does not have an implementation for %s or it has '
'been called with the wrong number of arguments or the database '
@@ -399,7 +401,7 @@ class SessionBase(object):
self._session not in _db_content['session']):
raise Failure(['HANDLE_INVALID', 'session', self._session])
if len(params) == 0 or params[0] != self._session:
- logging.warn('Raising NotImplemented')
+ LOG.debug(_('Raising NotImplemented'))
raise NotImplementedError('Call to XenAPI without using .xenapi')
def _check_arg_count(self, params, expected):
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 9d1b51848..1e9448a26 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -19,7 +19,6 @@ Helper methods for operations related to the management of VM records and
their attributes like VDIs, VIFs, as well as their lookup functions.
"""
-import logging
import pickle
import urllib
from xml.dom import minidom
@@ -27,6 +26,7 @@ from xml.dom import minidom
from eventlet import event
from nova import exception
from nova import flags
+from nova import log as logging
from nova import utils
from nova.auth.manager import AuthManager
from nova.compute import instance_types
@@ -37,6 +37,7 @@ from nova.virt.xenapi.volume_utils import StorageError
FLAGS = flags.FLAGS
+LOG = logging.getLogger("nova.virt.xenapi.vm_utils")
XENAPI_POWER_STATE = {
'Halted': power_state.SHUTDOWN,
@@ -121,9 +122,9 @@ class VMHelper(HelperBase):
rec['HVM_boot_params'] = {'order': 'dc'}
rec['platform'] = {'acpi': 'true', 'apic': 'true',
'pae': 'true', 'viridian': 'true'}
- logging.debug('Created VM %s...', instance.name)
+ LOG.debug(_('Created VM %s...'), instance.name)
vm_ref = session.call_xenapi('VM.create', rec)
- logging.debug(_('Created VM %s as %s.'), instance.name, vm_ref)
+ LOG.debug(_('Created VM %s as %s.'), instance.name, vm_ref)
return vm_ref
@classmethod
@@ -143,10 +144,9 @@ class VMHelper(HelperBase):
vbd_rec['qos_algorithm_type'] = ''
vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = []
- logging.debug(_('Creating VBD for VM %s, VDI %s ... '),
- vm_ref, vdi_ref)
+ LOG.debug(_('Creating VBD for VM %s, VDI %s ... '), vm_ref, vdi_ref)
vbd_ref = session.call_xenapi('VBD.create', vbd_rec)
- logging.debug(_('Created VBD %s for VM %s, VDI %s.'), vbd_ref, vm_ref,
+ LOG.debug(_('Created VBD %s for VM %s, VDI %s.'), vbd_ref, vm_ref,
vdi_ref)
return vbd_ref
@@ -161,7 +161,7 @@ class VMHelper(HelperBase):
if vbd_rec['userdevice'] == str(number):
return vbd
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('VBD not found in instance %s') % vm_ref)
@classmethod
@@ -170,7 +170,7 @@ class VMHelper(HelperBase):
try:
vbd_ref = session.call_xenapi('VBD.unplug', vbd_ref)
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
if exc.details[0] != 'DEVICE_ALREADY_DETACHED':
raise StorageError(_('Unable to unplug VBD %s') % vbd_ref)
@@ -183,7 +183,7 @@ class VMHelper(HelperBase):
#with Josh Kearney
session.wait_for_task(0, task)
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to destroy VBD %s') % vbd_ref)
@classmethod
@@ -199,11 +199,11 @@ class VMHelper(HelperBase):
vif_rec['other_config'] = {}
vif_rec['qos_algorithm_type'] = ''
vif_rec['qos_algorithm_params'] = {}
- logging.debug(_('Creating VIF for VM %s, network %s.'), vm_ref,
- network_ref)
+ LOG.debug(_('Creating VIF for VM %s, network %s.'), vm_ref,
+ network_ref)
vif_ref = session.call_xenapi('VIF.create', vif_rec)
- logging.debug(_('Created VIF %s for VM %s, network %s.'), vif_ref,
- vm_ref, network_ref)
+ LOG.debug(_('Created VIF %s for VM %s, network %s.'), vif_ref,
+ vm_ref, network_ref)
return vif_ref
@classmethod
@@ -213,8 +213,7 @@ class VMHelper(HelperBase):
"""
#TODO(sirp): Add quiesce and VSS locking support when Windows support
# is added
- logging.debug(_("Snapshotting VM %s with label '%s'..."),
- vm_ref, label)
+ LOG.debug(_("Snapshotting VM %s with label '%s'..."), vm_ref, label)
vm_vdi_ref, vm_vdi_rec = get_vdi_for_vm_safely(session, vm_ref)
vm_vdi_uuid = vm_vdi_rec["uuid"]
@@ -227,8 +226,8 @@ class VMHelper(HelperBase):
template_vdi_rec = get_vdi_for_vm_safely(session, template_vm_ref)[1]
template_vdi_uuid = template_vdi_rec["uuid"]
- logging.debug(_('Created snapshot %s from VM %s.'), template_vm_ref,
- vm_ref)
+ LOG.debug(_('Created snapshot %s from VM %s.'), template_vm_ref,
+ vm_ref)
parent_uuid = wait_for_vhd_coalesce(
session, instance_id, sr_ref, vm_vdi_ref, original_parent_uuid)
@@ -241,8 +240,7 @@ class VMHelper(HelperBase):
""" Requests that the Glance plugin bundle the specified VDIs and
push them into Glance using the specified human-friendly name.
"""
- logging.debug(_("Asking xapi to upload %s as '%s'"),
- vdi_uuids, image_name)
+ LOG.debug(_("Asking xapi to upload %s as '%s'"), vdi_uuids, image_name)
params = {'vdi_uuids': vdi_uuids,
'image_name': image_name,
@@ -260,7 +258,7 @@ class VMHelper(HelperBase):
"""
url = images.image_url(image)
access = AuthManager().get_access_key(user, project)
- logging.debug("Asking xapi to fetch %s as %s", url, access)
+ LOG.debug(_("Asking xapi to fetch %s as %s"), url, access)
fn = (type != ImageType.KERNEL_RAMDISK) and 'get_vdi' or 'get_kernel'
args = {}
args['src_url'] = url
@@ -278,7 +276,7 @@ class VMHelper(HelperBase):
@classmethod
def lookup_image(cls, session, vdi_ref):
- logging.debug("Looking up vdi %s for PV kernel", vdi_ref)
+ LOG.debug(_("Looking up vdi %s for PV kernel"), vdi_ref)
fn = "is_vdi_pv"
args = {}
args['vdi-ref'] = vdi_ref
@@ -289,7 +287,7 @@ class VMHelper(HelperBase):
pv = True
elif pv_str.lower() == 'false':
pv = False
- logging.debug("PV Kernel in VDI:%d", pv)
+ LOG.debug(_("PV Kernel in VDI:%d"), pv)
return pv
@classmethod
@@ -317,10 +315,9 @@ class VMHelper(HelperBase):
vdi = session.get_xenapi().VBD.get_VDI(vbd)
# Test valid VDI
record = session.get_xenapi().VDI.get_record(vdi)
- logging.debug(_('VDI %s is still available'),
- record['uuid'])
+ LOG.debug(_('VDI %s is still available'), record['uuid'])
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
else:
vdis.append(vdi)
if len(vdis) > 0:
@@ -331,10 +328,10 @@ class VMHelper(HelperBase):
@classmethod
def compile_info(cls, record):
"""Fill record with VM status information"""
- logging.info(_("(VM_UTILS) xenserver vm state -> |%s|"),
- record['power_state'])
- logging.info(_("(VM_UTILS) xenapi power_state -> |%s|"),
- XENAPI_POWER_STATE[record['power_state']])
+ LOG.info(_("(VM_UTILS) xenserver vm state -> |%s|"),
+ record['power_state'])
+ LOG.info(_("(VM_UTILS) xenapi power_state -> |%s|"),
+ XENAPI_POWER_STATE[record['power_state']])
return {'state': XENAPI_POWER_STATE[record['power_state']],
'max_mem': long(record['memory_static_max']) >> 10,
'mem': long(record['memory_dynamic_max']) >> 10,
@@ -388,11 +385,9 @@ def get_vhd_parent(session, vdi_rec):
"""
if 'vhd-parent' in vdi_rec['sm_config']:
parent_uuid = vdi_rec['sm_config']['vhd-parent']
- #NOTE(sirp): changed xenapi -> get_xenapi()
parent_ref = session.get_xenapi().VDI.get_by_uuid(parent_uuid)
parent_rec = session.get_xenapi().VDI.get_record(parent_ref)
- #NOTE(sirp): changed log -> logging
- logging.debug(_("VHD %s has parent %s"), vdi_rec['uuid'], parent_ref)
+ LOG.debug(_("VHD %s has parent %s"), vdi_rec['uuid'], parent_ref)
return parent_ref, parent_rec
else:
return None
@@ -409,7 +404,7 @@ def get_vhd_parent_uuid(session, vdi_ref):
def scan_sr(session, instance_id, sr_ref):
- logging.debug(_("Re-scanning SR %s"), sr_ref)
+ LOG.debug(_("Re-scanning SR %s"), sr_ref)
task = session.call_xenapi('Async.SR.scan', sr_ref)
session.wait_for_task(instance_id, task)
@@ -433,10 +428,9 @@ def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
scan_sr(session, instance_id, sr_ref)
parent_uuid = get_vhd_parent_uuid(session, vdi_ref)
if original_parent_uuid and (parent_uuid != original_parent_uuid):
- logging.debug(
- _("Parent %s doesn't match original parent %s, "
- "waiting for coalesce..."),
- parent_uuid, original_parent_uuid)
+ LOG.debug(_("Parent %s doesn't match original parent %s, "
+ "waiting for coalesce..."), parent_uuid,
+ original_parent_uuid)
else:
done.send(parent_uuid)
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 146b9e6df..7e3585991 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -18,10 +19,11 @@
Management class for VM-related functions (spawn, reboot, etc).
"""
-import logging
+import json
from nova import db
from nova import context
+from nova import log as logging
from nova import exception
from nova import utils
@@ -31,12 +33,14 @@ from nova.virt.xenapi.network_utils import NetworkHelper
from nova.virt.xenapi.vm_utils import VMHelper
from nova.virt.xenapi.vm_utils import ImageType
+XenAPI = None
+LOG = logging.getLogger("nova.virt.xenapi.vmops")
+
class VMOps(object):
"""
Management class for VM-related tasks
"""
-
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
self._session = session
@@ -92,10 +96,9 @@ class VMOps(object):
if network_ref:
VMHelper.create_vif(self._session, vm_ref,
network_ref, instance.mac_address)
- logging.debug(_('Starting VM %s...'), vm_ref)
+ LOG.debug(_('Starting VM %s...'), vm_ref)
self._session.call_xenapi('VM.start', vm_ref, False, False)
- logging.info(_('Spawning VM %s created %s.'), instance.name,
- vm_ref)
+ LOG.info(_('Spawning VM %s created %s.'), instance.name, vm_ref)
# NOTE(armando): Do we really need to do this in virt?
timer = utils.LoopingCall(f=None)
@@ -106,12 +109,12 @@ class VMOps(object):
db.instance_set_state(context.get_admin_context(),
instance['id'], state)
if state == power_state.RUNNING:
- logging.debug(_('Instance %s: booted'), instance['name'])
+ LOG.debug(_('Instance %s: booted'), instance['name'])
timer.stop()
except Exception, exc:
- logging.warn(exc)
- logging.exception(_('instance %s: failed to boot'),
- instance['name'])
+ LOG.warn(exc)
+ LOG.exception(_('instance %s: failed to boot'),
+ instance['name'])
db.instance_set_state(context.get_admin_context(),
instance['id'],
power_state.SHUTDOWN)
@@ -120,6 +123,20 @@ class VMOps(object):
timer.f = _wait_for_boot
return timer.start(interval=0.5, now=True)
+ def _get_vm_opaque_ref(self, instance_or_vm):
+ """Refactored out the common code of many methods that receive either
+ a vm name or a vm instance, and want a vm instance in return.
+ """
+ try:
+ instance_name = instance_or_vm.name
+ vm = VMHelper.lookup(self._session, instance_name)
+ except AttributeError:
+ # A vm opaque ref was passed
+ vm = instance_or_vm
+ if vm is None:
+ raise Exception(_('Instance not present %s') % instance_name)
+ return vm
+
def snapshot(self, instance, name):
""" Create snapshot from a running VM instance
@@ -168,11 +185,7 @@ class VMOps(object):
def reboot(self, instance):
"""Reboot VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
self._session.wait_for_task(instance.id, task)
@@ -194,7 +207,7 @@ class VMOps(object):
task = self._session.call_xenapi('Async.VM.hard_shutdown', vm)
self._session.wait_for_task(instance.id, task)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
# Disk clean-up
if vdis:
@@ -203,39 +216,31 @@ class VMOps(object):
task = self._session.call_xenapi('Async.VDI.destroy', vdi)
self._session.wait_for_task(instance.id, task)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
# VM Destroy
try:
task = self._session.call_xenapi('Async.VM.destroy', vm)
self._session.wait_for_task(instance.id, task)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
def _wait_with_callback(self, instance_id, task, callback):
ret = None
try:
ret = self._session.wait_for_task(instance_id, task)
- except XenAPI.Failure, exc:
- logging.warn(exc)
+ except self.XenAPI.Failure, exc:
+ LOG.exception(exc)
callback(ret)
def pause(self, instance, callback):
"""Pause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.pause', vm)
self._wait_with_callback(instance.id, task, callback)
def unpause(self, instance, callback):
"""Unpause VM instance"""
- instance_name = instance.name
- vm = VMHelper.lookup(self._session, instance_name)
- if vm is None:
- raise exception.NotFound(_('Instance not'
- ' found %s') % instance_name)
+ vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.unpause', vm)
self._wait_with_callback(instance.id, task, callback)
@@ -270,10 +275,7 @@ class VMOps(object):
def get_diagnostics(self, instance):
"""Return data about VM diagnostics"""
- vm = VMHelper.lookup(self._session, instance.name)
- if vm is None:
- raise exception.NotFound(_("Instance not found %s") %
- instance.name)
+ vm = self._get_vm_opaque_ref(instance)
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_diagnostics(self._session, rec)
@@ -286,3 +288,175 @@ class VMOps(object):
"""Return link to instance's ajax console"""
# TODO: implement this!
return 'http://fakeajaxconsole/fake_url'
+
+ def list_from_xenstore(self, vm, path):
+ """Runs the xenstore-ls command to get a listing of all records
+ from 'path' downward. Returns a dict with the sub-paths as keys,
+ and the value stored in those paths as values. If nothing is
+ found at that path, returns None.
+ """
+ ret = self._make_xenstore_call('list_records', vm, path)
+ return json.loads(ret)
+
+ def read_from_xenstore(self, vm, path):
+ """Returns the value stored in the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the read process.
+ """
+ try:
+ ret = self._make_xenstore_call('read_record', vm, path,
+ {'ignore_missing_path': 'True'})
+ except self.XenAPI.Failure, e:
+ return None
+ ret = json.loads(ret)
+ if ret == "None":
+ # Can't marshall None over RPC calls.
+ return None
+ return ret
+
+ def write_to_xenstore(self, vm, path, value):
+ """Writes the passed value to the xenstore record for the given VM
+ at the specified location. A XenAPIPlugin.PluginError will be raised
+ if any error is encountered in the write process.
+ """
+ return self._make_xenstore_call('write_record', vm, path,
+ {'value': json.dumps(value)})
+
+ def clear_xenstore(self, vm, path):
+ """Deletes the VM's xenstore record for the specified path.
+ If there is no such record, the request is ignored.
+ """
+ self._make_xenstore_call('delete_record', vm, path)
+
+ def _make_xenstore_call(self, method, vm, path, addl_args={}):
+ """Handles calls to the xenstore xenapi plugin."""
+ return self._make_plugin_call('xenstore.py', method=method, vm=vm,
+ path=path, addl_args=addl_args)
+
+ def _make_plugin_call(self, plugin, method, vm, path, addl_args={}):
+ """Abstracts out the process of calling a method of a xenapi plugin.
+ Any errors raised by the plugin will in turn raise a RuntimeError here.
+ """
+ vm = self._get_vm_opaque_ref(vm)
+ rec = self._session.get_xenapi().VM.get_record(vm)
+ args = {'dom_id': rec['domid'], 'path': path}
+ args.update(addl_args)
+ # If the 'testing_mode' attribute is set, add that to the args.
+ if getattr(self, 'testing_mode', False):
+ args['testing_mode'] = 'true'
+ try:
+ task = self._session.async_call_plugin(plugin, method, args)
+ ret = self._session.wait_for_task(0, task)
+ except self.XenAPI.Failure, e:
+ raise RuntimeError("%s" % e.details[-1])
+ return ret
+
+ def add_to_xenstore(self, vm, path, key, value):
+ """Adds the passed key/value pair to the xenstore record for
+ the given VM at the specified location. A XenAPIPlugin.PluginError
+ will be raised if any error is encountered in the write process.
+ """
+ current = self.read_from_xenstore(vm, path)
+ if not current:
+ # Nothing at that location
+ current = {key: value}
+ else:
+ current[key] = value
+ self.write_to_xenstore(vm, path, current)
+
+ def remove_from_xenstore(self, vm, path, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstoreirecord data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ current = self.list_from_xenstore(vm, path)
+ if not current:
+ return
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ keys.sort(lambda x, y: cmp(y.count('/'), x.count('/')))
+ for key in keys:
+ if path:
+ keypath = "%s/%s" % (path, key)
+ else:
+ keypath = key
+ self._make_xenstore_call('delete_record', vm, keypath)
+
+ ########################################################################
+ ###### The following methods interact with the xenstore parameter
+ ###### record, not the live xenstore. They were created before I
+ ###### knew the difference, and are left in here in case they prove
+ ###### to be useful. They all have '_param' added to their method
+ ###### names to distinguish them. (dabo)
+ ########################################################################
+ def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix):
+ """Returns a dict of all the keys in the xenstore parameter record
+ for the given instance that begin with the key_prefix.
+ """
+ data = self.read_from_param_xenstore(instance_or_vm)
+ badkeys = [k for k in data.keys()
+ if not k.startswith(key_prefix)]
+ for badkey in badkeys:
+ del data[badkey]
+ return data
+
+ def read_from_param_xenstore(self, instance_or_vm, keys=None):
+ """Returns the xenstore parameter record data for the specified VM
+ instance as a dict. Accepts an optional key or list of keys; if a
+ value for 'keys' is passed, the returned dict is filtered to only
+ return the values for those keys.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ data = self._session.call_xenapi_request('VM.get_xenstore_data',
+ (vm, ))
+ ret = {}
+ if keys is None:
+ keys = data.keys()
+ elif isinstance(keys, basestring):
+ keys = [keys]
+ for key in keys:
+ raw = data.get(key)
+ if raw:
+ ret[key] = json.loads(raw)
+ else:
+ ret[key] = raw
+ return ret
+
+ def add_to_param_xenstore(self, instance_or_vm, key, val):
+ """Takes a key/value pair and adds it to the xenstore parameter
+ record for the given vm instance. If the key exists in xenstore,
+ it is overwritten"""
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ self.remove_from_param_xenstore(instance_or_vm, key)
+ jsonval = json.dumps(val)
+ self._session.call_xenapi_request('VM.add_to_xenstore_data',
+ (vm, key, jsonval))
+
+ def write_to_param_xenstore(self, instance_or_vm, mapping):
+ """Takes a dict and writes each key/value pair to the xenstore
+ parameter record for the given vm instance. Any existing data for
+ those keys is overwritten.
+ """
+ for k, v in mapping.iteritems():
+ self.add_to_param_xenstore(instance_or_vm, k, v)
+
+ def remove_from_param_xenstore(self, instance_or_vm, key_or_keys):
+ """Takes either a single key or a list of keys and removes
+ them from the xenstore parameter record data for the given VM.
+ If the key doesn't exist, the request is ignored.
+ """
+ vm = self._get_vm_opaque_ref(instance_or_vm)
+ if isinstance(key_or_keys, basestring):
+ keys = [key_or_keys]
+ else:
+ keys = key_or_keys
+ for key in keys:
+ self._session.call_xenapi_request('VM.remove_from_xenstore_data',
+ (vm, key))
+
+ def clear_param_xenstore(self, instance_or_vm):
+ """Removes all data from the xenstore parameter record for this VM."""
+ self.write_to_param_xenstore(instance_or_vm, {})
+ ########################################################################
diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py
index 1ca813bcf..0cd15b950 100644
--- a/nova/virt/xenapi/volume_utils.py
+++ b/nova/virt/xenapi/volume_utils.py
@@ -21,16 +21,17 @@ and storage repositories
import re
import string
-import logging
from nova import db
from nova import context
from nova import exception
from nova import flags
+from nova import log as logging
from nova import utils
from nova.virt.xenapi import HelperBase
FLAGS = flags.FLAGS
+LOG = logging.getLogger("nova.virt.xenapi.volume_utils")
class StorageError(Exception):
@@ -53,7 +54,7 @@ class VolumeHelper(HelperBase):
"""
sr_ref = session.get_xenapi().SR.get_by_name_label(label)
if len(sr_ref) == 0:
- logging.debug('Introducing %s...', label)
+ LOG.debug(_('Introducing %s...'), label)
record = {}
if 'chapuser' in info and 'chappassword' in info:
record = {'target': info['targetHost'],
@@ -70,10 +71,10 @@ class VolumeHelper(HelperBase):
session.get_xenapi_host(),
record,
'0', label, description, 'iscsi', '', False, {})
- logging.debug('Introduced %s as %s.', label, sr_ref)
+ LOG.debug(_('Introduced %s as %s.'), label, sr_ref)
return sr_ref
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to create Storage Repository'))
else:
return sr_ref[0]
@@ -85,32 +86,32 @@ class VolumeHelper(HelperBase):
vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref)
sr_ref = session.get_xenapi().VDI.get_SR(vdi_ref)
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref)
return sr_ref
@classmethod
def destroy_iscsi_storage(cls, session, sr_ref):
"""Forget the SR whilst preserving the state of the disk"""
- logging.debug("Forgetting SR %s ... ", sr_ref)
+ LOG.debug(_("Forgetting SR %s ... "), sr_ref)
pbds = []
try:
pbds = session.get_xenapi().SR.get_PBDs(sr_ref)
except cls.XenAPI.Failure, exc:
- logging.warn('Ignoring exception %s when getting PBDs for %s',
- exc, sr_ref)
+ LOG.warn(_('Ignoring exception %s when getting PBDs for %s'),
+ exc, sr_ref)
for pbd in pbds:
try:
session.get_xenapi().PBD.unplug(pbd)
except cls.XenAPI.Failure, exc:
- logging.warn('Ignoring exception %s when unplugging PBD %s',
- exc, pbd)
+ LOG.warn(_('Ignoring exception %s when unplugging PBD %s'),
+ exc, pbd)
try:
session.get_xenapi().SR.forget(sr_ref)
- logging.debug("Forgetting SR %s done.", sr_ref)
+ LOG.debug(_("Forgetting SR %s done."), sr_ref)
except cls.XenAPI.Failure, exc:
- logging.warn('Ignoring exception %s when forgetting SR %s',
- exc, sr_ref)
+ LOG.warn(_('Ignoring exception %s when forgetting SR %s'), exc,
+ sr_ref)
@classmethod
def introduce_vdi(cls, session, sr_ref):
@@ -118,12 +119,12 @@ class VolumeHelper(HelperBase):
try:
vdis = session.get_xenapi().SR.get_VDIs(sr_ref)
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref)
try:
vdi_rec = session.get_xenapi().VDI.get_record(vdis[0])
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to get record'
' of VDI %s on') % vdis[0])
else:
@@ -141,7 +142,7 @@ class VolumeHelper(HelperBase):
vdi_rec['xenstore_data'],
vdi_rec['sm_config'])
except cls.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise StorageError(_('Unable to introduce VDI for SR %s')
% sr_ref)
@@ -165,11 +166,8 @@ class VolumeHelper(HelperBase):
target_host = _get_target_host(iscsi_portal)
target_port = _get_target_port(iscsi_portal)
target_iqn = _get_iqn(iscsi_name, volume_id)
- logging.debug('(vol_id,number,host,port,iqn): (%s,%s,%s,%s)',
- volume_id,
- target_host,
- target_port,
- target_iqn)
+ LOG.debug('(vol_id,number,host,port,iqn): (%s,%s,%s,%s)',
+ volume_id, target_host, target_port, target_iqn)
if (device_number < 0) or \
(volume_id is None) or \
(target_host is None) or \
@@ -196,19 +194,23 @@ class VolumeHelper(HelperBase):
elif re.match('^[0-9]+$', mountpoint):
return string.atoi(mountpoint, 10)
else:
- logging.warn('Mountpoint cannot be translated: %s', mountpoint)
+ LOG.warn(_('Mountpoint cannot be translated: %s'), mountpoint)
return -1
-def _get_volume_id(path):
+def _get_volume_id(path_or_id):
"""Retrieve the volume id from device_path"""
+ # If we have the ID and not a path, just return it.
+ if isinstance(path_or_id, int):
+ return path_or_id
# n must contain at least the volume_id
# /vol- is for remote volumes
# -vol- is for local volumes
# see compute/manager->setup_compute_volume
- volume_id = path[path.find('/vol-') + 1:]
- if volume_id == path:
- volume_id = path[path.find('-vol-') + 1:].replace('--', '-')
+ volume_id = path_or_id[path_or_id.find('/vol-') + 1:]
+ if volume_id == path_or_id:
+ volume_id = path_or_id[path_or_id.find('-vol-') + 1:]
+ volume_id = volume_id.replace('--', '-')
return volume_id
@@ -253,7 +255,7 @@ def _get_target(volume_id):
"sendtargets -p %s" %
volume_ref['host'])
except exception.ProcessExecutionError, exc:
- logging.warn(exc)
+ LOG.exception(exc)
else:
targets = r.splitlines()
if len(_e) == 0 and len(targets) == 1:
diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py
index fdeb2506c..189f968c6 100644
--- a/nova/virt/xenapi/volumeops.py
+++ b/nova/virt/xenapi/volumeops.py
@@ -17,14 +17,17 @@
"""
Management class for Storage-related functions (attach, detach, etc).
"""
-import logging
from nova import exception
+from nova import log as logging
from nova.virt.xenapi.vm_utils import VMHelper
from nova.virt.xenapi.volume_utils import VolumeHelper
from nova.virt.xenapi.volume_utils import StorageError
+LOG = logging.getLogger("nova.virt.xenapi.volumeops")
+
+
class VolumeOps(object):
"""
Management class for Volume-related tasks
@@ -45,8 +48,8 @@ class VolumeOps(object):
raise exception.NotFound(_('Instance %s not found')
% instance_name)
# NOTE: No Resource Pool concept so far
- logging.debug(_("Attach_volume: %s, %s, %s"),
- instance_name, device_path, mountpoint)
+ LOG.debug(_("Attach_volume: %s, %s, %s"),
+ instance_name, device_path, mountpoint)
# Create the iSCSI SR, and the PDB through which hosts access SRs.
# But first, retrieve target info, like Host, IQN, LUN and SCSIID
vol_rec = VolumeHelper.parse_volume_info(device_path, mountpoint)
@@ -61,7 +64,7 @@ class VolumeOps(object):
try:
vdi_ref = VolumeHelper.introduce_vdi(self._session, sr_ref)
except StorageError, exc:
- logging.warn(exc)
+ LOG.exception(exc)
VolumeHelper.destroy_iscsi_storage(self._session, sr_ref)
raise Exception(_('Unable to create VDI on SR %s for instance %s')
% (sr_ref,
@@ -73,7 +76,7 @@ class VolumeOps(object):
vol_rec['deviceNumber'],
False)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
VolumeHelper.destroy_iscsi_storage(self._session, sr_ref)
raise Exception(_('Unable to use SR %s for instance %s')
% (sr_ref,
@@ -84,13 +87,13 @@ class VolumeOps(object):
vbd_ref)
self._session.wait_for_task(vol_rec['deviceNumber'], task)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.exception(exc)
VolumeHelper.destroy_iscsi_storage(self._session,
sr_ref)
raise Exception(_('Unable to attach volume to instance %s')
% instance_name)
- logging.info(_('Mountpoint %s attached to instance %s'),
- mountpoint, instance_name)
+ LOG.info(_('Mountpoint %s attached to instance %s'),
+ mountpoint, instance_name)
def detach_volume(self, instance_name, mountpoint):
"""Detach volume storage to VM instance"""
@@ -100,13 +103,13 @@ class VolumeOps(object):
raise exception.NotFound(_('Instance %s not found')
% instance_name)
# Detach VBD from VM
- logging.debug(_("Detach_volume: %s, %s"), instance_name, mountpoint)
+ LOG.debug(_("Detach_volume: %s, %s"), instance_name, mountpoint)
device_number = VolumeHelper.mountpoint_to_number(mountpoint)
try:
vbd_ref = VMHelper.find_vbd_by_number(self._session,
vm_ref, device_number)
except StorageError, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise Exception(_('Unable to locate volume %s') % mountpoint)
else:
try:
@@ -114,13 +117,13 @@ class VolumeOps(object):
vbd_ref)
VMHelper.unplug_vbd(self._session, vbd_ref)
except StorageError, exc:
- logging.warn(exc)
+ LOG.exception(exc)
raise Exception(_('Unable to detach volume %s') % mountpoint)
try:
VMHelper.destroy_vbd(self._session, vbd_ref)
except StorageError, exc:
- logging.warn(exc)
+ LOG.exception(exc)
# Forget SR
VolumeHelper.destroy_iscsi_storage(self._session, sr_ref)
- logging.info(_('Mountpoint %s detached from instance %s'),
- mountpoint, instance_name)
+ LOG.info(_('Mountpoint %s detached from instance %s'),
+ mountpoint, instance_name)
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index e1ad04b15..fe1e16877 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
+# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform.
The concurrency model for this class is as follows:
-All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
-deferredToThread). They are remote calls, and so may hang for the usual
-reasons. They should not be allowed to block the reactor thread.
+All XenAPI calls are on a green thread (using eventlet's "tpool"
+thread pool). They are remote calls, and so may hang for the usual
+reasons.
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
-(using XenAPI.VM.async_start etc). These return a task, which can then be
-polled for completion. Polling is handled using reactor.callLater.
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion.
-This combination of techniques means that we don't block the reactor thread at
+This combination of techniques means that we don't block the main thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
@@ -50,7 +51,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
:iqn_prefix: IQN Prefix, e.g. 'iqn.2010-10.org.openstack'
"""
-import logging
import sys
import xmlrpclib
@@ -61,9 +61,14 @@ from nova import context
from nova import db
from nova import utils
from nova import flags
+from nova import log as logging
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
+
+LOG = logging.getLogger("nova.virt.xenapi")
+
+
FLAGS = flags.FLAGS
flags.DEFINE_string('xenapi_connection_url',
@@ -81,7 +86,7 @@ flags.DEFINE_string('xenapi_connection_password',
flags.DEFINE_float('xenapi_task_poll_interval',
0.5,
'The interval used for polling of remote tasks '
- '(Async.VM.start, etc). Used only if '
+ '(Async.VM.start, etc). Used only if '
'connection_type=xenapi.')
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
5.0,
@@ -217,6 +222,14 @@ class XenAPISession(object):
f = f.__getattr__(m)
return tpool.execute(f, *args)
+ def call_xenapi_request(self, method, *args):
+ """Some interactions with dom0, such as interacting with xenstore's
+ param record, require using the xenapi_request method of the session
+ object. This wraps that call on a background thread.
+ """
+ f = self._session.xenapi_request
+ return tpool.execute(f, method, *args)
+
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread."""
return tpool.execute(self._unwrap_plugin_exceptions,
@@ -226,7 +239,6 @@ class XenAPISession(object):
def wait_for_task(self, id, task):
"""Return the result of the given task. The task is polled
until it completes."""
-
done = event.Event()
loop = utils.LoopingCall(self._poll_task, id, task, done)
loop.start(FLAGS.xenapi_task_poll_interval, now=True)
@@ -239,7 +251,7 @@ class XenAPISession(object):
return self.XenAPI.Session(url)
def _poll_task(self, id, task, done):
- """Poll the given XenAPI task, and fire the given Deferred if we
+ """Poll the given XenAPI task, and fire the given action if we
get a result."""
try:
name = self._session.xenapi.task.get_name_label(task)
@@ -252,7 +264,7 @@ class XenAPISession(object):
return
elif status == "success":
result = self._session.xenapi.task.get_result(task)
- logging.info(_("Task [%s] %s status: success %s") % (
+ LOG.info(_("Task [%s] %s status: success %s") % (
name,
task,
result))
@@ -260,7 +272,7 @@ class XenAPISession(object):
else:
error_info = self._session.xenapi.task.get_error_info(task)
action["error"] = str(error_info)
- logging.warn(_("Task [%s] %s status: %s %s") % (
+ LOG.warn(_("Task [%s] %s status: %s %s") % (
name,
task,
status,
@@ -268,7 +280,7 @@ class XenAPISession(object):
done.send_exception(self.XenAPI.Failure(error_info))
db.instance_action_create(context.get_admin_context(), action)
except self.XenAPI.Failure, exc:
- logging.warn(exc)
+ LOG.warn(exc)
done.send_exception(*sys.exc_info())
def _unwrap_plugin_exceptions(self, func, *args, **kwargs):
@@ -276,7 +288,7 @@ class XenAPISession(object):
try:
return func(*args, **kwargs)
except self.XenAPI.Failure, exc:
- logging.debug(_("Got exception: %s"), exc)
+ LOG.debug(_("Got exception: %s"), exc)
if (len(exc.details) == 4 and
exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and
exc.details[2] == 'Failure'):
@@ -289,12 +301,12 @@ class XenAPISession(object):
else:
raise
except xmlrpclib.ProtocolError, exc:
- logging.debug(_("Got exception: %s"), exc)
+ LOG.debug(_("Got exception: %s"), exc)
raise
def _parse_xmlrpc_value(val):
- """Parse the given value as if it were an XML-RPC value. This is
+ """Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""
if not val:
return val