From 35fda702abf91792d3c4753a1bbccdb119eaf6eb Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Fri, 23 Jul 2010 15:27:18 -0700 Subject: renamed xxxnode to xxservice --- bin/nova-compute | 4 +- bin/nova-network | 4 +- bin/nova-volume | 4 +- nova/compute/computenode.py | 582 ----------------------------------------- nova/compute/computeservice.py | 581 ++++++++++++++++++++++++++++++++++++++++ nova/endpoint/cloud.py | 15 +- nova/network/networknode.py | 35 --- nova/network/networkservice.py | 35 +++ nova/node.py | 103 -------- nova/service.py | 103 ++++++++ nova/test.py | 8 +- nova/tests/cloud_unittest.py | 18 +- nova/tests/compute_unittest.py | 36 +-- nova/tests/volume_unittest.py | 46 ++-- nova/volume/volumenode.py | 305 --------------------- nova/volume/volumeservice.py | 304 +++++++++++++++++++++ 16 files changed, 1090 insertions(+), 1093 deletions(-) delete mode 100644 nova/compute/computenode.py create mode 100644 nova/compute/computeservice.py delete mode 100644 nova/network/networknode.py create mode 100644 nova/network/networkservice.py delete mode 100644 nova/node.py create mode 100644 nova/service.py delete mode 100644 nova/volume/volumenode.py create mode 100644 nova/volume/volumeservice.py diff --git a/bin/nova-compute b/bin/nova-compute index 1d5fa709d..7ef5d074a 100755 --- a/bin/nova-compute +++ b/bin/nova-compute @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.compute import computenode +from nova.compute import computeservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = computenode.ComputeNode.create() + application = computeservice.ComputeService.create() diff --git a/bin/nova-network b/bin/nova-network index db9d4b970..0d3aa0002 100755 --- a/bin/nova-network +++ b/bin/nova-network @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.network import networknode +from nova.network import networkservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == 
'__builtin__': - application = networknode.NetworkNode.create() + application = networkservice.NetworkService.create() diff --git a/bin/nova-volume b/bin/nova-volume index 2e9b530a7..c1c0163cf 100755 --- a/bin/nova-volume +++ b/bin/nova-volume @@ -22,11 +22,11 @@ """ from nova import twistd -from nova.volume import volumenode +from nova.volume import volumeservice if __name__ == '__main__': twistd.serve(__file__) if __name__ == '__builtin__': - application = volumenode.VolumeNode.create() + application = volumeservice.VolumeService.create() diff --git a/nova/compute/computenode.py b/nova/compute/computenode.py deleted file mode 100644 index f26da1996..000000000 --- a/nova/compute/computenode.py +++ /dev/null @@ -1,582 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Compute Node: - - Runs on each compute node, managing the - hypervisor using libvirt. 
- -""" - -import base64 -import json -import logging -import os -import shutil -import sys -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - - -try: - import libvirt -except Exception, err: - logging.warning('no libvirt found') - -from nova import exception -from nova import fakevirt -from nova import flags -from nova import node -from nova import process -from nova import utils -from nova.compute import disk -from nova.compute import model -from nova.compute import network -from nova.objectstore import image # for image_path flag -from nova.volume import volumenode - - -FLAGS = flags.FLAGS -flags.DEFINE_string('libvirt_xml_template', - utils.abspath('compute/libvirt.xml.template'), - 'Libvirt XML Template') -flags.DEFINE_bool('use_s3', True, - 'whether to get images from s3 or use local copy') -flags.DEFINE_string('instances_path', utils.abspath('../instances'), - 'where instances are stored on disk') - -INSTANCE_TYPES = {} -INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} -INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} -INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} -INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} -INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} - - -def _image_path(path=''): - return os.path.join(FLAGS.images_path, path) - - -def _image_url(path): - return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) - - -class ComputeNode(node.Node): - """ - Manages the running instances. 
- """ - def __init__(self): - """ load configuration options for this node and connect to libvirt """ - super(ComputeNode, self).__init__() - self._instances = {} - self._conn = self._get_connection() - self.instdir = model.InstanceDirectory() - # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe - - def _get_connection(self): - """ returns a libvirt connection object """ - # TODO(termie): maybe lazy load after initial check for permissions - # TODO(termie): check whether we can be disconnected - if FLAGS.fake_libvirt: - conn = fakevirt.FakeVirtConnection.instance() - else: - auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], - 'root', - None] - conn = libvirt.openAuth('qemu:///system', auth, 0) - if conn == None: - logging.error('Failed to open connection to the hypervisor') - sys.exit(1) - return conn - - def noop(self): - """ simple test of an AMQP message call """ - return defer.succeed('PONG') - - def get_instance(self, instance_id): - # inst = self.instdir.get(instance_id) - # return inst - if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, instance_id) - return None - - @exception.wrap_exception - def adopt_instances(self): - """ if there are instances already running, adopt them """ - return defer.succeed(0) - instance_names = [self._conn.lookupByID(x).name() - for x in self._conn.listDomainsID()] - for name in instance_names: - try: - new_inst = Instance.fromName(self._conn, name) - new_inst.update_state() - except: - pass - return defer.succeed(len(self._instances)) - - @exception.wrap_exception - def describe_instances(self): - retval = {} - for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = ( - Instance.fromName(self._conn, inst['instance_id'])) - return retval - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. 
-todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except model.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield - - # @exception.wrap_exception - def run_instance(self, instance_id, **_kwargs): - """ launch a new instance with specified options """ - logging.debug("Starting instance %s..." % (instance_id)) - inst = self.instdir.get(instance_id) - if not FLAGS.simple_network: - # TODO: Get the real security group of launch in here - security_group = "default" - net = network.BridgedNetwork.get_network_for_project(inst['user_id'], - inst['project_id'], - security_group).express() - inst['node_name'] = FLAGS.node_name - inst.save() - # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, data=inst) - logging.info("Instances current state is %s", new_inst.state) - if new_inst.is_running(): - raise exception.Error("Instance is already running") - d = new_inst.spawn() - return d - - @exception.wrap_exception - def terminate_instance(self, instance_id): - """ terminate an instance on this machine """ - logging.debug("Got told to terminate instance %s" % instance_id) - instance = self.get_instance(instance_id) - # inst = self.instdir.get(instance_id) - if not instance: - raise exception.Error( - 'trying to terminate unknown instance: %s' % instance_id) - d = instance.destroy() - # d.addCallback(lambda x: inst.destroy()) - return d - - @exception.wrap_exception - def reboot_instance(self, instance_id): - """ reboot an instance on this server - KVM doesn't support reboot, so we terminate and restart """ - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to reboot unknown instance: %s' % 
instance_id) - return instance.reboot() - - @defer.inlineCallbacks - @exception.wrap_exception - def get_console_output(self, instance_id): - """ send the console output for an instance """ - logging.debug("Getting console output for %s" % (instance_id)) - inst = self.instdir.get(instance_id) - instance = self.get_instance(instance_id) - if not instance: - raise exception.Error( - 'trying to get console log for unknown: %s' % instance_id) - rv = yield instance.console_output() - # TODO(termie): this stuff belongs in the API layer, no need to - # munge the data we send to ourselves - output = {"InstanceId" : instance_id, - "Timestamp" : "2", - "output" : base64.b64encode(rv)} - defer.returnValue(output) - - @defer.inlineCallbacks - @exception.wrap_exception - def attach_volume(self, instance_id = None, - volume_id = None, mountpoint = None): - volume = volumenode.get_volume(volume_id) - yield self._init_aoe() - yield process.simple_execute( - "sudo virsh attach-disk %s /dev/etherd/%s %s" % - (instance_id, - volume['aoe_device'], - mountpoint.rpartition('/dev/')[2])) - volume.finish_attach() - defer.returnValue(True) - - @defer.inlineCallbacks - def _init_aoe(self): - yield process.simple_execute("sudo aoe-discover") - yield process.simple_execute("sudo aoe-stat") - - @defer.inlineCallbacks - @exception.wrap_exception - def detach_volume(self, instance_id, volume_id): - """ detach a volume from an instance """ - # despite the documentation, virsh detach-disk just wants the device - # name without the leading /dev/ - volume = volumenode.get_volume(volume_id) - target = volume['mountpoint'].rpartition('/dev/')[2] - yield process.simple_execute( - "sudo virsh detach-disk %s %s " % (instance_id, target)) - volume.finish_detach() - defer.returnValue(True) - - -class Group(object): - def __init__(self, group_id): - self.group_id = group_id - - -class ProductCode(object): - def __init__(self, product_code): - self.product_code = product_code - - -class Instance(object): - - 
NOSTATE = 0x00 - RUNNING = 0x01 - BLOCKED = 0x02 - PAUSED = 0x03 - SHUTDOWN = 0x04 - SHUTOFF = 0x05 - CRASHED = 0x06 - - def __init__(self, conn, name, data): - """ spawn an instance with a given name """ - self._conn = conn - # TODO(vish): this can be removed after data has been updated - # data doesn't seem to have a working iterator so in doesn't work - if data.get('owner_id', None) is not None: - data['user_id'] = data['owner_id'] - data['project_id'] = data['owner_id'] - self.datamodel = data - - size = data.get('instance_type', FLAGS.default_instance_type) - if size not in INSTANCE_TYPES: - raise exception.Error('invalid instance type: %s' % size) - - self.datamodel.update(INSTANCE_TYPES[size]) - - self.datamodel['name'] = name - self.datamodel['instance_id'] = name - self.datamodel['basepath'] = data.get( - 'basepath', os.path.abspath( - os.path.join(FLAGS.instances_path, self.name))) - self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 - self.datamodel.setdefault('image_id', FLAGS.default_image) - self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) - self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) - self.datamodel.setdefault('project_id', self.datamodel['user_id']) - self.datamodel.setdefault('bridge_name', None) - #self.datamodel.setdefault('key_data', None) - #self.datamodel.setdefault('key_name', None) - #self.datamodel.setdefault('addressing_type', None) - - # TODO(joshua) - The ugly non-flat ones - self.datamodel['groups'] = data.get('security_group', 'default') - # TODO(joshua): Support product codes somehow - self.datamodel.setdefault('product_codes', None) - - self.datamodel.save() - logging.debug("Finished init of Instance with id of %s" % name) - - def toXml(self): - # TODO(termie): cache? 
- logging.debug("Starting the toXML method") - libvirt_xml = open(FLAGS.libvirt_xml_template).read() - xml_info = self.datamodel.copy() - # TODO(joshua): Make this xml express the attached disks as well - - # TODO(termie): lazy lazy hack because xml is annoying - xml_info['nova'] = json.dumps(self.datamodel.copy()) - libvirt_xml = libvirt_xml % xml_info - logging.debug("Finished the toXML method") - - return libvirt_xml - - @classmethod - def fromName(cls, conn, name): - """ use the saved data for reloading the instance """ - instdir = model.InstanceDirectory() - instance = instdir.get(name) - return cls(conn=conn, name=name, data=instance) - - def set_state(self, state_code, state_description=None): - self.datamodel['state'] = state_code - if not state_description: - state_description = STATE_NAMES[state_code] - self.datamodel['state_description'] = state_description - self.datamodel.save() - - @property - def state(self): - # it is a string in datamodel - return int(self.datamodel['state']) - - @property - def name(self): - return self.datamodel['name'] - - def is_pending(self): - return (self.state == Instance.NOSTATE or self.state == 'pending') - - def is_destroyed(self): - return self.state == Instance.SHUTOFF - - def is_running(self): - logging.debug("Instance state is: %s" % self.state) - return (self.state == Instance.RUNNING or self.state == 'running') - - def describe(self): - return self.datamodel - - def info(self): - logging.debug("Getting info for dom %s" % self.name) - virt_dom = self._conn.lookupByName(self.name) - (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() - return {'state': state, - 'max_mem': max_mem, - 'mem': mem, - 'num_cpu': num_cpu, - 'cpu_time': cpu_time, - 'node_name': FLAGS.node_name} - - def basepath(self, path=''): - return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) - - def update_state(self): - self.datamodel.update(self.info()) - self.set_state(self.state) - self.datamodel.save() # Extra, but 
harmless - - @exception.wrap_exception - def destroy(self): - if self.is_destroyed(): - self.datamodel.destroy() - raise exception.Error('trying to destroy already destroyed' - ' instance: %s' % self.name) - - self.set_state(Instance.NOSTATE, 'shutting_down') - try: - virt_dom = self._conn.lookupByName(self.name) - virt_dom.destroy() - except Exception, _err: - pass - # If the instance is already terminated, we're still happy - d = defer.Deferred() - d.addCallback(lambda x: self._cleanup()) - d.addCallback(lambda x: self.datamodel.destroy()) - # TODO(termie): short-circuit me for tests - # WE'LL save this for when we do shutdown, - # instead of destroy - but destroy returns immediately - timer = task.LoopingCall(f=None) - def _wait_for_shutdown(): - try: - self.update_state() - if self.state == Instance.SHUTDOWN: - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_shutdown - timer.start(interval=0.5, now=True) - return d - - def _cleanup(self): - target = os.path.abspath(self.datamodel['basepath']) - logging.info("Deleting instance files at %s", target) - shutil.rmtree(target) - - @defer.inlineCallbacks - @exception.wrap_exception - def reboot(self): - if not self.is_running(): - raise exception.Error( - 'trying to reboot a non-running' - 'instance: %s (state: %s)' % (self.name, self.state)) - - logging.debug('rebooting instance %s' % self.name) - self.set_state(Instance.NOSTATE, 'rebooting') - yield self._conn.lookupByName(self.name).destroy() - self._conn.createXML(self.toXml(), 0) - - d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_reboot(): - try: - self.update_state() - if self.is_running(): - logging.debug('rebooted instance %s' % self.name) - timer.stop() - d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - timer.stop() - d.callback(None) - timer.f = _wait_for_reboot - timer.start(interval=0.5, now=True) - yield d - - 
def _fetch_s3_image(self, image, path): - url = _image_url('%s/image' % image) - d = process.simple_execute( - 'curl --silent %s -o %s' % (url, path)) - return d - - def _fetch_local_image(self, image, path): - source = _image_path('%s/image' % image) - d = process.simple_execute('cp %s %s' % (source, path)) - return d - - @defer.inlineCallbacks - def _create_image(self, libvirt_xml): - # syntactic nicety - data = self.datamodel - basepath = self.basepath - - # ensure directories exist and are writable - yield process.simple_execute( - 'mkdir -p %s' % basepath()) - yield process.simple_execute( - 'chmod 0777 %s' % basepath()) - - - # TODO(termie): these are blocking calls, it would be great - # if they weren't. - logging.info('Creating image for: %s', data['instance_id']) - f = open(basepath('libvirt.xml'), 'w') - f.write(libvirt_xml) - f.close() - - if FLAGS.fake_libvirt: - logging.info('fake_libvirt, nothing to do for create_image') - raise defer.returnValue(None); - - if FLAGS.use_s3: - _fetch_file = self._fetch_s3_image - else: - _fetch_file = self._fetch_local_image - - if not os.path.exists(basepath('disk')): - yield _fetch_file(data['image_id'], basepath('disk-raw')) - if not os.path.exists(basepath('kernel')): - yield _fetch_file(data['kernel_id'], basepath('kernel')) - if not os.path.exists(basepath('ramdisk')): - yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - - execute = lambda cmd, input=None: \ - process.simple_execute(cmd=cmd, - input=input, - error_ok=1) - - key = data['key_data'] - net = None - if FLAGS.simple_network: - with open(FLAGS.simple_network_template) as f: - net = f.read() % {'address': data['private_dns_name'], - 'network': FLAGS.simple_network_network, - 'netmask': FLAGS.simple_network_netmask, - 'gateway': FLAGS.simple_network_gateway, - 'broadcast': FLAGS.simple_network_broadcast, - 'dns': FLAGS.simple_network_dns} - if key or net: - logging.info('Injecting data into image %s', data['image_id']) - yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) - - if os.path.exists(basepath('disk')): - yield process.simple_execute( - 'rm -f %s' % basepath('disk')) - - bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] - * 1024 * 1024 * 1024) - yield disk.partition( - basepath('disk-raw'), basepath('disk'), bytes, execute=execute) - - @defer.inlineCallbacks - @exception.wrap_exception - def spawn(self): - self.set_state(Instance.NOSTATE, 'spawning') - logging.debug("Starting spawn in Instance") - - xml = self.toXml() - self.set_state(Instance.NOSTATE, 'launching') - logging.info('self %s', self) - try: - yield self._create_image(xml) - self._conn.createXML(xml, 0) - # TODO(termie): this should actually register - # a callback to check for successful boot - logging.debug("Instance is running") - - local_d = defer.Deferred() - timer = task.LoopingCall(f=None) - def _wait_for_boot(): - try: - self.update_state() - if self.is_running(): - logging.debug('booted instance %s' % self.name) - timer.stop() - local_d.callback(None) - except Exception: - self.set_state(Instance.SHUTDOWN) - logging.error('Failed to boot instance %s' % self.name) - timer.stop() - local_d.callback(None) - timer.f = _wait_for_boot - timer.start(interval=0.5, now=True) - except Exception, ex: - logging.debug(ex) - self.set_state(Instance.SHUTDOWN) - - @exception.wrap_exception - def console_output(self): - if not FLAGS.fake_libvirt: - fname = os.path.abspath( - os.path.join(self.datamodel['basepath'], 'console.log')) - with open(fname, 'r') as f: - console = f.read() - else: - console = 'FAKE CONSOLE OUTPUT' - return defer.succeed(console) - -STATE_NAMES = { - Instance.NOSTATE : 'pending', - Instance.RUNNING : 'running', - Instance.BLOCKED : 'blocked', - Instance.PAUSED : 'paused', - Instance.SHUTDOWN : 'shutdown', - Instance.SHUTOFF : 'shutdown', - Instance.CRASHED : 'crashed', -} diff --git a/nova/compute/computeservice.py b/nova/compute/computeservice.py new file mode 100644 
index 000000000..5568e3888 --- /dev/null +++ b/nova/compute/computeservice.py @@ -0,0 +1,581 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Compute Service: + + Runs on each compute host, managing the + hypervisor using libvirt. + +""" + +import base64 +import json +import logging +import os +import shutil +import sys +from twisted.internet import defer +from twisted.internet import task + + +try: + import libvirt +except Exception, err: + logging.warning('no libvirt found') + +from nova import exception +from nova import fakevirt +from nova import flags +from nova import process +from nova import service +from nova import utils +from nova.compute import disk +from nova.compute import model +from nova.compute import network +from nova.objectstore import image # for image_path flag +from nova.volume import volumeservice + + +FLAGS = flags.FLAGS +flags.DEFINE_string('libvirt_xml_template', + utils.abspath('compute/libvirt.xml.template'), + 'Libvirt XML Template') +flags.DEFINE_bool('use_s3', True, + 'whether to get images from s3 or use local copy') +flags.DEFINE_string('instances_path', utils.abspath('../instances'), + 'where instances are stored on disk') + +INSTANCE_TYPES = {} +INSTANCE_TYPES['m1.tiny'] = {'memory_mb': 512, 'vcpus': 1, 'local_gb': 0} 
+INSTANCE_TYPES['m1.small'] = {'memory_mb': 1024, 'vcpus': 1, 'local_gb': 10} +INSTANCE_TYPES['m1.medium'] = {'memory_mb': 2048, 'vcpus': 2, 'local_gb': 10} +INSTANCE_TYPES['m1.large'] = {'memory_mb': 4096, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['m1.xlarge'] = {'memory_mb': 8192, 'vcpus': 4, 'local_gb': 10} +INSTANCE_TYPES['c1.medium'] = {'memory_mb': 2048, 'vcpus': 4, 'local_gb': 10} + + +def _image_path(path=''): + return os.path.join(FLAGS.images_path, path) + + +def _image_url(path): + return "%s:%s/_images/%s" % (FLAGS.s3_host, FLAGS.s3_port, path) + + +class ComputeService(service.Service): + """ + Manages the running instances. + """ + def __init__(self): + """ load configuration options for this node and connect to libvirt """ + super(ComputeService, self).__init__() + self._instances = {} + self._conn = self._get_connection() + self.instdir = model.InstanceDirectory() + # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe + + def _get_connection(self): + """ returns a libvirt connection object """ + # TODO(termie): maybe lazy load after initial check for permissions + # TODO(termie): check whether we can be disconnected + if FLAGS.fake_libvirt: + conn = fakevirt.FakeVirtConnection.instance() + else: + auth = [[libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_NOECHOPROMPT], + 'root', + None] + conn = libvirt.openAuth('qemu:///system', auth, 0) + if conn == None: + logging.error('Failed to open connection to the hypervisor') + sys.exit(1) + return conn + + def noop(self): + """ simple test of an AMQP message call """ + return defer.succeed('PONG') + + def get_instance(self, instance_id): + # inst = self.instdir.get(instance_id) + # return inst + if self.instdir.exists(instance_id): + return Instance.fromName(self._conn, instance_id) + return None + + @exception.wrap_exception + def adopt_instances(self): + """ if there are instances already running, adopt them """ + return defer.succeed(0) + instance_names = 
[self._conn.lookupByID(x).name() + for x in self._conn.listDomainsID()] + for name in instance_names: + try: + new_inst = Instance.fromName(self._conn, name) + new_inst.update_state() + except: + pass + return defer.succeed(len(self._instances)) + + @exception.wrap_exception + def describe_instances(self): + retval = {} + for inst in self.instdir.by_node(FLAGS.node_name): + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) + return retval + + @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except model.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield + + # @exception.wrap_exception + def run_instance(self, instance_id, **_kwargs): + """ launch a new instance with specified options """ + logging.debug("Starting instance %s..." 
% (instance_id)) + inst = self.instdir.get(instance_id) + if not FLAGS.simple_network: + # TODO: Get the real security group of launch in here + security_group = "default" + net = network.BridgedNetwork.get_network_for_project(inst['user_id'], + inst['project_id'], + security_group).express() + inst['node_name'] = FLAGS.node_name + inst.save() + # TODO(vish) check to make sure the availability zone matches + new_inst = Instance(self._conn, name=instance_id, data=inst) + logging.info("Instances current state is %s", new_inst.state) + if new_inst.is_running(): + raise exception.Error("Instance is already running") + d = new_inst.spawn() + return d + + @exception.wrap_exception + def terminate_instance(self, instance_id): + """ terminate an instance on this machine """ + logging.debug("Got told to terminate instance %s" % instance_id) + instance = self.get_instance(instance_id) + # inst = self.instdir.get(instance_id) + if not instance: + raise exception.Error( + 'trying to terminate unknown instance: %s' % instance_id) + d = instance.destroy() + # d.addCallback(lambda x: inst.destroy()) + return d + + @exception.wrap_exception + def reboot_instance(self, instance_id): + """ reboot an instance on this server + KVM doesn't support reboot, so we terminate and restart """ + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to reboot unknown instance: %s' % instance_id) + return instance.reboot() + + @defer.inlineCallbacks + @exception.wrap_exception + def get_console_output(self, instance_id): + """ send the console output for an instance """ + logging.debug("Getting console output for %s" % (instance_id)) + inst = self.instdir.get(instance_id) + instance = self.get_instance(instance_id) + if not instance: + raise exception.Error( + 'trying to get console log for unknown: %s' % instance_id) + rv = yield instance.console_output() + # TODO(termie): this stuff belongs in the API layer, no need to + # munge the data we send to 
ourselves + output = {"InstanceId" : instance_id, + "Timestamp" : "2", + "output" : base64.b64encode(rv)} + defer.returnValue(output) + + @defer.inlineCallbacks + @exception.wrap_exception + def attach_volume(self, instance_id = None, + volume_id = None, mountpoint = None): + volume = volumeservice.get_volume(volume_id) + yield self._init_aoe() + yield process.simple_execute( + "sudo virsh attach-disk %s /dev/etherd/%s %s" % + (instance_id, + volume['aoe_device'], + mountpoint.rpartition('/dev/')[2])) + volume.finish_attach() + defer.returnValue(True) + + @defer.inlineCallbacks + def _init_aoe(self): + yield process.simple_execute("sudo aoe-discover") + yield process.simple_execute("sudo aoe-stat") + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_id, volume_id): + """ detach a volume from an instance """ + # despite the documentation, virsh detach-disk just wants the device + # name without the leading /dev/ + volume = volumeservice.get_volume(volume_id) + target = volume['mountpoint'].rpartition('/dev/')[2] + yield process.simple_execute( + "sudo virsh detach-disk %s %s " % (instance_id, target)) + volume.finish_detach() + defer.returnValue(True) + + +class Group(object): + def __init__(self, group_id): + self.group_id = group_id + + +class ProductCode(object): + def __init__(self, product_code): + self.product_code = product_code + + +class Instance(object): + + NOSTATE = 0x00 + RUNNING = 0x01 + BLOCKED = 0x02 + PAUSED = 0x03 + SHUTDOWN = 0x04 + SHUTOFF = 0x05 + CRASHED = 0x06 + + def __init__(self, conn, name, data): + """ spawn an instance with a given name """ + self._conn = conn + # TODO(vish): this can be removed after data has been updated + # data doesn't seem to have a working iterator so in doesn't work + if data.get('owner_id', None) is not None: + data['user_id'] = data['owner_id'] + data['project_id'] = data['owner_id'] + self.datamodel = data + + size = data.get('instance_type', FLAGS.default_instance_type) + 
if size not in INSTANCE_TYPES: + raise exception.Error('invalid instance type: %s' % size) + + self.datamodel.update(INSTANCE_TYPES[size]) + + self.datamodel['name'] = name + self.datamodel['instance_id'] = name + self.datamodel['basepath'] = data.get( + 'basepath', os.path.abspath( + os.path.join(FLAGS.instances_path, self.name))) + self.datamodel['memory_kb'] = int(self.datamodel['memory_mb']) * 1024 + self.datamodel.setdefault('image_id', FLAGS.default_image) + self.datamodel.setdefault('kernel_id', FLAGS.default_kernel) + self.datamodel.setdefault('ramdisk_id', FLAGS.default_ramdisk) + self.datamodel.setdefault('project_id', self.datamodel['user_id']) + self.datamodel.setdefault('bridge_name', None) + #self.datamodel.setdefault('key_data', None) + #self.datamodel.setdefault('key_name', None) + #self.datamodel.setdefault('addressing_type', None) + + # TODO(joshua) - The ugly non-flat ones + self.datamodel['groups'] = data.get('security_group', 'default') + # TODO(joshua): Support product codes somehow + self.datamodel.setdefault('product_codes', None) + + self.datamodel.save() + logging.debug("Finished init of Instance with id of %s" % name) + + def toXml(self): + # TODO(termie): cache? 
+ logging.debug("Starting the toXML method") + libvirt_xml = open(FLAGS.libvirt_xml_template).read() + xml_info = self.datamodel.copy() + # TODO(joshua): Make this xml express the attached disks as well + + # TODO(termie): lazy lazy hack because xml is annoying + xml_info['nova'] = json.dumps(self.datamodel.copy()) + libvirt_xml = libvirt_xml % xml_info + logging.debug("Finished the toXML method") + + return libvirt_xml + + @classmethod + def fromName(cls, conn, name): + """ use the saved data for reloading the instance """ + instdir = model.InstanceDirectory() + instance = instdir.get(name) + return cls(conn=conn, name=name, data=instance) + + def set_state(self, state_code, state_description=None): + self.datamodel['state'] = state_code + if not state_description: + state_description = STATE_NAMES[state_code] + self.datamodel['state_description'] = state_description + self.datamodel.save() + + @property + def state(self): + # it is a string in datamodel + return int(self.datamodel['state']) + + @property + def name(self): + return self.datamodel['name'] + + def is_pending(self): + return (self.state == Instance.NOSTATE or self.state == 'pending') + + def is_destroyed(self): + return self.state == Instance.SHUTOFF + + def is_running(self): + logging.debug("Instance state is: %s" % self.state) + return (self.state == Instance.RUNNING or self.state == 'running') + + def describe(self): + return self.datamodel + + def info(self): + logging.debug("Getting info for dom %s" % self.name) + virt_dom = self._conn.lookupByName(self.name) + (state, max_mem, mem, num_cpu, cpu_time) = virt_dom.info() + return {'state': state, + 'max_mem': max_mem, + 'mem': mem, + 'num_cpu': num_cpu, + 'cpu_time': cpu_time, + 'node_name': FLAGS.node_name} + + def basepath(self, path=''): + return os.path.abspath(os.path.join(self.datamodel['basepath'], path)) + + def update_state(self): + self.datamodel.update(self.info()) + self.set_state(self.state) + self.datamodel.save() # Extra, but 
harmless + + @exception.wrap_exception + def destroy(self): + if self.is_destroyed(): + self.datamodel.destroy() + raise exception.Error('trying to destroy already destroyed' + ' instance: %s' % self.name) + + self.set_state(Instance.NOSTATE, 'shutting_down') + try: + virt_dom = self._conn.lookupByName(self.name) + virt_dom.destroy() + except Exception, _err: + pass + # If the instance is already terminated, we're still happy + d = defer.Deferred() + d.addCallback(lambda x: self._cleanup()) + d.addCallback(lambda x: self.datamodel.destroy()) + # TODO(termie): short-circuit me for tests + # WE'LL save this for when we do shutdown, + # instead of destroy - but destroy returns immediately + timer = task.LoopingCall(f=None) + def _wait_for_shutdown(): + try: + self.update_state() + if self.state == Instance.SHUTDOWN: + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_shutdown + timer.start(interval=0.5, now=True) + return d + + def _cleanup(self): + target = os.path.abspath(self.datamodel['basepath']) + logging.info("Deleting instance files at %s", target) + shutil.rmtree(target) + + @defer.inlineCallbacks + @exception.wrap_exception + def reboot(self): + if not self.is_running(): + raise exception.Error( + 'trying to reboot a non-running' + 'instance: %s (state: %s)' % (self.name, self.state)) + + logging.debug('rebooting instance %s' % self.name) + self.set_state(Instance.NOSTATE, 'rebooting') + yield self._conn.lookupByName(self.name).destroy() + self._conn.createXML(self.toXml(), 0) + + d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_reboot(): + try: + self.update_state() + if self.is_running(): + logging.debug('rebooted instance %s' % self.name) + timer.stop() + d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + timer.stop() + d.callback(None) + timer.f = _wait_for_reboot + timer.start(interval=0.5, now=True) + yield d + + 
def _fetch_s3_image(self, image, path): + url = _image_url('%s/image' % image) + d = process.simple_execute( + 'curl --silent %s -o %s' % (url, path)) + return d + + def _fetch_local_image(self, image, path): + source = _image_path('%s/image' % image) + d = process.simple_execute('cp %s %s' % (source, path)) + return d + + @defer.inlineCallbacks + def _create_image(self, libvirt_xml): + # syntactic nicety + data = self.datamodel + basepath = self.basepath + + # ensure directories exist and are writable + yield process.simple_execute( + 'mkdir -p %s' % basepath()) + yield process.simple_execute( + 'chmod 0777 %s' % basepath()) + + + # TODO(termie): these are blocking calls, it would be great + # if they weren't. + logging.info('Creating image for: %s', data['instance_id']) + f = open(basepath('libvirt.xml'), 'w') + f.write(libvirt_xml) + f.close() + + if FLAGS.fake_libvirt: + logging.info('fake_libvirt, nothing to do for create_image') + raise defer.returnValue(None); + + if FLAGS.use_s3: + _fetch_file = self._fetch_s3_image + else: + _fetch_file = self._fetch_local_image + + if not os.path.exists(basepath('disk')): + yield _fetch_file(data['image_id'], basepath('disk-raw')) + if not os.path.exists(basepath('kernel')): + yield _fetch_file(data['kernel_id'], basepath('kernel')) + if not os.path.exists(basepath('ramdisk')): + yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) + + execute = lambda cmd, input=None: \ + process.simple_execute(cmd=cmd, + input=input, + error_ok=1) + + key = data['key_data'] + net = None + if FLAGS.simple_network: + with open(FLAGS.simple_network_template) as f: + net = f.read() % {'address': data['private_dns_name'], + 'network': FLAGS.simple_network_network, + 'netmask': FLAGS.simple_network_netmask, + 'gateway': FLAGS.simple_network_gateway, + 'broadcast': FLAGS.simple_network_broadcast, + 'dns': FLAGS.simple_network_dns} + if key or net: + logging.info('Injecting data into image %s', data['image_id']) + yield 
disk.inject_data(basepath('disk-raw'), key, net, execute=execute) + + if os.path.exists(basepath('disk')): + yield process.simple_execute( + 'rm -f %s' % basepath('disk')) + + bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] + * 1024 * 1024 * 1024) + yield disk.partition( + basepath('disk-raw'), basepath('disk'), bytes, execute=execute) + + @defer.inlineCallbacks + @exception.wrap_exception + def spawn(self): + self.set_state(Instance.NOSTATE, 'spawning') + logging.debug("Starting spawn in Instance") + + xml = self.toXml() + self.set_state(Instance.NOSTATE, 'launching') + logging.info('self %s', self) + try: + yield self._create_image(xml) + self._conn.createXML(xml, 0) + # TODO(termie): this should actually register + # a callback to check for successful boot + logging.debug("Instance is running") + + local_d = defer.Deferred() + timer = task.LoopingCall(f=None) + def _wait_for_boot(): + try: + self.update_state() + if self.is_running(): + logging.debug('booted instance %s' % self.name) + timer.stop() + local_d.callback(None) + except Exception: + self.set_state(Instance.SHUTDOWN) + logging.error('Failed to boot instance %s' % self.name) + timer.stop() + local_d.callback(None) + timer.f = _wait_for_boot + timer.start(interval=0.5, now=True) + except Exception, ex: + logging.debug(ex) + self.set_state(Instance.SHUTDOWN) + + @exception.wrap_exception + def console_output(self): + if not FLAGS.fake_libvirt: + fname = os.path.abspath( + os.path.join(self.datamodel['basepath'], 'console.log')) + with open(fname, 'r') as f: + console = f.read() + else: + console = 'FAKE CONSOLE OUTPUT' + return defer.succeed(console) + +STATE_NAMES = { + Instance.NOSTATE : 'pending', + Instance.RUNNING : 'running', + Instance.BLOCKED : 'blocked', + Instance.PAUSED : 'paused', + Instance.SHUTDOWN : 'shutdown', + Instance.SHUTOFF : 'shutdown', + Instance.CRASHED : 'crashed', +} diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py index eaa608b1e..6e9bdead8 100644 --- 
a/nova/endpoint/cloud.py +++ b/nova/endpoint/cloud.py @@ -23,7 +23,6 @@ datastore. """ import base64 -import json import logging import os import time @@ -38,9 +37,9 @@ from nova.auth import rbac from nova.auth import users from nova.compute import model from nova.compute import network -from nova.compute import computenode +from nova.compute import computeservice from nova.endpoint import images -from nova.volume import volumenode +from nova.volume import volumeservice FLAGS = flags.FLAGS @@ -76,7 +75,7 @@ class CloudController(object): def volumes(self): """ returns a list of all volumes """ for volume_id in datastore.Redis.instance().smembers("volumes"): - volume = volumenode.get_volume(volume_id) + volume = volumeservice.get_volume(volume_id) yield volume def __str__(self): @@ -103,7 +102,7 @@ class CloudController(object): result = {} for instance in self.instdir.all: if instance['project_id'] == project_id: - line = '%s slots=%d' % (instance['private_dns_name'], computenode.INSTANCE_TYPES[instance['instance_type']]['vcpus']) + line = '%s slots=%d' % (instance['private_dns_name'], computeservice.INSTANCE_TYPES[instance['instance_type']]['vcpus']) if instance['key_name'] in result: result[instance['key_name']].append(line) else: @@ -296,7 +295,7 @@ class CloudController(object): @rbac.allow('projectmanager', 'sysadmin') def create_volume(self, context, size, **kwargs): - # TODO(vish): refactor this to create the volume object here and tell volumenode to create it + # TODO(vish): refactor this to create the volume object here and tell volumeservice to create it res = rpc.call(FLAGS.volume_topic, {"method": "create_volume", "args" : {"size": size, "user_id": context.user.id, @@ -331,7 +330,7 @@ class CloudController(object): raise exception.NotFound('Instance %s could not be found' % instance_id) def _get_volume(self, context, volume_id): - volume = volumenode.get_volume(volume_id) + volume = volumeservice.get_volume(volume_id) if context.user.is_admin() or 
volume['project_id'] == context.project.id: return volume raise exception.NotFound('Volume %s could not be found' % volume_id) @@ -578,7 +577,7 @@ class CloudController(object): "args": {"instance_id" : inst.instance_id}}) logging.debug("Casting to node for %s's instance with IP of %s" % (context.user.name, inst['private_dns_name'])) - # TODO: Make the NetworkComputeNode figure out the network name from ip. + # TODO: Make Network figure out the network name from ip. return defer.succeed(self._format_instances( context, reservation_id)) diff --git a/nova/network/networknode.py b/nova/network/networknode.py deleted file mode 100644 index e5a346551..000000000 --- a/nova/network/networknode.py +++ /dev/null @@ -1,35 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -""" -Network Nodes are responsible for allocating ips and setting up network -""" - -import logging - -from nova import flags -from nova import node - - -FLAGS = flags.FLAGS - -class NetworkNode(node.Node): - """Allocates ips and sets up networks""" - - def __init__(self): - logging.debug("Network node working") diff --git a/nova/network/networkservice.py b/nova/network/networkservice.py new file mode 100644 index 000000000..9d87e05e6 --- /dev/null +++ b/nova/network/networkservice.py @@ -0,0 +1,35 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Network Nodes are responsible for allocating ips and setting up network +""" + +import logging + +from nova import flags +from nova import service + + +FLAGS = flags.FLAGS + +class NetworkService(service.Service): + """Allocates ips and sets up networks""" + + def __init__(self): + logging.debug("Network node working") diff --git a/nova/node.py b/nova/node.py deleted file mode 100644 index 852344da9..000000000 --- a/nova/node.py +++ /dev/null @@ -1,103 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Generic Node baseclass for all workers that run on hosts -""" - -import inspect -import logging -import os - -from twisted.internet import defer -from twisted.internet import task -from twisted.application import service - -from nova import datastore -from nova import flags -from nova import rpc -from nova.compute import model - - -FLAGS = flags.FLAGS - -flags.DEFINE_integer('report_interval', 10, - 'seconds between nodes reporting state to cloud', - lower_bound=1) - -class Node(object, service.Service): - """Base class for workers that run on hosts""" - - @classmethod - def create(cls, - report_interval=None, # defaults to flag - bin_name=None, # defaults to basename of executable - topic=None): # defaults to basename - "nova-" part - """Instantiates class and passes back application object""" - if not report_interval: - # NOTE(vish): set here because if it is set to flag in the - # parameter list, it wrongly uses the default - report_interval = FLAGS.report_interval - # NOTE(vish): magic to automatically determine bin_name and topic - if not bin_name: - bin_name = os.path.basename(inspect.stack()[-1][1]) - if not topic: - topic = bin_name.rpartition("nova-")[2] - logging.warn("Starting %s node" % topic) - node_instance = cls() - - conn = rpc.Connection.instance() - consumer_all = rpc.AdapterConsumer( - connection=conn, - topic='%s' % topic, - proxy=node_instance) - - consumer_node = rpc.AdapterConsumer( - 
connection=conn, - topic='%s.%s' % (topic, FLAGS.node_name), - proxy=node_instance) - - pulse = task.LoopingCall(node_instance.report_state, - FLAGS.node_name, - bin_name) - pulse.start(interval=report_interval, now=False) - - consumer_all.attach_to_twisted() - consumer_node.attach_to_twisted() - - # This is the parent service that twistd will be looking for when it - # parses this file, return it so that we can get it into globals below - application = service.Application(bin_name) - node_instance.setServiceParent(application) - return application - - @defer.inlineCallbacks - def report_state(self, nodename, daemon): - # TODO(termie): make this pattern be more elegant. -todd - try: - record = model.Daemon(nodename, daemon) - record.heartbeat() - if getattr(self, "model_disconnected", False): - self.model_disconnected = False - logging.error("Recovered model server connection!") - - except datastore.ConnectionError, ex: - if not getattr(self, "model_disconnected", False): - self.model_disconnected = True - logging.exception("model server went away") - yield diff --git a/nova/service.py b/nova/service.py new file mode 100644 index 000000000..96281bc6b --- /dev/null +++ b/nova/service.py @@ -0,0 +1,103 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +""" +Generic Node baseclass for all workers that run on hosts +""" + +import inspect +import logging +import os + +from twisted.internet import defer +from twisted.internet import task +from twisted.application import service + +from nova import datastore +from nova import flags +from nova import rpc +from nova.compute import model + + +FLAGS = flags.FLAGS + +flags.DEFINE_integer('report_interval', 10, + 'seconds between nodes reporting state to cloud', + lower_bound=1) + +class Service(object, service.Service): + """Base class for workers that run on hosts""" + + @classmethod + def create(cls, + report_interval=None, # defaults to flag + bin_name=None, # defaults to basename of executable + topic=None): # defaults to basename - "nova-" part + """Instantiates class and passes back application object""" + if not report_interval: + # NOTE(vish): set here because if it is set to flag in the + # parameter list, it wrongly uses the default + report_interval = FLAGS.report_interval + # NOTE(vish): magic to automatically determine bin_name and topic + if not bin_name: + bin_name = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = bin_name.rpartition("nova-")[2] + logging.warn("Starting %s node" % topic) + node_instance = cls() + + conn = rpc.Connection.instance() + consumer_all = rpc.AdapterConsumer( + connection=conn, + topic='%s' % topic, + proxy=node_instance) + + consumer_node = rpc.AdapterConsumer( + connection=conn, + topic='%s.%s' % (topic, FLAGS.node_name), + proxy=node_instance) + + pulse = task.LoopingCall(node_instance.report_state, + FLAGS.node_name, + bin_name) + pulse.start(interval=report_interval, now=False) + + consumer_all.attach_to_twisted() + consumer_node.attach_to_twisted() + + # This is the parent service that twistd will be looking for when it + # parses this file, return it so that we can get it into globals below + application = service.Application(bin_name) + node_instance.setServiceParent(application) + return application + 
+ @defer.inlineCallbacks + def report_state(self, nodename, daemon): + # TODO(termie): make this pattern be more elegant. -todd + try: + record = model.Daemon(nodename, daemon) + record.heartbeat() + if getattr(self, "model_disconnected", False): + self.model_disconnected = False + logging.error("Recovered model server connection!") + + except datastore.ConnectionError, ex: + if not getattr(self, "model_disconnected", False): + self.model_disconnected = True + logging.exception("model server went away") + yield diff --git a/nova/test.py b/nova/test.py index 5dcf0b9b0..6fbcab5e4 100644 --- a/nova/test.py +++ b/nova/test.py @@ -156,9 +156,9 @@ class BaseTestCase(TrialTestCase): Example (callback chain, ugly): - d = self.node.terminate_instance(instance_id) # a Deferred instance + d = self.compute.terminate_instance(instance_id) # a Deferred instance def _describe(_): - d_desc = self.node.describe_instances() # another Deferred instance + d_desc = self.compute.describe_instances() # another Deferred instance return d_desc def _checkDescribe(rv): self.assertEqual(rv, []) @@ -169,8 +169,8 @@ class BaseTestCase(TrialTestCase): Example (inline callbacks! 
yay!): - yield self.node.terminate_instance(instance_id) - rv = yield self.node.describe_instances() + yield self.compute.terminate_instance(instance_id) + rv = yield self.compute.describe_instances() self.assertEqual(rv, []) If the test fits the Inline Callbacks pattern we will automatically diff --git a/nova/tests/cloud_unittest.py b/nova/tests/cloud_unittest.py index 7ab2c257a..38f4de8d9 100644 --- a/nova/tests/cloud_unittest.py +++ b/nova/tests/cloud_unittest.py @@ -28,7 +28,7 @@ from nova import flags from nova import rpc from nova import test from nova.auth import users -from nova.compute import computenode +from nova.compute import computeservice from nova.endpoint import api from nova.endpoint import cloud @@ -53,12 +53,12 @@ class CloudTestCase(test.BaseTestCase): proxy=self.cloud) self.injected.append(self.cloud_consumer.attach_to_tornado(self.ioloop)) - # set up a node - self.node = computenode.ComputeNode() - self.node_consumer = rpc.AdapterConsumer(connection=self.conn, + # set up a service + self.compute = computeservice.ComputeService() + self.compute_consumer = rpc.AdapterConsumer(connection=self.conn, topic=FLAGS.compute_topic, - proxy=self.node) - self.injected.append(self.node_consumer.attach_to_tornado(self.ioloop)) + proxy=self.compute) + self.injected.append(self.compute_consumer.attach_to_tornado(self.ioloop)) try: users.UserManager.instance().create_user('admin', 'admin', 'admin') @@ -76,11 +76,11 @@ class CloudTestCase(test.BaseTestCase): logging.debug("Can't test instances without a real virtual env.") return instance_id = 'foo' - inst = yield self.node.run_instance(instance_id) + inst = yield self.compute.run_instance(instance_id) output = yield self.cloud.get_console_output(self.context, [instance_id]) logging.debug(output) self.assert_(output) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) def test_run_instances(self): if FLAGS.fake_libvirt: @@ -112,7 +112,7 @@ class 
CloudTestCase(test.BaseTestCase): # for instance in reservations[res_id]: for instance in reservations[reservations.keys()[0]]: logging.debug("Terminating instance %s" % instance['instance_id']) - rv = yield self.node.terminate_instance(instance['instance_id']) + rv = yield self.compute.terminate_instance(instance['instance_id']) def test_instance_update_state(self): def instance(num): diff --git a/nova/tests/compute_unittest.py b/nova/tests/compute_unittest.py index 4c0f1afb3..db08308bb 100644 --- a/nova/tests/compute_unittest.py +++ b/nova/tests/compute_unittest.py @@ -26,7 +26,7 @@ from nova import flags from nova import test from nova import utils from nova.compute import model -from nova.compute import computenode +from nova.compute import computeservice FLAGS = flags.FLAGS @@ -60,7 +60,7 @@ class ComputeConnectionTestCase(test.TrialTestCase): self.flags(fake_libvirt=True, fake_storage=True, fake_users=True) - self.node = computenode.ComputeNode() + self.compute = computeservice.ComputeService() def create_instance(self): instdir = model.InstanceDirectory() @@ -81,48 +81,48 @@ class ComputeConnectionTestCase(test.TrialTestCase): def test_run_describe_terminate(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("Running instances: %s", rv) self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() logging.info("After terminating instances: %s", rv) self.assertEqual(rv, {}) @defer.inlineCallbacks def test_reboot(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield 
self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - yield self.node.reboot_instance(instance_id) + yield self.compute.reboot_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_console_output(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - console = yield self.node.get_console_output(instance_id) + console = yield self.compute.get_console_output(instance_id) self.assert_(console) - rv = yield self.node.terminate_instance(instance_id) + rv = yield self.compute.terminate_instance(instance_id) @defer.inlineCallbacks def test_run_instance_existing(self): instance_id = self.create_instance() - rv = yield self.node.run_instance(instance_id) + rv = yield self.compute.run_instance(instance_id) - rv = yield self.node.describe_instances() + rv = yield self.compute.describe_instances() self.assertEqual(rv[instance_id].name, instance_id) - self.assertRaises(exception.Error, self.node.run_instance, instance_id) - rv = yield self.node.terminate_instance(instance_id) + self.assertRaises(exception.Error, self.compute.run_instance, instance_id) + rv = yield self.compute.terminate_instance(instance_id) diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index c176453d8..568b199a0 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -21,8 +21,8 @@ import logging from nova import exception from nova import flags from nova import test -from nova.compute import computenode -from nova.volume import volumenode +from nova.compute import computeservice +from nova.volume import volumeservice FLAGS = flags.FLAGS 
@@ -32,24 +32,24 @@ class VolumeTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) super(VolumeTestCase, self).setUp() - self.mynode = computenode.ComputeNode() - self.mystorage = None + self.compute = computeservice.ComputeService() + self.volume = None self.flags(fake_libvirt=True, fake_storage=True) - self.mystorage = volumenode.VolumeNode() + self.volume = volumeservice.VolumeService() def test_run_create_volume(self): vol_size = '0' user_id = 'fake' project_id = 'fake' - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + volume_id = self.volume.create_volume(vol_size, user_id, project_id) # TODO(termie): get_volume returns differently than create_volume self.assertEqual(volume_id, - volumenode.get_volume(volume_id)['volume_id']) + volumeservice.get_volume(volume_id)['volume_id']) - rv = self.mystorage.delete_volume(volume_id) + rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumenode.get_volume, + volumeservice.get_volume, volume_id) def test_too_big_volume(self): @@ -57,7 +57,7 @@ class VolumeTestCase(test.TrialTestCase): user_id = 'fake' project_id = 'fake' self.assertRaises(TypeError, - self.mystorage.create_volume, + self.volume.create_volume, vol_size, user_id, project_id) def test_too_many_volumes(self): @@ -68,26 +68,26 @@ class VolumeTestCase(test.TrialTestCase): total_slots = FLAGS.slots_per_shelf * num_shelves vols = [] for i in xrange(total_slots): - vid = self.mystorage.create_volume(vol_size, user_id, project_id) + vid = self.volume.create_volume(vol_size, user_id, project_id) vols.append(vid) - self.assertRaises(volumenode.NoMoreVolumes, - self.mystorage.create_volume, + self.assertRaises(volumeservice.NoMoreVolumes, + self.volume.create_volume, vol_size, user_id, project_id) for id in vols: - self.mystorage.delete_volume(id) + self.volume.delete_volume(id) def test_run_attach_detach_volume(self): - # Create one volume and one node to test with + # 
Create one volume and one compute to test with instance_id = "storage-test" vol_size = "5" user_id = "fake" project_id = 'fake' mountpoint = "/dev/sdf" - volume_id = self.mystorage.create_volume(vol_size, user_id, project_id) + volume_id = self.volume.create_volume(vol_size, user_id, project_id) - volume_obj = volumenode.get_volume(volume_id) + volume_obj = volumeservice.get_volume(volume_id) volume_obj.start_attach(instance_id, mountpoint) - rv = yield self.mynode.attach_volume(volume_id, + rv = yield self.compute.attach_volume(volume_id, instance_id, mountpoint) self.assertEqual(volume_obj['status'], "in-use") @@ -96,16 +96,16 @@ class VolumeTestCase(test.TrialTestCase): self.assertEqual(volume_obj['mountpoint'], mountpoint) self.assertRaises(exception.Error, - self.mystorage.delete_volume, + self.volume.delete_volume, volume_id) - rv = yield self.mystorage.detach_volume(volume_id) - volume_obj = volumenode.get_volume(volume_id) + rv = yield self.volume.detach_volume(volume_id) + volume_obj = volumeservice.get_volume(volume_id) self.assertEqual(volume_obj['status'], "available") - rv = self.mystorage.delete_volume(volume_id) + rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volumenode.get_volume, + volumeservice.get_volume, volume_id) def test_multi_node(self): diff --git a/nova/volume/volumenode.py b/nova/volume/volumenode.py deleted file mode 100644 index 6b4ad0d87..000000000 --- a/nova/volume/volumenode.py +++ /dev/null @@ -1,305 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Nova Storage manages creating, attaching, detaching, and -destroying persistent storage volumes, ala EBS. -Currently uses Ata-over-Ethernet. -""" - -import glob -import logging -import os -import shutil -import socket -import tempfile - -from twisted.application import service -from twisted.internet import defer - -from nova import datastore -from nova import exception -from nova import flags -from nova import node -from nova import process -from nova import utils -from nova import validate - - -FLAGS = flags.FLAGS -flags.DEFINE_string('storage_dev', '/dev/sdb', - 'Physical device to use for volumes') -flags.DEFINE_string('volume_group', 'nova-volumes', - 'Name for the VG that will contain exported volumes') -flags.DEFINE_string('aoe_eth_dev', 'eth0', - 'Which device to export the volumes on') -flags.DEFINE_string('storage_name', - socket.gethostname(), - 'name of this node') -flags.DEFINE_integer('first_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10, - 'AoE starting shelf_id for this node') -flags.DEFINE_integer('last_shelf_id', - utils.last_octet(utils.get_my_ip()) * 10 + 9, - 'AoE starting shelf_id for this node') -flags.DEFINE_string('aoe_export_dir', - '/var/lib/vblade-persist/vblades', - 'AoE directory where exports are created') -flags.DEFINE_integer('slots_per_shelf', - 16, - 'Number of AoE slots per shelf') -flags.DEFINE_string('storage_availability_zone', - 'nova', - 'availability zone of this node') -flags.DEFINE_boolean('fake_storage', False, - 'Should we make real storage volumes to attach?') - - -class 
class NoMoreVolumes(exception.Error):
    pass


def get_volume(volume_id):
    """Return a redis-backed volume object for an existing volume id."""
    volume_class = FakeVolume if FLAGS.fake_storage else Volume
    if not datastore.Redis.instance().sismember('volumes', volume_id):
        raise exception.Error("Volume does not exist")
    return volume_class(volume_id=volume_id)


class VolumeNode(node.Node):
    """Node-local manager for AoE-exported volumes.

    There is one VolumeNode running on each host, but each VolumeNode
    can report on the state of *all* volumes in the cluster.
    """

    def __init__(self):
        super(VolumeNode, self).__init__()
        if FLAGS.fake_storage:
            # Fake mode exports into a throwaway temp dir instead of LVM/AoE.
            FLAGS.aoe_export_dir = tempfile.mkdtemp()
            self.volume_class = FakeVolume
        else:
            self.volume_class = Volume
        self._init_volume_group()

    def __del__(self):
        # TODO(josh): Get rid of this destructor, volumes destroy themselves
        if FLAGS.fake_storage:
            try:
                shutil.rmtree(FLAGS.aoe_export_dir)
            except Exception:
                pass

    @validate.rangetest(size=(0, 1000))
    def create_volume(self, size, user_id, project_id):
        """Create an exported volume (fake or real) and restart exports.

        The volume at this point has size, owner, and zone.
        Returns the new volume's id.
        """
        logging.debug("Creating volume of size: %s" % (size))
        volume = self.volume_class.create(size, user_id, project_id)
        redis = datastore.Redis.instance()
        redis.sadd('volumes', volume['volume_id'])
        redis.sadd('volumes:%s' % (FLAGS.storage_name), volume['volume_id'])
        self._restart_exports()
        return volume['volume_id']

    def by_node(self, node_id):
        """Yield the volumes registered to the given node."""
        members = datastore.Redis.instance().smembers(
                'volumes:%s' % (node_id))
        for volume_id in members:
            yield self.volume_class(volume_id=volume_id)

    @property
    def all(self):
        """Yield every volume in the cluster."""
        for volume_id in datastore.Redis.instance().smembers('volumes'):
            yield self.volume_class(volume_id=volume_id)

    def delete_volume(self, volume_id):
        """Destroy a detached, node-local volume and deregister it."""
        logging.debug("Deleting volume with id of: %s" % (volume_id))
        volume = get_volume(volume_id)
        if volume['status'] == "attached":
            raise exception.Error("Volume is still attached")
        if volume['node_name'] != FLAGS.storage_name:
            raise exception.Error("Volume is not local to this node")
        volume.destroy()
        redis = datastore.Redis.instance()
        redis.srem('volumes', volume['volume_id'])
        redis.srem('volumes:%s' % (FLAGS.storage_name), volume['volume_id'])
        return True

    @defer.inlineCallbacks
    def _restart_exports(self):
        # Fake storage has nothing to export.
        if FLAGS.fake_storage:
            return
        yield process.simple_execute(
                "sudo vblade-persist auto all")
        yield process.simple_execute(
                "sudo vblade-persist start all")

    @defer.inlineCallbacks
    def _init_volume_group(self):
        # Fake storage needs no LVM volume group.
        if FLAGS.fake_storage:
            return
        yield process.simple_execute(
                "sudo pvcreate %s" % (FLAGS.storage_dev))
        yield process.simple_execute(
                "sudo vgcreate %s %s" % (FLAGS.volume_group,
                                         FLAGS.storage_dev))


class Volume(datastore.BasicModel):
    """Redis-backed model of a single exported volume."""

    def __init__(self, volume_id=None):
        self.volume_id = volume_id
        super(Volume, self).__init__()

    @property
    def identifier(self):
        return self.volume_id

    def default_state(self):
        return {"volume_id": self.volume_id}

    @classmethod
    def create(cls, size, user_id, project_id):
        """Allocate an LV, export it over AoE, and mark it available."""
        volume_id = utils.generate_uid('vol')
        volume = cls(volume_id)
        volume['node_name'] = FLAGS.storage_name
        volume['size'] = size
        volume['user_id'] = user_id
        volume['project_id'] = project_id
        volume['availability_zone'] = FLAGS.storage_availability_zone
        volume["instance_id"] = 'none'
        volume["mountpoint"] = 'none'
        volume['attach_time'] = 'none'
        volume['status'] = "creating"  # creating | available | in-use
        volume['attach_status'] = "detached"  # attaching | attached | detaching | detached
        volume['delete_on_termination'] = 'False'
        volume.save()
        volume.create_lv()
        volume._setup_export()
        # TODO(joshua): We need to trigger a fanout message for
        # aoe-discover on all the nodes
        volume['status'] = "available"
        volume.save()
        return volume

    def start_attach(self, instance_id, mountpoint):
        """Record that an attach to an instance has begun."""
        self['instance_id'] = instance_id
        self['mountpoint'] = mountpoint
        self['status'] = "in-use"
        self['attach_status'] = "attaching"
        self['attach_time'] = utils.isotime()
        self['delete_on_termination'] = 'False'
        self.save()

    def finish_attach(self):
        """Mark the in-progress attach as complete."""
        self['attach_status'] = "attached"
        self.save()

    def start_detach(self):
        """Record that a detach has begun."""
        self['attach_status'] = "detaching"
        self.save()

    def finish_detach(self):
        """Clear instance linkage and mark the volume available again."""
        self['instance_id'] = None
        self['mountpoint'] = None
        self['status'] = "available"
        self['attach_status'] = "detached"
        self.save()

    def destroy(self):
        """Tear down the export (best-effort), delete the LV and the model."""
        try:
            self._remove_export()
        except:
            pass
        self._delete_lv()
        super(Volume, self).destroy()

    @defer.inlineCallbacks
    def create_lv(self):
        # A zero-size request still gets a small (100M) LV.
        sizestr = '100M' if str(self['size']) == '0' else '%sG' % self['size']
        yield process.simple_execute(
                "sudo lvcreate -L %s -n %s %s" % (sizestr,
                                                  self['volume_id'],
                                                  FLAGS.volume_group))

    @defer.inlineCallbacks
    def _delete_lv(self):
        yield process.simple_execute(
                "sudo lvremove -f %s/%s" % (FLAGS.volume_group,
                                            self['volume_id']))

    def _setup_export(self):
        """Pick a free shelf/blade pair and export the LV on it."""
        (shelf_id, blade_id) = get_next_aoe_numbers()
        self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id)
        self['shelf_id'] = shelf_id
        self['blade_id'] = blade_id
        self.save()
        self._exec_export()

    @defer.inlineCallbacks
    def _exec_export(self):
        yield process.simple_execute(
                "sudo vblade-persist setup %s %s %s /dev/%s/%s" %
                (self['shelf_id'],
                 self['blade_id'],
                 FLAGS.aoe_eth_dev,
                 FLAGS.volume_group,
                 self['volume_id']))

    @defer.inlineCallbacks
    def _remove_export(self):
        yield process.simple_execute(
                "sudo vblade-persist stop %s %s" % (self['shelf_id'],
                                                    self['blade_id']))
        yield process.simple_execute(
                "sudo vblade-persist destroy %s %s" % (self['shelf_id'],
                                                       self['blade_id']))


