summaryrefslogtreecommitdiffstats
path: root/ipatests/test_ipaserver
diff options
context:
space:
mode:
authorPetr Viktorin <pviktori@redhat.com>2013-05-21 13:40:27 +0200
committerMartin Kosek <mkosek@redhat.com>2013-06-17 19:22:50 +0200
commitc60142efda817f030a7495cd6fe4a19953e55afa (patch)
tree31a840ceddd4381311bbc879f9851bb71a8e2ffa /ipatests/test_ipaserver
parent6d66e826c1c248dffc80056b20c1e4b74b04d46f (diff)
downloadfreeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.gz
freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.tar.xz
freeipa-c60142efda817f030a7495cd6fe4a19953e55afa.zip
Make an ipa-tests package
Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM containing the test suite Part of the work for: https://fedorahosted.org/freeipa/ticket/3654
Diffstat (limited to 'ipatests/test_ipaserver')
-rw-r--r--ipatests/test_ipaserver/__init__.py22
-rw-r--r--ipatests/test_ipaserver/httptest.py52
-rwxr-xr-xipatests/test_ipaserver/install/test_adtrustinstance.py59
-rw-r--r--ipatests/test_ipaserver/test_changepw.py107
-rw-r--r--ipatests/test_ipaserver/test_ldap.py259
-rw-r--r--ipatests/test_ipaserver/test_rpcserver.py247
6 files changed, 746 insertions, 0 deletions
diff --git a/ipatests/test_ipaserver/__init__.py b/ipatests/test_ipaserver/__init__.py
new file mode 100644
index 000000000..2192cc291
--- /dev/null
+++ b/ipatests/test_ipaserver/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Sub-package containing unit tests for `ipaserver` package.
+"""
diff --git a/ipatests/test_ipaserver/httptest.py b/ipatests/test_ipaserver/httptest.py
new file mode 100644
index 000000000..7f1b5b136
--- /dev/null
+++ b/ipatests/test_ipaserver/httptest.py
@@ -0,0 +1,52 @@
+# Authors:
+# Martin Kosek <mkosek@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Base class for HTTP request tests
+"""
+
+import urllib
+import httplib
+
+from ipalib import api
+
+class Unauthorized_HTTP_test(object):
+ """
+ Base class for simple HTTP request tests executed against URI
+ with no required authorization
+ """
+ app_uri = ''
+ host = api.env.host
+ content_type = 'application/x-www-form-urlencoded'
+
+ def send_request(self, method='POST', params=None):
+ """
+ Send a request to HTTP server
+
+ :param key When not None, overrides default app_uri
+ """
+ if params is not None:
+ params = urllib.urlencode(params, True)
+ url = 'https://' + self.host + self.app_uri
+
+ headers = {'Content-Type' : self.content_type,
+ 'Referer' : url}
+
+ conn = httplib.HTTPSConnection(self.host)
+ conn.request(method, self.app_uri, params, headers)
+ return conn.getresponse()
diff --git a/ipatests/test_ipaserver/install/test_adtrustinstance.py b/ipatests/test_ipaserver/install/test_adtrustinstance.py
new file mode 100755
index 000000000..9a62f87ce
--- /dev/null
+++ b/ipatests/test_ipaserver/install/test_adtrustinstance.py
@@ -0,0 +1,59 @@
+# Authors:
+# Sumit Bose <sbose@redhat.com>
+#
+# Copyright (C) 2011 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Test `adtrustinstance`
+"""
+
+import os
+import nose
+
+from ipaserver.install import adtrustinstance
+
+class test_adtrustinstance:
+ """
+ Test `adtrustinstance`.
+ """
+
+ def test_make_netbios_name(self):
+ s = adtrustinstance.make_netbios_name("ABCDEF")
+ assert s == 'ABCDEF' and isinstance(s, str)
+ s = adtrustinstance.make_netbios_name(U"ABCDEF")
+ assert s == 'ABCDEF' and isinstance(s, unicode)
+ s = adtrustinstance.make_netbios_name("abcdef")
+ assert s == 'ABCDEF'
+ s = adtrustinstance.make_netbios_name("abc.def")
+ assert s == 'ABC'
+ s = adtrustinstance.make_netbios_name("abcdefghijklmnopqr.def")
+ assert s == 'ABCDEFGHIJKLMNO'
+ s = adtrustinstance.make_netbios_name("A!$%B&/()C=?+*D")
+ assert s == 'ABCD'
+ s = adtrustinstance.make_netbios_name("!$%&/()=?+*")
+ assert not s
+
+ def test_check_netbios_name(self):
+ assert adtrustinstance.check_netbios_name("ABCDEF")
+ assert not adtrustinstance.check_netbios_name("abcdef")
+ assert adtrustinstance.check_netbios_name("ABCDE12345ABCDE")
+ assert not adtrustinstance.check_netbios_name("ABCDE12345ABCDE1")
+ assert not adtrustinstance.check_netbios_name("")
+
+ assert adtrustinstance.check_netbios_name(U"ABCDEF")
+ assert not adtrustinstance.check_netbios_name(U"abcdef")
+ assert adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE")
+ assert not adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE1")
diff --git a/ipatests/test_ipaserver/test_changepw.py b/ipatests/test_ipaserver/test_changepw.py
new file mode 100644
index 000000000..040c9cd36
--- /dev/null
+++ b/ipatests/test_ipaserver/test_changepw.py
@@ -0,0 +1,107 @@
+# Authors:
+# Martin Kosek <mkosek@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import nose
+
+from httptest import Unauthorized_HTTP_test
+from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
+from ipatests.util import assert_equal, assert_not_equal
+from ipalib import api, errors
+from ipapython.dn import DN
+import ldap
+
+testuser = u'tuser'
+old_password = u'old_password'
+new_password = u'new_password'
+
+class test_changepw(XMLRPC_test, Unauthorized_HTTP_test):
+ app_uri = '/ipa/session/change_password'
+
+ def setUp(self):
+ super(test_changepw, self).setUp()
+ try:
+ api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User')
+ api.Command['passwd'](testuser, password=u'old_password')
+ except errors.ExecutionError, e:
+ raise nose.SkipTest(
+ 'Cannot set up test user: %s' % e
+ )
+
+ def tearDown(self):
+ try:
+ api.Command['user_del']([testuser])
+ except errors.NotFound:
+ pass
+ super(test_changepw, self).tearDown()
+
+ def _changepw(self, user, old_password, new_password):
+ return self.send_request(params={'user': str(user),
+ 'old_password' : str(old_password),
+ 'new_password' : str(new_password)},
+ )
+
+ def _checkpw(self, user, password):
+ dn = str(DN(('uid', user), api.env.container_user, api.env.basedn))
+ conn = ldap.initialize(api.env.ldap_uri)
+ try:
+ conn.simple_bind_s(dn, password)
+ finally:
+ conn.unbind_s()
+
+ def test_bad_options(self):
+ for params in (None, # no params
+ {'user': 'foo'}, # missing options
+ {'user': 'foo',
+ 'old_password' : 'old'}, # missing option
+ {'user': 'foo',
+ 'old_password' : 'old',
+ 'new_password' : ''}, # empty option
+ ):
+ response = self.send_request(params=params)
+ assert_equal(response.status, 400)
+ assert_equal(response.reason, 'Bad Request')
+
+ def test_invalid_auth(self):
+ response = self._changepw(testuser, 'wrongpassword', 'new_password')
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password')
+
+ # make sure that password is NOT changed
+ self._checkpw(testuser, old_password)
+
+ def test_pwpolicy_error(self):
+ response = self._changepw(testuser, old_password, '1')
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error')
+ assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'),
+ 'Constraint violation: Password is too short')
+
+ # make sure that password is NOT changed
+ self._checkpw(testuser, old_password)
+
+ def test_pwpolicy_success(self):
+ response = self._changepw(testuser, old_password, new_password)
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok')
+
+ # make sure that password IS changed
+ self._checkpw(testuser, new_password)
diff --git a/ipatests/test_ipaserver/test_ldap.py b/ipatests/test_ipaserver/test_ldap.py
new file mode 100644
index 000000000..21363f2ef
--- /dev/null
+++ b/ipatests/test_ipaserver/test_ldap.py
@@ -0,0 +1,259 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2010 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Test some simple LDAP requests using the ldap2 backend
+
+# This fetches a certificate from a host principal so we can ensure that the
+# schema is working properly. We know this because the schema will tell the
+# encoder not to utf-8 encode binary attributes.
+
+# The DM password needs to be set in ~/.ipa/.dmpw
+
+import os
+
+import nose
+from nose.tools import assert_raises # pylint: disable=E0611
+import nss.nss as nss
+
+from ipaserver.plugins.ldap2 import ldap2
+from ipalib.plugins.service import service, service_show
+from ipalib.plugins.host import host
+from ipalib import api, x509, create_api, errors
+from ipapython import ipautil
+from ipapython.dn import DN
+
+class test_ldap(object):
+ """
+ Test various LDAP client bind methods.
+ """
+
+ def setUp(self):
+ self.conn = None
+ self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
+ self.ccache = '/tmp/krb5cc_%d' % os.getuid()
+ nss.nss_init_nodb()
+ self.dn = DN(('krbprincipalname','ldap/%s@%s' % (api.env.host, api.env.realm)),
+ ('cn','services'),('cn','accounts'),api.env.basedn)
+
+ def tearDown(self):
+ if self.conn and self.conn.isconnected():
+ self.conn.disconnect()
+
+ def test_anonymous(self):
+ """
+ Test an anonymous LDAP bind using ldap2
+ """
+ self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
+ self.conn.connect()
+ (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
+ cert = entry_attrs.get('usercertificate')
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+ assert serial is not None
+
+ def test_GSSAPI(self):
+ """
+ Test a GSSAPI LDAP bind using ldap2
+ """
+ if not ipautil.file_exists(self.ccache):
+ raise nose.SkipTest('Missing ccache %s' % self.ccache)
+ self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
+ self.conn.connect(ccache='FILE:%s' % self.ccache)
+ (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
+ cert = entry_attrs.get('usercertificate')
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+ assert serial is not None
+
+ def test_simple(self):
+ """
+ Test a simple LDAP bind using ldap2
+ """
+ pwfile = api.env.dot_ipa + os.sep + ".dmpw"
+ if ipautil.file_exists(pwfile):
+ fp = open(pwfile, "r")
+ dm_password = fp.read().rstrip()
+ fp.close()
+ else:
+ raise nose.SkipTest("No directory manager password in %s" % pwfile)
+ self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
+ self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password)
+ (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
+ cert = entry_attrs.get('usercertificate')
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+ assert serial is not None
+
+ def test_Backend(self):
+ """
+ Test using the ldap2 Backend directly (ala ipa-server-install)
+ """
+
+ # Create our own api because the one generated for the tests is
+ # a client-only api. Then we register in the commands and objects
+ # we need for the test.
+ myapi = create_api(mode=None)
+ myapi.bootstrap(context='cli', in_server=True, in_tree=True)
+ myapi.register(ldap2)
+ myapi.register(host)
+ myapi.register(service)
+ myapi.register(service_show)
+ myapi.finalize()
+
+ pwfile = api.env.dot_ipa + os.sep + ".dmpw"
+ if ipautil.file_exists(pwfile):
+ fp = open(pwfile, "r")
+ dm_password = fp.read().rstrip()
+ fp.close()
+ else:
+ raise nose.SkipTest("No directory manager password in %s" % pwfile)
+ myapi.Backend.ldap2.connect(bind_dn=DN(('cn', 'Directory Manager')), bind_pw=dm_password)
+
+ result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,))
+ entry_attrs = result['result']
+ cert = entry_attrs.get('usercertificate')
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+ assert serial is not None
+
+ def test_autobind(self):
+ """
+ Test an autobind LDAP bind using ldap2
+ """
+ ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-')
+ self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri)
+ try:
+ self.conn.connect(autobind=True)
+ except errors.ACIError:
+ raise nose.SkipTest("Only executed as root")
+ (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
+ cert = entry_attrs.get('usercertificate')
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
+ assert serial is not None
+
+
+class test_LDAPEntry(object):
+ """
+ Test the LDAPEntry class
+ """
+ cn1 = [u'test1']
+ cn2 = [u'test2']
+ dn1 = DN(('cn', cn1[0]))
+ dn2 = DN(('cn', cn2[0]))
+
+ def setUp(self):
+ self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
+ self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
+ self.conn.connect()
+
+ self.entry = self.conn.make_entry(self.dn1, cn=self.cn1)
+
+ def tearDown(self):
+ if self.conn and self.conn.isconnected():
+ self.conn.disconnect()
+
+ def test_entry(self):
+ e = self.entry
+ assert e.dn is self.dn1
+ assert u'cn' in e
+ assert u'cn' in e.keys()
+ assert 'CN' in e
+ assert 'CN' not in e.keys()
+ assert 'commonName' in e
+ assert 'commonName' not in e.keys()
+ assert e['CN'] is self.cn1
+ assert e['CN'] is e[u'cn']
+
+ e.dn = self.dn2
+ assert e.dn is self.dn2
+
+ def test_set_attr(self):
+ e = self.entry
+ e['commonName'] = self.cn2
+ assert u'cn' in e
+ assert u'cn' not in e.keys()
+ assert 'CN' in e
+ assert 'CN' not in e.keys()
+ assert 'commonName' in e
+ assert 'commonName' in e.keys()
+ assert e['CN'] is self.cn2
+ assert e['CN'] is e[u'cn']
+
+ def test_del_attr(self):
+ e = self.entry
+ del e['CN']
+ assert 'CN' not in e
+ assert 'CN' not in e.keys()
+ assert u'cn' not in e
+ assert u'cn' not in e.keys()
+ assert 'commonName' not in e
+ assert 'commonName' not in e.keys()
+
+ def test_popitem(self):
+ e = self.entry
+ assert e.popitem() == ('cn', self.cn1)
+ e.keys() == []
+
+ def test_setdefault(self):
+ e = self.entry
+ assert e.setdefault('cn', self.cn2) == self.cn1
+ assert e['cn'] == self.cn1
+ assert e.setdefault('xyz', self.cn2) == self.cn2
+ assert e['xyz'] == self.cn2
+
+ def test_update(self):
+ e = self.entry
+ e.update({'cn': self.cn2}, xyz=self.cn2)
+ assert e['cn'] == self.cn2
+ assert e['xyz'] == self.cn2
+
+ def test_pop(self):
+ e = self.entry
+ assert e.pop('cn') == self.cn1
+ assert 'cn' not in e
+ assert e.pop('cn', 'default') is 'default'
+ with assert_raises(KeyError):
+ e.pop('cn')
+
+ def test_clear(self):
+ e = self.entry
+ e.clear()
+ assert not e
+ assert 'cn' not in e
+
+ def test_has_key(self):
+ e = self.entry
+ assert not e.has_key('xyz')
+ assert e.has_key('cn')
+ assert e.has_key('COMMONNAME')
+
+ def test_get(self):
+ e = self.entry
+ assert e.get('cn') == self.cn1
+ assert e.get('commonname') == self.cn1
+ assert e.get('COMMONNAME', 'default') == self.cn1
+ assert e.get('bad key', 'default') == 'default'
+
+ def test_single_value(self):
+ e = self.entry
+ assert e.single_value('cn') == self.cn1[0]
+ assert e.single_value('commonname') == self.cn1[0]
+ assert e.single_value('COMMONNAME', 'default') == self.cn1[0]
+ assert e.single_value('bad key', 'default') == 'default'
diff --git a/ipatests/test_ipaserver/test_rpcserver.py b/ipatests/test_ipaserver/test_rpcserver.py
new file mode 100644
index 000000000..bd5673844
--- /dev/null
+++ b/ipatests/test_ipaserver/test_rpcserver.py
@@ -0,0 +1,247 @@
+# Authors:
+# Jason Gerard DeRose <jderose@redhat.com>
+#
+# Copyright (C) 2008 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Test the `ipaserver.rpc` module.
+"""
+
+import json
+
+from ipatests.util import create_test_api, assert_equal, raises, PluginTester
+from ipatests.data import unicode_str
+from ipalib import errors, Command
+from ipaserver import rpcserver
+
+
+class StartResponse(object):
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.status = None
+ self.headers = None
+
+ def __call__(self, status, headers):
+ assert self.status is None
+ assert self.headers is None
+ assert isinstance(status, str)
+ assert isinstance(headers, list)
+ self.status = status
+ self.headers = headers
+
+
+def test_not_found():
+ f = rpcserver.HTTP_Status()
+ t = rpcserver._not_found_template
+ s = StartResponse()
+
+ # Test with an innocent URL:
+ url = '/ipa/foo/stuff'
+ assert_equal(
+ f.not_found(None, s, url, None),
+ [t % dict(url='/ipa/foo/stuff')]
+ )
+ assert s.status == '404 Not Found'
+ assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
+
+ # Test when URL contains any of '<>&'
+ s.reset()
+ url ='&nbsp;' + '<script>do_bad_stuff();</script>'
+ assert_equal(
+ f.not_found(None, s, url, None),
+ [t % dict(url='&amp;nbsp;&lt;script&gt;do_bad_stuff();&lt;/script&gt;')]
+ )
+ assert s.status == '404 Not Found'
+ assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
+
+
+def test_bad_request():
+ f = rpcserver.HTTP_Status()
+ t = rpcserver._bad_request_template
+ s = StartResponse()
+
+ assert_equal(
+ f.bad_request(None, s, 'illegal request'),
+ [t % dict(message='illegal request')]
+ )
+ assert s.status == '400 Bad Request'
+ assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
+
+
+def test_internal_error():
+ f = rpcserver.HTTP_Status()
+ t = rpcserver._internal_error_template
+ s = StartResponse()
+
+ assert_equal(
+ f.internal_error(None, s, 'request failed'),
+ [t % dict(message='request failed')]
+ )
+ assert s.status == '500 Internal Server Error'
+ assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
+
+
+def test_unauthorized_error():
+ f = rpcserver.HTTP_Status()
+ t = rpcserver._unauthorized_template
+ s = StartResponse()
+
+ assert_equal(
+ f.unauthorized(None, s, 'unauthorized', 'password-expired'),
+ [t % dict(message='unauthorized')]
+ )
+ assert s.status == '401 Unauthorized'
+ assert s.headers == [('Content-Type', 'text/html; charset=utf-8'),
+ ('X-IPA-Rejection-Reason', 'password-expired')]
+
+
+def test_params_2_args_options():
+ """
+ Test the `ipaserver.rpcserver.params_2_args_options` function.
+ """
+ f = rpcserver.params_2_args_options
+ args = ('Hello', u'world!')
+ options = dict(one=1, two=u'Two', three='Three')
+ assert f(tuple()) == (tuple(), dict())
+ assert f([args]) == (args, dict())
+ assert f([args, options]) == (args, options)
+
+
+class test_session(object):
+ klass = rpcserver.wsgi_dispatch
+
+ def test_route(self):
+ def app1(environ, start_response):
+ return (
+ 'from 1',
+ [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')]
+ )
+
+ def app2(environ, start_response):
+ return (
+ 'from 2',
+ [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')]
+ )
+
+ inst = self.klass()
+ inst.mount(app1, '/foo/stuff')
+ inst.mount(app2, '/bar')
+
+ d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/foo/stuff')
+ assert inst.route(d, None) == ('from 1', ['/ipa', '/foo/stuff'])
+
+ d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/bar')
+ assert inst.route(d, None) == ('from 2', ['/ipa', '/bar'])
+
+ def test_mount(self):
+ def app1(environ, start_response):
+ pass
+
+ def app2(environ, start_response):
+ pass
+
+ # Test that mount works:
+ inst = self.klass()
+ inst.mount(app1, 'foo')
+ assert inst['foo'] is app1
+ assert list(inst) == ['foo']
+
+ # Test that StandardError is raise if trying override a mount:
+ e = raises(StandardError, inst.mount, app2, 'foo')
+ assert str(e) == '%s.mount(): cannot replace %r with %r at %r' % (
+ 'wsgi_dispatch', app1, app2, 'foo'
+ )
+
+ # Test mounting a second app:
+ inst.mount(app2, 'bar')
+ assert inst['bar'] is app2
+ assert list(inst) == ['bar', 'foo']
+
+
+class test_xmlserver(PluginTester):
+ """
+ Test the `ipaserver.rpcserver.xmlserver` plugin.
+ """
+
+ _plugin = rpcserver.xmlserver
+
+ def test_marshaled_dispatch(self): # FIXME
+ (o, api, home) = self.instance('Backend', in_server=True)
+
+
+class test_jsonserver(PluginTester):
+ """
+ Test the `ipaserver.rpcserver.jsonserver` plugin.
+ """
+
+ _plugin = rpcserver.jsonserver
+
+ def test_unmarshal(self):
+ """
+ Test the `ipaserver.rpcserver.jsonserver.unmarshal` method.
+ """
+ (o, api, home) = self.instance('Backend', in_server=True)
+
+ # Test with invalid JSON-data:
+ e = raises(errors.JSONError, o.unmarshal, 'this wont work')
+ assert isinstance(e.error, ValueError)
+ assert unicode(e.error) == 'No JSON object could be decoded'
+
+ # Test with non-dict type:
+ e = raises(errors.JSONError, o.unmarshal, json.dumps([1, 2, 3]))
+ assert unicode(e.error) == 'Request must be a dict'
+
+ params = [[1, 2], dict(three=3, four=4)]
+ # Test with missing method:
+ d = dict(params=params, id=18)
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'Request is missing "method"'
+
+ # Test with missing params:
+ d = dict(method='echo', id=18)
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'Request is missing "params"'
+
+ # Test with non-list params:
+ for p in ('hello', dict(args=tuple(), options=dict())):
+ d = dict(method='echo', id=18, params=p)
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'params must be a list'
+
+ # Test with other than 2 params:
+ for p in ([], [tuple()], [None, dict(), tuple()]):
+ d = dict(method='echo', id=18, params=p)
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'params must contain [args, options]'
+
+ # Test when args is not a list:
+ d = dict(method='echo', id=18, params=['args', dict()])
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'params[0] (aka args) must be a list'
+
+ # Test when options is not a dict:
+ d = dict(method='echo', id=18, params=[('hello', 'world'), 'options'])
+ e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
+ assert unicode(e.error) == 'params[1] (aka options) must be a dict'
+
+ # Test with valid values:
+ args = [u'jdoe']
+ options = dict(givenname=u'John', sn='Doe')
+ d = dict(method=u'user_add', params=[args, options], id=18)
+ assert o.unmarshal(json.dumps(d)) == (u'user_add', args, options, 18)