# -*- coding: utf-8 -*- # # Copyright 2009 - 2013 Clark Williams # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # For the avoidance of doubt the "preferred form" of this code is one which # is in an open unpatent encumbered format. Where cryptographic key signing # forms part of the process of creating an executable the information # including keys needed to generate an equivalently functional executable # are deemed to be part of the source code. # import os import sys import libxml2 import lxml.etree import codecs import re from string import maketrans def convert_libxml2_to_lxml_doc(inxml): "Converts a libxml2.xmlDoc into a lxml.etree document object" if not isinstance(inxml, libxml2.xmlDoc): raise TypeError('Function requires an libxml2.xmlDoc as input') root = inxml.getRootElement() ret = lxml.etree.XML(root.serialize('UTF-8')) del root return ret def convert_lxml_to_libxml2_nodes(inlxml): "Converts a lxml.etree elements tree into a libxml2.xmlNode object" if not isinstance(inlxml,lxml.etree._Element) and not isinstance(inlxml, lxml.etree._XSLTResultTree): raise TypeError('Function requires an lxml.etree object as input') return libxml2.parseDoc(lxml.etree.tostring(inlxml)).getRootElement() class XMLOut(object): '''Class to create XML output''' def __init__(self, roottag, version, attr = None, encoding='UTF-8'): self.level = 0 self.encoding = encoding self.rootattr = attr self.version = version self.status = 0 # 0 - no report created/loaded, 1 - new report, 2 - loaded report, 3 - XML closed self.tag_trans = self.__setup_tag_trans() self.roottag = self.__fixtag(roottag) self.xmldoc = None def __del__(self): if self.level > 0: raise RuntimeError, "XMLOut: open blocks at __del__ (last opened '%s')" % self.currtag.name if self.xmldoc is not None: self.xmldoc.freeDoc() def __setup_tag_trans(self): t = maketrans('', '') t = t.replace(' ', '_') t = t.replace('\t', '_') t = t.replace('(', '_') t = t.replace(')', '_') t = t.replace(':', '-') return t def __fixtag(self, tagname): if not isinstance(tagname, str): return str(tagname) return tagname.translate(self.tag_trans) def __encode(self, value, tagmode = False): if type(value) is unicode: val = value elif type(value) is str: val = unicode(value) else: val = unicode(str(value)) if tagmode is True: rx = re.compile(" ") val = rx.sub("_", val) # libxml2 uses UTF-8 internally and must have # all input as UTF-8. return val.encode('utf-8') def __add_attributes(self, node, attr): if attr is not None: for k, v in attr.iteritems(): node.newProp(k, self.__encode(v)) def __parseToXML(self, node, data): # All supported variable types needs to be set up # here. TypeError exception will be raised on # unknown types. t = type(data) if t is unicode or t is str or t is int or t is float: n = libxml2.newText(self.__encode(data)) node.addChild(n) elif t is bool: v = data and "1" or "0" n = libxml2.newText(self.__encode(v)) node.addChild(n) elif t is dict: for (key, val) in data.iteritems(): node2 = libxml2.newNode(self.__encode(self.parsedata_prefix + key, True)) self.__parseToXML(node2, val) node.addChild(node2) elif t is tuple: for v in data: if type(v) is dict: self.__parseToXML(node, v) else: n = libxml2.newNode(self.tuple_tagname) self.__parseToXML(n, v) node.addChild(n) else: raise TypeError, "unhandled type (%s) for value '%s'" % (type(data), unicode(data)) def close(self): if self.status == 0: raise RuntimeError, "XMLOut: No XML document is created nor loaded" if self.status == 3: raise RuntimeError, "XMLOut: XML document already closed" if self.level > 0: raise RuntimeError, "XMLOut: open blocks at close() (last opened '%s')" % self.currtag.name if self.status == 1: # Only set the root node in the doc on created reports (NewReport called) self.xmldoc.setRootElement(self.xmlroot) self.status = 3 def NewReport(self): if self.status != 0 and self.status != 3: raise RuntimeError, "XMLOut: Cannot start a new report without closing the currently opened one" if self.status == 3: self.xmldoc.freeDoc() # Free the report from memory if we have one already self.xmldoc = libxml2.newDoc("1.0") self.xmlroot = libxml2.newNode(self.roottag) self.__add_attributes(self.xmlroot, {'version': self.version}) self.__add_attributes(self.xmlroot, self.rootattr) self.currtag = self.xmlroot self.level = 0 self.status = 1 def LoadReport(self, filename, validate_version = False): if self.status == 3: self.xmldoc.freeDoc() # Free the report from memory if we have one already self.xmldoc = libxml2.parseFile(filename) if self.xmldoc.name != filename: self.status = 3 raise RuntimeError, "XMLOut: Loading report failed" root = self.xmldoc.children if root.name != self.roottag: self.status = 3 raise RuntimeError, "XMLOut: Loaded report is not a valid %s XML file" % self.roottag if validate_version is True: ver = root.hasProp('version') if ver is None: self.status = 3 raise RuntimeError, "XMLOut: Loaded report is missing version attribute in root node" if ver.getContent() != self.version: self.status = 3 raise RuntimeError, "XMLOut: Loaded report is not of version %s" % self.version self.status = 2 # Confirm that we have loaded a report from file def Write(self, filename, xslt = None): if self.status != 2 and self.status != 3: raise RuntimeError, "XMLOut: XML document is not closed" if xslt == None: # If no XSLT template is give, write raw XML self.xmldoc.saveFormatFileEnc(filename, self.encoding, 1) return else: # Load XSLT file and prepare the XSLT parser xsltfile = open(xslt, 'r') xsltdoc = lxml.etree.parse(xsltfile) parser = lxml.etree.XSLT(xsltdoc) xsltfile.close() # imitate libxml2's filename interpretation if filename != "-": dstfile = codecs.open(filename, "w", encoding=self.encoding) else: dstfile = sys.stdout # # Parse XML+XSLT and write the result to file # xmldoc = convert_libxml2_to_lxml_doc(self.xmldoc) resdoc = parser(xmldoc) # Write the file with the requested output encoding dstfile.write(unicode(resdoc).encode(self.encoding)) if dstfile != sys.stdout: dstfile.close() # Clean up del resdoc del xsltdoc del parser del xsltfile del xmldoc def GetXMLdocument(self): if self.status != 2 and self.status != 3: raise RuntimeError, "XMLOut: XML document is not closed" return self.xmldoc def openblock(self, tagname, attributes=None): if self.status != 1: raise RuntimeError, "XMLOut: openblock() cannot be called before NewReport() is called" ntag = libxml2.newNode(self.__fixtag(tagname)); self.__add_attributes(ntag, attributes) self.currtag.addChild(ntag) self.currtag = ntag self.level += 1 return ntag def closeblock(self): if self.status != 1: raise RuntimeError, "XMLOut: closeblock() cannot be called before NewReport() is called" if self.level == 0: raise RuntimeError, "XMLOut: no open tags to close" self.currtag = self.currtag.get_parent() self.level -= 1 return self.currtag def taggedvalue(self, tag, value, attributes=None): if self.status != 1: raise RuntimeError, "XMLOut: taggedvalue() cannot be called before NewReport() is called" ntag = self.currtag.newTextChild(None, self.__fixtag(tag), self.__encode(value)) self.__add_attributes(ntag, attributes) return ntag def ParseData(self, tagname, data, attributes=None, tuple_tagname="tuples", prefix = ""): if self.status != 1: raise RuntimeError, "XMLOut: taggedvalue() cannot be called before NewReport() is called" self.tuple_tagname = self.__fixtag(tuple_tagname) self.parsedata_prefix = prefix ntag = libxml2.newNode(self.__fixtag(tagname)) self.__add_attributes(ntag, attributes) self.__parseToXML(ntag, data) self.currtag.addChild(ntag) return ntag def AppendXMLnodes(self, nodes): if not isinstance(nodes, libxml2.xmlNode): raise ValueError, "Input value is not a libxml2.xmlNode" return self.currtag.addChild(nodes) def unit_test(rootdir): try: x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8') x.NewReport() x.openblock('run_info', {'days': 0, 'hours': 0, 'minutes': 32, 'seconds': 18}) x.taggedvalue('time', '11:22:33') x.taggedvalue('date', '2000-11-22') x.closeblock() x.openblock('uname') x.taggedvalue('node', u'testing - \xe6\xf8') x.taggedvalue('kernel', 'my_test_kernel', {'is_RT': 0}) x.taggedvalue('arch', 'mips') x.closeblock() x.openblock('hardware') x.taggedvalue('cpu_cores', 2) x.taggedvalue('memory_size', 1024*1024*2) x.closeblock() x.openblock('loads', {'load_average': 3.29}) x.taggedvalue('command_line','./load/loader --extreme --ultimate --threads 4096', {'name': 'heavyloader'}) x.taggedvalue('command_line','dd if=/dev/zero of=/dev/null', {'name': 'lightloader'}) x.closeblock() x.close() print "------------- XML OUTPUT ----------------------------" x.Write("-") print "------------- XSLT PARSED OUTPUT --------------------" x.Write("-", "rteval_text.xsl") print "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" x.Write("/tmp/xmlout-test.xml") del x print "------------- LOAD XML FROM FILE -----------------------------" x = XMLOut('rteval','UNIT-TEST', None, 'UTF-8') x.LoadReport("/tmp/xmlout-test.xml", True) print "------------- LOADED XML DATA --------------------------------" x.Write("-") print "------------- XSLT PARSED OUTPUT FROM LOADED XML--------------" x.Write("-", "rteval_text.xsl") x.close() ## Test new data parser ... it eats most data types print "------------- TESTING XMLOut::ParseData() --------------" x.NewReport() x.ParseData("ParseTest", "test string", {"type": "simple_string"}) x.ParseData("ParseTest", 1234, {"type": "integer"}) x.ParseData("ParseTest", 39.3904, {"type": "float"}) x.ParseData("ParseTest", (11,22,33,44,55), {"type": "tuples"}) x.ParseData("ParseTest", (99,88,77), {"type": "tuples", "comment": "Changed default tuple tag name"}, "int_values") test = {"var1": "value 1", "var2": { "varA1": 1, "pi": 3.1415926, "varA3": (1, 2, {"test1": "val1"}, (4.1,4.2,4.3), 5), "varA4": {'another_level': True, 'another_value': "blabla"} }, "utf8 data": u'æøå', u"løpe": True} x.ParseData("ParseTest", test, {"type": "dict"}, prefix="test ") x.close() x.Write("-") return 0 except Exception, e: print "** EXCEPTION %s", str(e) return 1 if __name__ == '__main__': sys.exit(unit_test('..'))