summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorDirk Mueller <dirk@dmllr.de>2013-06-19 00:08:57 +0200
committerDirk Mueller <dirk@dmllr.de>2013-06-20 14:36:43 +0200
commit3e74c0017e0b1ab209bc066cc0cec6c151b69b83 (patch)
tree22481be81a272bd94a9bc28a9a4baeff0c333852 /tests
parentfb13686a00e933c17bca163b51fb3d7119d34e5a (diff)
downloadoslo-3e74c0017e0b1ab209bc066cc0cec6c151b69b83.tar.gz
oslo-3e74c0017e0b1ab209bc066cc0cec6c151b69b83.tar.xz
oslo-3e74c0017e0b1ab209bc066cc0cec6c151b69b83.zip
Add IpFilter, IPNetnsExecFilter and EnvFilter
These filters have been implemented in Quantum before: - IpFilter provides support for filtering ip commands - IpNetnsExecFilter is a chaining command filter that verifies that the command to be executed by ip netns exec is covered by other established filters. IpNetnsExecFilter has been restricted to ensure that the filter chains have all matching filters run as the same user. EnvFilter is a new filter derived from CommandFilter that allows a Command to be optionally prefixed by "env" and a specific list of environment variables. This is intended to replace the specific DnsmasqFilter and DnsmasqNetnsFilter in the future when all consumers have been updated. Implements bp rootwrap-quantum-features Change-Id: I0cf39967126e99a8dc53d21bee824a0fe2f63aa0
Diffstat (limited to 'tests')
-rw-r--r--tests/unit/test_rootwrap.py127
1 files changed, 123 insertions, 4 deletions
diff --git a/tests/unit/test_rootwrap.py b/tests/unit/test_rootwrap.py
index 0e08b5e..02789ec 100644
--- a/tests/unit/test_rootwrap.py
+++ b/tests/unit/test_rootwrap.py
@@ -61,10 +61,11 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, self.filters, invalid)
- def _test_DnsmasqFilter(self, filter_class, config_file_arg):
+ def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
'dnsmasq', 'foo']
- f = filter_class("/usr/bin/dnsmasq", "root")
+ f = filters.EnvFilter("env", "root", config_file_arg + '=A',
+ 'NETWORK_ID=', "/usr/bin/dnsmasq")
self.assertTrue(f.match(usercmd))
self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
env = f.get_environment(usercmd)
@@ -72,10 +73,68 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertEqual(env.get('NETWORK_ID'), 'foobar')
def test_DnsmasqFilter(self):
- self._test_DnsmasqFilter(filters.DnsmasqFilter, 'CONFIG_FILE')
+ self._test_EnvFilter_as_DnsMasq('CONFIG_FILE')
def test_DeprecatedDnsmasqFilter(self):
- self._test_DnsmasqFilter(filters.DeprecatedDnsmasqFilter, 'FLAGFILE')
+ self._test_EnvFilter_as_DnsMasq('FLAGFILE')
+
+ def test_EnvFilter(self):
+ envset = ['A=/some/thing', 'B=somethingelse']
+ envcmd = ['env'] + envset
+ realcmd = ['sleep', '10']
+ usercmd = envcmd + realcmd
+
+ f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
+ # accept with leading env
+ self.assertTrue(f.match(envcmd + ["sleep"]))
+ # accept without leading env
+ self.assertTrue(f.match(envset + ["sleep"]))
+
+ # any other command does not match
+ self.assertFalse(f.match(envcmd + ["sleep2"]))
+ self.assertFalse(f.match(envset + ["sleep2"]))
+
+ # accept any trailing arguments
+ self.assertTrue(f.match(usercmd))
+
+ # require given environment variables to match
+ self.assertFalse(f.match([envcmd, 'C=ELSE']))
+ self.assertFalse(f.match(['env', 'C=xx']))
+ self.assertFalse(f.match(['env', 'A=xx']))
+
+ # require env command to be given
+ # (otherwise CommandFilters should match
+ self.assertFalse(f.match(realcmd))
+ # require command to match
+ self.assertFalse(f.match(envcmd))
+ self.assertFalse(f.match(envcmd[1:]))
+
+ # ensure that the env command is stripped when executing
+ self.assertEqual(f.exec_args(usercmd), realcmd)
+ env = f.get_environment(usercmd)
+ # check that environment variables are set
+ self.assertEqual(env.get('A'), '/some/thing')
+ self.assertEqual(env.get('B'), 'somethingelse')
+ self.assertFalse('sleep' in env.keys())
+
+ def test_EnvFilter_without_leading_env(self):
+ envset = ['A=/some/thing', 'B=somethingelse']
+ envcmd = ['env'] + envset
+ realcmd = ['sleep', '10']
+
+ f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")
+
+ # accept without leading env
+ self.assertTrue(f.match(envset + ["sleep"]))
+
+ self.assertEqual(f.get_command(envcmd + realcmd), realcmd)
+ self.assertEqual(f.get_command(envset + realcmd), realcmd)
+
+ env = f.get_environment(envset + realcmd)
+ # check that environment variables are set
+ self.assertEqual(env.get('A'), '/some/thing')
+ self.assertEqual(env.get('B'), 'somethingelse')
+ self.assertFalse('sleep' in env.keys())
def test_KillFilter(self):
if not os.path.exists("/proc/%d" % os.getpid()):
@@ -169,6 +228,66 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
self.assertTrue(f.match(usercmd))
+ def test_IpFilter_non_netns(self):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', 'list']))
+
+ def _test_IpFilter_netns_helper(self, action):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', action]))
+
+ def test_IpFilter_netns_add(self):
+ self._test_IpFilter_netns_helper('add')
+
+ def test_IpFilter_netns_delete(self):
+ self._test_IpFilter_netns_helper('delete')
+
+ def test_IpFilter_netns_list(self):
+ self._test_IpFilter_netns_helper('list')
+
+ def test_IpNetnsExecFilter_match(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertTrue(
+ f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
+
+ def test_IpNetnsExecFilter_nomatch(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertFalse(f.match(['ip', 'link', 'list']))
+
+ # verify that at least a NS is given
+ self.assertFalse(f.match(['ip', 'netns', 'exec']))
+
+ def test_IpNetnsExecFilter_nomatch_nonroot(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'user')
+ self.assertFalse(
+ f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
+
+ def test_match_filter_recurses_exec_command_filter_matches(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
+
+ self.assertIsNotNone(wrapper.match_filter(filter_list, args))
+
+ def test_match_filter_recurses_exec_command_matches_user(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'user')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
+
+ # Currently ip netns exec requires root, so verify that
+ # no non-root filter is matched, as that would escalate privileges
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, filter_list, args)
+
+ def test_match_filter_recurses_exec_command_filter_does_not_match(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
+ 'ip', 'link', 'list']
+
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, filter_list, args)
+
def test_exec_dirs_search(self):
# This test supposes you have /bin/cat or /usr/bin/cat locally
f = filters.CommandFilter("cat", "root")