# Authors: Simo Sorce
#
# Copyright (C) 2007  Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

SHARE_DIR = "/usr/share/ipa/"
PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins"

import string
import tempfile
import logging
import subprocess
import random
import os, sys, traceback, readline
import stat
import shutil

from ipa import ipavalidate
from types import *

import re
import xmlrpclib
import datetime

try:
    from subprocess import CalledProcessError

    class CalledProcessError(subprocess.CalledProcessError):
        def __init__(self, returncode, cmd):
            super(CalledProcessError, self).__init__(returncode, cmd)
except ImportError:
    # Python 2.4 doesn't implement CalledProcessError
    class CalledProcessError(Exception):
        """This exception is raised when a process run by check_call() returns
        a non-zero exit status. The exit status will be stored in the
        returncode attribute."""

        def __init__(self, returncode, cmd):
            self.returncode = returncode
            self.cmd = cmd

        def __str__(self):
            return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)


def realm_to_suffix(realm_name):
    """Convert a realm name such as 'EXAMPLE.COM' into the LDAP suffix
    'dc=example,dc=com'."""
    return ",".join(["dc=" + part.lower() for part in realm_name.split(".")])


def template_str(txt, vars):
    """Substitute the $-placeholders of txt with the values in vars.

    Raises KeyError (from string.Template.substitute) if a placeholder has
    no value in vars."""
    return string.Template(txt).substitute(vars)


def template_file(infilename, vars):
    """Read infilename and return its content with $-placeholders
    substituted from vars."""
    fd = open(infilename)
    try:
        # try/finally rather than 'with' keeps Python 2.4 compatibility
        txt = fd.read()
    finally:
        fd.close()
    return template_str(txt, vars)


def write_tmp_file(txt):
    """Write txt to a new NamedTemporaryFile and return the open file object.

    The file disappears when the returned object is closed, so the caller
    must keep a reference for as long as the file is needed."""
    fd = tempfile.NamedTemporaryFile()
    fd.write(txt)
    fd.flush()
    return fd


def run(args, stdin=None):
    """Run the command given by the args list and wait for it to finish.

    If stdin is given (and non-empty) it is fed to the process on its
    standard input. Both output streams are logged at INFO level.

    Returns the tuple (stdout, stderr).
    Raises CalledProcessError if the command exits with a non-zero status."""
    if stdin:
        # A pipe must be requested explicitly, otherwise communicate()
        # has nowhere to write the input and silently drops it.
        p_in = subprocess.PIPE
    else:
        p_in = None

    p = subprocess.Popen(args, stdin=p_in, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, close_fds=True)
    if stdin:
        stdout, stderr = p.communicate(stdin)
    else:
        stdout, stderr = p.communicate()

    logging.info(stdout)
    logging.info(stderr)

    if p.returncode != 0:
        raise CalledProcessError(p.returncode, ' '.join(args))

    return (stdout, stderr)


def file_exists(filename):
    """Return True if filename exists and is a regular file, else False."""
    try:
        mode = os.stat(filename)[stat.ST_MODE]
    except (OSError, TypeError, ValueError):
        # missing/inaccessible path, or a value that is not a usable path
        return False
    return bool(stat.S_ISREG(mode))


def dir_exists(filename):
    """Return True if filename exists and is a directory, else False."""
    try:
        mode = os.stat(filename)[stat.ST_MODE]
    except (OSError, TypeError, ValueError):
        # missing/inaccessible path, or a value that is not a usable path
        return False
    return bool(stat.S_ISDIR(mode))


def install_file(fname, dest):
    """Move fname to dest, first renaming an existing regular file at dest
    to dest + '.orig'."""
    if file_exists(dest):
        os.rename(dest, dest + ".orig")
    shutil.move(fname, dest)


def backup_file(fname):
    """Rename fname to fname + '.orig' if it is an existing regular file."""
    if file_exists(fname):
        os.rename(fname, fname + ".orig")


class CIDict(dict):
    """
    Case-insensitive but case-respecting dictionary.

    This code is derived from python-ldap's cidict.py module,
    written by stroeder: http://python-ldap.sourceforge.net/

    This version extends 'dict' so it works properly with TurboGears.
    If you extend UserDict, isinstance(foo, dict) returns false.

    Values are stored in the underlying dict under the lower-cased key;
    self._keys maps each lower-cased key to the spelling last used to set it.
    """

    def __init__(self, default=None):
        super(CIDict, self).__init__()
        self._keys = {}
        self.update(default or {})

    def __getitem__(self, key):
        return super(CIDict, self).__getitem__(key.lower())

    def __setitem__(self, key, value):
        lower_key = key.lower()
        self._keys[lower_key] = key
        return super(CIDict, self).__setitem__(lower_key, value)

    def __delitem__(self, key):
        lower_key = key.lower()
        del self._keys[lower_key]
        return super(CIDict, self).__delitem__(lower_key)

    def __contains__(self, key):
        # Without this override 'key in d' would be case-sensitive against
        # the lower-cased storage keys, unlike has_key()/get()/[].
        return super(CIDict, self).__contains__(key.lower())

    def update(self, dict):
        for key in dict.keys():
            self[key] = dict[key]

    def has_key(self, key):
        return super(CIDict, self).__contains__(key.lower())

    def get(self, key, failobj=None):
        try:
            return self[key]
        except KeyError:
            return failobj

    def keys(self):
        return self._keys.values()

    def items(self):
        return [(k, self[k]) for k in self._keys.values()]

    def copy(self):
        # NOTE: returns a plain dict keyed by the original-case keys,
        # not a CIDict.
        copy = {}
        for k in self._keys.values():
            copy[k] = self[k]
        return copy

    def iteritems(self):
        return self.copy().iteritems()

    def iterkeys(self):
        return self.copy().iterkeys()

    def setdefault(self, key, value=None):
        try:
            return self[key]
        except KeyError:
            self[key] = value
            return value

    def pop(self, key, *args):
        try:
            value = self[key]
            del self[key]
            return value
        except KeyError:
            if len(args) == 1:
                return args[0]
            raise

    def popitem(self):
        (lower_key, value) = super(CIDict, self).popitem()
        key = self._keys[lower_key]
        del self._keys[lower_key]

        return (key, value)


#
# The safe_string_re regexp and needs_base64 function are extracted from the
# python-ldap ldif module, which was
# written by Michael Stroeder
# http://python-ldap.sourceforge.net
#
# It was extracted because ipaldap.py is naughtily reaching into the ldif
# module and squashing this regexp.
#
SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)'
safe_string_re = re.compile(SAFE_STRING_PATTERN)


def needs_base64(s):
    """
    returns 1 if s has to be base-64 encoded because of special chars
    """
    return safe_string_re.search(s) is not None


def wrap_binary_data(data):
    """Converts all binary data strings into Binary objects for transport
    back over xmlrpc.

    Lists and tuples are converted (recursively) into lists, dicts into
    plain dicts; any other value is returned unchanged."""
    if isinstance(data, str):
        if needs_base64(data):
            return xmlrpclib.Binary(data)
        else:
            return data
    elif isinstance(data, (list, tuple)):
        return [wrap_binary_data(value) for value in data]
    elif isinstance(data, dict):
        retval = {}
        for (k, v) in data.items():
            retval[k] = wrap_binary_data(v)
        return retval
    else:
        return data


def unwrap_binary_data(data):
    """Converts all Binary objects back into strings.

    Lists and tuples are converted (recursively) into lists, dicts into
    plain dicts; any other value is returned unchanged."""
    if isinstance(data, xmlrpclib.Binary):
        # The data is decoded by the xmlproxy, but is stored
        # in a binary object for us.
        return str(data)
    elif isinstance(data, str):
        return data
    elif isinstance(data, (list, tuple)):
        return [unwrap_binary_data(value) for value in data]
    elif isinstance(data, dict):
        retval = {}
        for (k, v) in data.items():
            retval[k] = unwrap_binary_data(v)
        return retval
    else:
        return data


class GeneralizedTimeZone(datetime.tzinfo):
    """This class is a basic timezone wrapper for the offset specified
    in a Generalized Time.  It is dst-ignorant."""

    def __init__(self, offsetstr="Z"):
        """offsetstr is either "Z" (UTC) or "+HH"/"-HH" optionally followed
        by "MM". Raises ValueError if anything is left unparsed."""
        super(GeneralizedTimeZone, self).__init__()

        self.name = offsetstr
        self.houroffset = 0
        self.minoffset = 0

        if offsetstr != "Z":
            if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr):
                self.houroffset = int(offsetstr[0:3])
                offsetstr = offsetstr[3:]
            if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr):
                self.minoffset = int(offsetstr[0:2])
                offsetstr = offsetstr[2:]
            if len(offsetstr) > 0:
                raise ValueError()
        if self.houroffset < 0:
            # the minutes carry the same sign as the hours
            self.minoffset *= -1

    def utcoffset(self, dt):
        return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset)

    def dst(self, dt):
        return datetime.timedelta(0)

    def tzname(self, dt):
        return self.name


