#!/usr/bin/python import copy import json import logging import re import six def _(string): return string class InvalidRuleError(ValueError): pass class UndefinedValueError(ValueError): pass class StatementError(ValueError): pass class IllegalStateError(RuntimeError): pass RULE_FAIL = 0 RULE_SUCCESS = 1 BLOCK_CONTINUE = 2 STATEMENT_CONTINUE = 3 rule_result_names = { RULE_FAIL: 'RULE_FAIL', RULE_SUCCESS: 'RULE_SUCCESS', BLOCK_CONTINUE: 'BLOCK_CONTINUE', STATEMENT_CONTINUE: 'STATEMENT_CONTINUE', } def rule_result_name(result): return rule_result_names.get(result, "unknown") # # Reserved variables # ASSERTION = 'assertion' RULE_NUMBER = 'rule_number' RULE_NAME = 'rule_name' BLOCK_NUMBER = 'block_number' BLOCK_NAME = 'block_name' STATEMENT_NUMBER = 'statement_number' REGEXP_ARRAY_VARIABLE = 'regexp_array' REGEXP_MAP_VARIABLE = 'regexp_map' class Token(object): # Regexp to identify a variable beginning with $ # Supports array notation, e.g. $foo[bar] # Optional delimiting braces may be used to separate variable from # surrounding text. # # Examples: $foo ${foo} $foo[bar] ${foo[bar]} # where foo is the variable name and bar is the array index. # # Identifer is any alphabetic followed by alphanumeric or underscore VARIABLE_PAT = (r'(?' self.statement_id_format = ('') if isinstance(rules, basestring): rules = json.loads(rules) self.rules = rules if mappings is None: self.mappings = {} else: self.mappings = mappings def rule_id(self, namespace): return self.substitute_variables(self.rule_id_format, namespace) def statement_id(self, namespace): return self.substitute_variables(self.statement_id_format, namespace) # FIXME, not used def to_string(self, value): Token.classify(value) # raises TypeError if not supported type return json.dumps(value) def substitute_variables(self, string, namespace): def get_replacement(match): token = Token(match.group(0), namespace) token.load() if token.type == Token.TYPE_STRING: replacement = token.value else: replacement = six.text_type(token.value) return replacement return Token.VARIABLE_RE.sub(get_replacement, string) # FIXME, should we be passing namespace? Just used for rule_id def get_mapping(self, namespace, rule): mapping = rule.get('mapping') if mapping is not None: self.log.debug("using mapping local to rule %s mapping=%s", self.rule_id(namespace), mapping) return mapping mapping_name = rule.get('mapping_name') if mapping_name is None: raise InvalidRuleError("%s rule does not define mapping nor mapping_name unable to load mapping" % (self.rule_id(namespace))) mapping = self.mappings.get(mapping_name) if mapping is None: raise InvalidRuleError("%s rule specifies mapping_name '%s' but a mapping by that name does not exist, unable to load mapping" % (self.rule_id(namespace))) self.log.debug("using named mapping '%s' from rule %s mapping=%s", mapping_name, self.rule_id(namespace), mapping) return mapping def get_verb(self, statement): if len(statement) < 1: raise InvalidRuleError("statement has no verb") try: verb = Token(statement[0], None) except Exception as exc: raise InvalidRuleError("statement first member (i.e. verb) error %s" % (exc)) if verb.type != Token.TYPE_STRING: raise InvalidRuleError("statement first member (i.e. verb) must be a string, not %s" % (verb.type_name)) return verb.value.lower() def get_token(self, verb, statement, index, namespace, storage_type=None, token_types=None): try: item = statement[index] except IndexError: raise InvalidRuleError("verb '%s' requires at least %d items but only %d are available." % (verb, index+1, len(statement))) try: token = Token(item, namespace) except Exception as exc: raise StatementError("parameter %d, %s" % (index, exc)) if storage_type is not None: if token.storage_type not in storage_type: raise TypeError("verb '%s' requires parameter #%d to have storage types %s not %s. statement=%s" % (verb, index, [Token.get_storage_type_name(x) for x in storage_type], token.storage_type_name, statement)) if token_types is not None: try: token.load() # Note, Token.load() sets the Token.type except UndefinedValueError: # OK if not yet defined pass if token.type not in token_types: raise TypeError("verb '%s' requires parameter #%d to have types %s, not %s. statement=%s" % (verb, index, [Token.get_type_name(x) for x in sorted(token_types)], token.type_name, statement)) return token def get_parameter(self, verb, statement, index, namespace, token_types=None): try: item = statement[index] except IndexError: raise InvalidRuleError("verb '%s' requires at least %d items but only %d are available." % (verb, index+1, len(statement))) try: token = Token(item, namespace) except Exception as exc: raise StatementError("parameter %d, %s" % (index, exc)) if token_types is not None: token.get() # Note, Token.get() sets the Token.type if token.type not in token_types: raise TypeError("verb '%s' requires parameter #%d to have types %s, not %s. statement=%s" % (verb, index, [Token.get_type_name(x) for x in sorted(token_types)], token.type_name, statement)) token.load() return token def get_raw_parameter(self, verb, statement, index, token_types=None): try: item = statement[index] except IndexError: raise InvalidRuleError("verb '%s' requires at least %d items but only %d are available." % (verb, index+1, len(statement))) if token_types is not None: item_type = Token.classify(item) if item_type not in token_types: raise TypeError("verb '%s' requires parameter #%d to have types %s, not %s. statement=%s" % (verb, index, [Token.get_type_name(x) for x in sorted(token_types)], Token.get_type_name(item_type), statement)) return item def get_variable(self, verb, statement, index, namespace): try: item = statement[index] except IndexError: raise InvalidRuleError("verb '%s' requires at least %d items but only %d are available." % (verb, index+1, len(statement))) try: token = Token(item, namespace) except Exception as exc: raise StatementError("parameter %d, %s" % (index, exc)) if token.storage_type != Token.STORAGE_TYPE_VARIABLE: raise TypeError("verb '%s' requires parameter #%d to be a variable not %s. statement=%s" % (verb, index, token.storage_type_name, statement)) return token def process(self, assertion): self.success = True for rule_number, rule in enumerate(self.rules): namespace = {} namespace[RULE_NUMBER] = rule_number namespace[RULE_NAME] = '' namespace[ASSERTION] = copy.deepcopy(assertion) try: result = self.process_rule(namespace, rule) except Exception as exc: self.log.error("%s", exc) # FIXME log.exception? raise if result == RULE_SUCCESS: mapped = {} mapping = self.get_mapping(namespace, rule) for k, v in mapping.iteritems(): try: token = Token(v, namespace) new_value = token.get() except Exception as e: raise InvalidRule("%s unable to get value for mapping %s=%s, %s" % (self.rule_id(namespace), k, v, e)) mapped[k] = new_value return mapped return None def process_rule(self, namespace, rule): statement_blocks = rule.get('statement_blocks') if statement_blocks is None: raise InvalidRuleError("rule missing 'statement_blocks'") result = BLOCK_CONTINUE for block_number, block in enumerate(statement_blocks): namespace[BLOCK_NUMBER] = block_number namespace[BLOCK_NAME] = '' result = self.process_block(namespace, block) if result in (RULE_SUCCESS, RULE_FAIL): break elif result == BLOCK_CONTINUE: continue else: raise ValueError("%s unexpected block result: %s" % (self.statement_id(namespace), result)) if result in (RULE_SUCCESS, BLOCK_CONTINUE): return RULE_SUCCESS else: return RULE_FAIL def process_block(self, namespace, statements): result = STATEMENT_CONTINUE for statement_number, statement in enumerate(statements): namespace[STATEMENT_NUMBER] = statement_number try: result = self.process_statement(namespace, statement) except Exception as exc: raise StatementError("%s statement=%s %s" % (self.statement_id(namespace), statement, exc)) if result in (BLOCK_CONTINUE, RULE_SUCCESS, RULE_FAIL): break elif result == STATEMENT_CONTINUE: continue else: raise ValueError("%s unexpected statement result: %s" % (self.statement_id(namespace), result)) if result == STATEMENT_CONTINUE: result = BLOCK_CONTINUE return result def process_statement(self, namespace, statement): result = STATEMENT_CONTINUE verb = self.get_verb(statement) if verb == 'set': result = self.verb_set(verb, namespace, statement) elif verb == 'length': result = self.verb_length(verb, namespace, statement) elif verb == 'interpolate': result = self.verb_interpolate(verb, namespace, statement) elif verb == 'append': result = self.verb_append(verb, namespace, statement) elif verb == 'unique': result = self.verb_unique(verb, namespace, statement) elif verb == 'split': result = self.verb_split(verb, namespace, statement) elif verb == 'join': result = self.verb_join(verb, namespace, statement) elif verb == 'lower': result = self.verb_lower(verb, namespace, statement) elif verb == 'upper': result = self.verb_upper(verb, namespace, statement) elif verb == 'in': result = self.verb_in(verb, namespace, statement) elif verb == 'not_in': result = self.verb_not_in(verb, namespace, statement) elif verb == 'compare': result = self.verb_compare(verb, namespace, statement) elif verb == 'regexp': result = self.verb_regexp(verb, namespace, statement) elif verb == 'regexp_replace': result = self.verb_regexp_replace(verb, namespace, statement) elif verb == 'exit': result = self.verb_exit(verb, namespace, statement) elif verb == 'continue': result = self.verb_continue(verb, namespace, statement) else: raise InvalidRuleError("unknown verb '%s'" % (verb)) return result def verb_set(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) parameter = self.get_parameter(verb, statement, 2, namespace) variable.set(parameter.value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s", self.statement_id(namespace), verb, self.success, variable, variable.get()) return STATEMENT_CONTINUE def verb_length(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) parameter = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_ARRAY, Token.TYPE_MAP, Token.TYPE_STRING])) try: length = len(parameter.value) except Exception as exc: raise ValueError("verb '%s' failed, variable='%s' parameter='%s': %s" % (verb, variable, parameter.value, exc)) variable.set(length) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s parameter=%s", self.statement_id(namespace), verb, self.success, variable, variable.get(), parameter.value) return STATEMENT_CONTINUE def verb_interpolate(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) string = self.get_raw_parameter(verb, statement, 2, set([Token.TYPE_STRING])) try: new_value = self.substitute_variables(string, namespace) except Exception as exc: raise ValueError("verb '%s' failed, variable='%s' string='%s': %s" % (verb, variable, string, exc)) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s string='%s'", self.statement_id(namespace), verb, self.success, variable, variable.get(), string) return STATEMENT_CONTINUE def verb_append(self, verb, namespace, statement): variable = self.get_token(verb, statement, 1, namespace, set([Token.STORAGE_TYPE_VARIABLE]), set([Token.TYPE_ARRAY])) item = self.get_parameter(verb, statement, 2, namespace) try: variable.get().append(item.value) except Exception as exc: raise ValueError("verb '%s' failed, variable='%s' item='%s': %s" % (verb, variable, item.value, exc)) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s item=%s", self.statement_id(namespace), verb, self.success, variable, variable.get(), item.value) return STATEMENT_CONTINUE def verb_unique(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) array = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_ARRAY])) seen = set() new_value = [] for member in array.value: if member in seen: continue new_value.append(member) seen.add(member) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s array=%s", self.statement_id(namespace), verb, self.success, variable, variable.get(), array.value) return STATEMENT_CONTINUE def verb_split(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) string = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_STRING])) pattern = self.get_parameter(verb, statement, 3, namespace, set([Token.TYPE_STRING])) try: new_value = re.split(pattern.value, string.value) except Exception as exc: raise ValueError("verb '%s' failed, pattern='%s' string='%s': %s" % (verb, pattern.value, string.value, exc)) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s string='%s' pattern='%s'", self.statement_id(namespace), verb, self.success, variable, variable.get(), string.value, pattern.value) return STATEMENT_CONTINUE def verb_join(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) array = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_ARRAY])) conjunction = self.get_parameter(verb, statement, 3, namespace, set([Token.TYPE_STRING])) try: new_value = conjunction.value.join(array.value) except Exception as exc: raise ValueError("verb '%s' failed, array=%s conjunction='%s'': %s" % (verb, array.value, conjunction.value, exc)) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s array=%s conjunction='%s'", self.statement_id(namespace), verb, self.success, variable, variable.get(), array.value, conjunction.value) return STATEMENT_CONTINUE def verb_lower(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) parameter = self.get_parameter(verb, statement, 2, namespace, token_types=set([Token.TYPE_STRING, Token.TYPE_ARRAY, Token.TYPE_MAP])) try: if parameter.type == Token.TYPE_STRING: new_value = parameter.value.lower() elif parameter.type == Token.TYPE_ARRAY: new_value = [x.lower() for x in parameter.value] elif parameter.type == Token.TYPE_MAP: new_value = dict((k.lower(), v) for k, v in parameter.value.iteritems()) else: raise IllegalStateError("unexpected token type: %s" % (parameter.type_name)) except Exception as exc: raise ValueError("verb '%s' failed, variable='%s' parameter='%s': %s" % (verb, variable, parameter.value, exc)) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s parameter=%s", self.statement_id(namespace), verb, self.success, variable, variable.get(), parameter) return STATEMENT_CONTINUE def verb_upper(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace) parameter = self.get_parameter(verb, statement, 2, namespace, token_types=set([Token.TYPE_STRING, Token.TYPE_ARRAY, Token.TYPE_MAP])) try: if parameter.type == Token.TYPE_STRING: new_value = parameter.value.upper() elif parameter.type == Token.TYPE_ARRAY: new_value = [x.upper() for x in parameter.value] elif parameter.type == Token.TYPE_MAP: new_value = dict((k.upper(), v) for k, v in parameter.value.iteritems()) else: raise IllegalStateError("unexpected token type: %s" % (parameter.type_name)) except Exception as exc: raise ValueError("verb '%s' failed, variable='%s' parameter='%s': %s" % (verb, variable, parameter.value, exc)) variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s parameter=%s", self.statement_id(namespace), verb, self.success, variable, variable.get(), parameter) return STATEMENT_CONTINUE def verb_in(self, verb, namespace, statement): member = self.get_parameter(verb, statement, 1, namespace) collection = self.get_parameter(verb, statement, 2, namespace, token_types=set([Token.TYPE_ARRAY, Token.TYPE_MAP, Token.TYPE_STRING])) try: self.success = member.value in collection.value except Exception as exc: raise ValueError("verb '%s' failed, member='%s' collection='%s': %s" % (verb, member.value, collection.value, exc)) if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s member=%s collection=%s", self.statement_id(namespace), verb, self.success, member.value, collection.value) return STATEMENT_CONTINUE def verb_not_in(self, verb, namespace, statement): member = self.get_parameter(verb, statement, 1, namespace) collection = self.get_parameter(verb, statement, 2, namespace, token_types=set([Token.TYPE_ARRAY, Token.TYPE_MAP, Token.TYPE_STRING])) try: self.success = member.value not in collection.value except Exception as exc: raise ValueError("verb '%s' failed, member='%s' collection='%s': %s" % (verb, member.value, collection.value, exc)) if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s member=%s collection=%s", self.statement_id(namespace), verb, self.success, member.value, collection.value) return STATEMENT_CONTINUE def verb_compare(self, verb, namespace, statement): left = self.get_parameter(verb, statement, 1, namespace) op = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_STRING])) right = self.get_parameter(verb, statement, 3, namespace) if left.type != right.type: raise TypeError("verb '%s' both items must have the same type left is %s and right is %s" % (verb, left.type_name, right.type_name)) try: if op.value == '==': self.success = left.value == right.value elif op.value == '!=': self.success = left.value != right.value elif op.value == '<': self.success = left.value < right.value elif op.value == '<=': self.success = left.value <= right.value elif op.value == '>': self.success = left.value > right.value elif op.value == '>=': self.success = left.value >= right.value else: raise InvalidRuleError("verb '%s' has unknown comparison operator '%s'" % (verb, op.value)) except Exception as exc: self.success = False raise ValueError("verb '%s' failed, left=%s op='%s' right=%s, %s" % (verb, left.value, op.value, right.value, exc)) if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s left=%s op='%s' right=%s", self.statement_id(namespace), verb, self.success, left.value, op.value, right.value) return STATEMENT_CONTINUE def verb_regexp(self, verb, namespace, statement): string = self.get_parameter(verb, statement, 1, namespace, set([Token.TYPE_STRING])) pattern = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_STRING])) try: match = re.search(pattern.value, string.value) except Exception as exc: self.success = False raise ValueError("verb '%s' failed, string='%s' pattern='%s', %s" % (verb, string.value, pattern.value, exc)) if match: self.success = True # Note, match.groups() returns a tuple # containing all the subgroups of the match, # from 1 up to however many groups are in the # pattern. But we want to allow zero-based # indexing as well as access to group 0, # therefore we insert group 0 at the head of # the list. result = list(match.groups()) result.insert(0, match.group(0)) namespace[REGEXP_ARRAY_VARIABLE] = result namespace[REGEXP_MAP_VARIABLE] = match.groupdict() else: self.success = False namespace[REGEXP_ARRAY_VARIABLE] = [] namespace[REGEXP_MAP_VARIABLE] = {} if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s string='%s' pattern='%s' %s=%s %s=%s", self.statement_id(namespace), verb, self.success, string.value, pattern.value, REGEXP_ARRAY_VARIABLE, namespace[REGEXP_ARRAY_VARIABLE], REGEXP_MAP_VARIABLE, namespace[REGEXP_MAP_VARIABLE]) return STATEMENT_CONTINUE def verb_regexp_replace(self, verb, namespace, statement): variable = self.get_variable(verb, statement, 1, namespace,) string = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_STRING])) pattern = self.get_parameter(verb, statement, 3, namespace, set([Token.TYPE_STRING])) replacement = self.get_parameter(verb, statement, 4, namespace, set([Token.TYPE_STRING])) try: new_value = re.sub(pattern.value, replacement.value, string.value) except Exception as exc: self.success = False raise ValueError("verb '%s' verb failed, pattern='%s' replacement='%s', %s" % (verb, pattern.value, replacement.value, exc)) else: variable.set(new_value) self.success = True if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s variable: %s=%s string='%s' pattern='%s' replacement='%s'", self.statement_id(namespace), verb, self.success, variable, variable.get(), string.value, pattern.value, replacement.value) return STATEMENT_CONTINUE def verb_exit(self, verb, namespace, statement): statement_result = STATEMENT_CONTINUE exit_status_param = self.get_parameter(verb, statement, 1, namespace, set([Token.TYPE_STRING])) criteria_param = self.get_parameter(verb, statement, 2, namespace, set([Token.TYPE_STRING])) exit_status = exit_status_param.value.lower() criteria = criteria_param.value.lower() if exit_status == 'rule_succeeds': result = RULE_SUCCESS elif exit_status == 'rule_fails': result = RULE_FAIL else: raise InvalidRuleError("verb='%s' unknown exit status '%s'" % (verb, exit_status)) if criteria == 'if_success': if self.success: do_exit = True else: do_exit = False elif criteria == 'if_not_success': if not self.success: do_exit = True else: do_exit = False elif criteria == 'always': do_exit = True elif criteria == 'never': do_exit = False else: raise InvalidRuleError("verb='%s' unknown exit criteria '%s'" % (verb, criteria)) if do_exit: statement_result = result if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s status=%s criteria=%s exiting=%s result=%s", self.statement_id(namespace), verb, self.success, exit_status, criteria, do_exit, rule_result_name(statement_result)) return statement_result def verb_continue(self, verb, namespace, statement): statement_result = STATEMENT_CONTINUE criteria_param = self.get_parameter(verb, statement, 1, namespace, set([Token.TYPE_STRING])) criteria = criteria_param.value.lower() if criteria == 'if_success': if self.success: do_continue = True else: do_continue = False elif criteria == 'if_not_success': if not self.success: do_continue = True else: do_continue = False elif criteria == 'always': do_continue = True elif criteria == 'never': do_continue = False else: raise InvalidRuleError("verb='%s' unknown continue criteria '%s'" % (verb, criteria)) if do_continue: statement_result = BLOCK_CONTINUE if self.log.isEnabledFor(logging.DEBUG): self.log.debug("%s verb='%s' success=%s criteria=%s continuing=%s result=%s", self.statement_id(namespace), verb, self.success, criteria, do_continue, rule_result_name(statement_result)) return statement_result