diff options
Diffstat (limited to 'openstack/common/rootwrap/filters.py')
-rw-r--r-- | openstack/common/rootwrap/filters.py | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/openstack/common/rootwrap/filters.py b/openstack/common/rootwrap/filters.py index 0cc55ce..dfec412 100644 --- a/openstack/common/rootwrap/filters.py +++ b/openstack/common/rootwrap/filters.py @@ -235,3 +235,116 @@ class ReadFileFilter(CommandFilter): if len(userargs) != 2: return False return True + + +class IpFilter(CommandFilter): + """Specific filter for the ip utility to that does not match exec.""" + + def match(self, userargs): + if userargs[0] == 'ip': + if userargs[1] == 'netns': + return (userargs[2] in ('list', 'add', 'delete')) + else: + return True + + +class EnvFilter(CommandFilter): + """Specific filter for the env utility. + + Behaves like CommandFilter, except that it handles + leading env A=B.. strings appropriately. + """ + + def _extract_env(self, arglist): + """Extract all leading NAME=VALUE arguments from arglist.""" + + envs = set() + for arg in arglist: + if '=' not in arg: + break + envs.add(arg.partition('=')[0]) + return envs + + def __init__(self, exec_path, run_as, *args): + super(EnvFilter, self).__init__(exec_path, run_as, *args) + + env_list = self._extract_env(self.args) + # Set exec_path to X when args are in the form of + # env A=a B=b C=c X Y Z + if "env" in exec_path and len(env_list) < len(self.args): + self.exec_path = self.args[len(env_list)] + + def match(self, userargs): + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # require one additional argument after configured ones + if len(userargs) < len(self.args): + return False + + # extract all env args + user_envs = self._extract_env(userargs) + filter_envs = self._extract_env(self.args) + user_command = userargs[len(user_envs):len(user_envs) + 1] + + # match first non-env argument with CommandFilter + return (super(EnvFilter, self).match(user_command) + and len(filter_envs) and user_envs == filter_envs) + + def exec_args(self, userargs): + args = userargs[:] + + # ignore leading 'env' + if args[0] == 'env': + args.pop(0) + + # Throw away leading NAME=VALUE arguments + while args and '=' in args[0]: + args.pop(0) + + return args + + def get_command(self, userargs, exec_dirs=[]): + to_exec = self.get_exec(exec_dirs=exec_dirs) or self.exec_path + return [to_exec] + self.exec_args(userargs)[1:] + + def get_environment(self, userargs): + env = os.environ.copy() + + # ignore leading 'env' + if userargs[0] == 'env': + userargs.pop(0) + + # Handle leading NAME=VALUE pairs + for a in userargs: + env_name, equals, env_value = a.partition('=') + if not equals: + break + if env_name and env_value: + env[env_name] = env_value + + return env + + +class ChainingFilter(CommandFilter): + def exec_args(self, userargs): + return [] + + +class IpNetnsExecFilter(ChainingFilter): + """Specific filter for the ip utility to that does match exec.""" + + def match(self, userargs): + # Network namespaces currently require root + # require <ns> argument + if self.run_as != "root" or len(userargs) < 4: + return False + + return (userargs[:3] == ['ip', 'netns', 'exec']) + + def exec_args(self, userargs): + args = userargs[4:] + if args: + args[0] = os.path.basename(args[0]) + return args |