def _starts_with_digit(text, min_len):
    """True if text has at least min_len characters and begins with an
    ASCII digit."""
    return len(text) >= min_len and text[0] in string.digits


def _take_fraction(text):
    """Split a leading ',ddd' / '.ddd' fraction off text.

    Returns (fraction, rest) where fraction is a string such as '.5' (just
    '.' when no digit follows the separator), or (None, text) when text does
    not start with a fraction separator."""
    if not (len(text) >= 2 and text[0] in ",."):
        return None, text
    fraction = "."
    text = text[1:]
    while _starts_with_digit(text, 1):
        fraction += text[0]
        text = text[1:]
    return fraction, text


def parse_generalized_time(timestr):
    """Parses a Generalized Time string (as specified in X.680),
    returning a datetime object.  Generalized Times are stored inside
    the krbPasswordExpiration attribute in LDAP.

    This method doesn't attempt to be perfect wrt timezones.  If python
    can't be bothered to implement them, how can we...

    Returns None if the string is shorter than 8 characters or cannot be
    parsed."""

    if len(timestr) < 8:
        return None
    try:
        date = timestr[:8]
        rest = timestr[8:]

        year = int(date[:4])
        month = int(date[4:6])
        day = int(date[6:8])

        hour = minute = sec = usec = 0
        tzone = None

        if _starts_with_digit(rest, 2):
            hour = int(rest[:2])
            fraction, rest = _take_fraction(rest[2:])
            if fraction is not None:
                # fraction of an hour; float('.') raises ValueError
                total_secs = int(float(fraction) * 3600)
                minute, sec = divmod(total_secs, 60)

        if _starts_with_digit(rest, 2):
            minute = int(rest[:2])
            fraction, rest = _take_fraction(rest[2:])
            if fraction is not None:
                sec = int(float(fraction) * 60)

        if _starts_with_digit(rest, 2):
            sec = int(rest[:2])
            fraction, rest = _take_fraction(rest[2:])
            if fraction is not None:
                usec = int(float(fraction) * 1000000)

        if len(rest) > 0:
            # whatever remains must be the timezone designator
            tzone = GeneralizedTimeZone(rest)

        return datetime.datetime(year, month, day, hour, minute, sec, usec, tzone)

    except ValueError:
        return None


def ipa_generate_password():
    """Return a random 12 character password made of upper-case ASCII
    letters (a deliberately strict set, kept from the original for testing;
    the full printable range would be chr(32)..chr(126))."""
    # SystemRandom draws from the OS entropy source; the default Mersenne
    # Twister generator is predictable and unsuitable for passwords.
    r = random.SystemRandom()
    return ''.join([chr(r.randint(65, 90)) for x in range(12)])


