diff options
author | Petr Viktorin <pviktori@redhat.com> | 2012-04-18 11:22:35 -0400 |
---|---|---|
committer | Martin Kosek <mkosek@redhat.com> | 2012-05-09 11:54:20 +0200 |
commit | f19218f7d87f5847d51f79b5d2850f90b0ae8407 (patch) | |
tree | 500d3101dc1375d3eb75ffb927040c2593216aa9 /ipapython | |
parent | c02fcf5d34ad880e082cbc0c7f59fc3812d11c9e (diff) | |
download | freeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.tar.gz freeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.tar.xz freeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.zip |
Remove duplicate and unused utility code
IPA has some unused code from abandoned features (Radius, ipa 1.x user
input, commant-line tab completion), as well as some duplicate utilities.
This patch cleans up the utility modules.
Duplicate code consolidated into ipapython.ipautil:
{ipalib.util,ipaserver.ipautil,ipapython.ipautil}.realm_to_suffix
{ipaserver,ipapython}.ipautil.CIDict
(with style improvements from the ipaserver version)
{ipapython.entity,ipaserver.ipautil}.utf8_encode_value
{ipapython.entity,ipaserver.ipautil}.utf8_encode_values
ipalib.util.get_fqdn was removed in favor of the same function in
ipaserver.install.installutils
Removed unused code:
ipalib.util:
load_plugins_in_dir
import_plugins_subpackage
make_repr (was imported but unused; also removed from tests)
ipapython.ipautil:
format_list
parse_key_value_pairs
read_pairs_file
read_items_file
user_input_plain
AttributeValueCompleter
ItemCompleter
ipaserver.ipautil:
get_gsserror (a different version exists in ipapython.ipautil)
ipaserver.ipautil ended up empty and is removed entirely.
https://fedorahosted.org/freeipa/ticket/2650
Diffstat (limited to 'ipapython')
-rw-r--r-- | ipapython/entity.py | 31 | ||||
-rw-r--r-- | ipapython/ipautil.py | 506 |
2 files changed, 44 insertions, 493 deletions
diff --git a/ipapython/entity.py b/ipapython/entity.py index 527adface..27d517879 100644 --- a/ipapython/entity.py +++ b/ipapython/entity.py @@ -17,18 +17,7 @@ import copy -import ipapython.ipautil - -def utf8_encode_value(value): - if isinstance(value,unicode): - return value.encode('utf-8') - return value - -def utf8_encode_values(values): - if isinstance(values,list) or isinstance(values,tuple): - return map(utf8_encode_value, values) - else: - return utf8_encode_value(values) +from ipapython import ipautil def copy_CIDict(x): """Do a deep copy of a CIDict""" @@ -55,19 +44,19 @@ class Entity: if entrydata: if isinstance(entrydata,tuple): self.dn = entrydata[0] - self.data = ipapython.ipautil.CIDict(entrydata[1]) + self.data = ipautil.CIDict(entrydata[1]) elif isinstance(entrydata,str) or isinstance(entrydata,unicode): self.dn = entrydata - self.data = ipapython.ipautil.CIDict() + self.data = ipautil.CIDict() elif isinstance(entrydata,dict): self.dn = entrydata['dn'] del entrydata['dn'] - self.data = ipapython.ipautil.CIDict(entrydata) + self.data = ipautil.CIDict(entrydata) else: self.dn = '' - self.data = ipapython.ipautil.CIDict() + self.data = ipautil.CIDict() - self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data)) + self.orig_data = ipautil.CIDict(copy_CIDict(self.data)) def __nonzero__(self): """This allows us to do tests like if entry: returns false if there is no data, @@ -120,9 +109,9 @@ class Entity: if (len(value) < 1): return if (len(value) == 1): - self.data[name] = utf8_encode_values(value[0]) + self.data[name] = ipautil.utf8_encode_values(value[0]) else: - self.data[name] = utf8_encode_values(value) + self.data[name] = ipautil.utf8_encode_values(value) setValues = setValue @@ -161,7 +150,7 @@ class Entity: def toDict(self): """Convert the attrs and values to a dict. The dict is keyed on the attribute name. The value is either single value or a list of values.""" - result = ipapython.ipautil.CIDict(self.data) + result = ipautil.CIDict(self.data) result['dn'] = self.dn return result @@ -171,7 +160,7 @@ class Entity: def origDataDict(self): """Returns a dict of the original values of the user. Used for updates.""" - result = ipapython.ipautil.CIDict(self.orig_data) + result = ipautil.CIDict(self.orig_data) result['dn'] = self.dn return result diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index 69c328934..4a9db11e2 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -195,6 +195,7 @@ def format_netloc(host, port=None): return '%s:%s' % (host, str(port)) def realm_to_suffix(realm_name): + """Convert a kerberos realm into the IPA suffix.""" s = realm_name.split(".") terms = ["dc=" + x.lower() for x in s] return ",".join(terms) @@ -406,32 +407,32 @@ class CIDict(dict): If you extend UserDict, isinstance(foo, dict) returns false. """ - def __init__(self,default=None): + def __init__(self, default=None): super(CIDict, self).__init__() self._keys = {} self.update(default or {}) - def __getitem__(self,key): - return super(CIDict,self).__getitem__(string.lower(key)) + def __getitem__(self, key): + return super(CIDict, self).__getitem__(key.lower()) - def __setitem__(self,key,value): - lower_key = string.lower(key) + def __setitem__(self, key, value): + lower_key = key.lower() self._keys[lower_key] = key - return super(CIDict,self).__setitem__(string.lower(key),value) + return super(CIDict, self).__setitem__(lower_key, value) - def __delitem__(self,key): - lower_key = string.lower(key) + def __delitem__(self, key): + lower_key = key.lower() del self._keys[lower_key] - return super(CIDict,self).__delitem__(string.lower(key)) + return super(CIDict, self).__delitem__(key.lower()) - def update(self,dict): + def update(self, dict): for key in dict.keys(): self[key] = dict[key] - def has_key(self,key): - return super(CIDict, self).has_key(string.lower(key)) + def has_key(self, key): + return super(CIDict, self).has_key(key.lower()) - def get(self,key,failobj=None): + def get(self, key, failobj=None): try: return self[key] except KeyError: @@ -443,7 +444,7 @@ class CIDict(dict): def items(self): result = [] for k in self._keys.values(): - result.append((k,self[k])) + result.append((k, self[k])) return result def copy(self): @@ -458,7 +459,7 @@ class CIDict(dict): def iterkeys(self): return self.copy().iterkeys() - def setdefault(self,key,value=None): + def setdefault(self, key, value=None): try: return self[key] except KeyError: @@ -476,11 +477,11 @@ class CIDict(dict): raise def popitem(self): - (lower_key,value) = super(CIDict,self).popitem() + (lower_key, value) = super(CIDict, self).popitem() key = self._keys[lower_key] del self._keys[lower_key] - return (key,value) + return (key, value) class GeneralizedTimeZone(datetime.tzinfo): @@ -610,118 +611,17 @@ def ipa_generate_password(characters=None,pwd_len=None): rndpwd += rndchar return rndpwd -def format_list(items, quote=None, page_width=80): - '''Format a list of items formatting them so they wrap to fit the - available width. The items will be sorted. +def parse_items(text): + '''Given text with items separated by whitespace or comma, return a list of those items - The items may optionally be quoted. The quote parameter may either be - a string, in which case it is added before and after the item. Or the - quote parameter may be a pair (either a tuple or list). In this case - quote[0] is left hand quote and quote[1] is the right hand quote. - ''' - left_quote = right_quote = '' - num_items = len(items) - if not num_items: return "" - - if quote is not None: - if type(quote) in StringTypes: - left_quote = right_quote = quote - elif type(quote) is TupleType or type(quote) is ListType: - left_quote = quote[0] - right_quote = quote[1] - - max_len = max(map(len, items)) - max_len += len(left_quote) + len(right_quote) - num_columns = (page_width + max_len) / (max_len+1) - num_rows = (num_items + num_columns - 1) / num_columns - items.sort() - - rows = [''] * num_rows - i = row = col = 0 - - while i < num_items: - row = 0 - if col == 0: - separator = '' - else: - separator = ' ' - - while i < num_items and row < num_rows: - rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote)) - i += 1 - row += 1 - col += 1 - return '\n'.join(rows) - -key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))") -def parse_key_value_pairs(input): - ''' Given a string composed of key=value pairs parse it and return - a dict of the key/value pairs. Keys must be a word, a key must be followed - by an equal sign (=) and a value. The value may be a single word or may be - quoted. Quotes may be either single or double quotes, but must be balanced. - Inside the quoted text the same quote used to start the quoted value may be - used if it is escaped by preceding it with a backslash (\). - White space between the key, the equal sign, and the value is ignored. - Values are always strings. Empty values must be specified with an empty - quoted string, it's value after parsing will be an empty string. - - Example: The string - - arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote" - - will produce - - arg0= arg1=1 - arg2=two - arg3=three's a crowd - arg4=this is a " quote + The returned list only contains non-empty items. ''' - - kv_dict = {} - for match in key_value_re.finditer(input): - key = match.group(1) - quote = match.group('quote') - if match.group(5): - value = match.group(6) - if value is None: value = '' - value = re.sub('\\\%s' % quote, quote, value) - else: - value = match.group(2) - kv_dict[key] = value - return kv_dict - -def parse_items(text): - '''Given text with items separated by whitespace or comma, return a list of those items''' split_re = re.compile('[ ,\t\n]+') items = split_re.split(text) for item in items[:]: if not item: items.remove(item) return items -def read_pairs_file(filename): - comment_re = re.compile('#.*$', re.MULTILINE) - if filename == '-': - fd = sys.stdin - else: - fd = open(filename) - text = fd.read() - text = comment_re.sub('', text) # kill comments - pairs = parse_key_value_pairs(text) - if fd != sys.stdin: fd.close() - return pairs - -def read_items_file(filename): - comment_re = re.compile('#.*$', re.MULTILINE) - if filename == '-': - fd = sys.stdin - else: - fd = open(filename) - text = fd.read() - text = comment_re.sub('', text) # kill comments - items = parse_items(text) - if fd != sys.stdin: fd.close() - return items - def user_input(prompt, default = None, allow_empty = True): if default == None: while True: @@ -761,357 +661,6 @@ def user_input(prompt, default = None, allow_empty = True): else: return ret -def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True): - while True: - ret = user_input(prompt, default, allow_empty) - if ipavalidate.Plain(ret, not allow_empty, allow_spaces): - return ret - -class AttributeValueCompleter: - ''' - Gets input from the user in the form "lhs operator rhs" - TAB completes partial input. - lhs completes to a name in @lhs_names - The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will - complete to the operator and a default value. - Default values for a lhs value can specified as: - - a string, all lhs values will use this default - - a dict, the lhs value is looked up in the dict to return the default or None - - a function with a single arg, the lhs value, it returns the default or None - - After creating the completer you must open it to set the terminal - up, Then get a line of input from the user by calling read_input() - which returns two values, the lhs and rhs, which might be None if - lhs or rhs was not parsed. After you are done getting input you - should close the completer to restore the terminal. - - Example: (note this is essentially what the convenience function get_pairs() does) - - This will allow the user to autocomplete foo & foobar, both have - defaults defined in a dict. In addition the foobar attribute must - be specified before the prompting loop will exit. Also, this - example show how to require that each attrbute entered by the user - is valid. - - attrs = ['foo', 'foobar'] - defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'} - mandatory_attrs = ['foobar'] - - c = AttributeValueCompleter(attrs, defaults) - c.open() - mandatory_attrs_remaining = mandatory_attrs[:] - - while True: - if mandatory_attrs_remaining: - attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0]) - try: - mandatory_attrs_remaining.remove(attribute) - except ValueError: - pass - else: - attribute, value = c.read_input("Enter: ") - if attribute is None: - # Are we done? - if mandatory_attrs_remaining: - print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) - continue - else: - break - if attribute not in attrs: - print "ERROR: %s is not a valid attribute" % (attribute) - else: - print "got '%s' = '%s'" % (attribute, value) - - c.close() - print "exiting..." - ''' - - def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =', - operator='=', strip_rhs=True): - self.lhs_names = lhs_names - self.default_value = default_value - # lhs_regexp must have named group 'lhs' which returns the contents of the lhs - self.lhs_regexp = lhs_regexp - self.lhs_re = re.compile(self.lhs_regexp) - self.lhs_delims = lhs_delims - self.operator = operator - self.strip_rhs = strip_rhs - self.pairs = None - self._reset() - - def _reset(self): - self.lhs = None - self.lhs_complete = False - self.operator_complete = False - self.rhs = None - - def open(self): - # Save state - self.prev_completer = readline.get_completer() - self.prev_completer_delims = readline.get_completer_delims() - - # Set up for ourself - readline.parse_and_bind("tab: complete") - readline.set_completer(self.complete) - readline.set_completer_delims(self.lhs_delims) - - def close(self): - # Restore previous state - readline.set_completer_delims(self.prev_completer_delims) - readline.set_completer(self.prev_completer) - - def parse_input(self): - '''We are looking for 3 tokens: <lhs,op,rhs> - Extract as much of each token as possible. - Set flags indicating if token is fully parsed. - ''' - try: - self._reset() - buf_len = len(self.line_buffer) - pos = 0 - lhs_match = self.lhs_re.search(self.line_buffer, pos) - if not lhs_match: return # no lhs content - self.lhs = lhs_match.group('lhs') # get lhs contents - pos = lhs_match.end('lhs') # new scanning position - if pos == buf_len: return # nothing after lhs, lhs incomplete - self.lhs_complete = True # something trails the lhs, lhs is complete - operator_beg = self.line_buffer.find(self.operator, pos) # locate operator - if operator_beg == -1: return # did not find the operator - self.operator_complete = True # operator fully parsed - operator_end = operator_beg + len(self.operator) - pos = operator_end # step over the operator - self.rhs = self.line_buffer[pos:] - except Exception, e: - traceback.print_exc() - print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e) - - def get_default_value(self): - '''default_value can be a string, a dict, or a function. - If it's a string it's a global default for all attributes. - If it's a dict the default is looked up in the dict index by attribute. - If it's a function, the function is called with 1 parameter, the attribute - and it should return the default value for the attriubte or None''' - - if not self.lhs_complete: raise ValueError("attribute not parsed") - - # If the user previously provided a value let that override the supplied default - if self.pairs is not None: - prev_value = self.pairs.get(self.lhs) - if prev_value is not None: return prev_value - - # No previous user provided value, query for a default - default_value_type = type(self.default_value) - if default_value_type is DictType: - return self.default_value.get(self.lhs, None) - elif default_value_type is FunctionType: - return self.default_value(self.lhs) - elif default_value_type is StringType: - return self.default_value - else: - return None - - def get_lhs_completions(self, text): - if text: - self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)] - else: - self.completions = self.lhs_names - - def complete(self, state): - self.line_buffer= readline.get_line_buffer() - self.parse_input() - if not self.lhs_complete: - # lhs is not complete, set up to complete the lhs - if state == 0: - beg = readline.get_begidx() - end = readline.get_endidx() - self.get_lhs_completions(self.line_buffer[beg:end]) - if state >= len(self.completions): return None - return self.completions[state] - - - elif not self.operator_complete: - # lhs is complete, but the operator is not so we complete - # by inserting the operator manually. - # Also try to complete the default value at this time. - readline.insert_text('%s ' % self.operator) - default_value = self.get_default_value() - if default_value is not None: - readline.insert_text(default_value) - readline.redisplay() - return None - else: - # lhs and operator are complete, if the rhs is blank - # (either empty or only only whitespace) then attempt - # to complete by inserting the default value, otherwise - # there is nothing we can complete to so we're done. - if self.rhs.strip(): - return None - default_value = self.get_default_value() - if default_value is not None: - readline.insert_text(default_value) - readline.redisplay() - return None - - def pre_input_hook(self): - readline.insert_text('%s %s ' % (self.initial_lhs, self.operator)) - readline.redisplay() - - def read_input(self, prompt, initial_lhs=None): - self.initial_lhs = initial_lhs - try: - self._reset() - if initial_lhs is None: - readline.set_pre_input_hook(None) - else: - readline.set_pre_input_hook(self.pre_input_hook) - self.line_buffer = raw_input(prompt).strip() - self.parse_input() - if self.strip_rhs and self.rhs is not None: - return self.lhs, self.rhs.strip() - else: - return self.lhs, self.rhs - except EOFError: - return None, None - - def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True): - self.pairs = {} - if mandatory_attrs: - mandatory_attrs_remaining = mandatory_attrs[:] - else: - mandatory_attrs_remaining = [] - - print "Enter name = value" - print "Press <ENTER> to accept, a blank line terminates input" - print "Pressing <TAB> will auto completes name, assignment, and value" - print - while True: - if mandatory_attrs_remaining: - attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0]) - else: - attribute, value = self.read_input(prompt) - if attribute is None: - # Are we done? - if mandatory_attrs_remaining: - print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) - continue - else: - break - if value is None: - if value_required: - print "ERROR: you must specify a value for %s" % attribute - continue - else: - if must_match and attribute not in self.lhs_names: - print "ERROR: %s is not a valid name" % (attribute) - continue - if validate_callback is not None: - if not validate_callback(attribute, value): - print "ERROR: %s is not valid for %s" % (value, attribute) - continue - try: - mandatory_attrs_remaining.remove(attribute) - except ValueError: - pass - - self.pairs[attribute] = value - return self.pairs - -class ItemCompleter: - ''' - Prompts the user for items in a list of items with auto completion. - TAB completes partial input. - More than one item can be specifed during input, whitespace and/or comma's seperate. - Example: - - possible_items = ['foo', 'bar'] - c = ItemCompleter(possible_items) - c.open() - # Use read_input() to limit input to a single carriage return (e.g. <ENTER>) - #items = c.read_input("Enter: ") - # Use get_items to iterate until a blank line is entered. - items = c.get_items("Enter: ") - c.close() - print "items=%s" % (items) - - ''' - - def __init__(self, items): - self.items = items - self.initial_input = None - self.item_delims = ' \t,' - self.operator = '=' - self.split_re = re.compile('[%s]+' % self.item_delims) - - def open(self): - # Save state - self.prev_completer = readline.get_completer() - self.prev_completer_delims = readline.get_completer_delims() - - # Set up for ourself - readline.parse_and_bind("tab: complete") - readline.set_completer(self.complete) - readline.set_completer_delims(self.item_delims) - - def close(self): - # Restore previous state - readline.set_completer_delims(self.prev_completer_delims) - readline.set_completer(self.prev_completer) - - def get_item_completions(self, text): - if text: - self.completions = [lhs for lhs in self.items if lhs.startswith(text)] - else: - self.completions = self.items - - def complete(self, state): - self.line_buffer= readline.get_line_buffer() - if state == 0: - beg = readline.get_begidx() - end = readline.get_endidx() - self.get_item_completions(self.line_buffer[beg:end]) - if state >= len(self.completions): return None - return self.completions[state] - - def pre_input_hook(self): - readline.insert_text('%s %s ' % (self.initial_input, self.operator)) - readline.redisplay() - - def read_input(self, prompt, initial_input=None): - items = [] - - self.initial_input = initial_input - try: - if initial_input is None: - readline.set_pre_input_hook(None) - else: - readline.set_pre_input_hook(self.pre_input_hook) - self.line_buffer = raw_input(prompt).strip() - items = self.split_re.split(self.line_buffer) - for item in items[:]: - if not item: items.remove(item) - return items - except EOFError: - return items - - def get_items(self, prompt, must_match=True): - items = [] - - print "Enter name [name ...]" - print "Press <ENTER> to accept, blank line or control-D terminates input" - print "Pressing <TAB> auto completes name" - print - while True: - new_items = self.read_input(prompt) - if not new_items: break - for item in new_items: - if must_match: - if item not in self.items: - print "ERROR: %s is not valid" % (item) - continue - if item in items: continue - items.append(item) - - return items def get_gsserror(e): """ @@ -1446,3 +995,16 @@ def make_sshfp(key): else: return return '%d 1 %s' % (algo, fp) + + +def utf8_encode_value(value): + if isinstance(value, unicode): + return value.encode('utf-8') + return value + + +def utf8_encode_values(values): + if isinstance(values, list) or isinstance(values, tuple): + return map(utf8_encode_value, values) + else: + return utf8_encode_value(values) |