class FakeVolume(Volume):
    """Volume that fakes out LV creation and AoE exports for testing."""

    def create_lv(self):
        pass

    def _exec_export(self):
        # Touch a file so get_next_aoe_numbers sees the export as taken.
        fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device'])
        open(fname, "w").close()

    def _remove_export(self):
        pass

    def _delete_lv(self):
        pass


def get_next_aoe_numbers():
    """Find the first free (shelf_id, blade_id) pair for a new export.

    Raises NoMoreVolumes when every slot on every shelf is taken.
    """
    for shelf_id in range(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1):
        aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id))
        if aoes:
            blade_id = max(int(a.rpartition('.')[2]) for a in aoes) + 1
        else:
            blade_id = 0
        if blade_id < FLAGS.slots_per_shelf:
            logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id)
            return (shelf_id, blade_id)
    raise NoMoreVolumes()
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Nova Storage manages creating, attaching, detaching, and
destroying persistent storage volumes, ala EBS.
Currently uses Ata-over-Ethernet.
"""

import glob
import logging
import os
import shutil
import socket
import tempfile

from twisted.internet import defer

from nova import datastore
from nova import exception
from nova import flags
from nova import process
from nova import service
from nova import utils
from nova import validate


FLAGS = flags.FLAGS
flags.DEFINE_string('storage_dev', '/dev/sdb',
                    'Physical device to use for volumes')
flags.DEFINE_string('volume_group', 'nova-volumes',
                    'Name for the VG that will contain exported volumes')
flags.DEFINE_string('aoe_eth_dev', 'eth0',
                    'Which device to export the volumes on')
flags.DEFINE_string('storage_name',
                    socket.gethostname(),
                    'name of this service')
flags.DEFINE_integer('first_shelf_id',
                     utils.last_octet(utils.get_my_ip()) * 10,
                     'AoE starting shelf_id for this service')
flags.DEFINE_integer('last_shelf_id',
                     utils.last_octet(utils.get_my_ip()) * 10 + 9,
                     'AoE ending shelf_id for this service')
flags.DEFINE_string('aoe_export_dir',
                    '/var/lib/vblade-persist/vblades',
                    'AoE directory where exports are created')
flags.DEFINE_integer('slots_per_shelf',
                     16,
                     'Number of AoE slots per shelf')
flags.DEFINE_string('storage_availability_zone',
                    'nova',
                    'availability zone of this service')
flags.DEFINE_boolean('fake_storage', False,
                     'Should we make real storage volumes to attach?')


class NoMoreVolumes(exception.Error):
    """Raised when every AoE shelf/blade slot on this host is taken."""
    pass


def get_volume(volume_id):
    """Return a redis-backed volume object for an existing volume id.

    Raises exception.Error if the id is not registered in redis.
    """
    volume_class = Volume
    if FLAGS.fake_storage:
        volume_class = FakeVolume
    if datastore.Redis.instance().sismember('volumes', volume_id):
        return volume_class(volume_id=volume_id)
    raise exception.Error("Volume does not exist")


class VolumeService(service.Service):
    """Node-local manager for AoE-exported volumes.

    There is one VolumeService running on each host; however, each
    VolumeService can report on the state of *all* volumes in the cluster.
    """

    def __init__(self):
        super(VolumeService, self).__init__()
        self.volume_class = Volume
        if FLAGS.fake_storage:
            # Fake mode exports into a throwaway temp dir instead of LVM/AoE.
            FLAGS.aoe_export_dir = tempfile.mkdtemp()
            self.volume_class = FakeVolume
        self._init_volume_group()

    def __del__(self):
        # TODO(josh): Get rid of this destructor, volumes destroy themselves
        if FLAGS.fake_storage:
            try:
                shutil.rmtree(FLAGS.aoe_export_dir)
            except Exception:
                pass

    @validate.rangetest(size=(0, 1000))
    def create_volume(self, size, user_id, project_id):
        """Create an exported volume (fake or real) and restart exports.

        The volume at this point has size, owner, and zone.
        Returns the new volume's id.
        """
        logging.debug("Creating volume of size: %s" % (size))
        vol = self.volume_class.create(size, user_id, project_id)
        datastore.Redis.instance().sadd('volumes', vol['volume_id'])
        datastore.Redis.instance().sadd('volumes:%s' % (FLAGS.storage_name),
                                        vol['volume_id'])
        self._restart_exports()
        return vol['volume_id']

    def by_node(self, node_id):
        """Yield the volumes registered to the given node."""
        for volume_id in datastore.Redis.instance().smembers(
                'volumes:%s' % (node_id)):
            yield self.volume_class(volume_id=volume_id)

    @property
    def all(self):
        """Yield every volume in the cluster."""
        for volume_id in datastore.Redis.instance().smembers('volumes'):
            yield self.volume_class(volume_id=volume_id)

    def delete_volume(self, volume_id):
        """Destroy a detached, node-local volume and deregister it."""
        logging.debug("Deleting volume with id of: %s" % (volume_id))
        vol = get_volume(volume_id)
        if vol['status'] == "attached":
            raise exception.Error("Volume is still attached")
        if vol['node_name'] != FLAGS.storage_name:
            raise exception.Error("Volume is not local to this node")
        vol.destroy()
        datastore.Redis.instance().srem('volumes', vol['volume_id'])
        datastore.Redis.instance().srem('volumes:%s' % (FLAGS.storage_name),
                                        vol['volume_id'])
        return True

    @defer.inlineCallbacks
    def _restart_exports(self):
        # Fake storage has nothing to export.
        if FLAGS.fake_storage:
            return
        yield process.simple_execute(
                "sudo vblade-persist auto all")
        yield process.simple_execute(
                "sudo vblade-persist start all")

    @defer.inlineCallbacks
    def _init_volume_group(self):
        # Fake storage needs no LVM volume group.
        if FLAGS.fake_storage:
            return
        yield process.simple_execute(
                "sudo pvcreate %s" % (FLAGS.storage_dev))
        yield process.simple_execute(
                "sudo vgcreate %s %s" % (FLAGS.volume_group,
                                         FLAGS.storage_dev))


class Volume(datastore.BasicModel):
    """Redis-backed model of a single exported volume."""

    def __init__(self, volume_id=None):
        self.volume_id = volume_id
        super(Volume, self).__init__()

    @property
    def identifier(self):
        return self.volume_id

    def default_state(self):
        return {"volume_id": self.volume_id}

    @classmethod
    def create(cls, size, user_id, project_id):
        """Allocate an LV, export it over AoE, and mark it available."""
        volume_id = utils.generate_uid('vol')
        vol = cls(volume_id)
        vol['node_name'] = FLAGS.storage_name
        vol['size'] = size
        vol['user_id'] = user_id
        vol['project_id'] = project_id
        vol['availability_zone'] = FLAGS.storage_availability_zone
        vol["instance_id"] = 'none'
        vol["mountpoint"] = 'none'
        vol['attach_time'] = 'none'
        vol['status'] = "creating"  # creating | available | in-use
        vol['attach_status'] = "detached"  # attaching | attached | detaching | detached
        vol['delete_on_termination'] = 'False'
        vol.save()
        vol.create_lv()
        vol._setup_export()
        # TODO(joshua): We need to trigger a fanout message for
        # aoe-discover on all the nodes
        vol['status'] = "available"
        vol.save()
        return vol

    def start_attach(self, instance_id, mountpoint):
        """Record that an attach to an instance has begun."""
        self['instance_id'] = instance_id
        self['mountpoint'] = mountpoint
        self['status'] = "in-use"
        self['attach_status'] = "attaching"
        self['attach_time'] = utils.isotime()
        self['delete_on_termination'] = 'False'
        self.save()

    def finish_attach(self):
        """Mark the in-progress attach as complete."""
        self['attach_status'] = "attached"
        self.save()

    def start_detach(self):
        """Record that a detach has begun."""
        self['attach_status'] = "detaching"
        self.save()

    def finish_detach(self):
        """Clear instance linkage and mark the volume available again."""
        self['instance_id'] = None
        self['mountpoint'] = None
        self['status'] = "available"
        self['attach_status'] = "detached"
        self.save()

    def destroy(self):
        """Tear down the export (best-effort), delete the LV and the model."""
        try:
            self._remove_export()
        except Exception:
            # Best-effort: the export may already be gone; the LV and model
            # must still be cleaned up.
            pass
        self._delete_lv()
        super(Volume, self).destroy()

    @defer.inlineCallbacks
    def create_lv(self):
        # A zero-size request still gets a small (100M) LV.
        if str(self['size']) == '0':
            sizestr = '100M'
        else:
            sizestr = '%sG' % self['size']
        yield process.simple_execute(
                "sudo lvcreate -L %s -n %s %s" % (sizestr,
                                                  self['volume_id'],
                                                  FLAGS.volume_group))

    @defer.inlineCallbacks
    def _delete_lv(self):
        yield process.simple_execute(
                "sudo lvremove -f %s/%s" % (FLAGS.volume_group,
                                            self['volume_id']))

    def _setup_export(self):
        """Pick a free shelf/blade pair and export the LV on it."""
        (shelf_id, blade_id) = get_next_aoe_numbers()
        self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id)
        self['shelf_id'] = shelf_id
        self['blade_id'] = blade_id
        self.save()
        self._exec_export()

    @defer.inlineCallbacks
    def _exec_export(self):
        yield process.simple_execute(
                "sudo vblade-persist setup %s %s %s /dev/%s/%s" %
                (self['shelf_id'],
                 self['blade_id'],
                 FLAGS.aoe_eth_dev,
                 FLAGS.volume_group,
                 self['volume_id']))

    @defer.inlineCallbacks
    def _remove_export(self):
        yield process.simple_execute(
                "sudo vblade-persist stop %s %s" % (self['shelf_id'],
                                                    self['blade_id']))
        yield process.simple_execute(
                "sudo vblade-persist destroy %s %s" % (self['shelf_id'],
                                                       self['blade_id']))


class FakeVolume(Volume):
    """Volume that fakes out LV creation and AoE exports for testing."""

    def create_lv(self):
        pass

    def _exec_export(self):
        # Touch a file so get_next_aoe_numbers sees the export as taken.
        fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device'])
        with open(fname, "w"):
            pass

    def _remove_export(self):
        pass

    def _delete_lv(self):
        pass


def get_next_aoe_numbers():
    """Find the first free (shelf_id, blade_id) pair for a new export.

    Scans the export directory for existing e<shelf>.<blade> entries and
    returns the next blade on the first shelf with room.
    Raises NoMoreVolumes when every slot on every shelf is taken.
    """
    for shelf_id in range(FLAGS.first_shelf_id, FLAGS.last_shelf_id + 1):
        aoes = glob.glob("%s/e%s.*" % (FLAGS.aoe_export_dir, shelf_id))
        if not aoes:
            blade_id = 0
        else:
            blade_id = max(int(a.rpartition('.')[2]) for a in aoes) + 1
        if blade_id < FLAGS.slots_per_shelf:
            logging.debug("Next shelf.blade is %s.%s", shelf_id, blade_id)
            return (shelf_id, blade_id)
    raise NoMoreVolumes()