def format_list(items, quote=None, page_width=80):
    '''Format a list of items formatting them so they wrap to fit the
    available width. The items will be sorted.

    The items may optionally be quoted. The quote parameter may either be
    a string, in which case it is added before and after the item. Or the
    quote parameter may be a pair (either a tuple or list). In this case
    quote[0] is left hand quote and quote[1] is the right hand quote.

    The caller's list is not modified; an empty list yields "".
    '''
    left_quote = right_quote = ''
    num_items = len(items)
    if not num_items:
        return ""

    if quote is not None:
        if isinstance(quote, basestring):
            left_quote = right_quote = quote
        elif isinstance(quote, (tuple, list)):
            left_quote = quote[0]
            right_quote = quote[1]

    max_len = max(map(len, items))
    max_len += len(left_quote) + len(right_quote)
    # Always lay out at least one column, even for a tiny page_width.
    num_columns = max(1, (page_width + max_len) // (max_len + 1))
    num_rows = (num_items + num_columns - 1) // num_columns
    items = sorted(items)

    rows = [''] * num_rows
    i = col = 0

    # Fill column by column: each pass appends one cell to every row.
    while i < num_items:
        row = 0
        if col == 0:
            separator = ''
        else:
            separator = ' '

        while i < num_items and row < num_rows:
            rows[row] += "%s%*s" % (separator, -max_len,
                                    "%s%s%s" % (left_quote, items[i], right_quote))
            i += 1
            row += 1
        col += 1
    return '\n'.join(rows)


# group 1: key, group 2: whole value, group 'quote': opening quote character,
# group 5: quoted remainder, group 6: quoted text without the closing quote
key_value_re = re.compile(r"""(\w+)\s*=\s*(([^\s'\"]+)|(?P<quote>['\"])((?P=quote)|(.*?[^\\])(?P=quote)))""")


def parse_key_value_pairs(input):
    r''' Given a string composed of key=value pairs parse it and return
    a dict of the key/value pairs. Keys must be a word, a key must be followed
    by an equal sign (=) and a value. The value may be a single word or may be
    quoted. Quotes may be either single or double quotes, but must be balanced.
    Inside the quoted text the same quote used to start the quoted value may be
    used if it is escaped by preceding it with a backslash (\).
    White space between the key, the equal sign, and the value is ignored.
    Values are always strings. Empty values must be specified with an empty
    quoted string, it's value after parsing will be an empty string.

    Example: The string

    arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"

    will produce

    arg0=
    arg1=1
    arg2=two
    arg3=three's a crowd
    arg4=this is a " quote
    '''

    kv_dict = {}
    for match in key_value_re.finditer(input):
        key = match.group(1)
        quote = match.group('quote')
        if match.group(5):
            value = match.group(6)
            if value is None:
                # empty quoted string
                value = ''
            # turn escaped quotes back into plain quotes
            value = re.sub(r'\\%s' % quote, quote, value)
        else:
            value = match.group(2)
        kv_dict[key] = value
    return kv_dict


_ITEM_SPLIT_RE = re.compile('[ ,\t\n]+')
_COMMENT_RE = re.compile('#.*$', re.MULTILINE)


def parse_items(text):
    '''Given text with items separated by whitespace or comma, return a list of those items'''
    return [item for item in _ITEM_SPLIT_RE.split(text) if item]


def _read_uncommented(filename):
    '''Return the content of filename ('-' means stdin) with #-comments removed.'''
    if filename == '-':
        fd = sys.stdin
    else:
        fd = open(filename)
    try:
        text = fd.read()
    finally:
        # never close stdin, it is not ours
        if fd is not sys.stdin:
            fd.close()
    return _COMMENT_RE.sub('', text)   # kill comments


def read_pairs_file(filename):
    '''Read key=value pairs from filename ('-' means stdin), ignoring
    #-comments, and return them as a dict (see parse_key_value_pairs).'''
    return parse_key_value_pairs(_read_uncommented(filename))


def read_items_file(filename):
    '''Read whitespace/comma separated items from filename ('-' means stdin),
    ignoring #-comments, and return them as a list (see parse_items).'''
    return parse_items(_read_uncommented(filename))


def user_input(prompt, default = None, allow_empty = True):
    '''Prompt the user until an acceptable answer is given.

    The type of default selects the behaviour: None or a string returns a
    string, a bool returns True/False from a yes/no answer, an int returns
    an int. An empty answer returns the default where one exists. Any other
    type of default returns None without prompting.'''
    if default is None:
        while True:
            ret = raw_input("%s: " % prompt)
            if allow_empty or ret.strip():
                return ret

    if isinstance(default, basestring):
        while True:
            ret = raw_input("%s [%s]: " % (prompt, default))
            if not ret and (allow_empty or default):
                return default
            elif ret.strip():
                return ret

    # bool must be tested before int, bool is a subclass of int
    if isinstance(default, bool):
        if default:
            choice = "yes"
        else:
            choice = "no"
        while True:
            ret = raw_input("%s [%s]: " % (prompt, choice))
            if not ret:
                return default
            elif ret.lower()[0] == "y":
                return True
            elif ret.lower()[0] == "n":
                return False

    if isinstance(default, int):
        while True:
            try:
                ret = raw_input("%s [%s]: " % (prompt, default))
                if not ret:
                    return default
                ret = int(ret)
            except ValueError:
                # not a number, ask again
                pass
            else:
                return ret


def user_input_email(prompt, default = None, allow_empty = False):
    '''Prompt until ipavalidate.Email() accepts the answer (it returns a
    false value for acceptable input); the answer is returned stripped.'''
    if default is not None and allow_empty:
        prompt += " (enter \"none\" for empty)"
    while True:
        ret = user_input(prompt, default, allow_empty)
        if allow_empty and ret.lower() == "none":
            return ""
        if not ipavalidate.Email(ret, not allow_empty):
            return ret.strip()


def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
    '''Prompt until ipavalidate.Plain() accepts the answer (it returns a
    false value for acceptable input).'''
    while True:
        ret = user_input(prompt, default, allow_empty)
        if not ipavalidate.Plain(ret, not allow_empty, allow_spaces):
            return ret


def user_input_path(prompt, default = None, allow_empty = True):
    '''Prompt until ipavalidate.Path() accepts the answer (it returns a
    false value for acceptable input).'''
    if default is not None and allow_empty:
        prompt += " (enter \"none\" for empty)"
    while True:
        ret = user_input(prompt, default, allow_empty)
        if allow_empty and ret.lower() == "none":
            return ""
        if not ipavalidate.Path(ret, not allow_empty):
            return ret


class AttributeValueCompleter:
    '''
    Gets input from the user in the form "lhs operator rhs"
    TAB completes partial input.
    lhs completes to a name in @lhs_names
    The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
    complete to the operator and a default value.
    Default values for a lhs value can specified as:
        - a string, all lhs values will use this default
        - a dict, the lhs value is looked up in the dict to return the default or None
        - a function with a single arg, the lhs value, it returns the default or None

    After creating the completer you must open it to set the terminal
    up, Then get a line of input from the user by calling read_input()
    which returns two values, the lhs and rhs, which might be None if
    lhs or rhs was not parsed.  After you are done getting input you
    should close the completer to restore the terminal.

    Example: (note this is essentially what the convenience function get_pairs() does)

    This will allow the user to autocomplete foo & foobar, both have
    defaults defined in a dict. In addition the foobar attribute must
    be specified before the prompting loop will exit. Also, this
    example show how to require that each attrbute entered by the user
    is valid.

    attrs = ['foo', 'foobar']
    defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
    mandatory_attrs = ['foobar']

    c = AttributeValueCompleter(attrs, defaults)
    c.open()
    mandatory_attrs_remaining = mandatory_attrs[:]

    while True:
        if mandatory_attrs_remaining:
            attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
            try:
                mandatory_attrs_remaining.remove(attribute)
            except ValueError:
                pass
        else:
            attribute, value = c.read_input("Enter: ")
        if attribute is None:
            # Are we done?
            if mandatory_attrs_remaining:
                print("ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)))
                continue
            else:
                break
        if attribute not in attrs:
            print("ERROR: %s is not a valid attribute" % (attribute))
        else:
            print("got '%s' = '%s'" % (attribute, value))

    c.close()
    print("exiting...")
    '''

    def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
                 operator='=', strip_rhs=True):
        self.lhs_names = lhs_names
        self.default_value = default_value
        # lhs_regexp must have named group 'lhs' which returns the contents of the lhs
        self.lhs_regexp = lhs_regexp
        self.lhs_re = re.compile(self.lhs_regexp)
        self.lhs_delims = lhs_delims
        self.operator = operator
        self.strip_rhs = strip_rhs
        self.pairs = None
        self._reset()

    def _reset(self):
        self.lhs = None
        self.lhs_complete = False
        self.operator_complete = False
        self.rhs = None

    def open(self):
        # Save state
        self.prev_completer = readline.get_completer()
        self.prev_completer_delims = readline.get_completer_delims()

        # Set up for ourself
        readline.parse_and_bind("tab: complete")
        readline.set_completer(self.complete)
        readline.set_completer_delims(self.lhs_delims)

    def close(self):
        # Restore previous state
        readline.set_completer_delims(self.prev_completer_delims)
        readline.set_completer(self.prev_completer)

    def parse_input(self):
        '''We are looking for 3 tokens: <lhs,op,rhs>
        Extract as much of each token as possible.
        Set flags indicating if token is fully parsed.
        '''
        try:
            self._reset()
            buf_len = len(self.line_buffer)
            pos = 0
            lhs_match = self.lhs_re.search(self.line_buffer, pos)
            if not lhs_match:
                return              # no lhs content
            self.lhs = lhs_match.group('lhs')  # get lhs contents
            pos = lhs_match.end('lhs')         # new scanning position
            if pos == buf_len:
                return              # nothing after lhs, lhs incomplete
            self.lhs_complete = True           # something trails the lhs, lhs is complete
            operator_beg = self.line_buffer.find(self.operator, pos)  # locate operator
            if operator_beg == -1:
                return              # did not find the operator
            self.operator_complete = True      # operator fully parsed
            operator_end = operator_beg + len(self.operator)
            pos = operator_end                 # step over the operator
            self.rhs = self.line_buffer[pos:]
        except Exception:
            # Runs inside a readline callback, so report instead of raising.
            # sys.exc_info() works on every Python version, unlike either
            # spelling of "except ... e".
            e = sys.exc_info()[1]
            traceback.print_exc()
            print("Exception in %s.parse_input(): %s" % (self.__class__.__name__, e))

    def get_default_value(self):
        '''default_value can be a string, a dict, or a function.
        If it's a string it's a global default for all attributes.
        If it's a dict the default is looked up in the dict index by attribute.
        If it's a function (any callable), it is called with 1 parameter, the
        attribute and it should return the default value for the attribute or
        None.

        Raises ValueError if the lhs has not been fully parsed yet.'''

        if not self.lhs_complete:
            raise ValueError("attribute not parsed")

        # If the user previously provided a value let that override the supplied default
        if self.pairs is not None:
            prev_value = self.pairs.get(self.lhs)
            if prev_value is not None:
                return prev_value

        # No previous user provided value, query for a default
        default = self.default_value
        if isinstance(default, dict):
            return default.get(self.lhs, None)
        elif isinstance(default, basestring):
            return default
        elif callable(default):
            return default(self.lhs)
        else:
            return None

    def get_lhs_completions(self, text):
        if text:
            self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
        else:
            self.completions = self.lhs_names

    def complete(self, text, state):
        self.line_buffer = readline.get_line_buffer()
        self.parse_input()
        if not self.lhs_complete:
            # lhs is not complete, set up to complete the lhs
            if state == 0:
                beg = readline.get_begidx()
                end = readline.get_endidx()
                self.get_lhs_completions(self.line_buffer[beg:end])
            if state >= len(self.completions):
                return None
            return self.completions[state]
        elif not self.operator_complete:
            # lhs is complete, but the operator is not so we complete
            # by inserting the operator manually.
            # Also try to complete the default value at this time.
            readline.insert_text('%s ' % self.operator)
            default_value = self.get_default_value()
            if default_value is not None:
                readline.insert_text(default_value)
            readline.redisplay()
            return None
        else:
            # lhs and operator are complete, if the the rhs is blank
            # (either empty or only only whitespace) then attempt
            # to complete by inserting the default value, otherwise
            # there is nothing we can complete to so we're done.
            if self.rhs.strip():
                return None
            default_value = self.get_default_value()
            if default_value is not None:
                readline.insert_text(default_value)
            readline.redisplay()
            return None

    def pre_input_hook(self):
        readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
        readline.redisplay()

    def read_input(self, prompt, initial_lhs=None):
        '''Read one line and return the tuple (lhs, rhs); either may be None
        if it was not parsed. Returns (None, None) on end of input.'''
        self.initial_lhs = initial_lhs
        try:
            self._reset()
            if initial_lhs is None:
                readline.set_pre_input_hook(None)
            else:
                readline.set_pre_input_hook(self.pre_input_hook)
            self.line_buffer = raw_input(prompt).strip()
            self.parse_input()
            if self.strip_rhs and self.rhs is not None:
                return self.lhs, self.rhs.strip()
            else:
                return self.lhs, self.rhs
        except EOFError:
            return None, None

    def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
        '''Prompt repeatedly for "name = value" lines until a blank line is
        entered and every name in mandatory_attrs has been given.
        Returns the dict of collected pairs.'''
        self.pairs = {}
        if mandatory_attrs:
            mandatory_attrs_remaining = mandatory_attrs[:]
        else:
            mandatory_attrs_remaining = []

        print("Enter name = value")
        print("Press <ENTER> to accept, a blank line terminates input")
        print("Pressing <TAB> will auto completes name, assignment, and value")
        print("")
        while True:
            if mandatory_attrs_remaining:
                attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
            else:
                attribute, value = self.read_input(prompt)
            if attribute is None:
                # Are we done?
                if mandatory_attrs_remaining:
                    print("ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)))
                    continue
                else:
                    break
            if value is None:
                if value_required:
                    print("ERROR: you must specify a value for %s" % attribute)
                    continue
            else:
                if must_match and attribute not in self.lhs_names:
                    print("ERROR: %s is not a valid name" % (attribute))
                    continue
            if validate_callback is not None:
                if not validate_callback(attribute, value):
                    print("ERROR: %s is not valid for %s" % (value, attribute))
                    continue
            try:
                mandatory_attrs_remaining.remove(attribute)
            except ValueError:
                pass

            self.pairs[attribute] = value
        return self.pairs


class ItemCompleter:
    '''
    Prompts the user for items in a list of items with auto completion.
    TAB completes partial input.
    More than one item can be specifed during input, whitespace and/or comma's seperate.
    Example:

    possible_items = ['foo', 'bar']
    c = ItemCompleter(possible_items)
    c.open()
    # Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
    #items = c.read_input("Enter: ")
    # Use get_items to iterate until a blank line is entered.
    items = c.get_items("Enter: ")
    c.close()
    print("items=%s" % (items))
    '''

    def __init__(self, items):
        self.items = items
        self.initial_input = None
        self.item_delims = ' \t,'
        self.split_re = re.compile('[%s]+' % self.item_delims)

    def open(self):
        # Save state
        self.prev_completer = readline.get_completer()
        self.prev_completer_delims = readline.get_completer_delims()

        # Set up for ourself
        readline.parse_and_bind("tab: complete")
        readline.set_completer(self.complete)
        readline.set_completer_delims(self.item_delims)

    def close(self):
        # Restore previous state
        readline.set_completer_delims(self.prev_completer_delims)
        readline.set_completer(self.prev_completer)

    def get_item_completions(self, text):
        if text:
            self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
        else:
            self.completions = self.items

    def complete(self, text, state):
        self.line_buffer = readline.get_line_buffer()
        if state == 0:
            beg = readline.get_begidx()
            end = readline.get_endidx()
            self.get_item_completions(self.line_buffer[beg:end])
        if state >= len(self.completions):
            return None
        return self.completions[state]

    def pre_input_hook(self):
        # This class has no operator (the original referenced a
        # non-existent self.operator here); just pre-fill the initial input.
        readline.insert_text('%s ' % self.initial_input)
        readline.redisplay()

    def read_input(self, prompt, initial_input=None):
        '''Read one line and return the list of items on it; the list is
        empty for a blank line or end of input.'''
        items = []

        self.initial_input = initial_input
        try:
            if initial_input is None:
                readline.set_pre_input_hook(None)
            else:
                readline.set_pre_input_hook(self.pre_input_hook)
            self.line_buffer = raw_input(prompt).strip()
            items = [item for item in self.split_re.split(self.line_buffer) if item]
            return items
        except EOFError:
            return items

    def get_items(self, prompt, must_match=True):
        '''Prompt repeatedly until a blank line or end of input and return
        the list of distinct items entered, in order of entry.'''
        items = []

        print("Enter name [name ...]")
        print("Press <ENTER> to accept, blank line or control-D terminates input")
        print("Pressing <TAB> auto completes name")
        print("")
        while True:
            new_items = self.read_input(prompt)
            if not new_items:
                break
            for item in new_items:
                if must_match:
                    if item not in self.items:
                        print("ERROR: %s is not valid" % (item))
                        continue
                if item in items:
                    continue
                items.append(item)

        return items


def get_gsserror(e):
    """A GSSError exception looks differently in python 2.4 than it does
       in python 2.5, deal with it.

       Returns the tuple (primary message, secondary message)."""

    try:
        primary = e[0]
        secondary = e[1]
    except Exception:
        # the (primary, secondary) pair is nested one level deeper
        primary = e[0][0]
        secondary = e[0][1]

    return (primary[0], secondary[0])
require 'test/unit'
require 'tempfile'
require 'fileutils'

require 'csv'

# Reopen the CSV library's StreamBuf class and replace its BufSize constant
# with a very small value.
# NOTE(review): presumably this forces frequent buffer refills so the tests
# exercise buffer-boundary handling -- confirm against the csv library.
class CSV
  class StreamBuf
    # Let buffer work hard.
    # The existing constant is removed first so that the assignment below
    # defines it afresh instead of reassigning an already initialized constant.
    remove_const("BufSize")
    BufSize = 2
  end
end


module CSVTestSupport
  def d(data)
    data
  end
end


class TestCSV < Test::Unit::TestCase
  # Compute RSEP: the bytes that end up on disk when "\n" is written to a
  # Tempfile.  The newline is written first, then the file is reopened,
  # switched to binary mode and read back so the stored bytes are seen as-is.
  file = Tempfile.new("crlf")
  file << "\n"
  file.open
  file.binmode
  RSEP = file.read
  file.close

  # Make d() available to the test methods (instance level) ...
  include CSVTestSupport

  # ... and to the class body itself, where the fixture table @@fullCSVData
  # calls d() while it is being built.
  class << self
    include CSVTestSupport
  end

  # Fixture table: each key is a row (Array of cells, nil for a missing
  # cell) and each value is the comma-separated text for that row, without a
  # trailing row separator.  Used by the generate_line / parse_line tests.
  @@simpleCSVData = {
    [nil] => '',
    [''] => '""',
    [nil, nil] => ',',
    [nil, nil, nil] => ',,',
    ['foo'] => 'foo',
    [','] => '","',
    [',', ','] => '",",","',
    [';'] => ';',
    [';', ';'] => ';,;',
    ["\"\r", "\"\r"] => "\"\"\"\r\",\"\"\"\r\"",
    ["\"\n", "\"\n"] => "\"\"\"\n\",\"\"\"\n\"",
    ["\t"] => "\t",
    ["\t", "\t"] => "\t,\t",
    ['foo', 'bar'] => 'foo,bar',
    ['foo', '"bar"', 'baz'] => 'foo,"""bar""",baz',
    ['foo', 'foo,bar', 'baz'] => 'foo,"foo,bar",baz',
    ['foo', '""', 'baz'] => 'foo,"""""",baz',
    ['foo', '', 'baz'] => 'foo,"",baz',
    ['foo', nil, 'baz'] => 'foo,,baz',
    [nil, 'foo', 'bar'] => ',foo,bar',
    ['foo', 'bar', nil] => 'foo,bar,',
    ['foo', "\r", 'baz'] => "foo,\"\r\",baz",
    ['foo', "\n", 'baz'] => "foo,\"\n\",baz",
    ['foo', "\r\n\r", 'baz'] => "foo,\"\r\n\r\",baz",
    ['foo', "\r\n", 'baz'] => "foo,\"\r\n\",baz",
    ['foo', "\r.\n", 'baz'] => "foo,\"\r.\n\",baz",
    ['foo', "\r\n\n", 'baz'] => "foo,\"\r\n\n\",baz",
    ['foo', '"', 'baz'] => 'foo,"""",baz',
  }

  # Fixture table with the same row => CSV-text shape as @@simpleCSVData,
  # but with every cell passed through d().  Used by the generate_row /
  # parse_row tests and to populate the input files written in setup.
  @@fullCSVData = {
    [d(nil)] => '',
    [d('')] => '""',
    [d(nil), d(nil)] => ',',
    [d(nil), d(nil), d(nil)] => ',,',
    [d('foo')] => 'foo',
    [d('foo'), d('bar')] => 'foo,bar',
    [d('foo'), d('"bar"'), d('baz')] => 'foo,"""bar""",baz',
    [d('foo'), d('foo,bar'), d('baz')] => 'foo,"foo,bar",baz',
    [d('foo'), d('""'), d('baz')] => 'foo,"""""",baz',
    [d('foo'), d(''), d('baz')] => 'foo,"",baz',
    [d('foo'), d(nil), d('baz')] => 'foo,,baz',
    [d('foo'), d("\r"), d('baz')] => "foo,\"\r\",baz",
    [d('foo'), d("\n"), d('baz')] => "foo,\"\n\",baz",
    [d('foo'), d("\r\n"), d('baz')] => "foo,\"\r\n\",baz",
    [d('foo'), d("\r.\n"), d('baz')] => "foo,\"\r.\n\",baz",
    [d('foo'), d("\r\n\n"), d('baz')] => "foo,\"\r\n\n\",baz",
    [d('foo'), d('"'), d('baz')] => 'foo,"""",baz',
  }

  # Just the rows (the keys of @@fullCSVData), in the hash's iteration order.
  @@fullCSVDataArray = @@fullCSVData.collect { |key, value| key }

  # Re-encode one semicolon-separated row as a comma-separated row.
  # +row_sep+ is forwarded unchanged to sepConv.
  def ssv2csv(ssvStr, row_sep = nil)
    from_sep = ?;
    to_sep = ?,
    sepConv(ssvStr, from_sep, to_sep, row_sep)
  end

  # Re-encode one comma-separated row as a semicolon-separated row.
  # +row_sep+ is forwarded unchanged to sepConv.
  def csv2ssv(csvStr, row_sep = nil)
    from_sep = ?,
    to_sep = ?;
    sepConv(csvStr, from_sep, to_sep, row_sep)
  end

  # Re-encode one tab-separated row as a comma-separated row.
  # +row_sep+ is forwarded unchanged to sepConv.
  def tsv2csv(tsvStr, row_sep = nil)
    from_sep = ?\t
    to_sep = ?,
    sepConv(tsvStr, from_sep, to_sep, row_sep)
  end

  # Re-encode one comma-separated row as a tab-separated row.
  # +row_sep+ is forwarded unchanged to sepConv.
  def csv2tsv(csvStr, row_sep = nil)
    from_sep = ?,
    to_sep = ?\t
    sepConv(csvStr, from_sep, to_sep, row_sep)
  end

  # Parse the first row of +srcStr+ using +srcSep+ as the column separator,
  # then regenerate that row using +destSep+.  +row_sep+ is handed to both
  # the parse and the generate step.
  #
  # Returns the regenerated row as a new String.
  def sepConv(srcStr, srcSep, destSep, row_sep = nil)
    rows = []
    # Only the cells collected into +rows+ are needed; the (column count,
    # next index) pair returned by parse_row is deliberately ignored.
    CSV.parse_row(srcStr, 0, rows, srcSep, row_sep)
    destStr = ''
    CSV.generate_row(rows, rows.size, destStr, destSep, row_sep)
    destStr
  end

public

  # Create a per-process scratch directory and fill it with the fixture
  # files the tests read: CSV and TSV renderings of @@fullCSVDataArray, an
  # empty CSV file, a file starting with a UTF-8 byte order mark, and a file
  # whose rows are separated by a bare carriage return.
  def setup
    @tmpdir = File.join(Dir.tmpdir, "ruby_test_csv_tmp_#{$$}")
    Dir.mkdir(@tmpdir)

    @infile    = File.join(@tmpdir, 'in.csv')
    @infiletsv = File.join(@tmpdir, 'in.tsv')
    @emptyfile = File.join(@tmpdir, 'empty.csv')
    @outfile   = File.join(@tmpdir, 'out.csv')
    @bomfile   = File.join(@tmpdir, "bom.csv")
    @macfile   = File.join(@tmpdir, "mac.csv")

    CSV.open(@infile, "wb") { |csv_out|
      @@fullCSVDataArray.each { |cells| csv_out.add_row(cells) }
    }

    CSV.open(@infiletsv, "wb", ?\t) { |tsv_out|
      @@fullCSVDataArray.each { |cells| tsv_out.add_row(cells) }
    }

    # Opening and immediately leaving the block creates an empty file.
    CSV.generate(@emptyfile) { |unused_writer| }

    File.open(@bomfile, "wb") { |io|
      io.write("\357\273\277\"foo\"\r\n\"bar\"\r\n")
    }

    File.open(@macfile, "wb") { |io|
      io.write("\"Avenches\",\"aus Umgebung\"\r\"Bad Hersfeld\",\"Ausgrabung\"")
    }
  end

  # Delete the scratch directory made by setup together with its contents.
  def teardown
    scratch_dir = @tmpdir
    FileUtils.rm_rf(scratch_dir)
  end

  #### CSV::Reader unit test
  
  # Reader#each must yield every fixture row in order as an Array, return
  # nil, and raise IllegalFormatError on malformed input.
  def test_Reader_each
    File.open(@infile, "rb") { |io|
      pending = @@fullCSVDataArray.dup
      seen = 0
      result = CSV::Reader.create(io).each { |cells|
        # The row type is only checked once, on the first row.
        assert_instance_of(Array, cells) if seen == 0
        seen += 1
        assert_equal(pending.shift, cells)
      }
      assert_nil(result, "Return is nil")
      assert(pending.empty?)
    }

    # Illegal format.
    ["a,b\r\na,b,\"c\"\ra", "a,b\r\n\""].each { |broken|
      broken_reader = CSV::Reader.create(broken)
      assert_raises(CSV::IllegalFormatError) {
        broken_reader.each { |cells| }
      }
    }
  end

  # Reader#shift must return the fixture rows one at a time, in order, as
  # Arrays, and raise IllegalFormatError once it reaches a malformed row.
  def test_Reader_shift
    file = File.open(@infile, "rb")
    begin
      reader = CSV::Reader.create(file)
      first = true
      checked = 0
      @@fullCSVDataArray.each do |expected|
        actual = reader.shift
        if first
          assert_instance_of(Array, actual)
          first = false
        end
        assert_equal(expected, actual)
        checked += 1
      end
      assert_equal(@@fullCSVDataArray.size, checked)
    ensure
      file.close
    end

    # Illegal format: stray data after a closing quote on the second row.
    reader = CSV::Reader.create("a,b\r\na,b,\"c\"\ra")
    assert_raises(CSV::IllegalFormatError) do
      reader.shift
      reader.shift
    end

    # Illegal format: unterminated quote on the second row.  This used to
    # repeat the previous input verbatim; it now mirrors the second case of
    # test_Reader_each so both malformed shapes are covered.
    reader = CSV::Reader.create("a,b\r\n\"")
    assert_raises(CSV::IllegalFormatError) do
      reader.shift
      reader.shift
    end
  end

  # A bare CSV::Reader (obtained via allocate, bypassing new) has no row
  # source, so shift must raise NotImplementedError.  Skipped on runtimes
  # where the class does not respond to allocate.
  def test_Reader_getRow
    return unless CSV::Reader.respond_to?(:allocate)

    obj = CSV::Reader.allocate
    assert_raises(NotImplementedError) do
      obj.shift
    end
  end

  # Closing an IOReader leaves the underlying IO open unless
  # close_on_terminate was requested beforehand.
  def test_IOReader_close_on_terminate
    plain_io = File.open(@infile, "r")
    plain_reader = CSV::IOReader.create(plain_io)
    plain_reader.close
    assert(!plain_io.closed?)
    plain_io.close

    owned_io = File.open(@infile, "r")
    owning_reader = CSV::IOReader.create(owned_io)
    owning_reader.close_on_terminate
    owning_reader.close
    assert(owned_io.closed?)
  end

  # With close_on_terminate set, Reader#close also closes the wrapped IO.
  def test_Reader_close
    io = File.open(@infile, "r")
    csv_in = CSV::IOReader.create(io)
    csv_in.close_on_terminate
    csv_in.close
    assert(io.closed?)
  end

  # CSV::Reader is not meant to be instantiated directly: new must raise.
  def test_Reader_s_new
    assert_raises(RuntimeError) { CSV::Reader.new(nil) }
  end

  # Reader.create must pick StringReader for a String and IOReader for an
  # IO, for any object that merely quacks like one, and for a Tempfile.
  # The File and Tempfile are released in ensure blocks so a failing
  # assertion does not leak them.
  def test_Reader_s_create
    reader = CSV::Reader.create("abc")
    assert_instance_of(CSV::StringReader, reader, "With a String")

    file = File.open(@infile, "rb")
    begin
      reader = CSV::Reader.create(file)
      assert_instance_of(CSV::IOReader, reader, 'With an IO')
    ensure
      file.close
    end

    obj = Object.new
    def obj.sysread(size)
      "abc"
    end
    def obj.read(size)
      "abc"
    end
    reader = CSV::Reader.create(obj)
    assert_instance_of(CSV::IOReader, reader, "With not an IO or String")

    # No need to test Tempfile because it's a pseudo IO.  I test this here
    # for other tests.
    tmp = Tempfile.new("in.csv")
    begin
      reader = CSV::Reader.create(tmp)
      assert_instance_of(CSV::IOReader, reader, "With an pseudo IO.")
    ensure
      tmp.close(true)   # close and unlink immediately
    end
  end

  # Read a "\r"-separated file back through IOReader, once from a binary
  # mode handle and once from a text mode handle, and check the cells.
  def test_IOReader_s_create_binmode
    File.open(@outfile, "wb") { |raw_out|
      raw_out << "\"\r\n\",\"\r\",\"\n\"\r1,2,3"
    }

    File.open(@outfile, "rb") { |bin_in|
      csv_in = CSV::IOReader.new(bin_in, ?,, ?\r)
      assert_equal(["\r\n", "\r", "\n"], csv_in.shift.to_a)
      assert_equal(["1", "2", "3"], csv_in.shift.to_a)
      csv_in.close
    }

    File.open(@outfile, "r") { |text_in|   # not "rb"
      # The expected first cell depends on what RSEP was measured to be.
      if RSEP == "\n"
        lf_in_cell = "\r\n"
      else
        lf_in_cell = "\n"
      end
      csv_in = CSV::IOReader.new(text_in, ?,, ?\r)
      assert_equal([lf_in_cell, "\r", "\n"], csv_in.shift.to_a)
      assert_equal(["1", "2", "3"], csv_in.shift.to_a)
      csv_in.close
    }
  end

  # Reader.parse with a block must yield each row as an Array and return
  # nil, for String and IO sources, with and without an explicit column
  # separator.  Malformed input must raise IllegalFormatError.
  def test_Reader_s_parse
    ret = CSV::Reader.parse("a,b,c") { |row|
      assert_instance_of(Array, row, "Block parameter")
    }
    assert_nil(ret, "Return is nil")

    ret = CSV::Reader.parse("a;b;c", ?;) { |row|
      assert_instance_of(Array, row, "Block parameter")
    }
    # Previously captured but never checked.
    assert_nil(ret, "Return is nil")

    file = Tempfile.new("in.csv")
    begin
      file << "a,b,c"
      file.open
      ret = CSV::Reader.parse(file) { |row|
        assert_instance_of(Array, row, "Block parameter")
      }
      assert_nil(ret, "Return is nil")
    ensure
      file.close(true)   # close and unlink immediately
    end

    file = Tempfile.new("in.csv")
    begin
      file << "a,b,c"
      file.open
      ret = CSV::Reader.parse(file, ?,) { |row|
        assert_instance_of(Array, row, "Block parameter")
      }
      # Previously captured but never checked.
      assert_nil(ret, "Return is nil")
    ensure
      file.close(true)
    end

    # Illegal format.
    assert_raises(CSV::IllegalFormatError) do
      CSV::Reader.parse("a,b\r\na,b,\"c\"\ra") do |row|
      end
    end

    assert_raises(CSV::IllegalFormatError) do
      CSV::Reader.parse("a,b\r\na,b\"") do |row|
      end
    end
  end


  #### CSV::Writer unit test
  
  # CSV::Writer is not meant to be instantiated directly: new must raise.
  def test_Writer_s_new
    assert_raises(RuntimeError) { CSV::Writer.new(nil) }
  end

  # Writer.generate with a block must yield a BasicWriter and return nil,
  # with and without an explicit column separator.
  def test_Writer_s_generate
    ret = CSV::Writer.generate(STDOUT) { |writer|
      assert_instance_of(CSV::BasicWriter, writer, "Block parameter")
    }
    # Previously this result was overwritten before being checked.
    assert_nil(ret, "Return is nil")

    ret = CSV::Writer.generate(STDOUT, ?;) { |writer|
      assert_instance_of(CSV::BasicWriter, writer, "Block parameter")
    }
    assert_nil(ret, "Return is nil")
  end

  # Writer.create must hand back a BasicWriter for STDERR (with and without
  # an explicit column separator) and for a Tempfile.
  def test_Writer_s_create
    [[STDERR], [STDERR, ?;]].each { |create_args|
      made = CSV::Writer.create(*create_args)
      assert_instance_of(CSV::BasicWriter, made, "String")
    }

    scratch = Tempfile.new("out.csv")
    assert_instance_of(CSV::BasicWriter, CSV::Writer.create(scratch), "IO")
  end

  # Writer#<< must append a row, return the writer itself so calls can be
  # chained, and produce RSEP-terminated lines.  Exercised once with plain
  # cells and once with cells wrapped in d().
  def test_Writer_LSHIFT # '<<'
    expected_text = "a,b,c#{RSEP},e,f#{RSEP},,\"\"#{RSEP}"

    plain_out = Tempfile.new("out.csv")
    CSV::Writer.generate(plain_out) { |csv|
      chained = (csv << ['a', 'b', 'c'])
      assert_instance_of(CSV::BasicWriter, chained, 'Return is self')

      csv << [nil, 'e', 'f'] << [nil, nil, '']
    }
    plain_out.open
    plain_out.binmode
    assert_equal(expected_text, plain_out.read, 'Normal')

    wrapped_out = Tempfile.new("out2.csv")
    CSV::Writer.generate(wrapped_out) { |csv|
      chained = (csv << [d('a'), d('b'), d('c')])
      assert_instance_of(CSV::BasicWriter, chained, 'Return is self')

      csv << [d(nil), d('e'), d('f')] << [d(nil), d(nil), d('')]
    }
    wrapped_out.open
    wrapped_out.binmode
    assert_equal(expected_text, wrapped_out.read, 'Normal')
  end

  # Writer#add_row must append a row, return the writer itself so calls can
  # be chained, and produce RSEP-terminated lines.
  def test_Writer_add_row
    out = Tempfile.new("out.csv")
    CSV::Writer.generate(out) { |csv|
      first_row  = [d('a'), d('b'), d('c')]
      second_row = [d(nil), d('e'), d('f')]
      third_row  = [d(nil), d(nil), d('')]

      chained = csv.add_row(first_row)
      assert_instance_of(CSV::BasicWriter, chained, 'Return is self')

      csv.add_row(second_row).add_row(third_row)
    }
    out.open
    out.binmode
    written = out.read
    assert_equal("a,b,c#{RSEP},e,f#{RSEP},,\"\"#{RSEP}", written, 'Normal')
  end

  # With close_on_terminate set, Writer#close also closes the wrapped IO.
  def test_Writer_close
    io = File.open(@outfile, "w")
    csv_out = CSV::BasicWriter.create(io)
    csv_out.close_on_terminate
    csv_out.close
    assert(io.closed?)
  end

  # Closing a BasicWriter leaves the underlying IO open unless
  # close_on_terminate was requested beforehand.
  def test_BasicWriter_close_on_terminate
    plain_io = File.open(@outfile, "w")
    plain_writer = CSV::BasicWriter.create(plain_io)
    plain_writer.close
    assert(!plain_io.closed?)
    plain_io.close

    owned_io = File.open(@outfile, "w")
    owning_writer = CSV::BasicWriter.new(owned_io)
    owning_writer.close_on_terminate
    owning_writer.close
    assert(owned_io.closed?)
  end

  # Write rows through a BasicWriter on a text mode handle with "\r" as the
  # row separator, then check the raw bytes that reached the file.
  def test_BasicWriter_s_create_binmode
    File.open(@outfile, "w") { |text_out|   # not "wb"
      csv_out = CSV::BasicWriter.new(text_out, ?,, ?\r)
      csv_out << ["\r\n", "\r", "\n"]
      csv_out << ["1", "2", "3"]
      csv_out.close
    }

    written = File.open(@outfile, "rb") { |raw_in| raw_in.read }
    assert_equal("\"\r#{RSEP}\",\"\r\",\"#{RSEP}\"\r1,2,3\r", written)
  end

  #### CSV unit test

  # CSV.open in read mode: rejects unknown modes, returns an IOReader when
  # called without a block, yields Array rows when called with one, raises
  # ENOENT for a missing file and IllegalFormatError for malformed content,
  # and never yields for an empty file.
  def test_s_open_reader
    assert_raises(ArgumentError, 'Illegal mode') do
      CSV.open("temp", "a")
    end

    assert_raises(ArgumentError, 'Illegal mode') do
      CSV.open("temp", "a", ?;)
    end

    reader = CSV.open(@infile, "r")
    assert_instance_of(CSV::IOReader, reader)
    reader.close

    reader = CSV.open(@infile, "rb")
    assert_instance_of(CSV::IOReader, reader)
    reader.close

    reader = CSV.open(@infile, "r", ?;)
    assert_instance_of(CSV::IOReader, reader)
    reader.close

    CSV.open(@infile, "r") do |row|
      assert_instance_of(Array, row)
      break
    end

    CSV.open(@infiletsv, "r", ?\t) do |row|
      assert_instance_of(Array, row)
      break
    end

    assert_raises(Errno::ENOENT) do
      CSV.open("NoSuchFileOrDirectory", "r")
    end

    assert_raises(Errno::ENOENT) do
      CSV.open("NoSuchFileOrDirectory", "r", ?;)
    end

    # Illegal format.
    File.open(@outfile, "wb") do |f|
      f << "a,b\r\na,b,\"c\"\ra"
    end
    assert_raises(CSV::IllegalFormatError) do
      CSV.open(@outfile, "r") do |row|
      end
    end

    File.open(@outfile, "wb") do |f|
      f << "a,b\r\na,b\""
    end
    assert_raises(CSV::IllegalFormatError) do
      CSV.open(@outfile, "r") do |row|
      end
    end

    CSV.open(@emptyfile, "r") do |row|
      # flunk is the Test::Unit way to fail unconditionally; the previous
      # assert_fail is not a Test::Unit assertion and would have surfaced as
      # a NoMethodError instead of a test failure.
      flunk("Must not reach here")
    end
  end

  # CSV.parse: returns an Array of row Arrays without a block, yields rows
  # with one, treats "" as no rows and "\n" as one row holding a single nil
  # cell, and honours explicit column / row separators.
  def test_s_parse
    # (An exact duplicate of these three statements used to follow; it
    # exercised nothing new and has been dropped.)
    result = CSV.parse(File.read(@infile))
    assert_instance_of(Array, result)
    assert_instance_of(Array, result[0])

    assert_equal([], CSV.parse(""))
    assert_equal([[nil]], CSV.parse("\n"))

    CSV.parse(File.read(@infile)) do |row|
      assert_instance_of(Array, row)
      break
    end

    CSV.parse(File.read(@infiletsv), ?\t) do |row|
      assert_instance_of(Array, row)
      break
    end

    CSV.parse("") do |row|
      flunk("Block must not be called for empty input")
    end

    count = 0
    CSV.parse("\n") do |row|
      assert_equal([nil], row)
      count += 1
    end
    assert_equal(1, count)

    assert_equal([["a|b-c|d"]], CSV.parse("a|b-c|d"))
    assert_equal([["a", "b"], ["c", "d"]], CSV.parse("a|b-c|d", "|", "-"))
  end

  # CSV.open in write mode: returns a BasicWriter without a block, yields
  # one with a block, and raises an Errno error when the path is a
  # directory.
  def test_s_open_writer
    writer = CSV.open(@outfile, "w")
    assert_instance_of(CSV::BasicWriter, writer)
    writer.close

    writer = CSV.open(@outfile, "wb")
    assert_instance_of(CSV::BasicWriter, writer)
    writer.close

    writer = CSV.open(@outfile, "wb", ?;)
    assert_instance_of(CSV::BasicWriter, writer)
    writer.close

    CSV.open(@outfile, "w") do |yielded|
      assert_instance_of(CSV::BasicWriter, yielded)
    end

    CSV.open(@outfile, "w", ?;) do |yielded|
      assert_instance_of(CSV::BasicWriter, yielded)
    end

    # Which errno is raised for "open a directory for writing" differs by
    # platform, so any of the three is accepted.  assert_raises is used
    # rather than rescue Exception so that a missing error is reported as
    # such instead of being caught and re-tested by the rescue clause.
    assert_raises(Errno::EEXIST, Errno::EISDIR, Errno::EACCES) do
      CSV.open(@tmpdir, "w")
    end
  end

  # CSV.generate: returns a BasicWriter without a block, yields one with a
  # block, and raises an Errno error when the path is a directory.
  def test_s_generate
    writer = CSV.generate(@outfile)
    assert_instance_of(CSV::BasicWriter, writer)
    writer.close

    writer = CSV.generate(@outfile, ?;)
    assert_instance_of(CSV::BasicWriter, writer)
    writer.close

    CSV.generate(@outfile) do |yielded|
      assert_instance_of(CSV::BasicWriter, yielded)
    end

    CSV.generate(@outfile, ?;) do |yielded|
      assert_instance_of(CSV::BasicWriter, yielded)
    end

    # Which errno is raised for "open a directory for writing" differs by
    # platform, so any of the three is accepted.  assert_raises is used
    # rather than rescue Exception so that a missing error is reported as
    # such instead of being caught and re-tested by the rescue clause.
    assert_raises(Errno::EEXIST, Errno::EISDIR, Errno::EACCES) do
      CSV.generate(@tmpdir)
    end
  end

  # CSV.generate_line: empty rows give an empty string, every row in
  # @@simpleCSVData renders to its expected text (directly for commas, and
  # after conversion back to commas for ';' and tab), and a custom row
  # separator forces quoting of cells that contain it.
  def test_s_generate_line
    assert_equal('', CSV.generate_line([]), "Extra boundary check.")
    assert_equal('', CSV.generate_line([], ?;), "Extra boundary check.")

    @@simpleCSVData.each { |cells, csv_text|
      assert_equal(csv_text, CSV.generate_line(cells))
    }

    @@simpleCSVData.each { |cells, csv_text|
      ssv_line = CSV.generate_line(cells, ?;)
      assert_equal(csv_text + "\n", ssv2csv(ssv_line))
    }

    @@simpleCSVData.each { |cells, csv_text|
      tsv_line = CSV.generate_line(cells, ?\t)
      assert_equal(csv_text + "\n", tsv2csv(tsv_line))
    }

    pipe_terminated = CSV.generate_line(['a', 'b'], nil, ?|)
    assert_equal('a,b', pipe_terminated)

    a_terminated = CSV.generate_line(['a', 'b'], nil, "a")
    assert_equal('"a",b', a_terminated)
  end

  # CSV.generate_row(cells, count, out, fs, rs) appends one rendered row to
  # +out+ and returns a column count.  This test walks through: empty rows,
  # a requested count larger and smaller than the number of cells, custom
  # column and row separators, every row in @@fullCSVData, and finally
  # several rows accumulated into a single buffer.
  def test_s_generate_row
    # Zero cells: only the row separator is written.
    buf = ''
    cols = CSV.generate_row([], 0, buf)
    assert_equal(0, cols)
    assert_equal("\n", buf, "Extra boundary check.")

    buf = ''
    cols = CSV.generate_row([], 0, buf, ?;)
    assert_equal(0, cols)
    assert_equal("\n", buf, "Extra boundary check.")

    buf = ''
    cols = CSV.generate_row([], 0, buf, ?\t)
    assert_equal(0, cols)
    assert_equal("\n", buf, "Extra boundary check.")

    buf = ''
    cols = CSV.generate_row([], 0, buf, ?\t, ?|)
    assert_equal(0, cols)
    assert_equal("|", buf, "Extra boundary check.")

    # Requested count (2) exceeds the cells supplied (1): the output ends
    # with a column separator and no row separator.
    buf = ''
    cols = CSV.generate_row([d('1')], 2, buf)
    assert_equal('1,', buf)

    buf = ''
    cols = CSV.generate_row([d('1')], 2, buf, ?;)
    assert_equal('1;', buf)

    buf = ''
    cols = CSV.generate_row([d('1')], 2, buf, ?\t)
    assert_equal("1\t", buf)

    buf = ''
    cols = CSV.generate_row([d('1')], 2, buf, ?\t, ?|)
    assert_equal("1\t", buf)

    # Requested count (1) is smaller than the cells supplied (2): only the
    # first cell is written, followed by the row separator.
    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf)
    assert_equal("1\n", buf)

    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf, ?;)
    assert_equal("1\n", buf)

    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf, ?\t)
    assert_equal("1\n", buf)

    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf, ?\t, ?\n)
    assert_equal("1\n", buf)

    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf, ?\t, ?\r)
    assert_equal("1\r", buf)

    buf = ''
    cols = CSV.generate_row([d('1'), d('2')], 1, buf, ?\t, ?|)
    assert_equal("1|", buf)

    # Every fixture row, default separators.
    @@fullCSVData.each do |col, str|
      buf = ''
      cols = CSV.generate_row(col, col.size, buf)
      assert_equal(col.size, cols)
      assert_equal(str + "\n", buf)
    end

    # Every fixture row with ';' columns, compared after converting back to
    # commas.
    @@fullCSVData.each do |col, str|
      buf = ''
      cols = CSV.generate_row(col, col.size, buf, ?;)
      assert_equal(col.size, cols)
      assert_equal(str + "\n", ssv2csv(buf))
    end

    # Every fixture row with tab columns, compared after converting back to
    # commas.
    @@fullCSVData.each do |col, str|
      buf = ''
      cols = CSV.generate_row(col, col.size, buf, ?\t)
      assert_equal(col.size, cols)
      assert_equal(str + "\n", tsv2csv(buf))
    end

    # row separator
    @@fullCSVData.each do |col, str|
      buf = ''
      cols = CSV.generate_row(col, col.size, buf, ?,, ?|)
      assert_equal(col.size, cols)
      assert_equal(str + "|", buf)
    end

    # col and row separator
    @@fullCSVData.each do |col, str|
      buf = ''
      cols = CSV.generate_row(col, col.size, buf, ?\t, ?|)
      assert_equal(col.size, cols)
      assert_equal(str + "|", tsv2csv(buf, ?|))
    end

    # Several rows appended to one buffer: the returned counts must add up
    # and the buffer must hold all rows back to back.
    buf = ''
    toBe = ''
    cols = 0
    colsToBe = 0
    @@fullCSVData.each do |col, str|
      cols += CSV.generate_row(col, col.size, buf)
      toBe << str << "\n"
      colsToBe += col.size
    end
    assert_equal(colsToBe, cols)
    assert_equal(toBe, buf)

    # NOTE(review): in the three loops below, buf and toBe are built from
    # the very same expression, so assert_equal(toBe, buf) cannot fail; only
    # the column-count assertion carries information.  Confirm whether toBe
    # was meant to be derived from +str+ instead.
    buf = ''
    toBe = ''
    cols = 0
    colsToBe = 0
    @@fullCSVData.each do |col, str|
      lineBuf = ''
      cols += CSV.generate_row(col, col.size, lineBuf, ?;)
      buf << ssv2csv(lineBuf) << "\n"
      toBe << ssv2csv(lineBuf) << "\n"
      colsToBe += col.size
    end
    assert_equal(colsToBe, cols)
    assert_equal(toBe, buf)

    buf = ''
    toBe = ''
    cols = 0
    colsToBe = 0
    @@fullCSVData.each do |col, str|
      lineBuf = ''
      cols += CSV.generate_row(col, col.size, lineBuf, ?\t)
      buf << tsv2csv(lineBuf) << "\n"
      toBe << tsv2csv(lineBuf) << "\n"
      colsToBe += col.size
    end
    assert_equal(colsToBe, cols)
    assert_equal(toBe, buf)

    # NOTE(review): here ?| is passed in the column-separator position of
    # generate_row, while tsv2csv(lineBuf, ?|) passes ?| as the row
    # separator -- the two do not match; confirm the intended arguments.
    buf = ''
    toBe = ''
    cols = 0
    colsToBe = 0
    @@fullCSVData.each do |col, str|
      lineBuf = ''
      cols += CSV.generate_row(col, col.size, lineBuf, ?|)
      buf << tsv2csv(lineBuf, ?|)
      toBe << tsv2csv(lineBuf, ?|)
      colsToBe += col.size
    end
    assert_equal(colsToBe, cols)
    assert_equal(toBe, buf)
  end

  # CSV.parse_line: every row in @@simpleCSVData must round-trip from its
  # text form (with ',', ';' and tab as column separators), custom row
  # separators must cut the line, and malformed input must yield an empty
  # Array rather than raising.
  def test_s_parse_line
    @@simpleCSVData.each do |col, str|
      row = CSV.parse_line(str)
      assert_instance_of(Array, row)
      assert_equal(col.size, row.size)
      assert_equal(col, row)
    end

    @@simpleCSVData.each do |col, str|
      ssv = csv2ssv(str)
      row = CSV.parse_line(ssv, ?;)
      assert_instance_of(Array, row)
      assert_equal(col.size, row.size, ssv.inspect)
      assert_equal(col, row, ssv.inspect)
    end

    @@simpleCSVData.each do |col, str|
      tsv = csv2tsv(str)
      row = CSV.parse_line(tsv, ?\t)
      assert_instance_of(Array, row)
      assert_equal(col.size, row.size)
      assert_equal(col, row)
    end

    assert_equal(['a', 'b', 'c'], CSV.parse_line("a,b,c", nil, nil))
    assert_equal(['a', nil], CSV.parse_line("a,b,c", nil, ?b))
    assert_equal(['a', 'b', nil], CSV.parse_line("a,b,c", nil, "c"))
    assert_equal([nil], CSV.parse_line(""))
    assert_equal([nil], CSV.parse_line("\n"))
    assert_equal([""], CSV.parse_line("\"\"\n"))

    # Illegal format.  Each entry is the argument list for parse_line; the
    # same four malformed shapes are tried with ',', ';' and tab columns.
    illegal_calls = [
      ["a,b,\"c\"\ra"],
      ["a;b;\"c\"\ra", ?;],
      ["a\tb\t\"c\"\ra", ?\t],
      ["a,b\""],
      ["a;b\"", ?;],
      ["a\tb\"", ?\t],
      ["\"a,b\"\r,"],
      ["\"a;b\"\r;", ?;],
      ["\"a\tb\"\r\t", ?\t],
      ["\"a,b\"\r\""],
      ["\"a;b\"\r\"", ?;],
      ["\"a\tb\"\r\"", ?\t],
    ]
    illegal_calls.each do |call_args|
      row = CSV.parse_line(*call_args)
      # The arguments are attached as the failure message so a failing case
      # can be identified.
      assert_instance_of(Array, row, call_args.inspect)
      assert_equal(0, row.size, call_args.inspect)
    end
  end

  # CSV.parse_row(src, idx, out, fs, rs) appends the cells of one row to
  # +out+ and returns [column count, next index].  This test covers every
  # row in @@fullCSVData with assorted column / row separators, a few
  # literal inputs, and malformed inputs, for which [0, 0] is expected.
  def test_s_parse_row
    @@fullCSVData.each do |col, str|
      buf = Array.new
      cols, = CSV.parse_row(str + "\r\n", 0, buf)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, str.inspect)

      buf = Array.new
      cols, = CSV.parse_row(str + "\n", 0, buf, ?,, ?\n)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, str.inspect)

      # separator: |
      # Without telling the parser about it, the trailing "|" is cell data.
      buf = Array.new
      CSV.parse_row(str + "|", 0, buf, ?,)
      assert_not_equal(col, buf)
      buf = Array.new
      cols, = CSV.parse_row(str + "|", 0, buf, ?,, ?|)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, str.inspect)
    end

    @@fullCSVData.each do |col, str|
      ssv = csv2ssv(str)
      buf = Array.new
      cols, = CSV.parse_row(ssv + "\r\n", 0, buf, ?;)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, ssv)
    end

    @@fullCSVData.each do |col, str|
      tsv = csv2tsv(str)
      buf = Array.new
      cols, = CSV.parse_row(tsv + "\r\n", 0, buf, ?\t)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, tsv)
    end

    @@fullCSVData.each do |col, str|
      tsv = csv2tsv(str, ?|)
      buf = Array.new
      cols, = CSV.parse_row(tsv + "|", 0, buf, ?\t, ?|)
      assert_equal(cols, buf.size, "Reported size.")
      assert_equal(col.size, buf.size, "Size.")
      assert_equal(col, buf, tsv)
    end

    buf = []
    CSV.parse_row("a,b,c", 0, buf, nil, nil)
    assert_equal(['a', 'b', 'c'], buf)

    buf = []
    CSV.parse_row("a,b,c", 0, buf, nil, ?b)
    assert_equal(['a', nil], buf)

    buf = []
    CSV.parse_row("a,b,c", 0, buf, nil, "c")
    assert_equal(['a', 'b', nil], buf)

    buf = Array.new
    CSV.parse_row("a,b,\"c\r\"", 0, buf)
    assert_equal(["a", "b", "c\r"], buf.to_a)

    buf = Array.new
    CSV.parse_row("a;b;\"c\r\"", 0, buf, ?;)
    assert_equal(["a", "b", "c\r"], buf.to_a)

    buf = Array.new
    CSV.parse_row("a\tb\t\"c\r\"", 0, buf, ?\t)
    assert_equal(["a", "b", "c\r"], buf.to_a)

    buf = Array.new
    CSV.parse_row("a,b,c\n", 0, buf, ?,, ?\n)
    assert_equal(["a", "b", "c"], buf.to_a)

    buf = Array.new
    CSV.parse_row("a\tb\tc\n", 0, buf, ?\t, ?\n)
    assert_equal(["a", "b", "c"], buf.to_a)

    # Illegal format.
    buf = Array.new
    cols, = CSV.parse_row("a,b,c\"", 0, buf)
    assert_equal(0, cols, "Illegal format; unbalanced double-quote.")

    buf = Array.new
    cols, = CSV.parse_row("a;b;c\"", 0, buf, ?;)
    assert_equal(0, cols, "Illegal format; unbalanced double-quote.")

    # Each entry is [source, *optional column separator]; all of them must
    # be rejected with a column count of 0 and a next index of 0.
    rejected = [
      ["a,b,\"c\"\ra"],
      # This case used to pass the comma-separated text together with ?;,
      # so the ';' variant of the malformed row was never really tested.
      ["a;b;\"c\"\ra", ?;],
      ["a,b\""],
      ["a;b\"", ?;],
      ["\"a,b\"\r,"],
      ["a\r,"],
      ["a\r"],
      ["a\rbc"],
      ["a\r\"\""],
      ["a\r\rabc,"],
      ["\"a;b\"\r;", ?;],
      ["\"a,b\"\r\""],
      ["\"a;b\"\r\"", ?;],
    ]
    rejected.each do |src, *separators|
      buf = Array.new
      cols, idx = CSV.parse_row(src, 0, buf, *separators)
      assert_equal(0, cols, src.inspect)
      assert_equal(0, idx, src.inspect)
    end
  end

  # parse_row must also accept a row that ends at end-of-input, with no
  # trailing row separator.
  def test_s_parse_rowEOF
    @@fullCSVData.each { |cells, csv_text|
      # String "" is not allowed.
      next if csv_text == ''

      parsed = Array.new
      reported = CSV.parse_row(csv_text, 0, parsed).first
      assert_equal(cells.size, reported, "Reported size.")
      assert_equal(cells.size, parsed.size, "Size.")
      assert_equal(cells, parsed)
    }
  end

  def test_s_parse_rowConcat
    buf = ''
    toBe = []
    @@fullCSVData.each do |col, str|
      buf  << str << "\r\n"
      toBe.concat(col)
    end
    idx = 0
    cols = 0
    parsed = Array.new
    parsedCols = 0
    begin
      cols, idx = CSV.parse_row(buf, idx, parsed)
      parsedCols += cols
    end while cols > 0
    assert_equal(toBe.size, parsedCols)
    assert_equal(toBe.size, parsed.size)
    assert_equal(toBe, parsed)

    buf = ''
    toBe = []