summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorAaron Rosen <arosen@nicira.com>2013-02-28 14:47:11 -0800
committerAaron Rosen <arosen@nicira.com>2013-03-01 10:34:58 -0800
commit5a2b9d7e95cde85d58a35a73030fc8eea88f3386 (patch)
tree6c6d2e90047708a3a43e9087e4e8fdefd8c85333 /nova/tests
parent35b2b2c5c456c3138e0e31085c71d852320b9ce4 (diff)
Implement rules_exist method for quantum security group driver
Originally I thought the quantum security group driver should not enforce rules exist on the nova-api side and instead it should just forward the request to quantum which would return the error. That said there is no extra cost to doing this on the nova-api side as nova-api already queries for the group before adding the rule. In addition, rules_exists() is used in revoke_security_group_ingress() for the ec2 compat APIs so this needs to be implemented. This patch moves create_security_group_rule() and rule_exists() from nova/compute/api.py to nova/network/security_group/security_group_base.py as the same code can be leveraged in both places. Fixes bug 1136345 Change-Id: I444ffc2b53b30ed496b6e3250433d14f316e594d
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py31
1 files changed, 25 insertions, 6 deletions
diff --git a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
index 5f9c5cefa..70c430860 100644
--- a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
+++ b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
@@ -237,6 +237,7 @@ class TestQuantumSecurityGroupRulesTestCase(TestQuantumSecurityGroupsTestCase):
id2 = '22222222-2222-2222-2222-222222222222'
sg_template2 = test_security_groups.security_group_template(
security_group_rules=[], id=id2)
+ self.controller_sg = security_groups.SecurityGroupController()
quantum = get_client()
quantum._fake_security_groups[id1] = sg_template1
quantum._fake_security_groups[id2] = sg_template2
@@ -252,12 +253,26 @@ class TestQuantumSecurityGroupRules(
TestQuantumSecurityGroupRulesTestCase):
def test_create_add_existing_rules_by_cidr(self):
- # Enforced by quantum
- pass
+ sg = test_security_groups.security_group_template()
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ self.controller_sg.create(req, {'security_group': sg})
+ rule = test_security_groups.security_group_rule_template(
+ cidr='15.0.0.0/8', parent_group_id=self.sg2['id'])
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ self.controller.create(req, {'security_group_rule': rule})
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_create_add_existing_rules_by_group_id(self):
- # Enforced by quantum
- pass
+ sg = test_security_groups.security_group_template()
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-groups')
+ self.controller_sg.create(req, {'security_group': sg})
+ rule = test_security_groups.security_group_rule_template(
+ group=self.sg1['id'], parent_group_id=self.sg2['id'])
+ req = fakes.HTTPRequest.blank('/v2/fake/os-security-group-rules')
+ self.controller.create(req, {'security_group_rule': rule})
+ self.assertRaises(webob.exc.HTTPBadRequest, self.controller.create,
+ req, {'security_group_rule': rule})
def test_delete(self):
rule = test_security_groups.security_group_rule_template(
@@ -528,11 +543,15 @@ class MockClient(object):
def show_security_group(self, security_group, **_params):
try:
- return {'security_group':
- self._fake_security_groups[security_group]}
+ sg = self._fake_security_groups[security_group]
except KeyError:
msg = 'Security Group %s not found' % security_group
raise q_exc.QuantumClientException(message=msg, status_code=404)
+ for security_group_rule in self._fake_security_group_rules.values():
+ if security_group_rule['security_group_id'] == sg['id']:
+ sg['security_group_rules'].append(security_group_rule)
+
+ return {'security_group': sg}
def show_security_group_rule(self, security_group_rule, **_params):
try: