summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--nova/network/security_group/quantum_driver.py9
-rw-r--r--nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py78
2 files changed, 73 insertions, 14 deletions
diff --git a/nova/network/security_group/quantum_driver.py b/nova/network/security_group/quantum_driver.py
index cb5f6551f..2050fa116 100644
--- a/nova/network/security_group/quantum_driver.py
+++ b/nova/network/security_group/quantum_driver.py
@@ -336,12 +336,11 @@ class SecurityGroupAPI(security_group_base.SecurityGroupBase):
return ret
def _has_security_group_requirements(self, port):
- port_security_enabled = port.get('port_security_enabled')
+ port_security_enabled = port.get('port_security_enabled', True)
has_ip = port.get('fixed_ips')
- if port_security_enabled and has_ip:
- return True
- else:
- return False
+ if has_ip:
+ return port_security_enabled
+ return False
@wrap_check_security_groups_policy
def add_to_instance(self, context, instance, security_group_name):
diff --git a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
index 7201ad954..01ee72a64 100644
--- a/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
+++ b/nova/tests/api/openstack/compute/contrib/test_quantum_security_groups.py
@@ -203,6 +203,37 @@ class TestQuantumSecurityGroups(
req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
self.manager._addSecurityGroup(req, '1', body)
+ def test_associate_port_security_enabled_true(self):
+ sg = self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg['id']],
+ port_security_enabled=True,
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.manager._addSecurityGroup(req, '1', body)
+
+ def test_associate_port_security_enabled_false(self):
+ self._create_sg_template().get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], port_security_enabled=False,
+ device_id=test_security_groups.FAKE_UUID1)
+
+ self.stubs.Set(nova.db, 'instance_get',
+ test_security_groups.return_server)
+ body = dict(addSecurityGroup=dict(name="test"))
+
+ req = fakes.HTTPRequest.blank('/v2/fake/servers/1/action')
+ self.assertRaises(webob.exc.HTTPBadRequest,
+ self.manager._addSecurityGroup,
+ req, '1', body)
+
def test_disassociate_by_non_existing_security_group_name(self):
self.stubs.Set(nova.db, 'instance_get',
test_security_groups.return_server)
@@ -281,6 +312,28 @@ class TestQuantumSecurityGroups(
context.get_admin_context(), test_security_groups.FAKE_UUID1)
self.assertEquals(sgs, expected)
+ def test_create_port_with_sg_and_port_security_enabled_true(self):
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ net = self._create_network()
+ self._create_port(
+ network_id=net['network']['id'], security_groups=[sg1['id']],
+ port_security_enabled=True,
+ device_id=test_security_groups.FAKE_UUID1)
+ security_group_api = self.controller.security_group_api
+ sgs = security_group_api.get_instance_security_groups(
+ context.get_admin_context(), test_security_groups.FAKE_UUID1)
+ self.assertEquals(sgs, [{'name': 'test1'}])
+
+ def test_create_port_with_sg_and_port_security_enabled_false(self):
+ sg1 = self._create_sg_template(name='test1').get('security_group')
+ net = self._create_network()
+ self.assertRaises(exception.SecurityGroupCannotBeApplied,
+ self._create_port,
+ network_id=net['network']['id'],
+ security_groups=[sg1['id']],
+ port_security_enabled=False,
+ device_id=test_security_groups.FAKE_UUID1)
+
class TestQuantumSecurityGroupRulesTestCase(TestQuantumSecurityGroupsTestCase):
def setUp(self):
@@ -570,8 +623,9 @@ class MockClient(object):
ret = {'status': 'ACTIVE', 'subnets': [], 'name': n.get('name'),
'admin_state_up': n.get('admin_state_up', True),
'tenant_id': 'fake_tenant',
- 'port_security_enabled': n.get('port_security_enabled', True),
'id': str(uuid.uuid4())}
+ if 'port_security_enabled' in n:
+ ret['port_security_enabled'] = n['port_security_enabled']
self._fake_networks[ret['id']] = ret
return {'network': ret}
@@ -594,21 +648,27 @@ class MockClient(object):
p = body.get('port')
ret = {'status': 'ACTIVE', 'id': str(uuid.uuid4()),
'mac_address': p.get('mac_address', 'fa:16:3e:b8:f5:fb'),
- 'port_security_enabled': p.get('port_security_enabled'),
'device_id': p.get('device_id', str(uuid.uuid4())),
- 'security_groups': p.get('security_groups', [])}
-
- fields = ['network_id', 'security_groups', 'admin_state_up']
- for field in fields:
- ret[field] = p.get(field)
+ 'admin_state_up': p.get('admin_state_up', True),
+ 'security_groups': p.get('security_groups', []),
+ 'network_id': p.get('network_id')}
network = self._fake_networks[p['network_id']]
- if not ret['port_security_enabled']:
+ if 'port_security_enabled' in p:
+ ret['port_security_enabled'] = p['port_security_enabled']
+ elif 'port_security_enabled' in network:
ret['port_security_enabled'] = network['port_security_enabled']
+
+ port_security = ret.get('port_security_enabled', True)
+ # port_security must be True if security groups are present
+ if not port_security and ret['security_groups']:
+ raise exception.SecurityGroupCannotBeApplied()
+
if network['subnets']:
ret['fixed_ips'] = [{'subnet_id': network['subnets'][0],
'ip_address': '10.0.0.1'}]
- if not ret['security_groups']:
+ if not ret['security_groups'] and (port_security is None or
+ port_security is True):
for security_group in self._fake_security_groups.values():
if security_group['name'] == 'default':
ret['security_groups'] = [security_group['id']]