summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/tests/test_db_api.py169
1 files changed, 108 insertions, 61 deletions
diff --git a/nova/tests/test_db_api.py b/nova/tests/test_db_api.py
index bf7cc003a..b93c682bb 100644
--- a/nova/tests/test_db_api.py
+++ b/nova/tests/test_db_api.py
@@ -1100,67 +1100,6 @@ class DbApiTestCase(DbTestCase):
_compare(bw_usages[2], expected_bw_usages[2])
timeutils.clear_time_override()
- def test_key_pair_create(self):
- ctxt = context.get_admin_context()
- values = {'name': 'test_keypair', 'public_key': 'test-public-key',
- 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
- keypair = db.key_pair_create(ctxt, values)
- self.assertNotEqual(None, keypair)
- for name, value in values.iteritems():
- self.assertEqual(keypair.get(name), value)
-
- def test_key_pair_create_with_duplicate_name(self):
- ctxt = context.get_admin_context()
- values = {'name': 'test_keypair', 'public_key': 'test-public-key',
- 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
- keypair = db.key_pair_create(ctxt, values)
- self.assertRaises(exception.KeyPairExists,
- db.key_pair_create, ctxt, values)
-
- def test_admin_get_deleted_keypair(self):
- # Test deleted keypair can be read by admin user.
- ctxt = context.get_admin_context()
- values = {'name': 'test_keypair', 'public_key': 'test-public-key',
- 'user_id': 'test_user_id', 'fingerprint': 'test_fingerprint'}
- keypair = db.key_pair_create(ctxt, values)
- db.key_pair_destroy(ctxt, keypair['user_id'], keypair['name'])
-
- # Raise exception when read_deleted is 'no'.
- self.assertRaises(exception.KeypairNotFound, db.key_pair_get, ctxt,
- keypair['user_id'], keypair['name'])
- ctxt = ctxt.elevated(read_deleted='yes')
- db_keypair = db.key_pair_get(ctxt, keypair['user_id'],
- keypair['name'])
- self.assertEqual(db_keypair['name'], keypair['name'])
- self.assertEqual(db_keypair['deleted'], keypair['id'])
-
- def test_admin_get_all_keypairs_including_deleted(self):
- # Test all deleted/non-deleted keypairs can be read by admin user.
- ctxt = context.get_admin_context()
- keypair1_values = {'name': 'test_keypair1',
- 'public_key': 'test-public-key1',
- 'user_id': 'test_user_id',
- 'fingerprint': 'test_fingerprint1'}
- keypair2_values = {'name': 'test_keypair2',
- 'public_key': 'test-public-key2',
- 'user_id': 'test_user_id',
- 'fingerprint': 'test_fingerprint2'}
- keypair1 = db.key_pair_create(ctxt, keypair1_values)
- keypair2 = db.key_pair_create(ctxt, keypair2_values)
- db.key_pair_destroy(ctxt, keypair1['user_id'], keypair1['name'])
- db.key_pair_destroy(ctxt, keypair2['user_id'], keypair2['name'])
- # Returns non-deleted keypairs.
- result = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
- self.assertEqual(result, [])
- ctxt = ctxt.elevated(read_deleted='yes')
- # Returns deleted and non-deleted keypairs.
- db_keypairs = db.key_pair_get_all_by_user(ctxt, keypair1['user_id'])
- expected_deleted_ids = [keypair1['id'], keypair2['id']]
- expected_keypair_names = [keypair1['name'], keypair2['name']]
- for keypair in db_keypairs:
- self.assertTrue(keypair['name'] in expected_keypair_names)
- self.assertTrue(keypair['deleted'] in expected_deleted_ids)
-
def _get_fake_aggr_values():
return {'name': 'fake_aggregate'}
@@ -3604,6 +3543,114 @@ class VirtualInterfaceTestCase(test.TestCase, ModelsObjectComparatorMixin):
self._assertEqualListsOfObjects(vifs, real_vifs)
+class KeyPairTestCase(test.TestCase, ModelsObjectComparatorMixin):
+ def setUp(self):
+ super(KeyPairTestCase, self).setUp()
+ self.ctxt = context.get_admin_context()
+
+ def _create_key_pair(self, values):
+ return db.key_pair_create(self.ctxt, values)
+
+ def test_key_pair_create(self):
+ param = {
+ 'name': 'test_1',
+ 'user_id': 'test_user_id_1',
+ 'public_key': 'test_public_key_1',
+ 'fingerprint': 'test_fingerprint_1'
+ }
+ key_pair = self._create_key_pair(param)
+
+ self.assertTrue(key_pair['id'] is not None)
+ ignored_keys = ['deleted', 'created_at', 'updated_at',
+ 'deleted_at', 'id']
+ self._assertEqualObjects(key_pair, param, ignored_keys)
+
+ def test_key_pair_create_with_duplicate_name(self):
+ params = {'name': 'test_name', 'user_id': 'test_user_id'}
+ self._create_key_pair(params)
+ self.assertRaises(exception.KeyPairExists, self._create_key_pair,
+ params)
+
+ def test_key_pair_get(self):
+ params = [
+ {'name': 'test_1', 'user_id': 'test_user_id_1'},
+ {'name': 'test_2', 'user_id': 'test_user_id_2'},
+ {'name': 'test_3', 'user_id': 'test_user_id_3'}
+ ]
+ key_pairs = [self._create_key_pair(p) for p in params]
+
+ for key in key_pairs:
+ real_key = db.key_pair_get(self.ctxt, key['user_id'], key['name'])
+ self._assertEqualObjects(key, real_key)
+
+ def test_key_pair_get_no_results(self):
+ param = {'name': 'test_1', 'user_id': 'test_user_id_1'}
+ self.assertRaises(exception.KeypairNotFound, db.key_pair_get,
+ self.ctxt, param['user_id'], param['name'])
+
+ def test_key_pair_get_deleted(self):
+ param = {'name': 'test_1', 'user_id': 'test_user_id_1'}
+ key_pair_created = self._create_key_pair(param)
+
+ db.key_pair_destroy(self.ctxt, param['user_id'], param['name'])
+ self.assertRaises(exception.KeypairNotFound, db.key_pair_get,
+ self.ctxt, param['user_id'], param['name'])
+
+ ctxt = self.ctxt.elevated(read_deleted='yes')
+ key_pair_deleted = db.key_pair_get(ctxt, param['user_id'],
+ param['name'])
+ ignored_keys = ['deleted', 'created_at', 'updated_at', 'deleted_at']
+ self._assertEqualObjects(key_pair_deleted, key_pair_created,
+ ignored_keys)
+ self.assertEqual(key_pair_deleted['deleted'], key_pair_deleted['id'])
+
+ def test_key_pair_get_all_by_user(self):
+ params = [
+ {'name': 'test_1', 'user_id': 'test_user_id_1'},
+ {'name': 'test_2', 'user_id': 'test_user_id_1'},
+ {'name': 'test_3', 'user_id': 'test_user_id_2'}
+ ]
+ key_pairs_user_1 = [self._create_key_pair(p) for p in params
+ if p['user_id'] == 'test_user_id_1']
+ key_pairs_user_2 = [self._create_key_pair(p) for p in params
+ if p['user_id'] == 'test_user_id_2']
+
+ real_keys_1 = db.key_pair_get_all_by_user(self.ctxt, 'test_user_id_1')
+ real_keys_2 = db.key_pair_get_all_by_user(self.ctxt, 'test_user_id_2')
+
+ self._assertEqualListsOfObjects(key_pairs_user_1, real_keys_1)
+ self._assertEqualListsOfObjects(key_pairs_user_2, real_keys_2)
+
+ def test_key_pair_count_by_user(self):
+ params = [
+ {'name': 'test_1', 'user_id': 'test_user_id_1'},
+ {'name': 'test_2', 'user_id': 'test_user_id_1'},
+ {'name': 'test_3', 'user_id': 'test_user_id_2'}
+ ]
+ for p in params:
+ self._create_key_pair(p)
+
+ count_1 = db.key_pair_count_by_user(self.ctxt, 'test_user_id_1')
+ self.assertEqual(count_1, 2)
+
+ count_2 = db.key_pair_count_by_user(self.ctxt, 'test_user_id_2')
+ self.assertEqual(count_2, 1)
+
+ def test_key_pair_destroy(self):
+ param = {'name': 'test_1', 'user_id': 'test_user_id_1'}
+ self._create_key_pair(param)
+
+ db.key_pair_destroy(self.ctxt, param['user_id'], param['name'])
+ self.assertRaises(exception.KeypairNotFound, db.key_pair_get,
+ self.ctxt, param['user_id'], param['name'])
+
+ def test_key_pair_destroy_no_such_key(self):
+ param = {'name': 'test_1', 'user_id': 'test_user_id_1'}
+ self.assertRaises(exception.KeypairNotFound,
+ db.key_pair_destroy, self.ctxt,
+ param['user_id'], param['name'])
+
+
class ArchiveTestCase(test.TestCase):
def setUp(self):