diff options
| author | Salvatore Orlando <salvatore.orlando@eu.citrix.com> | 2010-12-23 21:41:54 +0000 |
|---|---|---|
| committer | Salvatore Orlando <salvatore.orlando@eu.citrix.com> | 2010-12-23 21:41:54 +0000 |
| commit | 2ae831a6b656ea9203b7326e06db4ba9ebcc25d8 (patch) | |
| tree | a563135ecd47a9d25e85a04122ac42ce3bcf622e /nova/virt | |
| parent | 301dd942b533f8efbe55a74def7ae79de3a11f48 (diff) | |
| parent | 75e2cbec9eb5132a49446f1b6d563d5f43d007de (diff) | |
Merged again from trunk
Diffstat (limited to 'nova/virt')
| -rw-r--r-- | nova/virt/connection.py | 2 | ||||
| -rw-r--r-- | nova/virt/fake.py | 9 | ||||
| -rw-r--r-- | nova/virt/libvirt.xml.template | 3 | ||||
| -rw-r--r-- | nova/virt/libvirt_conn.py | 128 | ||||
| -rw-r--r-- | nova/virt/xenapi/__init__.py | 15 | ||||
| -rw-r--r-- | nova/virt/xenapi/fake.py | 388 | ||||
| -rw-r--r-- | nova/virt/xenapi/network_utils.py | 15 | ||||
| -rw-r--r-- | nova/virt/xenapi/vm_utils.py | 105 | ||||
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 104 | ||||
| -rw-r--r-- | nova/virt/xenapi/volume_utils.py | 268 | ||||
| -rw-r--r-- | nova/virt/xenapi/volumeops.py | 101 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 158 |
12 files changed, 1086 insertions, 210 deletions
diff --git a/nova/virt/connection.py b/nova/virt/connection.py index c40bb4bb4..61e99944e 100644 --- a/nova/virt/connection.py +++ b/nova/virt/connection.py @@ -66,6 +66,6 @@ def get_connection(read_only=False): raise Exception('Unknown connection type "%s"' % t) if conn is None: - logging.error('Failed to open connection to the hypervisor') + logging.error(_('Failed to open connection to the hypervisor')) sys.exit(1) return conn diff --git a/nova/virt/fake.py b/nova/virt/fake.py index 55c6dcef9..238acf798 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -76,6 +76,12 @@ class FakeConnection(object): cls._instance = cls() return cls._instance + def init_host(self): + """ + Initialize anything that is necessary for the driver to function + """ + return + def list_instances(self): """ Return the names of all the instances known to the virtualization @@ -175,7 +181,8 @@ class FakeConnection(object): knowledge of the instance """ if instance_name not in self.instances: - raise exception.NotFound("Instance %s Not Found" % instance_name) + raise exception.NotFound(_("Instance %s Not Found") + % instance_name) i = self.instances[instance_name] return {'state': i._state, 'max_mem': 0, diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template index 13d087330..3fb2243da 100644 --- a/nova/virt/libvirt.xml.template +++ b/nova/virt/libvirt.xml.template @@ -66,6 +66,9 @@ <filterref filter="nova-instance-${name}"> <parameter name="IP" value="${ip_address}" /> <parameter name="DHCPSERVER" value="${dhcp_server}" /> +#if $getVar('extra_params', False) + ${extra_params} +#end if </filterref> </interface> <serial type="file"> diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py index ad101db2a..651b2af93 100644 --- a/nova/virt/libvirt_conn.py +++ b/nova/virt/libvirt_conn.py @@ -40,6 +40,7 @@ import logging import os import shutil +from eventlet import greenthread from eventlet import event from eventlet import tpool @@ -96,6 +97,11 @@ def get_connection(read_only): return LibvirtConnection(read_only) +def _get_net_and_mask(cidr): + net = IPy.IP(cidr) + return str(net.net()), str(net.netmask()) + + class LibvirtConnection(object): def __init__(self, read_only): @@ -105,10 +111,13 @@ class LibvirtConnection(object): self._wrapped_conn = None self.read_only = read_only + def init_host(self): + NWFilterFirewall(self._conn).setup_base_nwfilters() + @property def _conn(self): if not self._wrapped_conn or not self._test_connection(): - logging.debug('Connecting to libvirt: %s' % self.libvirt_uri) + logging.debug(_('Connecting to libvirt: %s') % self.libvirt_uri) self._wrapped_conn = self._connect(self.libvirt_uri, self.read_only) return self._wrapped_conn @@ -120,7 +129,7 @@ class LibvirtConnection(object): except libvirt.libvirtError as e: if e.get_error_code() == libvirt.VIR_ERR_SYSTEM_ERROR and \ e.get_error_domain() == libvirt.VIR_FROM_REMOTE: - logging.debug('Connection to libvirt broke') + logging.debug(_('Connection to libvirt broke')) return False raise @@ -183,7 +192,8 @@ class LibvirtConnection(object): # everything has been vetted a bit def _wait_for_timer(): timer_done.wait() - self._cleanup(instance) + if cleanup: + self._cleanup(instance) done.send() greenthread.spawn(_wait_for_timer) @@ -191,7 +201,7 @@ class LibvirtConnection(object): def _cleanup(self, instance): target = os.path.join(FLAGS.instances_path, instance['name']) - logging.info('instance %s: deleting instance files %s', + logging.info(_('instance %s: deleting instance files %s'), instance['name'], target) if os.path.exists(target): shutil.rmtree(target) @@ -233,7 +243,7 @@ class LibvirtConnection(object): mount_device = mountpoint.rpartition("/")[2] xml = self._get_disk_xml(virt_dom.XMLDesc(0), mount_device) if not xml: - raise exception.NotFound("No disk at %s" % mount_device) + raise exception.NotFound(_("No disk at %s") % mount_device) virt_dom.detachDevice(xml) @exception.wrap_exception @@ -249,10 +259,10 @@ class LibvirtConnection(object): db.instance_set_state(context.get_admin_context(), instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: rebooted', instance['name']) + logging.debug(_('instance %s: rebooted'), instance['name']) timer.stop() except Exception, exn: - logging.error('_wait_for_reboot failed: %s', exn) + logging.error(_('_wait_for_reboot failed: %s'), exn) db.instance_set_state(context.get_admin_context(), instance['id'], power_state.SHUTDOWN) @@ -287,10 +297,10 @@ class LibvirtConnection(object): state = self.get_info(instance['name'])['state'] db.instance_set_state(None, instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: rescued', instance['name']) + logging.debug(_('instance %s: rescued'), instance['name']) timer.stop() except Exception, exn: - logging.error('_wait_for_rescue failed: %s', exn) + logging.error(_('_wait_for_rescue failed: %s'), exn) db.instance_set_state(None, instance['id'], power_state.SHUTDOWN) @@ -315,7 +325,7 @@ class LibvirtConnection(object): NWFilterFirewall(self._conn).setup_nwfilters_for_instance(instance) self._create_image(instance, xml) self._conn.createXML(xml, 0) - logging.debug("instance %s: is running", instance['name']) + logging.debug(_("instance %s: is running"), instance['name']) timer = utils.LoopingCall(f=None) @@ -325,10 +335,10 @@ class LibvirtConnection(object): db.instance_set_state(context.get_admin_context(), instance['id'], state) if state == power_state.RUNNING: - logging.debug('instance %s: booted', instance['name']) + logging.debug(_('instance %s: booted'), instance['name']) timer.stop() except: - logging.exception('instance %s: failed to boot', + logging.exception(_('instance %s: failed to boot'), instance['name']) db.instance_set_state(context.get_admin_context(), instance['id'], @@ -343,7 +353,7 @@ class LibvirtConnection(object): virsh_output = virsh_output[0].strip() if virsh_output.startswith('/dev/'): - logging.info('cool, it\'s a device') + logging.info(_('cool, it\'s a device')) out, err = utils.execute("sudo dd if=%s iflag=nonblock" % virsh_output, check_exit_code=False) return out @@ -351,7 +361,7 @@ class LibvirtConnection(object): return '' def _append_to_file(self, data, fpath): - logging.info('data: %r, fpath: %r' % (data, fpath)) + logging.info(_('data: %r, fpath: %r') % (data, fpath)) fp = open(fpath, 'a+') fp.write(data) return fpath @@ -393,7 +403,7 @@ class LibvirtConnection(object): # TODO(termie): these are blocking calls, it would be great # if they weren't. - logging.info('instance %s: Creating image', inst['name']) + logging.info(_('instance %s: Creating image'), inst['name']) f = open(basepath('libvirt.xml'), 'w') f.write(libvirt_xml) f.close() @@ -449,10 +459,10 @@ class LibvirtConnection(object): 'dns': network_ref['dns']} if key or net: if key: - logging.info('instance %s: injecting key into image %s', + logging.info(_('instance %s: injecting key into image %s'), inst['name'], inst.image_id) if net: - logging.info('instance %s: injecting net into image %s', + logging.info(_('instance %s: injecting net into image %s'), inst['name'], inst.image_id) try: disk.inject_data(basepath('disk-raw'), key, net, @@ -460,8 +470,8 @@ class LibvirtConnection(object): execute=execute) except Exception as e: # This could be a windows image, or a vmdk format disk - logging.warn('instance %s: ignoring error injecting data' - ' into image %s (%s)', + logging.warn(_('instance %s: ignoring error injecting data' + ' into image %s (%s)'), inst['name'], inst.image_id, e) if inst['kernel_id']: @@ -488,9 +498,10 @@ class LibvirtConnection(object): def to_xml(self, instance, rescue=False): # TODO(termie): cache? - logging.debug('instance %s: starting toXML method', instance['name']) - network = db.project_get_network(context.get_admin_context(), - instance['project_id']) + logging.debug(_('instance %s: starting toXML method'), + instance['name']) + network = db.network_get_by_instance(context.get_admin_context(), + instance['id']) # FIXME(vish): stick this in db instance_type = instance['instance_type'] instance_type = instance_types.INSTANCE_TYPES[instance_type] @@ -498,6 +509,15 @@ class LibvirtConnection(object): instance['id']) # Assume that the gateway also acts as the dhcp server. dhcp_server = network['gateway'] + + if FLAGS.allow_project_net_traffic: + net, mask = _get_net_and_mask(network['cidr']) + extra_params = ("<parameter name=\"PROJNET\" value=\"%s\" />\n" + "<parameter name=\"PROJMASK\" value=\"%s\" />\n" + ) % (net, mask) + else: + extra_params = "\n" + xml_info = {'type': FLAGS.libvirt_type, 'name': instance['name'], 'basepath': os.path.join(FLAGS.instances_path, @@ -508,6 +528,7 @@ class LibvirtConnection(object): 'mac_address': instance['mac_address'], 'ip_address': ip_address, 'dhcp_server': dhcp_server, + 'extra_params': extra_params, 'rescue': rescue} if not rescue: if instance['kernel_id']: @@ -519,7 +540,8 @@ class LibvirtConnection(object): xml_info['disk'] = xml_info['basepath'] + "/disk" xml = str(Template(self.libvirt_xml, searchList=[xml_info])) - logging.debug('instance %s: finished toXML method', instance['name']) + logging.debug(_('instance %s: finished toXML method'), + instance['name']) return xml @@ -527,7 +549,8 @@ class LibvirtConnection(object): try: virt_dom = self._conn.lookupByName(instance_name) except: - raise exception.NotFound("Instance %s not found" % instance_name) + raise exception.NotFound(_("Instance %s not found") + % instance_name) (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() return {'state': state, 'max_mem': max_mem, @@ -713,6 +736,14 @@ class NWFilterFirewall(object): </rule> </filter>''' + nova_vpn_filter = '''<filter name='nova-vpn' chain='root'> + <uuid>2086015e-cf03-11df-8c5d-080027c27973</uuid> + <filterref filter='allow-dhcp-server'/> + <filterref filter='nova-allow-dhcp-server'/> + <filterref filter='nova-base-ipv4'/> + <filterref filter='nova-base-ipv6'/> + </filter>''' + def nova_base_ipv4_filter(self): retval = "<filter name='nova-base-ipv4' chain='ipv4'>" for protocol in ['tcp', 'udp', 'icmp']: @@ -737,12 +768,12 @@ class NWFilterFirewall(object): retval += '</filter>' return retval - def nova_project_filter(self, project, net, mask): - retval = "<filter name='nova-project-%s' chain='ipv4'>" % project + def nova_project_filter(self): + retval = "<filter name='nova-project' chain='ipv4'>" for protocol in ['tcp', 'udp', 'icmp']: retval += """<rule action='accept' direction='in' priority='200'> - <%s srcipaddr='%s' srcipmask='%s' /> - </rule>""" % (protocol, net, mask) + <%s srcipaddr='$PROJNET' srcipmask='$PROJMASK' /> + </rule>""" % protocol retval += '</filter>' return retval @@ -753,10 +784,14 @@ class NWFilterFirewall(object): # execute in a native thread and block current greenthread until done tpool.execute(self._conn.nwfilterDefineXML, xml) - @staticmethod - def _get_net_and_mask(cidr): - net = IPy.IP(cidr) - return str(net.net()), str(net.netmask()) + def setup_base_nwfilters(self): + self._define_filter(self.nova_base_ipv4_filter) + self._define_filter(self.nova_base_ipv6_filter) + self._define_filter(self.nova_dhcp_filter) + self._define_filter(self.nova_base_filter) + self._define_filter(self.nova_vpn_filter) + if FLAGS.allow_project_net_traffic: + self._define_filter(self.nova_project_filter) def setup_nwfilters_for_instance(self, instance): """ @@ -765,31 +800,22 @@ class NWFilterFirewall(object): the base filter are all in place. """ - self._define_filter(self.nova_base_ipv4_filter) - self._define_filter(self.nova_base_ipv6_filter) - self._define_filter(self.nova_dhcp_filter) - self._define_filter(self.nova_base_filter) + nwfilter_xml = ("<filter name='nova-instance-%s' chain='root'>\n" + ) % instance['name'] - nwfilter_xml = "<filter name='nova-instance-%s' chain='root'>\n" \ - " <filterref filter='nova-base' />\n" % \ - instance['name'] + if instance['image_id'] == FLAGS.vpn_image_id: + nwfilter_xml += " <filterref filter='nova-vpn' />\n" + else: + nwfilter_xml += " <filterref filter='nova-base' />\n" if FLAGS.allow_project_net_traffic: - network_ref = db.project_get_network(context.get_admin_context(), - instance['project_id']) - net, mask = self._get_net_and_mask(network_ref['cidr']) - project_filter = self.nova_project_filter(instance['project_id'], - net, mask) - self._define_filter(project_filter) - - nwfilter_xml += " <filterref filter='nova-project-%s' />\n" % \ - instance['project_id'] + nwfilter_xml += " <filterref filter='nova-project' />\n" for security_group in instance.security_groups: self.ensure_security_group_filter(security_group['id']) - nwfilter_xml += " <filterref filter='nova-secgroup-%d' />\n" % \ - security_group['id'] + nwfilter_xml += (" <filterref filter='nova-secgroup-%d' />\n" + ) % security_group['id'] nwfilter_xml += "</filter>" self._define_filter(nwfilter_xml) @@ -805,7 +831,7 @@ class NWFilterFirewall(object): for rule in security_group.rules: rule_xml += "<rule action='accept' direction='in' priority='300'>" if rule.cidr: - net, mask = self._get_net_and_mask(rule.cidr) + net, mask = _get_net_and_mask(rule.cidr) rule_xml += "<%s srcipaddr='%s' srcipmask='%s' " % \ (rule.protocol, net, mask) if rule.protocol in ['tcp', 'udp']: diff --git a/nova/virt/xenapi/__init__.py b/nova/virt/xenapi/__init__.py index 3d598c463..c75162f08 100644 --- a/nova/virt/xenapi/__init__.py +++ b/nova/virt/xenapi/__init__.py @@ -13,3 +13,18 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +""" +:mod:`xenapi` -- Nova support for XenServer and XCP through XenAPI +================================================================== +""" + + +class HelperBase(object): + """ + The base for helper classes. This adds the XenAPI class attribute + """ + XenAPI = None + + def __init__(self): + return diff --git a/nova/virt/xenapi/fake.py b/nova/virt/xenapi/fake.py new file mode 100644 index 000000000..7a6c9ee71 --- /dev/null +++ b/nova/virt/xenapi/fake.py @@ -0,0 +1,388 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +#============================================================================ +# +# Parts of this file are based upon xmlrpclib.py, the XML-RPC client +# interface included in the Python distribution. +# +# Copyright (c) 1999-2002 by Secret Labs AB +# Copyright (c) 1999-2002 by Fredrik Lundh +# +# By obtaining, using, and/or copying this software and/or its +# associated documentation, you agree that you have read, understood, +# and will comply with the following terms and conditions: +# +# Permission to use, copy, modify, and distribute this software and +# its associated documentation for any purpose and without fee is +# hereby granted, provided that the above copyright notice appears in +# all copies, and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of +# Secret Labs AB or the author not be used in advertising or publicity +# pertaining to distribution of the software without specific, written +# prior permission. +# +# SECRET LABS AB AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD +# TO THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANT- +# ABILITY AND FITNESS. IN NO EVENT SHALL SECRET LABS AB OR THE AUTHOR +# BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY +# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE +# OF THIS SOFTWARE. +# -------------------------------------------------------------------- + + +""" +A fake XenAPI SDK. +""" + + +import datetime +import logging +import uuid + +from nova import exception + + +_CLASSES = ['host', 'network', 'session', 'SR', 'VBD',\ + 'PBD', 'VDI', 'VIF', 'VM', 'task'] + +_db_content = {} + + +def reset(): + for c in _CLASSES: + _db_content[c] = {} + create_host('fake') + + +def create_host(name_label): + return _create_object('host', { + 'name_label': name_label, + }) + + +def create_network(name_label, bridge): + return _create_object('network', { + 'name_label': name_label, + 'bridge': bridge, + }) + + +def create_vm(name_label, status, + is_a_template=False, is_control_domain=False): + return _create_object('VM', { + 'name_label': name_label, + 'power-state': status, + 'is_a_template': is_a_template, + 'is_control_domain': is_control_domain, + }) + + +def create_vdi(name_label, read_only, sr_ref, sharable): + return _create_object('VDI', { + 'name_label': name_label, + 'read_only': read_only, + 'SR': sr_ref, + 'type': '', + 'name_description': '', + 'sharable': sharable, + 'other_config': {}, + 'location': '', + 'xenstore_data': '', + 'sm_config': {}, + 'VBDs': {}, + }) + + +def create_pbd(config, sr_ref, attached): + return _create_object('PBD', { + 'device-config': config, + 'SR': sr_ref, + 'currently-attached': attached, + }) + + +def create_task(name_label): + return _create_object('task', { + 'name_label': name_label, + 'status': 'pending', + }) + + +def _create_object(table, obj): + ref = str(uuid.uuid4()) + obj['uuid'] = str(uuid.uuid4()) + _db_content[table][ref] = obj + return ref + + +def _create_sr(table, obj): + sr_type = obj[6] + # Forces fake to support iscsi only + if sr_type != 'iscsi': + raise Failure(['SR_UNKNOWN_DRIVER', sr_type]) + sr_ref = _create_object(table, obj[2]) + vdi_ref = create_vdi('', False, sr_ref, False) + pbd_ref = create_pbd('', sr_ref, True) + _db_content['SR'][sr_ref]['VDIs'] = [vdi_ref] + _db_content['SR'][sr_ref]['PBDs'] = [pbd_ref] + _db_content['VDI'][vdi_ref]['SR'] = sr_ref + _db_content['PBD'][pbd_ref]['SR'] = sr_ref + return sr_ref + + +def get_all(table): + return _db_content[table].keys() + + +def get_all_records(table): + return _db_content[table] + + +def get_record(table, ref): + if ref in _db_content[table]: + return _db_content[table].get(ref) + else: + raise Failure(['HANDLE_INVALID', table, ref]) + + +def check_for_session_leaks(): + if len(_db_content['session']) > 0: + raise exception.Error('Sessions have leaked: %s' % + _db_content['session']) + + +class Failure(Exception): + def __init__(self, details): + self.details = details + + def __str__(self): + try: + return str(self.details) + except Exception, exc: + return "XenAPI Fake Failure: %s" % str(self.details) + + def _details_map(self): + return dict([(str(i), self.details[i]) + for i in range(len(self.details))]) + + +class SessionBase(object): + """ + Base class for Fake Sessions + """ + + def __init__(self, uri): + self._session = None + + def xenapi_request(self, methodname, params): + if methodname.startswith('login'): + self._login(methodname, params) + return None + elif methodname == 'logout' or methodname == 'session.logout': + self._logout() + return None + else: + full_params = (self._session,) + params + meth = getattr(self, methodname, None) + if meth is None: + logging.warn('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s' % + methodname) + return meth(*full_params) + + def _login(self, method, params): + self._session = str(uuid.uuid4()) + _db_content['session'][self._session] = { + 'uuid': str(uuid.uuid4()), + 'this_host': _db_content['host'].keys()[0], + } + + def _logout(self): + s = self._session + self._session = None + if s not in _db_content['session']: + raise exception.Error( + "Logging out a session that is invalid or already logged " + "out: %s" % s) + del _db_content['session'][s] + + def __getattr__(self, name): + if name == 'handle': + return self._session + elif name == 'xenapi': + return _Dispatcher(self.xenapi_request, None) + elif name.startswith('login') or name.startswith('slave_local'): + return lambda *params: self._login(name, params) + elif name.startswith('Async'): + return lambda *params: self._async(name, params) + elif '.' in name: + impl = getattr(self, name.replace('.', '_')) + if impl is not None: + def callit(*params): + logging.warn('Calling %s %s', name, impl) + self._check_session(params) + return impl(*params) + return callit + if self._is_gettersetter(name, True): + logging.warn('Calling getter %s', name) + return lambda *params: self._getter(name, params) + elif self._is_create(name): + return lambda *params: self._create(name, params) + else: + return None + + def _is_gettersetter(self, name, getter): + bits = name.split('.') + return (len(bits) == 2 and + bits[0] in _CLASSES and + bits[1].startswith(getter and 'get_' or 'set_')) + + def _is_create(self, name): + bits = name.split('.') + return (len(bits) == 2 and + bits[0] in _CLASSES and + bits[1] == 'create') + + def _getter(self, name, params): + self._check_session(params) + (cls, func) = name.split('.') + + if func == 'get_all': + self._check_arg_count(params, 1) + return get_all(cls) + + if func == 'get_all_records': + self._check_arg_count(params, 1) + return get_all_records(cls) + + if func == 'get_record': + self._check_arg_count(params, 2) + return get_record(cls, params[1]) + + if (func == 'get_by_name_label' or + func == 'get_by_uuid'): + self._check_arg_count(params, 2) + return self._get_by_field( + _db_content[cls], func[len('get_by_'):], params[1]) + + if len(params) == 2: + field = func[len('get_'):] + ref = params[1] + + if (ref in _db_content[cls] and + field in _db_content[cls][ref]): + return _db_content[cls][ref][field] + + logging.error('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s or it has ' + 'been called with the wrong number of arguments' % name) + + def _setter(self, name, params): + self._check_session(params) + (cls, func) = name.split('.') + + if len(params) == 3: + field = func[len('set_'):] + ref = params[1] + val = params[2] + + if (ref in _db_content[cls] and + field in _db_content[cls][ref]): + _db_content[cls][ref][field] = val + + logging.warn('Raising NotImplemented') + raise NotImplementedError( + 'xenapi.fake does not have an implementation for %s or it has ' + 'been called with the wrong number of arguments or the database ' + 'is missing that field' % name) + + def _create(self, name, params): + self._check_session(params) + is_sr_create = name == 'SR.create' + # Storage Repositories have a different API + expected = is_sr_create and 10 or 2 + self._check_arg_count(params, expected) + (cls, _) = name.split('.') + ref = is_sr_create and \ + _create_sr(cls, params) or _create_object(cls, params[1]) + obj = get_record(cls, ref) + + # Add RO fields + if cls == 'VM': + obj['power_state'] = 'Halted' + + return ref + + def _async(self, name, params): + task_ref = create_task(name) + task = _db_content['task'][task_ref] + func = name[len('Async.'):] + try: + task['result'] = self.xenapi_request(func, params[1:]) + task['status'] = 'success' + except Failure, exc: + task['error_info'] = exc.details + task['status'] = 'failed' + task['finished'] = datetime.datetime.now() + return task_ref + + def _check_session(self, params): + if (self._session is None or + self._session not in _db_content['session']): + raise Failure(['HANDLE_INVALID', 'session', self._session]) + if len(params) == 0 or params[0] != self._session: + logging.warn('Raising NotImplemented') + raise NotImplementedError('Call to XenAPI without using .xenapi') + + def _check_arg_count(self, params, expected): + actual = len(params) + if actual != expected: + raise Failure(['MESSAGE_PARAMETER_COUNT_MISMATCH', + expected, actual]) + + def _get_by_field(self, recs, k, v): + result = [] + for ref, rec in recs.iteritems(): + if rec.get(k) == v: + result.append(ref) + return result + + +# Based upon _Method from xmlrpclib. +class _Dispatcher: + def __init__(self, send, name): + self.__send = send + self.__name = name + + def __repr__(self): + if self.__name: + return '<xenapi.fake._Dispatcher for %s>' % self.__name + else: + return '<xenapi.fake._Dispatcher>' + + def __getattr__(self, name): + if self.__name is None: + return _Dispatcher(self.__send, name) + else: + return _Dispatcher(self.__send, "%s.%s" % (self.__name, name)) + + def __call__(self, *args): + return self.__send(self.__name, args) diff --git a/nova/virt/xenapi/network_utils.py b/nova/virt/xenapi/network_utils.py index ce2c68ce0..c0406d8f0 100644 --- a/nova/virt/xenapi/network_utils.py +++ b/nova/virt/xenapi/network_utils.py @@ -21,22 +21,23 @@ their lookup functions. """ -class NetworkHelper(): +from nova.virt.xenapi import HelperBase + + +class NetworkHelper(HelperBase): """ The class that wraps the helper methods together. """ - def __init__(self): - return - @classmethod def find_network_with_bridge(cls, session, bridge): - """ Return the network on which the bridge is attached, if found.""" + """Return the network on which the bridge is attached, if found.""" expr = 'field "bridge" = "%s"' % bridge networks = session.call_xenapi('network.get_all_records_where', expr) if len(networks) == 1: return networks.keys()[0] elif len(networks) > 1: - raise Exception('Found non-unique network for bridge %s' % bridge) + raise Exception(_('Found non-unique network' + ' for bridge %s') % bridge) else: - raise Exception('Found no network for bridge %s' % bridge) + raise Exception(_('Found no network for bridge %s') % bridge) diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index d92ca235a..2f02f4677 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -23,12 +23,14 @@ import logging import urllib from xml.dom import minidom +from nova import exception from nova import flags -from nova import utils from nova.auth.manager import AuthManager from nova.compute import instance_types from nova.compute import power_state from nova.virt import images +from nova.virt.xenapi import HelperBase +from nova.virt.xenapi.volume_utils import StorageError FLAGS = flags.FLAGS @@ -40,8 +42,6 @@ XENAPI_POWER_STATE = { 'Suspended': power_state.SHUTDOWN, # FIXME 'Crashed': power_state.CRASHED} -XenAPI = None - class ImageType: """ @@ -56,25 +56,11 @@ class ImageType: DISK_RAW = 2 -class VMHelper(): +class VMHelper(HelperBase): """ The class that wraps the helper methods together. """ - def __init__(self): - return - - @classmethod - def late_import(cls): - """ - Load the XenAPI module in for helper class, if required. - This is to avoid to install the XenAPI library when other - hypervisors are used - """ - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') - @classmethod def create_vm(cls, session, instance, kernel, ramdisk, pv_kernel=False): """Create a VM record. Returns a Deferred that gives the new @@ -134,14 +120,13 @@ class VMHelper(): 'pae': 'true', 'viridian': 'true'} logging.debug('Created VM %s...', instance.name) vm_ref = session.call_xenapi('VM.create', rec) - logging.debug('Created VM %s as %s.', instance.name, vm_ref) + logging.debug(_('Created VM %s as %s.'), instance.name, vm_ref) return vm_ref @classmethod def create_vbd(cls, session, vm_ref, vdi_ref, userdevice, bootable): """Create a VBD record. Returns a Deferred that gives the new VBD reference.""" - vbd_rec = {} vbd_rec['VM'] = vm_ref vbd_rec['VDI'] = vdi_ref @@ -155,17 +140,53 @@ class VMHelper(): vbd_rec['qos_algorithm_type'] = '' vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_supported_algorithms'] = [] - logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref) + logging.debug(_('Creating VBD for VM %s, VDI %s ... '), + vm_ref, vdi_ref) vbd_ref = session.call_xenapi('VBD.create', vbd_rec) - logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref, + logging.debug(_('Created VBD %s for VM %s, VDI %s.'), vbd_ref, vm_ref, vdi_ref) return vbd_ref @classmethod + def find_vbd_by_number(cls, session, vm_ref, number): + """Get the VBD reference from the device number""" + vbds = session.get_xenapi().VM.get_VBDs(vm_ref) + if vbds: + for vbd in vbds: + try: + vbd_rec = session.get_xenapi().VBD.get_record(vbd) + if vbd_rec['userdevice'] == str(number): + return vbd + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('VBD not found in instance %s') % vm_ref) + + @classmethod + def unplug_vbd(cls, session, vbd_ref): + """Unplug VBD from VM""" + try: + vbd_ref = session.call_xenapi('VBD.unplug', vbd_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + if exc.details[0] != 'DEVICE_ALREADY_DETACHED': + raise StorageError(_('Unable to unplug VBD %s') % vbd_ref) + + @classmethod + def destroy_vbd(cls, session, vbd_ref): + """Destroy VBD from host database""" + try: + task = session.call_xenapi('Async.VBD.destroy', vbd_ref) + #FIXME(armando): find a solution to missing instance_id + #with Josh Kearney + session.wait_for_task(0, task) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to destroy VBD %s') % vbd_ref) + + @classmethod def create_vif(cls, session, vm_ref, network_ref, mac_address): """Create a VIF record. Returns a Deferred that gives the new VIF reference.""" - vif_rec = {} vif_rec['device'] = '0' vif_rec['network'] = network_ref @@ -175,10 +196,10 @@ class VMHelper(): vif_rec['other_config'] = {} vif_rec['qos_algorithm_type'] = '' vif_rec['qos_algorithm_params'] = {} - logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref, + logging.debug(_('Creating VIF for VM %s, network %s.'), vm_ref, network_ref) vif_ref = session.call_xenapi('VIF.create', vif_rec) - logging.debug('Created VIF %s for VM %s, network %s.', vif_ref, + logging.debug(_('Created VIF %s for VM %s, network %s.'), vif_ref, vm_ref, network_ref) return vif_ref @@ -202,7 +223,9 @@ class VMHelper(): if type == ImageType.DISK_RAW: args['raw'] = 'true' task = session.async_call_plugin('objectstore', fn, args) - uuid = session.wait_for_task(task) + #FIXME(armando): find a solution to missing instance_id + #with Josh Kearney + uuid = session.wait_for_task(0, task) return uuid @classmethod @@ -223,29 +246,19 @@ class VMHelper(): @classmethod def lookup(cls, session, i): - """ Look the instance i up, and returns it if available """ - return VMHelper.lookup_blocking(session, i) - - @classmethod - def lookup_blocking(cls, session, i): - """ Synchronous lookup """ + """Look the instance i up, and returns it if available""" vms = session.get_xenapi().VM.get_by_name_label(i) n = len(vms) if n == 0: return None elif n > 1: - raise Exception('duplicate name found: %s' % i) + raise exception.Duplicate(_('duplicate name found: %s') % i) else: return vms[0] @classmethod def lookup_vm_vdis(cls, session, vm): - """ Look for the VDIs that are attached to the VM """ - return VMHelper.lookup_vm_vdis_blocking(session, vm) - - @classmethod - def lookup_vm_vdis_blocking(cls, session, vm): - """ Synchronous lookup_vm_vdis """ + """Look for the VDIs that are attached to the VM""" # Firstly we get the VBDs, then the VDIs. # TODO(Armando): do we leave the read-only devices? vbds = session.get_xenapi().VM.get_VBDs(vm) @@ -256,8 +269,9 @@ class VMHelper(): vdi = session.get_xenapi().VBD.get_VDI(vbd) # Test valid VDI record = session.get_xenapi().VDI.get_record(vdi) - logging.debug('VDI %s is still available', record['uuid']) - except XenAPI.Failure, exc: + logging.debug(_('VDI %s is still available'), + record['uuid']) + except cls.XenAPI.Failure, exc: logging.warn(exc) else: vdis.append(vdi) @@ -268,6 +282,7 @@ class VMHelper(): @classmethod def compile_info(cls, record): + """Fill record with VM status information""" return {'state': XENAPI_POWER_STATE[record['power_state']], 'max_mem': long(record['memory_static_max']) >> 10, 'mem': long(record['memory_dynamic_max']) >> 10, @@ -280,11 +295,7 @@ class VMHelper(): try: host = session.get_xenapi_host() host_ip = session.get_xenapi().host.get_record(host)["address"] - metrics = session.get_xenapi().VM_guest_metrics.get_record( - record["guest_metrics"]) - diags = { - "Kernel": metrics["os_version"]["uname"], - "Distro": metrics["os_version"]["name"]} + diags = {} xml = get_rrd(host_ip, record["uuid"]) if xml: rrd = minidom.parseString(xml) @@ -295,7 +306,7 @@ class VMHelper(): # Name and Value diags[ref[0].firstChild.data] = ref[6].firstChild.data return diags - except XenAPI.Failure as e: + except cls.XenAPI.Failure as e: return {"Unable to retrieve diagnostics": e} diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index cd704d9f8..b1de73641 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -22,14 +22,15 @@ import logging from nova import db from nova import context +from nova import exception +from nova import utils from nova.auth.manager import AuthManager +from nova.compute import power_state from nova.virt.xenapi.network_utils import NetworkHelper from nova.virt.xenapi.vm_utils import VMHelper from nova.virt.xenapi.vm_utils import ImageType -XenAPI = None - class VMOps(object): """ @@ -37,27 +38,28 @@ class VMOps(object): """ def __init__(self, session): - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') + self.XenAPI = session.get_imported_xenapi() self._session = session - # Load XenAPI module in the helper class - VMHelper.late_import() + VMHelper.XenAPI = self.XenAPI def list_instances(self): - """ List VM instances """ - return [self._session.get_xenapi().VM.get_name_label(vm) \ - for vm in self._session.get_xenapi().VM.get_all()] + """List VM instances""" + vms = [] + for vm in self._session.get_xenapi().VM.get_all(): + rec = self._session.get_xenapi().VM.get_record(vm) + if not rec["is_a_template"] and not rec["is_control_domain"]: + vms.append(rec["name_label"]) + return vms def spawn(self, instance): - """ Create VM instance """ + """Create VM instance""" vm = VMHelper.lookup(self._session, instance.name) if vm is not None: - raise Exception('Attempted to create non-unique name %s' % - instance.name) + raise exception.Duplicate(_('Attempted to create' + ' non-unique name %s') % instance.name) - bridge = db.project_get_network(context.get_admin_context(), - instance.project_id).bridge + bridge = db.network_get_by_instance(context.get_admin_context(), + instance['id'])['bridge'] network_ref = \ NetworkHelper.find_network_with_bridge(self._session, bridge) @@ -90,22 +92,46 @@ class VMOps(object): if network_ref: VMHelper.create_vif(self._session, vm_ref, network_ref, instance.mac_address) - logging.debug('Starting VM %s...', vm_ref) + logging.debug(_('Starting VM %s...'), vm_ref) self._session.call_xenapi('VM.start', vm_ref, False, False) - logging.info('Spawning VM %s created %s.', instance.name, + logging.info(_('Spawning VM %s created %s.'), instance.name, vm_ref) + # NOTE(armando): Do we really need to do this in virt? + timer = utils.LoopingCall(f=None) + + def _wait_for_boot(): + try: + state = self.get_info(instance['name'])['state'] + db.instance_set_state(context.get_admin_context(), + instance['id'], state) + if state == power_state.RUNNING: + logging.debug(_('Instance %s: booted'), instance['name']) + timer.stop() + except Exception, exc: + logging.warn(exc) + logging.exception(_('instance %s: failed to boot'), + instance['name']) + db.instance_set_state(context.get_admin_context(), + instance['id'], + power_state.SHUTDOWN) + timer.stop() + + timer.f = _wait_for_boot + return timer.start(interval=0.5, now=True) + def reboot(self, instance): - """ Reboot VM instance """ + """Reboot VM instance""" instance_name = instance.name vm = VMHelper.lookup(self._session, instance_name) if vm is None: - raise Exception('instance not present %s' % instance_name) + raise exception.NotFound(_('instance not' + ' found %s') % instance_name) task = self._session.call_xenapi('Async.VM.clean_reboot', vm) - self._session.wait_for_task(task) + self._session.wait_for_task(instance.id, task) def destroy(self, instance): - """ Destroy VM instance """ + """Destroy VM instance""" vm = VMHelper.lookup(self._session, instance.name) if vm is None: # Don't complain, just return. This lets us clean up instances @@ -116,7 +142,7 @@ class VMOps(object): try: task = self._session.call_xenapi('Async.VM.hard_shutdown', vm) - self._session.wait_for_task(task) + self._session.wait_for_task(instance.id, task) except XenAPI.Failure, exc: logging.warn(exc) # Disk clean-up @@ -124,46 +150,50 @@ class VMOps(object): for vdi in vdis: try: task = self._session.call_xenapi('Async.VDI.destroy', vdi) - self._session.wait_for_task(task) + self._session.wait_for_task(instance.id, task) except XenAPI.Failure, exc: logging.warn(exc) + # VM Destroy try: task = self._session.call_xenapi('Async.VM.destroy', vm) - self._session.wait_for_task(task) + self._session.wait_for_task(instance.id, task) except XenAPI.Failure, exc: logging.warn(exc) - def _wait_with_callback(self, task, callback): + def _wait_with_callback(self, instance_id, task, callback): ret = None try: - ret = self._session.wait_for_task(task) + ret = self._session.wait_for_task(instance_id, task) except XenAPI.Failure, exc: logging.warn(exc) callback(ret) def pause(self, instance, callback): - """ Pause VM instance """ + """Pause VM instance""" instance_name = instance.name vm = VMHelper.lookup(self._session, instance_name) if vm is None: - raise Exception('instance not present %s' % instance_name) + raise exception.NotFound(_('Instance not' + ' found %s') % instance_name) task = self._session.call_xenapi('Async.VM.pause', vm) - self._wait_with_callback(task, callback) + self._wait_with_callback(instance.id, task, callback) def unpause(self, instance, callback): - """ Unpause VM instance """ + """Unpause VM instance""" instance_name = instance.name vm = VMHelper.lookup(self._session, instance_name) if vm is None: - raise Exception('instance not present %s' % instance_name) + raise exception.NotFound(_('Instance not' + ' found %s') % instance_name) task = self._session.call_xenapi('Async.VM.unpause', vm) - self._wait_with_callback(task, callback) + self._wait_with_callback(instance.id, task, callback) def get_info(self, instance_id): - """ Return data about VM instance """ - vm = VMHelper.lookup_blocking(self._session, instance_id) + """Return data about VM instance""" + vm = VMHelper.lookup(self._session, instance_id) if vm is None: - raise Exception('instance not present %s' % instance_id) + raise exception.NotFound(_('Instance not' + ' found %s') % instance_id) rec = self._session.get_xenapi().VM.get_record(vm) return VMHelper.compile_info(rec) @@ -171,11 +201,11 @@ class VMOps(object): """Return data about VM diagnostics""" vm = VMHelper.lookup(self._session, instance_id) if vm is None: - raise Exception("instance not present %s" % instance_id) + raise exception.NotFound(_("Instance not found %s") % instance_id) rec = self._session.get_xenapi().VM.get_record(vm) return VMHelper.compile_diagnostics(self._session, rec) def get_console_output(self, instance): - """ Return snapshot of console """ + """Return snapshot of console""" # TODO: implement this to fix pylint! return 'FAKE CONSOLE OUTPUT of instance' diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py new file mode 100644 index 000000000..a0c0a67d4 --- /dev/null +++ b/nova/virt/xenapi/volume_utils.py @@ -0,0 +1,268 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2010 Citrix Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Helper methods for operations related to the management of volumes, +and storage repositories +""" + +import re +import string +import logging + +from nova import db +from nova import context +from nova import exception +from nova import flags +from nova import utils +from nova.virt.xenapi import HelperBase + +FLAGS = flags.FLAGS + + +class StorageError(Exception): + """To raise errors related to SR, VDI, PBD, and VBD commands""" + + def __init__(self, message=None): + super(StorageError, self).__init__(message) + + +class VolumeHelper(HelperBase): + """ + The class that wraps the helper methods together. + """ + + @classmethod + def create_iscsi_storage(cls, session, info, label, description): + """ + Create an iSCSI storage repository that will be used to mount + the volume for the specified instance + """ + sr_ref = session.get_xenapi().SR.get_by_name_label(label) + if len(sr_ref) == 0: + logging.debug('Introducing %s...', label) + record = {} + if 'chapuser' in info and 'chappassword' in info: + record = {'target': info['targetHost'], + 'port': info['targetPort'], + 'targetIQN': info['targetIQN'], + 'chapuser': info['chapuser'], + 'chappassword': info['chappassword'] + } + else: + record = {'target': info['targetHost'], + 'port': info['targetPort'], + 'targetIQN': info['targetIQN'] + } + try: + sr_ref = session.get_xenapi().SR.create( + session.get_xenapi_host(), + record, + '0', label, description, 'iscsi', '', False, {}) + logging.debug('Introduced %s as %s.', label, sr_ref) + return sr_ref + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to create Storage Repository')) + else: + return sr_ref[0] + + @classmethod + def find_sr_from_vbd(cls, session, vbd_ref): + """Find the SR reference from the VBD reference""" + try: + vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref) + sr_ref = session.get_xenapi().VDI.get_SR(vdi_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref) + return sr_ref + + @classmethod + def destroy_iscsi_storage(cls, session, sr_ref): + """Forget the SR whilst preserving the state of the disk""" + logging.debug("Forgetting SR %s ... ", sr_ref) + pbds = [] + try: + pbds = session.get_xenapi().SR.get_PBDs(sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when getting PBDs for %s', + exc, sr_ref) + for pbd in pbds: + try: + session.get_xenapi().PBD.unplug(pbd) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when unplugging PBD %s', + exc, pbd) + try: + session.get_xenapi().SR.forget(sr_ref) + logging.debug("Forgetting SR %s done.", sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn('Ignoring exception %s when forgetting SR %s', + exc, sr_ref) + + @classmethod + def introduce_vdi(cls, session, sr_ref): + """Introduce VDI in the host""" + try: + vdis = session.get_xenapi().SR.get_VDIs(sr_ref) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref) + try: + vdi_rec = session.get_xenapi().VDI.get_record(vdis[0]) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to get record' + ' of VDI %s on') % vdis[0]) + else: + try: + return session.get_xenapi().VDI.introduce( + vdi_rec['uuid'], + vdi_rec['name_label'], + vdi_rec['name_description'], + vdi_rec['SR'], + vdi_rec['type'], + vdi_rec['sharable'], + vdi_rec['read_only'], + vdi_rec['other_config'], + vdi_rec['location'], + vdi_rec['xenstore_data'], + vdi_rec['sm_config']) + except cls.XenAPI.Failure, exc: + logging.warn(exc) + raise StorageError(_('Unable to introduce VDI for SR %s') + % sr_ref) + + @classmethod + def parse_volume_info(cls, device_path, mountpoint): + """ + Parse device_path and mountpoint as they can be used by XenAPI. + In particular, the mountpoint (e.g. /dev/sdc) must be translated + into a numeric literal. + FIXME(armando): + As for device_path, currently cannot be used as it is, + because it does not contain target information. As for interim + solution, target details are passed either via Flags or obtained + by iscsiadm. Long-term solution is to add a few more fields to the + db in the iscsi_target table with the necessary info and modify + the iscsi driver to set them. + """ + device_number = VolumeHelper.mountpoint_to_number(mountpoint) + volume_id = _get_volume_id(device_path) + (iscsi_name, iscsi_portal) = _get_target(volume_id) + target_host = _get_target_host(iscsi_portal) + target_port = _get_target_port(iscsi_portal) + target_iqn = _get_iqn(iscsi_name, volume_id) + logging.debug('(vol_id,number,host,port,iqn): (%s,%s,%s,%s)', + volume_id, + target_host, + target_port, + target_iqn) + if (device_number < 0) or \ + (volume_id is None) or \ + (target_host is None) or \ + (target_iqn is None): + raise StorageError(_('Unable to obtain target information %s, %s') + % (device_path, mountpoint)) + volume_info = {} + volume_info['deviceNumber'] = device_number + volume_info['volumeId'] = volume_id + volume_info['targetHost'] = target_host + volume_info['targetPort'] = target_port + volume_info['targetIQN'] = target_iqn + return volume_info + + @classmethod + def mountpoint_to_number(cls, mountpoint): + """Translate a mountpoint like /dev/sdc into a numeric""" + if mountpoint.startswith('/dev/'): + mountpoint = mountpoint[5:] + if re.match('^[hs]d[a-p]$', mountpoint): + return (ord(mountpoint[2:3]) - ord('a')) + elif re.match('^vd[a-p]$', mountpoint): + return (ord(mountpoint[2:3]) - ord('a')) + elif re.match('^[0-9]+$', mountpoint): + return string.atoi(mountpoint, 10) + else: + logging.warn('Mountpoint cannot be translated: %s', mountpoint) + return -1 + + +def _get_volume_id(path): + """Retrieve the volume id from device_path""" + # n must contain at least the volume_id + # /vol- is for remote volumes + # -vol- is for local volumes + # see compute/manager->setup_compute_volume + volume_id = path[path.find('/vol-') + 1:] + if volume_id == path: + volume_id = path[path.find('-vol-') + 1:].replace('--', '-') + return volume_id + + +def _get_target_host(iscsi_string): + """Retrieve target host""" + if iscsi_string: + return iscsi_string[0:iscsi_string.find(':')] + elif iscsi_string is None or FLAGS.target_host: + return FLAGS.target_host + + +def _get_target_port(iscsi_string): + """Retrieve target port""" + if iscsi_string: + return iscsi_string[iscsi_string.find(':') + 1:] + elif iscsi_string is None or FLAGS.target_port: + return FLAGS.target_port + + +def _get_iqn(iscsi_string, id): + """Retrieve target IQN""" + if iscsi_string: + return iscsi_string + elif iscsi_string is None or FLAGS.iqn_prefix: + volume_id = _get_volume_id(id) + return '%s:%s' % (FLAGS.iqn_prefix, volume_id) + + +def _get_target(volume_id): + """ + Gets iscsi name and portal from volume name and host. + For this method to work the following are needed: + 1) volume_ref['host'] must resolve to something rather than loopback + 2) ietd must bind only to the address as resolved above + If any of the two conditions are not met, fall back on Flags. + """ + volume_ref = db.volume_get_by_ec2_id(context.get_admin_context(), + volume_id) + result = (None, None) + try: + (r, _e) = utils.execute("sudo iscsiadm -m discovery -t " + "sendtargets -p %s" % + volume_ref['host']) + except exception.ProcessExecutionError, exc: + logging.warn(exc) + else: + targets = r.splitlines() + if len(_e) == 0 and len(targets) == 1: + for target in targets: + if volume_id in target: + (location, _sep, iscsi_name) = target.partition(" ") + break + iscsi_portal = location.split(",")[0] + result = (iscsi_name, iscsi_portal) + return result diff --git a/nova/virt/xenapi/volumeops.py b/nova/virt/xenapi/volumeops.py index 1943ccab0..fdeb2506c 100644 --- a/nova/virt/xenapi/volumeops.py +++ b/nova/virt/xenapi/volumeops.py @@ -17,17 +17,110 @@ """ Management class for Storage-related functions (attach, detach, etc). """ +import logging + +from nova import exception +from nova.virt.xenapi.vm_utils import VMHelper +from nova.virt.xenapi.volume_utils import VolumeHelper +from nova.virt.xenapi.volume_utils import StorageError class VolumeOps(object): + """ + Management class for Volume-related tasks + """ def __init__(self, session): + self.XenAPI = session.get_imported_xenapi() self._session = session + # Load XenAPI module in the helper classes respectively + VolumeHelper.XenAPI = self.XenAPI + VMHelper.XenAPI = self.XenAPI def attach_volume(self, instance_name, device_path, mountpoint): - # FIXME: that's going to be sorted when iscsi-xenapi lands in branch - return True + """Attach volume storage to VM instance""" + # Before we start, check that the VM exists + vm_ref = VMHelper.lookup(self._session, instance_name) + if vm_ref is None: + raise exception.NotFound(_('Instance %s not found') + % instance_name) + # NOTE: No Resource Pool concept so far + logging.debug(_("Attach_volume: %s, %s, %s"), + instance_name, device_path, mountpoint) + # Create the iSCSI SR, and the PDB through which hosts access SRs. + # But first, retrieve target info, like Host, IQN, LUN and SCSIID + vol_rec = VolumeHelper.parse_volume_info(device_path, mountpoint) + label = 'SR-%s' % vol_rec['volumeId'] + description = 'Disk-for:%s' % instance_name + # Create SR + sr_ref = VolumeHelper.create_iscsi_storage(self._session, + vol_rec, + label, + description) + # Introduce VDI and attach VBD to VM + try: + vdi_ref = VolumeHelper.introduce_vdi(self._session, sr_ref) + except StorageError, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + raise Exception(_('Unable to create VDI on SR %s for instance %s') + % (sr_ref, + instance_name)) + else: + try: + vbd_ref = VMHelper.create_vbd(self._session, + vm_ref, vdi_ref, + vol_rec['deviceNumber'], + False) + except self.XenAPI.Failure, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + raise Exception(_('Unable to use SR %s for instance %s') + % (sr_ref, + instance_name)) + else: + try: + task = self._session.call_xenapi('Async.VBD.plug', + vbd_ref) + self._session.wait_for_task(vol_rec['deviceNumber'], task) + except self.XenAPI.Failure, exc: + logging.warn(exc) + VolumeHelper.destroy_iscsi_storage(self._session, + sr_ref) + raise Exception(_('Unable to attach volume to instance %s') + % instance_name) + logging.info(_('Mountpoint %s attached to instance %s'), + mountpoint, instance_name) def detach_volume(self, instance_name, mountpoint): - # FIXME: that's going to be sorted when iscsi-xenapi lands in branch - return True + """Detach volume storage to VM instance""" + # Before we start, check that the VM exists + vm_ref = VMHelper.lookup(self._session, instance_name) + if vm_ref is None: + raise exception.NotFound(_('Instance %s not found') + % instance_name) + # Detach VBD from VM + logging.debug(_("Detach_volume: %s, %s"), instance_name, mountpoint) + device_number = VolumeHelper.mountpoint_to_number(mountpoint) + try: + vbd_ref = VMHelper.find_vbd_by_number(self._session, + vm_ref, device_number) + except StorageError, exc: + logging.warn(exc) + raise Exception(_('Unable to locate volume %s') % mountpoint) + else: + try: + sr_ref = VolumeHelper.find_sr_from_vbd(self._session, + vbd_ref) + VMHelper.unplug_vbd(self._session, vbd_ref) + except StorageError, exc: + logging.warn(exc) + raise Exception(_('Unable to detach volume %s') % mountpoint) + try: + VMHelper.destroy_vbd(self._session, vbd_ref) + except StorageError, exc: + logging.warn(exc) + # Forget SR + VolumeHelper.destroy_iscsi_storage(self._session, sr_ref) + logging.info(_('Mountpoint %s detached from instance %s'), + mountpoint, instance_name) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 21ed2cd65..11c66c974 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -44,7 +44,10 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. :xenapi_task_poll_interval: The interval (seconds) used for polling of remote tasks (Async.VM.start, etc) (default: 0.5). - +:target_host: the iSCSI Target Host IP address, i.e. the IP + address for the nova-volume host +:target_port: iSCSI Target Port, 3260 Default +:iqn_prefix: IQN Prefix, e.g. 'iqn.2010-10.org.openstack' """ import logging @@ -54,12 +57,15 @@ import xmlrpclib from eventlet import event from eventlet import tpool +from nova import context +from nova import db from nova import utils from nova import flags from nova.virt.xenapi.vmops import VMOps from nova.virt.xenapi.volumeops import VolumeOps FLAGS = flags.FLAGS + flags.DEFINE_string('xenapi_connection_url', None, 'URL for connection to XenServer/Xen Cloud Platform.' @@ -77,63 +83,72 @@ flags.DEFINE_float('xenapi_task_poll_interval', 'The interval used for polling of remote tasks ' '(Async.VM.start, etc). Used only if ' 'connection_type=xenapi.') - -XenAPI = None +flags.DEFINE_string('target_host', + None, + 'iSCSI Target Host') +flags.DEFINE_string('target_port', + '3260', + 'iSCSI Target Port, 3260 Default') +flags.DEFINE_string('iqn_prefix', + 'iqn.2010-10.org.openstack', + 'IQN Prefix') def get_connection(_): """Note that XenAPI doesn't have a read-only connection mode, so the read_only parameter is ignored.""" - # This is loaded late so that there's no need to install this - # library when not using XenAPI. - global XenAPI - if XenAPI is None: - XenAPI = __import__('XenAPI') url = FLAGS.xenapi_connection_url username = FLAGS.xenapi_connection_username password = FLAGS.xenapi_connection_password if not url or password is None: - raise Exception('Must specify xenapi_connection_url, ' - 'xenapi_connection_username (optionally), and ' - 'xenapi_connection_password to use ' - 'connection_type=xenapi') + raise Exception(_('Must specify xenapi_connection_url, ' + 'xenapi_connection_username (optionally), and ' + 'xenapi_connection_password to use ' + 'connection_type=xenapi')) return XenAPIConnection(url, username, password) class XenAPIConnection(object): - """ A connection to XenServer or Xen Cloud Platform """ + """A connection to XenServer or Xen Cloud Platform""" def __init__(self, url, user, pw): session = XenAPISession(url, user, pw) self._vmops = VMOps(session) self._volumeops = VolumeOps(session) + def init_host(self): + #FIXME(armando): implement this + #NOTE(armando): would we need a method + #to call when shutting down the host? + #e.g. to do session logout? + pass + def list_instances(self): - """ List VM instances """ + """List VM instances""" return self._vmops.list_instances() def spawn(self, instance): - """ Create VM instance """ + """Create VM instance""" self._vmops.spawn(instance) def reboot(self, instance): - """ Reboot VM instance """ + """Reboot VM instance""" self._vmops.reboot(instance) def destroy(self, instance): - """ Destroy VM instance """ + """Destroy VM instance""" self._vmops.destroy(instance) def pause(self, instance, callback): - """ Pause VM instance """ + """Pause VM instance""" self._vmops.pause(instance, callback) def unpause(self, instance, callback): - """ Unpause paused VM instance """ + """Unpause paused VM instance""" self._vmops.unpause(instance, callback) def get_info(self, instance_id): - """ Return data about VM instance """ + """Return data about VM instance""" return self._vmops.get_info(instance_id) def get_diagnostics(self, instance_id): @@ -141,33 +156,38 @@ class XenAPIConnection(object): return self._vmops.get_diagnostics(instance_id) def get_console_output(self, instance): - """ Return snapshot of console """ + """Return snapshot of console""" return self._vmops.get_console_output(instance) def attach_volume(self, instance_name, device_path, mountpoint): - """ Attach volume storage to VM instance """ + """Attach volume storage to VM instance""" return self._volumeops.attach_volume(instance_name, device_path, mountpoint) def detach_volume(self, instance_name, mountpoint): - """ Detach volume storage to VM instance """ + """Detach volume storage to VM instance""" return self._volumeops.detach_volume(instance_name, mountpoint) class XenAPISession(object): - """ The session to invoke XenAPI SDK calls """ + """The session to invoke XenAPI SDK calls""" def __init__(self, url, user, pw): - self._session = XenAPI.Session(url) + self.XenAPI = self.get_imported_xenapi() + self._session = self._create_session(url) self._session.login_with_password(user, pw) + def get_imported_xenapi(self): + """Stubout point. This can be replaced with a mock xenapi module.""" + return __import__('XenAPI') + def get_xenapi(self): - """ Return the xenapi object """ + """Return the xenapi object""" return self._session.xenapi def get_xenapi_host(self): - """ Return the xenapi host """ + """Return the xenapi host""" return self._session.xenapi.session.get_this_host(self._session.handle) def call_xenapi(self, method, *args): @@ -179,64 +199,78 @@ class XenAPISession(object): def async_call_plugin(self, plugin, fn, args): """Call Async.host.call_plugin on a background thread.""" - return tpool.execute(_unwrap_plugin_exceptions, + return tpool.execute(self._unwrap_plugin_exceptions, self._session.xenapi.Async.host.call_plugin, self.get_xenapi_host(), plugin, fn, args) - def wait_for_task(self, task): - """Return a Deferred that will give the result of the given task. - The task is polled until it completes.""" + def wait_for_task(self, id, task): + """Return the result of the given task. The task is polled + until it completes.""" done = event.Event() - loop = utils.LoopingCall(self._poll_task, task, done) + loop = utils.LoopingCall(self._poll_task, id, task, done) loop.start(FLAGS.xenapi_task_poll_interval, now=True) rv = done.wait() loop.stop() return rv - def _poll_task(self, task, done): + def _create_session(self, url): + """Stubout point. This can be replaced with a mock session.""" + return self.XenAPI.Session(url) + + def _poll_task(self, id, task, done): """Poll the given XenAPI task, and fire the given Deferred if we get a result.""" try: - #logging.debug('Polling task %s...', task) + name = self._session.xenapi.task.get_name_label(task) status = self._session.xenapi.task.get_status(task) - if status == 'pending': + action = dict( + id=int(id), + action=name, + error=None) + if status == "pending": return - elif status == 'success': + elif status == "success": result = self._session.xenapi.task.get_result(task) - logging.info('Task %s status: success. %s', task, result) + logging.info(_("Task [%s] %s status: success %s") % ( + name, + task, + result)) done.send(_parse_xmlrpc_value(result)) else: error_info = self._session.xenapi.task.get_error_info(task) - logging.warn('Task %s status: %s. %s', task, status, - error_info) - done.send_exception(XenAPI.Failure(error_info)) - #logging.debug('Polling task %s done.', task) - except XenAPI.Failure, exc: + action["error"] = str(error_info) + logging.warn(_("Task [%s] %s status: %s %s") % ( + name, + task, + status, + error_info)) + done.send_exception(self.XenAPI.Failure(error_info)) + db.instance_action_create(context.get_admin_context(), action) + except self.XenAPI.Failure, exc: logging.warn(exc) done.send_exception(*sys.exc_info()) - -def _unwrap_plugin_exceptions(func, *args, **kwargs): - """ Parse exception details """ - try: - return func(*args, **kwargs) - except XenAPI.Failure, exc: - logging.debug("Got exception: %s", exc) - if (len(exc.details) == 4 and - exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and - exc.details[2] == 'Failure'): - params = None - try: - params = eval(exc.details[3]) - except: - raise exc - raise XenAPI.Failure(params) - else: + def _unwrap_plugin_exceptions(self, func, *args, **kwargs): + """Parse exception details""" + try: + return func(*args, **kwargs) + except self.XenAPI.Failure, exc: + logging.debug(_("Got exception: %s"), exc) + if (len(exc.details) == 4 and + exc.details[0] == 'XENAPI_PLUGIN_EXCEPTION' and + exc.details[2] == 'Failure'): + params = None + try: + params = eval(exc.details[3]) + except: + raise exc + raise self.XenAPI.Failure(params) + else: + raise + except xmlrpclib.ProtocolError, exc: + logging.debug(_("Got exception: %s"), exc) raise - except xmlrpclib.ProtocolError, exc: - logging.debug("Got exception: %s", exc) - raise def _parse_xmlrpc_value(val): |
