From 33de18633fc6bb5fae64869dfe9963bf81f7f167 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Sun, 15 Aug 2010 15:55:53 -0700 Subject: refactoring volume and some cleanup in model and compute --- nova/compute/service.py | 24 ++-- nova/models.py | 38 ++++-- nova/tests/volume_unittest.py | 47 +++---- nova/volume/service.py | 280 ++++++++++++++---------------------------- 4 files changed, 155 insertions(+), 234 deletions(-) diff --git a/nova/compute/service.py b/nova/compute/service.py index 4e6a2c944..7f6f3ad6e 100644 --- a/nova/compute/service.py +++ b/nova/compute/service.py @@ -67,13 +67,10 @@ class ComputeService(service.Service): """ simple test of an AMQP message call """ return defer.succeed('PONG') - def get_instance(self, instance_id): - return models.Instance.find(instance_id) - def update_state(self, instance_id): inst = models.Instance.find(instance_id) # FIXME(ja): include other fields from state? - inst.state = self._conn.get_info(instance_id)['state'] + inst.state = self._conn.get_info(instance_id)['state'] inst.save() @exception.wrap_exception @@ -109,6 +106,8 @@ class ComputeService(service.Service): @exception.wrap_exception def run_instance(self, instance_id, **_kwargs): """ launch a new instance with specified options """ + if instance_id in self._conn.list_instances(): + raise exception.Error("Instance has already been created") logging.debug("Starting instance %s..." % (instance_id)) inst = models.Instance.find(instance_id) # NOTE(vish): passing network type allows us to express the @@ -135,19 +134,18 @@ class ComputeService(service.Service): def terminate_instance(self, instance_id): """ terminate an instance on this machine """ logging.debug("Got told to terminate instance %s" % instance_id) - session = models.create_session() - instance = session.query(models.Instance).filter_by(id=instance_id).one() + inst = models.Instance.find(instance_id) - if instance.state == power_state.SHUTOFF: + if inst.state == power_state.SHUTOFF: # self.datamodel.destroy() FIXME: RE-ADD ????? raise exception.Error('trying to destroy already destroyed' ' instance: %s' % instance_id) - instance.set_state(power_state.NOSTATE, 'shutting_down') - yield self._conn.destroy(instance) + inst.set_state(power_state.NOSTATE, 'shutting_down') + inst.save() + yield self._conn.destroy(inst) # FIXME(ja): should we keep it in a terminated state for a bit? - session.delete(instance) - session.flush() + inst.delete() @defer.inlineCallbacks @exception.wrap_exception @@ -155,7 +153,7 @@ class ComputeService(service.Service): """ reboot an instance on this server KVM doesn't support reboot, so we terminate and restart """ self.update_state(instance_id) - instance = self.get_instance(instance_id) + instance = models.Instance.find(instance_id) # FIXME(ja): this is only checking the model state - not state on disk? if instance.state != power_state.RUNNING: @@ -174,7 +172,7 @@ class ComputeService(service.Service): # FIXME: Abstract this for Xen logging.debug("Getting console output for %s" % (instance_id)) - inst = self.get_instance(instance_id) + inst = models.Instance.find(instance_id) if FLAGS.connection_type == 'libvirt': fname = os.path.abspath( diff --git a/nova/models.py b/nova/models.py index 62341a24c..c397270db 100644 --- a/nova/models.py +++ b/nova/models.py @@ -1,7 +1,8 @@ -from sqlalchemy.orm import relationship, backref, validates +from sqlalchemy.orm import relationship, backref, validates, exc from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, DateTime, Boolean, Text from sqlalchemy.ext.declarative import declarative_base from nova import auth +from nova import exception Base = declarative_base() @@ -14,9 +15,9 @@ class NovaBase(object): @classmethod def create_engine(cls): if NovaBase._engine is not None: - return _engine + return NovaBase._engine from sqlalchemy import create_engine - NovaBase._engine = create_engine('sqlite:///:memory:', echo=True) + NovaBase._engine = create_engine('sqlite:///:memory:', echo=False) Base.metadata.create_all(NovaBase._engine) return NovaBase._engine @@ -24,7 +25,7 @@ class NovaBase(object): def get_session(cls): from sqlalchemy.orm import sessionmaker if NovaBase._session == None: - NovaBase.create_engine(); + NovaBase.create_engine() NovaBase._session = sessionmaker(bind=NovaBase._engine)() return NovaBase._session @@ -37,13 +38,21 @@ class NovaBase(object): def find(cls, obj_id): session = NovaBase.get_session() #print cls - return session.query(cls).filter_by(id=obj_id).one() + try: + return session.query(cls).filter_by(id=obj_id).one() + except exc.NoResultFound: + raise exception.NotFound("No model for id %s" % obj_id) def save(self): session = NovaBase.get_session() session.add(self) session.commit() + def delete(self): + session = NovaBase.get_session() + session.delete(self) + session.flush() + class Image(Base, NovaBase): __tablename__ = 'images' user_id = Column(String)#, ForeignKey('users.id'), nullable=False) @@ -143,20 +152,33 @@ class Instance(Base, NovaBase): #def validate_state(self, key, state): # assert(state in ['nostate', 'running', 'blocked', 'paused', 'shutdown', 'shutoff', 'crashed']) -class Volume(Base): +class Volume(Base, NovaBase): __tablename__ = 'volumes' id = Column(Integer, primary_key=True) + volume_id = Column(String) shelf_id = Column(Integer) blade_id = Column(Integer) + user_id = Column(String) #, ForeignKey('users.id'), nullable=False) + project_id = Column(String) #, ForeignKey('projects.id')) + # FIXME: should be physical_node_id = Column(Integer) + node_name = Column(String) + size = Column(Integer) + alvailability_zone = Column(String) # FIXME foreign key? + instance_id = Column(Integer, ForeignKey('volumes.id'), nullable=True) + mountpoint = Column(String) + attach_time = Column(String) # FIXME datetime + status = Column(String) # FIXME enum? + attach_status = Column(String) # FIXME enum + delete_on_termination = Column(Boolean) def create_session(engine=None): return NovaBase.get_session() if __name__ == '__main__': - engine = NovasBase.create_engine() - session = NovasBase.create_session(engine) + engine = NovaBase.create_engine() + session = NovaBase.create_session(engine) instance = Instance(image_id='as', ramdisk_id='AS', user_id='anthony') user = User(id='anthony') diff --git a/nova/tests/volume_unittest.py b/nova/tests/volume_unittest.py index 2a07afe69..e979995fd 100644 --- a/nova/tests/volume_unittest.py +++ b/nova/tests/volume_unittest.py @@ -17,15 +17,14 @@ # under the License. import logging -import shutil -import tempfile from twisted.internet import defer -from nova import compute from nova import exception from nova import flags +from nova import models from nova import test +from nova.compute import service as compute_service from nova.volume import service as volume_service @@ -36,29 +35,22 @@ class VolumeTestCase(test.TrialTestCase): def setUp(self): logging.getLogger().setLevel(logging.DEBUG) super(VolumeTestCase, self).setUp() - self.compute = compute.service.ComputeService() - self.volume = None - self.tempdir = tempfile.mkdtemp() + self.compute = compute_service.ComputeService() self.flags(connection_type='fake', - fake_storage=True, - aoe_export_dir=self.tempdir) + fake_storage=True) self.volume = volume_service.VolumeService() - def tearDown(self): - shutil.rmtree(self.tempdir) - @defer.inlineCallbacks def test_run_create_volume(self): vol_size = '0' user_id = 'fake' project_id = 'fake' volume_id = yield self.volume.create_volume(vol_size, user_id, project_id) - # TODO(termie): get_volume returns differently than create_volume self.assertEqual(volume_id, - volume_service.get_volume(volume_id)['volume_id']) + models.Volume.find(volume_id).id) - rv = self.volume.delete_volume(volume_id) - self.assertRaises(exception.Error, volume_service.get_volume, volume_id) + yield self.volume.delete_volume(volume_id) + self.assertRaises(exception.NotFound, models.Volume.find, volume_id) @defer.inlineCallbacks def test_too_big_volume(self): @@ -100,32 +92,31 @@ class VolumeTestCase(test.TrialTestCase): project_id = 'fake' mountpoint = "/dev/sdf" volume_id = yield self.volume.create_volume(vol_size, user_id, project_id) - volume_obj = volume_service.get_volume(volume_id) - volume_obj.start_attach(instance_id, mountpoint) + vol = models.Volume.find(volume_id) + self.volume.start_attach(volume_id, instance_id, mountpoint) if FLAGS.fake_tests: - volume_obj.finish_attach() + self.volume.finish_attach(volume_id) else: rv = yield self.compute.attach_volume(instance_id, volume_id, mountpoint) - self.assertEqual(volume_obj['status'], "in-use") - self.assertEqual(volume_obj['attach_status'], "attached") - self.assertEqual(volume_obj['instance_id'], instance_id) - self.assertEqual(volume_obj['mountpoint'], mountpoint) + self.assertEqual(vol.status, "in-use") + self.assertEqual(vol.attach_status, "attached") + self.assertEqual(vol.instance_id, instance_id) + self.assertEqual(vol.mountpoint, mountpoint) self.assertFailure(self.volume.delete_volume(volume_id), exception.Error) - volume_obj.start_detach() + self.volume.start_detach(volume_id) if FLAGS.fake_tests: - volume_obj.finish_detach() + self.volume.finish_detach(volume_id) else: rv = yield self.volume.detach_volume(instance_id, volume_id) - volume_obj = volume_service.get_volume(volume_id) - self.assertEqual(volume_obj['status'], "available") + self.assertEqual(vol.status, "available") rv = self.volume.delete_volume(volume_id) self.assertRaises(exception.Error, - volume_service.get_volume, + models.Volume.find, volume_id) @defer.inlineCallbacks @@ -135,7 +126,7 @@ class VolumeTestCase(test.TrialTestCase): project_id = 'fake' shelf_blades = [] def _check(volume_id): - vol = volume_service.get_volume(volume_id) + vol = models.Volume.find(volume_id) shelf_blade = '%s.%s' % (vol['shelf_id'], vol['blade_id']) self.assert_(shelf_blade not in shelf_blades) shelf_blades.append(shelf_blade) diff --git a/nova/volume/service.py b/nova/volume/service.py index 1086b4cd0..76f7e9695 100644 --- a/nova/volume/service.py +++ b/nova/volume/service.py @@ -27,9 +27,9 @@ import os from twisted.internet import defer -from nova import datastore from nova import exception from nova import flags +from nova import models from nova import process from nova import service from nova import utils @@ -65,15 +65,6 @@ flags.DEFINE_boolean('fake_storage', False, class NoMoreBlades(exception.Error): pass -def get_volume(volume_id): - """ Returns a redis-backed volume object """ - volume_class = Volume - if FLAGS.fake_storage: - volume_class = FakeVolume - vol = volume_class.lookup(volume_id) - if vol: - return vol - raise exception.Error("Volume does not exist") class VolumeService(service.Service): """ @@ -83,10 +74,7 @@ class VolumeService(service.Service): """ def __init__(self): super(VolumeService, self).__init__() - self.volume_class = Volume - if FLAGS.fake_storage: - self.volume_class = FakeVolume - self._init_volume_group() + self._exec_init_volumes() @defer.inlineCallbacks @validate.rangetest(size=(0, 1000)) @@ -97,218 +85,140 @@ class VolumeService(service.Service): Volume at this point has size, owner, and zone. """ logging.debug("Creating volume of size: %s" % (size)) - vol = yield self.volume_class.create(size, user_id, project_id) - logging.debug("restarting exports") - yield self._restart_exports() - defer.returnValue(vol['volume_id']) - def by_node(self, node_id): - """ returns a list of volumes for a node """ - for volume_id in datastore.Redis.instance().smembers('volumes:%s' % (node_id)): - yield self.volume_class(volume_id=volume_id) - - @property - def all(self): - """ returns a list of all volumes """ - for volume_id in datastore.Redis.instance().smembers('volumes'): - yield self.volume_class(volume_id=volume_id) + vol = models.Volume() + vol.volume_id = utils.generate_uid('vol') + vol.node_name = FLAGS.node_name + vol.size = size + vol.user_id = user_id + vol.project_id = project_id + vol.availability_zone = FLAGS.storage_availability_zone + vol.status = "creating" # creating | available | in-use + vol.attach_status = "detached" # attaching | attached | detaching | detached + vol.save() + yield self._exec_create_volume(vol) + yield self._setup_export(vol) + # TODO(joshua): We need to trigger a fanout message + # for aoe-discover on all the nodes + vol.status = "available" + vol.save() + logging.debug("restarting exports") + yield self._exec_ensure_exports() + defer.returnValue(vol.id) @defer.inlineCallbacks def delete_volume(self, volume_id): logging.debug("Deleting volume with id of: %s" % (volume_id)) - vol = get_volume(volume_id) - if vol['attach_status'] == "attached": + vol = models.Volume.find(volume_id) + if vol.attach_status == "attached": raise exception.Error("Volume is still attached") - if vol['node_name'] != FLAGS.node_name: + if vol.node_name != FLAGS.node_name: raise exception.Error("Volume is not local to this node") - yield vol.destroy() + yield self._exec_delete_volume(vol) + yield vol.delete() defer.returnValue(True) @defer.inlineCallbacks - def _restart_exports(self): - if FLAGS.fake_storage: - return - # NOTE(vish): these commands sometimes sends output to stderr for warnings - yield process.simple_execute("sudo vblade-persist auto all", error_ok=1) - yield process.simple_execute("sudo vblade-persist start all", error_ok=1) - - @defer.inlineCallbacks - def _init_volume_group(self): - if FLAGS.fake_storage: - return - yield process.simple_execute( - "sudo pvcreate %s" % (FLAGS.storage_dev)) - yield process.simple_execute( - "sudo vgcreate %s %s" % (FLAGS.volume_group, - FLAGS.storage_dev)) - -class Volume(): - - def __init__(self, volume_id=None): - self.volume_id = volume_id - super(Volume, self).__init__() - - @property - def identifier(self): - return self.volume_id - - def default_state(self): - return {"volume_id": self.volume_id, - "node_name": "unassigned"} - - @classmethod - @defer.inlineCallbacks - def create(cls, size, user_id, project_id): - volume_id = utils.generate_uid('vol') - vol = cls(volume_id) - vol['node_name'] = FLAGS.node_name - vol['size'] = size - vol['user_id'] = user_id - vol['project_id'] = project_id - vol['availability_zone'] = FLAGS.storage_availability_zone - vol["instance_id"] = 'none' - vol["mountpoint"] = 'none' - vol['attach_time'] = 'none' - vol['status'] = "creating" # creating | available | in-use - vol['attach_status'] = "detached" # attaching | attached | detaching | detached - vol['delete_on_termination'] = 'False' - vol.save() - yield vol._create_lv() - yield vol._setup_export() - # TODO(joshua) - We need to trigger a fanout message for aoe-discover on all the nodes - vol['status'] = "available" - vol.save() - defer.returnValue(vol) - - def start_attach(self, instance_id, mountpoint): - """ """ - self['instance_id'] = instance_id - self['mountpoint'] = mountpoint - self['status'] = "in-use" - self['attach_status'] = "attaching" - self['attach_time'] = utils.isotime() - self['delete_on_termination'] = 'False' - self.save() - - def finish_attach(self): - """ """ - self['attach_status'] = "attached" - self.save() - - def start_detach(self): - """ """ - self['attach_status'] = "detaching" - self.save() - - def finish_detach(self): - self['instance_id'] = None - self['mountpoint'] = None - self['status'] = "available" - self['attach_status'] = "detached" - self.save() - - def save(self): - is_new = self.is_new_record() - super(Volume, self).save() - if is_new: - redis = datastore.Redis.instance() - key = self.__devices_key - # TODO(vish): these should be added by admin commands - more = redis.scard(self._redis_association_name("node", - self['node_name'])) - if (not redis.exists(key) and not more): - for shelf_id in range(FLAGS.first_shelf_id, - FLAGS.last_shelf_id + 1): - for blade_id in range(FLAGS.blades_per_shelf): - redis.sadd(key, "%s.%s" % (shelf_id, blade_id)) - self.associate_with("node", self['node_name']) - - @defer.inlineCallbacks - def destroy(self): - yield self._remove_export() - yield self._delete_lv() - self.unassociate_with("node", self['node_name']) - if self.get('shelf_id', None) and self.get('blade_id', None): - redis = datastore.Redis.instance() - key = self.__devices_key - redis.sadd(key, "%s.%s" % (self['shelf_id'], self['blade_id'])) - super(Volume, self).destroy() - - @defer.inlineCallbacks - def _create_lv(self): - if str(self['size']) == '0': + def _exec_create_volume(self, vol): + if str(vol.size) == '0': sizestr = '100M' else: - sizestr = '%sG' % self['size'] + sizestr = '%sG' % vol.size yield process.simple_execute( "sudo lvcreate -L %s -n %s %s" % (sizestr, - self['volume_id'], + vol.volume_id, FLAGS.volume_group), error_ok=1) @defer.inlineCallbacks - def _delete_lv(self): + def _exec_delete_volume(self, vol): yield process.simple_execute( "sudo lvremove -f %s/%s" % (FLAGS.volume_group, - self['volume_id']), error_ok=1) - - @property - def __devices_key(self): - return 'volume_devices:%s' % FLAGS.node_name + vol.volume_id), error_ok=1) @defer.inlineCallbacks - def _setup_export(self): - redis = datastore.Redis.instance() - key = self.__devices_key - device = redis.spop(key) + def _setup_export(self, vol): + # FIXME: device needs to be a pool + device = "1.1" if not device: raise NoMoreBlades() (shelf_id, blade_id) = device.split('.') - self['aoe_device'] = "e%s.%s" % (shelf_id, blade_id) - self['shelf_id'] = shelf_id - self['blade_id'] = blade_id - self.save() - yield self._exec_setup_export() + vol.aoe_device = "e%s.%s" % (shelf_id, blade_id) + vol.shelf_id = shelf_id + vol.blade_id = blade_id + vol.save() + yield self._exec_setup_export(vol) @defer.inlineCallbacks - def _exec_setup_export(self): + def _exec_setup_export(self, vol): + if FLAGS.fake_storage: + return yield process.simple_execute( "sudo vblade-persist setup %s %s %s /dev/%s/%s" % - (self['shelf_id'], - self['blade_id'], + (self, vol['shelf_id'], + vol.blade_id, FLAGS.aoe_eth_dev, FLAGS.volume_group, - self['volume_id']), error_ok=1) + vol.volume_id), error_ok=1) @defer.inlineCallbacks - def _remove_export(self): - if not self.get('shelf_id', None) or not self.get('blade_id', None): + def _remove_export(self, vol): + if not vol.shelf_id or not vol.blade_id: defer.returnValue(False) - yield self._exec_remove_export() + yield self._exec_remove_export(vol) defer.returnValue(True) @defer.inlineCallbacks - def _exec_remove_export(self): + def _exec_remove_export(self, vol): + if FLAGS.fake_storage: + return yield process.simple_execute( - "sudo vblade-persist stop %s %s" % (self['shelf_id'], - self['blade_id']), error_ok=1) + "sudo vblade-persist stop %s %s" % (self, vol.shelf_id, + vol.blade_id), error_ok=1) yield process.simple_execute( - "sudo vblade-persist destroy %s %s" % (self['shelf_id'], - self['blade_id']), error_ok=1) - + "sudo vblade-persist destroy %s %s" % (self, vol.shelf_id, + vol.blade_id), error_ok=1) + @defer.inlineCallbacks + def _exec_ensure_exports(self): + if FLAGS.fake_storage: + return + # NOTE(vish): these commands sometimes sends output to stderr for warnings + yield process.simple_execute("sudo vblade-persist auto all", error_ok=1) + yield process.simple_execute("sudo vblade-persist start all", error_ok=1) + @defer.inlineCallbacks + def _exec_init_volumes(self): + if FLAGS.fake_storage: + return + yield process.simple_execute( + "sudo pvcreate %s" % (FLAGS.storage_dev)) + yield process.simple_execute( + "sudo vgcreate %s %s" % (FLAGS.volume_group, + FLAGS.storage_dev)) -class FakeVolume(Volume): - def _create_lv(self): - pass + def start_attach(self, volume_id, instance_id, mountpoint): + vol = models.Volume.find(volume_id) + vol.instance_id = instance_id + vol.mountpoint = mountpoint + vol.status = "in-use" + vol.attach_status = "attaching" + vol.attach_time = utils.isotime() + vol.save() - def _exec_setup_export(self): - fname = os.path.join(FLAGS.aoe_export_dir, self['aoe_device']) - f = file(fname, "w") - f.close() + def finish_attach(self, volume_id): + vol = models.Volume.find(volume_id) + vol.attach_status = "attached" + vol.save() - def _exec_remove_export(self): - os.unlink(os.path.join(FLAGS.aoe_export_dir, self['aoe_device'])) + def start_detach(self, volume_id): + vol = models.Volume.find(volume_id) + vol.attach_status = "detaching" + vol.save() - def _delete_lv(self): - pass + def finish_detach(self, volume_id): + vol = models.Volume.find(volume_id) + vol.instance_id = None + vol.mountpoint = None + vol.status = "available" + vol.attach_status = "detached" + vol.save() -- cgit