summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2012-02-22 17:20:24 +0000
committerGerrit Code Review <review@openstack.org>2012-02-22 17:20:24 +0000
commit60cec0a7f54a9e7ae1c3b32fb39c7e7ee24dfde2 (patch)
tree9d308826cc31ce037f6ac5d1a223c915e7a6572c
parentf300018b1a731a9e427e6b77a05376d78fa8f9ec (diff)
parent424de7eea2588a3f4143e5874aac01d0dd1917e6 (diff)
Merge "blueprint host-aggregates: improvements and clean-up"
-rw-r--r--nova/compute/manager.py17
-rw-r--r--nova/db/api.py5
-rw-r--r--nova/db/sqlalchemy/api.py32
-rw-r--r--nova/tests/test_db_api.py23
-rw-r--r--nova/tests/test_xenapi.py7
-rw-r--r--nova/tests/xenapi/stubs.py18
-rw-r--r--nova/virt/xenapi/pool.py16
-rw-r--r--nova/virt/xenapi/vmops.py4
-rw-r--r--nova/virt/xenapi_conn.py26
-rw-r--r--plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost8
10 files changed, 119 insertions, 37 deletions
diff --git a/nova/compute/manager.py b/nova/compute/manager.py
index ac9bebafe..5fb5ff9c6 100644
--- a/nova/compute/manager.py
+++ b/nova/compute/manager.py
@@ -2341,16 +2341,21 @@ class ComputeManager(manager.SchedulerDependentManager):
try:
self.driver.remove_from_aggregate(context,
aggregate, host, **kwargs)
- except exception.AggregateError:
+ except (exception.AggregateError,
+ exception.InvalidAggregateAction) as e:
error = sys.exc_info()
- self._undo_aggregate_operation(context, self.db.aggregate_host_add,
- aggregate.id, host)
+ self._undo_aggregate_operation(
+ context, self.db.aggregate_host_add,
+ aggregate.id, host,
+ isinstance(e, exception.AggregateError))
raise error[0], error[1], error[2]
- def _undo_aggregate_operation(self, context, op, aggregate_id, host):
+ def _undo_aggregate_operation(self, context, op, aggregate_id,
+ host, set_error=True):
try:
- status = {'operational_state': aggregate_states.ERROR}
- self.db.aggregate_update(context, aggregate_id, status)
+ if set_error:
+ status = {'operational_state': aggregate_states.ERROR}
+ self.db.aggregate_update(context, aggregate_id, status)
op(context, aggregate_id, host)
except Exception:
LOG.exception(_('Aggregate %(aggregate_id)s: unrecoverable state '
diff --git a/nova/db/api.py b/nova/db/api.py
index bd8057532..48fce9d57 100644
--- a/nova/db/api.py
+++ b/nova/db/api.py
@@ -1793,6 +1793,11 @@ def aggregate_get(context, aggregate_id, read_deleted='no'):
return IMPL.aggregate_get(context, aggregate_id, read_deleted)
+def aggregate_get_by_host(context, host, read_deleted='no'):
+ """Get a specific aggregate by host"""
+ return IMPL.aggregate_get_by_host(context, host, read_deleted)
+
+
def aggregate_update(context, aggregate_id, values):
"""Update the attributes of an aggregates. If values contains a metadata
key, it updates the aggregate metadata too."""
diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py
index 026fe2f28..733e903f7 100644
--- a/nova/db/sqlalchemy/api.py
+++ b/nova/db/sqlalchemy/api.py
@@ -4271,12 +4271,24 @@ def _aggregate_get_query(context, model_class, id_field, id,
@require_admin_context
def aggregate_create(context, values, metadata=None):
- try:
+ session = get_session()
+ aggregate = _aggregate_get_query(context,
+ models.Aggregate,
+ models.Aggregate.name,
+ values['name'],
+ session=session,
+ read_deleted='yes').first()
+ if not aggregate:
aggregate = models.Aggregate()
values.setdefault('operational_state', aggregate_states.CREATED)
aggregate.update(values)
- aggregate.save()
- except exception.DBError:
+ aggregate.save(session=session)
+ elif aggregate.deleted:
+ aggregate.update({'deleted': False,
+ 'deleted_at': None,
+ 'availability_zone': values['availability_zone']})
+ aggregate.save(session=session)
+ else:
raise exception.AggregateNameExists(aggregate_name=values['name'])
if metadata:
aggregate_metadata_add(context, aggregate.id, metadata)
@@ -4297,6 +4309,20 @@ def aggregate_get(context, aggregate_id, read_deleted='no'):
@require_admin_context
+def aggregate_get_by_host(context, host, read_deleted='no'):
+ aggregate_host = _aggregate_get_query(context,
+ models.AggregateHost,
+ models.AggregateHost.host,
+ host,
+ read_deleted='no').first()
+
+ if not aggregate_host:
+ raise exception.AggregateHostNotFound(host=host)
+
+ return aggregate_get(context, aggregate_host.aggregate_id, read_deleted)
+
+
+@require_admin_context
def aggregate_update(context, aggregate_id, values):
session = get_session()
aggregate = _aggregate_get_query(context,
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index 4cb17d958..cd1adcfbf 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -338,6 +338,15 @@ class AggregateDBApiTestCase(test.TestCase):
result = _create_aggregate(metadata=None)
self.assertEqual(result['operational_state'], 'created')
+ def test_aggregate_create_avoid_name_conflict(self):
+ """Test we can avoid conflict on deleted aggregates."""
+ r1 = _create_aggregate(metadata=None)
+ db.aggregate_delete(context.get_admin_context(), r1.id)
+ values = {'name': r1.name, 'availability_zone': 'new_zone'}
+ r2 = _create_aggregate(values=values)
+ self.assertEqual(r2.name, values['name'])
+ self.assertEqual(r2.availability_zone, values['availability_zone'])
+
def test_aggregate_create_raise_exist_exc(self):
"""Ensure aggregate names are distinct."""
_create_aggregate(metadata=None)
@@ -383,6 +392,20 @@ class AggregateDBApiTestCase(test.TestCase):
self.assertEqual(_get_fake_aggr_hosts(), expected.hosts)
self.assertEqual(_get_fake_aggr_metadata(), expected.metadetails)
+ def test_aggregate_get_by_host(self):
+ """Ensure we can get an aggregate by host."""
+ ctxt = context.get_admin_context()
+ r1 = _create_aggregate_with_hosts(context=ctxt)
+ r2 = db.aggregate_get_by_host(ctxt, 'foo.openstack.org')
+ self.assertEqual(r1.id, r2.id)
+
+ def test_aggregate_get_by_host_not_found(self):
+ """Ensure AggregateHostNotFound is raised with unknown host."""
+ ctxt = context.get_admin_context()
+ _create_aggregate_with_hosts(context=ctxt)
+ self.assertRaises(exception.AggregateHostNotFound,
+ db.aggregate_get_by_host, ctxt, 'unknown_host')
+
def test_aggregate_delete_raise_not_found(self):
"""Ensure AggregateNotFound is raised when deleting an aggregate."""
ctxt = context.get_admin_context()
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index b08bbd69c..312aaea37 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -1757,10 +1757,13 @@ class XenAPIAggregateTestCase(test.TestCase):
'Dom0IptablesFirewallDriver',
host='host')
xenapi_fake.reset()
+ host_ref = xenapi_fake.get_all('host')[0]
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
self.context = context.get_admin_context()
self.conn = xenapi_conn.get_connection(False)
- self.fake_metadata = {'master_compute': 'host'}
+ self.fake_metadata = {'master_compute': 'host',
+ 'host': xenapi_fake.get_record('host',
+ host_ref)['uuid']}
def tearDown(self):
super(XenAPIAggregateTestCase, self).tearDown()
@@ -1871,7 +1874,7 @@ class XenAPIAggregateTestCase(test.TestCase):
aggregate = self._aggregate_setup(aggr_state=aggregate_states.ACTIVE,
hosts=['host', 'host2'],
metadata=self.fake_metadata)
- self.assertRaises(exception.AggregateError,
+ self.assertRaises(exception.InvalidAggregateAction,
self.conn._pool.remove_from_aggregate,
self.context, aggregate, "host")
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index df444d515..01077be8d 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -202,6 +202,12 @@ class FakeSessionForVMTests(fake.SessionBase):
vm['is_a_template'] = False
vm['is_control_domain'] = False
vm['domid'] = random.randrange(1, 1 << 16)
+ return vm
+
+ def VM_start_on(self, _1, vm_ref, host_ref, _2, _3):
+ vm_rec = self.VM_start(_1, vm_ref, _2, _3)
+ host_rec = fake.get_record('host', host_ref)
+ vm_rec['resident_on'] = host_rec['uuid']
def VM_snapshot(self, session_ref, vm_ref, label):
status = "Running"
@@ -334,7 +340,7 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
pass
-class FakeSessionForMigrationTests(fake.SessionBase):
+class FakeSessionForMigrationTests(FakeSessionForVMTests):
"""Stubs out a XenAPISession for Migration tests"""
def __init__(self, uri):
super(FakeSessionForMigrationTests, self).__init__(uri)
@@ -342,16 +348,6 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def VDI_get_by_uuid(self, *args):
return 'hurr'
- def VM_start(self, _1, ref, _2, _3):
- vm = fake.get_record('VM', ref)
- if vm['power_state'] != 'Halted':
- raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
- vm['power_state']])
- vm['power_state'] = 'Running'
- vm['is_a_template'] = False
- vm['is_control_domain'] = False
- vm['domid'] = random.randrange(1, 1 << 16)
-
def VM_set_name_label(self, *args):
pass
diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py
index 95f0f3467..01db91e31 100644
--- a/nova/virt/xenapi/pool.py
+++ b/nova/virt/xenapi/pool.py
@@ -61,11 +61,11 @@ class ResourcePool(object):
if len(aggregate.hosts) == 1:
# this is the first host of the pool -> make it master
self._init_pool(aggregate.id, aggregate.name)
- # save metadata so that we can find the master again:
- # the password should be encrypted, really.
+ # save metadata so that we can find the master again
values = {
'operational_state': aggregate_states.ACTIVE,
- 'metadata': {'master_compute': host},
+ 'metadata': {'master_compute': host,
+ host: self._host_uuid},
}
db.aggregate_update(context, aggregate.id, values)
else:
@@ -85,7 +85,6 @@ class ResourcePool(object):
elif master_compute and master_compute != host:
# send rpc cast to master, asking to add the following
# host with specified credentials.
- # NOTE: password in clear is not great, but it'll do for now
forward_request(context, "add_aggregate_host", master_compute,
aggregate.id, host,
self._host_addr, self._host_uuid)
@@ -104,15 +103,17 @@ class ResourcePool(object):
# master is on its own, otherwise raise fault. Destroying a
# pool made only by master is fictional
if len(aggregate.hosts) > 1:
- raise exception.AggregateError(
+ # NOTE: this could be avoided by doing a master
+ # re-election, but this is simpler for now.
+ raise exception.InvalidAggregateAction(
aggregate_id=aggregate.id,
action='remove_from_aggregate',
reason=_('Unable to eject %(host)s '
'from the pool; pool not empty')
% locals())
self._clear_pool(aggregate.id)
- db.aggregate_metadata_delete(context,
- aggregate.id, 'master_compute')
+ for key in ['master_compute', host]:
+ db.aggregate_metadata_delete(context, aggregate.id, key)
elif master_compute and master_compute != host:
# A master exists -> forward pool-eject request to master
forward_request(context, "remove_aggregate_host", master_compute,
@@ -194,6 +195,7 @@ def forward_request(context, request_type, master, aggregate_id,
"""Casts add/remove requests to the pool master."""
# replace the address from the xenapi connection url
# because this might be 169.254.0.1, i.e. xenapi
+ # NOTE: password in clear is not great, but it'll do for now
sender_url = swap_xapi_host(FLAGS.xenapi_connection_url, slave_address)
rpc.cast(context, db.queue_get_for(context, FLAGS.compute_topic, master),
{"method": request_type,
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index 8ee34aaa7..78177920c 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -181,7 +181,9 @@ class VMOps(object):
raise Exception(_('Attempted to power on non-existent instance'
' bad instance id %s') % instance.id)
LOG.debug(_("Starting instance %s"), instance.name)
- self._session.call_xenapi('VM.start', vm_ref, False, False)
+ self._session.call_xenapi('VM.start_on', vm_ref,
+ self._session.get_xenapi_host(),
+ False, False)
def _create_disks(self, context, instance, image_meta):
disk_image_type = VMHelper.determine_disk_image_type(image_meta)
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index c9363bb74..77efb34e3 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -506,8 +506,10 @@ class XenAPISession(object):
def __init__(self, url, user, pw):
self.XenAPI = self.get_imported_xenapi()
self._sessions = queue.Queue()
+ self.host_uuid = None
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
- "(is the Dom0 disk full?)"))
+ "(is the Dom0 disk full?)"))
+ is_slave = False
for i in xrange(FLAGS.xenapi_connection_concurrent):
try:
session = self._create_session(url)
@@ -520,10 +522,21 @@ class XenAPISession(object):
session = self.XenAPI.Session(pool.swap_xapi_host(url,
master))
session.login_with_password(user, pw)
+ is_slave = True
else:
raise
self._sessions.put(session)
+ if is_slave:
+ try:
+ aggr = db.aggregate_get_by_host(context.get_admin_context(),
+ FLAGS.host)
+ self.host_uuid = aggr.metadetails[FLAGS.host]
+ except exception.AggregateHostNotFound:
+ LOG.exception(_('Host is member of a pool, but DB '
+ 'says otherwise'))
+ raise
+
def get_product_version(self):
"""Return a tuple of (major, minor, rev) for the host version"""
host = self.get_xenapi_host()
@@ -551,9 +564,12 @@ class XenAPISession(object):
self._sessions.put(session)
def get_xenapi_host(self):
- """Return the xenapi host"""
+ """Return the xenapi host on which nova-compute runs on."""
with self._get_session() as session:
- return session.xenapi.session.get_this_host(session.handle)
+ if self.host_uuid:
+ return session.xenapi.host.get_by_uuid(self.host_uuid)
+ else:
+ return session.xenapi.session.get_this_host(session.handle)
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread."""
@@ -578,6 +594,10 @@ class XenAPISession(object):
# _get_session() acquires a session too, it can result in a deadlock
# if multiple greenthreads race with each other. See bug 924918
host = self.get_xenapi_host()
+ # NOTE(armando): pass the host uuid along with the args so that
+ # the plugin gets executed on the right host when using XS pools
+ if self.host_uuid:
+ args['host_uuid'] = self.host_uuid
with self._get_session() as session:
return tpool.execute(self._unwrap_plugin_exceptions,
session.xenapi.Async.host.call_plugin,
diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
index 9a7ad84af..8fcf9fee0 100644
--- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
+++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/xenhost
@@ -117,9 +117,9 @@ def _resume_compute(session, compute_ref, compute_uuid):
logging.exception('Waited %d seconds for the slave to '
'become available.' % (c * DEFAULT_SLEEP))
time.sleep(DEFAULT_SLEEP)
- raise pluginlib.PluginError('Unrecoverable error: the host has '
- 'not come back for more than %d seconds'
- % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))
+ raise pluginlib.PluginError('Unrecoverable error: the host has '
+ 'not come back for more than %d seconds'
+ % (DEFAULT_SLEEP * (DEFAULT_TRIES + 1)))
def _get_host_uuid():
@@ -315,7 +315,7 @@ def host_data(self, arg_dict):
"""Runs the commands on the xenstore host to return the current status
information.
"""
- host_uuid = _get_host_uuid()
+ host_uuid = arg_dict.get('host_uuid', _get_host_uuid())
cmd = "xe host-param-list uuid=%s" % host_uuid
resp = _run_command(cmd)
parsed_data = parse_response(resp)