From 3e74c0017e0b1ab209bc066cc0cec6c151b69b83 Mon Sep 17 00:00:00 2001 From: Dirk Mueller Date: Wed, 19 Jun 2013 00:08:57 +0200 Subject: Add IpFilter, IPNetnsExecFilter and EnvFilter These filters have been implemented in Quantum before: - IpFilter provides support for filtering ip commands - IpNetnsExecFilter is a chaining command filter that verifies that the command to be executed by ip netns exec is covered by other established filters. IpNetnsExecFilter has been restricted to ensure that the filter chains have all matching filters run as the same user. EnvFilter is a new filter derived from CommandFilter that allows a Command to be optionally prefixed by "env" and a specific list of environment variables. This is intended to replace the specific DnsmasqFilter and DnsmasqNetnsFilter in the future when all consumers have been updated. Implements bp rootwrap-quantum-features Change-Id: I0cf39967126e99a8dc53d21bee824a0fe2f63aa0 --- openstack/common/rootwrap/filters.py | 113 +++++++++++++++++++++++++++++++ openstack/common/rootwrap/wrapper.py | 14 ++++ tests/unit/test_rootwrap.py | 127 +++++++++++++++++++++++++++++++++-- 3 files changed, 250 insertions(+), 4 deletions(-) diff --git a/openstack/common/rootwrap/filters.py b/openstack/common/rootwrap/filters.py index 0cc55ce..dfec412 100644 --- a/openstack/common/rootwrap/filters.py +++ b/openstack/common/rootwrap/filters.py @@ -235,3 +235,116 @@ class ReadFileFilter(CommandFilter): if len(userargs) != 2: return False return True + + +class IpFilter(CommandFilter): + """Specific filter for the ip utility to that does not match exec.""" + + def match(self, userargs): + if userargs[0] == 'ip': + if userargs[1] == 'netns': + return (userargs[2] in ('list', 'add', 'delete')) + else: + return True + + +class EnvFilter(CommandFilter): + """Specific filter for the env utility. + + Behaves like CommandFilter, except that it handles + leading env A=B.. strings appropriately. + """ + + def _extract_env(self, arglist): + """Extract all leading NAME=VALUE arguments from arglist.""" + + envs = set() + for arg in arglist: + if '=' not in arg: + break + envs.add(arg.partition('=')[0]) + return envs + + def __init__(self, exec_path, run_as, *args): + super(EnvFilter, self).__init__(exec_path, run_as, *args) + + env_list = self._extract_env(self.args) + # Set exec_path to X when args are in the form of + # env A=a B=b C=c X Y Z + if "env" in exec_path and len(env_list) < len(self.args): + self.exec_path = self.args[len(env_list)] + + def match(self, userargs): + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # require one additional argument after configured ones + if len(userargs) < len(self.args): + return False + + # extract all env args + user_envs = self._extract_env(userargs) + filter_envs = self._extract_env(self.args) + user_command = userargs[len(user_envs):len(user_envs) + 1] + + # match first non-env argument with CommandFilter + return (super(EnvFilter, self).match(user_command) + and len(filter_envs) and user_envs == filter_envs) + + def exec_args(self, userargs): + args = userargs[:] + + # ignore leading 'env' + if args[0] == 'env': + args.pop(0) + + # Throw away leading NAME=VALUE arguments + while args and '=' in args[0]: + args.pop(0) + + return args + + def get_command(self, userargs, exec_dirs=[]): + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path + return [to_exec] + self.exec_args(userargs)[1:] + + def get_environment(self, userargs): + env = os.environ.copy() + + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # Handle leading NAME=VALUE pairs + for a in userargs: + env_name, equals, env_value = a.partition('=') + if not equals: + break + if env_name and env_value: + env[env_name] = env_value + + return env + + +class ChainingFilter(CommandFilter): + def exec_args(self, userargs): + return [] + + +class IpNetnsExecFilter(ChainingFilter): + """Specific filter for the ip utility to that does match exec.""" + + def match(self, userargs): + # Network namespaces currently require root + # require argument + if self.run_as != "root" or len(userargs) < 4: + return False + + return (userargs[:3] == ['ip', 'netns', 'exec']) + + def exec_args(self, userargs): + args = userargs[4:] + if args: + args[0] = os.path.basename(args[0]) + return args diff --git a/openstack/common/rootwrap/wrapper.py b/openstack/common/rootwrap/wrapper.py index 5390c1b..df1a9f4 100644 --- a/openstack/common/rootwrap/wrapper.py +++ b/openstack/common/rootwrap/wrapper.py @@ -131,6 +131,20 @@ def match_filter(filter_list, userargs, exec_dirs=[]): for f in filter_list: if f.match(userargs): + if isinstance(f, filters.ChainingFilter): + # This command calls exec verify that remaining args + # matches another filter. + def non_chain_filter(fltr): + return (fltr.run_as == f.run_as + and not isinstance(fltr, filters.ChainingFilter)) + + leaf_filters = [fltr for fltr in filter_list + if non_chain_filter(fltr)] + args = f.exec_args(userargs) + if (not args or not match_filter(leaf_filters, + args, exec_dirs=exec_dirs)): + continue + # Try other filters if executable is absent if not f.get_exec(exec_dirs=exec_dirs): if not first_not_executable_filter: diff --git a/tests/unit/test_rootwrap.py b/tests/unit/test_rootwrap.py index 0e08b5e..02789ec 100644 --- a/tests/unit/test_rootwrap.py +++ b/tests/unit/test_rootwrap.py @@ -61,10 +61,11 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertRaises(wrapper.NoFilterMatched, wrapper.match_filter, self.filters, invalid) - def _test_DnsmasqFilter(self, filter_class, config_file_arg): + def _test_EnvFilter_as_DnsMasq(self, config_file_arg): usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar', 'dnsmasq', 'foo'] - f = filter_class("/usr/bin/dnsmasq", "root") + f = filters.EnvFilter("env", "root", config_file_arg + '=A', + 'NETWORK_ID=', "/usr/bin/dnsmasq") self.assertTrue(f.match(usercmd)) self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo']) env = f.get_environment(usercmd) @@ -72,10 +73,68 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertEqual(env.get('NETWORK_ID'), 'foobar') def test_DnsmasqFilter(self): - self._test_DnsmasqFilter(filters.DnsmasqFilter, 'CONFIG_FILE') + self._test_EnvFilter_as_DnsMasq('CONFIG_FILE') def test_DeprecatedDnsmasqFilter(self): - self._test_DnsmasqFilter(filters.DeprecatedDnsmasqFilter, 'FLAGFILE') + self._test_EnvFilter_as_DnsMasq('FLAGFILE') + + def test_EnvFilter(self): + envset = ['A=/some/thing', 'B=somethingelse'] + envcmd = ['env'] + envset + realcmd = ['sleep', '10'] + usercmd = envcmd + realcmd + + f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep") + # accept with leading env + self.assertTrue(f.match(envcmd + ["sleep"])) + # accept without leading env + self.assertTrue(f.match(envset + ["sleep"])) + + # any other command does not match + self.assertFalse(f.match(envcmd + ["sleep2"])) + self.assertFalse(f.match(envset + ["sleep2"])) + + # accept any trailing arguments + self.assertTrue(f.match(usercmd)) + + # require given environment variables to match + self.assertFalse(f.match([envcmd, 'C=ELSE'])) + self.assertFalse(f.match(['env', 'C=xx'])) + self.assertFalse(f.match(['env', 'A=xx'])) + + # require env command to be given + # (otherwise CommandFilters should match + self.assertFalse(f.match(realcmd)) + # require command to match + self.assertFalse(f.match(envcmd)) + self.assertFalse(f.match(envcmd[1:])) + + # ensure that the env command is stripped when executing + self.assertEqual(f.exec_args(usercmd), realcmd) + env = f.get_environment(usercmd) + # check that environment variables are set + self.assertEqual(env.get('A'), '/some/thing') + self.assertEqual(env.get('B'), 'somethingelse') + self.assertFalse('sleep' in env.keys()) + + def test_EnvFilter_without_leading_env(self): + envset = ['A=/some/thing', 'B=somethingelse'] + envcmd = ['env'] + envset + realcmd = ['sleep', '10'] + + f = filters.EnvFilter("sleep", "root", "A=", "B=ignored") + + # accept without leading env + self.assertTrue(f.match(envset + ["sleep"])) + + self.assertEqual(f.get_command(envcmd + realcmd), realcmd) + self.assertEqual(f.get_command(envset + realcmd), realcmd) + + env = f.get_environment(envset + realcmd) + # check that environment variables are set + self.assertEqual(env.get('A'), '/some/thing') + self.assertEqual(env.get('B'), 'somethingelse') + self.assertFalse('sleep' in env.keys()) def test_KillFilter(self): if not os.path.exists("/proc/%d" % os.getpid()): @@ -169,6 +228,66 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn]) self.assertTrue(f.match(usercmd)) + def test_IpFilter_non_netns(self): + f = filters.IpFilter('/sbin/ip', 'root') + self.assertTrue(f.match(['ip', 'link', 'list'])) + + def _test_IpFilter_netns_helper(self, action): + f = filters.IpFilter('/sbin/ip', 'root') + self.assertTrue(f.match(['ip', 'link', action])) + + def test_IpFilter_netns_add(self): + self._test_IpFilter_netns_helper('add') + + def test_IpFilter_netns_delete(self): + self._test_IpFilter_netns_helper('delete') + + def test_IpFilter_netns_list(self): + self._test_IpFilter_netns_helper('list') + + def test_IpNetnsExecFilter_match(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'root') + self.assertTrue( + f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'])) + + def test_IpNetnsExecFilter_nomatch(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'root') + self.assertFalse(f.match(['ip', 'link', 'list'])) + + # verify that at least a NS is given + self.assertFalse(f.match(['ip', 'netns', 'exec'])) + + def test_IpNetnsExecFilter_nomatch_nonroot(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'user') + self.assertFalse( + f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'])) + + def test_match_filter_recurses_exec_command_filter_matches(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'root')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'] + + self.assertIsNotNone(wrapper.match_filter(filter_list, args)) + + def test_match_filter_recurses_exec_command_matches_user(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'user')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'] + + # Currently ip netns exec requires root, so verify that + # no non-root filter is matched, as that would escalate privileges + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, filter_list, args) + + def test_match_filter_recurses_exec_command_filter_does_not_match(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'root')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar', + 'ip', 'link', 'list'] + + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, filter_list, args) + def test_exec_dirs_search(self): # This test supposes you have /bin/cat or /usr/bin/cat locally f = filters.CommandFilter("cat", "root") -- cgit