summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ipalib/plugins/baseldap.py89
-rw-r--r--tests/test_xmlrpc/test_attr.py85
2 files changed, 131 insertions, 43 deletions
diff --git a/ipalib/plugins/baseldap.py b/ipalib/plugins/baseldap.py
index 6a37995c5..32dae5160 100644
--- a/ipalib/plugins/baseldap.py
+++ b/ipalib/plugins/baseldap.py
@@ -784,36 +784,45 @@ last, after all sets and adds."""),
:param attrs: A list of name/value pair strings, in the "name=value"
format. May also be a single string, or None.
"""
-
- newdict = {}
if attrs is None:
- attrs = []
- elif not type(attrs) in (list, tuple):
+ return {}
+
+ if not isinstance(attrs, (tuple, list)):
attrs = [attrs]
+
+ newdict = {}
for a in attrs:
- m = re.match("\s*(.*?)\s*=\s*(.*?)\s*$", a)
- attr = str(m.group(1)).lower()
- value = m.group(2)
+ m = re.match("^\s*(?P<attr>.*?)\s*=\s*(?P<value>.*?)\s*$", a)
+ attr = str(m.group('attr').lower())
+ value = m.group('value')
+
if attr in self.obj.params and attr not in self.params:
# The attribute is managed by IPA, but it didn't get cloned
# to the command. This happens with no_update/no_create attrs.
raise errors.ValidationError(
name=attr, error=_('attribute is not configurable'))
- if len(value) == 0:
- # None means "delete this attribute"
- value = None
- if attr in newdict:
- if type(value) in (tuple,):
- newdict[attr] += list(value)
- else:
- newdict[attr].append(value)
- else:
- if type(value) in (tuple,):
- newdict[attr] = list(value)
- else:
- newdict[attr] = [value]
+
+ newdict.setdefault(attr, []).append(value)
+
return newdict
+ def _convert_entry(self, entry_attrs):
+ result = {}
+ for attr, val in entry_attrs.iteritems():
+ if val is None:
+ val = []
+ elif not isinstance(val, (tuple, list)):
+ val = [val]
+
+ result[attr] = []
+ for v in val:
+ if isinstance(v, str):
+ # This is a Binary value, base64 encode it
+ v = base64.b64encode(v)
+ result[attr].append(unicode(v))
+
+ return result
+
def process_attr_options(self, entry_attrs, dn, keys, options):
"""
Process all --setattr, --addattr, and --delattr options and add the
@@ -860,19 +869,20 @@ last, after all sets and adds."""),
direct_del = setattrs & delattrs
needldapattrs = list((addattrs | delattrs) - setattrs)
+ mod_attrs = self._convert_entry(entry_attrs)
+
for attr, val in setdict.iteritems():
- entry_attrs[attr] = val
+ mod_attrs[attr] = val
for attr in direct_add:
- entry_attrs.setdefault(attr, []).extend(adddict[attr])
+ mod_attrs.setdefault(attr, []).extend(adddict[attr])
for attr in direct_del:
for delval in deldict[attr]:
try:
- entry_attrs[attr].remove(delval)
+ mod_attrs[attr].remove(delval)
except ValueError:
- raise errors.AttrValueNotFound(attr=attr,
- value=delval)
+ raise errors.AttrValueNotFound(attr=attr, value=delval)
if needldapattrs:
try:
@@ -891,28 +901,27 @@ last, after all sets and adds."""),
raise errors.ValidationError(name=del_nonexisting.pop(),
error=_('No such attribute on this entry'))
+ old_entry = self._convert_entry(old_entry)
+
for attr in needldapattrs:
- entry_attrs[attr] = old_entry.get(attr, [])
+ mod_attrs[attr] = old_entry.get(attr, [])
if attr in addattrs:
- entry_attrs[attr].extend(adddict.get(attr, []))
+ mod_attrs[attr].extend(adddict.get(attr, []))
for delval in deldict.get(attr, []):
try:
- entry_attrs[attr].remove(delval)
+ mod_attrs[attr].remove(delval)
except ValueError:
- if isinstance(delval, str):
- # This is a Binary value, base64 encode it
- delval = unicode(base64.b64encode(delval))
raise errors.AttrValueNotFound(attr=attr, value=delval)
# normalize all values
changedattrs = setattrs | addattrs | delattrs
for attr in changedattrs:
+ value = mod_attrs[attr]
if attr in self.params and self.params[attr].attribute:
- # convert single-value params to scalars
param = self.params[attr]
- value = entry_attrs[attr]
+ # convert single-value params to scalars
if not param.multivalue:
if len(value) == 1:
value = value[0]
@@ -922,19 +931,19 @@ last, after all sets and adds."""),
raise errors.OnlyOneValueAllowed(attr=attr)
# validate, convert and encode params
try:
- value = param(value)
+ value = param(value)
except errors.ValidationError, err:
raise errors.ValidationError(name=attr, error=err.error)
except errors.ConversionError, err:
raise errors.ConversionError(name=attr, error=err.error)
- entry_attrs[attr] = value
else:
# unknown attribute: remove duplicite and invalid values
- entry_attrs[attr] = list(set([val for val in entry_attrs[attr] if val]))
- if not entry_attrs[attr]:
- entry_attrs[attr] = None
- elif isinstance(entry_attrs[attr], (tuple, list)) and len(entry_attrs[attr]) == 1:
- entry_attrs[attr] = entry_attrs[attr][0]
+ value = list(set([val for val in value if val]))
+ if not value:
+ value = None
+ elif isinstance(value, (tuple, list)) and len(value) == 1:
+ value = value[0]
+ entry_attrs[attr] = value
@classmethod
def register_pre_callback(cls, callback, first=False):
diff --git a/tests/test_xmlrpc/test_attr.py b/tests/test_xmlrpc/test_attr.py
index 8b78c97b4..f5003c403 100644
--- a/tests/test_xmlrpc/test_attr.py
+++ b/tests/test_xmlrpc/test_attr.py
@@ -21,17 +21,29 @@
Test --setattr and --addattr and other attribute-specific issues
"""
-from ipalib import api, errors
+from ipalib import api, errors, x509
from tests.test_xmlrpc import objectclasses
-from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid
+from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_date,
+ fuzzy_hex, fuzzy_hash, fuzzy_issuer)
from ipalib.dn import *
+import base64
-user1=u'tuser1'
+user1 = u'tuser1'
+fqdn1 = u'testhost1.%s' % api.env.domain
+
+# We can use the same cert we generated for the service tests
+fd = open('tests/test_xmlrpc/service.crt', 'r')
+servercert = fd.readlines()
+servercert = u''.join(servercert)
+servercert = x509.strip_header(servercert)
+servercert = servercert.replace('\n', '')
+fd.close()
class test_attr(Declarative):
cleanup_commands = [
('user_del', [user1], {}),
+ ('host_del', [fqdn1], {}),
]
tests = [
@@ -551,4 +563,71 @@ class test_attr(Declarative):
desc='Server is unwilling to perform', info=''),
),
+ dict(
+ desc='Try to create %r with description and --addattr description' % fqdn1,
+ command=('host_add', [fqdn1],
+ dict(
+ description=u'Test host 1',
+ addattr=u'description=Test host 2',
+ force=True,
+ ),
+ ),
+ expected=errors.OnlyOneValueAllowed(attr='description'),
+ ),
+
+ dict(
+ desc='Create %r with a certificate' % fqdn1,
+ command=('host_add', [fqdn1],
+ dict(
+ usercertificate=servercert,
+ force=True,
+ ),
+ ),
+ expected=dict(
+ value=fqdn1,
+ summary=u'Added host "%s"' % fqdn1,
+ result=dict(
+ dn=lambda x: DN(x) == DN(('fqdn',fqdn1),('cn','computers'),
+ ('cn','accounts'),api.env.basedn),
+ fqdn=[fqdn1],
+ krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
+ objectclass=objectclasses.host,
+ ipauniqueid=[fuzzy_uuid],
+ managedby_host=[fqdn1],
+ usercertificate=[base64.b64decode(servercert)],
+ valid_not_before=fuzzy_date,
+ valid_not_after=fuzzy_date,
+ subject=lambda x: DN(x) == \
+ DN(('CN',api.env.host),x509.subject_base()),
+ serial_number=fuzzy_digits,
+ serial_number_hex=fuzzy_hex,
+ md5_fingerprint=fuzzy_hash,
+ sha1_fingerprint=fuzzy_hash,
+ issuer=fuzzy_issuer,
+ has_keytab=False,
+ has_password=False,
+ ),
+ ),
+ ),
+
+ dict(
+ desc='Remove %r certificate using --delattr' % fqdn1,
+ command=('host_mod', [fqdn1],
+ dict(
+ delattr=u'usercertificate=%s' % servercert,
+ ),
+ ),
+ expected=dict(
+ value=fqdn1,
+ summary=u'Modified host "%s"' % fqdn1,
+ result=dict(
+ fqdn=[fqdn1],
+ krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
+ managedby_host=[fqdn1],
+ has_keytab=False,
+ has_password=False,
+ ),
+ ),
+ ),
+
]