diff options
-rw-r--r-- | nova/tests/db/test_db_api.py | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/nova/tests/db/test_db_api.py b/nova/tests/db/test_db_api.py index 0a5b75bf4..9f05bc55b 100644 --- a/nova/tests/db/test_db_api.py +++ b/nova/tests/db/test_db_api.py @@ -1425,6 +1425,119 @@ class ReservationTestCase(test.TestCase, ModelsObjectComparatorMixin): self.ctxt, 'project1')) +class SecurityGroupRuleTestCase(test.TestCase, ModelsObjectComparatorMixin): + def setUp(self): + super(SecurityGroupRuleTestCase, self).setUp() + self.ctxt = context.get_admin_context() + + def _get_base_values(self): + return { + 'name': 'fake_sec_group', + 'description': 'fake_sec_group_descr', + 'user_id': 'fake', + 'project_id': 'fake', + 'instances': [] + } + + def _get_base_rule_values(self): + return { + 'protocol': "tcp", + 'from_port': 80, + 'to_port': 8080, + 'cidr': None, + 'deleted': 0, + 'deleted_at': None, + 'grantee_group': None, + 'updated_at': None + } + + def _create_security_group(self, values): + v = self._get_base_values() + v.update(values) + return db.security_group_create(self.ctxt, v) + + def _create_security_group_rule(self, values): + v = self._get_base_rule_values() + v.update(values) + return db.security_group_rule_create(self.ctxt, v) + + def test_security_group_rule_create(self): + security_group_rule = self._create_security_group_rule({}) + self.assertIsNotNone(security_group_rule['id']) + for key, value in self._get_base_rule_values().items(): + self.assertEqual(value, security_group_rule[key]) + + def test_security_group_rule_get_by_security_group(self): + security_group = self._create_security_group({}) + security_group_rule = self._create_security_group_rule( + {'parent_group': security_group}) + security_group_rule1 = self._create_security_group_rule( + {'parent_group': security_group}) + found_rules = db.security_group_rule_get_by_security_group(self.ctxt, + security_group['id']) + self.assertEqual(len(found_rules), 2) + rules_ids = [security_group_rule['id'], security_group_rule1['id']] + for rule in found_rules: + self.assertIn(rule['id'], rules_ids) + + def test_security_group_rule_get_by_security_group_grantee(self): + security_group = self._create_security_group({}) + security_group_rule = self._create_security_group_rule( + {'grantee_group': security_group}) + rules = db.security_group_rule_get_by_security_group_grantee(self.ctxt, + security_group['id']) + self.assertEqual(len(rules), 1) + self.assertEqual(rules[0]['id'], security_group_rule['id']) + + def test_security_group_rule_destroy(self): + security_group1 = self._create_security_group({}) + security_group2 = self._create_security_group({}) + security_group_rule1 = self._create_security_group_rule({}) + security_group_rule2 = self._create_security_group_rule({}) + db.security_group_rule_destroy(self.ctxt, security_group_rule1['id']) + self.assertRaises(exception.SecurityGroupNotFound, + db.security_group_rule_get, + self.ctxt, security_group_rule1['id']) + self._assertEqualObjects(db.security_group_rule_get(self.ctxt, + security_group_rule2['id']), + security_group_rule2, ['grantee_group']) + + def test_security_group_rule_destroy_not_found_exception(self): + self.assertRaises(exception.SecurityGroupNotFound, + db.security_group_rule_destroy, self.ctxt, 100500) + + def test_security_group_rule_get(self): + security_group_rule1 = ( + self._create_security_group_rule({})) + security_group_rule2 = self._create_security_group_rule({}) + real_security_group_rule = db.security_group_rule_get(self.ctxt, + security_group_rule1['id']) + self._assertEqualObjects(security_group_rule1, + real_security_group_rule, ['grantee_group']) + + def test_security_group_rule_get_not_found_exception(self): + self.assertRaises(exception.SecurityGroupNotFound, + db.security_group_rule_get, self.ctxt, 100500) + + def test_security_group_rule_count_by_group(self): + sg1 = self._create_security_group({}) + sg2 = self._create_security_group({}) + rules_by_group = {sg1: [], sg2: []} + for group in rules_by_group: + rules = rules_by_group[group] + for i in range(0, 10): + rules.append( + self._create_security_group_rule({'parent_group_id': + group['id']})) + db.security_group_rule_destroy(self.ctxt, + rules_by_group[sg1][0]['id']) + counted_groups = [db.security_group_rule_count_by_group(self.ctxt, + group['id']) + for group in [sg1, sg2]] + expected = [9, 10] + self.assertEqual(counted_groups, expected) + + class SecurityGroupTestCase(test.TestCase, ModelsObjectComparatorMixin): def setUp(self): super(SecurityGroupTestCase, self).setUp() |