summaryrefslogtreecommitdiffstats
path: root/ipa-python/ipautil.py
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2009-02-05 15:03:08 -0500
committerRob Crittenden <rcritten@redhat.com>2009-02-09 14:35:15 -0500
commit262ff2d731b1bfc4acd91153088b8fcde7ae92b8 (patch)
treebaf8894d4b357b610113b87d4bfee84de24f08bd /ipa-python/ipautil.py
parent58ae191a5afbf29d78afd3969f8d106415897958 (diff)
downloadfreeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.gz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.xz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.zip
Rename ipa-python directory to ipapython so it is a real python library
We used to install it as ipa, now installing it as ipapython. The rpm is still ipa-python.
Diffstat (limited to 'ipa-python/ipautil.py')
-rw-r--r--ipa-python/ipautil.py969
1 files changed, 0 insertions, 969 deletions
diff --git a/ipa-python/ipautil.py b/ipa-python/ipautil.py
deleted file mode 100644
index aa49c3ba3..000000000
--- a/ipa-python/ipautil.py
+++ /dev/null
@@ -1,969 +0,0 @@
-# Authors: Simo Sorce <ssorce@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-SHARE_DIR = "/usr/share/ipa/"
-PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins"
-
-import string
-import tempfile
-import logging
-import subprocess
-import random
-import os, sys, traceback, readline
-import stat
-import shutil
-
-from ipa import ipavalidate
-from types import *
-
-import re
-import xmlrpclib
-import datetime
-from ipa import config
-try:
- from subprocess import CalledProcessError
- class CalledProcessError(subprocess.CalledProcessError):
- def __init__(self, returncode, cmd):
- super(CalledProcessError, self).__init__(returncode, cmd)
-except ImportError:
- # Python 2.4 doesn't implement CalledProcessError
- class CalledProcessError(Exception):
- """This exception is raised when a process run by check_call() returns
- a non-zero exit status. The exit status will be stored in the
- returncode attribute."""
- def __init__(self, returncode, cmd):
- self.returncode = returncode
- self.cmd = cmd
- def __str__(self):
- return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
-
-def get_domain_name():
- try:
- config.init_config()
- domain_name = config.config.get_domain()
- except Exception, e:
- return None
-
- return domain_name
-
-def realm_to_suffix(realm_name):
- s = realm_name.split(".")
- terms = ["dc=" + x.lower() for x in s]
- return ",".join(terms)
-
-def template_str(txt, vars):
- return string.Template(txt).substitute(vars)
-
-def template_file(infilename, vars):
- txt = open(infilename).read()
- return template_str(txt, vars)
-
-def write_tmp_file(txt):
- fd = tempfile.NamedTemporaryFile()
- fd.write(txt)
- fd.flush()
-
- return fd
-
-def run(args, stdin=None):
- if stdin:
- p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
- stdout,stderr = p.communicate(stdin)
- else:
- p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
- stdout,stderr = p.communicate()
-
- logging.info(stdout)
- logging.info(stderr)
-
- if p.returncode != 0:
- raise CalledProcessError(p.returncode, ' '.join(args))
-
- return (stdout, stderr)
-
-def file_exists(filename):
- try:
- mode = os.stat(filename)[stat.ST_MODE]
- if stat.S_ISREG(mode):
- return True
- else:
- return False
- except:
- return False
-
-def dir_exists(filename):
- try:
- mode = os.stat(filename)[stat.ST_MODE]
- if stat.S_ISDIR(mode):
- return True
- else:
- return False
- except:
- return False
-
-def install_file(fname, dest):
- if file_exists(dest):
- os.rename(dest, dest + ".orig")
- shutil.move(fname, dest)
-
-def backup_file(fname):
- if file_exists(fname):
- os.rename(fname, fname + ".orig")
-
-# uses gpg to compress and encrypt a file
-def encrypt_file(source, dest, password, workdir = None):
- if type(source) is not StringType or not len(source):
- raise ValueError('Missing Source File')
- #stat it so that we get back an exception if it does no t exist
- os.stat(source)
-
- if type(dest) is not StringType or not len(dest):
- raise ValueError('Missing Destination File')
-
- if type(password) is not StringType or not len(password):
- raise ValueError('Missing Password')
-
- #create a tempdir so that we can clean up with easily
- tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
- gpgdir = tempdir+"/.gnupg"
-
- try:
- try:
- #give gpg a fake dir so that we can leater remove all
- #the cruft when we clean up the tempdir
- os.mkdir(gpgdir)
- args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source]
- run(args, password)
- except:
- raise
- finally:
- #job done, clean up
- shutil.rmtree(tempdir, ignore_errors=True)
-
-
-def decrypt_file(source, dest, password, workdir = None):
- if type(source) is not StringType or not len(source):
- raise ValueError('Missing Source File')
- #stat it so that we get back an exception if it does no t exist
- os.stat(source)
-
- if type(dest) is not StringType or not len(dest):
- raise ValueError('Missing Destination File')
-
- if type(password) is not StringType or not len(password):
- raise ValueError('Missing Password')
-
- #create a tempdir so that we can clean up with easily
- tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
- gpgdir = tempdir+"/.gnupg"
-
- try:
- try:
- #give gpg a fake dir so that we can leater remove all
- #the cruft when we clean up the tempdir
- os.mkdir(gpgdir)
- args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source]
- run(args, password)
- except:
- raise
- finally:
- #job done, clean up
- shutil.rmtree(tempdir, ignore_errors=True)
-
-
-class CIDict(dict):
- """
- Case-insensitive but case-respecting dictionary.
-
- This code is derived from python-ldap's cidict.py module,
- written by stroeder: http://python-ldap.sourceforge.net/
-
- This version extends 'dict' so it works properly with TurboGears.
- If you extend UserDict, isinstance(foo, dict) returns false.
- """
-
- def __init__(self,default=None):
- super(CIDict, self).__init__()
- self._keys = {}
- self.update(default or {})
-
- def __getitem__(self,key):
- return super(CIDict,self).__getitem__(string.lower(key))
-
- def __setitem__(self,key,value):
- lower_key = string.lower(key)
- self._keys[lower_key] = key
- return super(CIDict,self).__setitem__(string.lower(key),value)
-
- def __delitem__(self,key):
- lower_key = string.lower(key)
- del self._keys[lower_key]
- return super(CIDict,self).__delitem__(string.lower(key))
-
- def update(self,dict):
- for key in dict.keys():
- self[key] = dict[key]
-
- def has_key(self,key):
- return super(CIDict, self).has_key(string.lower(key))
-
- def get(self,key,failobj=None):
- try:
- return self[key]
- except KeyError:
- return failobj
-
- def keys(self):
- return self._keys.values()
-
- def items(self):
- result = []
- for k in self._keys.values():
- result.append((k,self[k]))
- return result
-
- def copy(self):
- copy = {}
- for k in self._keys.values():
- copy[k] = self[k]
- return copy
-
- def iteritems(self):
- return self.copy().iteritems()
-
- def iterkeys(self):
- return self.copy().iterkeys()
-
- def setdefault(self,key,value=None):
- try:
- return self[key]
- except KeyError:
- self[key] = value
- return value
-
- def pop(self, key, *args):
- try:
- value = self[key]
- del self[key]
- return value
- except KeyError:
- if len(args) == 1:
- return args[0]
- raise
-
- def popitem(self):
- (lower_key,value) = super(CIDict,self).popitem()
- key = self._keys[lower_key]
- del self._keys[lower_key]
-
- return (key,value)
-
-
-#
-# The safe_string_re regexp and needs_base64 function are extracted from the
-# python-ldap ldif module, which was
-# written by Michael Stroeder <michael@stroeder.com>
-# http://python-ldap.sourceforge.net
-#
-# It was extracted because ipaldap.py is naughtily reaching into the ldif
-# module and squashing this regexp.
-#
-SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)'
-safe_string_re = re.compile(SAFE_STRING_PATTERN)
-
-def needs_base64(s):
- """
- returns 1 if s has to be base-64 encoded because of special chars
- """
- return not safe_string_re.search(s) is None
-
-
-def wrap_binary_data(data):
- """Converts all binary data strings into Binary objects for transport
- back over xmlrpc."""
- if isinstance(data, str):
- if needs_base64(data):
- return xmlrpclib.Binary(data)
- else:
- return data
- elif isinstance(data, list) or isinstance(data,tuple):
- retval = []
- for value in data:
- retval.append(wrap_binary_data(value))
- return retval
- elif isinstance(data, dict):
- retval = {}
- for (k,v) in data.iteritems():
- retval[k] = wrap_binary_data(v)
- return retval
- else:
- return data
-
-
-def unwrap_binary_data(data):
- """Converts all Binary objects back into strings."""
- if isinstance(data, xmlrpclib.Binary):
- # The data is decoded by the xmlproxy, but is stored
- # in a binary object for us.
- return str(data)
- elif isinstance(data, str):
- return data
- elif isinstance(data, list) or isinstance(data,tuple):
- retval = []
- for value in data:
- retval.append(unwrap_binary_data(value))
- return retval
- elif isinstance(data, dict):
- retval = {}
- for (k,v) in data.iteritems():
- retval[k] = unwrap_binary_data(v)
- return retval
- else:
- return data
-
-class GeneralizedTimeZone(datetime.tzinfo):
- """This class is a basic timezone wrapper for the offset specified
- in a Generalized Time. It is dst-ignorant."""
- def __init__(self,offsetstr="Z"):
- super(GeneralizedTimeZone, self).__init__()
-
- self.name = offsetstr
- self.houroffset = 0
- self.minoffset = 0
-
- if offsetstr == "Z":
- self.houroffset = 0
- self.minoffset = 0
- else:
- if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr):
- self.houroffset = int(offsetstr[0:3])
- offsetstr = offsetstr[3:]
- if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr):
- self.minoffset = int(offsetstr[0:2])
- offsetstr = offsetstr[2:]
- if len(offsetstr) > 0:
- raise ValueError()
- if self.houroffset < 0:
- self.minoffset *= -1
-
- def utcoffset(self, dt):
- return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset)
-
- def dst(self, dt):
- return datetime.timedelta(0)
-
- def tzname(self, dt):
- return self.name
-
-
-def parse_generalized_time(timestr):
- """Parses are Generalized Time string (as specified in X.680),
- returning a datetime object. Generalized Times are stored inside
- the krbPasswordExpiration attribute in LDAP.
-
- This method doesn't attempt to be perfect wrt timezones. If python
- can't be bothered to implement them, how can we..."""
-
- if len(timestr) < 8:
- return None
- try:
- date = timestr[:8]
- time = timestr[8:]
-
- year = int(date[:4])
- month = int(date[4:6])
- day = int(date[6:8])
-
- hour = min = sec = msec = 0
- tzone = None
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- hour = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- hour_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- hour_fraction += time[0]
- time = time[1:]
- total_secs = int(float(hour_fraction) * 3600)
- min, sec = divmod(total_secs, 60)
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- min = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- min_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- min_fraction += time[0]
- time = time[1:]
- sec = int(float(min_fraction) * 60)
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- sec = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- sec_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- sec_fraction += time[0]
- time = time[1:]
- msec = int(float(sec_fraction) * 1000000)
-
- if (len(time) > 0):
- tzone = GeneralizedTimeZone(time)
-
- return datetime.datetime(year, month, day, hour, min, sec, msec, tzone)
-
- except ValueError:
- return None
-
-def ipa_generate_password():
- rndpwd = ''
- r = random.SystemRandom()
- for x in range(12):
- rndpwd += chr(r.randint(32,126))
- return rndpwd
-
-
-def format_list(items, quote=None, page_width=80):
- '''Format a list of items formatting them so they wrap to fit the
- available width. The items will be sorted.
-
- The items may optionally be quoted. The quote parameter may either be
- a string, in which case it is added before and after the item. Or the
- quote parameter may be a pair (either a tuple or list). In this case
- quote[0] is left hand quote and quote[1] is the right hand quote.
- '''
- left_quote = right_quote = ''
- num_items = len(items)
- if not num_items: return ""
-
- if quote is not None:
- if type(quote) in StringTypes:
- left_quote = right_quote = quote
- elif type(quote) is TupleType or type(quote) is ListType:
- left_quote = quote[0]
- right_quote = quote[1]
-
- max_len = max(map(len, items))
- max_len += len(left_quote) + len(right_quote)
- num_columns = (page_width + max_len) / (max_len+1)
- num_rows = (num_items + num_columns - 1) / num_columns
- items.sort()
-
- rows = [''] * num_rows
- i = row = col = 0
-
- while i < num_items:
- row = 0
- if col == 0:
- separator = ''
- else:
- separator = ' '
-
- while i < num_items and row < num_rows:
- rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
- i += 1
- row += 1
- col += 1
- return '\n'.join(rows)
-
-key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
-def parse_key_value_pairs(input):
- ''' Given a string composed of key=value pairs parse it and return
- a dict of the key/value pairs. Keys must be a word, a key must be followed
- by an equal sign (=) and a value. The value may be a single word or may be
- quoted. Quotes may be either single or double quotes, but must be balanced.
- Inside the quoted text the same quote used to start the quoted value may be
- used if it is escaped by preceding it with a backslash (\).
- White space between the key, the equal sign, and the value is ignored.
- Values are always strings. Empty values must be specified with an empty
- quoted string, it's value after parsing will be an empty string.
-
- Example: The string
-
- arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
-
- will produce
-
- arg0= arg1=1
- arg2=two
- arg3=three's a crowd
- arg4=this is a " quote
- '''
-
- kv_dict = {}
- for match in key_value_re.finditer(input):
- key = match.group(1)
- quote = match.group('quote')
- if match.group(5):
- value = match.group(6)
- if value is None: value = ''
- value = re.sub('\\\%s' % quote, quote, value)
- else:
- value = match.group(2)
- kv_dict[key] = value
- return kv_dict
-
-def parse_items(text):
- '''Given text with items separated by whitespace or comma, return a list of those items'''
- split_re = re.compile('[ ,\t\n]+')
- items = split_re.split(text)
- for item in items[:]:
- if not item: items.remove(item)
- return items
-
-def read_pairs_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- pairs = parse_key_value_pairs(text)
- if fd != sys.stdin: fd.close()
- return pairs
-
-def read_items_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- items = parse_items(text)
- if fd != sys.stdin: fd.close()
- return items
-
-def user_input(prompt, default = None, allow_empty = True):
- if default == None:
- while True:
- ret = raw_input("%s: " % prompt)
- if allow_empty or ret.strip():
- return ret
-
- if isinstance(default, basestring):
- while True:
- ret = raw_input("%s [%s]: " % (prompt, default))
- if not ret and (allow_empty or default):
- return default
- elif ret.strip():
- return ret
- if isinstance(default, bool):
- if default:
- choice = "yes"
- else:
- choice = "no"
- while True:
- ret = raw_input("%s [%s]: " % (prompt, choice))
- if not ret:
- return default
- elif ret.lower()[0] == "y":
- return True
- elif ret.lower()[0] == "n":
- return False
- if isinstance(default, int):
- while True:
- try:
- ret = raw_input("%s [%s]: " % (prompt, default))
- if not ret:
- return default
- ret = int(ret)
- except ValueError:
- pass
- else:
- return ret
-
-def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
- while True:
- ret = user_input(prompt, default, allow_empty)
- if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
- return ret
-
-def user_input_path(prompt, default = None, allow_empty = True):
- if default != None and allow_empty:
- prompt += " (enter \"none\" for empty)"
- while True:
- ret = user_input(prompt, default, allow_empty)
- if allow_empty and ret.lower() == "none":
- return ""
- if ipavalidate.Path(ret, not allow_empty):
- return ret
-
-class AttributeValueCompleter:
- '''
- Gets input from the user in the form "lhs operator rhs"
- TAB completes partial input.
- lhs completes to a name in @lhs_names
- The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
- complete to the operator and a default value.
- Default values for a lhs value can specified as:
- - a string, all lhs values will use this default
- - a dict, the lhs value is looked up in the dict to return the default or None
- - a function with a single arg, the lhs value, it returns the default or None
-
- After creating the completer you must open it to set the terminal
- up, Then get a line of input from the user by calling read_input()
- which returns two values, the lhs and rhs, which might be None if
- lhs or rhs was not parsed. After you are done getting input you
- should close the completer to restore the terminal.
-
- Example: (note this is essentially what the convenience function get_pairs() does)
-
- This will allow the user to autocomplete foo & foobar, both have
- defaults defined in a dict. In addition the foobar attribute must
- be specified before the prompting loop will exit. Also, this
- example show how to require that each attrbute entered by the user
- is valid.
-
- attrs = ['foo', 'foobar']
- defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
- mandatory_attrs = ['foobar']
-
- c = AttributeValueCompleter(attrs, defaults)
- c.open()
- mandatory_attrs_remaining = mandatory_attrs[:]
-
- while True:
- if mandatory_attrs_remaining:
- attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
- else:
- attribute, value = c.read_input("Enter: ")
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if attribute not in attrs:
- print "ERROR: %s is not a valid attribute" % (attribute)
- else:
- print "got '%s' = '%s'" % (attribute, value)
-
- c.close()
- print "exiting..."
- '''
-
- def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
- operator='=', strip_rhs=True):
- self.lhs_names = lhs_names
- self.default_value = default_value
- # lhs_regexp must have named group 'lhs' which returns the contents of the lhs
- self.lhs_regexp = lhs_regexp
- self.lhs_re = re.compile(self.lhs_regexp)
- self.lhs_delims = lhs_delims
- self.operator = operator
- self.strip_rhs = strip_rhs
- self.pairs = None
- self._reset()
-
- def _reset(self):
- self.lhs = None
- self.lhs_complete = False
- self.operator_complete = False
- self.rhs = None
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.lhs_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def parse_input(self):
- '''We are looking for 3 tokens: <lhs,op,rhs>
- Extract as much of each token as possible.
- Set flags indicating if token is fully parsed.
- '''
- try:
- self._reset()
- buf_len = len(self.line_buffer)
- pos = 0
- lhs_match = self.lhs_re.search(self.line_buffer, pos)
- if not lhs_match: return # no lhs content
- self.lhs = lhs_match.group('lhs') # get lhs contents
- pos = lhs_match.end('lhs') # new scanning position
- if pos == buf_len: return # nothing after lhs, lhs incomplete
- self.lhs_complete = True # something trails the lhs, lhs is complete
- operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
- if operator_beg == -1: return # did not find the operator
- self.operator_complete = True # operator fully parsed
- operator_end = operator_beg + len(self.operator)
- pos = operator_end # step over the operator
- self.rhs = self.line_buffer[pos:]
- except Exception, e:
- traceback.print_exc()
- print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
-
- def get_default_value(self):
- '''default_value can be a string, a dict, or a function.
- If it's a string it's a global default for all attributes.
- If it's a dict the default is looked up in the dict index by attribute.
- If it's a function, the function is called with 1 parameter, the attribute
- and it should return the default value for the attriubte or None'''
-
- if not self.lhs_complete: raise ValueError("attribute not parsed")
-
- # If the user previously provided a value let that override the supplied default
- if self.pairs is not None:
- prev_value = self.pairs.get(self.lhs)
- if prev_value is not None: return prev_value
-
- # No previous user provided value, query for a default
- default_value_type = type(self.default_value)
- if default_value_type is DictType:
- return self.default_value.get(self.lhs, None)
- elif default_value_type is FunctionType:
- return self.default_value(self.lhs)
- elif default_value_type is StringsType:
- return self.default_value
- else:
- return None
-
- def get_lhs_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
- else:
- self.completions = self.lhs_names
-
- def complete(self, text, state):
- self.line_buffer= readline.get_line_buffer()
- self.parse_input()
- if not self.lhs_complete:
- # lhs is not complete, set up to complete the lhs
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_lhs_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
-
- elif not self.operator_complete:
- # lhs is complete, but the operator is not so we complete
- # by inserting the operator manually.
- # Also try to complete the default value at this time.
- readline.insert_text('%s ' % self.operator)
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
- else:
- # lhs and operator are complete, if the the rhs is blank
- # (either empty or only only whitespace) then attempt
- # to complete by inserting the default value, otherwise
- # there is nothing we can complete to so we're done.
- if self.rhs.strip():
- return None
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_lhs=None):
- self.initial_lhs = initial_lhs
- try:
- self._reset()
- if initial_lhs is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- self.parse_input()
- if self.strip_rhs and self.rhs is not None:
- return self.lhs, self.rhs.strip()
- else:
- return self.lhs, self.rhs
- except EOFError:
- return None, None
-
- def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
- self.pairs = {}
- if mandatory_attrs:
- mandatory_attrs_remaining = mandatory_attrs[:]
- else:
- mandatory_attrs_remaining = []
-
- print "Enter name = value"
- print "Press <ENTER> to accept, a blank line terminates input"
- print "Pressing <TAB> will auto completes name, assignment, and value"
- print
- while True:
- if mandatory_attrs_remaining:
- attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
- else:
- attribute, value = self.read_input(prompt)
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if value is None:
- if value_required:
- print "ERROR: you must specify a value for %s" % attribute
- continue
- else:
- if must_match and attribute not in self.lhs_names:
- print "ERROR: %s is not a valid name" % (attribute)
- continue
- if validate_callback is not None:
- if not validate_callback(attribute, value):
- print "ERROR: %s is not valid for %s" % (value, attribute)
- continue
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
-
- self.pairs[attribute] = value
- return self.pairs
-
-class ItemCompleter:
- '''
- Prompts the user for items in a list of items with auto completion.
- TAB completes partial input.
- More than one item can be specifed during input, whitespace and/or comma's seperate.
- Example:
-
- possible_items = ['foo', 'bar']
- c = ItemCompleter(possible_items)
- c.open()
- # Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
- #items = c.read_input("Enter: ")
- # Use get_items to iterate until a blank line is entered.
- items = c.get_items("Enter: ")
- c.close()
- print "items=%s" % (items)
-
- '''
-
- def __init__(self, items):
- self.items = items
- self.initial_input = None
- self.item_delims = ' \t,'
- self.split_re = re.compile('[%s]+' % self.item_delims)
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.item_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def get_item_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
- else:
- self.completions = self.items
-
- def complete(self, text, state):
- self.line_buffer= readline.get_line_buffer()
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_item_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_input, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_input=None):
- items = []
-
- self.initial_input = initial_input
- try:
- if initial_input is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- items = self.split_re.split(self.line_buffer)
- for item in items[:]:
- if not item: items.remove(item)
- return items
- except EOFError:
- return items
-
- def get_items(self, prompt, must_match=True):
- items = []
-
- print "Enter name [name ...]"
- print "Press <ENTER> to accept, blank line or control-D terminates input"
- print "Pressing <TAB> auto completes name"
- print
- while True:
- new_items = self.read_input(prompt)
- if not new_items: break
- for item in new_items:
- if must_match:
- if item not in self.items:
- print "ERROR: %s is not valid" % (item)
- continue
- if item in items: continue
- items.append(item)
-
- return items
-
-def get_gsserror(e):
- """A GSSError exception looks differently in python 2.4 than it does
- in python 2.5, deal with it."""
-
- try:
- primary = e[0]
- secondary = e[1]
- except:
- primary = e[0][0]
- secondary = e[0][1]
-
- return (primary[0], secondary[0])