diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-06-26 02:13:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-06-26 02:13:59 +0000 |
commit | 18dc396748114e5d8af0a9f84ba944e532815b65 (patch) | |
tree | 954856ef7b4f6cb841d9edcfca6b6be1352c6633 | |
parent | 5229aca95f5c29b81bf423fcf438cc7223fa22ad (diff) | |
parent | 3e74c0017e0b1ab209bc066cc0cec6c151b69b83 (diff) | |
download | oslo-18dc396748114e5d8af0a9f84ba944e532815b65.tar.gz oslo-18dc396748114e5d8af0a9f84ba944e532815b65.tar.xz oslo-18dc396748114e5d8af0a9f84ba944e532815b65.zip |
Merge "Add IpFilter, IPNetnsExecFilter and EnvFilter"
-rw-r--r-- | openstack/common/rootwrap/filters.py | 113 | ||||
-rw-r--r-- | openstack/common/rootwrap/wrapper.py | 14 | ||||
-rw-r--r-- | tests/unit/test_rootwrap.py | 127 |
3 files changed, 250 insertions, 4 deletions
diff --git a/openstack/common/rootwrap/filters.py b/openstack/common/rootwrap/filters.py index 0cc55ce..dfec412 100644 --- a/openstack/common/rootwrap/filters.py +++ b/openstack/common/rootwrap/filters.py @@ -235,3 +235,116 @@ class ReadFileFilter(CommandFilter): if len(userargs) != 2: return False return True + + +class IpFilter(CommandFilter): + """Specific filter for the ip utility to that does not match exec.""" + + def match(self, userargs): + if userargs[0] == 'ip': + if userargs[1] == 'netns': + return (userargs[2] in ('list', 'add', 'delete')) + else: + return True + + +class EnvFilter(CommandFilter): + """Specific filter for the env utility. + + Behaves like CommandFilter, except that it handles + leading env A=B.. strings appropriately. + """ + + def _extract_env(self, arglist): + """Extract all leading NAME=VALUE arguments from arglist.""" + + envs = set() + for arg in arglist: + if '=' not in arg: + break + envs.add(arg.partition('=')[0]) + return envs + + def __init__(self, exec_path, run_as, *args): + super(EnvFilter, self).__init__(exec_path, run_as, *args) + + env_list = self._extract_env(self.args) + # Set exec_path to X when args are in the form of + # env A=a B=b C=c X Y Z + if "env" in exec_path and len(env_list) < len(self.args): + self.exec_path = self.args[len(env_list)] + + def match(self, userargs): + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # require one additional argument after configured ones + if len(userargs) < len(self.args): + return False + + # extract all env args + user_envs = self._extract_env(userargs) + filter_envs = self._extract_env(self.args) + user_command = userargs[len(user_envs):len(user_envs) + 1] + + # match first non-env argument with CommandFilter + return (super(EnvFilter, self).match(user_command) + and len(filter_envs) and user_envs == filter_envs) + + def exec_args(self, userargs): + args = userargs[:] + + # ignore leading 'env' + if args[0] == 'env': + args.pop(0) + + # Throw away leading NAME=VALUE arguments + while args and '=' in args[0]: + args.pop(0) + + return args + + def get_command(self, userargs, exec_dirs=[]): + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path + return [to_exec] + self.exec_args(userargs)[1:] + + def get_environment(self, userargs): + env = os.environ.copy() + + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # Handle leading NAME=VALUE pairs + for a in userargs: + env_name, equals, env_value = a.partition('=') + if not equals: + break + if env_name and env_value: + env[env_name] = env_value + + return env + + +class ChainingFilter(CommandFilter): + def exec_args(self, userargs): + return [] + + +class IpNetnsExecFilter(ChainingFilter): + """Specific filter for the ip utility to that does match exec.""" + + def match(self, userargs): + # Network namespaces currently require root + # require <ns> argument + if self.run_as != "root" or len(userargs) < 4: + return False + + return (userargs[:3] == ['ip', 'netns', 'exec']) + + def exec_args(self, userargs): + args = userargs[4:] + if args: + args[0] = os.path.basename(args[0]) + return args diff --git a/openstack/common/rootwrap/wrapper.py b/openstack/common/rootwrap/wrapper.py index 5390c1b..df1a9f4 100644 --- a/openstack/common/rootwrap/wrapper.py +++ b/openstack/common/rootwrap/wrapper.py @@ -131,6 +131,20 @@ def match_filter(filter_list, userargs, exec_dirs=[]): for f in filter_list: if f.match(userargs): + if isinstance(f, filters.ChainingFilter): + # This command calls exec verify that remaining args + # matches another filter. + def non_chain_filter(fltr): + return (fltr.run_as == f.run_as + and not isinstance(fltr, filters.ChainingFilter)) + + leaf_filters = [fltr for fltr in filter_list + if non_chain_filter(fltr)] + args = f.exec_args(userargs) + if (not args or not match_filter(leaf_filters, + args, exec_dirs=exec_dirs)): + continue + # Try other filters if executable is absent if not f.get_exec(exec_dirs=exec_dirs): if not first_not_executable_filter: diff --git a/tests/unit/test_rootwrap.py b/tests/unit/test_rootwrap.py index 0e08b5e..02789ec 100644 --- a/tests/unit/test_rootwrap.py +++ b/tests/unit/test_rootwrap.py @@ -61,10 +61,11 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertRaises(wrapper.NoFilterMatched, wrapper.match_filter, self.filters, invalid) - def _test_DnsmasqFilter(self, filter_class, config_file_arg): + def _test_EnvFilter_as_DnsMasq(self, config_file_arg): usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar', 'dnsmasq', 'foo'] - f = filter_class("/usr/bin/dnsmasq", "root") + f = filters.EnvFilter("env", "root", config_file_arg + '=A', + 'NETWORK_ID=', "/usr/bin/dnsmasq") self.assertTrue(f.match(usercmd)) self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo']) env = f.get_environment(usercmd) @@ -72,10 +73,68 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertEqual(env.get('NETWORK_ID'), 'foobar') def test_DnsmasqFilter(self): - self._test_DnsmasqFilter(filters.DnsmasqFilter, 'CONFIG_FILE') + self._test_EnvFilter_as_DnsMasq('CONFIG_FILE') def test_DeprecatedDnsmasqFilter(self): - self._test_DnsmasqFilter(filters.DeprecatedDnsmasqFilter, 'FLAGFILE') + self._test_EnvFilter_as_DnsMasq('FLAGFILE') + + def test_EnvFilter(self): + envset = ['A=/some/thing', 'B=somethingelse'] + envcmd = ['env'] + envset + realcmd = ['sleep', '10'] + usercmd = envcmd + realcmd + + f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep") + # accept with leading env + self.assertTrue(f.match(envcmd + ["sleep"])) + # accept without leading env + self.assertTrue(f.match(envset + ["sleep"])) + + # any other command does not match + self.assertFalse(f.match(envcmd + ["sleep2"])) + self.assertFalse(f.match(envset + ["sleep2"])) + + # accept any trailing arguments + self.assertTrue(f.match(usercmd)) + + # require given environment variables to match + self.assertFalse(f.match([envcmd, 'C=ELSE'])) + self.assertFalse(f.match(['env', 'C=xx'])) + self.assertFalse(f.match(['env', 'A=xx'])) + + # require env command to be given + # (otherwise CommandFilters should match + self.assertFalse(f.match(realcmd)) + # require command to match + self.assertFalse(f.match(envcmd)) + self.assertFalse(f.match(envcmd[1:])) + + # ensure that the env command is stripped when executing + self.assertEqual(f.exec_args(usercmd), realcmd) + env = f.get_environment(usercmd) + # check that environment variables are set + self.assertEqual(env.get('A'), '/some/thing') + self.assertEqual(env.get('B'), 'somethingelse') + self.assertFalse('sleep' in env.keys()) + + def test_EnvFilter_without_leading_env(self): + envset = ['A=/some/thing', 'B=somethingelse'] + envcmd = ['env'] + envset + realcmd = ['sleep', '10'] + + f = filters.EnvFilter("sleep", "root", "A=", "B=ignored") + + # accept without leading env + self.assertTrue(f.match(envset + ["sleep"])) + + self.assertEqual(f.get_command(envcmd + realcmd), realcmd) + self.assertEqual(f.get_command(envset + realcmd), realcmd) + + env = f.get_environment(envset + realcmd) + # check that environment variables are set + self.assertEqual(env.get('A'), '/some/thing') + self.assertEqual(env.get('B'), 'somethingelse') + self.assertFalse('sleep' in env.keys()) def test_KillFilter(self): if not os.path.exists("/proc/%d" % os.getpid()): @@ -169,6 +228,66 @@ class RootwrapTestCase(utils.BaseTestCase): self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn]) self.assertTrue(f.match(usercmd)) + def test_IpFilter_non_netns(self): + f = filters.IpFilter('/sbin/ip', 'root') + self.assertTrue(f.match(['ip', 'link', 'list'])) + + def _test_IpFilter_netns_helper(self, action): + f = filters.IpFilter('/sbin/ip', 'root') + self.assertTrue(f.match(['ip', 'link', action])) + + def test_IpFilter_netns_add(self): + self._test_IpFilter_netns_helper('add') + + def test_IpFilter_netns_delete(self): + self._test_IpFilter_netns_helper('delete') + + def test_IpFilter_netns_list(self): + self._test_IpFilter_netns_helper('list') + + def test_IpNetnsExecFilter_match(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'root') + self.assertTrue( + f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'])) + + def test_IpNetnsExecFilter_nomatch(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'root') + self.assertFalse(f.match(['ip', 'link', 'list'])) + + # verify that at least a NS is given + self.assertFalse(f.match(['ip', 'netns', 'exec'])) + + def test_IpNetnsExecFilter_nomatch_nonroot(self): + f = filters.IpNetnsExecFilter('/sbin/ip', 'user') + self.assertFalse( + f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'])) + + def test_match_filter_recurses_exec_command_filter_matches(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'root')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'] + + self.assertIsNotNone(wrapper.match_filter(filter_list, args)) + + def test_match_filter_recurses_exec_command_matches_user(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'user')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list'] + + # Currently ip netns exec requires root, so verify that + # no non-root filter is matched, as that would escalate privileges + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, filter_list, args) + + def test_match_filter_recurses_exec_command_filter_does_not_match(self): + filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'), + filters.IpFilter('/sbin/ip', 'root')] + args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar', + 'ip', 'link', 'list'] + + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, filter_list, args) + def test_exec_dirs_search(self): # This test supposes you have /bin/cat or /usr/bin/cat locally f = filters.CommandFilter("cat", "root") |