From b3feee7425334f4f2369edc100ed4422e60e2288 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 16 Jul 2010 19:58:12 +0000 Subject: remove calls to runthis from node --- nova/compute/node.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/node.py b/nova/compute/node.py index 3abd20120..3e39e65fc 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -223,16 +223,20 @@ class Node(object, service.Service): volume_id = None, mountpoint = None): volume = storage.get_volume(volume_id) yield self._init_aoe() - yield utils.runthis("Attached Volume: %s", - "sudo virsh attach-disk %s /dev/etherd/%s %s" - % (instance_id, volume['aoe_device'], mountpoint.split("/")[-1])) + yield process.SharedPool().simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) volume.finish_attach() defer.returnValue(True) + @defer.inlineCallbacks def _init_aoe(self): - utils.runthis("Doin an AoE discover, returns %s", "sudo aoe-discover") - utils.runthis("Doin an AoE stat, returns %s", "sudo aoe-stat") + yield process.SharedPool().simple_execute("sudo aoe-discover") + yield process.SharedPool().simple_execute("sudo aoe-stat") + @defer.inlineCallbacks @exception.wrap_exception def detach_volume(self, instance_id, volume_id): """ detach a volume from an instance """ @@ -240,10 +244,10 @@ class Node(object, service.Service): # name without the leading /dev/ volume = storage.get_volume(volume_id) target = volume['mountpoint'].rpartition('/dev/')[2] - utils.runthis("Detached Volume: %s", "sudo virsh detach-disk %s %s " - % (instance_id, target)) + yield process.SharedPool().simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) volume.finish_detach() - return defer.succeed(True) + defer.returnValue(True) class Group(object): -- cgit From 382381f74ca3423958add26b2578c4e77282a9a0 Mon Sep 17 00:00:00 
2001 From: Vishvananda Ishaya Date: Fri, 16 Jul 2010 20:50:08 +0000 Subject: simplify call to simple_execute --- nova/compute/node.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/node.py b/nova/compute/node.py index 8874ef17e..7cae86d02 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -223,7 +223,7 @@ class Node(object, service.Service): volume_id = None, mountpoint = None): volume = storage.get_volume(volume_id) yield self._init_aoe() - yield process.SharedPool().simple_execute( + yield process.simple_execute( "sudo virsh attach-disk %s /dev/etherd/%s %s" % (instance_id, volume['aoe_device'], @@ -233,8 +233,8 @@ class Node(object, service.Service): @defer.inlineCallbacks def _init_aoe(self): - yield process.SharedPool().simple_execute("sudo aoe-discover") - yield process.SharedPool().simple_execute("sudo aoe-stat") + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") @defer.inlineCallbacks @exception.wrap_exception @@ -244,7 +244,7 @@ class Node(object, service.Service): # name without the leading /dev/ volume = storage.get_volume(volume_id) target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.SharedPool().simple_execute( + yield process.simple_execute( "sudo virsh detach-disk %s %s " % (instance_id, target)) volume.finish_detach() defer.returnValue(True) -- cgit From a3ca587654095ffd4b97103302fb0744e505e332 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 19 Jul 2010 13:19:26 -0500 Subject: Massive refactor of users.py Split users.py into manager.py and ldpadriver.py Added tons of docstrings Cleaned up public methods Simplified manager singleton handling --- nova/compute/network.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/network.py b/nova/compute/network.py index 90d6b2dc6..370e2bf44 100644 --- a/nova/compute/network.py +++ 
b/nova/compute/network.py @@ -29,7 +29,7 @@ from nova import datastore from nova import exception from nova import flags from nova import utils -from nova.auth import users +from nova.auth import manager from nova.compute import exception as compute_exception from nova.compute import linux_net @@ -209,11 +209,11 @@ class BaseNetwork(datastore.BasicModel): @property def user(self): - return users.UserManager.instance().get_user(self['user_id']) + return manager.AuthManager().get_user(self['user_id']) @property def project(self): - return users.UserManager.instance().get_project(self['project_id']) + return manager.AuthManager().get_project(self['project_id']) @property def _hosts_key(self): @@ -511,7 +511,7 @@ def get_vlan_for_project(project_id): if not known_vlans.has_key(vstr): return Vlan.create(project_id, vnum) old_project_id = known_vlans[vstr] - if not users.UserManager.instance().get_project(old_project_id): + if not manager.AuthManager().get_project(old_project_id): vlan = Vlan.lookup(old_project_id) if vlan: # NOTE(todd): This doesn't check for vlan id match, because @@ -537,7 +537,7 @@ def get_network_by_interface(iface, security_group='default'): def get_network_by_address(address): logging.debug("Get Network By Address: %s" % address) - for project in users.UserManager.instance().get_projects(): + for project in manager.AuthManager().get_projects(): net = get_project_network(project.id) if address in net.assigned: logging.debug("Found %s in %s" % (address, project.id)) @@ -577,7 +577,7 @@ def get_project_network(project_id, security_group='default'): """ get a project's private network, allocating one if needed """ # TODO(todd): It looks goofy to get a project from a UserManager. # Refactor to still use the LDAP backend, but not User specific. - project = users.UserManager.instance().get_project(project_id) + project = manager.AuthManager().get_project(project_id) if not project: raise exception.Error("Project %s doesn't exist, uhoh." 
% project_id) @@ -587,5 +587,5 @@ def get_project_network(project_id, security_group='default'): def restart_nets(): """ Ensure the network for each user is enabled""" - for project in users.UserManager.instance().get_projects(): + for project in manager.AuthManager().get_projects(): get_project_network(project.id).express() -- cgit From 3b982f62bb7cb92cbe9e96a1bde4410b3d06f997 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 21 Jul 2010 14:42:22 -0500 Subject: refactor daemons to use common base class in preparation for network refactor --- nova/compute/computenode.py | 582 ++++++++++++++++++++++++++++++++++++++++++++ nova/compute/node.py | 581 ------------------------------------------- 2 files changed, 582 insertions(+), 581 deletions(-) create mode 100644 nova/compute/computenode.py delete mode 100644 nova/compute/node.py (limited to 'nova/compute') diff --git a/nova/compute/computenode.py b/nova/compute/computenode.py new file mode 100644 index 000000000..f26da1996 --- /dev/null +++ b/nova/compute/computenode.py @@ -0,0 +1,582 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Node: + + Runs on each compute node, managing the + hypervisor using libvirt. 
+ +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import node +from nova import process +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import volumenode + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} +INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeNode(node.Node): + """ + Manages the running instances. 
+ """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeNode, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = [self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. 
-todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." % (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % 
instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volumenode.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volumenode.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + 
NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? 
+ logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but 
harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + 
def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} diff --git a/nova/compute/node.py b/nova/compute/node.py deleted file mode 100644 index 
7cae86d02..000000000 --- a/nova/compute/node.py +++ /dev/null @@ -1,581 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Node: - - Runs on each compute node, managing the - hypervisor using libvirt. - -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import process -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import storage - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} 
-INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class Node(object, service.Service): - """ - Manages the running instances. - """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(Node, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - 
for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." 
% (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to 
ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = storage.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = storage.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not 
in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? 
- logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but 
harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - 
def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} -- cgit From abac2033b4aeb399786735a94ce2ddf5c64e6e9e Mon Sep 17 00:00:00 2001 From: Soren Hansen 
Date: Fri, 23 Jul 2010 23:55:39 +0200 Subject: Add (completely untested) code to include an Authorization header for the S3 request to fetch an image. --- nova/compute/node.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'nova/compute') diff --git a/nova/compute/node.py b/nova/compute/node.py index 4683f1c8d..7c09d9583 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -25,6 +25,7 @@ Compute Node: """ import base64 +import boto.utils import json import logging import os @@ -45,11 +46,13 @@ from nova import fakevirt from nova import flags from nova import process from nova import utils +from nova.auth import signer from nova.compute import disk from nova.compute import model from nova.compute import network from nova.objectstore import image # for image_path flag from nova.volume import storage +from nova.users import UserManager FLAGS = flags.FLAGS @@ -446,8 +449,12 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + user_id = self.datamodel['user_id'] + user = UserManager.instance().get_user(user_id) + auth = signer.Signer(user.secret.encode()).s3_authorization({}, 'GET', url) + auth_header = 'Authorization: %s:%s' % (user_id, auth) d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) + 'curl --silent %s -o "%s"' % (url, auth_header, path)) return d def _fetch_local_image(self, image, path): -- cgit From 35fda702abf91792d3c4753a1bbccdb119eaf6eb Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 23 Jul 2010 15:27:18 -0700 Subject: renamed xxxnode to xxservice --- nova/compute/computenode.py | 582 ----------------------------------------- nova/compute/computeservice.py | 581 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 581 insertions(+), 582 deletions(-) delete mode 100644 nova/compute/computenode.py create mode 100644 nova/compute/computeservice.py (limited to 'nova/compute') diff --git a/nova/compute/computenode.py 
b/nova/compute/computenode.py deleted file mode 100644 index f26da1996..000000000 --- a/nova/compute/computenode.py +++ /dev/null @@ -1,582 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Node: - - Runs on each compute node, managing the - hypervisor using libvirt. 
- -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import node -from nova import process -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import volumenode - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class ComputeNode(node.Node): - """ - Manages the running instances. 
- """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(ComputeNode, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. 
-todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % 
instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = volumenode.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = volumenode.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - 
NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? 
- logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but 
harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - 
def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/compute/computeservice.py b/nova/compute/computeservice.py new file mode 100644 
index 000000000..5568e3888 --- /dev/null +++ b/nova/compute/computeservice.py @@ -0,0 +1,581 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Service: + + Runs on each compute host, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import volumeservice + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} 
+INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeService(service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeService, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = 
[self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." 
% (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to 
ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volumeservice.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volumeservice.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + 
if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? 
+ logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but 
harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + 
def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} -- cgit From a0c29a822aaed756728f2619e176d8c54bb1d4e9 Mon Sep 17 00:00:00 2001 From: Vishvananda 
Ishaya Date: Fri, 23 Jul 2010 17:20:21 -0700 Subject: fixed bug where partition code was sometimes failing due to initial dd not being yielded properly --- nova/compute/disk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/disk.py b/nova/compute/disk.py index 08a22556e..7e31498e5 100644 --- a/nova/compute/disk.py +++ b/nova/compute/disk.py @@ -64,8 +64,8 @@ def partition(infile, outfile, local_bytes=0, local_type='ext2', execute=None): last_sector = local_last # e # create an empty file - execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' - % (outfile, last_sector, sector_size)) + yield execute('dd if=/dev/zero of=%s count=1 seek=%d bs=%d' + % (outfile, last_sector, sector_size)) # make mbr partition yield execute('parted --script %s mklabel msdos' % outfile) -- cgit From 87e27afec0c7b683ee35f842abdaccea954f2fba Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Sat, 24 Jul 2010 18:06:22 -0700 Subject: Updated sphinx layout to a two-dir layout like swift. Updated a doc string to get rid of a Sphinx warning. --- nova/compute/disk.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'nova/compute') diff --git a/nova/compute/disk.py b/nova/compute/disk.py index 08a22556e..5749d4c6a 100644 --- a/nova/compute/disk.py +++ b/nova/compute/disk.py @@ -40,7 +40,8 @@ def partition(infile, outfile, local_bytes=0, local_type='ext2', execute=None): formatted as ext2. In the diagram below, dashes represent drive sectors. - 0 a b c d e + +-----+------. . .-------+------. . .------+ + | 0 a| b c|d e| +-----+------. . .-------+------. . .------+ | mbr | primary partiton | local partition | +-----+------. . .-------+------. . .------+ -- cgit From 01dffdd79098429a448cf283119d9026ae1231c2 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Sun, 25 Jul 2010 14:29:20 +0200 Subject: Always make sure to set a Date headers, since it's needed to calculate the S3 Auth header. 
--- nova/compute/node.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/node.py b/nova/compute/node.py index 7c09d9583..a01cb3070 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -47,12 +47,12 @@ from nova import flags from nova import process from nova import utils from nova.auth import signer +from nova.auth.users import UserManager from nova.compute import disk from nova.compute import model from nova.compute import network from nova.objectstore import image # for image_path flag from nova.volume import storage -from nova.users import UserManager FLAGS = flags.FLAGS @@ -449,12 +449,18 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + headers = {} + headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) + user_id = self.datamodel['user_id'] user = UserManager.instance().get_user(user_id) - auth = signer.Signer(user.secret.encode()).s3_authorization({}, 'GET', url) - auth_header = 'Authorization: %s:%s' % (user_id, auth) - d = process.simple_execute( - 'curl --silent %s -o "%s"' % (url, auth_header, path)) + auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', url) + headers['Authorization'] = auth + + headers_opt_string = ' '.join(['-H %s:%s' % (k,v) for (k,v) in headers.iteritems()]) + d = process.simple_execute('curl --silent %s ' + '%s -o "%s"' % (url, headers_opt_string, + path)) return d def _fetch_local_image(self, image, path): -- cgit From d6e74751fa156f3879ff2136caccf2a40d4b9e8c Mon Sep 17 00:00:00 2001 From: Todd Willey Date: Mon, 26 Jul 2010 15:01:42 -0400 Subject: Basic standup of SessionToken model for shortlived auth tokens. 
--- nova/compute/model.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index cda188183..331b68349 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -43,6 +43,7 @@ True import logging import time import redis +import uuid from nova import datastore from nova import exception @@ -228,6 +229,41 @@ class Daemon(datastore.BasicModel): for x in cls.associated_to("host", hostname): yield x +class SessionToken(datastore.BasicModel): + """This is a short-lived auth token that is passed through web requests""" + + def __init__(self, session_token): + self.token = session_token + super(SessionToken, self).__init__() + + @property + def identifier(self): + return self.token + + def default_state(self): + return {'user': None, 'session_type': None, 'token': self.token} + + @classmethod + def generate(cls, userid, session_type=None): + token = str(uuid.uuid4()) + while cls.lookup(token): + token = str(uuid.uuid4()) + instance = cls(token) + instance['user'] = userid + instance['session_type'] = session_type + instance.save() + return instance + + def save(self): + """Call into superclass to save object, then save associations""" + if not self['user']: + raise exception.Invalid("SessionToken requires a User association") + success = super(SessionToken, self).save() + if success: + self.associate_with("user", self['user']) + return True + + if __name__ == "__main__": import doctest doctest.testmod() -- cgit From fd2d4e3f3dba426eedc22b326d2bb0cb6a19eb76 Mon Sep 17 00:00:00 2001 From: Todd Willey Date: Mon, 26 Jul 2010 17:00:50 -0400 Subject: Expiry awareness for SessionToken. 
--- nova/compute/model.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index 331b68349..3aa6fc841 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -40,6 +40,7 @@ True True """ +import datetime import logging import time import redis @@ -241,10 +242,24 @@ class SessionToken(datastore.BasicModel): return self.token def default_state(self): - return {'user': None, 'session_type': None, 'token': self.token} + now = datetime.datetime.utcnow() + diff = datetime.timedelta(hours=1) + expires = now + diff + return {'user': None, 'session_type': None, 'token': self.token, + 'expiry': expires.strftime(utils.TIME_FORMAT)} + + def save(self): + """Call into superclass to save object, then save associations""" + if not self['user']: + raise exception.Invalid("SessionToken requires a User association") + success = super(SessionToken, self).save() + if success: + self.associate_with("user", self['user']) + return True @classmethod def generate(cls, userid, session_type=None): + """make a new token for the given user""" token = str(uuid.uuid4()) while cls.lookup(token): token = str(uuid.uuid4()) @@ -254,14 +269,14 @@ class SessionToken(datastore.BasicModel): instance.save() return instance - def save(self): - """Call into superclass to save object, then save associations""" - if not self['user']: - raise exception.Invalid("SessionToken requires a User association") - success = super(SessionToken, self).save() - if success: - self.associate_with("user", self['user']) - return True + def update_expiry(self, **kwargs): + """updates the expirty attribute, but doesn't save""" + if not kwargs: + kwargs['hours'] = 1 + time = datetime.datetime.utcnow() + diff = datetime.timedelta(**kwargs) + expires = time + diff + self['expiry'] = expires.strftime(utils.TIME_FORMAT) if __name__ == "__main__": -- cgit From 
69cbb1ac3559c44f6640939cd2e1db64e82073fe Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Mon, 26 Jul 2010 23:27:42 +0200 Subject: Make _fetch_s3_image pass proper AWS Authorization headers so that image downloads work again. --- nova/compute/node.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/node.py b/nova/compute/node.py index 7b86ca749..a44583c06 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -31,6 +31,7 @@ import logging import os import shutil import sys +import time from twisted.internet import defer from twisted.internet import task from twisted.application import service @@ -453,19 +454,25 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + + # This should probably move somewhere else, like e.g. a download_as + # method on User objects and at the same time get rewritten to use + # twisted web client. headers = {} headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) user_id = self.datamodel['user_id'] user = UserManager.instance().get_user(user_id) - auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', url) - headers['Authorization'] = auth + uri = '/' + url.partition('/')[2] + auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', uri) + headers['Authorization'] = 'AWS %s:%s' % (user.access, auth) - headers_opt_string = ' '.join(['-H %s:%s' % (k,v) for (k,v) in headers.iteritems()]) - d = process.simple_execute('curl --silent %s ' - '%s -o "%s"' % (url, headers_opt_string, - path)) - return d + cmd = ['/usr/bin/curl', '--silent', url] + for (k,v) in headers.iteritems(): + cmd += ['-H', '%s: %s' % (k,v)] + + cmd += ['-o', path] + return process.SharedPool().execute(executable=cmd[0], args=cmd[1:]) def _fetch_local_image(self, image, path): source = _image_path('%s/image' % image) -- cgit From ffe52b8660123335e425c52eb3bebba2e3d2e42f Mon 
Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 14:48:08 -0700 Subject: default flag file full path --- nova/compute/linux_net.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'nova/compute') diff --git a/nova/compute/linux_net.py b/nova/compute/linux_net.py index 48e07da66..861ce779b 100644 --- a/nova/compute/linux_net.py +++ b/nova/compute/linux_net.py @@ -29,7 +29,7 @@ from nova import flags FLAGS=flags.FLAGS flags.DEFINE_string('dhcpbridge_flagfile', - '/etc/nova-dhcpbridge.conf', + '/etc/nova/nova-dhcpbridge.conf', 'location of flagfile for dhcpbridge') def execute(cmd, addl_env=None): -- cgit From 58b41fde4c8639577b738d0f57f10acda4c63c0e Mon Sep 17 00:00:00 2001 From: Todd Willey Date: Mon, 26 Jul 2010 18:00:39 -0400 Subject: Lookup should only not return expired tokens. --- nova/compute/model.py | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index 3aa6fc841..ab0bfeb83 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -257,6 +257,15 @@ class SessionToken(datastore.BasicModel): self.associate_with("user", self['user']) return True + @classmethod + def lookup(cls, key): + token = super(SessionToken, cls).lookup(key) + if token: + expires_at = utils.parse_isotime(token['expiry']) + if datetime.datetime.utcnow() >= expires_at: + return None + return token + @classmethod def generate(cls, userid, session_type=None): """make a new token for the given user""" -- cgit From 7588ae06e8d6a7d526b12e0f15f3e5be522f16d0 Mon Sep 17 00:00:00 2001 From: Todd Willey Date: Mon, 26 Jul 2010 18:02:00 -0400 Subject: In fact, it should delete them. 
--- nova/compute/model.py | 1 + 1 file changed, 1 insertion(+) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index ab0bfeb83..7335d2c79 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -263,6 +263,7 @@ class SessionToken(datastore.BasicModel): if token: expires_at = utils.parse_isotime(token['expiry']) if datetime.datetime.utcnow() >= expires_at: + token.destroy() return None return token -- cgit From 754db8ef1ceb84fa9a1f44bfc6c5c6bbd99cd7e1 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 17:14:28 -0700 Subject: renamed xxxservice to service --- nova/compute/computeservice.py | 581 ----------------------------------------- nova/compute/service.py | 581 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 581 insertions(+), 581 deletions(-) delete mode 100644 nova/compute/computeservice.py create mode 100644 nova/compute/service.py (limited to 'nova/compute') diff --git a/nova/compute/computeservice.py b/nova/compute/computeservice.py deleted file mode 100644 index 5568e3888..000000000 --- a/nova/compute/computeservice.py +++ /dev/null @@ -1,581 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -""" -Compute Service: - - Runs on each compute host, managing the - hypervisor using libvirt. - -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import process -from nova import service -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import volumeservice - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class ComputeService(service.Service): - """ - Manages the running instances. 
- """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(ComputeService, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. 
-todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % 
instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = volumeservice.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = volumeservice.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class 
Instance(object): - - NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? 
- logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but 
harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - 
def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/compute/service.py b/nova/compute/service.py new file mode 100644 index 
000000000..02e35baa2 --- /dev/null +++ b/nova/compute/service.py @@ -0,0 +1,581 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Service: + + Runs on each compute host, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import service as volume_service + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} 
+INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeService(service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeService, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = 
[self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." 
% (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to 
ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volume_service.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volume_service.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) 
+ if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? 
+ logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but 
harmless
+
+    @exception.wrap_exception
+    def destroy(self):
+        """Destroy the libvirt domain and remove the instance's files.
+
+        Returns a Deferred that fires once a 0.5s poll sees the instance
+        in the SHUTDOWN state (or the poll itself fails).  The Deferred's
+        callbacks delete the instance directory and then destroy the
+        datamodel record.
+
+        Raises exception.Error if the instance is already destroyed; the
+        datamodel record is destroyed before raising.
+        """
+        if self.is_destroyed():
+            self.datamodel.destroy()
+            raise exception.Error('trying to destroy already destroyed'
+                                  ' instance: %s' % self.name)
+
+        self.set_state(Instance.NOSTATE, 'shutting_down')
+        try:
+            virt_dom = self._conn.lookupByName(self.name)
+            virt_dom.destroy()
+        except Exception:
+            # If the instance is already terminated, we're still happy,
+            # but leave a trace so unexpected libvirt errors are visible.
+            logging.debug('libvirt destroy failed for instance %s; '
+                          'continuing with cleanup', self.name,
+                          exc_info=True)
+
+        d = defer.Deferred()
+        d.addCallback(lambda x: self._cleanup())
+        d.addCallback(lambda x: self.datamodel.destroy())
+        # TODO(termie): short-circuit me for tests
+        # WE'LL save this for when we do shutdown,
+        # instead of destroy - but destroy returns immediately
+        timer = task.LoopingCall(f=None)
+        def _wait_for_shutdown():
+            try:
+                self.update_state()
+                if self.state == Instance.SHUTDOWN:
+                    timer.stop()
+                    d.callback(None)
+            except Exception:
+                # A failed state refresh is treated as "already gone".
+                self.set_state(Instance.SHUTDOWN)
+                timer.stop()
+                d.callback(None)
+        timer.f = _wait_for_shutdown
+        timer.start(interval=0.5, now=True)
+        return d
+
+    def _cleanup(self):
+        """Recursively delete the directory named by datamodel['basepath']."""
+        target = os.path.abspath(self.datamodel['basepath'])
+        logging.info("Deleting instance files at %s", target)
+        shutil.rmtree(target)
+
+    @defer.inlineCallbacks
+    @exception.wrap_exception
+    def reboot(self):
+        """Destroy the running domain, recreate it, and wait for it to run.
+
+        The returned Deferred fires once a 0.5s poll sees the instance
+        running again, or once the poll fails (the state is then set to
+        SHUTDOWN).  Fails with exception.Error if the instance is not
+        running when called.
+        """
+        if not self.is_running():
+            # Trailing space matters: the two literals are concatenated.
+            raise exception.Error(
+                    'trying to reboot a non-running '
+                    'instance: %s (state: %s)' % (self.name, self.state))
+
+        logging.debug('rebooting instance %s', self.name)
+        self.set_state(Instance.NOSTATE, 'rebooting')
+        yield self._conn.lookupByName(self.name).destroy()
+        self._conn.createXML(self.toXml(), 0)
+
+        d = defer.Deferred()
+        timer = task.LoopingCall(f=None)
+        def _wait_for_reboot():
+            try:
+                self.update_state()
+                if self.is_running():
+                    logging.debug('rebooted instance %s', self.name)
+                    timer.stop()
+                    d.callback(None)
+            except Exception:
+                self.set_state(Instance.SHUTDOWN)
+                timer.stop()
+                d.callback(None)
+        timer.f = _wait_for_reboot
+        timer.start(interval=0.5, now=True)
+        yield d
+
+    def _fetch_s3_image(self, image, path):
+        """Download '<image>/image' with curl into `path`.
+
+        Returns whatever process.simple_execute returns for the command.
+        """
+        # NOTE(review): url and path are interpolated unquoted into the
+        # command string -- confirm how simple_execute runs commands
+        # before passing values that may contain spaces or shell
+        # metacharacters.
+        url = _image_url('%s/image' % image)
+        return process.simple_execute(
+                'curl --silent %s -o %s' % (url, path))
+
+    def _fetch_local_image(self, image, path):
+        """Copy '<image>/image' from the local image store to `path`.
+
+        Returns whatever process.simple_execute returns for the command.
+        """
+        source = _image_path('%s/image' % image)
+        return process.simple_execute('cp %s %s' % (source, path))
+
+    @defer.inlineCallbacks
+    def _create_image(self, libvirt_xml):
+        """Populate the instance directory before the domain is created.
+
+        Steps, in order: create the base directory and make it writable,
+        write `libvirt_xml` to libvirt.xml, fetch any missing
+        disk/kernel/ramdisk images, inject key and network data into the
+        raw disk when either is present, then partition the raw disk
+        into the final 'disk' image.
+
+        With FLAGS.fake_libvirt set, only the directory and libvirt.xml
+        are written.
+        """
+        # syntactic nicety
+        data = self.datamodel
+        basepath = self.basepath
+
+        # ensure directories exist and are writable
+        yield process.simple_execute(
+                'mkdir -p %s' % basepath())
+        yield process.simple_execute(
+                'chmod 0777 %s' % basepath())
+
+        # TODO(termie): these are blocking calls, it would be great
+        #               if they weren't.
+        logging.info('Creating image for: %s', data['instance_id'])
+        # `with` closes the file even if the write fails.
+        with open(basepath('libvirt.xml'), 'w') as f:
+            f.write(libvirt_xml)
+
+        if FLAGS.fake_libvirt:
+            logging.info('fake_libvirt, nothing to do for create_image')
+            # returnValue() leaves the generator by raising internally,
+            # so nothing after this line runs.
+            defer.returnValue(None)
+
+        if FLAGS.use_s3:
+            _fetch_file = self._fetch_s3_image
+        else:
+            _fetch_file = self._fetch_local_image
+
+        if not os.path.exists(basepath('disk')):
+            yield _fetch_file(data['image_id'], basepath('disk-raw'))
+        if not os.path.exists(basepath('kernel')):
+            yield _fetch_file(data['kernel_id'], basepath('kernel'))
+        if not os.path.exists(basepath('ramdisk')):
+            yield _fetch_file(data['ramdisk_id'], basepath('ramdisk'))
+
+        def execute(cmd, input=None):
+            # Runner handed to the disk helpers; always passes error_ok=1.
+            return process.simple_execute(cmd=cmd,
+                                          input=input,
+                                          error_ok=1)
+
+        key = data['key_data']
+        net = None
+        if FLAGS.simple_network:
+            with open(FLAGS.simple_network_template) as f:
+                net = f.read() % {
+                        'address': data['private_dns_name'],
+                        'network': FLAGS.simple_network_network,
+                        'netmask': FLAGS.simple_network_netmask,
+                        'gateway': FLAGS.simple_network_gateway,
+                        'broadcast': FLAGS.simple_network_broadcast,
+                        'dns': FLAGS.simple_network_dns}
+        if key or net:
+            logging.info('Injecting data into image %s', data['image_id'])
+            yield disk.inject_data(
+                    basepath('disk-raw'), key, net, execute=execute)
+
+        if os.path.exists(basepath('disk')):
+            yield process.simple_execute(
+                    'rm -f %s' % basepath('disk'))
+
+        # local_gb is in gigabytes; disk.partition is given a byte count.
+        local_bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb']
+                       * 1024 * 1024 * 1024)
+        yield disk.partition(
+                basepath('disk-raw'), basepath('disk'), local_bytes,
+                execute=execute)
+
+    @defer.inlineCallbacks
+    @exception.wrap_exception
+    def spawn(self):
+        """Build the instance image, create the domain, and wait for boot.
+
+        The returned Deferred fires once a 0.5s poll sees the instance
+        running, or once the poll fails.  Any exception along the way is
+        logged and leaves the instance in the SHUTDOWN state; it is not
+        propagated to the caller.
+        """
+        self.set_state(Instance.NOSTATE, 'spawning')
+        logging.debug("Starting spawn in Instance")
+
+        xml = self.toXml()
+        self.set_state(Instance.NOSTATE, 'launching')
+        logging.info('self %s', self)
+        try:
+            yield self._create_image(xml)
+            self._conn.createXML(xml, 0)
+            # TODO(termie): this should actually register
+            #               a callback to check for successful boot
+            logging.debug("Instance is running")
+
+            local_d = defer.Deferred()
+            timer = task.LoopingCall(f=None)
+            def _wait_for_boot():
+                try:
+                    self.update_state()
+                    if self.is_running():
+                        logging.debug('booted instance %s', self.name)
+                        timer.stop()
+                        local_d.callback(None)
+                except Exception:
+                    self.set_state(Instance.SHUTDOWN)
+                    logging.error('Failed to boot instance %s', self.name)
+                    timer.stop()
+                    local_d.callback(None)
+            timer.f = _wait_for_boot
+            timer.start(interval=0.5, now=True)
+            # Wait for the poll to finish; without this the Deferred
+            # returned by spawn() fired before the boot check completed.
+            yield local_d
+        except Exception:
+            logging.exception('Failed to spawn instance %s', self.name)
+            self.set_state(Instance.SHUTDOWN)
+
+    @exception.wrap_exception
+    def console_output(self):
+        """Return an already-fired Deferred carrying the console text.
+
+        Reads console.log under datamodel['basepath'], or returns a fixed
+        placeholder string when FLAGS.fake_libvirt is set.
+        """
+        if not FLAGS.fake_libvirt:
+            fname = os.path.abspath(
+                    os.path.join(self.datamodel['basepath'], 'console.log'))
+            with open(fname, 'r') as f:
+                console = f.read()
+        else:
+            console = 'FAKE CONSOLE OUTPUT'
+        return defer.succeed(console)
+
+# Display name for each Instance state constant.  NOSTATE is shown as
+# 'pending'; SHUTDOWN and SHUTOFF both map to 'shutdown'.
+STATE_NAMES = {
+    Instance.NOSTATE : 'pending',
+    Instance.RUNNING : 'running',
+    Instance.BLOCKED : 'blocked',
+    Instance.PAUSED : 'paused',
+    Instance.SHUTDOWN : 'shutdown',
+    Instance.SHUTOFF : 'shutdown',
+    Instance.CRASHED : 'crashed',
+}
-- cgit From 74ce3aef4dafca8b0fc6bf0404725afdefe335ec Mon Sep 17 00:00:00 2001 From: Todd Willey Date:
Mon, 26 Jul 2010 23:49:49 -0400 Subject: Give SessionToken an is_expired method --- nova/compute/model.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index 7335d2c79..bae93b6c1 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -288,6 +288,11 @@ class SessionToken(datastore.BasicModel): expires = time + diff self['expiry'] = expires.strftime(utils.TIME_FORMAT) + def is_expired(self): + now = datetime.datetime.utcnow() + expires = utils.parse_isotime(self['expiry']) + return expires <= now + if __name__ == "__main__": import doctest -- cgit From ad7f099aefc17d04a2a04deb7fd3055adc8cd84a Mon Sep 17 00:00:00 2001 From: Todd Willey Date: Tue, 27 Jul 2010 01:03:05 -0400 Subject: Flag for SessionToken ttl setting. --- nova/compute/model.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'nova/compute') diff --git a/nova/compute/model.py b/nova/compute/model.py index bae93b6c1..212830d3c 100644 --- a/nova/compute/model.py +++ b/nova/compute/model.py @@ -235,6 +235,7 @@ class SessionToken(datastore.BasicModel): def __init__(self, session_token): self.token = session_token + self.default_ttl = FLAGS.auth_token_ttl super(SessionToken, self).__init__() @property @@ -243,7 +244,7 @@ class SessionToken(datastore.BasicModel): def default_state(self): now = datetime.datetime.utcnow() - diff = datetime.timedelta(hours=1) + diff = datetime.timedelta(seconds=self.default_ttl) expires = now + diff return {'user': None, 'session_type': None, 'token': self.token, 'expiry': expires.strftime(utils.TIME_FORMAT)} @@ -282,7 +283,7 @@ class SessionToken(datastore.BasicModel): def update_expiry(self, **kwargs): """updates the expirty attribute, but doesn't save""" if not kwargs: - kwargs['hours'] = 1 + kwargs['seconds'] = self.default_ttl time = datetime.datetime.utcnow() diff = datetime.timedelta(**kwargs) expires = time + diff @@ -293,6 +294,13 @@ class 
SessionToken(datastore.BasicModel): expires = utils.parse_isotime(self['expiry']) return expires <= now + def ttl(self): + """number of seconds remaining before expiration""" + now = datetime.datetime.utcnow() + expires = utils.parse_isotime(self['expiry']) + delta = expires - now + return (delta.seconds + (delta.days * 24 * 3600)) + if __name__ == "__main__": import doctest -- cgit From 9718f5216ce9423c002c47f86e05b2b2eb08e551 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Tue, 27 Jul 2010 14:16:49 -0700 Subject: properly delete old vlans assigned to deleted projects --- nova/compute/network.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'nova/compute') diff --git a/nova/compute/network.py b/nova/compute/network.py index b5b3c3b5d..62d892e58 100644 --- a/nova/compute/network.py +++ b/nova/compute/network.py @@ -144,7 +144,7 @@ class Vlan(datastore.BasicModel): @datastore.absorb_connection_error def destroy(self): set_name = self._redis_set_name(self.__class__.__name__) - datastore.Redis.instance().hdel(set_name, self.project) + datastore.Redis.instance().hdel(set_name, self.project_id) def subnet(self): vlan = int(self.vlan_id) @@ -529,6 +529,7 @@ def get_vlan_for_project(project_id): # don't orphan any VLANs. It is basically # garbage collection for after projects abandoned # their reference. + vlan.destroy() vlan.project_id = project_id vlan.save() return vlan -- cgit