diff options
| author | Ed Leafe <ed@leafe.com> | 2011-01-05 22:39:58 +0000 |
|---|---|---|
| committer | Tarmac <> | 2011-01-05 22:39:58 +0000 |
| commit | 4b23bbd4772c33621ddbf34fd13a4277c1126dcc (patch) | |
| tree | ed8d3bcdad02244d6c3bf338f444190c2b8d9519 | |
| parent | 2e0fd7df549115e5974a3e6c3723938a5fd8e2ce (diff) | |
| parent | 02c86d1e1146c1162a36620560eb8116ce8d47f1 (diff) | |
| download | nova-4b23bbd4772c33621ddbf34fd13a4277c1126dcc.tar.gz nova-4b23bbd4772c33621ddbf34fd13a4277c1126dcc.tar.xz nova-4b23bbd4772c33621ddbf34fd13a4277c1126dcc.zip | |
Created a XenAPI plugin that will allow nova code to read/write/delete from xenstore records for a given instance. Added the basic methods for working with xenstore data to the vmops script, as well as plugin support to xenapi_conn.py
| -rw-r--r-- | nova/virt/xenapi/vmops.py | 214 | ||||
| -rw-r--r-- | nova/virt/xenapi_conn.py | 28 | ||||
| -rwxr-xr-x | plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py | 28 | ||||
| -rwxr-xr-x | plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py | 180 |
4 files changed, 410 insertions, 40 deletions
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 76f31635a..b6d620782 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -18,6 +19,7 @@ Management class for VM-related functions (spawn, reboot, etc). """ +import json import logging from nova import db @@ -36,7 +38,6 @@ class VMOps(object): """ Management class for VM-related tasks """ - def __init__(self, session): self.XenAPI = session.get_imported_xenapi() self._session = session @@ -120,6 +121,20 @@ class VMOps(object): timer.f = _wait_for_boot return timer.start(interval=0.5, now=True) + def _get_vm_opaque_ref(self, instance_or_vm): + """Refactored out the common code of many methods that receive either + a vm name or a vm instance, and want a vm instance in return. + """ + try: + instance_name = instance_or_vm.name + vm = VMHelper.lookup(self._session, instance_name) + except AttributeError: + # A vm opaque ref was passed + vm = instance_or_vm + if vm is None: + raise Exception(_('Instance not present %s') % instance_name) + return vm + def snapshot(self, instance, name): """ Create snapshot from a running VM instance @@ -168,11 +183,7 @@ class VMOps(object): def reboot(self, instance): """Reboot VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.clean_reboot', vm) self._session.wait_for_task(instance.id, task) @@ -215,27 +226,19 @@ class VMOps(object): ret = None try: ret = self._session.wait_for_task(instance_id, task) - except XenAPI.Failure, exc: + except self.XenAPI.Failure, exc: logging.warn(exc) callback(ret) def pause(self, instance, callback): """Pause VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('Instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.pause', vm) self._wait_with_callback(instance.id, task, callback) def unpause(self, instance, callback): """Unpause VM instance""" - instance_name = instance.name - vm = VMHelper.lookup(self._session, instance_name) - if vm is None: - raise exception.NotFound(_('Instance not' - ' found %s') % instance_name) + vm = self._get_vm_opaque_ref(instance) task = self._session.call_xenapi('Async.VM.unpause', vm) self._wait_with_callback(instance.id, task, callback) @@ -270,10 +273,7 @@ class VMOps(object): def get_diagnostics(self, instance): """Return data about VM diagnostics""" - vm = VMHelper.lookup(self._session, instance.name) - if vm is None: - raise exception.NotFound(_("Instance not found %s") % - instance.name) + vm = self._get_vm_opaque_ref(instance) rec = self._session.get_xenapi().VM.get_record(vm) return VMHelper.compile_diagnostics(self._session, rec) @@ -281,3 +281,175 @@ class VMOps(object): """Return snapshot of console""" # TODO: implement this to fix pylint! return 'FAKE CONSOLE OUTPUT of instance' + + def list_from_xenstore(self, vm, path): + """Runs the xenstore-ls command to get a listing of all records + from 'path' downward. Returns a dict with the sub-paths as keys, + and the value stored in those paths as values. If nothing is + found at that path, returns None. + """ + ret = self._make_xenstore_call('list_records', vm, path) + return json.loads(ret) + + def read_from_xenstore(self, vm, path): + """Returns the value stored in the xenstore record for the given VM + at the specified location. A XenAPIPlugin.PluginError will be raised + if any error is encountered in the read process. + """ + try: + ret = self._make_xenstore_call('read_record', vm, path, + {'ignore_missing_path': 'True'}) + except self.XenAPI.Failure, e: + return None + ret = json.loads(ret) + if ret == "None": + # Can't marshall None over RPC calls. + return None + return ret + + def write_to_xenstore(self, vm, path, value): + """Writes the passed value to the xenstore record for the given VM + at the specified location. A XenAPIPlugin.PluginError will be raised + if any error is encountered in the write process. + """ + return self._make_xenstore_call('write_record', vm, path, + {'value': json.dumps(value)}) + + def clear_xenstore(self, vm, path): + """Deletes the VM's xenstore record for the specified path. + If there is no such record, the request is ignored. + """ + self._make_xenstore_call('delete_record', vm, path) + + def _make_xenstore_call(self, method, vm, path, addl_args={}): + """Handles calls to the xenstore xenapi plugin.""" + return self._make_plugin_call('xenstore.py', method=method, vm=vm, + path=path, addl_args=addl_args) + + def _make_plugin_call(self, plugin, method, vm, path, addl_args={}): + """Abstracts out the process of calling a method of a xenapi plugin. + Any errors raised by the plugin will in turn raise a RuntimeError here. + """ + vm = self._get_vm_opaque_ref(vm) + rec = self._session.get_xenapi().VM.get_record(vm) + args = {'dom_id': rec['domid'], 'path': path} + args.update(addl_args) + # If the 'testing_mode' attribute is set, add that to the args. + if getattr(self, 'testing_mode', False): + args['testing_mode'] = 'true' + try: + task = self._session.async_call_plugin(plugin, method, args) + ret = self._session.wait_for_task(0, task) + except self.XenAPI.Failure, e: + raise RuntimeError("%s" % e.details[-1]) + return ret + + def add_to_xenstore(self, vm, path, key, value): + """Adds the passed key/value pair to the xenstore record for + the given VM at the specified location. A XenAPIPlugin.PluginError + will be raised if any error is encountered in the write process. + """ + current = self.read_from_xenstore(vm, path) + if not current: + # Nothing at that location + current = {key: value} + else: + current[key] = value + self.write_to_xenstore(vm, path, current) + + def remove_from_xenstore(self, vm, path, key_or_keys): + """Takes either a single key or a list of keys and removes + them from the xenstoreirecord data for the given VM. + If the key doesn't exist, the request is ignored. + """ + current = self.list_from_xenstore(vm, path) + if not current: + return + if isinstance(key_or_keys, basestring): + keys = [key_or_keys] + else: + keys = key_or_keys + keys.sort(lambda x, y: cmp(y.count('/'), x.count('/'))) + for key in keys: + if path: + keypath = "%s/%s" % (path, key) + else: + keypath = key + self._make_xenstore_call('delete_record', vm, keypath) + + ######################################################################## + ###### The following methods interact with the xenstore parameter + ###### record, not the live xenstore. They were created before I + ###### knew the difference, and are left in here in case they prove + ###### to be useful. They all have '_param' added to their method + ###### names to distinguish them. (dabo) + ######################################################################## + def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix): + """Returns a dict of all the keys in the xenstore parameter record + for the given instance that begin with the key_prefix. + """ + data = self.read_from_param_xenstore(instance_or_vm) + badkeys = [k for k in data.keys() + if not k.startswith(key_prefix)] + for badkey in badkeys: + del data[badkey] + return data + + def read_from_param_xenstore(self, instance_or_vm, keys=None): + """Returns the xenstore parameter record data for the specified VM + instance as a dict. Accepts an optional key or list of keys; if a + value for 'keys' is passed, the returned dict is filtered to only + return the values for those keys. + """ + vm = self._get_vm_opaque_ref(instance_or_vm) + data = self._session.call_xenapi_request('VM.get_xenstore_data', + (vm, )) + ret = {} + if keys is None: + keys = data.keys() + elif isinstance(keys, basestring): + keys = [keys] + for key in keys: + raw = data.get(key) + if raw: + ret[key] = json.loads(raw) + else: + ret[key] = raw + return ret + + def add_to_param_xenstore(self, instance_or_vm, key, val): + """Takes a key/value pair and adds it to the xenstore parameter + record for the given vm instance. If the key exists in xenstore, + it is overwritten""" + vm = self._get_vm_opaque_ref(instance_or_vm) + self.remove_from_param_xenstore(instance_or_vm, key) + jsonval = json.dumps(val) + self._session.call_xenapi_request('VM.add_to_xenstore_data', + (vm, key, jsonval)) + + def write_to_param_xenstore(self, instance_or_vm, mapping): + """Takes a dict and writes each key/value pair to the xenstore + parameter record for the given vm instance. Any existing data for + those keys is overwritten. + """ + for k, v in mapping.iteritems(): + self.add_to_param_xenstore(instance_or_vm, k, v) + + def remove_from_param_xenstore(self, instance_or_vm, key_or_keys): + """Takes either a single key or a list of keys and removes + them from the xenstore parameter record data for the given VM. + If the key doesn't exist, the request is ignored. + """ + vm = self._get_vm_opaque_ref(instance_or_vm) + if isinstance(key_or_keys, basestring): + keys = [key_or_keys] + else: + keys = key_or_keys + for key in keys: + self._session.call_xenapi_request('VM.remove_from_xenstore_data', + (vm, key)) + + def clear_param_xenstore(self, instance_or_vm): + """Removes all data from the xenstore parameter record for this VM.""" + self.write_to_param_xenstore(instance_or_vm, {}) + ######################################################################## diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index f17c8f39d..c48f5b7cb 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform. The concurrency model for this class is as follows: -All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator -deferredToThread). They are remote calls, and so may hang for the usual -reasons. They should not be allowed to block the reactor thread. +All XenAPI calls are on a green thread (using eventlet's "tpool" +thread pool). They are remote calls, and so may hang for the usual +reasons. All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. Polling is handled using reactor.callLater. +(using XenAPI.VM.async_start etc). These return a task, which can then be +polled for completion. -This combination of techniques means that we don't block the reactor thread at +This combination of techniques means that we don't block the main thread at all, and at the same time we don't hold lots of threads waiting for long-running operations. @@ -81,7 +82,7 @@ flags.DEFINE_string('xenapi_connection_password', flags.DEFINE_float('xenapi_task_poll_interval', 0.5, 'The interval used for polling of remote tasks ' - '(Async.VM.start, etc). Used only if ' + '(Async.VM.start, etc). Used only if ' 'connection_type=xenapi.') flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval', 5.0, @@ -213,6 +214,14 @@ class XenAPISession(object): f = f.__getattr__(m) return tpool.execute(f, *args) + def call_xenapi_request(self, method, *args): + """Some interactions with dom0, such as interacting with xenstore's + param record, require using the xenapi_request method of the session + object. This wraps that call on a background thread. + """ + f = self._session.xenapi_request + return tpool.execute(f, method, *args) + def async_call_plugin(self, plugin, fn, args): """Call Async.host.call_plugin on a background thread.""" return tpool.execute(self._unwrap_plugin_exceptions, @@ -222,7 +231,6 @@ class XenAPISession(object): def wait_for_task(self, id, task): """Return the result of the given task. The task is polled until it completes.""" - done = event.Event() loop = utils.LoopingCall(self._poll_task, id, task, done) loop.start(FLAGS.xenapi_task_poll_interval, now=True) @@ -235,7 +243,7 @@ class XenAPISession(object): return self.XenAPI.Session(url) def _poll_task(self, id, task, done): - """Poll the given XenAPI task, and fire the given Deferred if we + """Poll the given XenAPI task, and fire the given action if we get a result.""" try: name = self._session.xenapi.task.get_name_label(task) @@ -290,7 +298,7 @@ class XenAPISession(object): def _parse_xmlrpc_value(val): - """Parse the given value as if it were an XML-RPC value. This is + """Parse the given value as if it were an XML-RPC value. This is sometimes used as the format for the task.result field.""" if not val: return val diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py index 2d323a016..8e7a829d5 100755 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/pluginlib_nova.py @@ -45,6 +45,7 @@ class PluginError(Exception): def __init__(self, *args): Exception.__init__(self, *args) + class ArgumentError(PluginError): """Raised when required arguments are missing, argument values are invalid, or incompatible arguments are given. @@ -67,6 +68,7 @@ def ignore_failure(func, *args, **kwargs): ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$') + def validate_exists(args, key, default=None): """Validates that a string argument to a RPC method call is given, and matches the shell-safe regex, with an optional default value in case it @@ -76,20 +78,24 @@ def validate_exists(args, key, default=None): """ if key in args: if len(args[key]) == 0: - raise ArgumentError('Argument %r value %r is too short.' % (key, args[key])) + raise ArgumentError('Argument %r value %r is too short.' % + (key, args[key])) if not ARGUMENT_PATTERN.match(args[key]): - raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key])) + raise ArgumentError('Argument %r value %r contains invalid ' + 'characters.' % (key, args[key])) if args[key][0] == '-': - raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key])) + raise ArgumentError('Argument %r value %r starts with a hyphen.' + % (key, args[key])) return args[key] elif default is not None: return default else: raise ArgumentError('Argument %s is required.' % key) + def validate_bool(args, key, default=None): - """Validates that a string argument to a RPC method call is a boolean string, - with an optional default value in case it does not exist. + """Validates that a string argument to a RPC method call is a boolean + string, with an optional default value in case it does not exist. Returns the python boolean value. """ @@ -99,7 +105,9 @@ def validate_bool(args, key, default=None): elif value.lower() == 'false': return False else: - raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value)) + raise ArgumentError("Argument %s may not take value %r. " + "Valid values are ['true', 'false']." % (key, value)) + def exists(args, key): """Validates that a freeform string argument to a RPC method call is given. @@ -110,6 +118,7 @@ def exists(args, key): else: raise ArgumentError('Argument %s is required.' % key) + def optional(args, key): """If the given key is in args, return the corresponding value, otherwise return None""" @@ -122,13 +131,14 @@ def get_this_host(session): def get_domain_0(session): this_host_ref = get_this_host(session) - expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref + expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' + expr = expr % this_host_ref return session.xenapi.VM.get_all_records_where(expr).keys()[0] def create_vdi(session, sr_ref, name_label, virtual_size, read_only): vdi_ref = session.xenapi.VDI.create( - { 'name_label': name_label, + {'name_label': name_label, 'name_description': '', 'SR': sr_ref, 'virtual_size': str(virtual_size), @@ -138,7 +148,7 @@ def create_vdi(session, sr_ref, name_label, virtual_size, read_only): 'xenstore_data': {}, 'other_config': {}, 'sm_config': {}, - 'tags': [] }) + 'tags': []}) logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label, virtual_size, read_only, sr_ref) return vdi_ref diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py new file mode 100755 index 000000000..695bf3448 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenstore.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python + +# Copyright (c) 2010 Citrix Systems, Inc. +# Copyright 2010 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# +# XenAPI plugin for reading/writing information to xenstore +# + +try: + import json +except ImportError: + import simplejson as json +import subprocess + +import XenAPIPlugin + +import pluginlib_nova as pluginlib +pluginlib.configure_logging("xenstore") + + +def jsonify(fnc): + def wrapper(*args, **kwargs): + return json.dumps(fnc(*args, **kwargs)) + return wrapper + + +@jsonify +def read_record(self, arg_dict): + """Returns the value stored at the given path for the given dom_id. + These must be encoded as key/value pairs in arg_dict. You can + optinally include a key 'ignore_missing_path'; if this is present + and boolean True, attempting to read a non-existent path will return + the string 'None' instead of raising an exception. + """ + cmd = "xenstore-read /local/domain/%(dom_id)s/%(path)s" % arg_dict + try: + return _run_command(cmd).rstrip("\n") + except pluginlib.PluginError, e: + if arg_dict.get("ignore_missing_path", False): + cmd = "xenstore-exists /local/domain/%(dom_id)s/%(path)s; echo $?" + cmd = cmd % arg_dict + ret = _run_command(cmd).strip() + # If the path exists, the cmd should return "0" + if ret != "0": + # No such path, so ignore the error and return the + # string 'None', since None can't be marshalled + # over RPC. + return "None" + # Either we shouldn't ignore path errors, or another + # error was hit. Re-raise. + raise + + +@jsonify +def write_record(self, arg_dict): + """Writes to xenstore at the specified path. If there is information + already stored in that location, it is overwritten. As in read_record, + the dom_id and path must be specified in the arg_dict; additionally, + you must specify a 'value' key, whose value must be a string. Typically, + you can json-ify more complex values and store the json output. + """ + cmd = "xenstore-write /local/domain/%(dom_id)s/%(path)s '%(value)s'" + cmd = cmd % arg_dict + _run_command(cmd) + return arg_dict["value"] + + +@jsonify +def list_records(self, arg_dict): + """Returns all the stored data at or below the given path for the + given dom_id. The data is returned as a json-ified dict, with the + path as the key and the stored value as the value. If the path + doesn't exist, an empty dict is returned. + """ + cmd = "xenstore-ls /local/domain/%(dom_id)s/%(path)s" % arg_dict + cmd = cmd.rstrip("/") + try: + recs = _run_command(cmd) + except pluginlib.PluginError, e: + if "No such file or directory" in "%s" % e: + # Path doesn't exist. + return {} + return str(e) + raise + base_path = arg_dict["path"] + paths = _paths_from_ls(recs) + ret = {} + for path in paths: + if base_path: + arg_dict["path"] = "%s/%s" % (base_path, path) + else: + arg_dict["path"] = path + rec = read_record(self, arg_dict) + try: + val = json.loads(rec) + except ValueError: + val = rec + ret[path] = val + return ret + + +@jsonify +def delete_record(self, arg_dict): + """Just like it sounds: it removes the record for the specified + VM and the specified path from xenstore. + """ + cmd = "xenstore-rm /local/domain/%(dom_id)s/%(path)s" % arg_dict + return _run_command(cmd) + + +def _paths_from_ls(recs): + """The xenstore-ls command returns a listing that isn't terribly + useful. This method cleans that up into a dict with each path + as the key, and the associated string as the value. + """ + ret = {} + last_nm = "" + level = 0 + path = [] + ret = [] + for ln in recs.splitlines(): + nm, val = ln.rstrip().split(" = ") + barename = nm.lstrip() + this_level = len(nm) - len(barename) + if this_level == 0: + ret.append(barename) + level = 0 + path = [] + elif this_level == level: + # child of same parent + ret.append("%s/%s" % ("/".join(path), barename)) + elif this_level > level: + path.append(last_nm) + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + elif this_level < level: + path = path[:this_level] + ret.append("%s/%s" % ("/".join(path), barename)) + level = this_level + last_nm = barename + return ret + + +def _run_command(cmd): + """Abstracts out the basics of issuing system commands. If the command + returns anything in stderr, a PluginError is raised with that information. + Otherwise, the output from stdout is returned. + """ + pipe = subprocess.PIPE + proc = subprocess.Popen([cmd], shell=True, stdin=pipe, stdout=pipe, + stderr=pipe, close_fds=True) + proc.wait() + err = proc.stderr.read() + if err: + raise pluginlib.PluginError(err) + return proc.stdout.read() + + +if __name__ == "__main__": + XenAPIPlugin.dispatch( + {"read_record": read_record, + "write_record": write_record, + "list_records": list_records, + "delete_record": delete_record}) |
