#!/usr/bin/python import sys try: import cjson except ImportError: print "Warning: skipping audit log verification because the cjson module" \ " is unavailable" sys.exit(0) from collections import defaultdict from optparse import OptionParser class Parser(object): DEFAULTS = {int:0, str:'', list:[]} def __init__(self, defconf=None): self.defaults = None if defconf is not None: self.defaults = self.flatten(defconf) def run(self, logs, verbose=None): result = self.parse(logs) if len(result) != len(self.defaults): diff = set(self.defaults.keys()).difference(result.keys()) print 'Test failed.' print 'The following attributes were not set:' for it in diff: print it sys.exit(1) def flatten(self, defaults): """ Flattens pathes to attributes. Parameters ---------- defaults : a dictionaries populated with default values Returns : dict : with flattened attributes """ result = dict() for path,value in self._walk(defaults): if path in result: print 'Warning: attribute path %s already exists' % path result[path] = value return result def parse(self, logs): result = defaultdict(list) for msg in logs: # each message is treated as a dictionary of dictionaries for a,v in self._walk(msg): # see if path is registered in defaults if a in self.defaults: dv = self.defaults.get(a) if dv is None: # determine default value by type if v is not None: dv = self.DEFAULTS[type(v)] else: print 'Warning: attribute %s is set to None' % a continue # by now we have default value if v != dv: # test passed result[a].append(v) return result def _walk(self, adict): """ Generator that works through dictionary. """ for a,v in adict.iteritems(): if isinstance(v,dict): for (attrpath,u) in self._walk(v): yield (a+'.'+attrpath,u) else: yield (a,v) if __name__ == '__main__': parser = OptionParser() parser.add_option("-i", "--logfile", dest="filename", help="input log file in json fmt", metavar="FILE") parser.add_option("-d", "--defaults", dest="defaults", help="dictionary with defaults", metavar="FILE") (options, args) = parser.parse_args() if options.filename is not None: with open(options.filename, 'r') as f: content = list() for l in f: content.append(cjson.decode(l.rstrip())) f.close() else: print 'Input file in jason format is required' exit() defaults = None if options.defaults is not None: with open(options.defaults, 'r') as f: defaults = cjson.decode(f.read()) f.close() # run test p = Parser(defaults) p.run(content) exit()