summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-06-26 02:13:59 +0000
committerGerrit Code Review <review@openstack.org>2013-06-26 02:13:59 +0000
commit18dc396748114e5d8af0a9f84ba944e532815b65 (patch)
tree954856ef7b4f6cb841d9edcfca6b6be1352c6633
parent5229aca95f5c29b81bf423fcf438cc7223fa22ad (diff)
parent3e74c0017e0b1ab209bc066cc0cec6c151b69b83 (diff)
downloadoslo-18dc396748114e5d8af0a9f84ba944e532815b65.tar.gz
oslo-18dc396748114e5d8af0a9f84ba944e532815b65.tar.xz
oslo-18dc396748114e5d8af0a9f84ba944e532815b65.zip
Merge "Add IpFilter, IPNetnsExecFilter and EnvFilter"
-rw-r--r--openstack/common/rootwrap/filters.py113
-rw-r--r--openstack/common/rootwrap/wrapper.py14
-rw-r--r--tests/unit/test_rootwrap.py127
3 files changed, 250 insertions, 4 deletions
diff --git a/openstack/common/rootwrap/filters.py b/openstack/common/rootwrap/filters.py
index 0cc55ce..dfec412 100644
--- a/openstack/common/rootwrap/filters.py
+++ b/openstack/common/rootwrap/filters.py
@@ -235,3 +235,116 @@ class ReadFileFilter(CommandFilter):
if len(userargs) != 2:
return False
return True
+
+
+class IpFilter(CommandFilter):
+ """Specific filter for the ip utility to that does not match exec."""
+
+ def match(self, userargs):
+ if userargs[0] == 'ip':
+ if userargs[1] == 'netns':
+ return (userargs[2] in ('list', 'add', 'delete'))
+ else:
+ return True
+
+
+class EnvFilter(CommandFilter):
+ """Specific filter for the env utility.
+
+ Behaves like CommandFilter, except that it handles
+ leading env A=B.. strings appropriately.
+ """
+
+ def _extract_env(self, arglist):
+ """Extract all leading NAME=VALUE arguments from arglist."""
+
+ envs = set()
+ for arg in arglist:
+ if '=' not in arg:
+ break
+ envs.add(arg.partition('=')[0])
+ return envs
+
+ def __init__(self, exec_path, run_as, *args):
+ super(EnvFilter, self).__init__(exec_path, run_as, *args)
+
+ env_list = self._extract_env(self.args)
+ # Set exec_path to X when args are in the form of
+ # env A=a B=b C=c X Y Z
+ if "env" in exec_path and len(env_list) < len(self.args):
+ self.exec_path = self.args[len(env_list)]
+
+ def match(self, userargs):
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # require one additional argument after configured ones
+ if len(userargs) < len(self.args):
+ return False
+
+ # extract all env args
+ user_envs = self._extract_env(userargs)
+ filter_envs = self._extract_env(self.args)
+ user_command = userargs[len(user_envs):len(user_envs) + 1]
+
+ # match first non-env argument with CommandFilter
+ return (super(EnvFilter, self).match(user_command)
+ and len(filter_envs) and user_envs == filter_envs)
+
+ def exec_args(self, userargs):
+ args = userargs[:]
+
+ # ignore leading 'env'
+ if args[0] == 'env':
+ args.pop(0)
+
+ # Throw away leading NAME=VALUE arguments
+ while args and '=' in args[0]:
+ args.pop(0)
+
+ return args
+
+ def get_command(self, userargs, exec_dirs=[]):
+ to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path
+ return [to_exec] + self.exec_args(userargs)[1:]
+
+ def get_environment(self, userargs):
+ env = os.environ.copy()
+
+ # ignore leading 'env'
+ if userargs[0] == 'env':
+ userargs.pop(0)
+
+ # Handle leading NAME=VALUE pairs
+ for a in userargs:
+ env_name, equals, env_value = a.partition('=')
+ if not equals:
+ break
+ if env_name and env_value:
+ env[env_name] = env_value
+
+ return env
+
+
+class ChainingFilter(CommandFilter):
+ def exec_args(self, userargs):
+ return []
+
+
+class IpNetnsExecFilter(ChainingFilter):
+ """Specific filter for the ip utility to that does match exec."""
+
+ def match(self, userargs):
+ # Network namespaces currently require root
+ # require <ns> argument
+ if self.run_as != "root" or len(userargs) < 4:
+ return False
+
+ return (userargs[:3] == ['ip', 'netns', 'exec'])
+
+ def exec_args(self, userargs):
+ args = userargs[4:]
+ if args:
+ args[0] = os.path.basename(args[0])
+ return args
diff --git a/openstack/common/rootwrap/wrapper.py b/openstack/common/rootwrap/wrapper.py
index 5390c1b..df1a9f4 100644
--- a/openstack/common/rootwrap/wrapper.py
+++ b/openstack/common/rootwrap/wrapper.py
@@ -131,6 +131,20 @@ def match_filter(filter_list, userargs, exec_dirs=[]):
for f in filter_list:
if f.match(userargs):
+ if isinstance(f, filters.ChainingFilter):
+ # This command calls exec verify that remaining args
+ # matches another filter.
+ def non_chain_filter(fltr):
+ return (fltr.run_as == f.run_as
+ and not isinstance(fltr, filters.ChainingFilter))
+
+ leaf_filters = [fltr for fltr in filter_list
+ if non_chain_filter(fltr)]
+ args = f.exec_args(userargs)
+ if (not args or not match_filter(leaf_filters,
+ args, exec_dirs=exec_dirs)):
+ continue
+
# Try other filters if executable is absent
if not f.get_exec(exec_dirs=exec_dirs):
if not first_not_executable_filter:
diff --git a/tests/unit/test_rootwrap.py b/tests/unit/test_rootwrap.py
index 0e08b5e..02789ec 100644
--- a/tests/unit/test_rootwrap.py
+++ b/tests/unit/test_rootwrap.py
@@ -61,10 +61,11 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertRaises(wrapper.NoFilterMatched,
wrapper.match_filter, self.filters, invalid)
- def _test_DnsmasqFilter(self, filter_class, config_file_arg):
+ def _test_EnvFilter_as_DnsMasq(self, config_file_arg):
usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar',
'dnsmasq', 'foo']
- f = filter_class("/usr/bin/dnsmasq", "root")
+ f = filters.EnvFilter("env", "root", config_file_arg + '=A',
+ 'NETWORK_ID=', "/usr/bin/dnsmasq")
self.assertTrue(f.match(usercmd))
self.assertEqual(f.get_command(usercmd), ['/usr/bin/dnsmasq', 'foo'])
env = f.get_environment(usercmd)
@@ -72,10 +73,68 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertEqual(env.get('NETWORK_ID'), 'foobar')
def test_DnsmasqFilter(self):
- self._test_DnsmasqFilter(filters.DnsmasqFilter, 'CONFIG_FILE')
+ self._test_EnvFilter_as_DnsMasq('CONFIG_FILE')
def test_DeprecatedDnsmasqFilter(self):
- self._test_DnsmasqFilter(filters.DeprecatedDnsmasqFilter, 'FLAGFILE')
+ self._test_EnvFilter_as_DnsMasq('FLAGFILE')
+
+ def test_EnvFilter(self):
+ envset = ['A=/some/thing', 'B=somethingelse']
+ envcmd = ['env'] + envset
+ realcmd = ['sleep', '10']
+ usercmd = envcmd + realcmd
+
+ f = filters.EnvFilter("env", "root", "A=", "B=ignored", "sleep")
+ # accept with leading env
+ self.assertTrue(f.match(envcmd + ["sleep"]))
+ # accept without leading env
+ self.assertTrue(f.match(envset + ["sleep"]))
+
+ # any other command does not match
+ self.assertFalse(f.match(envcmd + ["sleep2"]))
+ self.assertFalse(f.match(envset + ["sleep2"]))
+
+ # accept any trailing arguments
+ self.assertTrue(f.match(usercmd))
+
+ # require given environment variables to match
+ self.assertFalse(f.match([envcmd, 'C=ELSE']))
+ self.assertFalse(f.match(['env', 'C=xx']))
+ self.assertFalse(f.match(['env', 'A=xx']))
+
+ # require env command to be given
+ # (otherwise CommandFilters should match
+ self.assertFalse(f.match(realcmd))
+ # require command to match
+ self.assertFalse(f.match(envcmd))
+ self.assertFalse(f.match(envcmd[1:]))
+
+ # ensure that the env command is stripped when executing
+ self.assertEqual(f.exec_args(usercmd), realcmd)
+ env = f.get_environment(usercmd)
+ # check that environment variables are set
+ self.assertEqual(env.get('A'), '/some/thing')
+ self.assertEqual(env.get('B'), 'somethingelse')
+ self.assertFalse('sleep' in env.keys())
+
+ def test_EnvFilter_without_leading_env(self):
+ envset = ['A=/some/thing', 'B=somethingelse']
+ envcmd = ['env'] + envset
+ realcmd = ['sleep', '10']
+
+ f = filters.EnvFilter("sleep", "root", "A=", "B=ignored")
+
+ # accept without leading env
+ self.assertTrue(f.match(envset + ["sleep"]))
+
+ self.assertEqual(f.get_command(envcmd + realcmd), realcmd)
+ self.assertEqual(f.get_command(envset + realcmd), realcmd)
+
+ env = f.get_environment(envset + realcmd)
+ # check that environment variables are set
+ self.assertEqual(env.get('A'), '/some/thing')
+ self.assertEqual(env.get('B'), 'somethingelse')
+ self.assertFalse('sleep' in env.keys())
def test_KillFilter(self):
if not os.path.exists("/proc/%d" % os.getpid()):
@@ -169,6 +228,66 @@ class RootwrapTestCase(utils.BaseTestCase):
self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn])
self.assertTrue(f.match(usercmd))
+ def test_IpFilter_non_netns(self):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', 'list']))
+
+ def _test_IpFilter_netns_helper(self, action):
+ f = filters.IpFilter('/sbin/ip', 'root')
+ self.assertTrue(f.match(['ip', 'link', action]))
+
+ def test_IpFilter_netns_add(self):
+ self._test_IpFilter_netns_helper('add')
+
+ def test_IpFilter_netns_delete(self):
+ self._test_IpFilter_netns_helper('delete')
+
+ def test_IpFilter_netns_list(self):
+ self._test_IpFilter_netns_helper('list')
+
+ def test_IpNetnsExecFilter_match(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertTrue(
+ f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
+
+ def test_IpNetnsExecFilter_nomatch(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
+ self.assertFalse(f.match(['ip', 'link', 'list']))
+
+ # verify that at least a NS is given
+ self.assertFalse(f.match(['ip', 'netns', 'exec']))
+
+ def test_IpNetnsExecFilter_nomatch_nonroot(self):
+ f = filters.IpNetnsExecFilter('/sbin/ip', 'user')
+ self.assertFalse(
+ f.match(['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']))
+
+ def test_match_filter_recurses_exec_command_filter_matches(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
+
+ self.assertIsNotNone(wrapper.match_filter(filter_list, args))
+
+ def test_match_filter_recurses_exec_command_matches_user(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'user')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
+
+ # Currently ip netns exec requires root, so verify that
+ # no non-root filter is matched, as that would escalate privileges
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, filter_list, args)
+
+ def test_match_filter_recurses_exec_command_filter_does_not_match(self):
+ filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
+ filters.IpFilter('/sbin/ip', 'root')]
+ args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
+ 'ip', 'link', 'list']
+
+ self.assertRaises(wrapper.NoFilterMatched,
+ wrapper.match_filter, filter_list, args)
+
def test_exec_dirs_search(self):
# This test supposes you have /bin/cat or /usr/bin/cat locally
f = filters.CommandFilter("cat", "root")