# # ipachangeconf - configuration file manipulation classes and functions # partially based on authconfig code # Copyright (c) 1999-2007 Red Hat, Inc. # Author: Simo Sorce # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import fcntl import os import shutil import six if six.PY3: unicode = str def openLocked(filename, perms): fd = -1 try: fd = os.open(filename, os.O_RDWR | os.O_CREAT, perms) fcntl.lockf(fd, fcntl.LOCK_EX) except OSError as e: if fd != -1: try: os.close(fd) except OSError: pass raise IOError(e.errno, e.strerror) return os.fdopen(fd, "r+") #TODO: add subsection as a concept # (ex. REALM.NAME = { foo = x bar = y } ) #TODO: put section delimiters as separating element of the list # so that we can process multiple sections in one go #TODO: add a comment all but provided options as a section option class IPAChangeConf: def __init__(self, name): self.progname = name self.indent = ("", "", "") self.assign = (" = ", "=") self.dassign = self.assign[0] self.comment = ("#",) self.dcomment = self.comment[0] self.eol = ("\n",) self.deol = self.eol[0] self.sectnamdel = ("[", "]") self.subsectdel = ("{", "}") self.case_insensitive_sections = True def setProgName(self, name): self.progname = name def setIndent(self, indent): if type(indent) is tuple: self.indent = indent elif type(indent) is str: self.indent = (indent, ) else: raise ValueError('Indent must be a list of strings') def setOptionAssignment(self, assign): if type(assign) is tuple: self.assign = assign else: self.assign = (assign, ) self.dassign = self.assign[0] def setCommentPrefix(self, comment): if type(comment) is tuple: self.comment = comment else: self.comment = (comment, ) self.dcomment = self.comment[0] def setEndLine(self, eol): if type(eol) is tuple: self.eol = eol else: self.eol = (eol, ) self.deol = self.eol[0] def setSectionNameDelimiters(self, delims): self.sectnamdel = delims def setSubSectionDelimiters(self, delims): self.subsectdel = delims def matchComment(self, line): for v in self.comment: if line.lstrip().startswith(v): return line.lstrip()[len(v):] return False def matchEmpty(self, line): if line.strip() == "": return True return False def matchSection(self, line): cl = "".join(line.strip().split()) cl = cl.lower() if self.case_insensitive_sections else cl if len(self.sectnamdel) != 2: return False if not cl.startswith(self.sectnamdel[0]): return False if not cl.endswith(self.sectnamdel[1]): return False return cl[len(self.sectnamdel[0]):-len(self.sectnamdel[1])] def matchSubSection(self, line): if self.matchComment(line): return False parts = line.split(self.dassign, 1) if len(parts) < 2: return False if parts[1].strip() == self.subsectdel[0]: return parts[0].strip() return False def matchSubSectionEnd(self, line): if self.matchComment(line): return False if line.strip() == self.subsectdel[1]: return True return False def getSectionLine(self, section): if len(self.sectnamdel) != 2: return section return self._dump_line(self.sectnamdel[0], section, self.sectnamdel[1], self.deol) def _dump_line(self, *args): return u"".join(unicode(x) for x in args) def dump(self, options, level=0): output = [] if level >= len(self.indent): level = len(self.indent) - 1 for o in options: if o['type'] == "section": output.append(self._dump_line(self.sectnamdel[0], o['name'], self.sectnamdel[1])) output.append(self.dump(o['value'], (level + 1))) continue if o['type'] == "subsection": output.append(self._dump_line(self.indent[level], o['name'], self.dassign, self.subsectdel[0])) output.append(self.dump(o['value'], (level + 1))) output.append(self._dump_line(self.indent[level], self.subsectdel[1])) continue if o['type'] == "option": delim = o.get('delim', self.dassign) if delim not in self.assign: raise ValueError('Unknown delim "%s" must be one of "%s"' % (delim, " ".join([d for d in self.assign]))) output.append(self._dump_line(self.indent[level], o['name'], delim, o['value'])) continue if o['type'] == "comment": output.append(self._dump_line(self.dcomment, o['value'])) continue if o['type'] == "empty": output.append('') continue raise SyntaxError('Unknown type: [%s]' % o['type']) # append an empty string to the output so that we add eol to the end # of the file contents in a single join() output.append('') return self.deol.join(output) def parseLine(self, line): if self.matchEmpty(line): return {'name': 'empty', 'type': 'empty'} value = self.matchComment(line) if value: return {'name': 'comment', 'type': 'comment', 'value': value.rstrip()} # pylint: disable=E1103 o = dict() parts = line.split(self.dassign, 1) if len(parts) < 2: # The default assign didn't match, try the non-default for d in self.assign[1:]: parts = line.split(d, 1) if len(parts) >= 2: o['delim'] = d break if 'delim' not in o: raise SyntaxError('Syntax Error: Unknown line format') o.update({'name':parts[0].strip(), 'type':'option', 'value':parts[1].rstrip()}) return o def findOpts(self, opts, type, name, exclude_sections=False): num = 0 for o in opts: if o['type'] == type and o['name'] == name: return (num, o) if exclude_sections and (o['type'] == "section" or o['type'] == "subsection"): return (num, None) num += 1 return (num, None) def commentOpts(self, inopts, level=0): opts = [] if level >= len(self.indent): level = len(self.indent) - 1 for o in inopts: if o['type'] == 'section': no = self.commentOpts(o['value'], (level + 1)) val = self._dump_line(self.dcomment, self.sectnamdel[0], o['name'], self.sectnamdel[1]) opts.append({'name': 'comment', 'type': 'comment', 'value': val}) for n in no: opts.append(n) continue if o['type'] == 'subsection': no = self.commentOpts(o['value'], (level + 1)) val = self._dump_line(self.indent[level], o['name'], self.dassign, self.subsectdel[0]) opts.append({'name': 'comment', 'type': 'comment', 'value': val}) opts.extend(no) val = self._dump_line(self.indent[level], self.subsectdel[1]) opts.append({'name': 'comment', 'type': 'comment', 'value': val}) continue if o['type'] == 'option': delim = o.get('delim', self.dassign) if delim not in self.assign: val = self._dump_line(self.indent[level], o['name'], delim, o['value']) opts.append({'name':'comment', 'type':'comment', 'value':val}) continue if o['type'] == 'comment': opts.append(o) continue if o['type'] == 'empty': opts.append({'name': 'comment', 'type': 'comment', 'value': ''}) continue raise SyntaxError('Unknown type: [%s]' % o['type']) return opts def mergeOld(self, oldopts, newopts): opts = [] for o in oldopts: if o['type'] == "section" or o['type'] == "subsection": (num, no) = self.findOpts(newopts, o['type'], o['name']) if not no: opts.append(o) continue if no['action'] == "set": mo = self.mergeOld(o['value'], no['value']) opts.append({'name': o['name'], 'type': o['type'], 'value': mo}) continue if no['action'] == "comment": co = self.commentOpts(o['value']) for c in co: opts.append(c) continue if no['action'] == "remove": continue raise SyntaxError('Unknown action: [%s]' % no['action']) if o['type'] == "comment" or o['type'] == "empty": opts.append(o) continue if o['type'] == "option": (num, no) = self.findOpts(newopts, 'option', o['name'], True) if not no: opts.append(o) continue if no['action'] == 'comment' or no['action'] == 'remove': if (no['value'] is not None and o['value'] is not no['value']): opts.append(o) continue if no['action'] == 'comment': value = self._dump_line(self.dcomment, o['name'], self.dassign, o['value']) opts.append({'name': 'comment', 'type': 'comment', 'value': value}) continue if no['action'] == 'set': opts.append(no) continue if no['action'] == 'addifnotset': opts.append({'name': 'comment', 'type': 'comment', 'value': self._dump_line(no['name'], self.dassign, no['value'], u' # modified by IPA' )}) opts.append(o) continue raise SyntaxError('Unknown action: [%s]' % no['action']) raise SyntaxError('Unknown type: [%s]' % o['type']) return opts def mergeNew(self, opts, newopts): cline = 0 for no in newopts: if no['type'] == "section" or no['type'] == "subsection": (num, o) = self.findOpts(opts, no['type'], no['name']) if not o: if no['action'] == 'set': opts.append(no) continue if no['action'] == "set": self.mergeNew(o['value'], no['value']) continue cline = num + 1 continue if no['type'] == "option": (num, o) = self.findOpts(opts, no['type'], no['name'], True) if not o: if no['action'] == 'set' or no['action'] == 'addifnotset': opts.append(no) continue cline = num + 1 continue if no['type'] == "comment" or no['type'] == "empty": opts.insert(cline, no) cline += 1 continue raise SyntaxError('Unknown type: [%s]' % no['type']) def merge(self, oldopts, newopts): #Use a two pass strategy #First we create a new opts tree from oldopts removing/commenting # the options as indicated by the contents of newopts #Second we fill in the new opts tree with options as indicated # in the newopts tree (this is becaus eentire (sub)sections may # in the newopts tree (this is becaus entire (sub)sections may # exist in the newopts that do not exist in oldopts) opts = self.mergeOld(oldopts, newopts) self.mergeNew(opts, newopts) return opts #TODO: Make parse() recursive? def parse(self, f): opts = [] sectopts = [] section = None subsectopts = [] subsection = None curopts = opts fatheropts = opts # Read in the old file. for line in f: # It's a section start. value = self.matchSection(line) if value: if section is not None: opts.append({'name': section, 'type': 'section', 'value': sectopts}) sectopts = [] curopts = sectopts fatheropts = sectopts section = value continue # It's a subsection start. value = self.matchSubSection(line) if value: if subsection is not None: raise SyntaxError('nested subsections are not ' 'supported yet') subsectopts = [] curopts = subsectopts subsection = value continue value = self.matchSubSectionEnd(line) if value: if subsection is None: raise SyntaxError('Unmatched end subsection terminator ' 'found') fatheropts.append({'name': subsection, 'type': 'subsection', 'value': subsectopts}) subsection = None curopts = fatheropts continue # Copy anything else as is. try: curopts.append(self.parseLine(line)) except SyntaxError as e: raise SyntaxError('{error} in file {fname}: [{line}]'.format( error=e, fname=f.name, line=line.rstrip())) #Add last section if any if len(sectopts) is not 0: opts.append({'name': section, 'type': 'section', 'value': sectopts}) return opts # Write settings to configuration file # file is a path # options is a set of dictionaries in the form: # [{'name': 'foo', 'value': 'bar', 'action': 'set/comment'}] # section is a section name like 'global' def changeConf(self, file, newopts): autosection = False savedsection = None done = False output = "" f = None try: # Do not catch an unexisting file error # we want to fail in that case shutil.copy2(file, (file + ".ipabkp")) f = openLocked(file, 0o644) oldopts = self.parse(f) options = self.merge(oldopts, newopts) output = self.dump(options) # Write it out and close it. f.seek(0) f.truncate(0) f.write(output) finally: try: if f: f.close() except IOError: pass return True # Write settings to new file, backup old # file is a path # options is a set of dictionaries in the form: # [{'name': 'foo', 'value': 'bar', 'action': 'set/comment'}] def newConf(self, file, options): autosection = False savedsection = None done = False output = "" f = None try: try: shutil.copy2(file, (file + ".ipabkp")) except IOError as err: if err.errno == 2: # The orign file did not exist pass f = openLocked(file, 0o644) # Trunkate f.seek(0) f.truncate(0) output = self.dump(options) f.write(output) finally: try: if f: f.close() except IOError: pass return True @staticmethod def setOption(name, value): return {'name': name, 'type': 'option', 'action': 'set', 'value': value} @staticmethod def rmOption(name): return {'name': name, 'type': 'option', 'action': 'remove', 'value': None} @staticmethod def setSection(name, options): return {'name': name, 'type': 'section', 'action': 'set', 'value': options}