diff options
-rw-r--r-- | nova/network/security_group/quantum_driver.py | 9 | ||||
-rw-r--r-- | nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py | 78 |
2 files changed, 73 insertions, 14 deletions
diff --git a/nova/network/security_group/quantum_driver.py b/nova/network/security_group/quantum_driver.py index cb5f6551f..2050fa116 100644 --- a/nova/network/security_group/quantum_driver.py +++ b/nova/network/security_group/quantum_driver.py @@ -336,12 +336,11 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase): return ret def _has_security_group_requirements(self, port): - port_security_enabled = port.get('port_security_enabled') + port_security_enabled = port.get('port_security_enabled', True) has_ip = port.get('fixed_ips') - if port_security_enabled and has_ip: - return True - else: - return False + if has_ip: + return port_security_enabled + return False @wrap_check_security_groups_policy def add_to_instance(self, context, instance, security_group_name): diff --git a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py index 7201ad954..01ee72a64 100644 --- a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py +++ b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py @@ -203,6 +203,37 @@ class TestQuantumSecurityGroups( req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') self.manager._addSecurityGroup(req, '1', body) + def test_associate_port_security_enabled_true(self): + sg = self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg['id']], + port_security_enabled=True, + device_id=test_security_groups.FAKE_UUID1) + + self.stubs.Set(nova.db, 'instance_get', + test_security_groups.return_server) + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.manager._addSecurityGroup(req, '1', body) + + def test_associate_port_security_enabled_false(self): + self._create_sg_template().get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], port_security_enabled=False, + device_id=test_security_groups.FAKE_UUID1) + + self.stubs.Set(nova.db, 'instance_get', + test_security_groups.return_server) + body = dict(addSecurityGroup=dict(name="test")) + + req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action') + self.assertRaises(webob.exc.HTTPBadRequest, + self.manager._addSecurityGroup, + req, '1', body) + def test_disassociate_by_non_existing_security_group_name(self): self.stubs.Set(nova.db, 'instance_get', test_security_groups.return_server) @@ -281,6 +312,28 @@ class TestQuantumSecurityGroups( context.get_admin_context(), test_security_groups.FAKE_UUID1) self.assertEquals(sgs, expected) + def test_create_port_with_sg_and_port_security_enabled_true(self): + sg1 = self._create_sg_template(name='test1').get('security_group') + net = self._create_network() + self._create_port( + network_id=net['network']['id'], security_groups=[sg1['id']], + port_security_enabled=True, + device_id=test_security_groups.FAKE_UUID1) + security_group_api = self.controller.security_group_api + sgs = security_group_api.get_instance_security_groups( + context.get_admin_context(), test_security_groups.FAKE_UUID1) + self.assertEquals(sgs, [{'name': 'test1'}]) + + def test_create_port_with_sg_and_port_security_enabled_false(self): + sg1 = self._create_sg_template(name='test1').get('security_group') + net = self._create_network() + self.assertRaises(exception.SecurityGroupCannotBeApplied, + self._create_port, + network_id=net['network']['id'], + security_groups=[sg1['id']], + port_security_enabled=False, + device_id=test_security_groups.FAKE_UUID1) + class TestQuantumSecurityGroupRulesTestCase(TestQuantumSecurityGroupsTestCase): def setUp(self): @@ -570,8 +623,9 @@ class MockClient(object): ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'), 'admin_state_up': n.get('admin_state_up', True), 'tenant_id': 'fake_tenant', - 'port_security_enabled': n.get('port_security_enabled', True), 'id': str(uuid.uuid4())} + if 'port_security_enabled' in n: + ret['port_security_enabled'] = n['port_security_enabled'] self._fake_networks[ret['id']] = ret return {'network': ret} @@ -594,21 +648,27 @@ class MockClient(object): p = body.get('port') ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()), 'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'), - 'port_security_enabled': p.get('port_security_enabled'), 'device_id': p.get('device_id', str(uuid.uuid4())), - 'security_groups': p.get('security_groups', [])} - - fields = ['network_id', 'security_groups', 'admin_state_up'] - for field in fields: - ret[field] = p.get(field) + 'admin_state_up': p.get('admin_state_up', True), + 'security_groups': p.get('security_groups', []), + 'network_id': p.get('network_id')} network = self._fake_networks[p['network_id']] - if not ret['port_security_enabled']: + if 'port_security_enabled' in p: + ret['port_security_enabled'] = p['port_security_enabled'] + elif 'port_security_enabled' in network: ret['port_security_enabled'] = network['port_security_enabled'] + + port_security = ret.get('port_security_enabled', True) + # port_security must be True if security groups are present + if not port_security and ret['security_groups']: + raise exception.SecurityGroupCannotBeApplied() + if network['subnets']: ret['fixed_ips'] = [{'subnet_id': network['subnets'][0], 'ip_address': '10.0.0.1'}] - if not ret['security_groups']: + if not ret['security_groups'] and (port_security is None or + port_security is True): for security_group in self._fake_security_groups.values(): if security_group['name'] == 'default': ret['security_groups'] = [security_group['id']] |