summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVishvananda Ishaya <vishvananda@yahoo.com>2010-09-07 05:26:08 -0700
committerVishvananda Ishaya <vishvananda@yahoo.com>2010-09-07 05:26:08 -0700
commit9db707dda70bbb11d944ab357841c9bdd5ef5b07 (patch)
treec6c21f8427fd2eabcb1e409f7690d3e8eabbcc7f
parent91b6fa84f7fa440f1e8b426aa091fdfaa03de6ef (diff)
downloadnova-9db707dda70bbb11d944ab357841c9bdd5ef5b07.tar.gz
nova-9db707dda70bbb11d944ab357841c9bdd5ef5b07.tar.xz
nova-9db707dda70bbb11d944ab357841c9bdd5ef5b07.zip
Lots of fixes to make the nova commands work properly and make datamodel work with mysql properly
-rw-r--r--nova/compute/manager.py104
-rw-r--r--nova/db/api.py5
-rw-r--r--nova/db/sqlalchemy/api.py6
-rw-r--r--nova/db/sqlalchemy/models.py111
-rw-r--r--nova/db/sqlalchemy/session.py3
-rw-r--r--nova/endpoint/cloud.py11
-rw-r--r--nova/process.py95
-rw-r--r--nova/service.py24
-rw-r--r--nova/utils.py18
-rw-r--r--nova/virt/fake.py8
-rw-r--r--nova/virt/libvirt_conn.py44
-rw-r--r--nova/volume/driver.py25
-rw-r--r--nova/volume/manager.py28
13 files changed, 279 insertions, 203 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index c15c9e1f5..13e5dcd1f 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -26,10 +26,8 @@ import os
from twisted.internet import defer
-from nova import db
from nova import exception
from nova import flags
-from nova import process
from nova import manager
from nova import utils
from nova.compute import power_state
@@ -53,41 +51,42 @@ class ComputeManager(manager.Manager):
compute_driver = FLAGS.compute_driver
self.driver = utils.import_object(compute_driver)
self.network_manager = utils.import_object(FLAGS.network_manager)
+ self.volume_manager = utils.import_object(FLAGS.volume_manager)
super(ComputeManager, self).__init__(*args, **kwargs)
def _update_state(self, context, instance_id):
"""Update the state of an instance from the driver info"""
# FIXME(ja): include other fields from state?
- instance_ref = db.instance_get(context, instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
state = self.driver.get_info(instance_ref.name)['state']
- db.instance_state(context, instance_id, state)
+ self.db.instance_state(context, instance_id, state)
@defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):
"""Launch a new instance with specified options."""
- instance_ref = db.instance_get(context, instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['str_id'] in self.driver.list_instances():
raise exception.Error("Instance has already been created")
- logging.debug("Starting instance %s...", instance_id)
+ logging.debug("instance %s: starting...", instance_id)
project_id = instance_ref['project_id']
self.network_manager.setup_compute_network(context, project_id)
- db.instance_update(context,
- instance_id,
- {'host': FLAGS.host})
+ self.db.instance_update(context,
+ instance_id,
+ {'host': self.host})
# TODO(vish) check to make sure the availability zone matches
- db.instance_state(context,
- instance_id,
- power_state.NOSTATE,
- 'spawning')
+ self.db.instance_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'spawning')
try:
yield self.driver.spawn(instance_ref)
except: # pylint: disable-msg=W0702
- logging.exception("Failed to spawn instance %s",
+ logging.exception("instance %s: Failed to spawn",
instance_ref['name'])
- db.instance_state(context, instance_id, power_state.SHUTDOWN)
+ self.db.instance_state(context, instance_id, power_state.SHUTDOWN)
self._update_state(context, instance_id)
@@ -95,30 +94,30 @@ class ComputeManager(manager.Manager):
@exception.wrap_exception
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
- logging.debug("Got told to terminate instance %s", instance_id)
- instance_ref = db.instance_get(context, instance_id)
+ logging.debug("instance %s: terminating", instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
# TODO(vish): move this logic to layer?
if instance_ref['state'] == power_state.SHUTOFF:
- db.instance_destroy(context, instance_id)
+ self.db.instance_destroy(context, instance_id)
raise exception.Error('trying to destroy already destroyed'
' instance: %s' % instance_id)
- db.instance_state(context,
- instance_id,
- power_state.NOSTATE,
- 'shutting_down')
+ self.db.instance_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'shutting_down')
yield self.driver.destroy(instance_ref)
# TODO(ja): should we keep it in a terminated state for a bit?
- db.instance_destroy(context, instance_id)
+ self.db.instance_destroy(context, instance_id)
@defer.inlineCallbacks
@exception.wrap_exception
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
self._update_state(context, instance_id)
- instance_ref = db.instance_get(context, instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['state'] != power_state.RUNNING:
raise exception.Error(
@@ -128,11 +127,11 @@ class ComputeManager(manager.Manager):
instance_ref['state'],
power_state.RUNNING))
- logging.debug('rebooting instance %s', instance_ref['name'])
- db.instance_state(context,
- instance_id,
- power_state.NOSTATE,
- 'rebooting')
+ logging.debug('instance %s: rebooting', instance_ref['name'])
+ self.db.instance_state(context,
+ instance_id,
+ power_state.NOSTATE,
+ 'rebooting')
yield self.driver.reboot(instance_ref)
self._update_state(context, instance_id)
@@ -141,8 +140,8 @@ class ComputeManager(manager.Manager):
"""Send the console output for an instance."""
# TODO(vish): Move this into the driver layer
- logging.debug("Getting console output for %s", (instance_id))
- instance_ref = db.instance_get(context, instance_id)
+ logging.debug("instance %s: getting console output", instance_id)
+ instance_ref = self.db.instance_get(context, instance_id)
if FLAGS.connection_type == 'libvirt':
fname = os.path.abspath(os.path.join(FLAGS.instances_path,
@@ -164,36 +163,27 @@ class ComputeManager(manager.Manager):
@exception.wrap_exception
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
- # TODO(termie): check that instance_id exists
- volume_ref = db.volume_get(context, volume_id)
- yield self._init_aoe()
- # TODO(vish): Move this into the driver layer
- yield process.simple_execute(
- "sudo virsh attach-disk %s /dev/etherd/%s %s" %
- (instance_id,
- volume_ref['aoe_device'],
- mountpoint.rpartition('/dev/')[2]))
- db.volume_attached(context, volume_id, instance_id, mountpoint)
+ logging.debug("instance %s: attaching volume %s to %s", instance_id,
+ volume_id, mountpoint)
+ instance_ref = self.db.instance_get(context, instance_id)
+ dev_path = yield self.volume_manager.setup_compute_volume(context,
+ volume_id)
+ yield self.driver.attach_volume(instance_ref['str_id'],
+ dev_path,
+ mountpoint)
+ self.db.volume_attached(context, volume_id, instance_id, mountpoint)
defer.returnValue(True)
@defer.inlineCallbacks
@exception.wrap_exception
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
- # despite the documentation, virsh detach-disk just wants the device
- # name without the leading /dev/
- # TODO(termie): check that instance_id exists
- volume_ref = db.volume_get(context, volume_id)
- target = volume_ref['mountpoint'].rpartition('/dev/')[2]
- # TODO(vish): Move this into the driver layer
- yield process.simple_execute(
- "sudo virsh detach-disk %s %s " % (instance_id, target))
- db.volume_detached(context, volume_id)
+ logging.debug("instance %s: detaching volume %s",
+ instance_id,
+ volume_id)
+ instance_ref = self.db.instance_get(context, instance_id)
+ volume_ref = self.db.volume_get(context, volume_id)
+ self.driver.detach_volume(instance_ref['str_id'],
+ volume_ref['mountpoint'])
+ self.db.volume_detached(context, volume_id)
defer.returnValue(True)
-
- @defer.inlineCallbacks
- def _init_aoe(self):
- """Discover aoe exported devices"""
- # TODO(vish): these shell calls should move into volume manager.
- yield process.simple_execute("sudo aoe-discover")
- yield process.simple_execute("sudo aoe-stat")
diff --git a/nova/db/api.py b/nova/db/api.py
index d5ccfca80..b49707392 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -410,6 +410,11 @@ def volume_get_all(context):
return IMPL.volume_get_all(context)
+def volume_get_instance(context, volume_id):
+ """Get the instance that a volume is attached to."""
+ return IMPL.volume_get_instance(context, volume_id)
+
+
def volume_get_by_project(context, project_id):
"""Get all volumes belonging to a project."""
return IMPL.volume_get_by_project(context, project_id)
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index fdd2765d3..5172b87b3 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -560,6 +560,12 @@ def volume_get_host(context, volume_id):
return volume_ref['host']
+def volume_get_instance(context, volume_id):
+ volume_ref = db.volume_get(context, volume_id)
+ instance_ref = db.instance_get(context, volume_ref['instance_id'])
+ return instance_ref
+
+
def volume_get_shelf_and_blade(_context, volume_id):
with managed_session() as session:
export_device = session.query(models.ExportDevice) \
diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py
index 626be87fe..310d4640e 100644
--- a/nova/db/sqlalchemy/models.py
+++ b/nova/db/sqlalchemy/models.py
@@ -119,52 +119,56 @@ class NovaBase(object):
def __getitem__(self, key):
return getattr(self, key)
-
-class Image(BASE, NovaBase):
- """Represents an image in the datastore"""
- __tablename__ = 'images'
- __prefix__ = 'ami'
- id = Column(Integer, primary_key=True)
- user_id = Column(String(255))
- project_id = Column(String(255))
- image_type = Column(String(255))
- public = Column(Boolean, default=False)
- state = Column(String(255))
- location = Column(String(255))
- arch = Column(String(255))
- default_kernel_id = Column(String(255))
- default_ramdisk_id = Column(String(255))
-
- @validates('image_type')
- def validate_image_type(self, key, image_type):
- assert(image_type in ['machine', 'kernel', 'ramdisk', 'raw'])
-
- @validates('state')
- def validate_state(self, key, state):
- assert(state in ['available', 'pending', 'disabled'])
-
- @validates('default_kernel_id')
- def validate_kernel_id(self, key, val):
- if val != 'machine':
- assert(val is None)
-
- @validates('default_ramdisk_id')
- def validate_ramdisk_id(self, key, val):
- if val != 'machine':
- assert(val is None)
-
-
-class Host(BASE, NovaBase):
- """Represents a host where services are running"""
- __tablename__ = 'hosts'
- id = Column(String(255), primary_key=True)
-
-
+# TODO(vish): Store images in the database instead of file system
+#class Image(BASE, NovaBase):
+# """Represents an image in the datastore"""
+# __tablename__ = 'images'
+# __prefix__ = 'ami'
+# id = Column(Integer, primary_key=True)
+# user_id = Column(String(255))
+# project_id = Column(String(255))
+# image_type = Column(String(255))
+# public = Column(Boolean, default=False)
+# state = Column(String(255))
+# location = Column(String(255))
+# arch = Column(String(255))
+# default_kernel_id = Column(String(255))
+# default_ramdisk_id = Column(String(255))
+#
+# @validates('image_type')
+# def validate_image_type(self, key, image_type):
+# assert(image_type in ['machine', 'kernel', 'ramdisk', 'raw'])
+#
+# @validates('state')
+# def validate_state(self, key, state):
+# assert(state in ['available', 'pending', 'disabled'])
+#
+# @validates('default_kernel_id')
+# def validate_kernel_id(self, key, val):
+# if val != 'machine':
+# assert(val is None)
+#
+# @validates('default_ramdisk_id')
+# def validate_ramdisk_id(self, key, val):
+# if val != 'machine':
+# assert(val is None)
+#
+#
+# TODO(vish): To make this into its own table, we need a good place to
+# create the host entries. In config somwhere? Or the first
+# time any object sets host? This only becomes particularly
+# important if we need to store per-host data.
+#class Host(BASE, NovaBase):
+# """Represents a host where services are running"""
+# __tablename__ = 'hosts'
+# id = Column(String(255), primary_key=True)
+#
+#
class Service(BASE, NovaBase):
"""Represents a running service on a host"""
__tablename__ = 'services'
id = Column(Integer, primary_key=True)
- host = Column(String(255), ForeignKey('hosts.id'))
+ host = Column(String(255)) # , ForeignKey('hosts.id'))
binary = Column(String(255))
topic = Column(String(255))
report_count = Column(Integer, nullable=False, default=0)
@@ -208,9 +212,12 @@ class Instance(BASE, NovaBase):
def name(self):
return self.str_id
- image_id = Column(Integer, ForeignKey('images.id'), nullable=True)
- kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
- ramdisk_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+ image_id = Column(String(255))
+ kernel_id = Column(String(255))
+ ramdisk_id = Column(String(255))
+# image_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+# kernel_id = Column(Integer, ForeignKey('images.id'), nullable=True)
+# ramdisk_id = Column(Integer, ForeignKey('images.id'), nullable=True)
# ramdisk = relationship(Ramdisk, backref=backref('instances', order_by=id))
# kernel = relationship(Kernel, backref=backref('instances', order_by=id))
# project = relationship(Project, backref=backref('instances', order_by=id))
@@ -224,9 +231,9 @@ class Instance(BASE, NovaBase):
state_description = Column(String(255))
hostname = Column(String(255))
- host = Column(String(255), ForeignKey('hosts.id'))
+ host = Column(String(255)) # , ForeignKey('hosts.id'))
- instance_type = Column(Integer)
+ instance_type = Column(String(255))
user_data = Column(Text)
@@ -264,7 +271,7 @@ class Volume(BASE, NovaBase):
user_id = Column(String(255))
project_id = Column(String(255))
- host = Column(String(255), ForeignKey('hosts.id'))
+ host = Column(String(255)) # , ForeignKey('hosts.id'))
size = Column(Integer)
availability_zone = Column(String(255)) # TODO(vish): foreign key?
instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
@@ -305,7 +312,7 @@ class Network(BASE, NovaBase):
dhcp_start = Column(String(255))
project_id = Column(String(255))
- host = Column(String(255), ForeignKey('hosts.id'))
+ host = Column(String(255)) # , ForeignKey('hosts.id'))
class NetworkIndex(BASE, NovaBase):
@@ -367,7 +374,7 @@ class FloatingIp(BASE, NovaBase):
fixed_ip = relationship(FixedIp, backref=backref('floating_ips'))
project_id = Column(String(255))
- host = Column(String(255), ForeignKey('hosts.id'))
+ host = Column(String(255)) # , ForeignKey('hosts.id'))
@property
def str_id(self):
@@ -392,8 +399,8 @@ class FloatingIp(BASE, NovaBase):
def register_models():
"""Register Models and create metadata"""
from sqlalchemy import create_engine
- models = (Image, Host, Service, Instance, Volume, ExportDevice,
- FixedIp, FloatingIp, Network, NetworkIndex)
+ models = (Service, Instance, Volume, ExportDevice,
+ FixedIp, FloatingIp, Network, NetworkIndex) # , Image, Host)
engine = create_engine(FLAGS.sql_connection, echo=False)
for model in models:
model.metadata.create_all(engine)
diff --git a/nova/db/sqlalchemy/session.py b/nova/db/sqlalchemy/session.py
index 70e3212e1..adcc42293 100644
--- a/nova/db/sqlalchemy/session.py
+++ b/nova/db/sqlalchemy/session.py
@@ -50,6 +50,7 @@ class SessionExecutionManager:
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
- logging.exception("Rolling back due to failed transaction")
+ logging.exception("Rolling back due to failed transaction: %s",
+ exc_type)
self._session.rollback()
self._session.close()
diff --git a/nova/endpoint/cloud.py b/nova/endpoint/cloud.py
index 15136adac..932d42de4 100644
--- a/nova/endpoint/cloud.py
+++ b/nova/endpoint/cloud.py
@@ -41,6 +41,7 @@ from nova.endpoint import images
FLAGS = flags.FLAGS
+flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
def _gen_key(user_id, key_name):
@@ -262,11 +263,11 @@ class CloudController(object):
volume['mountpoint'])
if volume['attach_status'] == 'attached':
v['attachmentSet'] = [{'attachTime': volume['attach_time'],
- 'deleteOnTermination': volume['delete_on_termination'],
+ 'deleteOnTermination': False,
'device': volume['mountpoint'],
'instanceId': volume['instance_id'],
'status': 'attached',
- 'volume_id': volume['volume_id']}]
+ 'volume_id': volume['str_id']}]
else:
v['attachmentSet'] = [{}]
return v
@@ -293,7 +294,7 @@ class CloudController(object):
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
volume_ref = db.volume_get_by_str(context, volume_id)
# TODO(vish): abstract status checking?
- if volume_ref['status'] == "attached":
+ if volume_ref['attach_status'] == "attached":
raise exception.ApiError("Volume is already attached")
#volume.start_attach(instance_id, device)
instance_ref = db.instance_get_by_str(context, instance_id)
@@ -306,7 +307,7 @@ class CloudController(object):
"mountpoint": device}})
return defer.succeed({'attachTime': volume_ref['attach_time'],
'device': volume_ref['mountpoint'],
- 'instanceId': instance_ref['id_str'],
+ 'instanceId': instance_ref['id'],
'requestId': context.request_id,
'status': volume_ref['attach_status'],
'volumeId': volume_ref['id']})
@@ -334,7 +335,7 @@ class CloudController(object):
db.volume_detached(context)
return defer.succeed({'attachTime': volume_ref['attach_time'],
'device': volume_ref['mountpoint'],
- 'instanceId': instance_ref['id_str'],
+ 'instanceId': instance_ref['str_id'],
'requestId': context.request_id,
'status': volume_ref['attach_status'],
'volumeId': volume_ref['id']})
diff --git a/nova/process.py b/nova/process.py
index 425d9f162..74725c157 100644
--- a/nova/process.py
+++ b/nova/process.py
@@ -18,9 +18,10 @@
# under the License.
"""
-Process pool, still buggy right now.
+Process pool using twisted threading
"""
+import logging
import StringIO
from twisted.internet import defer
@@ -29,30 +30,14 @@ from twisted.internet import protocol
from twisted.internet import reactor
from nova import flags
+from nova.utils import ProcessExecutionError
FLAGS = flags.FLAGS
flags.DEFINE_integer('process_pool_size', 4,
'Number of processes to use in the process pool')
-
-# NOTE(termie): this is copied from twisted.internet.utils but since
-# they don't export it I've copied and modified
-class UnexpectedErrorOutput(IOError):
- """
- Standard error data was received where it was not expected. This is a
- subclass of L{IOError} to preserve backward compatibility with the previous
- error behavior of L{getProcessOutput}.
-
- @ivar processEnded: A L{Deferred} which will fire when the process which
- produced the data on stderr has ended (exited and all file descriptors
- closed).
- """
- def __init__(self, stdout=None, stderr=None):
- IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
-
-
-# This is based on _BackRelay from twister.internal.utils, but modified to
-# capture both stdout and stderr, without odd stderr handling, and also to
+# This is based on _BackRelay from twister.internal.utils, but modified to
+# capture both stdout and stderr, without odd stderr handling, and also to
# handle stdin
class BackRelayWithInput(protocol.ProcessProtocol):
"""
@@ -62,22 +47,23 @@ class BackRelayWithInput(protocol.ProcessProtocol):
@ivar deferred: A L{Deferred} which will be called back with all of stdout
and all of stderr as well (as a tuple). C{terminate_on_stderr} is true
and any bytes are received over stderr, this will fire with an
- L{_UnexpectedErrorOutput} instance and the attribute will be set to
+ L{_ProcessExecutionError} instance and the attribute will be set to
C{None}.
- @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
- received over stderr, this attribute will refer to a L{Deferred} which
- will be called back when the process ends. This C{Deferred} is also
- associated with the L{_UnexpectedErrorOutput} which C{deferred} fires
- with earlier in this case so that users can determine when the process
+ @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are
+ received over stderr, this attribute will refer to a L{Deferred} which
+ will be called back when the process ends. This C{Deferred} is also
+ associated with the L{_ProcessExecutionError} which C{deferred} fires
+ with earlier in this case so that users can determine when the process
has actually ended, in addition to knowing when bytes have been received
via stderr.
"""
- def __init__(self, deferred, started_deferred=None,
- terminate_on_stderr=False, check_exit_code=True,
- process_input=None):
+ def __init__(self, deferred, cmd, started_deferred=None,
+ terminate_on_stderr=False, check_exit_code=True,
+ process_input=None):
self.deferred = deferred
+ self.cmd = cmd
self.stdout = StringIO.StringIO()
self.stderr = StringIO.StringIO()
self.started_deferred = started_deferred
@@ -85,14 +71,18 @@ class BackRelayWithInput(protocol.ProcessProtocol):
self.check_exit_code = check_exit_code
self.process_input = process_input
self.on_process_ended = None
-
+
+ def _build_execution_error(self, exit_code=None):
+ return ProcessExecutionError(cmd=self.cmd,
+ exit_code=exit_code,
+ stdout=self.stdout.getvalue(),
+ stderr=self.stderr.getvalue())
+
def errReceived(self, text):
self.stderr.write(text)
if self.terminate_on_stderr and (self.deferred is not None):
self.on_process_ended = defer.Deferred()
- self.deferred.errback(UnexpectedErrorOutput(
- stdout=self.stdout.getvalue(),
- stderr=self.stderr.getvalue()))
+ self.deferred.errback(self._build_execution_error())
self.deferred = None
self.transport.loseConnection()
@@ -102,15 +92,19 @@ class BackRelayWithInput(protocol.ProcessProtocol):
def processEnded(self, reason):
if self.deferred is not None:
stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
- try:
- if self.check_exit_code:
- reason.trap(error.ProcessDone)
- self.deferred.callback((stdout, stderr))
- except:
- # NOTE(justinsb): This logic is a little suspicious to me...
- # If the callback throws an exception, then errback will be
- # called also. However, this is what the unit tests test for...
- self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
+ exit_code = reason.value.exitCode
+ if self.check_exit_code and exit_code <> 0:
+ self.deferred.errback(self._build_execution_error(exit_code))
+ else:
+ try:
+ if self.check_exit_code:
+ reason.trap(error.ProcessDone)
+ self.deferred.callback((stdout, stderr))
+ except:
+ # NOTE(justinsb): This logic is a little suspicious to me...
+ # If the callback throws an exception, then errback will be
+ # called also. However, this is what the unit tests test for...
+ self.deferred.errback(self._build_execution_error(exit_code))
elif self.on_process_ended is not None:
self.on_process_ended.errback(reason)
@@ -122,8 +116,8 @@ class BackRelayWithInput(protocol.ProcessProtocol):
self.transport.write(self.process_input)
self.transport.closeStdin()
-def get_process_output(executable, args=None, env=None, path=None,
- process_reactor=None, check_exit_code=True,
+def get_process_output(executable, args=None, env=None, path=None,
+ process_reactor=None, check_exit_code=True,
process_input=None, started_deferred=None,
terminate_on_stderr=False):
if process_reactor is None:
@@ -131,10 +125,15 @@ def get_process_output(executable, args=None, env=None, path=None,
args = args and args or ()
env = env and env and {}
deferred = defer.Deferred()
+ cmd = executable
+ if args:
+ cmd = cmd + " " + ' '.join(args)
+ logging.debug("Running cmd: %s", cmd)
process_handler = BackRelayWithInput(
- deferred,
- started_deferred=started_deferred,
- check_exit_code=check_exit_code,
+ deferred,
+ cmd,
+ started_deferred=started_deferred,
+ check_exit_code=check_exit_code,
process_input=process_input,
terminate_on_stderr=terminate_on_stderr)
# NOTE(vish): commands come in as unicode, but self.executes needs
@@ -142,7 +141,7 @@ def get_process_output(executable, args=None, env=None, path=None,
executable = str(executable)
if not args is None:
args = [str(x) for x in args]
- process_reactor.spawnProcess( process_handler, executable,
+ process_reactor.spawnProcess( process_handler, executable,
(executable,)+tuple(args), env, path)
return deferred
diff --git a/nova/service.py b/nova/service.py
index d7471f4c6..bc4b80fe4 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -58,10 +58,14 @@ class Service(object, service.Service):
self.binary)
self.service_id = service_ref['id']
except exception.NotFound:
- self.service_id = db.service_create(None, {'host': self.host,
- 'binary': self.binary,
- 'topic': self.topic,
- 'report_count': 0})
+ self._create_service_ref()
+
+
+ def _create_service_ref(self):
+ self.service_id = db.service_create(None, {'host': self.host,
+ 'binary': self.binary,
+ 'topic': self.topic,
+ 'report_count': 0})
def __getattr__(self, key):
try:
@@ -122,10 +126,6 @@ class Service(object, service.Service):
def kill(self, context=None):
"""Destroy the service object in the datastore"""
try:
- service_ref = db.service_get_by_args(context,
- self.host,
- self.binary)
- service_id = service_ref['id']
db.service_destroy(context, self.service_id)
except exception.NotFound:
logging.warn("Service killed that has no database entry")
@@ -134,7 +134,13 @@ class Service(object, service.Service):
def report_state(self, context=None):
"""Update the state of this service in the datastore."""
try:
- service_ref = db.service_get(context, self.service_id)
+ try:
+ service_ref = db.service_get(context, self.service_id)
+ except exception.NotFound:
+ logging.debug("The service database object disappeared, "
+ "Recreating it.")
+ self._create_service_ref()
+
db.service_update(context,
self.service_id,
{'report_count': service_ref['report_count'] + 1})
diff --git a/nova/utils.py b/nova/utils.py
index 536d722bb..3e4a3d94f 100644
--- a/nova/utils.py
+++ b/nova/utils.py
@@ -38,6 +38,16 @@ from nova import flags
FLAGS = flags.FLAGS
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+class ProcessExecutionError(IOError):
+ def __init__( self, stdout=None, stderr=None, exit_code=None, cmd=None,
+ description=None):
+ if description is None:
+ description = "Unexpected error while running command."
+ if exit_code is None:
+ exit_code = '-'
+ message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
+ description, cmd, exit_code, stdout, stderr)
+ IOError.__init__(self, message)
def import_class(import_str):
"""Returns a class from a string including module and class"""
@@ -69,6 +79,7 @@ def fetchfile(url, target):
execute("curl --fail %s -o %s" % (url, target))
def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
+ logging.debug("Running cmd: %s", cmd)
env = os.environ.copy()
if addl_env:
env.update(addl_env)
@@ -83,8 +94,11 @@ def execute(cmd, process_input=None, addl_env=None, check_exit_code=True):
if obj.returncode:
logging.debug("Result was %s" % (obj.returncode))
if check_exit_code and obj.returncode <> 0:
- raise Exception( "Unexpected exit code: %s. result=%s"
- % (obj.returncode, result))
+ (stdout, stderr) = result
+ raise ProcessExecutionError(exit_code=obj.returncode,
+ stdout=stdout,
+ stderr=stderr,
+ cmd=cmd)
return result
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 060b53729..4ae6afcc4 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -132,6 +132,14 @@ class FakeConnection(object):
del self.instances[instance.name]
return defer.succeed(None)
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ """Attach the disk at device_path to the instance at mountpoint"""
+ return True
+
+ def detach_volume(self, instance_name, mountpoint):
+ """Detach the disk attached to the instance at mountpoint"""
+ return True
+
def get_info(self, instance_name):
"""
Get a block of information about the given instance. This is returned
diff --git a/nova/virt/libvirt_conn.py b/nova/virt/libvirt_conn.py
index 621b7d576..73d0a366f 100644
--- a/nova/virt/libvirt_conn.py
+++ b/nova/virt/libvirt_conn.py
@@ -21,7 +21,6 @@
A connection to a hypervisor (e.g. KVM) through libvirt.
"""
-import json
import logging
import os
import shutil
@@ -154,12 +153,30 @@ class LibvirtConnection(object):
def _cleanup(self, instance):
target = os.path.join(FLAGS.instances_path, instance['name'])
- logging.info("Deleting instance files at %s", target)
+ logging.info('instance %s: deleting instance files %s',
+ instance['name'], target)
if os.path.exists(target):
shutil.rmtree(target)
@defer.inlineCallbacks
@exception.wrap_exception
+ def attach_volume(self, instance_name, device_path, mountpoint):
+ yield process.simple_execute("sudo virsh attach-disk %s %s %s" %
+ (instance_name,
+ device_path,
+ mountpoint.rpartition('/dev/')[2]))
+
+ @defer.inlineCallbacks
+ @exception.wrap_exception
+ def detach_volume(self, instance_name, mountpoint):
+ # NOTE(vish): despite the documentation, virsh detach-disk just
+ # wants the device name without the leading /dev/
+ yield process.simple_execute("sudo virsh detach-disk %s %s" %
+ (instance_name,
+ mountpoint.rpartition('/dev/')[2]))
+
+ @defer.inlineCallbacks
+ @exception.wrap_exception
def reboot(self, instance):
xml = self.to_xml(instance)
yield self._conn.lookupByName(instance['name']).destroy()
@@ -171,7 +188,7 @@ class LibvirtConnection(object):
try:
instance.set_state(self.get_info(instance['name'])['state'])
if instance.state == power_state.RUNNING:
- logging.debug('rebooted instance %s' % instance['name'])
+ logging.debug('instance %s: rebooted', instance['name'])
timer.stop()
d.callback(None)
except Exception, exn:
@@ -192,7 +209,7 @@ class LibvirtConnection(object):
yield self._conn.createXML(xml, 0)
# TODO(termie): this should actually register
# a callback to check for successful boot
- logging.debug("Instance is running")
+ logging.debug("instance %s: is running", instance['name'])
local_d = defer.Deferred()
timer = task.LoopingCall(f=None)
@@ -200,11 +217,11 @@ class LibvirtConnection(object):
try:
instance.set_state(self.get_info(instance['name'])['state'])
if instance.state == power_state.RUNNING:
- logging.debug('booted instance %s', instance['name'])
+ logging.debug('instance %s: booted', instance['name'])
timer.stop()
local_d.callback(None)
except:
- logging.exception('Failed to boot instance %s',
+ logging.exception('instance %s: failed to boot',
instance['name'])
instance.set_state(power_state.SHUTDOWN)
timer.stop()
@@ -227,7 +244,7 @@ class LibvirtConnection(object):
# TODO(termie): these are blocking calls, it would be great
# if they weren't.
- logging.info('Creating image for: %s', inst['name'])
+ logging.info('instance %s: Creating image', inst['name'])
f = open(basepath('libvirt.xml'), 'w')
f.write(libvirt_xml)
f.close()
@@ -249,7 +266,7 @@ class LibvirtConnection(object):
process_input=process_input,
check_exit_code=True)
- key = inst.key_data
+ key = str(inst['key_data'])
net = None
network_ref = db.project_get_network(None, project.id)
if network_ref['injected']:
@@ -262,7 +279,12 @@ class LibvirtConnection(object):
'broadcast': network_ref['broadcast'],
'dns': network_ref['dns']}
if key or net:
- logging.info('Injecting data into image %s', inst.image_id)
+ if key:
+ logging.info('instance %s: injecting key into image %s',
+ inst['name'], inst.image_id)
+ if net:
+ logging.info('instance %s: injecting net into image %s',
+ inst['name'], inst.image_id)
yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute)
if os.path.exists(basepath('disk')):
@@ -275,7 +297,7 @@ class LibvirtConnection(object):
def to_xml(self, instance):
# TODO(termie): cache?
- logging.debug("Starting the toXML method")
+ logging.debug('instance %s: starting toXML method', instance['name'])
network = db.project_get_network(None, instance['project_id'])
# FIXME(vish): stick this in db
instance_type = instance_types.INSTANCE_TYPES[instance['instance_type']]
@@ -288,7 +310,7 @@ class LibvirtConnection(object):
'bridge_name': network['bridge'],
'mac_address': instance['mac_address']}
libvirt_xml = self.libvirt_xml % xml_info
- logging.debug("Finished the toXML method")
+ logging.debug('instance %s: finished toXML method', instance['name'])
return libvirt_xml
diff --git a/nova/volume/driver.py b/nova/volume/driver.py
index f5c1330a3..f875e0213 100644
--- a/nova/volume/driver.py
+++ b/nova/volume/driver.py
@@ -35,35 +35,34 @@ flags.DEFINE_string('aoe_eth_dev', 'eth0',
'Which device to export the volumes on')
-
class AOEDriver(object):
"""Executes commands relating to AOE volumes"""
def __init__(self, execute=process.simple_execute, *args, **kwargs):
self._execute = execute
@defer.inlineCallbacks
- def create_volume(self, volume_id, size):
+ def create_volume(self, volume_name, size):
"""Creates a logical volume"""
# NOTE(vish): makes sure that the volume group exists
- yield self._execute("vgs | grep %s" % FLAGS.volume_group)
+ yield self._execute("vgs %s" % FLAGS.volume_group)
if int(size) == 0:
sizestr = '100M'
else:
sizestr = '%sG' % size
yield self._execute(
"sudo lvcreate -L %s -n %s %s" % (sizestr,
- volume_id,
+ volume_name,
FLAGS.volume_group))
@defer.inlineCallbacks
- def delete_volume(self, volume_id):
+ def delete_volume(self, volume_name):
"""Deletes a logical volume"""
yield self._execute(
"sudo lvremove -f %s/%s" % (FLAGS.volume_group,
- volume_id))
+ volume_name))
@defer.inlineCallbacks
- def create_export(self, volume_id, shelf_id, blade_id):
+ def create_export(self, volume_name, shelf_id, blade_id):
"""Creates an export for a logical volume"""
yield self._execute(
"sudo vblade-persist setup %s %s %s /dev/%s/%s" %
@@ -71,10 +70,16 @@ class AOEDriver(object):
blade_id,
FLAGS.aoe_eth_dev,
FLAGS.volume_group,
- volume_id))
+ volume_name))
+
+ @defer.inlineCallbacks
+ def discover_volume(self, _volume_name):
+ """Discover volume on a remote host"""
+ yield self._execute("sudo aoe-discover")
+ yield self._execute("sudo aoe-stat")
@defer.inlineCallbacks
- def remove_export(self, _volume_id, shelf_id, blade_id):
+ def remove_export(self, _volume_name, shelf_id, blade_id):
"""Removes an export for a logical volume"""
yield self._execute(
"sudo vblade-persist stop %s %s" % (shelf_id, blade_id))
@@ -92,7 +97,6 @@ class AOEDriver(object):
check_exit_code=False)
-
class FakeAOEDriver(AOEDriver):
"""Logs calls instead of executing"""
def __init__(self, *args, **kwargs):
@@ -102,4 +106,3 @@ class FakeAOEDriver(AOEDriver):
def fake_execute(cmd, *_args, **_kwargs):
"""Execute that simply logs the command"""
logging.debug("FAKE AOE: %s", cmd)
-
diff --git a/nova/volume/manager.py b/nova/volume/manager.py
index e5f4805a1..c4fa1f982 100644
--- a/nova/volume/manager.py
+++ b/nova/volume/manager.py
@@ -82,17 +82,17 @@ class AOEManager(manager.Manager):
size = volume_ref['size']
logging.debug("volume %s: creating lv of size %sG", volume_id, size)
- yield self.driver.create_volume(volume_id, size)
+ yield self.driver.create_volume(volume_ref['str_id'], size)
logging.debug("volume %s: allocating shelf & blade", volume_id)
self._ensure_blades(context)
rval = self.db.volume_allocate_shelf_and_blade(context, volume_id)
(shelf_id, blade_id) = rval
- logging.debug("volume %s: exporting shelf %s & blade %s", (volume_id,
- shelf_id, blade_id))
+ logging.debug("volume %s: exporting shelf %s & blade %s", volume_id,
+ shelf_id, blade_id)
- yield self.driver.create_export(volume_id, shelf_id, blade_id)
+ yield self.driver.create_export(volume_ref['str_id'], shelf_id, blade_id)
# TODO(joshua): We need to trigger a fanout message
# for aoe-discover on all the nodes
@@ -114,8 +114,22 @@ class AOEManager(manager.Manager):
if volume_ref['host'] != FLAGS.host:
raise exception.Error("Volume is not local to this node")
shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
- volume_id)
- yield self.driver.remove_export(volume_id, shelf_id, blade_id)
- yield self.driver.delete_volume(volume_id)
+ volume_id)
+ yield self.driver.remove_export(volume_ref['str_id'],
+ shelf_id,
+ blade_id)
+ yield self.driver.delete_volume(volume_ref['str_id'])
self.db.volume_destroy(context, volume_id)
defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def setup_compute_volume(self, context, volume_id):
+ """Setup remote volume on compute host
+
+ Returns path to device.
+ """
+ volume_ref = self.db.volume_get(context, volume_id)
+ yield self.driver.discover_volume(volume_ref['str_id'])
+ shelf_id, blade_id = self.db.volume_get_shelf_and_blade(context,
+ volume_id)
+ defer.returnValue("/dev/etherd/e%s.%s" % (shelf_id, blade_id))