From 12e264d58f052f192f3408f5cd8637809eff085b Mon Sep 17 00:00:00 2001 From: Thierry Carrez Date: Fri, 16 Nov 2012 15:50:01 +0100 Subject: Configurable exec_dirs to find rootwrap commands Adds support for a configurable set of trusted directories to search executables in (exec_dirs), which defaults to system PATH. If your filter specifies an exec_path that doesn't start with '/', then it will be searched in exec_dirs. Avoids having to write multiple filters to care for distro differences. Fixes bug 1079723. Also returns a specific error rather than try to run absent executables. Change-Id: Idab03bb0be6832a75ffeed4e78d25d0543f5caf9 --- nova/rootwrap/filters.py | 29 ++++++++++++++++++++++++----- nova/rootwrap/wrapper.py | 38 +++++++++++++++++++++++++++++--------- nova/tests/test_nova_rootwrap.py | 20 ++++++++++++++------ 3 files changed, 67 insertions(+), 20 deletions(-) (limited to 'nova') diff --git a/nova/rootwrap/filters.py b/nova/rootwrap/filters.py index 46a812e5d..a3e5f1c3c 100644 --- a/nova/rootwrap/filters.py +++ b/nova/rootwrap/filters.py @@ -26,6 +26,23 @@ class CommandFilter(object): self.exec_path = exec_path self.run_as = run_as self.args = args + self.real_exec = None + + def get_exec(self, exec_dirs=[]): + """Returns existing executable, or empty string if none found""" + if self.real_exec is not None: + return self.real_exec + self.real_exec = "" + if self.exec_path.startswith('/'): + if os.access(self.exec_path, os.X_OK): + self.real_exec = self.exec_path + else: + for binary_path in exec_dirs: + expanded_path = os.path.join(binary_path, self.exec_path) + if os.access(expanded_path, os.X_OK): + self.real_exec = expanded_path + break + return self.real_exec def match(self, userargs): """Only check that the first argument (command) matches exec_path""" @@ -33,12 +50,13 @@ class CommandFilter(object): return True return False - def get_command(self, userargs): + def get_command(self, userargs, exec_dirs=[]): """Returns command to execute (with sudo -u if run_as != root).""" + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path if (self.run_as != 'root'): # Used to run commands at lesser privileges - return ['sudo', '-u', self.run_as, self.exec_path] + userargs[1:] - return [self.exec_path] + userargs[1:] + return ['sudo', '-u', self.run_as, to_exec] + userargs[1:] + return [to_exec] + userargs[1:] def get_environment(self, userargs): """Returns specific environment to set, None if none""" @@ -82,9 +100,10 @@ class DnsmasqFilter(CommandFilter): return True return False - def get_command(self, userargs): + def get_command(self, userargs, exec_dirs=[]): + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path dnsmasq_pos = userargs.index('dnsmasq') - return [self.exec_path] + userargs[dnsmasq_pos + 1:] + return [to_exec] + userargs[dnsmasq_pos + 1:] def get_environment(self, userargs): env = os.environ.copy() diff --git a/nova/rootwrap/wrapper.py b/nova/rootwrap/wrapper.py index 3dd7ee7e3..742f23b14 100644 --- a/nova/rootwrap/wrapper.py +++ b/nova/rootwrap/wrapper.py @@ -23,6 +23,20 @@ import string from nova.rootwrap import filters +class NoFilterMatched(Exception): + """This exception is raised when no filter matched.""" + pass + + +class FilterMatchNotExecutable(Exception): + """ + This exception is raised when a filter matched but no executable was + found. + """ + def __init__(self, match=None, **kwargs): + self.match = match + + def build_filter(class_name, *args): """Returns a filter object of class class_name""" if not hasattr(filters, class_name): @@ -50,23 +64,29 @@ def load_filters(filters_path): return filterlist -def match_filter(filters, userargs): +def match_filter(filters, userargs, exec_dirs=[]): """ Checks user command and arguments through command filters and - returns the first matching filter, or None is none matched. + returns the first matching filter. + Raises NoFilterMatched if no filter matched. + Raises FilterMatchNotExecutable if no executable was found for the + best filter match. """ - - found_filter = None + first_not_executable_filter = None for f in filters: if f.match(userargs): # Try other filters if executable is absent - if not os.access(f.exec_path, os.X_OK): - if not found_filter: - found_filter = f + if not f.get_exec(exec_dirs=exec_dirs): + if not first_not_executable_filter: + first_not_executable_filter = f continue # Otherwise return matching filter for execution return f - # No filter matched or first missing executable - return found_filter + if first_not_executable_filter: + # A filter matched, but no executable was found for it + raise FilterMatchNotExecutable(match=first_not_executable_filter) + + # No filter matched + raise NoFilterMatched() diff --git a/nova/tests/test_nova_rootwrap.py b/nova/tests/test_nova_rootwrap.py index 135a5e46e..1dfd57a72 100644 --- a/nova/tests/test_nova_rootwrap.py +++ b/nova/tests/test_nova_rootwrap.py @@ -43,16 +43,16 @@ class RootwrapTestCase(test.TestCase): def test_RegExpFilter_reject(self): usercmd = ["ls", "root"] - filtermatch = wrapper.match_filter(self.filters, usercmd) - self.assertTrue(filtermatch is None) + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, self.filters, usercmd) def test_missing_command(self): valid_but_missing = ["foo_bar_not_exist"] invalid = ["foo_bar_not_exist_and_not_matched"] - filtermatch = wrapper.match_filter(self.filters, valid_but_missing) - self.assertTrue(filtermatch is not None) - filtermatch = wrapper.match_filter(self.filters, invalid) - self.assertTrue(filtermatch is None) + self.assertRaises(wrapper.FilterMatchNotExecutable, + wrapper.match_filter, self.filters, valid_but_missing) + self.assertRaises(wrapper.NoFilterMatched, + wrapper.match_filter, self.filters, invalid) def _test_DnsmasqFilter(self, filter_class, config_file_arg): usercmd = ['env', config_file_arg + '=A', 'NETWORK_ID=foobar', @@ -136,6 +136,14 @@ class RootwrapTestCase(test.TestCase): self.assertEqual(f.get_command(usercmd), ['/bin/cat', goodfn]) self.assertTrue(f.match(usercmd)) + def test_exec_dirs_search(self): + # This test supposes you have /bin/cat or /usr/bin/cat locally + f = filters.CommandFilter("cat", "root") + usercmd = ['cat', '/f'] + self.assertTrue(f.match(usercmd)) + self.assertTrue(f.get_command(usercmd, exec_dirs=['/bin', + '/usr/bin']) in (['/bin/cat', '/f'], ['/usr/bin/cat', '/f'])) + def test_skips(self): # Check that all filters are skipped and that the last matches usercmd = ["cat", "/"] -- cgit