From 3b982f62bb7cb92cbe9e96a1bde4410b3d06f997 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 21 Jul 2010 14:42:22 -0500 Subject: refactor daemons to use common base class in preparation for network refactor --- bin/nova-compute | 68 +---- bin/nova-network | 32 +++ bin/nova-volume | 68 +---- nova/compute/computenode.py | 582 +++++++++++++++++++++++++++++++++++++++++ nova/compute/node.py | 581 ---------------------------------------- nova/endpoint/cloud.py | 18 +- nova/endpoint/rackspace.py | 1 - nova/flags.py | 4 +- nova/network/__init__.py | 32 +++ nova/network/networknode.py | 35 +++ nova/node.py | 103 ++++++++ nova/tests/cloud_unittest.py | 4 +- nova/tests/compute_unittest.py | 128 +++++++++ nova/tests/future_unittest.py | 75 ------ nova/tests/model_unittest.py | 1 - nova/tests/node_unittest.py | 128 --------- nova/tests/storage_unittest.py | 115 -------- nova/tests/volume_unittest.py | 115 ++++++++ nova/twistd.py | 12 +- nova/volume/storage.py | 321 ----------------------- nova/volume/volumenode.py | 305 +++++++++++++++++++++ run_tests.py | 4 +- 22 files changed, 1363 insertions(+), 1369 deletions(-) create mode 100644 bin/nova-network create mode 100644 nova/compute/computenode.py delete mode 100644 nova/compute/node.py create mode 100644 nova/network/__init__.py create mode 100644 nova/network/networknode.py create mode 100644 nova/node.py create mode 100644 nova/tests/compute_unittest.py delete mode 100644 nova/tests/future_unittest.py delete mode 100644 nova/tests/node_unittest.py delete mode 100644 nova/tests/storage_unittest.py create mode 100644 nova/tests/volume_unittest.py delete mode 100644 nova/volume/storage.py create mode 100644 nova/volume/volumenode.py diff --git a/bin/nova-compute b/bin/nova-compute index 49710e1b3..67c93fcb8 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -19,80 +19,14 @@ """ Twistd daemon for the nova compute nodes. - Receives messages via AMQP, manages pool of worker threads - for async tasks. """ -import logging -import os -import sys - -# NOTE(termie): kludge so that we can run this from the bin directory in the -# checkout without having to screw with paths -NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova') -if os.path.exists(NOVA_PATH): - sys.path.insert(0, os.path.dirname(NOVA_PATH)) - -from twisted.internet import task -from twisted.application import service - -from nova import flags -from nova import rpc from nova import twistd from nova.compute import node -FLAGS = flags.FLAGS -# NOTE(termie): This file will necessarily be re-imported under different -# context when the twistd.serve() call is made below so any -# flags we define here will have to be conditionally defined, -# flags defined by imported modules are safe. -if 'compute_report_state_interval' not in FLAGS: - flags.DEFINE_integer('compute_report_state_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) -logging.getLogger().setLevel(logging.DEBUG) - -def main(): - logging.warn('Starting compute node') - n = node.Node() - d = n.adopt_instances() - d.addCallback(lambda x: logging.info('Adopted %d instances', x)) - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % FLAGS.compute_topic, - proxy=n) - - consumer_node = rpc.AdapterConsumer( - connection=conn, - topic='%s.%s' % (FLAGS.compute_topic, FLAGS.node_name), - proxy=n) - - bin_name = os.path.basename(__file__) - pulse = task.LoopingCall(n.report_state, FLAGS.node_name, bin_name) - pulse.start(interval=FLAGS.compute_report_state_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - n.setServiceParent(application) - return application - - -# NOTE(termie): When this script is executed from the commandline what it will -# actually do is tell the twistd application runner that it -# should run this file as a twistd application (see below). if __name__ == '__main__': twistd.serve(__file__) -# NOTE(termie): When this script is loaded by the twistd application runner -# this code path will be executed and twistd will expect a -# variable named 'application' to be available, it will then -# handle starting it and stopping it. if __name__ == '__builtin__': - application = main() + application = node.ComputeNode.create() diff --git a/bin/nova-network b/bin/nova-network new file mode 100644 index 000000000..c69690081 --- /dev/null +++ b/bin/nova-network @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" + Twistd daemon for the nova network nodes. +""" + +from nova import twistd +from nova.network import node + + +if __name__ == '__main__': + twistd.serve(__file__) + +if __name__ == '__builtin__': + application = node.NetworkNode.create() diff --git a/bin/nova-volume b/bin/nova-volume index 7d4b65205..cdf2782bc 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -18,77 +18,15 @@ # under the License. """ - Tornado Storage daemon manages AoE volumes via AMQP messaging. + Twistd daemon for the nova volume nodes. """ -import logging -import os -import sys - -# NOTE(termie): kludge so that we can run this from the bin directory in the -# checkout without having to screw with paths -NOVA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'nova') -if os.path.exists(NOVA_PATH): - sys.path.insert(0, os.path.dirname(NOVA_PATH)) - -from twisted.internet import task -from twisted.application import service - -from nova import flags -from nova import rpc from nova import twistd -from nova.volume import storage - - -FLAGS = flags.FLAGS -# NOTE(termie): This file will necessarily be re-imported under different -# context when the twistd.serve() call is made below so any -# flags we define here will have to be conditionally defined, -# flags defined by imported modules are safe. -if 'volume_report_state_interval' not in FLAGS: - flags.DEFINE_integer('volume_report_state_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) - - -def main(): - logging.warn('Starting volume node') - bs = storage.BlockStore() - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % FLAGS.storage_topic, - proxy=bs) - - consumer_node = rpc.AdapterConsumer( - connection=conn, - topic='%s.%s' % (FLAGS.storage_topic, FLAGS.node_name), - proxy=bs) - - bin_name = os.path.basename(__file__) - pulse = task.LoopingCall(bs.report_state, FLAGS.node_name, bin_name) - pulse.start(interval=FLAGS.volume_report_state_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - bs.setServiceParent(application) - return application +from nova.volume import node -# NOTE(termie): When this script is executed from the commandline what it will -# actually do is tell the twistd application runner that it -# should run this file as a twistd application (see below). if __name__ == '__main__': twistd.serve(__file__) -# NOTE(termie): When this script is loaded by the twistd application runner -# this code path will be executed and twistd will expect a -# variable named 'application' to be available, it will then -# handle starting it and stopping it. if __name__ == '__builtin__': - application = main() + application = node.VolumeNode.create() diff --git a/nova/compute/computenode.py b/nova/compute/computenode.py new file mode 100644 index 000000000..f26da1996 --- /dev/null +++ b/nova/compute/computenode.py @@ -0,0 +1,582 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Node: + + Runs on each compute node, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import node +from nova import process +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import volumenode + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} +INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeNode(node.Node): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeNode, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = [self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." % (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volumenode.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volumenode.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? + logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} diff --git a/nova/compute/node.py b/nova/compute/node.py deleted file mode 100644 index 7cae86d02..000000000 --- a/nova/compute/node.py +++ /dev/null @@ -1,581 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Node: - - Runs on each compute node, managing the - hypervisor using libvirt. - -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import process -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import storage - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class Node(object, service.Service): - """ - Manages the running instances. - """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(Node, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = storage.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = storage.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? - logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index 3b7b4804b..eaa608b1e 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -38,9 +38,9 @@ from nova.auth import rbac from nova.auth import users from nova.compute import model from nova.compute import network -from nova.compute import node +from nova.compute import computenode from nova.endpoint import images -from nova.volume import storage +from nova.volume import volumenode FLAGS = flags.FLAGS @@ -76,7 +76,7 @@ class CloudController(object): def volumes(self): """ returns a list of all volumes """ for volume_id in datastore.Redis.instance().smembers("volumes"): - volume = storage.get_volume(volume_id) + volume = volumenode.get_volume(volume_id) yield volume def __str__(self): @@ -103,7 +103,7 @@ class CloudController(object): result = {} for instance in self.instdir.all: if instance['project_id'] == project_id: - line = '%s slots=%d' % (instance['private_dns_name'], node.INSTANCE_TYPES[instance['instance_type']]['vcpus']) + line = '%s slots=%d' % (instance['private_dns_name'], computenode.INSTANCE_TYPES[instance['instance_type']]['vcpus']) if instance['key_name'] in result: result[instance['key_name']].append(line) else: @@ -296,8 +296,8 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): - # TODO(vish): refactor this to create the volume object here and tell storage to create it - res = rpc.call(FLAGS.storage_topic, {"method": "create_volume", + # TODO(vish): refactor this to create the volume object here and tell volumenode to create it + res = rpc.call(FLAGS.volume_topic, {"method": "create_volume", "args" : {"size": size, "user_id": context.user.id, "project_id": context.project.id}}) @@ -331,7 +331,7 @@ class CloudController(object): raise exception.NotFound('Instance %s could not be found' % instance_id) def _get_volume(self, context, volume_id): - volume = storage.get_volume(volume_id) + volume = volumenode.get_volume(volume_id) if context.user.is_admin() or volume['project_id'] == context.project.id: return volume raise exception.NotFound('Volume %s could not be found' % volume_id) @@ -628,8 +628,8 @@ class CloudController(object): def delete_volume(self, context, volume_id, **kwargs): # TODO: return error if not authorized volume = self._get_volume(context, volume_id) - storage_node = volume['node_name'] - rpc.cast('%s.%s' % (FLAGS.storage_topic, storage_node), + volume_node = volume['node_name'] + rpc.cast('%s.%s' % (FLAGS.volume_topic, volume_node), {"method": "delete_volume", "args" : {"volume_id": volume_id}}) return defer.succeed(True) diff --git a/nova/endpoint/rackspace.py b/nova/endpoint/rackspace.py index 9208ddab7..08e435c5d 100644 --- a/nova/endpoint/rackspace.py +++ b/nova/endpoint/rackspace.py @@ -39,7 +39,6 @@ from nova.compute import model from nova.compute import network from nova.endpoint import images from nova.endpoint import wsgi -from nova.volume import storage FLAGS = flags.FLAGS diff --git a/nova/flags.py b/nova/flags.py index 06ea1e007..ffb395f13 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -40,7 +40,9 @@ DEFINE_integer('s3_port', 3333, 's3 port') DEFINE_string('s3_host', '127.0.0.1', 's3 host') #DEFINE_string('cloud_topic', 'cloud', 'the topic clouds listen on') DEFINE_string('compute_topic', 'compute', 'the topic compute nodes listen on') -DEFINE_string('storage_topic', 'storage', 'the topic storage nodes listen on') +DEFINE_string('volume_topic', 'volume', 'the topic volume nodes listen on') +DEFINE_string('network_topic', 'network', 'the topic network nodes listen on') + DEFINE_bool('fake_libvirt', False, 'whether to use a fake libvirt or not') DEFINE_bool('verbose', False, 'show debug output') diff --git a/nova/network/__init__.py b/nova/network/__init__.py new file mode 100644 index 000000000..dcc54db09 --- /dev/null +++ b/nova/network/__init__.py @@ -0,0 +1,32 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +:mod:`nova.network` -- Network Nodes +===================================================== + +.. automodule:: nova.network + :platform: Unix + :synopsis: Network is responsible for managing networking +.. moduleauthor:: Jesse Andrews +.. moduleauthor:: Devin Carlen +.. moduleauthor:: Vishvananda Ishaya +.. moduleauthor:: Joshua McKenty +.. moduleauthor:: Manish Singh +.. moduleauthor:: Andy Smith +""" diff --git a/nova/network/networknode.py b/nova/network/networknode.py new file mode 100644 index 000000000..e5a346551 --- /dev/null +++ b/nova/network/networknode.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network Nodes are responsible for allocating ips and setting up network +""" + +import logging + +from nova import flags +from nova import node + + +FLAGS = flags.FLAGS + +class NetworkNode(node.Node): + """Allocates ips and sets up networks""" + + def __init__(self): + logging.debug("Network node working") diff --git a/nova/node.py b/nova/node.py new file mode 100644 index 000000000..852344da9 --- /dev/null +++ b/nova/node.py @@ -0,0 +1,103 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Generic Node baseclass for all workers that run on hosts +""" + +import inspect +import logging +import os + +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + +from nova import datastore +from nova import flags +from nova import rpc +from nova.compute import model + + +FLAGS = flags.FLAGS + +flags.DEFINE_integer('report_interval', 10, + 'seconds between nodes reporting state to cloud', + lower_bound=1) + +class Node(object, service.Service): + """Base class for workers that run on hosts""" + + @classmethod + def create(cls, + report_interval=None, # defaults to flag + bin_name=None, # defaults to basename of executable + topic=None): # defaults to basename - "nova-" part + """Instantiates class and passes back application object""" + if not report_interval: + # NOTE(vish): set here because if it is set to flag in the + # parameter list, it wrongly uses the default + report_interval = FLAGS.report_interval + # NOTE(vish): magic to automatically determine bin_name and topic + if not bin_name: + bin_name = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = bin_name.rpartition("nova-")[2] + logging.warn("Starting %s node" % topic) + node_instance = cls() + + conn = rpc.Connection.instance() + consumer_all = rpc.AdapterConsumer( + connection=conn, + topic='%s' % topic, + proxy=node_instance) + + consumer_node = rpc.AdapterConsumer( + connection=conn, + topic='%s.%s' % (topic, FLAGS.node_name), + proxy=node_instance) + + pulse = task.LoopingCall(node_instance.report_state, + FLAGS.node_name, + bin_name) + pulse.start(interval=report_interval, now=False) + + consumer_all.attach_to_twisted() + consumer_node.attach_to_twisted() + + # This is the parent service that twistd will be looking for when it + # parses this file, return it so that we can get it into globals below + application = service.Application(bin_name) + node_instance.setServiceParent(application) + return application + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except datastore.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index b8614fdc8..7ab2c257a 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -28,7 +28,7 @@ from nova import flags from nova import rpc from nova import test from nova.auth import users -from nova.compute import node +from nova.compute import computenode from nova.endpoint import api from nova.endpoint import cloud @@ -54,7 +54,7 @@ class CloudTestCase(test.BaseTestCase): self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) # set up a node - self.node = node.Node() + self.node = computenode.ComputeNode() self.node_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, proxy=self.node) diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py new file mode 100644 index 000000000..4c0f1afb3 --- /dev/null +++ b/nova/tests/compute_unittest.py @@ -0,0 +1,128 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import time +from twisted.internet import defer +from xml.etree import ElementTree + +from nova import exception +from nova import flags +from nova import test +from nova import utils +from nova.compute import model +from nova.compute import computenode + + +FLAGS = flags.FLAGS + + +class InstanceXmlTestCase(test.TrialTestCase): + # @defer.inlineCallbacks + def test_serialization(self): + # TODO: Reimplement this, it doesn't make sense in redis-land + return + + # instance_id = 'foo' + # first_node = node.Node() + # inst = yield first_node.run_instance(instance_id) + # + # # force the state so that we can verify that it changes + # inst._s['state'] = node.Instance.NOSTATE + # xml = inst.toXml() + # self.assert_(ElementTree.parse(StringIO.StringIO(xml))) + # + # second_node = node.Node() + # new_inst = node.Instance.fromXml(second_node._conn, pool=second_node._pool, xml=xml) + # self.assertEqual(new_inst.state, node.Instance.RUNNING) + # rv = yield first_node.terminate_instance(instance_id) + + +class ComputeConnectionTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(ComputeConnectionTestCase, self).setUp() + self.flags(fake_libvirt=True, + fake_storage=True, + fake_users=True) + self.node = computenode.ComputeNode() + + def create_instance(self): + instdir = model.InstanceDirectory() + inst = instdir.new() + # TODO(ja): add ami, ari, aki, user_data + inst['reservation_id'] = 'r-fakeres' + inst['launch_time'] = '10' + inst['user_id'] = 'fake' + inst['project_id'] = 'fake' + inst['instance_type'] = 'm1.tiny' + inst['node_name'] = FLAGS.node_name + inst['mac_address'] = utils.generate_mac() + inst['ami_launch_index'] = 0 + inst.save() + return inst['instance_id'] + + @defer.inlineCallbacks + def test_run_describe_terminate(self): + instance_id = self.create_instance() + + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + logging.info("Running instances: %s", rv) + self.assertEqual(rv[instance_id].name, instance_id) + + rv = yield self.node.terminate_instance(instance_id) + + rv = yield self.node.describe_instances() + logging.info("After terminating instances: %s", rv) + self.assertEqual(rv, {}) + + @defer.inlineCallbacks + def test_reboot(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + + yield self.node.reboot_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + rv = yield self.node.terminate_instance(instance_id) + + @defer.inlineCallbacks + def test_console_output(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + console = yield self.node.get_console_output(instance_id) + self.assert_(console) + rv = yield self.node.terminate_instance(instance_id) + + @defer.inlineCallbacks + def test_run_instance_existing(self): + instance_id = self.create_instance() + rv = yield self.node.run_instance(instance_id) + + rv = yield self.node.describe_instances() + self.assertEqual(rv[instance_id].name, instance_id) + + self.assertRaises(exception.Error, self.node.run_instance, instance_id) + rv = yield self.node.terminate_instance(instance_id) diff --git a/nova/tests/future_unittest.py b/nova/tests/future_unittest.py deleted file mode 100644 index da5470ffe..000000000 --- a/nova/tests/future_unittest.py +++ /dev/null @@ -1,75 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import mox -import StringIO -import time -from tornado import ioloop -from twisted.internet import defer -import unittest -from xml.etree import ElementTree - -from nova import cloud -from nova import exception -from nova import flags -from nova import node -from nova import rpc -from nova import test - - -FLAGS = flags.FLAGS - - -class AdminTestCase(test.BaseTestCase): - def setUp(self): - super(AdminTestCase, self).setUp() - self.flags(fake_libvirt=True, - fake_rabbit=True) - - self.conn = rpc.Connection.instance() - - logging.getLogger().setLevel(logging.INFO) - - # set up our cloud - self.cloud = cloud.CloudController() - self.cloud_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.cloud_topic, - proxy=self.cloud) - self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - - # set up a node - self.node = node.Node() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, - topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) - - def test_flush_terminated(self): - # Launch an instance - - # Wait until it's running - - # Terminate it - - # Wait until it's terminated - - # Flush terminated nodes - - # ASSERT that it's gone - pass diff --git a/nova/tests/model_unittest.py b/nova/tests/model_unittest.py index 1bd7e527f..f84b6d11c 100644 --- a/nova/tests/model_unittest.py +++ b/nova/tests/model_unittest.py @@ -25,7 +25,6 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import node FLAGS = flags.FLAGS diff --git a/nova/tests/node_unittest.py b/nova/tests/node_unittest.py deleted file mode 100644 index 93942d79e..000000000 --- a/nova/tests/node_unittest.py +++ /dev/null @@ -1,128 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import time -from twisted.internet import defer -from xml.etree import ElementTree - -from nova import exception -from nova import flags -from nova import test -from nova import utils -from nova.compute import model -from nova.compute import node - - -FLAGS = flags.FLAGS - - -class InstanceXmlTestCase(test.TrialTestCase): - # @defer.inlineCallbacks - def test_serialization(self): - # TODO: Reimplement this, it doesn't make sense in redis-land - return - - # instance_id = 'foo' - # first_node = node.Node() - # inst = yield first_node.run_instance(instance_id) - # - # # force the state so that we can verify that it changes - # inst._s['state'] = node.Instance.NOSTATE - # xml = inst.toXml() - # self.assert_(ElementTree.parse(StringIO.StringIO(xml))) - # - # second_node = node.Node() - # new_inst = node.Instance.fromXml(second_node._conn, pool=second_node._pool, xml=xml) - # self.assertEqual(new_inst.state, node.Instance.RUNNING) - # rv = yield first_node.terminate_instance(instance_id) - - -class NodeConnectionTestCase(test.TrialTestCase): - def setUp(self): - logging.getLogger().setLevel(logging.DEBUG) - super(NodeConnectionTestCase, self).setUp() - self.flags(fake_libvirt=True, - fake_storage=True, - fake_users=True) - self.node = node.Node() - - def create_instance(self): - instdir = model.InstanceDirectory() - inst = instdir.new() - # TODO(ja): add ami, ari, aki, user_data - inst['reservation_id'] = 'r-fakeres' - inst['launch_time'] = '10' - inst['user_id'] = 'fake' - inst['project_id'] = 'fake' - inst['instance_type'] = 'm1.tiny' - inst['node_name'] = FLAGS.node_name - inst['mac_address'] = utils.generate_mac() - inst['ami_launch_index'] = 0 - inst.save() - return inst['instance_id'] - - @defer.inlineCallbacks - def test_run_describe_terminate(self): - instance_id = self.create_instance() - - rv = yield self.node.run_instance(instance_id) - - rv = yield self.node.describe_instances() - logging.info("Running instances: %s", rv) - self.assertEqual(rv[instance_id].name, instance_id) - - rv = yield self.node.terminate_instance(instance_id) - - rv = yield self.node.describe_instances() - logging.info("After terminating instances: %s", rv) - self.assertEqual(rv, {}) - - @defer.inlineCallbacks - def test_reboot(self): - instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) - - rv = yield self.node.describe_instances() - self.assertEqual(rv[instance_id].name, instance_id) - - yield self.node.reboot_instance(instance_id) - - rv = yield self.node.describe_instances() - self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) - - @defer.inlineCallbacks - def test_console_output(self): - instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) - - console = yield self.node.get_console_output(instance_id) - self.assert_(console) - rv = yield self.node.terminate_instance(instance_id) - - @defer.inlineCallbacks - def test_run_instance_existing(self): - instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) - - rv = yield self.node.describe_instances() - self.assertEqual(rv[instance_id].name, instance_id) - - self.assertRaises(exception.Error, self.node.run_instance, instance_id) - rv = yield self.node.terminate_instance(instance_id) diff --git a/nova/tests/storage_unittest.py b/nova/tests/storage_unittest.py deleted file mode 100644 index 60576d74f..000000000 --- a/nova/tests/storage_unittest.py +++ /dev/null @@ -1,115 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from nova import exception -from nova import flags -from nova import test -from nova.compute import node -from nova.volume import storage - - -FLAGS = flags.FLAGS - - -class StorageTestCase(test.TrialTestCase): - def setUp(self): - logging.getLogger().setLevel(logging.DEBUG) - super(StorageTestCase, self).setUp() - self.mynode = node.Node() - self.mystorage = None - self.flags(fake_libvirt=True, - fake_storage=True) - self.mystorage = storage.BlockStore() - - def test_run_create_volume(self): - vol_size = '0' - user_id = 'fake' - project_id = 'fake' - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) - # TODO(termie): get_volume returns differently than create_volume - self.assertEqual(volume_id, - storage.get_volume(volume_id)['volume_id']) - - rv = self.mystorage.delete_volume(volume_id) - self.assertRaises(exception.Error, - storage.get_volume, - volume_id) - - def test_too_big_volume(self): - vol_size = '1001' - user_id = 'fake' - project_id = 'fake' - self.assertRaises(TypeError, - self.mystorage.create_volume, - vol_size, user_id, project_id) - - def test_too_many_volumes(self): - vol_size = '1' - user_id = 'fake' - project_id = 'fake' - num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1 - total_slots = FLAGS.slots_per_shelf * num_shelves - vols = [] - for i in xrange(total_slots): - vid = self.mystorage.create_volume(vol_size, user_id, project_id) - vols.append(vid) - self.assertRaises(storage.NoMoreVolumes, - self.mystorage.create_volume, - vol_size, user_id, project_id) - for id in vols: - self.mystorage.delete_volume(id) - - def test_run_attach_detach_volume(self): - # Create one volume and one node to test with - instance_id = "storage-test" - vol_size = "5" - user_id = "fake" - project_id = 'fake' - mountpoint = "/dev/sdf" - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) - - volume_obj = storage.get_volume(volume_id) - volume_obj.start_attach(instance_id, mountpoint) - rv = yield self.mynode.attach_volume(volume_id, - instance_id, - mountpoint) - self.assertEqual(volume_obj['status'], "in-use") - self.assertEqual(volume_obj['attachStatus'], "attached") - self.assertEqual(volume_obj['instance_id'], instance_id) - self.assertEqual(volume_obj['mountpoint'], mountpoint) - - self.assertRaises(exception.Error, - self.mystorage.delete_volume, - volume_id) - - rv = yield self.mystorage.detach_volume(volume_id) - volume_obj = storage.get_volume(volume_id) - self.assertEqual(volume_obj['status'], "available") - - rv = self.mystorage.delete_volume(volume_id) - self.assertRaises(exception.Error, - storage.get_volume, - volume_id) - - def test_multi_node(self): - # TODO(termie): Figure out how to test with two nodes, - # each of them having a different FLAG for storage_node - # This will allow us to test cross-node interactions - pass diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py new file mode 100644 index 000000000..c176453d8 --- /dev/null +++ b/nova/tests/volume_unittest.py @@ -0,0 +1,115 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from nova import exception +from nova import flags +from nova import test +from nova.compute import computenode +from nova.volume import volumenode + + +FLAGS = flags.FLAGS + + +class VolumeTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(VolumeTestCase, self).setUp() + self.mynode = computenode.ComputeNode() + self.mystorage = None + self.flags(fake_libvirt=True, + fake_storage=True) + self.mystorage = volumenode.VolumeNode() + + def test_run_create_volume(self): + vol_size = '0' + user_id = 'fake' + project_id = 'fake' + volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + # TODO(termie): get_volume returns differently than create_volume + self.assertEqual(volume_id, + volumenode.get_volume(volume_id)['volume_id']) + + rv = self.mystorage.delete_volume(volume_id) + self.assertRaises(exception.Error, + volumenode.get_volume, + volume_id) + + def test_too_big_volume(self): + vol_size = '1001' + user_id = 'fake' + project_id = 'fake' + self.assertRaises(TypeError, + self.mystorage.create_volume, + vol_size, user_id, project_id) + + def test_too_many_volumes(self): + vol_size = '1' + user_id = 'fake' + project_id = 'fake' + num_shelves = FLAGS.last_shelf_id - FLAGS.first_shelf_id + 1 + total_slots = FLAGS.slots_per_shelf * num_shelves + vols = [] + for i in xrange(total_slots): + vid = self.mystorage.create_volume(vol_size, user_id, project_id) + vols.append(vid) + self.assertRaises(volumenode.NoMoreVolumes, + self.mystorage.create_volume, + vol_size, user_id, project_id) + for id in vols: + self.mystorage.delete_volume(id) + + def test_run_attach_detach_volume(self): + # Create one volume and one node to test with + instance_id = "storage-test" + vol_size = "5" + user_id = "fake" + project_id = 'fake' + mountpoint = "/dev/sdf" + volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + + volume_obj = volumenode.get_volume(volume_id) + volume_obj.start_attach(instance_id, mountpoint) + rv = yield self.mynode.attach_volume(volume_id, + instance_id, + mountpoint) + self.assertEqual(volume_obj['status'], "in-use") + self.assertEqual(volume_obj['attachStatus'], "attached") + self.assertEqual(volume_obj['instance_id'], instance_id) + self.assertEqual(volume_obj['mountpoint'], mountpoint) + + self.assertRaises(exception.Error, + self.mystorage.delete_volume, + volume_id) + + rv = yield self.mystorage.detach_volume(volume_id) + volume_obj = volumenode.get_volume(volume_id) + self.assertEqual(volume_obj['status'], "available") + + rv = self.mystorage.delete_volume(volume_id) + self.assertRaises(exception.Error, + volumenode.get_volume, + volume_id) + + def test_multi_node(self): + # TODO(termie): Figure out how to test with two nodes, + # each of them having a different FLAG for storage_node + # This will allow us to test cross-node interactions + pass diff --git a/nova/twistd.py b/nova/twistd.py index 32a46ce03..fc7dad26a 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -32,7 +32,6 @@ from twisted.python import log from twisted.python import reflect from twisted.python import runtime from twisted.python import usage -import UserDict from nova import flags @@ -161,6 +160,13 @@ def WrapTwistedOptions(wrapped): except (AttributeError, KeyError): self._data[key] = value + def get(self, key, default): + key = key.replace('-', '_') + try: + return getattr(FLAGS, key) + except (AttributeError, KeyError): + self._data.get(key, default) + return TwistedOptionsToFlags @@ -210,8 +216,12 @@ def serve(filename): elif FLAGS.pidfile.endswith('twistd.pid'): FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name) + print FLAGS.logfile if not FLAGS.logfile: FLAGS.logfile = '%s.log' % name + elif FLAGS.logfile.endswith('twistd.log'): + FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name) + print FLAGS.logfile action = 'start' if len(argv) > 1: diff --git a/nova/volume/storage.py b/nova/volume/storage.py deleted file mode 100644 index 121bc01e6..000000000 --- a/nova/volume/storage.py +++ /dev/null @@ -1,321 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Nova Storage manages creating, attaching, detaching, and -destroying persistent storage volumes, ala EBS. -Currently uses Ata-over-Ethernet. -""" - -import glob -import logging -import os -import shutil -import socket -import tempfile - -from twisted.application import service -from twisted.internet import defer - -from nova import datastore -from nova import exception -from nova import flags -from nova import process -from nova import utils -from nova import validate -from nova.compute import model - - -FLAGS = flags.FLAGS -flags.DEFINE_string('storage_dev', '/dev/sdb', - 'Physical device to use for volumes') -flags.DEFINE_string('volume_group', 'nova-volumes', - 'Name for the VG that will contain exported volumes') -flags.DEFINE_string('aoe_eth_dev', 'eth0', - 'Which device to export the volumes on') -flags.DEFINE_string('storage_name', - socket.gethostname(), - 'name of this node') -flags.DEFINE_integer('first_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10, - 'AoE starting shelf_id for this node') -flags.DEFINE_integer('last_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10 + 9, - 'AoE starting shelf_id for this node') -flags.DEFINE_string('aoe_export_dir', - '/var/lib/vblade-persist/vblades', - 'AoE directory where exports are created') -flags.DEFINE_integer('slots_per_shelf', - 16, - 'Number of AoE slots per shelf') -flags.DEFINE_string('storage_availability_zone', - 'nova', - 'availability zone of this node') -flags.DEFINE_boolean('fake_storage', False, - 'Should we make real storage volumes to attach?') - - -class NoMoreVolumes(exception.Error): - pass - -def get_volume(volume_id): - """ Returns a redis-backed volume object """ - volume_class = Volume - if FLAGS.fake_storage: - volume_class = FakeVolume - if datastore.Redis.instance().sismember('volumes', volume_id): - return volume_class(volume_id=volume_id) - raise exception.Error("Volume does not exist") - -class BlockStore(object, service.Service): - """ - There is one BlockStore running on each volume node. - However, each BlockStore can report on the state of - *all* volumes in the cluster. - """ - def __init__(self): - super(BlockStore, self).__init__() - self.volume_class = Volume - if FLAGS.fake_storage: - FLAGS.aoe_export_dir = tempfile.mkdtemp() - self.volume_class = FakeVolume - self._init_volume_group() - - def __del__(self): - # TODO(josh): Get rid of this destructor, volumes destroy themselves - if FLAGS.fake_storage: - try: - shutil.rmtree(FLAGS.aoe_export_dir) - except Exception, err: - pass - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - @validate.rangetest(size=(0, 1000)) - def create_volume(self, size, user_id, project_id): - """ - Creates an exported volume (fake or real), - restarts exports to make it available. - Volume at this point has size, owner, and zone. - """ - logging.debug("Creating volume of size: %s" % (size)) - vol = self.volume_class.create(size, user_id, project_id) - datastore.Redis.instance().sadd('volumes', vol['volume_id']) - datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - self._restart_exports() - return vol['volume_id'] - - def by_node(self, node_id): - """ returns a list of volumes for a node """ - for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): - yield self.volume_class(volume_id=volume_id) - - @property - def all(self): - """ returns a list of all volumes """ - for volume_id in datastore.Redis.instance().smembers('volumes'): - yield self.volume_class(volume_id=volume_id) - - def delete_volume(self, volume_id): - logging.debug("Deleting volume with id of: %s" % (volume_id)) - vol = get_volume(volume_id) - if vol['status'] == "attached": - raise exception.Error("Volume is still attached") - if vol['node_name'] != FLAGS.storage_name: - raise exception.Error("Volume is not local to this node") - vol.destroy() - datastore.Redis.instance().srem('volumes', vol['volume_id']) - datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - return True - - @defer.inlineCallbacks - def _restart_exports(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo vblade-persist auto all") - yield process.simple_execute( - "sudo vblade-persist start all") - - @defer.inlineCallbacks - def _init_volume_group(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo pvcreate %s" % (FLAGS.storage_dev)) - yield process.simple_execute( - "sudo vgcreate %s %s" % (FLAGS.volume_group, - FLAGS.storage_dev)) - -class Volume(datastore.BasicModel): - - def __init__(self, volume_id=None): - self.volume_id = volume_id - super(Volume, self).__init__() - - @property - def identifier(self): - return self.volume_id - - def default_state(self): - return {"volume_id": self.volume_id} - - @classmethod - def create(cls, size, user_id, project_id): - volume_id = utils.generate_uid('vol') - vol = cls(volume_id) - vol['node_name'] = FLAGS.storage_name - vol['size'] = size - vol['user_id'] = user_id - vol['project_id'] = project_id - vol['availability_zone'] = FLAGS.storage_availability_zone - vol["instance_id"] = 'none' - vol["mountpoint"] = 'none' - vol['attach_time'] = 'none' - vol['status'] = "creating" # creating | available | in-use - vol['attach_status'] = "detached" # attaching | attached | detaching | detached - vol['delete_on_termination'] = 'False' - vol.save() - vol.create_lv() - vol._setup_export() - # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes - # TODO(joshua - vol['status'] = "available" - vol.save() - return vol - - def start_attach(self, instance_id, mountpoint): - """ """ - self['instance_id'] = instance_id - self['mountpoint'] = mountpoint - self['status'] = "in-use" - self['attach_status'] = "attaching" - self['attach_time'] = utils.isotime() - self['delete_on_termination'] = 'False' - self.save() - - def finish_attach(self): - """ """ - self['attach_status'] = "attached" - self.save() - - def start_detach(self): - """ """ - self['attach_status'] = "detaching" - self.save() - - def finish_detach(self): - self['instance_id'] = None - self['mountpoint'] = None - self['status'] = "available" - self['attach_status'] = "detached" - self.save() - - def destroy(self): - try: - self._remove_export() - except: - pass - self._delete_lv() - super(Volume, self).destroy() - - @defer.inlineCallbacks - def create_lv(self): - if str(self['size']) == '0': - sizestr = '100M' - else: - sizestr = '%sG' % self['size'] - yield process.simple_execute( - "sudo lvcreate -L %s -n %s %s" % (sizestr, - self['volume_id'], - FLAGS.volume_group)) - - @defer.inlineCallbacks - def _delete_lv(self): - yield process.simple_execute( - "sudo lvremove -f %s/%s" % (FLAGS.volume_group, - self['volume_id'])) - - def _setup_export(self): - (shelf_id, blade_id) = get_next_aoe_numbers() - self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) - self['shelf_id'] = shelf_id - self['blade_id'] = blade_id - self.save() - self._exec_export() - - @defer.inlineCallbacks - def _exec_export(self): - yield process.simple_execute( - "sudo vblade-persist setup %s %s %s /dev/%s/%s" % - (self['shelf_id'], - self['blade_id'], - FLAGS.aoe_eth_dev, - FLAGS.volume_group, - self['volume_id'])) - - @defer.inlineCallbacks - def _remove_export(self): - yield process.simple_execute( - "sudo vblade-persist stop %s %s" % (self['shelf_id'], - self['blade_id'])) - yield process.simple_execute( - "sudo vblade-persist destroy %s %s" % (self['shelf_id'], - self['blade_id'])) - - -class FakeVolume(Volume): - def create_lv(self): - pass - - def _exec_export(self): - fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) - f = file(fname, "w") - f.close() - - def _remove_export(self): - pass - - def _delete_lv(self): - pass - -def get_next_aoe_numbers(): - for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): - aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) - if not aoes: - blade_id = 0 - else: - blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 - if blade_id < FLAGS.slots_per_shelf: - logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) - return (shelf_id, blade_id) - raise NoMoreVolumes() diff --git a/nova/volume/volumenode.py b/nova/volume/volumenode.py new file mode 100644 index 000000000..6b4ad0d87 --- /dev/null +++ b/nova/volume/volumenode.py @@ -0,0 +1,305 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Nova Storage manages creating, attaching, detaching, and +destroying persistent storage volumes, ala EBS. +Currently uses Ata-over-Ethernet. +""" + +import glob +import logging +import os +import shutil +import socket +import tempfile + +from twisted.application import service +from twisted.internet import defer + +from nova import datastore +from nova import exception +from nova import flags +from nova import node +from nova import process +from nova import utils +from nova import validate + + +FLAGS = flags.FLAGS +flags.DEFINE_string('storage_dev', '/dev/sdb', + 'Physical device to use for volumes') +flags.DEFINE_string('volume_group', 'nova-volumes', + 'Name for the VG that will contain exported volumes') +flags.DEFINE_string('aoe_eth_dev', 'eth0', + 'Which device to export the volumes on') +flags.DEFINE_string('storage_name', + socket.gethostname(), + 'name of this node') +flags.DEFINE_integer('first_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10, + 'AoE starting shelf_id for this node') +flags.DEFINE_integer('last_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10 + 9, + 'AoE starting shelf_id for this node') +flags.DEFINE_string('aoe_export_dir', + '/var/lib/vblade-persist/vblades', + 'AoE directory where exports are created') +flags.DEFINE_integer('slots_per_shelf', + 16, + 'Number of AoE slots per shelf') +flags.DEFINE_string('storage_availability_zone', + 'nova', + 'availability zone of this node') +flags.DEFINE_boolean('fake_storage', False, + 'Should we make real storage volumes to attach?') + + +class NoMoreVolumes(exception.Error): + pass + +def get_volume(volume_id): + """ Returns a redis-backed volume object """ + volume_class = Volume + if FLAGS.fake_storage: + volume_class = FakeVolume + if datastore.Redis.instance().sismember('volumes', volume_id): + return volume_class(volume_id=volume_id) + raise exception.Error("Volume does not exist") + +class VolumeNode(node.Node): + """ + There is one VolumeNode running on each host. + However, each VolumeNode can report on the state of + *all* volumes in the cluster. + """ + def __init__(self): + super(VolumeNode, self).__init__() + self.volume_class = Volume + if FLAGS.fake_storage: + FLAGS.aoe_export_dir = tempfile.mkdtemp() + self.volume_class = FakeVolume + self._init_volume_group() + + def __del__(self): + # TODO(josh): Get rid of this destructor, volumes destroy themselves + if FLAGS.fake_storage: + try: + shutil.rmtree(FLAGS.aoe_export_dir) + except Exception, err: + pass + + @validate.rangetest(size=(0, 1000)) + def create_volume(self, size, user_id, project_id): + """ + Creates an exported volume (fake or real), + restarts exports to make it available. + Volume at this point has size, owner, and zone. + """ + logging.debug("Creating volume of size: %s" % (size)) + vol = self.volume_class.create(size, user_id, project_id) + datastore.Redis.instance().sadd('volumes', vol['volume_id']) + datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + self._restart_exports() + return vol['volume_id'] + + def by_node(self, node_id): + """ returns a list of volumes for a node """ + for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): + yield self.volume_class(volume_id=volume_id) + + @property + def all(self): + """ returns a list of all volumes """ + for volume_id in datastore.Redis.instance().smembers('volumes'): + yield self.volume_class(volume_id=volume_id) + + def delete_volume(self, volume_id): + logging.debug("Deleting volume with id of: %s" % (volume_id)) + vol = get_volume(volume_id) + if vol['status'] == "attached": + raise exception.Error("Volume is still attached") + if vol['node_name'] != FLAGS.storage_name: + raise exception.Error("Volume is not local to this node") + vol.destroy() + datastore.Redis.instance().srem('volumes', vol['volume_id']) + datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + return True + + @defer.inlineCallbacks + def _restart_exports(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo vblade-persist auto all") + yield process.simple_execute( + "sudo vblade-persist start all") + + @defer.inlineCallbacks + def _init_volume_group(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo pvcreate %s" % (FLAGS.storage_dev)) + yield process.simple_execute( + "sudo vgcreate %s %s" % (FLAGS.volume_group, + FLAGS.storage_dev)) + +class Volume(datastore.BasicModel): + + def __init__(self, volume_id=None): + self.volume_id = volume_id + super(Volume, self).__init__() + + @property + def identifier(self): + return self.volume_id + + def default_state(self): + return {"volume_id": self.volume_id} + + @classmethod + def create(cls, size, user_id, project_id): + volume_id = utils.generate_uid('vol') + vol = cls(volume_id) + vol['node_name'] = FLAGS.storage_name + vol['size'] = size + vol['user_id'] = user_id + vol['project_id'] = project_id + vol['availability_zone'] = FLAGS.storage_availability_zone + vol["instance_id"] = 'none' + vol["mountpoint"] = 'none' + vol['attach_time'] = 'none' + vol['status'] = "creating" # creating | available | in-use + vol['attach_status'] = "detached" # attaching | attached | detaching | detached + vol['delete_on_termination'] = 'False' + vol.save() + vol.create_lv() + vol._setup_export() + # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes + # TODO(joshua + vol['status'] = "available" + vol.save() + return vol + + def start_attach(self, instance_id, mountpoint): + """ """ + self['instance_id'] = instance_id + self['mountpoint'] = mountpoint + self['status'] = "in-use" + self['attach_status'] = "attaching" + self['attach_time'] = utils.isotime() + self['delete_on_termination'] = 'False' + self.save() + + def finish_attach(self): + """ """ + self['attach_status'] = "attached" + self.save() + + def start_detach(self): + """ """ + self['attach_status'] = "detaching" + self.save() + + def finish_detach(self): + self['instance_id'] = None + self['mountpoint'] = None + self['status'] = "available" + self['attach_status'] = "detached" + self.save() + + def destroy(self): + try: + self._remove_export() + except: + pass + self._delete_lv() + super(Volume, self).destroy() + + @defer.inlineCallbacks + def create_lv(self): + if str(self['size']) == '0': + sizestr = '100M' + else: + sizestr = '%sG' % self['size'] + yield process.simple_execute( + "sudo lvcreate -L %s -n %s %s" % (sizestr, + self['volume_id'], + FLAGS.volume_group)) + + @defer.inlineCallbacks + def _delete_lv(self): + yield process.simple_execute( + "sudo lvremove -f %s/%s" % (FLAGS.volume_group, + self['volume_id'])) + + def _setup_export(self): + (shelf_id, blade_id) = get_next_aoe_numbers() + self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) + self['shelf_id'] = shelf_id + self['blade_id'] = blade_id + self.save() + self._exec_export() + + @defer.inlineCallbacks + def _exec_export(self): + yield process.simple_execute( + "sudo vblade-persist setup %s %s %s /dev/%s/%s" % + (self['shelf_id'], + self['blade_id'], + FLAGS.aoe_eth_dev, + FLAGS.volume_group, + self['volume_id'])) + + @defer.inlineCallbacks + def _remove_export(self): + yield process.simple_execute( + "sudo vblade-persist stop %s %s" % (self['shelf_id'], + self['blade_id'])) + yield process.simple_execute( + "sudo vblade-persist destroy %s %s" % (self['shelf_id'], + self['blade_id'])) + + +class FakeVolume(Volume): + def create_lv(self): + pass + + def _exec_export(self): + fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) + f = file(fname, "w") + f.close() + + def _remove_export(self): + pass + + def _delete_lv(self): + pass + +def get_next_aoe_numbers(): + for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): + aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) + if not aoes: + blade_id = 0 + else: + blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 + if blade_id < FLAGS.slots_per_shelf: + logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) + return (shelf_id, blade_id) + raise NoMoreVolumes() diff --git a/run_tests.py b/run_tests.py index db8a582ea..ae2874f58 100644 --- a/run_tests.py +++ b/run_tests.py @@ -52,14 +52,14 @@ from nova import twistd from nova.tests.access_unittest import * from nova.tests.api_unittest import * from nova.tests.cloud_unittest import * +from nova.tests.compute_unittest import * from nova.tests.model_unittest import * from nova.tests.network_unittest import * -from nova.tests.node_unittest import * from nova.tests.objectstore_unittest import * from nova.tests.process_unittest import * -from nova.tests.storage_unittest import * from nova.tests.users_unittest import * from nova.tests.validator_unittest import * +from nova.tests.volume_unittest import * FLAGS = flags.FLAGS -- cgit From 80d03bbeb1f1acb82c495747dcf5cc9390df025a Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Wed, 21 Jul 2010 14:55:16 -0500 Subject: make nova-network executable --- bin/nova-network | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 bin/nova-network diff --git a/bin/nova-network b/bin/nova-network old mode 100644 new mode 100755 -- cgit From b03f05a0eb321350297e2f86a05ec4593bfc4049 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Thu, 22 Jul 2010 07:51:03 -0500 Subject: typo fixes and extra print statements removed --- bin/nova-compute | 4 ++-- bin/nova-network | 4 ++-- bin/nova-volume | 4 ++-- nova/twistd.py | 2 -- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/bin/nova-compute b/bin/nova-compute index 67c93fcb8..1d5fa709d 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.compute import node +from nova.compute import computenode if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = node.ComputeNode.create() + application = computenode.ComputeNode.create() diff --git a/bin/nova-network b/bin/nova-network index c69690081..db9d4b970 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.network import node +from nova.network import networknode if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = node.NetworkNode.create() + application = networknode.NetworkNode.create() diff --git a/bin/nova-volume b/bin/nova-volume index cdf2782bc..2e9b530a7 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.volume import node +from nova.volume import volumenode if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = node.VolumeNode.create() + application = volumenode.VolumeNode.create() diff --git a/nova/twistd.py b/nova/twistd.py index fc7dad26a..909b23590 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -216,12 +216,10 @@ def serve(filename): elif FLAGS.pidfile.endswith('twistd.pid'): FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name) - print FLAGS.logfile if not FLAGS.logfile: FLAGS.logfile = '%s.log' % name elif FLAGS.logfile.endswith('twistd.log'): FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name) - print FLAGS.logfile action = 'start' if len(argv) > 1: -- cgit From 700f668b58911afe2b98ae8cab79910ae817657b Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Thu, 22 Jul 2010 09:03:28 -0500 Subject: syslog changes --- nova/twistd.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/nova/twistd.py b/nova/twistd.py index 909b23590..b389a73b5 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -215,11 +215,14 @@ def serve(filename): FLAGS.pidfile = '%s.pid' % name elif FLAGS.pidfile.endswith('twistd.pid'): FLAGS.pidfile = FLAGS.pidfile.replace('twistd.pid', '%s.pid' % name) - if not FLAGS.logfile: FLAGS.logfile = '%s.log' % name elif FLAGS.logfile.endswith('twistd.log'): FLAGS.logfile = FLAGS.logfile.replace('twistd.log', '%s.log' % name) + if not FLAGS.prefix: + FLAGS.prefix = name + elif FLAGS.prefix.endswith('twisted'): + FLAGS.prefix = FLAGS.prefix.replace('twisted', name) action = 'start' if len(argv) > 1: @@ -237,7 +240,7 @@ def serve(filename): sys.exit(1) formatter = logging.Formatter( - name + '(%(name)s): %(levelname)s %(message)s') + '(%(name)s): %(levelname)s %(message)r') handler = logging.StreamHandler(log.StdioOnnaStick()) handler.setFormatter(formatter) logging.getLogger().addHandler(handler) @@ -247,11 +250,6 @@ def serve(filename): else: logging.getLogger().setLevel(logging.WARNING) - if FLAGS.syslog: - syslog = logging.handlers.SysLogHandler(address='/dev/log') - syslog.setFormatter(formatter) - logging.getLogger().addHandler(syslog) - logging.debug("Full set of FLAGS:") for flag in FLAGS: logging.debug("%s : %s" % (flag, FLAGS.get(flag, None))) -- cgit From d0f3ad141b848a927c452ba6c71180b21047bef8 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Thu, 22 Jul 2010 17:45:18 -0500 Subject: Fix syslogging of exceptions by stripping newlines from the exception info --- nova/twistd.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/nova/twistd.py b/nova/twistd.py index b389a73b5..ecb6e2892 100644 --- a/nova/twistd.py +++ b/nova/twistd.py @@ -22,7 +22,6 @@ manage pid files and support syslogging. """ import logging -import logging.handlers import os import signal import sys @@ -239,8 +238,16 @@ def serve(filename): print 'usage: %s [options] [start|stop|restart]' % argv[0] sys.exit(1) - formatter = logging.Formatter( - '(%(name)s): %(levelname)s %(message)r') + class NoNewlineFormatter(logging.Formatter): + """Strips newlines from default formatter""" + def format(self, record): + """Grabs default formatter's output and strips newlines""" + data = logging.Formatter.format(self, record) + return data.replace("\n", "--") + + # NOTE(vish): syslog-ng doesn't handle newlines from trackbacks very well + formatter = NoNewlineFormatter( + '(%(name)s): %(levelname)s %(message)s') handler = logging.StreamHandler(log.StdioOnnaStick()) handler.setFormatter(formatter) logging.getLogger().addHandler(handler) -- cgit From a766736b79f794f745438dd597f2aa529ad62ddc Mon Sep 17 00:00:00 2001 From: Andy Smith Date: Fri, 23 Jul 2010 04:44:23 +0200 Subject: Adds a Makefile to fill dependencies for testing. Depends upon pip being installed, but pip is pretty much the standard nowadays and is just an easy_install away if it isn't there. The only dependency installed on to the system is virtualenv which is used to make the other dependencies local to the current environment. Does not remove the need to install redis by hand, though I am in favor of making that possible (using aptitude on linux and brew on os x) I look forward to cutting away at some of these dependencies in further commits. --- Makefile | 27 +++++++++++++++++++++++++++ tools/pip-requires | 14 ++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 Makefile create mode 100644 tools/pip-requires diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..da69f2b72 --- /dev/null +++ b/Makefile @@ -0,0 +1,27 @@ +venv=.venv +with_venv=source $(venv)/bin/activate +installed=$(venv)/lib/python2.6/site-packages +twisted=$(installed)/twisted/__init__.py + + +test: python-dependencies $(twisted) + $(with_venv) && python run_tests.py + +clean: + rm -rf _trial_temp + rm -rf keys + rm -rf instances + rm -rf networks + +clean-all: clean + rm -rf $(venv) + +python-dependencies: $(venv) + pip install -q -E $(venv) -r tools/pip-requires + +$(venv): + pip install -q virtualenv + virtualenv -q --no-site-packages $(venv) + +$(twisted): + pip install -q -E $(venv) http://nova.openstack.org/Twisted-10.0.0Nova.tar.gz diff --git a/tools/pip-requires b/tools/pip-requires new file mode 100644 index 000000000..edb5fc01c --- /dev/null +++ b/tools/pip-requires @@ -0,0 +1,14 @@ +IPy==0.70 +M2Crypto==0.20.2 +amqplib==0.6.1 +anyjson==0.2.4 +boto==2.0b1 +carrot==0.10.5 +lockfile==0.8 +mox==0.5.0 +python-daemon==1.5.5 +python-gflags==1.3 +redis==2.0.0 +tornado==1.0 +wsgiref==0.1.2 +zope.interface==3.6.1 -- cgit From f3350750a7c68b191c3a85ac9caa5eff59b182a6 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Fri, 23 Jul 2010 08:03:26 -0500 Subject: Check signature for S3 requests. --- nova/auth/signer.py | 8 ++++++++ nova/auth/users.py | 13 +++++++++++-- nova/objectstore/handler.py | 14 +++++++------- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/nova/auth/signer.py b/nova/auth/signer.py index 83831bfac..7d7471575 100644 --- a/nova/auth/signer.py +++ b/nova/auth/signer.py @@ -48,6 +48,7 @@ import hashlib import hmac import logging import urllib +import boto.utils from nova.exception import Error @@ -59,6 +60,13 @@ class Signer(object): if hashlib.sha256: self.hmac_256 = hmac.new(secret_key, digestmod=hashlib.sha256) + def s3_authorization(self, headers, verb, path): + c_string = boto.utils.canonical_string(verb, path, headers) + hmac = self.hmac.copy() + hmac.update(c_string) + b64_hmac = base64.encodestring(hmac.digest()).strip() + return b64_hmac + def generate(self, params, verb, server_string, path): if params['SignatureVersion'] == '0': return self._calc_signature_0(params) diff --git a/nova/auth/users.py b/nova/auth/users.py index fc08dc34d..0e9ca4eeb 100644 --- a/nova/auth/users.py +++ b/nova/auth/users.py @@ -395,11 +395,13 @@ class UserManager(object): def authenticate(self, access, signature, params, verb='GET', server_string='127.0.0.1:8773', path='/', - verify_signature=True): + check_type='ec2', headers=None): # TODO: Check for valid timestamp (access_key, sep, project_name) = access.partition(':') + logging.info('Looking up user: %r', access_key) user = self.get_user_from_access_key(access_key) + logging.info('user: %r', user) if user == None: raise exception.NotFound('No user found for access key %s' % access_key) @@ -413,7 +415,14 @@ class UserManager(object): if not user.is_admin() and not project.has_member(user): raise exception.NotFound('User %s is not a member of project %s' % (user.id, project.id)) - if verify_signature: + if check_type == 's3': + expected_signature = signer.Signer(user.secret.encode()).s3_authorization(headers, verb, path) + logging.debug('user.secret: %s', user.secret) + logging.debug('expected_signature: %s', expected_signature) + logging.debug('signature: %s', signature) + if signature != expected_signature: + raise exception.NotAuthorized('Signature does not match') + elif check_type == 'ec2': # NOTE(vish): hmac can't handle unicode, so encode ensures that # secret isn't unicode expected_signature = signer.Signer(user.secret.encode()).generate( diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index b2ed3d482..655cab752 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -106,8 +106,8 @@ def get_context(request): access, sep, secret = request.getHeader('Authorization').split(' ')[1].rpartition(':') um = users.UserManager.instance() print 'um %s' % um - (user, project) = um.authenticate(access, secret, {}, request.method, request.host, request.uri, False) - # FIXME: check signature here! + + (user, project) = um.authenticate(access, secret, {}, request.method, request.getRequestHostname(), request.uri, headers=request.getAllHeaders(), check_type='s3') return api.APIRequestContext(None, user, project) except exception.Error, ex: logging.debug("Authentication Failure: %s" % ex) @@ -117,7 +117,6 @@ class S3(Resource): """Implementation of an S3-like storage server based on local files.""" def getChild(self, name, request): request.context = get_context(request) - if name == '': return self elif name == '_images': @@ -126,6 +125,7 @@ class S3(Resource): return BucketResource(name) def render_GET(self, request): + logging.debug('List of buckets requested') buckets = [b for b in bucket.Bucket.all() if b.is_authorized(request.context)] render_xml(request, {"ListAllMyBucketsResult": { @@ -169,7 +169,8 @@ class BucketResource(Resource): logging.exception(e) logging.debug("calling bucket.Bucket.create(%r, %r)" % (self.name, request.context)) bucket.Bucket.create(self.name, request.context) - return '' + request.finish() + return server.NOT_DONE_YET def render_DELETE(self, request): logging.debug("Deleting bucket %s" % (self.name)) @@ -225,8 +226,6 @@ class ObjectResource(Resource): return '' class ImageResource(Resource): - isLeaf = True - def getChild(self, name, request): if name == '': return self @@ -239,9 +238,10 @@ class ImageResource(Resource): """ returns a json listing of all images that a user has permissions to see """ - images = [i for i in image.Image.all() if i.is_authorized(self.context)] + images = [i for i in image.Image.all() if i.is_authorized(request.context)] request.write(json.dumps([i.metadata for i in images])) + request.finish() return server.NOT_DONE_YET def render_PUT(self, request): -- cgit From abac2033b4aeb399786735a94ce2ddf5c64e6e9e Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Fri, 23 Jul 2010 23:55:39 +0200 Subject: Add (completely untested) code to include an Authorization header for the S3 request to fetch an image. --- nova/compute/node.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/nova/compute/node.py b/nova/compute/node.py index 4683f1c8d..7c09d9583 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -25,6 +25,7 @@ Compute Node: """ import base64 +import boto.utils import json import logging import os @@ -45,11 +46,13 @@ from nova import fakevirt from nova import flags from nova import process from nova import utils +from nova.auth import signer from nova.compute import disk from nova.compute import model from nova.compute import network from nova.objectstore import image # for image_path flag from nova.volume import storage +from nova.users import UserManager FLAGS = flags.FLAGS @@ -446,8 +449,12 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + user_id = self.datamodel['user_id'] + user = UserManager.instance().get_user(user_id) + auth = signer.Signer(user.secret.encode()).s3_authorization({}, 'GET', url) + auth_header = 'Authorization: %s:%s' % (user_id, auth) d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) + 'curl --silent %s -o "%s"' % (url, auth_header, path)) return d def _fetch_local_image(self, image, path): -- cgit From 35fda702abf91792d3c4753a1bbccdb119eaf6eb Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 23 Jul 2010 15:27:18 -0700 Subject: renamed xxxnode to xxservice --- bin/nova-compute | 4 +- bin/nova-network | 4 +- bin/nova-volume | 4 +- nova/compute/computenode.py | 582 ----------------------------------------- nova/compute/computeservice.py | 581 ++++++++++++++++++++++++++++++++++++++++ nova/endpoint/cloud.py | 15 +- nova/network/networknode.py | 35 --- nova/network/networkservice.py | 35 +++ nova/node.py | 103 -------- nova/service.py | 103 ++++++++ nova/test.py | 8 +- nova/tests/cloud_unittest.py | 18 +- nova/tests/compute_unittest.py | 36 +-- nova/tests/volume_unittest.py | 46 ++-- nova/volume/volumenode.py | 305 --------------------- nova/volume/volumeservice.py | 304 +++++++++++++++++++++ 16 files changed, 1090 insertions(+), 1093 deletions(-) delete mode 100644 nova/compute/computenode.py create mode 100644 nova/compute/computeservice.py delete mode 100644 nova/network/networknode.py create mode 100644 nova/network/networkservice.py delete mode 100644 nova/node.py create mode 100644 nova/service.py delete mode 100644 nova/volume/volumenode.py create mode 100644 nova/volume/volumeservice.py diff --git a/bin/nova-compute b/bin/nova-compute index 1d5fa709d..7ef5d074a 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.compute import computenode +from nova.compute import computeservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = computenode.ComputeNode.create() + application = computeservice.ComputeService.create() diff --git a/bin/nova-network b/bin/nova-network index db9d4b970..0d3aa0002 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.network import networknode +from nova.network import networkservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = networknode.NetworkNode.create() + application = networkservice.NetworkService.create() diff --git a/bin/nova-volume b/bin/nova-volume index 2e9b530a7..c1c0163cf 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.volume import volumenode +from nova.volume import volumeservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = volumenode.VolumeNode.create() + application = volumeservice.VolumeService.create() diff --git a/nova/compute/computenode.py b/nova/compute/computenode.py deleted file mode 100644 index f26da1996..000000000 --- a/nova/compute/computenode.py +++ /dev/null @@ -1,582 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Node: - - Runs on each compute node, managing the - hypervisor using libvirt. - -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import node -from nova import process -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import volumenode - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class ComputeNode(node.Node): - """ - Manages the running instances. - """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(ComputeNode, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = volumenode.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = volumenode.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? - logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/compute/computeservice.py b/nova/compute/computeservice.py new file mode 100644 index 000000000..5568e3888 --- /dev/null +++ b/nova/compute/computeservice.py @@ -0,0 +1,581 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Service: + + Runs on each compute host, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import volumeservice + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} +INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeService(service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeService, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = [self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." % (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volumeservice.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volumeservice.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? + logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index eaa608b1e..6e9bdead8 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -23,7 +23,6 @@ datastore. """ import base64 -import json import logging import os import time @@ -38,9 +37,9 @@ from nova.auth import rbac from nova.auth import users from nova.compute import model from nova.compute import network -from nova.compute import computenode +from nova.compute import computeservice from nova.endpoint import images -from nova.volume import volumenode +from nova.volume import volumeservice FLAGS = flags.FLAGS @@ -76,7 +75,7 @@ class CloudController(object): def volumes(self): """ returns a list of all volumes """ for volume_id in datastore.Redis.instance().smembers("volumes"): - volume = volumenode.get_volume(volume_id) + volume = volumeservice.get_volume(volume_id) yield volume def __str__(self): @@ -103,7 +102,7 @@ class CloudController(object): result = {} for instance in self.instdir.all: if instance['project_id'] == project_id: - line = '%s slots=%d' % (instance['private_dns_name'], computenode.INSTANCE_TYPES[instance['instance_type']]['vcpus']) + line = '%s slots=%d' % (instance['private_dns_name'], computeservice.INSTANCE_TYPES[instance['instance_type']]['vcpus']) if instance['key_name'] in result: result[instance['key_name']].append(line) else: @@ -296,7 +295,7 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): - # TODO(vish): refactor this to create the volume object here and tell volumenode to create it + # TODO(vish): refactor this to create the volume object here and tell volumeservice to create it res = rpc.call(FLAGS.volume_topic, {"method": "create_volume", "args" : {"size": size, "user_id": context.user.id, @@ -331,7 +330,7 @@ class CloudController(object): raise exception.NotFound('Instance %s could not be found' % instance_id) def _get_volume(self, context, volume_id): - volume = volumenode.get_volume(volume_id) + volume = volumeservice.get_volume(volume_id) if context.user.is_admin() or volume['project_id'] == context.project.id: return volume raise exception.NotFound('Volume %s could not be found' % volume_id) @@ -578,7 +577,7 @@ class CloudController(object): "args": {"instance_id" : inst.instance_id}}) logging.debug("Casting to node for %s's instance with IP of %s" % (context.user.name, inst['private_dns_name'])) - # TODO: Make the NetworkComputeNode figure out the network name from ip. + # TODO: Make Network figure out the network name from ip. return defer.succeed(self._format_instances( context, reservation_id)) diff --git a/nova/network/networknode.py b/nova/network/networknode.py deleted file mode 100644 index e5a346551..000000000 --- a/nova/network/networknode.py +++ /dev/null @@ -1,35 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Network Nodes are responsible for allocating ips and setting up network -""" - -import logging - -from nova import flags -from nova import node - - -FLAGS = flags.FLAGS - -class NetworkNode(node.Node): - """Allocates ips and sets up networks""" - - def __init__(self): - logging.debug("Network node working") diff --git a/nova/network/networkservice.py b/nova/network/networkservice.py new file mode 100644 index 000000000..9d87e05e6 --- /dev/null +++ b/nova/network/networkservice.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network Nodes are responsible for allocating ips and setting up network +""" + +import logging + +from nova import flags +from nova import service + + +FLAGS = flags.FLAGS + +class NetworkService(service.Service): + """Allocates ips and sets up networks""" + + def __init__(self): + logging.debug("Network node working") diff --git a/nova/node.py b/nova/node.py deleted file mode 100644 index 852344da9..000000000 --- a/nova/node.py +++ /dev/null @@ -1,103 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Generic Node baseclass for all workers that run on hosts -""" - -import inspect -import logging -import os - -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - -from nova import datastore -from nova import flags -from nova import rpc -from nova.compute import model - - -FLAGS = flags.FLAGS - -flags.DEFINE_integer('report_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) - -class Node(object, service.Service): - """Base class for workers that run on hosts""" - - @classmethod - def create(cls, - report_interval=None, # defaults to flag - bin_name=None, # defaults to basename of executable - topic=None): # defaults to basename - "nova-" part - """Instantiates class and passes back application object""" - if not report_interval: - # NOTE(vish): set here because if it is set to flag in the - # parameter list, it wrongly uses the default - report_interval = FLAGS.report_interval - # NOTE(vish): magic to automatically determine bin_name and topic - if not bin_name: - bin_name = os.path.basename(inspect.stack()[-1][1]) - if not topic: - topic = bin_name.rpartition("nova-")[2] - logging.warn("Starting %s node" % topic) - node_instance = cls() - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % topic, - proxy=node_instance) - - consumer_node = rpc.AdapterConsumer( - connection=conn, - topic='%s.%s' % (topic, FLAGS.node_name), - proxy=node_instance) - - pulse = task.LoopingCall(node_instance.report_state, - FLAGS.node_name, - bin_name) - pulse.start(interval=report_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - node_instance.setServiceParent(application) - return application - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except datastore.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield diff --git a/nova/service.py b/nova/service.py new file mode 100644 index 000000000..96281bc6b --- /dev/null +++ b/nova/service.py @@ -0,0 +1,103 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Generic Node baseclass for all workers that run on hosts +""" + +import inspect +import logging +import os + +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + +from nova import datastore +from nova import flags +from nova import rpc +from nova.compute import model + + +FLAGS = flags.FLAGS + +flags.DEFINE_integer('report_interval', 10, + 'seconds between nodes reporting state to cloud', + lower_bound=1) + +class Service(object, service.Service): + """Base class for workers that run on hosts""" + + @classmethod + def create(cls, + report_interval=None, # defaults to flag + bin_name=None, # defaults to basename of executable + topic=None): # defaults to basename - "nova-" part + """Instantiates class and passes back application object""" + if not report_interval: + # NOTE(vish): set here because if it is set to flag in the + # parameter list, it wrongly uses the default + report_interval = FLAGS.report_interval + # NOTE(vish): magic to automatically determine bin_name and topic + if not bin_name: + bin_name = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = bin_name.rpartition("nova-")[2] + logging.warn("Starting %s node" % topic) + node_instance = cls() + + conn = rpc.Connection.instance() + consumer_all = rpc.AdapterConsumer( + connection=conn, + topic='%s' % topic, + proxy=node_instance) + + consumer_node = rpc.AdapterConsumer( + connection=conn, + topic='%s.%s' % (topic, FLAGS.node_name), + proxy=node_instance) + + pulse = task.LoopingCall(node_instance.report_state, + FLAGS.node_name, + bin_name) + pulse.start(interval=report_interval, now=False) + + consumer_all.attach_to_twisted() + consumer_node.attach_to_twisted() + + # This is the parent service that twistd will be looking for when it + # parses this file, return it so that we can get it into globals below + application = service.Application(bin_name) + node_instance.setServiceParent(application) + return application + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except datastore.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield diff --git a/nova/test.py b/nova/test.py index 5dcf0b9b0..6fbcab5e4 100644 --- a/nova/test.py +++ b/nova/test.py @@ -156,9 +156,9 @@ class BaseTestCase(TrialTestCase): Example (callback chain, ugly): - d = self.node.terminate_instance(instance_id) # a Deferred instance + d = self.compute.terminate_instance(instance_id) # a Deferred instance def _describe(_): - d_desc = self.node.describe_instances() # another Deferred instance + d_desc = self.compute.describe_instances() # another Deferred instance return d_desc def _checkDescribe(rv): self.assertEqual(rv, []) @@ -169,8 +169,8 @@ class BaseTestCase(TrialTestCase): Example (inline callbacks! yay!): - yield self.node.terminate_instance(instance_id) - rv = yield self.node.describe_instances() + yield self.compute.terminate_instance(instance_id) + rv = yield self.compute.describe_instances() self.assertEqual(rv, []) If the test fits the Inline Callbacks pattern we will automatically diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 7ab2c257a..38f4de8d9 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -28,7 +28,7 @@ from nova import flags from nova import rpc from nova import test from nova.auth import users -from nova.compute import computenode +from nova.compute import computeservice from nova.endpoint import api from nova.endpoint import cloud @@ -53,12 +53,12 @@ class CloudTestCase(test.BaseTestCase): proxy=self.cloud) self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - # set up a node - self.node = computenode.ComputeNode() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, + # set up a service + self.compute = computeservice.ComputeService() + self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) + proxy=self.compute) + self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop)) try: users.UserManager.instance().create_user('admin', 'admin', 'admin') @@ -76,11 +76,11 @@ class CloudTestCase(test.BaseTestCase): logging.debug("Can't test instances without a real virtual env.") return instance_id = 'foo' - inst = yield self.node.run_instance(instance_id) + inst = yield self.compute.run_instance(instance_id) output = yield self.cloud.get_console_output(self.context, [instance_id]) logging.debug(output) self.assert_(output) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) def test_run_instances(self): if FLAGS.fake_libvirt: @@ -112,7 +112,7 @@ class CloudTestCase(test.BaseTestCase): # for instance in reservations[res_id]: for instance in reservations[reservations.keys()[0]]: logging.debug("Terminating instance %s" % instance['instance_id']) - rv = yield self.node.terminate_instance(instance['instance_id']) + rv = yield self.compute.terminate_instance(instance['instance_id']) def test_instance_update_state(self): def instance(num): diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index 4c0f1afb3..db08308bb 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -26,7 +26,7 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import computenode +from nova.compute import computeservice FLAGS = flags.FLAGS @@ -60,7 +60,7 @@ class ComputeConnectionTestCase(test.TrialTestCase): self.flags(fake_libvirt=True, fake_storage=True, fake_users=True) - self.node = computenode.ComputeNode() + self.compute = computeservice.ComputeService() def create_instance(self): instdir = model.InstanceDirectory() @@ -81,48 +81,48 @@ class ComputeConnectionTestCase(test.TrialTestCase): def test_run_describe_terminate(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("Running instances: %s", rv) self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("After terminating instances: %s", rv) self.assertEqual(rv, {}) @defer.inlineCallbacks def test_reboot(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - yield self.node.reboot_instance(instance_id) + yield self.compute.reboot_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_console_output(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - console = yield self.node.get_console_output(instance_id) + console = yield self.compute.get_console_output(instance_id) self.assert_(console) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_run_instance_existing(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - self.assertRaises(exception.Error, self.node.run_instance, instance_id) - rv = yield self.node.terminate_instance(instance_id) + self.assertRaises(exception.Error, self.compute.run_instance, instance_id) + rv = yield self.compute.terminate_instance(instance_id) diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index c176453d8..568b199a0 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -21,8 +21,8 @@ import logging from nova import exception from nova import flags from nova import test -from nova.compute import computenode -from nova.volume import volumenode +from nova.compute import computeservice +from nova.volume import volumeservice FLAGS = flags.FLAGS @@ -32,24 +32,24 @@ class VolumeTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) super(VolumeTestCase, self).setUp() - self.mynode = computenode.ComputeNode() - self.mystorage = None + self.compute = computeservice.ComputeService() + self.volume = None self.flags(fake_libvirt=True, fake_storage=True) - self.mystorage = volumenode.VolumeNode() + self.volume = volumeservice.VolumeService() def test_run_create_volume(self): vol_size = '0' user_id = 'fake' project_id = 'fake' - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + volume_id = self.volume.create_volume(vol_size, user_id, project_id) # TODO(termie): get_volume returns differently than create_volume self.assertEqual(volume_id, - volumenode.get_volume(volume_id)['volume_id']) + volumeservice.get_volume(volume_id)['volume_id']) - rv = self.mystorage.delete_volume(volume_id) + rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumenode.get_volume, + volumeservice.get_volume, volume_id) def test_too_big_volume(self): @@ -57,7 +57,7 @@ class VolumeTestCase(test.TrialTestCase): user_id = 'fake' project_id = 'fake' self.assertRaises(TypeError, - self.mystorage.create_volume, + self.volume.create_volume, vol_size, user_id, project_id) def test_too_many_volumes(self): @@ -68,26 +68,26 @@ class VolumeTestCase(test.TrialTestCase): total_slots = FLAGS.slots_per_shelf * num_shelves vols = [] for i in xrange(total_slots): - vid = self.mystorage.create_volume(vol_size, user_id, project_id) + vid = self.volume.create_volume(vol_size, user_id, project_id) vols.append(vid) - self.assertRaises(volumenode.NoMoreVolumes, - self.mystorage.create_volume, + self.assertRaises(volumeservice.NoMoreVolumes, + self.volume.create_volume, vol_size, user_id, project_id) for id in vols: - self.mystorage.delete_volume(id) + self.volume.delete_volume(id) def test_run_attach_detach_volume(self): - # Create one volume and one node to test with + # Create one volume and one compute to test with instance_id = "storage-test" vol_size = "5" user_id = "fake" project_id = 'fake' mountpoint = "/dev/sdf" - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + volume_id = self.volume.create_volume(vol_size, user_id, project_id) - volume_obj = volumenode.get_volume(volume_id) + volume_obj = volumeservice.get_volume(volume_id) volume_obj.start_attach(instance_id, mountpoint) - rv = yield self.mynode.attach_volume(volume_id, + rv = yield self.compute.attach_volume(volume_id, instance_id, mountpoint) self.assertEqual(volume_obj['status'], "in-use") @@ -96,16 +96,16 @@ class VolumeTestCase(test.TrialTestCase): self.assertEqual(volume_obj['mountpoint'], mountpoint) self.assertRaises(exception.Error, - self.mystorage.delete_volume, + self.volume.delete_volume, volume_id) - rv = yield self.mystorage.detach_volume(volume_id) - volume_obj = volumenode.get_volume(volume_id) + rv = yield self.volume.detach_volume(volume_id) + volume_obj = volumeservice.get_volume(volume_id) self.assertEqual(volume_obj['status'], "available") - rv = self.mystorage.delete_volume(volume_id) + rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumenode.get_volume, + volumeservice.get_volume, volume_id) def test_multi_node(self): diff --git a/nova/volume/volumenode.py b/nova/volume/volumenode.py deleted file mode 100644 index 6b4ad0d87..000000000 --- a/nova/volume/volumenode.py +++ /dev/null @@ -1,305 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Nova Storage manages creating, attaching, detaching, and -destroying persistent storage volumes, ala EBS. -Currently uses Ata-over-Ethernet. -""" - -import glob -import logging -import os -import shutil -import socket -import tempfile - -from twisted.application import service -from twisted.internet import defer - -from nova import datastore -from nova import exception -from nova import flags -from nova import node -from nova import process -from nova import utils -from nova import validate - - -FLAGS = flags.FLAGS -flags.DEFINE_string('storage_dev', '/dev/sdb', - 'Physical device to use for volumes') -flags.DEFINE_string('volume_group', 'nova-volumes', - 'Name for the VG that will contain exported volumes') -flags.DEFINE_string('aoe_eth_dev', 'eth0', - 'Which device to export the volumes on') -flags.DEFINE_string('storage_name', - socket.gethostname(), - 'name of this node') -flags.DEFINE_integer('first_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10, - 'AoE starting shelf_id for this node') -flags.DEFINE_integer('last_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10 + 9, - 'AoE starting shelf_id for this node') -flags.DEFINE_string('aoe_export_dir', - '/var/lib/vblade-persist/vblades', - 'AoE directory where exports are created') -flags.DEFINE_integer('slots_per_shelf', - 16, - 'Number of AoE slots per shelf') -flags.DEFINE_string('storage_availability_zone', - 'nova', - 'availability zone of this node') -flags.DEFINE_boolean('fake_storage', False, - 'Should we make real storage volumes to attach?') - - -class NoMoreVolumes(exception.Error): - pass - -def get_volume(volume_id): - """ Returns a redis-backed volume object """ - volume_class = Volume - if FLAGS.fake_storage: - volume_class = FakeVolume - if datastore.Redis.instance().sismember('volumes', volume_id): - return volume_class(volume_id=volume_id) - raise exception.Error("Volume does not exist") - -class VolumeNode(node.Node): - """ - There is one VolumeNode running on each host. - However, each VolumeNode can report on the state of - *all* volumes in the cluster. - """ - def __init__(self): - super(VolumeNode, self).__init__() - self.volume_class = Volume - if FLAGS.fake_storage: - FLAGS.aoe_export_dir = tempfile.mkdtemp() - self.volume_class = FakeVolume - self._init_volume_group() - - def __del__(self): - # TODO(josh): Get rid of this destructor, volumes destroy themselves - if FLAGS.fake_storage: - try: - shutil.rmtree(FLAGS.aoe_export_dir) - except Exception, err: - pass - - @validate.rangetest(size=(0, 1000)) - def create_volume(self, size, user_id, project_id): - """ - Creates an exported volume (fake or real), - restarts exports to make it available. - Volume at this point has size, owner, and zone. - """ - logging.debug("Creating volume of size: %s" % (size)) - vol = self.volume_class.create(size, user_id, project_id) - datastore.Redis.instance().sadd('volumes', vol['volume_id']) - datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - self._restart_exports() - return vol['volume_id'] - - def by_node(self, node_id): - """ returns a list of volumes for a node """ - for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): - yield self.volume_class(volume_id=volume_id) - - @property - def all(self): - """ returns a list of all volumes """ - for volume_id in datastore.Redis.instance().smembers('volumes'): - yield self.volume_class(volume_id=volume_id) - - def delete_volume(self, volume_id): - logging.debug("Deleting volume with id of: %s" % (volume_id)) - vol = get_volume(volume_id) - if vol['status'] == "attached": - raise exception.Error("Volume is still attached") - if vol['node_name'] != FLAGS.storage_name: - raise exception.Error("Volume is not local to this node") - vol.destroy() - datastore.Redis.instance().srem('volumes', vol['volume_id']) - datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - return True - - @defer.inlineCallbacks - def _restart_exports(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo vblade-persist auto all") - yield process.simple_execute( - "sudo vblade-persist start all") - - @defer.inlineCallbacks - def _init_volume_group(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo pvcreate %s" % (FLAGS.storage_dev)) - yield process.simple_execute( - "sudo vgcreate %s %s" % (FLAGS.volume_group, - FLAGS.storage_dev)) - -class Volume(datastore.BasicModel): - - def __init__(self, volume_id=None): - self.volume_id = volume_id - super(Volume, self).__init__() - - @property - def identifier(self): - return self.volume_id - - def default_state(self): - return {"volume_id": self.volume_id} - - @classmethod - def create(cls, size, user_id, project_id): - volume_id = utils.generate_uid('vol') - vol = cls(volume_id) - vol['node_name'] = FLAGS.storage_name - vol['size'] = size - vol['user_id'] = user_id - vol['project_id'] = project_id - vol['availability_zone'] = FLAGS.storage_availability_zone - vol["instance_id"] = 'none' - vol["mountpoint"] = 'none' - vol['attach_time'] = 'none' - vol['status'] = "creating" # creating | available | in-use - vol['attach_status'] = "detached" # attaching | attached | detaching | detached - vol['delete_on_termination'] = 'False' - vol.save() - vol.create_lv() - vol._setup_export() - # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes - # TODO(joshua - vol['status'] = "available" - vol.save() - return vol - - def start_attach(self, instance_id, mountpoint): - """ """ - self['instance_id'] = instance_id - self['mountpoint'] = mountpoint - self['status'] = "in-use" - self['attach_status'] = "attaching" - self['attach_time'] = utils.isotime() - self['delete_on_termination'] = 'False' - self.save() - - def finish_attach(self): - """ """ - self['attach_status'] = "attached" - self.save() - - def start_detach(self): - """ """ - self['attach_status'] = "detaching" - self.save() - - def finish_detach(self): - self['instance_id'] = None - self['mountpoint'] = None - self['status'] = "available" - self['attach_status'] = "detached" - self.save() - - def destroy(self): - try: - self._remove_export() - except: - pass - self._delete_lv() - super(Volume, self).destroy() - - @defer.inlineCallbacks - def create_lv(self): - if str(self['size']) == '0': - sizestr = '100M' - else: - sizestr = '%sG' % self['size'] - yield process.simple_execute( - "sudo lvcreate -L %s -n %s %s" % (sizestr, - self['volume_id'], - FLAGS.volume_group)) - - @defer.inlineCallbacks - def _delete_lv(self): - yield process.simple_execute( - "sudo lvremove -f %s/%s" % (FLAGS.volume_group, - self['volume_id'])) - - def _setup_export(self): - (shelf_id, blade_id) = get_next_aoe_numbers() - self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) - self['shelf_id'] = shelf_id - self['blade_id'] = blade_id - self.save() - self._exec_export() - - @defer.inlineCallbacks - def _exec_export(self): - yield process.simple_execute( - "sudo vblade-persist setup %s %s %s /dev/%s/%s" % - (self['shelf_id'], - self['blade_id'], - FLAGS.aoe_eth_dev, - FLAGS.volume_group, - self['volume_id'])) - - @defer.inlineCallbacks - def _remove_export(self): - yield process.simple_execute( - "sudo vblade-persist stop %s %s" % (self['shelf_id'], - self['blade_id'])) - yield process.simple_execute( - "sudo vblade-persist destroy %s %s" % (self['shelf_id'], - self['blade_id'])) - - -class FakeVolume(Volume): - def create_lv(self): - pass - - def _exec_export(self): - fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) - f = file(fname, "w") - f.close() - - def _remove_export(self): - pass - - def _delete_lv(self): - pass - -def get_next_aoe_numbers(): - for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): - aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) - if not aoes: - blade_id = 0 - else: - blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 - if blade_id < FLAGS.slots_per_shelf: - logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) - return (shelf_id, blade_id) - raise NoMoreVolumes() diff --git a/nova/volume/volumeservice.py b/nova/volume/volumeservice.py new file mode 100644 index 000000000..87a47f40a --- /dev/null +++ b/nova/volume/volumeservice.py @@ -0,0 +1,304 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Nova Storage manages creating, attaching, detaching, and +destroying persistent storage volumes, ala EBS. +Currently uses Ata-over-Ethernet. +""" + +import glob +import logging +import os +import shutil +import socket +import tempfile + +from twisted.internet import defer + +from nova import datastore +from nova import exception +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova import validate + + +FLAGS = flags.FLAGS +flags.DEFINE_string('storage_dev', '/dev/sdb', + 'Physical device to use for volumes') +flags.DEFINE_string('volume_group', 'nova-volumes', + 'Name for the VG that will contain exported volumes') +flags.DEFINE_string('aoe_eth_dev', 'eth0', + 'Which device to export the volumes on') +flags.DEFINE_string('storage_name', + socket.gethostname(), + 'name of this service') +flags.DEFINE_integer('first_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10, + 'AoE starting shelf_id for this service') +flags.DEFINE_integer('last_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10 + 9, + 'AoE starting shelf_id for this service') +flags.DEFINE_string('aoe_export_dir', + '/var/lib/vblade-persist/vblades', + 'AoE directory where exports are created') +flags.DEFINE_integer('slots_per_shelf', + 16, + 'Number of AoE slots per shelf') +flags.DEFINE_string('storage_availability_zone', + 'nova', + 'availability zone of this service') +flags.DEFINE_boolean('fake_storage', False, + 'Should we make real storage volumes to attach?') + + +class NoMoreVolumes(exception.Error): + pass + +def get_volume(volume_id): + """ Returns a redis-backed volume object """ + volume_class = Volume + if FLAGS.fake_storage: + volume_class = FakeVolume + if datastore.Redis.instance().sismember('volumes', volume_id): + return volume_class(volume_id=volume_id) + raise exception.Error("Volume does not exist") + +class VolumeService(service.Service): + """ + There is one VolumeNode running on each host. + However, each VolumeNode can report on the state of + *all* volumes in the cluster. + """ + def __init__(self): + super(VolumeService, self).__init__() + self.volume_class = Volume + if FLAGS.fake_storage: + FLAGS.aoe_export_dir = tempfile.mkdtemp() + self.volume_class = FakeVolume + self._init_volume_group() + + def __del__(self): + # TODO(josh): Get rid of this destructor, volumes destroy themselves + if FLAGS.fake_storage: + try: + shutil.rmtree(FLAGS.aoe_export_dir) + except Exception, err: + pass + + @validate.rangetest(size=(0, 1000)) + def create_volume(self, size, user_id, project_id): + """ + Creates an exported volume (fake or real), + restarts exports to make it available. + Volume at this point has size, owner, and zone. + """ + logging.debug("Creating volume of size: %s" % (size)) + vol = self.volume_class.create(size, user_id, project_id) + datastore.Redis.instance().sadd('volumes', vol['volume_id']) + datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + self._restart_exports() + return vol['volume_id'] + + def by_node(self, node_id): + """ returns a list of volumes for a node """ + for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): + yield self.volume_class(volume_id=volume_id) + + @property + def all(self): + """ returns a list of all volumes """ + for volume_id in datastore.Redis.instance().smembers('volumes'): + yield self.volume_class(volume_id=volume_id) + + def delete_volume(self, volume_id): + logging.debug("Deleting volume with id of: %s" % (volume_id)) + vol = get_volume(volume_id) + if vol['status'] == "attached": + raise exception.Error("Volume is still attached") + if vol['node_name'] != FLAGS.storage_name: + raise exception.Error("Volume is not local to this node") + vol.destroy() + datastore.Redis.instance().srem('volumes', vol['volume_id']) + datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + return True + + @defer.inlineCallbacks + def _restart_exports(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo vblade-persist auto all") + yield process.simple_execute( + "sudo vblade-persist start all") + + @defer.inlineCallbacks + def _init_volume_group(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo pvcreate %s" % (FLAGS.storage_dev)) + yield process.simple_execute( + "sudo vgcreate %s %s" % (FLAGS.volume_group, + FLAGS.storage_dev)) + +class Volume(datastore.BasicModel): + + def __init__(self, volume_id=None): + self.volume_id = volume_id + super(Volume, self).__init__() + + @property + def identifier(self): + return self.volume_id + + def default_state(self): + return {"volume_id": self.volume_id} + + @classmethod + def create(cls, size, user_id, project_id): + volume_id = utils.generate_uid('vol') + vol = cls(volume_id) + vol['node_name'] = FLAGS.storage_name + vol['size'] = size + vol['user_id'] = user_id + vol['project_id'] = project_id + vol['availability_zone'] = FLAGS.storage_availability_zone + vol["instance_id"] = 'none' + vol["mountpoint"] = 'none' + vol['attach_time'] = 'none' + vol['status'] = "creating" # creating | available | in-use + vol['attach_status'] = "detached" # attaching | attached | detaching | detached + vol['delete_on_termination'] = 'False' + vol.save() + vol.create_lv() + vol._setup_export() + # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes + # TODO(joshua + vol['status'] = "available" + vol.save() + return vol + + def start_attach(self, instance_id, mountpoint): + """ """ + self['instance_id'] = instance_id + self['mountpoint'] = mountpoint + self['status'] = "in-use" + self['attach_status'] = "attaching" + self['attach_time'] = utils.isotime() + self['delete_on_termination'] = 'False' + self.save() + + def finish_attach(self): + """ """ + self['attach_status'] = "attached" + self.save() + + def start_detach(self): + """ """ + self['attach_status'] = "detaching" + self.save() + + def finish_detach(self): + self['instance_id'] = None + self['mountpoint'] = None + self['status'] = "available" + self['attach_status'] = "detached" + self.save() + + def destroy(self): + try: + self._remove_export() + except: + pass + self._delete_lv() + super(Volume, self).destroy() + + @defer.inlineCallbacks + def create_lv(self): + if str(self['size']) == '0': + sizestr = '100M' + else: + sizestr = '%sG' % self['size'] + yield process.simple_execute( + "sudo lvcreate -L %s -n %s %s" % (sizestr, + self['volume_id'], + FLAGS.volume_group)) + + @defer.inlineCallbacks + def _delete_lv(self): + yield process.simple_execute( + "sudo lvremove -f %s/%s" % (FLAGS.volume_group, + self['volume_id'])) + + def _setup_export(self): + (shelf_id, blade_id) = get_next_aoe_numbers() + self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) + self['shelf_id'] = shelf_id + self['blade_id'] = blade_id + self.save() + self._exec_export() + + @defer.inlineCallbacks + def _exec_export(self): + yield process.simple_execute( + "sudo vblade-persist setup %s %s %s /dev/%s/%s" % + (self['shelf_id'], + self['blade_id'], + FLAGS.aoe_eth_dev, + FLAGS.volume_group, + self['volume_id'])) + + @defer.inlineCallbacks + def _remove_export(self): + yield process.simple_execute( + "sudo vblade-persist stop %s %s" % (self['shelf_id'], + self['blade_id'])) + yield process.simple_execute( + "sudo vblade-persist destroy %s %s" % (self['shelf_id'], + self['blade_id'])) + + +class FakeVolume(Volume): + def create_lv(self): + pass + + def _exec_export(self): + fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) + f = file(fname, "w") + f.close() + + def _remove_export(self): + pass + + def _delete_lv(self): + pass + +def get_next_aoe_numbers(): + for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): + aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) + if not aoes: + blade_id = 0 + else: + blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 + if blade_id < FLAGS.slots_per_shelf: + logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) + return (shelf_id, blade_id) + raise NoMoreVolumes() -- cgit From 01dffdd79098429a448cf283119d9026ae1231c2 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Sun, 25 Jul 2010 14:29:20 +0200 Subject: Always make sure to set a Date headers, since it's needed to calculate the S3 Auth header. --- nova/compute/node.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/nova/compute/node.py b/nova/compute/node.py index 7c09d9583..a01cb3070 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -47,12 +47,12 @@ from nova import flags from nova import process from nova import utils from nova.auth import signer +from nova.auth.users import UserManager from nova.compute import disk from nova.compute import model from nova.compute import network from nova.objectstore import image # for image_path flag from nova.volume import storage -from nova.users import UserManager FLAGS = flags.FLAGS @@ -449,12 +449,18 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + headers = {} + headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) + user_id = self.datamodel['user_id'] user = UserManager.instance().get_user(user_id) - auth = signer.Signer(user.secret.encode()).s3_authorization({}, 'GET', url) - auth_header = 'Authorization: %s:%s' % (user_id, auth) - d = process.simple_execute( - 'curl --silent %s -o "%s"' % (url, auth_header, path)) + auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', url) + headers['Authorization'] = auth + + headers_opt_string = ' '.join(['-H %s:%s' % (k,v) for (k,v) in headers.iteritems()]) + d = process.simple_execute('curl --silent %s ' + '%s -o "%s"' % (url, headers_opt_string, + path)) return d def _fetch_local_image(self, image, path): -- cgit From dd17c810029d142abaa8f0c97e30a8ae1dc13b5c Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Sun, 25 Jul 2010 16:17:55 +0200 Subject: Return a 404 when attempting to access a bucket that does not exist. --- nova/objectstore/handler.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 558b04881..1a54f8a98 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -47,7 +47,7 @@ import urllib from twisted.application import internet, service from twisted.web.resource import Resource -from twisted.web import server, static +from twisted.web import server, static, error from nova import exception @@ -150,7 +150,10 @@ class BucketResource(Resource): def render_GET(self, request): logging.debug("List keys for bucket %s" % (self.name)) - bucket_object = bucket.Bucket(self.name) + try: + bucket_object = bucket.Bucket(self.name) + except exception.NotFound, e: + return error.NoResource(message="No such bucket").render(request) if not bucket_object.is_authorized(request.context): raise exception.NotAuthorized -- cgit From d5051bf90909f693a74ea0f2bdc3db341460a5a3 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Mon, 26 Jul 2010 16:03:23 +0200 Subject: Add a simple set of tests for S3 API (using boto). --- nova/objectstore/handler.py | 14 ++-- nova/tests/objectstore_unittest.py | 131 ++++++++++++++++++++++++++++--------- 2 files changed, 105 insertions(+), 40 deletions(-) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 1a54f8a98..45eeef163 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -108,8 +108,6 @@ def get_context(request): raise exception.NotAuthorized access, sep, secret = authorization_header.split(' ')[1].rpartition(':') um = users.UserManager.instance() - print 'um %s' % um - (user, project) = um.authenticate(access, secret, {}, request.method, request.getRequestHostname(), request.uri, headers=request.getAllHeaders(), check_type='s3') return api.APIRequestContext(None, user, project) except exception.Error as ex: @@ -169,10 +167,6 @@ class BucketResource(Resource): def render_PUT(self, request): logging.debug("Creating bucket %s" % (self.name)) - try: - print 'user is %s' % request.context - except Exception as e: - logging.exception(e) logging.debug("calling bucket.Bucket.create(%r, %r)" % (self.name, request.context)) bucket.Bucket.create(self.name, request.context) request.finish() @@ -300,9 +294,13 @@ class ImageResource(Resource): request.setResponseCode(204) return '' -def get_application(): +def get_site(): root = S3() - factory = server.Site(root) + site = server.Site(root) + return site + +def get_application(): + factory = get_site() application = service.Application("objectstore") objectStoreService = internet.TCPServer(FLAGS.s3_port, factory) objectStoreService.setServiceParent(application) diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index f47ca7f00..ef1a477ff 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +import boto import glob import hashlib import logging @@ -27,7 +28,11 @@ from nova import flags from nova import objectstore from nova import test from nova.auth import users +from nova.objectstore.handler import S3 +from boto.s3.connection import S3Connection, OrdinaryCallingFormat +from twisted.internet import reactor, threads, defer +from twisted.web import http, server FLAGS = flags.FLAGS @@ -169,35 +174,97 @@ class ObjectStoreTestCase(test.BaseTestCase): self.context.project = self.um.get_project('proj2') self.assert_(my_img.is_authorized(self.context) == False) -# class ApiObjectStoreTestCase(test.BaseTestCase): -# def setUp(self): -# super(ApiObjectStoreTestCase, self).setUp() -# FLAGS.fake_users = True -# FLAGS.buckets_path = os.path.join(tempdir, 'buckets') -# FLAGS.images_path = os.path.join(tempdir, 'images') -# FLAGS.ca_path = os.path.join(os.path.dirname(__file__), 'CA') -# -# self.users = users.UserManager.instance() -# self.app = handler.Application(self.users) -# -# self.host = '127.0.0.1' -# -# self.conn = boto.s3.connection.S3Connection( -# aws_access_key_id=user.access, -# aws_secret_access_key=user.secret, -# is_secure=False, -# calling_format=boto.s3.connection.OrdinaryCallingFormat(), -# port=FLAGS.s3_port, -# host=FLAGS.s3_host) -# -# self.mox.StubOutWithMock(self.ec2, 'new_http_connection') -# -# def tearDown(self): -# FLAGS.Reset() -# super(ApiObjectStoreTestCase, self).tearDown() -# -# def test_describe_instances(self): -# self.expect_http() -# self.mox.ReplayAll() -# -# self.assertEqual(self.ec2.get_all_instances(), []) + +class TestHTTPChannel(http.HTTPChannel): + # Otherwise we end up with an unclean reactor + def checkPersistence(self, _, __): + return False + + +class TestSite(server.Site): + protocol = TestHTTPChannel + + +class S3APITestCase(test.TrialTestCase): + def setUp(self): + super(S3APITestCase, self).setUp() + FLAGS.fake_users = True + FLAGS.buckets_path = os.path.join(oss_tempdir, 'buckets') + + shutil.rmtree(FLAGS.buckets_path) + os.mkdir(FLAGS.buckets_path) + + root = S3() + self.site = TestSite(root) + self.listening_port = reactor.listenTCP(0, self.site, interface='127.0.0.1') + self.tcp_port = self.listening_port.getHost().port + + + boto.config.set('Boto', 'num_retries', '0') + self.conn = S3Connection(aws_access_key_id='admin', + aws_secret_access_key='admin', + host='127.0.0.1', + port=self.tcp_port, + is_secure=False, + calling_format=OrdinaryCallingFormat()) + + # Don't attempt to reuse connections + def get_http_connection(host, is_secure): + return self.conn.new_http_connection(host, is_secure) + self.conn.get_http_connection = get_http_connection + + def _ensure_empty_list(self, l): + self.assertEquals(len(l), 0, "List was not empty") + return True + + def _ensure_only_bucket(self, l, name): + self.assertEquals(len(l), 1, "List didn't have exactly one element in it") + self.assertEquals(l[0].name, name, "Wrong name") + + def test_000_list_buckets(self): + d = threads.deferToThread(self.conn.get_all_buckets) + d.addCallback(self._ensure_empty_list) + return d + + def test_001_create_and_delete_bucket(self): + bucket_name = 'testbucket' + + d = threads.deferToThread(self.conn.create_bucket, bucket_name) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets)) + + def ensure_only_bucket(l, name): + self.assertEquals(len(l), 1, "List didn't have exactly one element in it") + self.assertEquals(l[0].name, name, "Wrong name") + d.addCallback(ensure_only_bucket, bucket_name) + + d.addCallback(lambda _:threads.deferToThread(self.conn.delete_bucket, bucket_name)) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_all_buckets)) + d.addCallback(self._ensure_empty_list) + return d + + def test_002_create_bucket_and_key_and_delete_key_again(self): + bucket_name = 'testbucket' + key_name = 'somekey' + key_contents = 'somekey' + + d = threads.deferToThread(self.conn.create_bucket, bucket_name) + d.addCallback(lambda b:threads.deferToThread(b.new_key, key_name)) + d.addCallback(lambda k:threads.deferToThread(k.set_contents_from_string, key_contents)) + def ensure_key_contents(bucket_name, key_name, contents): + bucket = self.conn.get_bucket(bucket_name) + key = bucket.get_key(key_name) + self.assertEquals(key.get_contents_as_string(), contents, "Bad contents") + d.addCallback(lambda _:threads.deferToThread(ensure_key_contents, bucket_name, key_name, key_contents)) + def delete_key(bucket_name, key_name): + bucket = self.conn.get_bucket(bucket_name) + key = bucket.get_key(key_name) + key.delete() + d.addCallback(lambda _:threads.deferToThread(delete_key, bucket_name, key_name)) + d.addCallback(lambda _:threads.deferToThread(self.conn.get_bucket, bucket_name)) + d.addCallback(lambda b:threads.deferToThread(b.get_all_keys)) + d.addCallback(self._ensure_empty_list) + return d + + def tearDown(self): + super(S3APITestCase, self).tearDown() + return defer.DeferredList([defer.maybeDeferred(self.listening_port.stopListening)]) -- cgit From 6d3331e831ec147916d0d605958f61ca69eee9a8 Mon Sep 17 00:00:00 2001 From: andy Date: Mon, 26 Jul 2010 23:16:49 +0200 Subject: Move virtualenv installation out of the makefile. Also adds some tools for dealing with virtualenvs to the tools directory. --- .bzrignore | 1 + .gitignore | 1 + Makefile | 28 +++++++-------- tools/activate_venv.sh | 3 ++ tools/install_venv.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++ tools/pip-requires | 3 +- tools/with_venv.sh | 4 +++ 7 files changed, 120 insertions(+), 15 deletions(-) create mode 100644 tools/activate_venv.sh create mode 100644 tools/install_venv.py create mode 100755 tools/with_venv.sh diff --git a/.bzrignore b/.bzrignore index 93fc868a3..c3a502a1a 100644 --- a/.bzrignore +++ b/.bzrignore @@ -1 +1,2 @@ run_tests.err.log +.nova-venv diff --git a/.gitignore b/.gitignore index 9db87ac29..2afc7a32c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ keys build/* build-stamp nova.egg-info +.nova-venv diff --git a/Makefile b/Makefile index da69f2b72..5fb512610 100644 --- a/Makefile +++ b/Makefile @@ -1,27 +1,27 @@ -venv=.venv -with_venv=source $(venv)/bin/activate -installed=$(venv)/lib/python2.6/site-packages -twisted=$(installed)/twisted/__init__.py +venv=.nova-venv +with_venv=tools/with_venv.sh +build: + # Nothing to do -test: python-dependencies $(twisted) - $(with_venv) && python run_tests.py +test: $(venv) + $(with_venv) python run_tests.py + +test-system: + python run_tests.py clean: rm -rf _trial_temp rm -rf keys rm -rf instances rm -rf networks + rm run_tests.err.log clean-all: clean rm -rf $(venv) -python-dependencies: $(venv) - pip install -q -E $(venv) -r tools/pip-requires - $(venv): - pip install -q virtualenv - virtualenv -q --no-site-packages $(venv) - -$(twisted): - pip install -q -E $(venv) http://nova.openstack.org/Twisted-10.0.0Nova.tar.gz + @echo "You need to install the Nova virtualenv before you can run this." + @echo "" + @echo "Please run tools/install_venv.py" + @exit 1 diff --git a/tools/activate_venv.sh b/tools/activate_venv.sh new file mode 100644 index 000000000..d12cb3491 --- /dev/null +++ b/tools/activate_venv.sh @@ -0,0 +1,3 @@ +_TOOLS=`dirname $0` +_VENV=$_TOOLS/../.nova-venv +source $_VENV/bin/activate diff --git a/tools/install_venv.py b/tools/install_venv.py new file mode 100644 index 000000000..720dcc0d9 --- /dev/null +++ b/tools/install_venv.py @@ -0,0 +1,95 @@ +""" +Installation script for Nova's development virtualenv +""" + +import os +import subprocess +import sys +import textwrap + + +ROOT = os.path.dirname(os.path.dirname(__file__)) +VENV = os.path.join(ROOT, '.nova-venv') +PIP_REQUIRES = os.path.join(ROOT, 'tools', 'pip-requires') +TWISTED_NOVA='http://nova.openstack.org/Twisted-10.0.0Nova.tar.gz' + + +def die(message, *args): + print >>sys.stderr, message % args + sys.exit(1) + + +def run_command(cmd, redirect_output=True, error_ok=False): + # Useful for debugging: + #print >>sys.stderr, ' '.join(cmd) + if redirect_output: + stdout = subprocess.PIPE + else: + stdout = None + + proc = subprocess.Popen(cmd, stdout=stdout) + output = proc.communicate()[0] + if not error_ok and proc.returncode != 0: + die('Command "%s" failed.\n%s', ' '.join(cmd), output) + return output + + +def check_dependencies(): + """Make sure pip and virtualenv are on the path.""" + print 'Checking for pip...', + if not run_command(['which', 'pip']).strip(): + die('ERROR: pip not found.\n\nNova development requires pip,' + ' please install it using your favorite package management tool') + print 'done.' + + print 'Checking for virtualenv...', + if not run_command(['which', 'virtualenv']).strip(): + die('ERROR: virtualenv not found.\n\nNova development requires virtualenv,' + ' please install it using your favorite package management tool') + print 'done.' + + +def create_virtualenv(venv=VENV): + print 'Creating venv...', + run_command(['virtualenv', '-q', '--no-site-packages', VENV]) + print 'done.' + + +def install_dependencies(venv=VENV): + print 'Installing dependencies with pip (this can take a while)...' + run_command(['pip', 'install', '-E', venv, '-r', PIP_REQUIRES], + redirect_output=False) + run_command(['pip', 'install', '-E', venv, TWISTED_NOVA], + redirect_output=False) + + +def print_help(): + help = """ + Nova development environment setup is complete. + + Nova development uses virtualenv to track and manage Python dependencies + while in development and testing. + + To activate the Nova virtualenv for the extent of your current shell session + you can run: + + $ source tools/activate_venv.sh + + Or, if you prefer, you can run commands in the virtualenv on a case by case + basis by running: + + $ tools/with_venv.sh + + Also, run_tests.sh will automatically use the virtualenv. + """ + print help + + +def main(argv): + check_dependencies() + create_virtualenv() + install_dependencies() + print_help() + +if __name__ == '__main__': + main(sys.argv) diff --git a/tools/pip-requires b/tools/pip-requires index edb5fc01c..4eb47ca2b 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -5,10 +5,11 @@ anyjson==0.2.4 boto==2.0b1 carrot==0.10.5 lockfile==0.8 -mox==0.5.0 python-daemon==1.5.5 python-gflags==1.3 redis==2.0.0 tornado==1.0 wsgiref==0.1.2 zope.interface==3.6.1 +mox==0.5.0 +-f http://pymox.googlecode.com/files/mox-0.5.0.tar.gz diff --git a/tools/with_venv.sh b/tools/with_venv.sh new file mode 100755 index 000000000..51468a334 --- /dev/null +++ b/tools/with_venv.sh @@ -0,0 +1,4 @@ +#!/bin/sh +TOOLS=`dirname $0` +VENV=$TOOLS/../.nova-venv +source $VENV/bin/activate && $@ -- cgit From bed760dcc4dbdf1927c41cfff325cc56102ef962 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Mon, 26 Jul 2010 23:26:54 +0200 Subject: Make image downloads work again in S3 handler. Listing worked, but fetching the images failed because I wasn't clever enough to use twisted.web.static.File correctly. --- nova/objectstore/handler.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/nova/objectstore/handler.py b/nova/objectstore/handler.py index 6e76abb5a..4074f005d 100644 --- a/nova/objectstore/handler.py +++ b/nova/objectstore/handler.py @@ -121,7 +121,7 @@ class S3(Resource): if name == '': return self elif name == '_images': - return ImageResource() + return ImagesResource() else: return BucketResource(name) @@ -226,13 +226,21 @@ class ObjectResource(Resource): return '' class ImageResource(Resource): + isLeaf = True + + def __init__(self, name): + Resource.__init__(self) + self.img = image.Image(name) + + def render_GET(self, request): + return static.File(self.img.image_path, defaultType='application/octet-stream').render_GET(request) + +class ImagesResource(Resource): def getChild(self, name, request): if name == '': return self else: - request.setHeader("Content-Type", "application/octet-stream") - img = image.Image(name) - return static.File(img.image_path) + return ImageResource(name) def render_GET(self, request): """ returns a json listing of all images -- cgit From 69cbb1ac3559c44f6640939cd2e1db64e82073fe Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Mon, 26 Jul 2010 23:27:42 +0200 Subject: Make _fetch_s3_image pass proper AWS Authorization headers so that image downloads work again. --- nova/compute/node.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/nova/compute/node.py b/nova/compute/node.py index 7b86ca749..a44583c06 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -31,6 +31,7 @@ import logging import os import shutil import sys +import time from twisted.internet import defer from twisted.internet import task from twisted.application import service @@ -453,19 +454,25 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) + + # This should probably move somewhere else, like e.g. a download_as + # method on User objects and at the same time get rewritten to use + # twisted web client. headers = {} headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()) user_id = self.datamodel['user_id'] user = UserManager.instance().get_user(user_id) - auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', url) - headers['Authorization'] = auth + uri = '/' + url.partition('/')[2] + auth = signer.Signer(user.secret.encode()).s3_authorization(headers, 'GET', uri) + headers['Authorization'] = 'AWS %s:%s' % (user.access, auth) - headers_opt_string = ' '.join(['-H %s:%s' % (k,v) for (k,v) in headers.iteritems()]) - d = process.simple_execute('curl --silent %s ' - '%s -o "%s"' % (url, headers_opt_string, - path)) - return d + cmd = ['/usr/bin/curl', '--silent', url] + for (k,v) in headers.iteritems(): + cmd += ['-H', '%s: %s' % (k,v)] + + cmd += ['-o', path] + return process.SharedPool().execute(executable=cmd[0], args=cmd[1:]) def _fetch_local_image(self, image, path): source = _image_path('%s/image' % image) -- cgit From 0f5b95575afeb14fea80bead6720bf510f5358f5 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 14:41:51 -0700 Subject: moved misnamed nova-dchp file --- debian/nova-dhcp.conf | 2 -- debian/nova-dhcpbridge.conf | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) delete mode 100644 debian/nova-dhcp.conf create mode 100644 debian/nova-dhcpbridge.conf diff --git a/debian/nova-dhcp.conf b/debian/nova-dhcp.conf deleted file mode 100644 index 0aafe7549..000000000 --- a/debian/nova-dhcp.conf +++ /dev/null @@ -1,2 +0,0 @@ ---networks_path=/var/lib/nova/networks ---fake_users=1 diff --git a/debian/nova-dhcpbridge.conf b/debian/nova-dhcpbridge.conf new file mode 100644 index 000000000..0aafe7549 --- /dev/null +++ b/debian/nova-dhcpbridge.conf @@ -0,0 +1,2 @@ +--networks_path=/var/lib/nova/networks +--fake_users=1 -- cgit From ffe52b8660123335e425c52eb3bebba2e3d2e42f Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 14:48:08 -0700 Subject: default flag file full path --- nova/compute/linux_net.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nova/compute/linux_net.py b/nova/compute/linux_net.py index 48e07da66..861ce779b 100644 --- a/nova/compute/linux_net.py +++ b/nova/compute/linux_net.py @@ -29,7 +29,7 @@ from nova import flags FLAGS=flags.FLAGS flags.DEFINE_string('dhcpbridge_flagfile', - '/etc/nova-dhcpbridge.conf', + '/etc/nova/nova-dhcpbridge.conf', 'location of flagfile for dhcpbridge') def execute(cmd, addl_env=None): -- cgit From 754db8ef1ceb84fa9a1f44bfc6c5c6bbd99cd7e1 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 17:14:28 -0700 Subject: renamed xxxservice to service --- bin/nova-compute | 4 +- bin/nova-network | 4 +- bin/nova-volume | 4 +- nova/compute/computeservice.py | 581 ----------------------------------------- nova/compute/service.py | 581 +++++++++++++++++++++++++++++++++++++++++ nova/endpoint/cloud.py | 12 +- nova/network/networkservice.py | 35 --- nova/network/service.py | 35 +++ nova/tests/cloud_unittest.py | 4 +- nova/tests/compute_unittest.py | 4 +- nova/tests/volume_unittest.py | 20 +- nova/volume/service.py | 304 +++++++++++++++++++++ nova/volume/volumeservice.py | 304 --------------------- 13 files changed, 946 insertions(+), 946 deletions(-) delete mode 100644 nova/compute/computeservice.py create mode 100644 nova/compute/service.py delete mode 100644 nova/network/networkservice.py create mode 100644 nova/network/service.py create mode 100644 nova/volume/service.py delete mode 100644 nova/volume/volumeservice.py diff --git a/bin/nova-compute b/bin/nova-compute index 7ef5d074a..e0c12354f 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.compute import computeservice +from nova.compute import service if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = computeservice.ComputeService.create() + application = service.ComputeService.create() diff --git a/bin/nova-network b/bin/nova-network index 0d3aa0002..52d6cb70a 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.network import networkservice +from nova.network import service if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = networkservice.NetworkService.create() + application = service.NetworkService.create() diff --git a/bin/nova-volume b/bin/nova-volume index c1c0163cf..f7a8fad37 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.volume import volumeservice +from nova.volume import service if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = volumeservice.VolumeService.create() + application = service.VolumeService.create() diff --git a/nova/compute/computeservice.py b/nova/compute/computeservice.py deleted file mode 100644 index 5568e3888..000000000 --- a/nova/compute/computeservice.py +++ /dev/null @@ -1,581 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Service: - - Runs on each compute host, managing the - hypervisor using libvirt. - -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import process -from nova import service -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import volumeservice - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class ComputeService(service.Service): - """ - Manages the running instances. - """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(ComputeService, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = volumeservice.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = volumeservice.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? - logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/compute/service.py b/nova/compute/service.py new file mode 100644 index 000000000..02e35baa2 --- /dev/null +++ b/nova/compute/service.py @@ -0,0 +1,581 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Service: + + Runs on each compute host, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import service as volume_service + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} +INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeService(service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeService, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = [self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." % (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volume_service.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volume_service.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? + logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index 56d474fd9..97a7b5a38 100644 --- a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -37,9 +37,9 @@ from nova.auth import rbac from nova.auth import users from nova.compute import model from nova.compute import network -from nova.compute import computeservice +from nova.compute import service as compute_service from nova.endpoint import images -from nova.volume import volumeservice +from nova.volume import service as volume_service FLAGS = flags.FLAGS @@ -75,7 +75,7 @@ class CloudController(object): def volumes(self): """ returns a list of all volumes """ for volume_id in datastore.Redis.instance().smembers("volumes"): - volume = volumeservice.get_volume(volume_id) + volume = volume_service.get_volume(volume_id) yield volume def __str__(self): @@ -102,7 +102,7 @@ class CloudController(object): result = {} for instance in self.instdir.all: if instance['project_id'] == project_id: - line = '%s slots=%d' % (instance['private_dns_name'], computeservice.INSTANCE_TYPES[instance['instance_type']]['vcpus']) + line = '%s slots=%d' % (instance['private_dns_name'], compute_service.INSTANCE_TYPES[instance['instance_type']]['vcpus']) if instance['key_name'] in result: result[instance['key_name']].append(line) else: @@ -295,7 +295,7 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): - # TODO(vish): refactor this to create the volume object here and tell volumeservice to create it + # TODO(vish): refactor this to create the volume object here and tell service to create it res = rpc.call(FLAGS.volume_topic, {"method": "create_volume", "args" : {"size": size, "user_id": context.user.id, @@ -330,7 +330,7 @@ class CloudController(object): raise exception.NotFound('Instance %s could not be found' % instance_id) def _get_volume(self, context, volume_id): - volume = volumeservice.get_volume(volume_id) + volume = volume_service.get_volume(volume_id) if context.user.is_admin() or volume['project_id'] == context.project.id: return volume raise exception.NotFound('Volume %s could not be found' % volume_id) diff --git a/nova/network/networkservice.py b/nova/network/networkservice.py deleted file mode 100644 index 9d87e05e6..000000000 --- a/nova/network/networkservice.py +++ /dev/null @@ -1,35 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Network Nodes are responsible for allocating ips and setting up network -""" - -import logging - -from nova import flags -from nova import service - - -FLAGS = flags.FLAGS - -class NetworkService(service.Service): - """Allocates ips and sets up networks""" - - def __init__(self): - logging.debug("Network node working") diff --git a/nova/network/service.py b/nova/network/service.py new file mode 100644 index 000000000..9d87e05e6 --- /dev/null +++ b/nova/network/service.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network Nodes are responsible for allocating ips and setting up network +""" + +import logging + +from nova import flags +from nova import service + + +FLAGS = flags.FLAGS + +class NetworkService(service.Service): + """Allocates ips and sets up networks""" + + def __init__(self): + logging.debug("Network node working") diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 38f4de8d9..128188b0d 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -28,7 +28,7 @@ from nova import flags from nova import rpc from nova import test from nova.auth import users -from nova.compute import computeservice +from nova.compute import service from nova.endpoint import api from nova.endpoint import cloud @@ -54,7 +54,7 @@ class CloudTestCase(test.BaseTestCase): self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) # set up a service - self.compute = computeservice.ComputeService() + self.compute = service.ComputeService() self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, proxy=self.compute) diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index db08308bb..b70260c25 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -26,7 +26,7 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import computeservice +from nova.compute import service FLAGS = flags.FLAGS @@ -60,7 +60,7 @@ class ComputeConnectionTestCase(test.TrialTestCase): self.flags(fake_libvirt=True, fake_storage=True, fake_users=True) - self.compute = computeservice.ComputeService() + self.compute = service.ComputeService() def create_instance(self): instdir = model.InstanceDirectory() diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index 568b199a0..62144269c 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -18,11 +18,11 @@ import logging +from nova import compute from nova import exception from nova import flags from nova import test -from nova.compute import computeservice -from nova.volume import volumeservice +from nova.volume import service as volume_service FLAGS = flags.FLAGS @@ -32,11 +32,11 @@ class VolumeTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) super(VolumeTestCase, self).setUp() - self.compute = computeservice.ComputeService() + self.compute = compute.service.ComputeService() self.volume = None self.flags(fake_libvirt=True, fake_storage=True) - self.volume = volumeservice.VolumeService() + self.volume = volume_service.VolumeService() def test_run_create_volume(self): vol_size = '0' @@ -45,11 +45,11 @@ class VolumeTestCase(test.TrialTestCase): volume_id = self.volume.create_volume(vol_size, user_id, project_id) # TODO(termie): get_volume returns differently than create_volume self.assertEqual(volume_id, - volumeservice.get_volume(volume_id)['volume_id']) + volume_service.get_volume(volume_id)['volume_id']) rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumeservice.get_volume, + volume_service.get_volume, volume_id) def test_too_big_volume(self): @@ -70,7 +70,7 @@ class VolumeTestCase(test.TrialTestCase): for i in xrange(total_slots): vid = self.volume.create_volume(vol_size, user_id, project_id) vols.append(vid) - self.assertRaises(volumeservice.NoMoreVolumes, + self.assertRaises(volume_service.NoMoreVolumes, self.volume.create_volume, vol_size, user_id, project_id) for id in vols: @@ -85,7 +85,7 @@ class VolumeTestCase(test.TrialTestCase): mountpoint = "/dev/sdf" volume_id = self.volume.create_volume(vol_size, user_id, project_id) - volume_obj = volumeservice.get_volume(volume_id) + volume_obj = volume_service.get_volume(volume_id) volume_obj.start_attach(instance_id, mountpoint) rv = yield self.compute.attach_volume(volume_id, instance_id, @@ -100,12 +100,12 @@ class VolumeTestCase(test.TrialTestCase): volume_id) rv = yield self.volume.detach_volume(volume_id) - volume_obj = volumeservice.get_volume(volume_id) + volume_obj = volume_service.get_volume(volume_id) self.assertEqual(volume_obj['status'], "available") rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumeservice.get_volume, + volume_service.get_volume, volume_id) def test_multi_node(self): diff --git a/nova/volume/service.py b/nova/volume/service.py new file mode 100644 index 000000000..87a47f40a --- /dev/null +++ b/nova/volume/service.py @@ -0,0 +1,304 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Nova Storage manages creating, attaching, detaching, and +destroying persistent storage volumes, ala EBS. +Currently uses Ata-over-Ethernet. +""" + +import glob +import logging +import os +import shutil +import socket +import tempfile + +from twisted.internet import defer + +from nova import datastore +from nova import exception +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova import validate + + +FLAGS = flags.FLAGS +flags.DEFINE_string('storage_dev', '/dev/sdb', + 'Physical device to use for volumes') +flags.DEFINE_string('volume_group', 'nova-volumes', + 'Name for the VG that will contain exported volumes') +flags.DEFINE_string('aoe_eth_dev', 'eth0', + 'Which device to export the volumes on') +flags.DEFINE_string('storage_name', + socket.gethostname(), + 'name of this service') +flags.DEFINE_integer('first_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10, + 'AoE starting shelf_id for this service') +flags.DEFINE_integer('last_shelf_id', + utils.last_octet(utils.get_my_ip()) * 10 + 9, + 'AoE starting shelf_id for this service') +flags.DEFINE_string('aoe_export_dir', + '/var/lib/vblade-persist/vblades', + 'AoE directory where exports are created') +flags.DEFINE_integer('slots_per_shelf', + 16, + 'Number of AoE slots per shelf') +flags.DEFINE_string('storage_availability_zone', + 'nova', + 'availability zone of this service') +flags.DEFINE_boolean('fake_storage', False, + 'Should we make real storage volumes to attach?') + + +class NoMoreVolumes(exception.Error): + pass + +def get_volume(volume_id): + """ Returns a redis-backed volume object """ + volume_class = Volume + if FLAGS.fake_storage: + volume_class = FakeVolume + if datastore.Redis.instance().sismember('volumes', volume_id): + return volume_class(volume_id=volume_id) + raise exception.Error("Volume does not exist") + +class VolumeService(service.Service): + """ + There is one VolumeNode running on each host. + However, each VolumeNode can report on the state of + *all* volumes in the cluster. + """ + def __init__(self): + super(VolumeService, self).__init__() + self.volume_class = Volume + if FLAGS.fake_storage: + FLAGS.aoe_export_dir = tempfile.mkdtemp() + self.volume_class = FakeVolume + self._init_volume_group() + + def __del__(self): + # TODO(josh): Get rid of this destructor, volumes destroy themselves + if FLAGS.fake_storage: + try: + shutil.rmtree(FLAGS.aoe_export_dir) + except Exception, err: + pass + + @validate.rangetest(size=(0, 1000)) + def create_volume(self, size, user_id, project_id): + """ + Creates an exported volume (fake or real), + restarts exports to make it available. + Volume at this point has size, owner, and zone. + """ + logging.debug("Creating volume of size: %s" % (size)) + vol = self.volume_class.create(size, user_id, project_id) + datastore.Redis.instance().sadd('volumes', vol['volume_id']) + datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + self._restart_exports() + return vol['volume_id'] + + def by_node(self, node_id): + """ returns a list of volumes for a node """ + for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): + yield self.volume_class(volume_id=volume_id) + + @property + def all(self): + """ returns a list of all volumes """ + for volume_id in datastore.Redis.instance().smembers('volumes'): + yield self.volume_class(volume_id=volume_id) + + def delete_volume(self, volume_id): + logging.debug("Deleting volume with id of: %s" % (volume_id)) + vol = get_volume(volume_id) + if vol['status'] == "attached": + raise exception.Error("Volume is still attached") + if vol['node_name'] != FLAGS.storage_name: + raise exception.Error("Volume is not local to this node") + vol.destroy() + datastore.Redis.instance().srem('volumes', vol['volume_id']) + datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) + return True + + @defer.inlineCallbacks + def _restart_exports(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo vblade-persist auto all") + yield process.simple_execute( + "sudo vblade-persist start all") + + @defer.inlineCallbacks + def _init_volume_group(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo pvcreate %s" % (FLAGS.storage_dev)) + yield process.simple_execute( + "sudo vgcreate %s %s" % (FLAGS.volume_group, + FLAGS.storage_dev)) + +class Volume(datastore.BasicModel): + + def __init__(self, volume_id=None): + self.volume_id = volume_id + super(Volume, self).__init__() + + @property + def identifier(self): + return self.volume_id + + def default_state(self): + return {"volume_id": self.volume_id} + + @classmethod + def create(cls, size, user_id, project_id): + volume_id = utils.generate_uid('vol') + vol = cls(volume_id) + vol['node_name'] = FLAGS.storage_name + vol['size'] = size + vol['user_id'] = user_id + vol['project_id'] = project_id + vol['availability_zone'] = FLAGS.storage_availability_zone + vol["instance_id"] = 'none' + vol["mountpoint"] = 'none' + vol['attach_time'] = 'none' + vol['status'] = "creating" # creating | available | in-use + vol['attach_status'] = "detached" # attaching | attached | detaching | detached + vol['delete_on_termination'] = 'False' + vol.save() + vol.create_lv() + vol._setup_export() + # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes + # TODO(joshua + vol['status'] = "available" + vol.save() + return vol + + def start_attach(self, instance_id, mountpoint): + """ """ + self['instance_id'] = instance_id + self['mountpoint'] = mountpoint + self['status'] = "in-use" + self['attach_status'] = "attaching" + self['attach_time'] = utils.isotime() + self['delete_on_termination'] = 'False' + self.save() + + def finish_attach(self): + """ """ + self['attach_status'] = "attached" + self.save() + + def start_detach(self): + """ """ + self['attach_status'] = "detaching" + self.save() + + def finish_detach(self): + self['instance_id'] = None + self['mountpoint'] = None + self['status'] = "available" + self['attach_status'] = "detached" + self.save() + + def destroy(self): + try: + self._remove_export() + except: + pass + self._delete_lv() + super(Volume, self).destroy() + + @defer.inlineCallbacks + def create_lv(self): + if str(self['size']) == '0': + sizestr = '100M' + else: + sizestr = '%sG' % self['size'] + yield process.simple_execute( + "sudo lvcreate -L %s -n %s %s" % (sizestr, + self['volume_id'], + FLAGS.volume_group)) + + @defer.inlineCallbacks + def _delete_lv(self): + yield process.simple_execute( + "sudo lvremove -f %s/%s" % (FLAGS.volume_group, + self['volume_id'])) + + def _setup_export(self): + (shelf_id, blade_id) = get_next_aoe_numbers() + self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) + self['shelf_id'] = shelf_id + self['blade_id'] = blade_id + self.save() + self._exec_export() + + @defer.inlineCallbacks + def _exec_export(self): + yield process.simple_execute( + "sudo vblade-persist setup %s %s %s /dev/%s/%s" % + (self['shelf_id'], + self['blade_id'], + FLAGS.aoe_eth_dev, + FLAGS.volume_group, + self['volume_id'])) + + @defer.inlineCallbacks + def _remove_export(self): + yield process.simple_execute( + "sudo vblade-persist stop %s %s" % (self['shelf_id'], + self['blade_id'])) + yield process.simple_execute( + "sudo vblade-persist destroy %s %s" % (self['shelf_id'], + self['blade_id'])) + + +class FakeVolume(Volume): + def create_lv(self): + pass + + def _exec_export(self): + fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) + f = file(fname, "w") + f.close() + + def _remove_export(self): + pass + + def _delete_lv(self): + pass + +def get_next_aoe_numbers(): + for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): + aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) + if not aoes: + blade_id = 0 + else: + blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 + if blade_id < FLAGS.slots_per_shelf: + logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) + return (shelf_id, blade_id) + raise NoMoreVolumes() diff --git a/nova/volume/volumeservice.py b/nova/volume/volumeservice.py deleted file mode 100644 index 87a47f40a..000000000 --- a/nova/volume/volumeservice.py +++ /dev/null @@ -1,304 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Nova Storage manages creating, attaching, detaching, and -destroying persistent storage volumes, ala EBS. -Currently uses Ata-over-Ethernet. -""" - -import glob -import logging -import os -import shutil -import socket -import tempfile - -from twisted.internet import defer - -from nova import datastore -from nova import exception -from nova import flags -from nova import process -from nova import service -from nova import utils -from nova import validate - - -FLAGS = flags.FLAGS -flags.DEFINE_string('storage_dev', '/dev/sdb', - 'Physical device to use for volumes') -flags.DEFINE_string('volume_group', 'nova-volumes', - 'Name for the VG that will contain exported volumes') -flags.DEFINE_string('aoe_eth_dev', 'eth0', - 'Which device to export the volumes on') -flags.DEFINE_string('storage_name', - socket.gethostname(), - 'name of this service') -flags.DEFINE_integer('first_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10, - 'AoE starting shelf_id for this service') -flags.DEFINE_integer('last_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10 + 9, - 'AoE starting shelf_id for this service') -flags.DEFINE_string('aoe_export_dir', - '/var/lib/vblade-persist/vblades', - 'AoE directory where exports are created') -flags.DEFINE_integer('slots_per_shelf', - 16, - 'Number of AoE slots per shelf') -flags.DEFINE_string('storage_availability_zone', - 'nova', - 'availability zone of this service') -flags.DEFINE_boolean('fake_storage', False, - 'Should we make real storage volumes to attach?') - - -class NoMoreVolumes(exception.Error): - pass - -def get_volume(volume_id): - """ Returns a redis-backed volume object """ - volume_class = Volume - if FLAGS.fake_storage: - volume_class = FakeVolume - if datastore.Redis.instance().sismember('volumes', volume_id): - return volume_class(volume_id=volume_id) - raise exception.Error("Volume does not exist") - -class VolumeService(service.Service): - """ - There is one VolumeNode running on each host. - However, each VolumeNode can report on the state of - *all* volumes in the cluster. - """ - def __init__(self): - super(VolumeService, self).__init__() - self.volume_class = Volume - if FLAGS.fake_storage: - FLAGS.aoe_export_dir = tempfile.mkdtemp() - self.volume_class = FakeVolume - self._init_volume_group() - - def __del__(self): - # TODO(josh): Get rid of this destructor, volumes destroy themselves - if FLAGS.fake_storage: - try: - shutil.rmtree(FLAGS.aoe_export_dir) - except Exception, err: - pass - - @validate.rangetest(size=(0, 1000)) - def create_volume(self, size, user_id, project_id): - """ - Creates an exported volume (fake or real), - restarts exports to make it available. - Volume at this point has size, owner, and zone. - """ - logging.debug("Creating volume of size: %s" % (size)) - vol = self.volume_class.create(size, user_id, project_id) - datastore.Redis.instance().sadd('volumes', vol['volume_id']) - datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - self._restart_exports() - return vol['volume_id'] - - def by_node(self, node_id): - """ returns a list of volumes for a node """ - for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): - yield self.volume_class(volume_id=volume_id) - - @property - def all(self): - """ returns a list of all volumes """ - for volume_id in datastore.Redis.instance().smembers('volumes'): - yield self.volume_class(volume_id=volume_id) - - def delete_volume(self, volume_id): - logging.debug("Deleting volume with id of: %s" % (volume_id)) - vol = get_volume(volume_id) - if vol['status'] == "attached": - raise exception.Error("Volume is still attached") - if vol['node_name'] != FLAGS.storage_name: - raise exception.Error("Volume is not local to this node") - vol.destroy() - datastore.Redis.instance().srem('volumes', vol['volume_id']) - datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name), vol['volume_id']) - return True - - @defer.inlineCallbacks - def _restart_exports(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo vblade-persist auto all") - yield process.simple_execute( - "sudo vblade-persist start all") - - @defer.inlineCallbacks - def _init_volume_group(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo pvcreate %s" % (FLAGS.storage_dev)) - yield process.simple_execute( - "sudo vgcreate %s %s" % (FLAGS.volume_group, - FLAGS.storage_dev)) - -class Volume(datastore.BasicModel): - - def __init__(self, volume_id=None): - self.volume_id = volume_id - super(Volume, self).__init__() - - @property - def identifier(self): - return self.volume_id - - def default_state(self): - return {"volume_id": self.volume_id} - - @classmethod - def create(cls, size, user_id, project_id): - volume_id = utils.generate_uid('vol') - vol = cls(volume_id) - vol['node_name'] = FLAGS.storage_name - vol['size'] = size - vol['user_id'] = user_id - vol['project_id'] = project_id - vol['availability_zone'] = FLAGS.storage_availability_zone - vol["instance_id"] = 'none' - vol["mountpoint"] = 'none' - vol['attach_time'] = 'none' - vol['status'] = "creating" # creating | available | in-use - vol['attach_status'] = "detached" # attaching | attached | detaching | detached - vol['delete_on_termination'] = 'False' - vol.save() - vol.create_lv() - vol._setup_export() - # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes - # TODO(joshua - vol['status'] = "available" - vol.save() - return vol - - def start_attach(self, instance_id, mountpoint): - """ """ - self['instance_id'] = instance_id - self['mountpoint'] = mountpoint - self['status'] = "in-use" - self['attach_status'] = "attaching" - self['attach_time'] = utils.isotime() - self['delete_on_termination'] = 'False' - self.save() - - def finish_attach(self): - """ """ - self['attach_status'] = "attached" - self.save() - - def start_detach(self): - """ """ - self['attach_status'] = "detaching" - self.save() - - def finish_detach(self): - self['instance_id'] = None - self['mountpoint'] = None - self['status'] = "available" - self['attach_status'] = "detached" - self.save() - - def destroy(self): - try: - self._remove_export() - except: - pass - self._delete_lv() - super(Volume, self).destroy() - - @defer.inlineCallbacks - def create_lv(self): - if str(self['size']) == '0': - sizestr = '100M' - else: - sizestr = '%sG' % self['size'] - yield process.simple_execute( - "sudo lvcreate -L %s -n %s %s" % (sizestr, - self['volume_id'], - FLAGS.volume_group)) - - @defer.inlineCallbacks - def _delete_lv(self): - yield process.simple_execute( - "sudo lvremove -f %s/%s" % (FLAGS.volume_group, - self['volume_id'])) - - def _setup_export(self): - (shelf_id, blade_id) = get_next_aoe_numbers() - self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) - self['shelf_id'] = shelf_id - self['blade_id'] = blade_id - self.save() - self._exec_export() - - @defer.inlineCallbacks - def _exec_export(self): - yield process.simple_execute( - "sudo vblade-persist setup %s %s %s /dev/%s/%s" % - (self['shelf_id'], - self['blade_id'], - FLAGS.aoe_eth_dev, - FLAGS.volume_group, - self['volume_id'])) - - @defer.inlineCallbacks - def _remove_export(self): - yield process.simple_execute( - "sudo vblade-persist stop %s %s" % (self['shelf_id'], - self['blade_id'])) - yield process.simple_execute( - "sudo vblade-persist destroy %s %s" % (self['shelf_id'], - self['blade_id'])) - - -class FakeVolume(Volume): - def create_lv(self): - pass - - def _exec_export(self): - fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) - f = file(fname, "w") - f.close() - - def _remove_export(self): - pass - - def _delete_lv(self): - pass - -def get_next_aoe_numbers(): - for shelf_id in xrange(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1): - aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id)) - if not aoes: - blade_id = 0 - else: - blade_id = int(max([int(a.rpartition('.')[2]) for a in aoes])) + 1 - if blade_id < FLAGS.slots_per_shelf: - logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id) - return (shelf_id, blade_id) - raise NoMoreVolumes() -- cgit From 4373ca830fe93ddff70cf02bc366303d6b001ae9 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 18:57:24 -0700 Subject: Fixes to the virtualenv installer --- Makefile | 2 +- tools/activate_venv.sh | 1 + tools/install_venv.py | 3 +-- tools/with_venv.sh | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 5fb512610..fa11cf335 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ clean: rm -rf keys rm -rf instances rm -rf networks - rm run_tests.err.log + rm -f run_tests.err.log clean-all: clean rm -rf $(venv) diff --git a/tools/activate_venv.sh b/tools/activate_venv.sh index d12cb3491..376db5b96 100644 --- a/tools/activate_venv.sh +++ b/tools/activate_venv.sh @@ -1,3 +1,4 @@ +#!/bin/bash _TOOLS=`dirname $0` _VENV=$_TOOLS/../.nova-venv source $_VENV/bin/activate diff --git a/tools/install_venv.py b/tools/install_venv.py index 720dcc0d9..c35af434b 100644 --- a/tools/install_venv.py +++ b/tools/install_venv.py @@ -5,7 +5,6 @@ Installation script for Nova's development virtualenv import os import subprocess import sys -import textwrap ROOT = os.path.dirname(os.path.dirname(__file__)) @@ -80,7 +79,7 @@ def print_help(): $ tools/with_venv.sh - Also, run_tests.sh will automatically use the virtualenv. + Also, make test will automatically use the virtualenv. """ print help diff --git a/tools/with_venv.sh b/tools/with_venv.sh index 51468a334..99d1ac18f 100755 --- a/tools/with_venv.sh +++ b/tools/with_venv.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash TOOLS=`dirname $0` VENV=$TOOLS/../.nova-venv source $VENV/bin/activate && $@ -- cgit From a2cf8a6f6038062cf343322acdbde66456b73dfb Mon Sep 17 00:00:00 2001 From: "jaypipes@gmail.com" <> Date: Mon, 26 Jul 2010 23:28:59 -0400 Subject: Fixes bug#610140. Thanks to Vish and Muharem for the patch --- nova/tests/api_unittest.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index e5e2afe26..45ae50b2e 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -43,7 +43,11 @@ def boto_to_tornado(method, path, headers, data, host, connection=None): connection should be a FakeTornadoHttpConnection instance """ - headers = httpserver.HTTPHeaders() + try: + headers = httpserver.HTTPHeaders() + except AttributeError: + from tornado import httputil + headers = httputil.HTTPHeaders() for k, v in headers.iteritems(): headers[k] = v -- cgit From 4199dab2e761efc6cb2d9af353f5f09c943a2d2f Mon Sep 17 00:00:00 2001 From: Paul Voccio Date: Mon, 26 Jul 2010 23:23:44 -0500 Subject: resolving conflict w/ merge, cleaning up virtenv setups --- tools/activate_venv.sh | 4 ---- tools/install_venv.py | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) delete mode 100644 tools/activate_venv.sh diff --git a/tools/activate_venv.sh b/tools/activate_venv.sh deleted file mode 100644 index 376db5b96..000000000 --- a/tools/activate_venv.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash -_TOOLS=`dirname $0` -_VENV=$_TOOLS/../.nova-venv -source $_VENV/bin/activate diff --git a/tools/install_venv.py b/tools/install_venv.py index c35af434b..0b35fc8e9 100644 --- a/tools/install_venv.py +++ b/tools/install_venv.py @@ -72,7 +72,7 @@ def print_help(): To activate the Nova virtualenv for the extent of your current shell session you can run: - $ source tools/activate_venv.sh + $ source .nova-venv/bin/activate Or, if you prefer, you can run commands in the virtualenv on a case by case basis by running: -- cgit From cb874bbe6d73007bfad8be750f6b57555adac7b6 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Mon, 26 Jul 2010 22:48:57 -0700 Subject: removed old reference from nova-common.install and fixed spacing --- debian/nova-common.install | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/debian/nova-common.install b/debian/nova-common.install index 9b1bbf147..93251363a 100644 --- a/debian/nova-common.install +++ b/debian/nova-common.install @@ -1,10 +1,9 @@ -bin/nova-manage usr/bin -debian/nova-manage.conf etc/nova +bin/nova-manage usr/bin +debian/nova-manage.conf etc/nova nova/auth/novarc.template usr/share/nova nova/cloudpipe/client.ovpn.template usr/share/nova nova/compute/libvirt.xml.template usr/share/nova nova/compute/interfaces.template usr/share/nova -usr/lib/python*/*-packages/nova/* CA/openssl.cnf.tmpl var/lib/nova/CA CA/geninter.sh var/lib/nova/CA CA/genrootca.sh var/lib/nova/CA -- cgit From 9641dac5d0cbfd8d1f3026e1bae5749b9d8e00fc Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Tue, 27 Jul 2010 10:30:00 +0200 Subject: Ensure that boto's config has a "Boto" section before attempting to set a value in it. --- nova/tests/objectstore_unittest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nova/tests/objectstore_unittest.py b/nova/tests/objectstore_unittest.py index 0a2f54031..20053a258 100644 --- a/nova/tests/objectstore_unittest.py +++ b/nova/tests/objectstore_unittest.py @@ -189,6 +189,8 @@ class S3APITestCase(test.TrialTestCase): self.tcp_port = self.listening_port.getHost().port + if not boto.config.has_section('Boto'): + boto.config.add_section('Boto') boto.config.set('Boto', 'num_retries', '0') self.conn = S3Connection(aws_access_key_id='admin', aws_secret_access_key='admin', -- cgit From 3365987ee5bc8ec66676f838d7eeefce377a76b9 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Tue, 27 Jul 2010 12:26:53 +0200 Subject: Automatically choose the correct type of test (virtualenv or system). --- Makefile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fa11cf335..cd7e233e1 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,11 @@ with_venv=tools/with_venv.sh build: # Nothing to do -test: $(venv) +default_test_type:= $(shell if [ -e $(venv) ]; then echo venv; else echo system; fi) + +test: test-$(default_test_type) + +test-venv: $(venv) $(with_venv) python run_tests.py test-system: -- cgit From 56da5a45f6b7e511d3adc2f7ce12e1c9a9dc3665 Mon Sep 17 00:00:00 2001 From: Soren Hansen Date: Tue, 27 Jul 2010 22:38:09 +0200 Subject: Bump version to 0.9.0. Change author to "OpenStack". Change author_email to nova@lists.launchpad.net. Change url to http://www.openstack.org/. Change description to "cloud computing fabric controller". --- setup.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/setup.py b/setup.py index eb42283ea..f9a616335 100644 --- a/setup.py +++ b/setup.py @@ -25,11 +25,10 @@ from setuptools import setup, find_packages srcdir = os.path.join(os.path.dirname(sys.argv[0]), 'src') setup(name='nova', - version='0.3.0', - description='None Other, Vaguely Awesome', - author='nova-core', - author_email='nova-core@googlegroups.com', - url='http://novacc.org/', + version='0.9.0', + description='cloud computing fabric controller', + author='OpenStack', + author_email='nova@lists.launchpad.net', + url='http://www.openstack.org/', packages = find_packages(), - ) -- cgit From 9718f5216ce9423c002c47f86e05b2b2eb08e551 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Tue, 27 Jul 2010 14:16:49 -0700 Subject: properly delete old vlans assigned to deleted projects --- nova/compute/network.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nova/compute/network.py b/nova/compute/network.py index b5b3c3b5d..62d892e58 100644 --- a/nova/compute/network.py +++ b/nova/compute/network.py @@ -144,7 +144,7 @@ class Vlan(datastore.BasicModel): @datastore.absorb_connection_error def destroy(self): set_name = self._redis_set_name(self.__class__.__name__) - datastore.Redis.instance().hdel(set_name, self.project) + datastore.Redis.instance().hdel(set_name, self.project_id) def subnet(self): vlan = int(self.vlan_id) @@ -529,6 +529,7 @@ def get_vlan_for_project(project_id): # don't orphan any VLANs. It is basically # garbage collection for after projects abandoned # their reference. + vlan.destroy() vlan.project_id = project_id vlan.save() return vlan -- cgit