summaryrefslogtreecommitdiffstats
path: root/nova/tests
diff options
context:
space:
mode:
authorSalvatore Orlando <salvatore.orlando@eu.citrix.com>2011-11-07 12:01:11 +0000
committerSalvatore Orlando <salvatore.orlando@eu.citrix.com>2012-01-10 17:16:31 +0000
commiteac7888e722759b7c9a0d7841dfe8b26dfd77897 (patch)
tree0bc9df758e65dd2d6d0710c9465237af1407fe49 /nova/tests
parent799801f856a0f3e7788e89ecdca02828fd64e6ad (diff)
Blueprint xenapi-security-groups
Provides two drivers for implementing security groups in xenapi: 1) domU driver that enforces security groups on the Openstack virtual appliance (use advised with FlatDHCP in HA mode) 2) dom0 driver that enforces security groups where VIFs are attached Both drivers translate security groups into iptables rules. Existing libvirt code has been refactored to reduce the amount of duplicated code to a minimum Now Addressing reviewers's comments on style. Fixing issue spotted with snapshots Change-Id: Ifa16a8f2508a709be03241bac0f942fe1a51d1e8
Diffstat (limited to 'nova/tests')
-rw-r--r--nova/tests/test_xenapi.py292
-rw-r--r--nova/tests/xenapi/stubs.py89
2 files changed, 368 insertions, 13 deletions
diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index 2b9f977cc..12e15c991 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -42,6 +42,7 @@ from nova.virt.xenapi import vm_utils
from nova.tests.db import fakes as db_fakes
from nova.tests.xenapi import stubs
from nova.tests.glance import stubs as glance_stubs
+from nova.tests import fake_network
from nova.tests import fake_utils
LOG = logging.getLogger('nova.tests.test_xenapi')
@@ -94,7 +95,9 @@ class XenAPIVolumeTestCase(test.TestCase):
self.context = context.RequestContext(self.user_id, self.project_id)
self.flags(target_host='127.0.0.1',
xenapi_connection_url='test_url',
- xenapi_connection_password='test_pass')
+ xenapi_connection_password='test_pass',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
@@ -205,7 +208,9 @@ class XenAPIVMTestCase(test.TestCase):
self.stubs = stubout.StubOutForTesting()
self.flags(xenapi_connection_url='test_url',
xenapi_connection_password='test_pass',
- instance_name_template='%d')
+ instance_name_template='%d',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
xenapi_fake.reset()
xenapi_fake.create_local_srs()
xenapi_fake.create_local_pifs()
@@ -247,6 +252,9 @@ class XenAPIVMTestCase(test.TestCase):
self.stubs.Set(xenapi_fake, 'create_vbd', create_bad_vbd)
stubs.stubout_instance_snapshot(self.stubs)
+ # Stubbing out firewall driver as previous stub sets alters
+ # xml rpc result parsing
+ stubs.stubout_firewall_driver(self.stubs, self.conn)
instance = self._create_instance()
name = "MySnapshot"
@@ -255,6 +263,10 @@ class XenAPIVMTestCase(test.TestCase):
def test_instance_snapshot(self):
stubs.stubout_instance_snapshot(self.stubs)
+ stubs.stubout_is_snapshot(self.stubs)
+ # Stubbing out firewall driver as previous stub sets alters
+ # xml rpc result parsing
+ stubs.stubout_firewall_driver(self.stubs, self.conn)
instance = self._create_instance()
name = "MySnapshot"
@@ -340,6 +352,7 @@ class XenAPIVMTestCase(test.TestCase):
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
+ 'dhcp_server': '192.168.0.1',
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})
@@ -409,7 +422,11 @@ class XenAPIVMTestCase(test.TestCase):
instance = db.instance_create(self.context, instance_values)
else:
instance = db.instance_get(self.context, instance_id)
- network_info = [({'bridge': 'fa0', 'id': 0, 'injected': True},
+ network_info = [({'bridge': 'fa0', 'id': 0,
+ 'injected': True,
+ 'cidr': '192.168.0.0/24',
+ 'cidr_v6': 'dead:beef::1/120',
+ },
{'broadcast': '192.168.0.255',
'dns': ['192.168.0.1'],
'gateway': '192.168.0.1',
@@ -420,6 +437,7 @@ class XenAPIVMTestCase(test.TestCase):
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
+ 'dhcp_server': '192.168.0.1',
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})]
@@ -488,10 +506,11 @@ class XenAPIVMTestCase(test.TestCase):
# Change the default host_call_plugin to one that'll return
# a swap disk
orig_func = stubs.FakeSessionForVMTests.host_call_plugin
-
stubs.FakeSessionForVMTests.host_call_plugin = \
stubs.FakeSessionForVMTests.host_call_plugin_swap
-
+ # Stubbing out firewall driver as previous stub sets a particular
+ # stub for async plugin calls
+ stubs.stubout_firewall_driver(self.stubs, self.conn)
try:
# We'll steal the above glance linux test
self.test_spawn_vhd_glance_linux()
@@ -686,7 +705,11 @@ class XenAPIVMTestCase(test.TestCase):
'os_type': 'linux',
'architecture': 'x86-64'}
instance = db.instance_create(self.context, instance_values)
- network_info = [({'bridge': 'fa0', 'id': 0, 'injected': False},
+ network_info = [({'bridge': 'fa0', 'id': 0,
+ 'injected': False,
+ 'cidr': '192.168.0.0/24',
+ 'cidr_v6': 'dead:beef::1/120',
+ },
{'broadcast': '192.168.0.255',
'dns': ['192.168.0.1'],
'gateway': '192.168.0.1',
@@ -697,6 +720,7 @@ class XenAPIVMTestCase(test.TestCase):
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
+ 'dhcp_server': '192.168.0.1',
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})]
@@ -757,7 +781,9 @@ class XenAPIMigrateInstance(test.TestCase):
self.stubs = stubout.StubOutForTesting()
self.flags(target_host='127.0.0.1',
xenapi_connection_url='test_url',
- xenapi_connection_password='test_pass')
+ xenapi_connection_password='test_pass',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
@@ -1139,7 +1165,9 @@ class XenAPIAutoDiskConfigTestCase(test.TestCase):
self.stubs = stubout.StubOutForTesting()
self.flags(target_host='127.0.0.1',
xenapi_connection_url='test_url',
- xenapi_connection_password='test_pass')
+ xenapi_connection_password='test_pass',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
xenapi_fake.reset()
self.conn = xenapi_conn.get_connection(False)
@@ -1230,7 +1258,9 @@ class XenAPIBWUsageTestCase(test.TestCase):
XenAPIBWUsageTestCase._fake_compile_metrics)
self.flags(target_host='127.0.0.1',
xenapi_connection_url='test_url',
- xenapi_connection_password='test_pass')
+ xenapi_connection_password='test_pass',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
xenapi_fake.reset()
self.conn = xenapi_conn.get_connection(False)
@@ -1245,3 +1275,247 @@ class XenAPIBWUsageTestCase(test.TestCase):
"""
result = self.conn.get_all_bw_usage(datetime.datetime.utcnow())
self.assertEqual(result, [])
+
+
+class XenAPIDom0IptablesFirewallTestCase(test.TestCase):
+
+ _in_nat_rules = [
+ '# Generated by iptables-save v1.4.10 on Sat Feb 19 00:03:19 2011',
+ '*nat',
+ ':PREROUTING ACCEPT [1170:189210]',
+ ':INPUT ACCEPT [844:71028]',
+ ':OUTPUT ACCEPT [5149:405186]',
+ ':POSTROUTING ACCEPT [5063:386098]',
+ ]
+
+ _in_filter_rules = [
+ '# Generated by iptables-save v1.4.4 on Mon Dec 6 11:54:13 2010',
+ '*filter',
+ ':INPUT ACCEPT [969615:281627771]',
+ ':FORWARD ACCEPT [0:0]',
+ ':OUTPUT ACCEPT [915599:63811649]',
+ ':nova-block-ipv4 - [0:0]',
+ '-A INPUT -i virbr0 -p tcp -m tcp --dport 67 -j ACCEPT ',
+ '-A FORWARD -d 192.168.122.0/24 -o virbr0 -m state --state RELATED'
+ ',ESTABLISHED -j ACCEPT ',
+ '-A FORWARD -s 192.168.122.0/24 -i virbr0 -j ACCEPT ',
+ '-A FORWARD -i virbr0 -o virbr0 -j ACCEPT ',
+ '-A FORWARD -o virbr0 -j REJECT --reject-with icmp-port-unreachable ',
+ '-A FORWARD -i virbr0 -j REJECT --reject-with icmp-port-unreachable ',
+ 'COMMIT',
+ '# Completed on Mon Dec 6 11:54:13 2010',
+ ]
+
+ _in6_filter_rules = [
+ '# Generated by ip6tables-save v1.4.4 on Tue Jan 18 23:47:56 2011',
+ '*filter',
+ ':INPUT ACCEPT [349155:75810423]',
+ ':FORWARD ACCEPT [0:0]',
+ ':OUTPUT ACCEPT [349256:75777230]',
+ 'COMMIT',
+ '# Completed on Tue Jan 18 23:47:56 2011',
+ ]
+
+ def setUp(self):
+ super(XenAPIDom0IptablesFirewallTestCase, self).setUp()
+ self.flags(xenapi_connection_url='test_url',
+ xenapi_connection_password='test_pass',
+ instance_name_template='%d',
+ firewall_driver='nova.virt.xenapi.firewall.'
+ 'Dom0IptablesFirewallDriver')
+ self.stubs = stubout.StubOutForTesting()
+ xenapi_fake.reset()
+ xenapi_fake.create_local_srs()
+ xenapi_fake.create_local_pifs()
+ self.user_id = 'mappin'
+ self.project_id = 'fake'
+ stubs.stubout_session(self.stubs, stubs.FakeSessionForFirewallTests,
+ test_case=self)
+ self.context = context.RequestContext(self.user_id, self.project_id)
+ self.network = utils.import_object(FLAGS.network_manager)
+ self.conn = xenapi_conn.get_connection(False)
+ self.fw = self.conn._vmops.firewall_driver
+
+ def _create_instance_ref(self):
+ return db.instance_create(self.context,
+ {'user_id': self.user_id,
+ 'project_id': self.project_id,
+ 'instance_type_id': 1})
+
+ def _create_test_security_group(self):
+ admin_ctxt = context.get_admin_context()
+ secgroup = db.security_group_create(admin_ctxt,
+ {'user_id': self.user_id,
+ 'project_id': self.project_id,
+ 'name': 'testgroup',
+ 'description': 'test group'})
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'icmp',
+ 'from_port': -1,
+ 'to_port': -1,
+ 'cidr': '192.168.11.0/24'})
+
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'icmp',
+ 'from_port': 8,
+ 'to_port': -1,
+ 'cidr': '192.168.11.0/24'})
+
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'tcp',
+ 'from_port': 80,
+ 'to_port': 81,
+ 'cidr': '192.168.10.0/24'})
+ return secgroup
+
+ def _validate_security_group(self):
+ in_rules = filter(lambda l: not l.startswith('#'),
+ self._in_filter_rules)
+ for rule in in_rules:
+ if not 'nova' in rule:
+ self.assertTrue(rule in self._out_rules,
+ 'Rule went missing: %s' % rule)
+
+ instance_chain = None
+ for rule in self._out_rules:
+ # This is pretty crude, but it'll do for now
+ # last two octets change
+ if re.search('-d 192.168.[0-9]{1,3}.[0-9]{1,3} -j', rule):
+ instance_chain = rule.split(' ')[-1]
+ break
+ self.assertTrue(instance_chain, "The instance chain wasn't added")
+ security_group_chain = None
+ for rule in self._out_rules:
+ # This is pretty crude, but it'll do for now
+ if '-A %s -j' % instance_chain in rule:
+ security_group_chain = rule.split(' ')[-1]
+ break
+ self.assertTrue(security_group_chain,
+ "The security group chain wasn't added")
+
+ regex = re.compile('-A .* -j ACCEPT -p icmp -s 192.168.11.0/24')
+ self.assertTrue(len(filter(regex.match, self._out_rules)) > 0,
+ "ICMP acceptance rule wasn't added")
+
+ regex = re.compile('-A .* -j ACCEPT -p icmp -m icmp --icmp-type 8'
+ ' -s 192.168.11.0/24')
+ self.assertTrue(len(filter(regex.match, self._out_rules)) > 0,
+ "ICMP Echo Request acceptance rule wasn't added")
+
+ regex = re.compile('-A .* -j ACCEPT -p tcp --dport 80:81'
+ ' -s 192.168.10.0/24')
+ self.assertTrue(len(filter(regex.match, self._out_rules)) > 0,
+ "TCP port 80/81 acceptance rule wasn't added")
+
+ def test_static_filters(self):
+ instance_ref = self._create_instance_ref()
+ src_instance_ref = self._create_instance_ref()
+ admin_ctxt = context.get_admin_context()
+ secgroup = self._create_test_security_group()
+
+ src_secgroup = db.security_group_create(admin_ctxt,
+ {'user_id': self.user_id,
+ 'project_id': self.project_id,
+ 'name': 'testsourcegroup',
+ 'description': 'src group'})
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'tcp',
+ 'from_port': 80,
+ 'to_port': 81,
+ 'group_id': src_secgroup['id']})
+
+ db.instance_add_security_group(admin_ctxt, instance_ref['uuid'],
+ secgroup['id'])
+ db.instance_add_security_group(admin_ctxt, src_instance_ref['uuid'],
+ src_secgroup['id'])
+ instance_ref = db.instance_get(admin_ctxt, instance_ref['id'])
+ src_instance_ref = db.instance_get(admin_ctxt, src_instance_ref['id'])
+
+ def get_fixed_ips(*args, **kwargs):
+ ips = []
+ for _n, info in network_info:
+ ips.extend(info['ips'])
+ return [ip['ip'] for ip in ips]
+
+ network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1)
+ self.stubs.Set(db, 'instance_get_fixed_addresses', get_fixed_ips)
+ self.fw.prepare_instance_filter(instance_ref, network_info)
+ self.fw.apply_instance_filter(instance_ref, network_info)
+
+ self._validate_security_group()
+ # Extra test for TCP acceptance rules
+ for ip in get_fixed_ips():
+ regex = re.compile('-A .* -j ACCEPT -p tcp'
+ ' --dport 80:81 -s %s' % ip)
+ self.assertTrue(len(filter(regex.match, self._out_rules)) > 0,
+ "TCP port 80/81 acceptance rule wasn't added")
+
+ db.instance_destroy(admin_ctxt, instance_ref['id'])
+
+ def test_filters_for_instance_with_ip_v6(self):
+ self.flags(use_ipv6=True)
+ network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1)
+ rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info)
+ self.assertEquals(len(rulesv4), 2)
+ self.assertEquals(len(rulesv6), 1)
+
+ def test_filters_for_instance_without_ip_v6(self):
+ self.flags(use_ipv6=False)
+ network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1)
+ rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info)
+ self.assertEquals(len(rulesv4), 2)
+ self.assertEquals(len(rulesv6), 0)
+
+ def test_multinic_iptables(self):
+ ipv4_rules_per_addr = 1
+ ipv4_addr_per_network = 2
+ ipv6_rules_per_addr = 1
+ ipv6_addr_per_network = 1
+ networks_count = 5
+ instance_ref = self._create_instance_ref()
+ network_info = fake_network.\
+ fake_get_instance_nw_info(self.stubs,
+ networks_count,
+ ipv4_addr_per_network)
+ ipv4_len = len(self.fw.iptables.ipv4['filter'].rules)
+ ipv6_len = len(self.fw.iptables.ipv6['filter'].rules)
+ inst_ipv4, inst_ipv6 = self.fw.instance_rules(instance_ref,
+ network_info)
+ self.fw.prepare_instance_filter(instance_ref, network_info)
+ ipv4 = self.fw.iptables.ipv4['filter'].rules
+ ipv6 = self.fw.iptables.ipv6['filter'].rules
+ ipv4_network_rules = len(ipv4) - len(inst_ipv4) - ipv4_len
+ ipv6_network_rules = len(ipv6) - len(inst_ipv6) - ipv6_len
+ self.assertEquals(ipv4_network_rules,
+ ipv4_rules_per_addr * ipv4_addr_per_network * networks_count)
+ self.assertEquals(ipv6_network_rules,
+ ipv6_rules_per_addr * ipv6_addr_per_network * networks_count)
+
+ def test_do_refresh_security_group_rules(self):
+ admin_ctxt = context.get_admin_context()
+ instance_ref = self._create_instance_ref()
+ network_info = fake_network.fake_get_instance_nw_info(self.stubs, 1, 1)
+ secgroup = self._create_test_security_group()
+ db.instance_add_security_group(admin_ctxt, instance_ref['uuid'],
+ secgroup['id'])
+ self.fw.prepare_instance_filter(instance_ref, network_info)
+ self.fw.instances[instance_ref['id']] = instance_ref
+ self._validate_security_group()
+ # add a rule to the security group
+ db.security_group_rule_create(admin_ctxt,
+ {'parent_group_id': secgroup['id'],
+ 'protocol': 'udp',
+ 'from_port': 200,
+ 'to_port': 299,
+ 'cidr': '192.168.99.0/24'})
+ #validate the extra rule
+ self.fw.refresh_security_group_rules(secgroup)
+ regex = re.compile('-A .* -j ACCEPT -p udp --dport 200:299'
+ ' -s 192.168.99.0/24')
+ self.assertTrue(len(filter(regex.match, self._out_rules)) > 0,
+ "Rules were not updated properly."
+ "The rule for UDP acceptance is missing")
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index 64e7bc3fc..6e24bdc34 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -7,7 +7,6 @@
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
-#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +16,7 @@
"""Stubouts, mocks and fixtures for the test suite"""
import eventlet
+import json
import random
from nova.virt import xenapi_conn
@@ -27,6 +27,17 @@ from nova.virt.xenapi import vmops
from nova import utils
+def stubout_firewall_driver(stubs, conn):
+
+ def fake_none(self, *args):
+ return
+
+ vmops = conn._vmops
+ stubs.Set(vmops.firewall_driver, 'setup_basic_filtering', fake_none)
+ stubs.Set(vmops.firewall_driver, 'prepare_instance_filter', fake_none)
+ stubs.Set(vmops.firewall_driver, 'instance_filter_exists', fake_none)
+
+
def stubout_instance_snapshot(stubs):
@classmethod
def fake_fetch_image(cls, context, session, instance, image, user,
@@ -42,7 +53,7 @@ def stubout_instance_snapshot(stubs):
stubs.Set(vm_utils, '_wait_for_vhd_coalesce', fake_wait_for_vhd_coalesce)
-def stubout_session(stubs, cls, product_version=None):
+def stubout_session(stubs, cls, product_version=None, **opt_args):
"""Stubs out three methods from XenAPISession"""
def fake_import(self):
"""Stubs out get_imported_xenapi of XenAPISession"""
@@ -51,7 +62,7 @@ def stubout_session(stubs, cls, product_version=None):
return __import__(fake_module, globals(), locals(), from_list, -1)
stubs.Set(xenapi_conn.XenAPISession, '_create_session',
- lambda s, url: cls(url))
+ lambda s, url: cls(url, **opt_args))
stubs.Set(xenapi_conn.XenAPISession, 'get_imported_xenapi',
fake_import)
if product_version is None:
@@ -90,7 +101,7 @@ def stubout_is_vdi_pv(stubs):
def stubout_determine_is_pv_objectstore(stubs):
- """Assumes VMs never have PV kernels"""
+ """Assumes VMs stu have PV kernels"""
@classmethod
def f(cls, *args):
@@ -98,6 +109,16 @@ def stubout_determine_is_pv_objectstore(stubs):
stubs.Set(vm_utils.VMHelper, '_determine_is_pv_objectstore', f)
+def stubout_is_snapshot(stubs):
+ """ Always returns true
+ xenapi fake driver does not create vmrefs for snapshots """
+
+ @classmethod
+ def f(cls, *args):
+ return True
+ stubs.Set(vm_utils.VMHelper, 'is_snapshot', f)
+
+
def stubout_lookup_image(stubs):
"""Simulates a failure in lookup image."""
def f(_1, _2, _3, _4):
@@ -140,6 +161,16 @@ def _make_fake_vdi():
class FakeSessionForVMTests(fake.SessionBase):
""" Stubs out a XenAPISession for VM tests """
+
+ _fake_iptables_save_output = \
+ "# Generated by iptables-save v1.4.10 on Sun Nov 6 22:49:02 2011\n"\
+ "*filter\n"\
+ ":INPUT ACCEPT [0:0]\n"\
+ ":FORWARD ACCEPT [0:0]\n"\
+ ":OUTPUT ACCEPT [0:0]\n"\
+ "COMMIT\n"\
+ "# Completed on Sun Nov 6 22:49:02 2011\n"
+
def __init__(self, uri):
super(FakeSessionForVMTests, self).__init__(uri)
@@ -147,6 +178,9 @@ class FakeSessionForVMTests(fake.SessionBase):
if (plugin, method) == ('glance', 'download_vhd'):
return fake.as_json(dict(vdi_type='os',
vdi_uuid=_make_fake_vdi()))
+ elif (plugin, method) == ("xenhost", "iptables_config"):
+ return fake.as_json(out=self._fake_iptables_save_output,
+ err='')
else:
return (super(FakeSessionForVMTests, self).
host_call_plugin(_1, _2, plugin, method, _5))
@@ -196,6 +230,53 @@ class FakeSessionForVMTests(fake.SessionBase):
pass
+class FakeSessionForFirewallTests(FakeSessionForVMTests):
+ """ Stubs out a XenApi Session for doing IPTable Firewall tests """
+
+ def __init__(self, uri, test_case=None):
+ super(FakeSessionForFirewallTests, self).__init__(uri)
+ if hasattr(test_case, '_in_filter_rules'):
+ self._in_filter_rules = test_case._in_filter_rules
+ if hasattr(test_case, '_in6_filter_rules'):
+ self._in6_filter_rules = test_case._in6_filter_rules
+ if hasattr(test_case, '_in_nat_rules'):
+ self._in_nat_rules = test_case._in_nat_rules
+ self._test_case = test_case
+
+ def host_call_plugin(self, _1, _2, plugin, method, args):
+ """Mock method four host_call_plugin to be used in unit tests
+ for the dom0 iptables Firewall drivers for XenAPI
+
+ """
+ if plugin == "xenhost" and method == "iptables_config":
+ # The command to execute is a json-encoded list
+ cmd_args = args.get('cmd_args', None)
+ cmd = json.loads(cmd_args)
+ if not cmd:
+ ret_str = ''
+ else:
+ output = ''
+ process_input = args.get('process_input', None)
+ if cmd == ['ip6tables-save', '-t', 'filter']:
+ output = '\n'.join(self._in6_filter_rules)
+ if cmd == ['iptables-save', '-t', 'filter']:
+ output = '\n'.join(self._in_filter_rules)
+ if cmd == ['iptables-save', '-t', 'nat']:
+ output = '\n'.join(self._in_nat_rules)
+ if cmd == ['iptables-restore', ]:
+ lines = process_input.split('\n')
+ if '*filter' in lines:
+ if self._test_case is not None:
+ self._test_case._out_rules = lines
+ output = '\n'.join(lines)
+ if cmd == ['ip6tables-restore', ]:
+ lines = process_input.split('\n')
+ if '*filter' in lines:
+ output = '\n'.join(lines)
+ ret_str = fake.as_json(out=output, err='')
+ return ret_str
+
+
def stub_out_vm_methods(stubs):
def fake_shutdown(self, inst, vm, method="clean"):
pass