author     Vishvananda Ishaya <vishvananda@yahoo.com>   2010-09-07 05:26:08 -0700
committer  Vishvananda Ishaya <vishvananda@yahoo.com>   2010-09-07 05:26:08 -0700
commit     9db707dda70bbb11d944ab357841c9bdd5ef5b07 (patch)
tree       c6c21f8427fd2eabcb1e409f7690d3e8eabbcc7f
parent     91b6fa84f7fa440f1e8b426aa091fdfaa03de6ef (diff)
Lots of fixes to make the nova commands work properly and to make the data model work properly with MySQL
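The thread running through this commit: managers stop importing `nova.db` directly and use the `self.db` handle and `self.host` attribute from the `manager.Manager` base class, and the compute manager now composes a volume manager alongside its virt driver. A hedged sketch of that composition pattern; the flag names mirror the diff below, but the exact flag defaults are assumptions:

```python
# Illustrative sketch only.  utils.import_object() (unchanged in this diff)
# resolves a dotted path like 'nova.volume.manager.AOEManager' into a live
# object, so deployments can swap implementations via flags.
from nova import flags
from nova import utils

FLAGS = flags.FLAGS

volume_manager = utils.import_object(FLAGS.volume_manager)
compute_driver = utils.import_object(FLAGS.compute_driver)
```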
-rw-r--r--  nova/compute/manager.py        104
-rw-r--r--  nova/db/api.py                   5
-rw-r--r--  nova/db/sqlalchemy/api.py        6
-rw-r--r--  nova/db/sqlalchemy/models.py   111
-rw-r--r--  nova/db/sqlalchemy/session.py    3
-rw-r--r--  nova/endpoint/cloud.py          11
-rw-r--r--  nova/process.py                 95
-rw-r--r--  nova/service.py                 24
-rw-r--r--  nova/utils.py                   18
-rw-r--r--  nova/virt/fake.py                8
-rw-r--r--  nova/virt/libvirt_conn.py       44
-rw-r--r--  nova/volume/driver.py           25
-rw-r--r--  nova/volume/manager.py          28
13 files changed, 279 insertions, 203 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index c15c9e1f5..13e5dcd1f 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -26,10 +26,8 @@ import os
 
 from twisted.internet import defer
 
-from nova import db
 from nova import exception
 from nova import flags
-from nova import process
 from nova import manager
 from nova import utils
 from nova.compute import power_state
@@ -53,41 +51,42 @@ class ComputeManager(manager.Manager):
         compute_driver = FLAGS.compute_driver
         self.driver = utils.import_object(compute_driver)
         self.network_manager = utils.import_object(FLAGS.network_manager)
+        self.volume_manager = utils.import_object(FLAGS.volume_manager)
         super(ComputeManager, self).__init__(*args, **kwargs)
 
     def _update_state(self, context, instance_id):
         """Update the state of an instance from the driver info"""
         # FIXME(ja): include other fields from state?
-        instance_ref = db.instance_get(context, instance_id)
+        instance_ref = self.db.instance_get(context, instance_id)
         state = self.driver.get_info(instance_ref.name)['state']
-        db.instance_state(context, instance_id, state)
+        self.db.instance_state(context, instance_id, state)
 
     @defer.inlineCallbacks
     @exception.wrap_exception
     def run_instance(self, context, instance_id, **_kwargs):
         """Launch a new instance with specified options."""
-        instance_ref = db.instance_get(context, instance_id)
+        instance_ref = self.db.instance_get(context, instance_id)
         if instance_ref['str_id'] in self.driver.list_instances():
             raise exception.Error("Instance has already been created")
-        logging.debug("Starting instance %s...", instance_id)
+        logging.debug("instance %s: starting...", instance_id)
         project_id = instance_ref['project_id']
         self.network_manager.setup_compute_network(context, project_id)
-        db.instance_update(context,
-                           instance_id,
-                           {'host': FLAGS.host})
+        self.db.instance_update(context,
+                                instance_id,
+                                {'host': self.host})
 
         # TODO(vish) check to make sure the availability zone matches
-        db.instance_state(context,
-                          instance_id,
-                          power_state.NOSTATE,
-                          'spawning')
+        self.db.instance_state(context,
+                               instance_id,
+                               power_state.NOSTATE,
+                               'spawning')
 
         try:
             yield self.driver.spawn(instance_ref)
         except:  # pylint: disable-msg=W0702
-            logging.exception("Failed to spawn instance %s",
+            logging.exception("instance %s: Failed to spawn",
                               instance_ref['name'])
-            db.instance_state(context, instance_id, power_state.SHUTDOWN)
+            self.db.instance_state(context, instance_id, power_state.SHUTDOWN)
 
         self._update_state(context, instance_id)
 
@@ -95,30 +94,30 @@ class ComputeManager(manager.Manager):
     @exception.wrap_exception
     def terminate_instance(self, context, instance_id):
         """Terminate an instance on this machine."""
-        logging.debug("Got told to terminate instance %s", instance_id)
-        instance_ref = db.instance_get(context, instance_id)
+        logging.debug("instance %s: terminating", instance_id)
+        instance_ref = self.db.instance_get(context, instance_id)
 
         # TODO(vish): move this logic to layer?
         if instance_ref['state'] == power_state.SHUTOFF:
-            db.instance_destroy(context, instance_id)
+            self.db.instance_destroy(context, instance_id)
             raise exception.Error('trying to destroy already destroyed'
                                   ' instance: %s' % instance_id)
 
-        db.instance_state(context,
-                          instance_id,
-                          power_state.NOSTATE,
-                          'shutting_down')
+        self.db.instance_state(context,
+                               instance_id,
+                               power_state.NOSTATE,
+                               'shutting_down')
         yield self.driver.destroy(instance_ref)
 
         # TODO(ja): should we keep it in a terminated state for a bit?
-        db.instance_destroy(context, instance_id)
+        self.db.instance_destroy(context, instance_id)
 
     @defer.inlineCallbacks
     @exception.wrap_exception
     def reboot_instance(self, context, instance_id):
         """Reboot an instance on this server."""
         self._update_state(context, instance_id)
-        instance_ref = db.instance_get(context, instance_id)
+        instance_ref = self.db.instance_get(context, instance_id)
 
         if instance_ref['state'] != power_state.RUNNING:
             raise exception.Error(
@@ -128,11 +127,11 @@ class ComputeManager(manager.Manager):
                     instance_ref['state'],
                     power_state.RUNNING))
 
-        logging.debug('rebooting instance %s', instance_ref['name'])
-        db.instance_state(context,
-                          instance_id,
-                          power_state.NOSTATE,
-                          'rebooting')
+        logging.debug('instance %s: rebooting', instance_ref['name'])
+        self.db.instance_state(context,
+                               instance_id,
+                               power_state.NOSTATE,
+                               'rebooting')
         yield self.driver.reboot(instance_ref)
         self._update_state(context, instance_id)
 
@@ -141,8 +140,8 @@
         """Send the console output for an instance."""
         # TODO(vish): Move this into the driver layer
 
-        logging.debug("Getting console output for %s", (instance_id))
-        instance_ref = db.instance_get(context, instance_id)
+        logging.debug("instance %s: getting console output", instance_id)
+        instance_ref = self.db.instance_get(context, instance_id)
 
         if FLAGS.connection_type == 'libvirt':
             fname = os.path.abspath(os.path.join(FLAGS.instances_path,
@@ -164,36 +163,27 @@
     @exception.wrap_exception
     def attach_volume(self, context, instance_id, volume_id, mountpoint):
         """Attach a volume to an instance."""
-        # TODO(termie): check that instance_id exists
-        volume_ref = db.volume_get(context, volume_id)
-        yield self._init_aoe()
-        # TODO(vish): Move this into the driver layer
-        yield process.simple_execute(
-                "sudo virsh attach-disk %s /dev/etherd/%s %s" %
-                (instance_id,
-                 volume_ref['aoe_device'],
-                 mountpoint.rpartition('/dev/')[2]))
-        db.volume_attached(context, volume_id, instance_id, mountpoint)
+        logging.debug("instance %s: attaching volume %s to %s", instance_id,
+                      volume_id, mountpoint)
+        instance_ref = self.db.instance_get(context, instance_id)
+        dev_path = yield self.volume_manager.setup_compute_volume(context,
+                                                                  volume_id)
+        yield self.driver.attach_volume(instance_ref['str_id'],
+                                        dev_path,
+                                        mountpoint)
+        self.db.volume_attached(context, volume_id, instance_id, mountpoint)
         defer.returnValue(True)
 
     @defer.inlineCallbacks
     @exception.wrap_exception
     def detach_volume(self, context, instance_id, volume_id):
         """Detach a volume from an instance."""
-        # despite the documentation, virsh detach-disk just wants the device
-        # name without the leading /dev/
-        # TODO(termie): check that instance_id exists
-        volume_ref = db.volume_get(context, volume_id)
-        target = volume_ref['mountpoint'].rpartition('/dev/')[2]
-        # TODO(vish): Move this into the driver layer
-        yield process.simple_execute(
-                "sudo virsh detach-disk %s %s " % (instance_id, target))
-        db.volume_detached(context, volume_id)
+        logging.debug("instance %s: detaching volume %s",
+                      instance_id,
+                      volume_id)
+        instance_ref = self.db.instance_get(context, instance_id)
+        volume_ref = self.db.volume_get(context, volume_id)
+        self.driver.detach_volume(instance_ref['str_id'],
+                                  volume_ref['mountpoint'])
+        self.db.volume_detached(context, volume_id)
         defer.returnValue(True)
-
-    @defer.inlineCallbacks
-    def _init_aoe(self):
-        """Discover aoe exported devices"""
-        # TODO(vish): these shell calls should move into volume manager.
-        yield process.simple_execute("sudo aoe-discover")
-        yield process.simple_execute("sudo aoe-stat")
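The attach path now crosses three layers: the volume manager discovers the exported device, the virt driver issues the `virsh attach-disk`, and the db layer records the attachment. A hedged sketch of the resulting call chain, assuming an authorized `context`, existing ids, and a running twisted reactor:

```python
# Sketch of the new attach flow, not a test from the tree.  The mountpoint
# '/dev/vdc' is an arbitrary example value.
from twisted.internet import defer


@defer.inlineCallbacks
def attach_example(compute_manager, context, instance_id, volume_id):
    # volume manager runs aoe-discover/aoe-stat and yields /dev/etherd/eX.Y
    dev_path = yield compute_manager.volume_manager.setup_compute_volume(
            context, volume_id)
    instance_ref = compute_manager.db.instance_get(context, instance_id)
    # virt driver wraps "sudo virsh attach-disk ..."
    yield compute_manager.driver.attach_volume(instance_ref['str_id'],
                                               dev_path,
                                               '/dev/vdc')
    compute_manager.db.volume_attached(context, volume_id, instance_id,
                                       '/dev/vdc')
```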
diff --git a/nova/db/api.py b/nova/db/api.py
index d5ccfca80..b49707392 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -410,6 +410,11 @@ def volume_get_all(context):
     return IMPL.volume_get_all(context)
 
 
+def volume_get_instance(context, volume_id):
+    """Get the instance that a volume is attached to."""
+    return IMPL.volume_get_instance(context, volume_id)
+
+
 def volume_get_by_project(context, project_id):
     """Get all volumes belonging to a project."""
     return IMPL.volume_get_by_project(context, project_id)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index fdd2765d3..5172b87b3 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -560,6 +560,12 @@ def volume_get_host(context, volume_id):
     return volume_ref['host']
 
 
+def volume_get_instance(context, volume_id):
+    volume_ref = db.volume_get(context, volume_id)
+    instance_ref = db.instance_get(context, volume_ref['instance_id'])
+    return instance_ref
+
+
 def volume_get_shelf_and_blade(_context, volume_id):
     with managed_session() as session:
         export_device = session.query(models.ExportDevice) \
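A small usage sketch for the new `volume_get_instance()` helper; a valid request context and an attached volume are assumed:

```python
# Hypothetical caller; `context` and `volume_id` are assumed to exist and
# the volume to be attached to an instance.
import logging

from nova import db

instance_ref = db.volume_get_instance(context, volume_id)
logging.debug("volume %s is attached to instance %s",
              volume_id, instance_ref['str_id'])
```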
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 626be87fe..310d4640e 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -119,52 +119,56 @@ class NovaBase(object):
     def __getitem__(self, key):
         return getattr(self, key)
 
-
-class Image(BASE, NovaBase):
-    """Represents an image in the datastore"""
-    __tablename__ = 'images'
-    __prefix__ = 'ami'
-    id = Column(Integer, primary_key=True)
-    user_id = Column(String(255))
-    project_id = Column(String(255))
-    image_type = Column(String(255))
-    public = Column(Boolean, default=False)
-    state = Column(String(255))
-    location = Column(String(255))
-    arch = Column(String(255))
-    default_kernel_id = Column(String(255))
-    default_ramdisk_id = Column(String(255))
-
-    @validates('image_type')
-    def validate_image_type(self, key, image_type):
-        assert(image_type in ['machine', 'kernel', 'ramdisk', 'raw'])
-
-    @validates('state')
-    def validate_state(self, key, state):
-        assert(state in ['available', 'pending', 'disabled'])
-
-    @validates('default_kernel_id')
-    def validate_kernel_id(self, key, val):
-        if val != 'machine':
-            assert(val is None)
-
-    @validates('default_ramdisk_id')
-    def validate_ramdisk_id(self, key, val):
-        if val != 'machine':
-            assert(val is None)
-
-
-class Host(BASE, NovaBase):
-    """Represents a host where services are running"""
-    __tablename__ = 'hosts'
-    id = Column(String(255), primary_key=True)
-
-
+# TODO(vish): Store images in the database instead of file system
+#class Image(BASE, NovaBase):
+#    """Represents an image in the datastore"""
+#    __tablename__ = 'images'
+#    __prefix__ = 'ami'
+#    id = Column(Integer, primary_key=True)
+#    user_id = Column(String(255))
+#    project_id = Column(String(255))
+#    image_type = Column(String(255))
+#    public = Column(Boolean, default=False)
+#    state = Column(String(255))
+#    location = Column(String(255))
+#    arch = Column(String(255))
+#    default_kernel_id = Column(String(255))
+#    default_ramdisk_id = Column(String(255))
+#
+#    @validates('image_type')
+#    def validate_image_type(self, key, image_type):
+#        assert(image_type in ['machine', 'kernel', 'ramdisk', 'raw'])
+#
+#    @validates('state')
+#    def validate_state(self, key, state):
+#        assert(state in ['available', 'pending', 'disabled'])
+#
+#    @validates('default_kernel_id')
+#    def validate_kernel_id(self, key, val):
+#        if val != 'machine':
+#            assert(val is None)
+#
+#    @validates('default_ramdisk_id')
+#    def validate_ramdisk_id(self, key, val):
+#        if val != 'machine':
+#            assert(val is None)
+#
+#
+# TODO(vish): To make this into its own table, we need a good place to
+#             create the host entries. In config somewhere? Or the first
+#             time any object sets host? This only becomes particularly
+#             important if we need to store per-host data.
+#class Host(BASE, NovaBase):
+#    """Represents a host where services are running"""
+#    __tablename__ = 'hosts'
+#    id = Column(String(255), primary_key=True)
+
+
 class Service(BASE, NovaBase):
     """Represents a running service on a host"""
     __tablename__ = 'services'
     id = Column(Integer, primary_key=True)
-    host = Column(String(255), ForeignKey('hosts.id'))
+    host = Column(String(255))  # , ForeignKey('hosts.id'))
     binary = Column(String(255))
     topic = Column(String(255))
     report_count = Column(Integer, nullable=False, default=0)
@@ -208,9 +212,12 @@ class Instance(BASE, NovaBase):
     def name(self):
         return self.str_id
 
-    image_id = Column(Integer, ForeignKey('images.id'), nullable=True)
-    kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
-    ramdisk_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+    image_id = Column(String(255))
+    kernel_id = Column(String(255))
+    ramdisk_id = Column(String(255))
+#    image_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+#    kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+#    ramdisk_id = Column(Integer, ForeignKey('images.id'), nullable=True)
 #    ramdisk = relationship(Ramdisk, backref=backref('instances', order_by=id))
 #    kernel = relationship(Kernel, backref=backref('instances', order_by=id))
 #    project = relationship(Project, backref=backref('instances', order_by=id))
@@ -224,9 +231,9 @@ class Instance(BASE, NovaBase):
     state_description = Column(String(255))
 
     hostname = Column(String(255))
-    host = Column(String(255), ForeignKey('hosts.id'))
+    host = Column(String(255))  # , ForeignKey('hosts.id'))
 
-    instance_type = Column(Integer)
+    instance_type = Column(String(255))
 
     user_data = Column(Text)
 
@@ -264,7 +271,7 @@ class Volume(BASE, NovaBase):
 
     user_id = Column(String(255))
     project_id = Column(String(255))
-    host = Column(String(255), ForeignKey('hosts.id'))
+    host = Column(String(255))  # , ForeignKey('hosts.id'))
     size = Column(Integer)
     availability_zone = Column(String(255))  # TODO(vish): foreign key?
     instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
@@ -305,7 +312,7 @@ class Network(BASE, NovaBase):
     dhcp_start = Column(String(255))
 
     project_id = Column(String(255))
-    host = Column(String(255), ForeignKey('hosts.id'))
+    host = Column(String(255))  # , ForeignKey('hosts.id'))
 
 
 class NetworkIndex(BASE, NovaBase):
@@ -367,7 +374,7 @@ class FloatingIp(BASE, NovaBase):
     fixed_ip = relationship(FixedIp, backref=backref('floating_ips'))
 
     project_id = Column(String(255))
-    host = Column(String(255), ForeignKey('hosts.id'))
+    host = Column(String(255))  # , ForeignKey('hosts.id'))
 
     @property
     def str_id(self):
@@ -392,8 +399,8 @@ class FloatingIp(BASE, NovaBase):
 def register_models():
     """Register Models and create metadata"""
     from sqlalchemy import create_engine
-    models = (Image, Host, Service, Instance, Volume, ExportDevice,
-              FixedIp, FloatingIp, Network, NetworkIndex)
+    models = (Service, Instance, Volume, ExportDevice,
+              FixedIp, FloatingIp, Network, NetworkIndex)  # , Image, Host)
     engine = create_engine(FLAGS.sql_connection, echo=False)
     for model in models:
         model.metadata.create_all(engine)
diff --git a/nova/db/sqlalchemy/session.py b/nova/db/sqlalchemy/session.py
index 70e3212e1..adcc42293 100644
--- a/nova/db/sqlalchemy/session.py
+++ b/nova/db/sqlalchemy/session.py
@@ -50,6 +50,7 @@ class SessionExecutionManager:
 
     def __exit__(self, exc_type, exc_value, traceback):
         if exc_type:
-            logging.exception("Rolling back due to failed transaction")
+            logging.exception("Rolling back due to failed transaction: %s",
+                              exc_type)
             self._session.rollback()
         self._session.close()
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index 15136adac..932d42de4 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -41,6 +41,7 @@ from nova.endpoint import images
 
 
 FLAGS = flags.FLAGS
+flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
 
 
 def _gen_key(user_id, key_name):
@@ -262,11 +263,11 @@ class CloudController(object):
                                          volume['mountpoint'])
         if volume['attach_status'] == 'attached':
             v['attachmentSet'] = [{'attachTime': volume['attach_time'],
-                                   'deleteOnTermination': volume['delete_on_termination'],
+                                   'deleteOnTermination': False,
                                    'device': volume['mountpoint'],
                                    'instanceId': volume['instance_id'],
                                    'status': 'attached',
-                                   'volume_id': volume['volume_id']}]
+                                   'volume_id': volume['str_id']}]
         else:
             v['attachmentSet'] = [{}]
         return v
@@ -293,7 +294,7 @@ class CloudController(object):
     def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
         volume_ref = db.volume_get_by_str(context, volume_id)
         # TODO(vish): abstract status checking?
-        if volume_ref['status'] == "attached":
+        if volume_ref['attach_status'] == "attached":
             raise exception.ApiError("Volume is already attached")
         #volume.start_attach(instance_id, device)
         instance_ref = db.instance_get_by_str(context, instance_id)
@@ -306,7 +307,7 @@ class CloudController(object):
                                  "mountpoint": device}})
         return defer.succeed({'attachTime': volume_ref['attach_time'],
                               'device': volume_ref['mountpoint'],
-                              'instanceId': instance_ref['id_str'],
+                              'instanceId': instance_ref['id'],
                               'requestId': context.request_id,
                               'status': volume_ref['attach_status'],
                               'volumeId': volume_ref['id']})
@@ -334,7 +335,7 @@ class CloudController(object):
         db.volume_detached(context)
         return defer.succeed({'attachTime': volume_ref['attach_time'],
                               'device': volume_ref['mountpoint'],
-                              'instanceId': instance_ref['id_str'],
+                              'instanceId': instance_ref['str_id'],
                               'requestId': context.request_id,
                               'status': volume_ref['attach_status'],
                               'volumeId': volume_ref['id']})
diff --git a/nova/process.py b/nova/process.py
index 425d9f162..74725c157 100644
--- a/nova/process.py
+++ b/nova/process.py
@@ -18,9 +18,10 @@
 # under the License.
 
 """
-Process pool, still buggy right now.
+Process pool using twisted threading
 """
 
+import logging
 import StringIO
 
 from twisted.internet import defer
@@ -29,30 +30,14 @@ from twisted.internet import protocol
 from twisted.internet import reactor
 
 from nova import flags
+from nova.utils import ProcessExecutionError
 
 FLAGS = flags.FLAGS
 flags.DEFINE_integer('process_pool_size', 4,
                      'Number of processes to use in the process pool')
 
-
-# NOTE(termie): this is copied from twisted.internet.utils but since
-#               they don't export it I've copied and modified
-class UnexpectedErrorOutput(IOError):
-    """
-    Standard error data was received where it was not expected.  This is a
-    subclass of L{IOError} to preserve backward compatibility with the previous
-    error behavior of L{getProcessOutput}.
-
-    @ivar processEnded: A L{Deferred} which will fire when the process which
-        produced the data on stderr has ended (exited and all file descriptors
-        closed).
-    """
-    def __init__(self, stdout=None, stderr=None):
-        IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
-
-
-# This is based on _BackRelay from twister.internal.utils, but modified to 
-# capture both stdout and stderr, without odd stderr handling, and also to 
+# This is based on _BackRelay from twister.internal.utils, but modified to
+# capture both stdout and stderr, without odd stderr handling, and also to
 # handle stdin
 class BackRelayWithInput(protocol.ProcessProtocol):
     """
@@ -62,22 +47,23 @@ class BackRelayWithInput(protocol.ProcessProtocol):
     @ivar deferred: A L{Deferred} which will be called back with all of stdout
         and all of stderr as well (as a tuple).  C{terminate_on_stderr} is
         true and any bytes are received over stderr, this will fire with an
-        L{_UnexpectedErrorOutput} instance and the attribute will be set to
+        L{_ProcessExecutionError} instance and the attribute will be set to
         C{None}.
 
-    @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are 
-        received over stderr, this attribute will refer to a L{Deferred} which 
-        will be called back when the process ends.  This C{Deferred} is also 
-        associated with the L{_UnexpectedErrorOutput} which C{deferred} fires 
-        with earlier in this case so that users can determine when the process 
+    @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
+        received over stderr, this attribute will refer to a L{Deferred} which
+        will be called back when the process ends.  This C{Deferred} is also
+        associated with the L{_ProcessExecutionError} which C{deferred} fires
+        with earlier in this case so that users can determine when the process
        has actually ended, in addition to knowing when bytes have been
        received via stderr.
    """
 
-    def __init__(self, deferred, started_deferred=None,
-                 terminate_on_stderr=False, check_exit_code=True,
-                 process_input=None):
+    def __init__(self, deferred, cmd, started_deferred=None,
+                 terminate_on_stderr=False, check_exit_code=True,
+                 process_input=None):
         self.deferred = deferred
+        self.cmd = cmd
         self.stdout = StringIO.StringIO()
         self.stderr = StringIO.StringIO()
         self.started_deferred = started_deferred
@@ -85,14 +71,18 @@ class BackRelayWithInput(protocol.ProcessProtocol):
         self.check_exit_code = check_exit_code
         self.process_input = process_input
         self.on_process_ended = None
-    
+
+    def _build_execution_error(self, exit_code=None):
+        return ProcessExecutionError(cmd=self.cmd,
+                                     exit_code=exit_code,
+                                     stdout=self.stdout.getvalue(),
+                                     stderr=self.stderr.getvalue())
+
     def errReceived(self, text):
         self.stderr.write(text)
         if self.terminate_on_stderr and (self.deferred is not None):
             self.on_process_ended = defer.Deferred()
-            self.deferred.errback(UnexpectedErrorOutput(
-                stdout=self.stdout.getvalue(),
-                stderr=self.stderr.getvalue()))
+            self.deferred.errback(self._build_execution_error())
             self.deferred = None
             self.transport.loseConnection()
@@ -102,15 +92,19 @@ class BackRelayWithInput(protocol.ProcessProtocol):
     def processEnded(self, reason):
         if self.deferred is not None:
             stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
-            try:
-                if self.check_exit_code:
-                    reason.trap(error.ProcessDone)
-                self.deferred.callback((stdout, stderr))
-            except:
-                # NOTE(justinsb): This logic is a little suspicious to me...
-                # If the callback throws an exception, then errback will be
-                # called also. However, this is what the unit tests test for...
-                self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
+            exit_code = reason.value.exitCode
+            if self.check_exit_code and exit_code <> 0:
+                self.deferred.errback(self._build_execution_error(exit_code))
+            else:
+                try:
+                    if self.check_exit_code:
+                        reason.trap(error.ProcessDone)
+                    self.deferred.callback((stdout, stderr))
+                except:
+                    # NOTE(justinsb): This logic is a little suspicious to me...
+                    # If the callback throws an exception, then errback will be
+                    # called also. However, this is what the unit tests test for...
+                    self.deferred.errback(self._build_execution_error(exit_code))
         elif self.on_process_ended is not None:
             self.on_process_ended.errback(reason)
@@ -122,8 +116,8 @@
             self.transport.write(self.process_input)
         self.transport.closeStdin()
 
-def get_process_output(executable, args=None, env=None, path=None, 
-                       process_reactor=None, check_exit_code=True, 
+def get_process_output(executable, args=None, env=None, path=None,
+                       process_reactor=None, check_exit_code=True,
                        process_input=None, started_deferred=None,
                        terminate_on_stderr=False):
     if process_reactor is None:
@@ -131,10 +125,15 @@ def get_process_output(executable, args=None, env=None, path=None,
     args = args and args or ()
     env = env and env and {}
     deferred = defer.Deferred()
+    cmd = executable
+    if args:
+        cmd = cmd + " " + ' '.join(args)
+    logging.debug("Running cmd: %s", cmd)
     process_handler = BackRelayWithInput(
-        deferred,
-        started_deferred=started_deferred,
-        check_exit_code=check_exit_code,
+        deferred,
+        cmd,
+        started_deferred=started_deferred,
+        check_exit_code=check_exit_code,
         process_input=process_input,
         terminate_on_stderr=terminate_on_stderr)
     # NOTE(vish): commands come in as unicode, but self.executes needs
@@ -142,7 +141,7 @@ def get_process_output(executable, args=None, env=None, path=None,
     executable = str(executable)
     if not args is None:
         args = [str(x) for x in args]
-    process_reactor.spawnProcess( process_handler, executable, 
+    process_reactor.spawnProcess( process_handler, executable,
                                  (executable,)+tuple(args), env, path)
     return deferred
diff --git a/nova/service.py b/nova/service.py
index d7471f4c6..bc4b80fe4 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -58,10 +58,14 @@ class Service(object, service.Service):
                                                      self.binary)
             self.service_id = service_ref['id']
         except exception.NotFound:
-            self.service_id = db.service_create(None, {'host': self.host,
-                                                       'binary': self.binary,
-                                                       'topic': self.topic,
-                                                       'report_count': 0})
+            self._create_service_ref()
+
+
+    def _create_service_ref(self):
+        self.service_id = db.service_create(None, {'host': self.host,
+                                                   'binary': self.binary,
+                                                   'topic': self.topic,
+                                                   'report_count': 0})
 
     def __getattr__(self, key):
         try:
@@ -122,10 +126,6 @@ class Service(object, service.Service):
     def kill(self, context=None):
         """Destroy the service object in the datastore"""
         try:
-            service_ref = db.service_get_by_args(context,
-                                                 self.host,
-                                                 self.binary)
-            service_id = service_ref['id']
             db.service_destroy(context, self.service_id)
         except exception.NotFound:
             logging.warn("Service killed that has no database entry")
@@ -134,7 +134,13 @@
     def report_state(self, context=None):
         """Update the state of this service in the datastore."""
         try:
-            service_ref = db.service_get(context, self.service_id)
+            try:
+                service_ref = db.service_get(context, self.service_id)
+            except exception.NotFound:
+                logging.debug("The service database object disappeared, "
+                              "Recreating it.")
+                self._create_service_ref()
+
             db.service_update(context,
                               self.service_id,
                               {'report_count': service_ref['report_count'] + 1})
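With `UnexpectedErrorOutput` gone, the twisted process pool above and the blocking `utils.execute()` below both report failures as `nova.utils.ProcessExecutionError`, which carries the command, exit code, stdout, and stderr. A hedged sketch of a caller on the twisted side, assuming a running reactor and that `simple_execute` yields the `(stdout, stderr)` tuple from `get_process_output`:

```python
# Sketch only; the 'vgs nova-volumes' command is an example.
from twisted.internet import defer

from nova import process
from nova.utils import ProcessExecutionError


@defer.inlineCallbacks
def check_volume_group():
    try:
        stdout, stderr = yield process.simple_execute("vgs nova-volumes")
    except ProcessExecutionError, e:
        # message includes command, exit code, stdout and stderr
        print e
```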
diff --git a/nova/utils.py b/nova/utils.py
index 536d722bb..3e4a3d94f 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -38,6 +38,16 @@ from nova import flags
 FLAGS = flags.FLAGS
 TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 
+class ProcessExecutionError(IOError):
+    def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+                 description=None):
+        if description is None:
+            description = "Unexpected error while running command."
+        if exit_code is None:
+            exit_code = '-'
+        message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
+            description, cmd, exit_code, stdout, stderr)
+        IOError.__init__(self, message)
 
 def import_class(import_str):
     """Returns a class from a string including module and class"""
@@ -69,6 +79,7 @@ def fetchfile(url, target):
     execute("curl --fail %s -o %s" % (url, target))
 
 
 def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
+    logging.debug("Running cmd: %s", cmd)
     env = os.environ.copy()
     if addl_env:
         env.update(addl_env)
@@ -83,8 +94,11 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
     if obj.returncode:
         logging.debug("Result was %s" % (obj.returncode))
         if check_exit_code and obj.returncode <> 0:
-            raise Exception( "Unexpected exit code: %s.  result=%s"
-                            % (obj.returncode, result))
+            (stdout, stderr) = result
+            raise ProcessExecutionError(exit_code=obj.returncode,
+                                        stdout=stdout,
+                                        stderr=stderr,
+                                        cmd=cmd)
     return result
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 060b53729..4ae6afcc4 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -132,6 +132,14 @@ class FakeConnection(object):
         del self.instances[instance.name]
         return defer.succeed(None)
 
+    def attach_volume(self, instance_name, device_path, mountpoint):
+        """Attach the disk at device_path to the instance at mountpoint"""
+        return True
+
+    def detach_volume(self, instance_name, mountpoint):
+        """Detach the disk attached to the instance at mountpoint"""
+        return True
+
     def get_info(self, instance_name):
         """
         Get a block of information about the given instance.  This is returned
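On the blocking side, `utils.execute()` now logs each command and raises the same exception type on a nonzero exit. A short sketch; the `ls /nonexistent` command is just an example that fails:

```python
# execute() returns the (stdout, stderr) tuple from Popen.communicate().
from nova import utils
from nova.utils import ProcessExecutionError

try:
    stdout, stderr = utils.execute("ls /nonexistent")
except ProcessExecutionError, e:
    print e  # "Unexpected error while running command.\nCommand: ..."

# Opting out of the check returns the result regardless of exit code:
stdout, stderr = utils.execute("ls /nonexistent", check_exit_code=False)
```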
""" -import json import logging import os import shutil @@ -154,12 +153,30 @@ class LibvirtConnection(object): def _cleanup(self, instance): target = os.path.join(FLAGS.instances_path, instance['name']) - logging.info("Deleting instance files at %s", target) + logging.info('instance %s: deleting instance files %s', + instance['name'], target) if os.path.exists(target): shutil.rmtree(target) @defer.inlineCallbacks @exception.wrap_exception + def attach_volume(self, instance_name, device_path, mountpoint): + yield process.simple_execute("sudo virsh attach-disk %s %s %s" % + (instance_name, + device_path, + mountpoint.rpartition('/dev/')[2])) + + @defer.inlineCallbacks + @exception.wrap_exception + def detach_volume(self, instance_name, mountpoint): + # NOTE(vish): despite the documentation, virsh detach-disk just + # wants the device name without the leading /dev/ + yield process.simple_execute("sudo virsh detach-disk %s %s" % + (instance_name, + mountpoint.rpartition('/dev/')[2])) + + @defer.inlineCallbacks + @exception.wrap_exception def reboot(self, instance): xml = self.to_xml(instance) yield self._conn.lookupByName(instance['name']).destroy() @@ -171,7 +188,7 @@ class LibvirtConnection(object): try: instance.set_state(self.get_info(instance['name'])['state']) if instance.state == power_state.RUNNING: - logging.debug('rebooted instance %s' % instance['name']) + logging.debug('instance %s: rebooted', instance['name']) timer.stop() d.callback(None) except Exception, exn: @@ -192,7 +209,7 @@ class LibvirtConnection(object): yield self._conn.createXML(xml, 0) # TODO(termie): this should actually register # a callback to check for successful boot - logging.debug("Instance is running") + logging.debug("instance %s: is running", instance['name']) local_d = defer.Deferred() timer = task.LoopingCall(f=None) @@ -200,11 +217,11 @@ class LibvirtConnection(object): try: instance.set_state(self.get_info(instance['name'])['state']) if instance.state == power_state.RUNNING: - logging.debug('booted instance %s', instance['name']) + logging.debug('instance %s: booted', instance['name']) timer.stop() local_d.callback(None) except: - logging.exception('Failed to boot instance %s', + logging.exception('instance %s: failed to boot', instance['name']) instance.set_state(power_state.SHUTDOWN) timer.stop() @@ -227,7 +244,7 @@ class LibvirtConnection(object): # TODO(termie): these are blocking calls, it would be great # if they weren't. - logging.info('Creating image for: %s', inst['name']) + logging.info('instance %s: Creating image', inst['name']) f = open(basepath('libvirt.xml'), 'w') f.write(libvirt_xml) f.close() @@ -249,7 +266,7 @@ class LibvirtConnection(object): process_input=process_input, check_exit_code=True) - key = inst.key_data + key = str(inst['key_data']) net = None network_ref = db.project_get_network(None, project.id) if network_ref['injected']: @@ -262,7 +279,12 @@ class LibvirtConnection(object): 'broadcast': network_ref['broadcast'], 'dns': network_ref['dns']} if key or net: - logging.info('Injecting data into image %s', inst.image_id) + if key: + logging.info('instance %s: injecting key into image %s', + inst['name'], inst.image_id) + if net: + logging.info('instance %s: injecting net into image %s', + inst['name'], inst.image_id) yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) if os.path.exists(basepath('disk')): @@ -275,7 +297,7 @@ class LibvirtConnection(object): def to_xml(self, instance): # TODO(termie): cache? 
- logging.debug("Starting the toXML method") + logging.debug('instance %s: starting toXML method', instance['name']) network = db.project_get_network(None, instance['project_id']) # FIXME(vish): stick this in db instance_type = instance_types.INSTANCE_TYPES[instance['instance_type']] @@ -288,7 +310,7 @@ class LibvirtConnection(object): 'bridge_name': network['bridge'], 'mac_address': instance['mac_address']} libvirt_xml = self.libvirt_xml % xml_info - logging.debug("Finished the toXML method") + logging.debug('instance %s: finished toXML method', instance['name']) return libvirt_xml diff --git a/nova/volume/driver.py b/nova/volume/driver.py index f5c1330a3..f875e0213 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -35,35 +35,34 @@ flags.DEFINE_string('aoe_eth_dev', 'eth0', 'Which device to export the volumes on') - class AOEDriver(object): """Executes commands relating to AOE volumes""" def __init__(self, execute=process.simple_execute, *args, **kwargs): self._execute = execute @defer.inlineCallbacks - def create_volume(self, volume_id, size): + def create_volume(self, volume_name, size): """Creates a logical volume""" # NOTE(vish): makes sure that the volume group exists - yield self._execute("vgs | grep %s" % FLAGS.volume_group) + yield self._execute("vgs %s" % FLAGS.volume_group) if int(size) == 0: sizestr = '100M' else: sizestr = '%sG' % size yield self._execute( "sudo lvcreate -L %s -n %s %s" % (sizestr, - volume_id, + volume_name, FLAGS.volume_group)) @defer.inlineCallbacks - def delete_volume(self, volume_id): + def delete_volume(self, volume_name): """Deletes a logical volume""" yield self._execute( "sudo lvremove -f %s/%s" % (FLAGS.volume_group, - volume_id)) + volume_name)) @defer.inlineCallbacks - def create_export(self, volume_id, shelf_id, blade_id): + def create_export(self, volume_name, shelf_id, blade_id): """Creates an export for a logical volume""" yield self._execute( "sudo vblade-persist setup %s %s %s /dev/%s/%s" % @@ -71,10 +70,16 @@ class AOEDriver(object): blade_id, FLAGS.aoe_eth_dev, FLAGS.volume_group, - volume_id)) + volume_name)) + + @defer.inlineCallbacks + def discover_volume(self, _volume_name): + """Discover volume on a remote host""" + yield self._execute("sudo aoe-discover") + yield self._execute("sudo aoe-stat") @defer.inlineCallbacks - def remove_export(self, _volume_id, shelf_id, blade_id): + def remove_export(self, _volume_name, shelf_id, blade_id): """Removes an export for a logical volume""" yield self._execute( "sudo vblade-persist stop %s %s" % (shelf_id, blade_id)) @@ -92,7 +97,6 @@ class AOEDriver(object): check_exit_code=False) - class FakeAOEDriver(AOEDriver): """Logs calls instead of executing""" def __init__(self, *args, **kwargs): @@ -102,4 +106,3 @@ class FakeAOEDriver(AOEDriver): def fake_execute(cmd, *_args, **_kwargs): """Execute that simply logs the command""" logging.debug("FAKE AOE: %s", cmd) - diff --git a/nova/volume/manager.py b/nova/volume/manager.py index e5f4805a1..c4fa1f982 100644 --- a/nova/volume/manager.py +++ b/nova/volume/manager.py @@ -82,17 +82,17 @@ class AOEManager(manager.Manager): size = volume_ref['size'] logging.debug("volume %s: creating lv of size %sG", volume_id, size) - yield self.driver.create_volume(volume_id, size) + yield self.driver.create_volume(volume_ref['str_id'], size) logging.debug("volume %s: allocating shelf & blade", volume_id) self._ensure_blades(context) rval = self.db.volume_allocate_shelf_and_blade(context, volume_id) (shelf_id, blade_id) = rval - logging.debug("volume %s: 
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index e5f4805a1..c4fa1f982 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -82,17 +82,17 @@ class AOEManager(manager.Manager):
         size = volume_ref['size']
         logging.debug("volume %s: creating lv of size %sG", volume_id, size)
-        yield self.driver.create_volume(volume_id, size)
+        yield self.driver.create_volume(volume_ref['str_id'], size)
 
         logging.debug("volume %s: allocating shelf & blade", volume_id)
         self._ensure_blades(context)
         rval = self.db.volume_allocate_shelf_and_blade(context, volume_id)
         (shelf_id, blade_id) = rval
 
-        logging.debug("volume %s: exporting shelf %s & blade %s", (volume_id,
-                      shelf_id, blade_id))
+        logging.debug("volume %s: exporting shelf %s & blade %s", volume_id,
+                      shelf_id, blade_id)
 
-        yield self.driver.create_export(volume_id, shelf_id, blade_id)
+        yield self.driver.create_export(volume_ref['str_id'], shelf_id, blade_id)
         # TODO(joshua): We need to trigger a fanout message
         #               for aoe-discover on all the nodes
 
@@ -114,8 +114,22 @@ class AOEManager(manager.Manager):
         if volume_ref['host'] != FLAGS.host:
             raise exception.Error("Volume is not local to this node")
         shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
-                                                             volume_id)
-        yield self.driver.remove_export(volume_id, shelf_id, blade_id)
-        yield self.driver.delete_volume(volume_id)
+                                                                volume_id)
+        yield self.driver.remove_export(volume_ref['str_id'],
+                                        shelf_id,
+                                        blade_id)
+        yield self.driver.delete_volume(volume_ref['str_id'])
         self.db.volume_destroy(context, volume_id)
         defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def setup_compute_volume(self, context, volume_id):
+        """Setup remote volume on compute host
+
+        Returns path to device.
+        """
+        volume_ref = self.db.volume_get(context, volume_id)
+        yield self.driver.discover_volume(volume_ref['str_id'])
+        shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
+                                                                volume_id)
+        defer.returnValue("/dev/etherd/e%s.%s" % (shelf_id, blade_id))
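The path yielded by `setup_compute_volume()` is the contract between the volume and compute sides: an AoE export allocated to shelf X, blade Y appears on the compute host as `/dev/etherd/eX.Y` once `aoe-discover` has run. A trivial illustration with example shelf and blade numbers:

```python
# Illustration of the device-path convention established above.
shelf_id, blade_id = 0, 3  # as returned by volume_get_shelf_and_blade()
dev_path = "/dev/etherd/e%s.%s" % (shelf_id, blade_id)
assert dev_path == "/dev/etherd/e0.3"
```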