diff options
| author | Deepak Garg <deepak.garg@citrix.com> | 2012-01-13 16:03:45 +0530 |
|---|---|---|
| committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2012-01-24 22:31:35 -0800 |
| commit | fe1c97ff4c36d1cc2642d9a485f82874e4b3bda2 (patch) | |
| tree | 522e18112c95cb2006dbb6798af98ee9b11fa08c /nova/tests | |
| parent | 2594e480b2d90490a92865afbeecda35b29320d6 (diff) | |
Blueprint xenapi-provider-firewall and Bug #915403.
1. Provides dom0 IPtables driver to implement the Provider firewall rules.
2. Existing libvirt code has been refactored to reduce the amount of duplicated code to a minimum
3. The three provider apis in ec2/admin.py file are now fixed the following way:
a. remove_external_address_block returned 'OK' on removing blocks which didn't exist. This is now fixed.
b. block_external_addresses raised exception earlier on duplicate network blocks. Now the exception is logged and failed status message is returned.
c. all the three provider apis now logs for invalid and improper inputs and return uniform (a dictionary ) and proper status messages for all cases.
4. appropriate unit tests added to cover the same
Change-Id: I27d83186f850423a6268947aed0c9a349d8f8d65
Diffstat (limited to 'nova/tests')
| -rw-r--r-- | nova/tests/api/ec2/test_admin.py | 59 | ||||
| -rw-r--r-- | nova/tests/test_xenapi.py | 60 | ||||
| -rw-r--r-- | nova/tests/xenapi/stubs.py | 1 |
3 files changed, 109 insertions, 11 deletions
diff --git a/nova/tests/api/ec2/test_admin.py b/nova/tests/api/ec2/test_admin.py index 2b4e463ee..62eb596f3 100644 --- a/nova/tests/api/ec2/test_admin.py +++ b/nova/tests/api/ec2/test_admin.py @@ -365,24 +365,43 @@ class AdminControllerTestCase(test.TestCase): hosts = self._ac.describe_hosts(self._c)['hosts'] self.assertEqual('volume1', hosts[0]['hostname']) - def test_block_external_addresses(self): + def test_block_external_addresses_validate_output_for_valid_input(self): result = self._ac.block_external_addresses(self._c, '192.168.100.1/24') self.assertEqual('OK', result['status']) self.assertEqual('Added 3 rules', result['message']) - def test_block_external_addresses_already_existent_rule(self): - self._ac.block_external_addresses(self._c, '192.168.100.1/24') - self.assertRaises(exception.ApiError, - self._ac.block_external_addresses, - self._c, '192.168.100.1/24') + def test_block_external_addresses_validate_output_for_invalid_input(self): + result = self._ac.block_external_addresses(self._c, '12.10.10.256/24') + self.assertEqual('Failed', result['status']) + value = '0 rules added' in result['message'] + self.assertEqual(value, True) - def test_describe_external_address_blocks(self): - self._ac.block_external_addresses(self._c, '192.168.100.1/24') + def test_block_external_addresses_already_existent_rule(self): + self._ac.block_external_addresses(self._c, '192.168.100.0/24') + result = self._ac.block_external_addresses(self._c, '192.168.100.0/24') + self.assertEqual('Failed', result['status']) + value = '0 rules added' in result['message'] + self.assertEqual(value, True) + + def test_describe_external_address_blocks_normalized_output(self): + self._ac.block_external_addresses(self._c, '192.168.100.11/24') self.assertEqual( - {'externalIpBlockInfo': [{'cidr': u'192.168.100.1/24'}]}, + {'externalIpBlockInfo': [{'cidr': u'192.168.100.0/24'}]}, self._ac.describe_external_address_blocks(self._c)) - def test_remove_external_address_block(self): + def test_describe_external_address_blocks_many_inputs(self): + self._ac.block_external_addresses(self._c, '192.168.100.11/24') + self._ac.block_external_addresses(self._c, '12.12.12.10/24') + self._ac.block_external_addresses(self._c, '18.18.18.0/24') + output1 = {'cidr': u'192.168.100.0/24'} + output2 = {'cidr': u'12.12.12.0/24'} + output3 = {'cidr': u'18.18.18.0/24'} + result = self._ac.describe_external_address_blocks(self._c) + result = sorted(result['externalIpBlockInfo']) + output = sorted([output1, output2, output3]) + self.assertEqual(result, output) + + def test_remove_external_address_block_existent_rule(self): self._ac.block_external_addresses(self._c, '192.168.100.1/24') result = self._ac.remove_external_address_block(self._c, @@ -393,6 +412,26 @@ class AdminControllerTestCase(test.TestCase): result = self._ac.describe_external_address_blocks(self._c) self.assertEqual([], result['externalIpBlockInfo']) + def test_remove_external_address_block_non_existent_rule(self): + result = self._ac.remove_external_address_block(self._c, + '192.168.100.1/24') + self.assertEqual('Failed', result['status']) + value = '0 rules deleted' in result['message'] + self.assertEqual(value, True) + + result = self._ac.describe_external_address_blocks(self._c) + self.assertEqual([], result['externalIpBlockInfo']) + + def test_remove_external_address_block_invalid_input(self): + result = self._ac.remove_external_address_block(self._c, + '192.168.100/24') + self.assertEqual('Failed', result['status']) + value = '0 rules deleted' in result['message'] + self.assertEqual(value, True) + + result = self._ac.describe_external_address_blocks(self._c) + self.assertEqual([], result['externalIpBlockInfo']) + def test_start_vpn(self): def fake_launch_vpn_instance(self, *args): diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py index c3730b3ca..482cd450c 100644 --- a/nova/tests/test_xenapi.py +++ b/nova/tests/test_xenapi.py @@ -1334,6 +1334,9 @@ class XenAPIBWUsageTestCase(test.TestCase): self.assertEqual(result, []) +# TODO(salvatore-orlando): this class and +# nova.tests.test_libvirt.IPTablesFirewallDriverTestCase share a lot of code. +# Consider abstracting common code in a base class for firewall driver testing. class XenAPIDom0IptablesFirewallTestCase(test.TestCase): _in_nat_rules = [ @@ -1581,3 +1584,60 @@ class XenAPIDom0IptablesFirewallTestCase(test.TestCase): self.assertTrue(len(filter(regex.match, self._out_rules)) > 0, "Rules were not updated properly." "The rule for UDP acceptance is missing") + + def test_provider_firewall_rules(self): + # setup basic instance data + instance_ref = self._create_instance_ref() + # FRAGILE: as in libvirt tests + # peeks at how the firewall names chains + chain_name = 'inst-%s' % instance_ref['id'] + + network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1, 1) + self.fw.prepare_instance_filter(instance_ref, network_info) + self.assertTrue('provider' in self.fw.iptables.ipv4['filter'].chains) + rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules + if rule.chain == 'provider'] + self.assertEqual(0, len(rules)) + + admin_ctxt = context.get_admin_context() + # add a rule and send the update message, check for 1 rule + provider_fw0 = db.provider_fw_rule_create(admin_ctxt, + {'protocol': 'tcp', + 'cidr': '10.99.99.99/32', + 'from_port': 1, + 'to_port': 65535}) + self.fw.refresh_provider_fw_rules() + rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules + if rule.chain == 'provider'] + self.assertEqual(1, len(rules)) + + # Add another, refresh, and make sure number of rules goes to two + provider_fw1 = db.provider_fw_rule_create(admin_ctxt, + {'protocol': 'udp', + 'cidr': '10.99.99.99/32', + 'from_port': 1, + 'to_port': 65535}) + self.fw.refresh_provider_fw_rules() + rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules + if rule.chain == 'provider'] + self.assertEqual(2, len(rules)) + + # create the instance filter and make sure it has a jump rule + self.fw.prepare_instance_filter(instance_ref, network_info) + self.fw.apply_instance_filter(instance_ref, network_info) + inst_rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules + if rule.chain == chain_name] + jump_rules = [rule for rule in inst_rules if '-j' in rule.rule] + provjump_rules = [] + # IptablesTable doesn't make rules unique internally + for rule in jump_rules: + if 'provider' in rule.rule and rule not in provjump_rules: + provjump_rules.append(rule) + self.assertEqual(1, len(provjump_rules)) + + # remove a rule from the db, cast to compute to refresh rule + db.provider_fw_rule_destroy(admin_ctxt, provider_fw1['id']) + self.fw.refresh_provider_fw_rules() + rules = [rule for rule in self.fw.iptables.ipv4['filter'].rules + if rule.chain == 'provider'] + self.assertEqual(1, len(rules)) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 0c707e14e..2dfece00f 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -32,7 +32,6 @@ def stubout_firewall_driver(stubs, conn): return vmops = conn._vmops - stubs.Set(vmops.firewall_driver, 'setup_basic_filtering', fake_none) stubs.Set(vmops.firewall_driver, 'prepare_instance_filter', fake_none) stubs.Set(vmops.firewall_driver, 'instance_filter_exists', fake_none) |
