summaryrefslogtreecommitdiffstats
path: root/ipapython
diff options
context:
space:
mode:
authorPetr Viktorin <pviktori@redhat.com>2012-04-18 11:22:35 -0400
committerMartin Kosek <mkosek@redhat.com>2012-05-09 11:54:20 +0200
commitf19218f7d87f5847d51f79b5d2850f90b0ae8407 (patch)
tree500d3101dc1375d3eb75ffb927040c2593216aa9 /ipapython
parentc02fcf5d34ad880e082cbc0c7f59fc3812d11c9e (diff)
downloadfreeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.tar.gz
freeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.tar.xz
freeipa-f19218f7d87f5847d51f79b5d2850f90b0ae8407.zip
Remove duplicate and unused utility code
IPA has some unused code from abandoned features (Radius, ipa 1.x user input, commant-line tab completion), as well as some duplicate utilities. This patch cleans up the utility modules. Duplicate code consolidated into ipapython.ipautil: {ipalib.util,ipaserver.ipautil,ipapython.ipautil}.realm_to_suffix {ipaserver,ipapython}.ipautil.CIDict (with style improvements from the ipaserver version) {ipapython.entity,ipaserver.ipautil}.utf8_encode_value {ipapython.entity,ipaserver.ipautil}.utf8_encode_values ipalib.util.get_fqdn was removed in favor of the same function in ipaserver.install.installutils Removed unused code: ipalib.util: load_plugins_in_dir import_plugins_subpackage make_repr (was imported but unused; also removed from tests) ipapython.ipautil: format_list parse_key_value_pairs read_pairs_file read_items_file user_input_plain AttributeValueCompleter ItemCompleter ipaserver.ipautil: get_gsserror (a different version exists in ipapython.ipautil) ipaserver.ipautil ended up empty and is removed entirely. https://fedorahosted.org/freeipa/ticket/2650
Diffstat (limited to 'ipapython')
-rw-r--r--ipapython/entity.py31
-rw-r--r--ipapython/ipautil.py506
2 files changed, 44 insertions, 493 deletions
diff --git a/ipapython/entity.py b/ipapython/entity.py
index 527adface..27d517879 100644
--- a/ipapython/entity.py
+++ b/ipapython/entity.py
@@ -17,18 +17,7 @@
import copy
-import ipapython.ipautil
-
-def utf8_encode_value(value):
- if isinstance(value,unicode):
- return value.encode('utf-8')
- return value
-
-def utf8_encode_values(values):
- if isinstance(values,list) or isinstance(values,tuple):
- return map(utf8_encode_value, values)
- else:
- return utf8_encode_value(values)
+from ipapython import ipautil
def copy_CIDict(x):
"""Do a deep copy of a CIDict"""
@@ -55,19 +44,19 @@ class Entity:
if entrydata:
if isinstance(entrydata,tuple):
self.dn = entrydata[0]
- self.data = ipapython.ipautil.CIDict(entrydata[1])
+ self.data = ipautil.CIDict(entrydata[1])
elif isinstance(entrydata,str) or isinstance(entrydata,unicode):
self.dn = entrydata
- self.data = ipapython.ipautil.CIDict()
+ self.data = ipautil.CIDict()
elif isinstance(entrydata,dict):
self.dn = entrydata['dn']
del entrydata['dn']
- self.data = ipapython.ipautil.CIDict(entrydata)
+ self.data = ipautil.CIDict(entrydata)
else:
self.dn = ''
- self.data = ipapython.ipautil.CIDict()
+ self.data = ipautil.CIDict()
- self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data))
+ self.orig_data = ipautil.CIDict(copy_CIDict(self.data))
def __nonzero__(self):
"""This allows us to do tests like if entry: returns false if there is no data,
@@ -120,9 +109,9 @@ class Entity:
if (len(value) < 1):
return
if (len(value) == 1):
- self.data[name] = utf8_encode_values(value[0])
+ self.data[name] = ipautil.utf8_encode_values(value[0])
else:
- self.data[name] = utf8_encode_values(value)
+ self.data[name] = ipautil.utf8_encode_values(value)
setValues = setValue
@@ -161,7 +150,7 @@ class Entity:
def toDict(self):
"""Convert the attrs and values to a dict. The dict is keyed on the
attribute name. The value is either single value or a list of values."""
- result = ipapython.ipautil.CIDict(self.data)
+ result = ipautil.CIDict(self.data)
result['dn'] = self.dn
return result
@@ -171,7 +160,7 @@ class Entity:
def origDataDict(self):
"""Returns a dict of the original values of the user. Used for updates."""
- result = ipapython.ipautil.CIDict(self.orig_data)
+ result = ipautil.CIDict(self.orig_data)
result['dn'] = self.dn
return result
diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py
index 69c328934..4a9db11e2 100644
--- a/ipapython/ipautil.py
+++ b/ipapython/ipautil.py
@@ -195,6 +195,7 @@ def format_netloc(host, port=None):
return '%s:%s' % (host, str(port))
def realm_to_suffix(realm_name):
+ """Convert a kerberos realm into the IPA suffix."""
s = realm_name.split(".")
terms = ["dc=" + x.lower() for x in s]
return ",".join(terms)
@@ -406,32 +407,32 @@ class CIDict(dict):
If you extend UserDict, isinstance(foo, dict) returns false.
"""
- def __init__(self,default=None):
+ def __init__(self, default=None):
super(CIDict, self).__init__()
self._keys = {}
self.update(default or {})
- def __getitem__(self,key):
- return super(CIDict,self).__getitem__(string.lower(key))
+ def __getitem__(self, key):
+ return super(CIDict, self).__getitem__(key.lower())
- def __setitem__(self,key,value):
- lower_key = string.lower(key)
+ def __setitem__(self, key, value):
+ lower_key = key.lower()
self._keys[lower_key] = key
- return super(CIDict,self).__setitem__(string.lower(key),value)
+ return super(CIDict, self).__setitem__(lower_key, value)
- def __delitem__(self,key):
- lower_key = string.lower(key)
+ def __delitem__(self, key):
+ lower_key = key.lower()
del self._keys[lower_key]
- return super(CIDict,self).__delitem__(string.lower(key))
+ return super(CIDict, self).__delitem__(key.lower())
- def update(self,dict):
+ def update(self, dict):
for key in dict.keys():
self[key] = dict[key]
- def has_key(self,key):
- return super(CIDict, self).has_key(string.lower(key))
+ def has_key(self, key):
+ return super(CIDict, self).has_key(key.lower())
- def get(self,key,failobj=None):
+ def get(self, key, failobj=None):
try:
return self[key]
except KeyError:
@@ -443,7 +444,7 @@ class CIDict(dict):
def items(self):
result = []
for k in self._keys.values():
- result.append((k,self[k]))
+ result.append((k, self[k]))
return result
def copy(self):
@@ -458,7 +459,7 @@ class CIDict(dict):
def iterkeys(self):
return self.copy().iterkeys()
- def setdefault(self,key,value=None):
+ def setdefault(self, key, value=None):
try:
return self[key]
except KeyError:
@@ -476,11 +477,11 @@ class CIDict(dict):
raise
def popitem(self):
- (lower_key,value) = super(CIDict,self).popitem()
+ (lower_key, value) = super(CIDict, self).popitem()
key = self._keys[lower_key]
del self._keys[lower_key]
- return (key,value)
+ return (key, value)
class GeneralizedTimeZone(datetime.tzinfo):
@@ -610,118 +611,17 @@ def ipa_generate_password(characters=None,pwd_len=None):
rndpwd += rndchar
return rndpwd
-def format_list(items, quote=None, page_width=80):
- '''Format a list of items formatting them so they wrap to fit the
- available width. The items will be sorted.
+def parse_items(text):
+ '''Given text with items separated by whitespace or comma, return a list of those items
- The items may optionally be quoted. The quote parameter may either be
- a string, in which case it is added before and after the item. Or the
- quote parameter may be a pair (either a tuple or list). In this case
- quote[0] is left hand quote and quote[1] is the right hand quote.
- '''
- left_quote = right_quote = ''
- num_items = len(items)
- if not num_items: return ""
-
- if quote is not None:
- if type(quote) in StringTypes:
- left_quote = right_quote = quote
- elif type(quote) is TupleType or type(quote) is ListType:
- left_quote = quote[0]
- right_quote = quote[1]
-
- max_len = max(map(len, items))
- max_len += len(left_quote) + len(right_quote)
- num_columns = (page_width + max_len) / (max_len+1)
- num_rows = (num_items + num_columns - 1) / num_columns
- items.sort()
-
- rows = [''] * num_rows
- i = row = col = 0
-
- while i < num_items:
- row = 0
- if col == 0:
- separator = ''
- else:
- separator = ' '
-
- while i < num_items and row < num_rows:
- rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
- i += 1
- row += 1
- col += 1
- return '\n'.join(rows)
-
-key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
-def parse_key_value_pairs(input):
- ''' Given a string composed of key=value pairs parse it and return
- a dict of the key/value pairs. Keys must be a word, a key must be followed
- by an equal sign (=) and a value. The value may be a single word or may be
- quoted. Quotes may be either single or double quotes, but must be balanced.
- Inside the quoted text the same quote used to start the quoted value may be
- used if it is escaped by preceding it with a backslash (\).
- White space between the key, the equal sign, and the value is ignored.
- Values are always strings. Empty values must be specified with an empty
- quoted string, it's value after parsing will be an empty string.
-
- Example: The string
-
- arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
-
- will produce
-
- arg0= arg1=1
- arg2=two
- arg3=three's a crowd
- arg4=this is a " quote
+ The returned list only contains non-empty items.
'''
-
- kv_dict = {}
- for match in key_value_re.finditer(input):
- key = match.group(1)
- quote = match.group('quote')
- if match.group(5):
- value = match.group(6)
- if value is None: value = ''
- value = re.sub('\\\%s' % quote, quote, value)
- else:
- value = match.group(2)
- kv_dict[key] = value
- return kv_dict
-
-def parse_items(text):
- '''Given text with items separated by whitespace or comma, return a list of those items'''
split_re = re.compile('[ ,\t\n]+')
items = split_re.split(text)
for item in items[:]:
if not item: items.remove(item)
return items
-def read_pairs_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- pairs = parse_key_value_pairs(text)
- if fd != sys.stdin: fd.close()
- return pairs
-
-def read_items_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- items = parse_items(text)
- if fd != sys.stdin: fd.close()
- return items
-
def user_input(prompt, default = None, allow_empty = True):
if default == None:
while True:
@@ -761,357 +661,6 @@ def user_input(prompt, default = None, allow_empty = True):
else:
return ret
-def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
- while True:
- ret = user_input(prompt, default, allow_empty)
- if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
- return ret
-
-class AttributeValueCompleter:
- '''
- Gets input from the user in the form "lhs operator rhs"
- TAB completes partial input.
- lhs completes to a name in @lhs_names
- The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
- complete to the operator and a default value.
- Default values for a lhs value can specified as:
- - a string, all lhs values will use this default
- - a dict, the lhs value is looked up in the dict to return the default or None
- - a function with a single arg, the lhs value, it returns the default or None
-
- After creating the completer you must open it to set the terminal
- up, Then get a line of input from the user by calling read_input()
- which returns two values, the lhs and rhs, which might be None if
- lhs or rhs was not parsed. After you are done getting input you
- should close the completer to restore the terminal.
-
- Example: (note this is essentially what the convenience function get_pairs() does)
-
- This will allow the user to autocomplete foo & foobar, both have
- defaults defined in a dict. In addition the foobar attribute must
- be specified before the prompting loop will exit. Also, this
- example show how to require that each attrbute entered by the user
- is valid.
-
- attrs = ['foo', 'foobar']
- defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
- mandatory_attrs = ['foobar']
-
- c = AttributeValueCompleter(attrs, defaults)
- c.open()
- mandatory_attrs_remaining = mandatory_attrs[:]
-
- while True:
- if mandatory_attrs_remaining:
- attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
- else:
- attribute, value = c.read_input("Enter: ")
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if attribute not in attrs:
- print "ERROR: %s is not a valid attribute" % (attribute)
- else:
- print "got '%s' = '%s'" % (attribute, value)
-
- c.close()
- print "exiting..."
- '''
-
- def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
- operator='=', strip_rhs=True):
- self.lhs_names = lhs_names
- self.default_value = default_value
- # lhs_regexp must have named group 'lhs' which returns the contents of the lhs
- self.lhs_regexp = lhs_regexp
- self.lhs_re = re.compile(self.lhs_regexp)
- self.lhs_delims = lhs_delims
- self.operator = operator
- self.strip_rhs = strip_rhs
- self.pairs = None
- self._reset()
-
- def _reset(self):
- self.lhs = None
- self.lhs_complete = False
- self.operator_complete = False
- self.rhs = None
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.lhs_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def parse_input(self):
- '''We are looking for 3 tokens: <lhs,op,rhs>
- Extract as much of each token as possible.
- Set flags indicating if token is fully parsed.
- '''
- try:
- self._reset()
- buf_len = len(self.line_buffer)
- pos = 0
- lhs_match = self.lhs_re.search(self.line_buffer, pos)
- if not lhs_match: return # no lhs content
- self.lhs = lhs_match.group('lhs') # get lhs contents
- pos = lhs_match.end('lhs') # new scanning position
- if pos == buf_len: return # nothing after lhs, lhs incomplete
- self.lhs_complete = True # something trails the lhs, lhs is complete
- operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
- if operator_beg == -1: return # did not find the operator
- self.operator_complete = True # operator fully parsed
- operator_end = operator_beg + len(self.operator)
- pos = operator_end # step over the operator
- self.rhs = self.line_buffer[pos:]
- except Exception, e:
- traceback.print_exc()
- print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
-
- def get_default_value(self):
- '''default_value can be a string, a dict, or a function.
- If it's a string it's a global default for all attributes.
- If it's a dict the default is looked up in the dict index by attribute.
- If it's a function, the function is called with 1 parameter, the attribute
- and it should return the default value for the attriubte or None'''
-
- if not self.lhs_complete: raise ValueError("attribute not parsed")
-
- # If the user previously provided a value let that override the supplied default
- if self.pairs is not None:
- prev_value = self.pairs.get(self.lhs)
- if prev_value is not None: return prev_value
-
- # No previous user provided value, query for a default
- default_value_type = type(self.default_value)
- if default_value_type is DictType:
- return self.default_value.get(self.lhs, None)
- elif default_value_type is FunctionType:
- return self.default_value(self.lhs)
- elif default_value_type is StringType:
- return self.default_value
- else:
- return None
-
- def get_lhs_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
- else:
- self.completions = self.lhs_names
-
- def complete(self, state):
- self.line_buffer= readline.get_line_buffer()
- self.parse_input()
- if not self.lhs_complete:
- # lhs is not complete, set up to complete the lhs
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_lhs_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
-
- elif not self.operator_complete:
- # lhs is complete, but the operator is not so we complete
- # by inserting the operator manually.
- # Also try to complete the default value at this time.
- readline.insert_text('%s ' % self.operator)
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
- else:
- # lhs and operator are complete, if the rhs is blank
- # (either empty or only only whitespace) then attempt
- # to complete by inserting the default value, otherwise
- # there is nothing we can complete to so we're done.
- if self.rhs.strip():
- return None
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_lhs=None):
- self.initial_lhs = initial_lhs
- try:
- self._reset()
- if initial_lhs is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- self.parse_input()
- if self.strip_rhs and self.rhs is not None:
- return self.lhs, self.rhs.strip()
- else:
- return self.lhs, self.rhs
- except EOFError:
- return None, None
-
- def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
- self.pairs = {}
- if mandatory_attrs:
- mandatory_attrs_remaining = mandatory_attrs[:]
- else:
- mandatory_attrs_remaining = []
-
- print "Enter name = value"
- print "Press <ENTER> to accept, a blank line terminates input"
- print "Pressing <TAB> will auto completes name, assignment, and value"
- print
- while True:
- if mandatory_attrs_remaining:
- attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
- else:
- attribute, value = self.read_input(prompt)
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if value is None:
- if value_required:
- print "ERROR: you must specify a value for %s" % attribute
- continue
- else:
- if must_match and attribute not in self.lhs_names:
- print "ERROR: %s is not a valid name" % (attribute)
- continue
- if validate_callback is not None:
- if not validate_callback(attribute, value):
- print "ERROR: %s is not valid for %s" % (value, attribute)
- continue
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
-
- self.pairs[attribute] = value
- return self.pairs
-
-class ItemCompleter:
- '''
- Prompts the user for items in a list of items with auto completion.
- TAB completes partial input.
- More than one item can be specifed during input, whitespace and/or comma's seperate.
- Example:
-
- possible_items = ['foo', 'bar']
- c = ItemCompleter(possible_items)
- c.open()
- # Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
- #items = c.read_input("Enter: ")
- # Use get_items to iterate until a blank line is entered.
- items = c.get_items("Enter: ")
- c.close()
- print "items=%s" % (items)
-
- '''
-
- def __init__(self, items):
- self.items = items
- self.initial_input = None
- self.item_delims = ' \t,'
- self.operator = '='
- self.split_re = re.compile('[%s]+' % self.item_delims)
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.item_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def get_item_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
- else:
- self.completions = self.items
-
- def complete(self, state):
- self.line_buffer= readline.get_line_buffer()
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_item_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_input, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_input=None):
- items = []
-
- self.initial_input = initial_input
- try:
- if initial_input is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- items = self.split_re.split(self.line_buffer)
- for item in items[:]:
- if not item: items.remove(item)
- return items
- except EOFError:
- return items
-
- def get_items(self, prompt, must_match=True):
- items = []
-
- print "Enter name [name ...]"
- print "Press <ENTER> to accept, blank line or control-D terminates input"
- print "Pressing <TAB> auto completes name"
- print
- while True:
- new_items = self.read_input(prompt)
- if not new_items: break
- for item in new_items:
- if must_match:
- if item not in self.items:
- print "ERROR: %s is not valid" % (item)
- continue
- if item in items: continue
- items.append(item)
-
- return items
def get_gsserror(e):
"""
@@ -1446,3 +995,16 @@ def make_sshfp(key):
else:
return
return '%d 1 %s' % (algo, fp)
+
+
+def utf8_encode_value(value):
+ if isinstance(value, unicode):
+ return value.encode('utf-8')
+ return value
+
+
+def utf8_encode_values(values):
+ if isinstance(values, list) or isinstance(values, tuple):
+ return map(utf8_encode_value, values)
+ else:
+ return utf8_encode_value(values)