summaryrefslogtreecommitdiffstats
path: root/safety/safety.py
diff options
context:
space:
mode:
Diffstat (limited to 'safety/safety.py')
-rwxr-xr-xsafety/safety.py232
1 files changed, 232 insertions, 0 deletions
diff --git a/safety/safety.py b/safety/safety.py
new file mode 100755
index 00000000..4a2094f9
--- /dev/null
+++ b/safety/safety.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim: noet sw=4 ts=4 enc=utf-8
+"A static safety-checker for SystemTap modules."
+
+# in python 2.4, set & frozenset are builtins
+# in python 2.3, the equivalents live in the 'sets' module
+from sys import hexversion as __hexversion
+if __hexversion < 0x020400f0:
+ from sets import Set as set, ImmutableSet as frozenset
+
+
+def main(argv):
+ """
+ CLI to the SystemTap static safety-checker.
+
+ Provides a command-line interface for running the SystemTap module
+ safety checker. Use '-h' or '--help' for a description of the
+ command-line options.
+
+ Returns the number of modules that failed the check.
+ """
+ bad = 0
+ (options, args) = __parse_args(argv[1:])
+ safe = StaticSafety(options.arch, options.release, options.datapath)
+ for m in args:
+ if not safe.check_module(m):
+ bad += 1
+ return bad
+
+
+def __parse_args(argv):
+ from optparse import OptionParser
+ parser = OptionParser(usage="usage: %prog [options] [module]...",
+ description=__doc__)
+ parser.add_option('--data-path', dest='datapath', metavar='PATH',
+ help='specify the whitelist data files [default: <script-dir>/data]')
+ parser.add_option('-m', '--machine', '--architecture', dest='arch',
+ help='specify the machine architecture of the target')
+ parser.add_option('-r', '--kernel-release', dest='release',
+ help='specify the kernel release running on the target')
+ return parser.parse_args(argv)
+
+
+class StaticSafety:
+ "Manage a safety-checking session."
+
+ def __init__(self, arch=None, release=None, datapath=None):
+ from os import uname
+ self.__arch = arch or uname()[4]
+ self.__release = release or uname()[2]
+ self.__build_data_path(datapath)
+ self.__build_search_suffixes()
+ self.__load_allowed_references()
+ self.__load_allowed_opcodes()
+
+ def __build_data_path(self, datapath):
+ "Determine where the data directory resides."
+ from sys import argv
+ from os.path import dirname, isdir, realpath
+ if datapath is None:
+ local = dirname(realpath(argv[0]))
+ self.__data_path = local + '/data'
+ else:
+ self.__data_path = datapath
+
+ if not isdir(self.__data_path):
+ raise StandardError(
+ "Can't find the data directory! (looking in %s)"
+ % self.__data_path)
+
+ def __build_search_suffixes(self):
+ "Construct arch & kernel-versioning search suffixes."
+ ss = set()
+
+ # add empty string
+ ss.add('')
+
+ # add architecture search path
+ archsfx = '-%s' % self.__arch
+ ss.add(archsfx)
+
+ # add full kernel-version-release (2.6.NN-FOOBAR) + arch
+ relsfx = '-%s' % self.__release
+ ss.add(relsfx)
+ ss.add(relsfx + archsfx)
+
+ # add kernel version (2.6.NN) + arch
+ dash_i = relsfx.rfind('-')
+ if dash_i > 0:
+ ss.add(relsfx[:dash_i])
+ ss.add(relsfx[:dash_i] + archsfx)
+
+ # start dropping decimals
+ dot_i = relsfx.rfind('.', 0, dash_i)
+ while dot_i > 0:
+ ss.add(relsfx[:dot_i])
+ ss.add(relsfx[:dot_i] + archsfx)
+ dot_i = relsfx.rfind('.', 0, dot_i)
+
+ self.__search_suffixes = frozenset(ss)
+
+ def __load_allowed_references(self):
+ "Build the list of allowed external references from the data files."
+ wr = set()
+ for sfx in self.__search_suffixes:
+ try:
+ refs = open(self.__data_path + '/references' + sfx)
+ for line in refs:
+ wr.add(line.rstrip())
+ refs.close()
+ except IOError:
+ pass
+ if not len(wr):
+ raise StandardError("No whitelisted references found!")
+ self.__white_references = frozenset(wr)
+
+ def __load_allowed_opcodes(self):
+ "Build the regular expression matcher for allowed opcodes from the data files."
+ from re import compile
+ wo = []
+ for sfx in self.__search_suffixes:
+ try:
+ opcs = open(self.__data_path + '/opcodes' + sfx)
+ for line in opcs:
+ wo.append(line.rstrip())
+ opcs.close()
+ except IOError:
+ pass
+ if not len(wo):
+ raise StandardError("No whitelisted opcodes found!")
+ self.__white_opcodes_re = compile(r'^(?:' + r'|'.join(wo) + r')$')
+
+ def __check_references(self, module):
+ "Check that all unresolved references in the module are allowed."
+ from os import popen
+ from re import compile
+
+ sym_re = compile(r'^([\w@.]+) [Uw]\s+$')
+ def check(line):
+ m = sym_re.match(line)
+ if m:
+ ref = m.group(1)
+ if ref not in self.__white_references:
+ print 'ERROR: Invalid reference to %s' % ref
+ return False
+ return True
+ print 'WARNING: Unmatched line:\n %s' % `line`
+ return True
+
+ command = 'nm --format=posix --no-sort --undefined-only ' + `module`
+ ok = True
+ nm = popen(command)
+ for line in nm:
+ ok &= check(line)
+ if nm.close():
+ ok = False
+ return ok
+
+ def __check_opcodes(self, module):
+ "Check that all disassembled opcodes in the module are allowed."
+ from os import popen
+ from re import compile
+
+ skip_ud2a = [0]
+
+ ignore_re = compile(r'^$|^\s+\.{3}$|^.*Disassembly of section|^.*file format')
+ opc = r'(?:(?:lock )|(?:repn?[ze]? )|(?:rex\w+ ))*(\w+)\b'
+ opc_re = compile(r'^[A-Fa-f\d]+ <([^>]+)> %s' % opc)
+ def check(line):
+ m = ignore_re.match(line)
+ if m:
+ return True
+ m = opc_re.match(line)
+ if m:
+ loc, opc = m.groups()
+ if opc == 'ud2a':
+ # The kernel abuses ud2a for BUG checks by following it
+ # directly with __LINE__ and __FILE__. Objdump doesn't
+ # know this though, so it tries to interpret the data as
+ # real instructions. Because x86(-64) instructions are
+ # variable-length, it's hard to tell when objdump is synced
+ # up again. We'll fast-forward to the next function
+ # boundary and hope things are better there.
+ for skip in objdump:
+ mskip = opc_re.match(skip)
+ if mskip:
+ locskip = mskip.group(1)
+ # a loc without an offset marks a new function
+ if '+' not in locskip:
+ return check(skip)
+ skip_ud2a[0] += 1
+ return True
+ elif not self.__white_opcodes_re.match(opc):
+ print "ERROR: Invalid opcode '%s' at <%s>" % (opc, loc)
+ return False
+ return True
+ print 'WARNING: Unmatched line:\n %s' % `line`
+ return True
+
+ command = 'objdump --disassemble --prefix-addresses ' + `module`
+ ok = True
+ objdump = popen(command)
+ for line in objdump:
+ ok &= check(line)
+ if objdump.close():
+ ok = False
+
+ if skip_ud2a[0]:
+ #print 'WARNING: Skipped %d lines due to ud2a corruption' % skip_ud2a[0]
+ pass
+
+ return ok
+
+ def check_module(self, module):
+ "Check a module for exclusively safe opcodes and external references."
+ from os.path import isfile
+ if not isfile(module):
+ print 'ERROR: %s is not a file!' % `module`
+ return False
+ res = self.__check_references(module) and self.__check_opcodes(module)
+ if res:
+ print 'PASS: %s' % module
+ else:
+ print 'FAIL: %s' % module
+ return res
+
+
+if __name__ == '__main__':
+ from sys import exit, argv
+ exit(main(argv))
+