diff options
Diffstat (limited to 'ipatests/test_ipaserver')
-rw-r--r-- | ipatests/test_ipaserver/__init__.py | 22 | ||||
-rw-r--r-- | ipatests/test_ipaserver/httptest.py | 52 | ||||
-rwxr-xr-x | ipatests/test_ipaserver/install/test_adtrustinstance.py | 59 | ||||
-rw-r--r-- | ipatests/test_ipaserver/test_changepw.py | 107 | ||||
-rw-r--r-- | ipatests/test_ipaserver/test_ldap.py | 259 | ||||
-rw-r--r-- | ipatests/test_ipaserver/test_rpcserver.py | 247 |
6 files changed, 746 insertions, 0 deletions
diff --git a/ipatests/test_ipaserver/__init__.py b/ipatests/test_ipaserver/__init__.py new file mode 100644 index 000000000..2192cc291 --- /dev/null +++ b/ipatests/test_ipaserver/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Sub-package containing unit tests for `ipaserver` package. +""" diff --git a/ipatests/test_ipaserver/httptest.py b/ipatests/test_ipaserver/httptest.py new file mode 100644 index 000000000..7f1b5b136 --- /dev/null +++ b/ipatests/test_ipaserver/httptest.py @@ -0,0 +1,52 @@ +# Authors: +# Martin Kosek <mkosek@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Base class for HTTP request tests +""" + +import urllib +import httplib + +from ipalib import api + +class Unauthorized_HTTP_test(object): + """ + Base class for simple HTTP request tests executed against URI + with no required authorization + """ + app_uri = '' + host = api.env.host + content_type = 'application/x-www-form-urlencoded' + + def send_request(self, method='POST', params=None): + """ + Send a request to HTTP server + + :param key When not None, overrides default app_uri + """ + if params is not None: + params = urllib.urlencode(params, True) + url = 'https://' + self.host + self.app_uri + + headers = {'Content-Type' : self.content_type, + 'Referer' : url} + + conn = httplib.HTTPSConnection(self.host) + conn.request(method, self.app_uri, params, headers) + return conn.getresponse() diff --git a/ipatests/test_ipaserver/install/test_adtrustinstance.py b/ipatests/test_ipaserver/install/test_adtrustinstance.py new file mode 100755 index 000000000..9a62f87ce --- /dev/null +++ b/ipatests/test_ipaserver/install/test_adtrustinstance.py @@ -0,0 +1,59 @@ +# Authors: +# Sumit Bose <sbose@redhat.com> +# +# Copyright (C) 2011 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test `adtrustinstance` +""" + +import os +import nose + +from ipaserver.install import adtrustinstance + +class test_adtrustinstance: + """ + Test `adtrustinstance`. + """ + + def test_make_netbios_name(self): + s = adtrustinstance.make_netbios_name("ABCDEF") + assert s == 'ABCDEF' and isinstance(s, str) + s = adtrustinstance.make_netbios_name(U"ABCDEF") + assert s == 'ABCDEF' and isinstance(s, unicode) + s = adtrustinstance.make_netbios_name("abcdef") + assert s == 'ABCDEF' + s = adtrustinstance.make_netbios_name("abc.def") + assert s == 'ABC' + s = adtrustinstance.make_netbios_name("abcdefghijklmnopqr.def") + assert s == 'ABCDEFGHIJKLMNO' + s = adtrustinstance.make_netbios_name("A!$%B&/()C=?+*D") + assert s == 'ABCD' + s = adtrustinstance.make_netbios_name("!$%&/()=?+*") + assert not s + + def test_check_netbios_name(self): + assert adtrustinstance.check_netbios_name("ABCDEF") + assert not adtrustinstance.check_netbios_name("abcdef") + assert adtrustinstance.check_netbios_name("ABCDE12345ABCDE") + assert not adtrustinstance.check_netbios_name("ABCDE12345ABCDE1") + assert not adtrustinstance.check_netbios_name("") + + assert adtrustinstance.check_netbios_name(U"ABCDEF") + assert not adtrustinstance.check_netbios_name(U"abcdef") + assert adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE") + assert not adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE1") diff --git a/ipatests/test_ipaserver/test_changepw.py b/ipatests/test_ipaserver/test_changepw.py new file mode 100644 index 000000000..040c9cd36 --- /dev/null +++ b/ipatests/test_ipaserver/test_changepw.py @@ -0,0 +1,107 @@ +# Authors: +# Martin Kosek <mkosek@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import nose + +from httptest import Unauthorized_HTTP_test +from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test +from ipatests.util import assert_equal, assert_not_equal +from ipalib import api, errors +from ipapython.dn import DN +import ldap + +testuser = u'tuser' +old_password = u'old_password' +new_password = u'new_password' + +class test_changepw(XMLRPC_test, Unauthorized_HTTP_test): + app_uri = '/ipa/session/change_password' + + def setUp(self): + super(test_changepw, self).setUp() + try: + api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User') + api.Command['passwd'](testuser, password=u'old_password') + except errors.ExecutionError, e: + raise nose.SkipTest( + 'Cannot set up test user: %s' % e + ) + + def tearDown(self): + try: + api.Command['user_del']([testuser]) + except errors.NotFound: + pass + super(test_changepw, self).tearDown() + + def _changepw(self, user, old_password, new_password): + return self.send_request(params={'user': str(user), + 'old_password' : str(old_password), + 'new_password' : str(new_password)}, + ) + + def _checkpw(self, user, password): + dn = str(DN(('uid', user), api.env.container_user, api.env.basedn)) + conn = ldap.initialize(api.env.ldap_uri) + try: + conn.simple_bind_s(dn, password) + finally: + conn.unbind_s() + + def test_bad_options(self): + for params in (None, # no params + {'user': 'foo'}, # missing options + {'user': 'foo', + 'old_password' : 'old'}, # missing option + {'user': 'foo', + 'old_password' : 'old', + 'new_password' : ''}, # empty option + ): + response = self.send_request(params=params) + assert_equal(response.status, 400) + assert_equal(response.reason, 'Bad Request') + + def test_invalid_auth(self): + response = self._changepw(testuser, 'wrongpassword', 'new_password') + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password') + + # make sure that password is NOT changed + self._checkpw(testuser, old_password) + + def test_pwpolicy_error(self): + response = self._changepw(testuser, old_password, '1') + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error') + assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'), + 'Constraint violation: Password is too short') + + # make sure that password is NOT changed + self._checkpw(testuser, old_password) + + def test_pwpolicy_success(self): + response = self._changepw(testuser, old_password, new_password) + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok') + + # make sure that password IS changed + self._checkpw(testuser, new_password) diff --git a/ipatests/test_ipaserver/test_ldap.py b/ipatests/test_ipaserver/test_ldap.py new file mode 100644 index 000000000..21363f2ef --- /dev/null +++ b/ipatests/test_ipaserver/test_ldap.py @@ -0,0 +1,259 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2010 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# Test some simple LDAP requests using the ldap2 backend + +# This fetches a certificate from a host principal so we can ensure that the +# schema is working properly. We know this because the schema will tell the +# encoder not to utf-8 encode binary attributes. + +# The DM password needs to be set in ~/.ipa/.dmpw + +import os + +import nose +from nose.tools import assert_raises # pylint: disable=E0611 +import nss.nss as nss + +from ipaserver.plugins.ldap2 import ldap2 +from ipalib.plugins.service import service, service_show +from ipalib.plugins.host import host +from ipalib import api, x509, create_api, errors +from ipapython import ipautil +from ipapython.dn import DN + +class test_ldap(object): + """ + Test various LDAP client bind methods. + """ + + def setUp(self): + self.conn = None + self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host) + self.ccache = '/tmp/krb5cc_%d' % os.getuid() + nss.nss_init_nodb() + self.dn = DN(('krbprincipalname','ldap/%s@%s' % (api.env.host, api.env.realm)), + ('cn','services'),('cn','accounts'),api.env.basedn) + + def tearDown(self): + if self.conn and self.conn.isconnected(): + self.conn.disconnect() + + def test_anonymous(self): + """ + Test an anonymous LDAP bind using ldap2 + """ + self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) + self.conn.connect() + (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) + cert = entry_attrs.get('usercertificate') + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) + assert serial is not None + + def test_GSSAPI(self): + """ + Test a GSSAPI LDAP bind using ldap2 + """ + if not ipautil.file_exists(self.ccache): + raise nose.SkipTest('Missing ccache %s' % self.ccache) + self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) + self.conn.connect(ccache='FILE:%s' % self.ccache) + (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) + cert = entry_attrs.get('usercertificate') + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) + assert serial is not None + + def test_simple(self): + """ + Test a simple LDAP bind using ldap2 + """ + pwfile = api.env.dot_ipa + os.sep + ".dmpw" + if ipautil.file_exists(pwfile): + fp = open(pwfile, "r") + dm_password = fp.read().rstrip() + fp.close() + else: + raise nose.SkipTest("No directory manager password in %s" % pwfile) + self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) + self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password) + (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) + cert = entry_attrs.get('usercertificate') + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) + assert serial is not None + + def test_Backend(self): + """ + Test using the ldap2 Backend directly (ala ipa-server-install) + """ + + # Create our own api because the one generated for the tests is + # a client-only api. Then we register in the commands and objects + # we need for the test. + myapi = create_api(mode=None) + myapi.bootstrap(context='cli', in_server=True, in_tree=True) + myapi.register(ldap2) + myapi.register(host) + myapi.register(service) + myapi.register(service_show) + myapi.finalize() + + pwfile = api.env.dot_ipa + os.sep + ".dmpw" + if ipautil.file_exists(pwfile): + fp = open(pwfile, "r") + dm_password = fp.read().rstrip() + fp.close() + else: + raise nose.SkipTest("No directory manager password in %s" % pwfile) + myapi.Backend.ldap2.connect(bind_dn=DN(('cn', 'Directory Manager')), bind_pw=dm_password) + + result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,)) + entry_attrs = result['result'] + cert = entry_attrs.get('usercertificate') + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) + assert serial is not None + + def test_autobind(self): + """ + Test an autobind LDAP bind using ldap2 + """ + ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-') + self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri) + try: + self.conn.connect(autobind=True) + except errors.ACIError: + raise nose.SkipTest("Only executed as root") + (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) + cert = entry_attrs.get('usercertificate') + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) + assert serial is not None + + +class test_LDAPEntry(object): + """ + Test the LDAPEntry class + """ + cn1 = [u'test1'] + cn2 = [u'test2'] + dn1 = DN(('cn', cn1[0])) + dn2 = DN(('cn', cn2[0])) + + def setUp(self): + self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host) + self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) + self.conn.connect() + + self.entry = self.conn.make_entry(self.dn1, cn=self.cn1) + + def tearDown(self): + if self.conn and self.conn.isconnected(): + self.conn.disconnect() + + def test_entry(self): + e = self.entry + assert e.dn is self.dn1 + assert u'cn' in e + assert u'cn' in e.keys() + assert 'CN' in e + assert 'CN' not in e.keys() + assert 'commonName' in e + assert 'commonName' not in e.keys() + assert e['CN'] is self.cn1 + assert e['CN'] is e[u'cn'] + + e.dn = self.dn2 + assert e.dn is self.dn2 + + def test_set_attr(self): + e = self.entry + e['commonName'] = self.cn2 + assert u'cn' in e + assert u'cn' not in e.keys() + assert 'CN' in e + assert 'CN' not in e.keys() + assert 'commonName' in e + assert 'commonName' in e.keys() + assert e['CN'] is self.cn2 + assert e['CN'] is e[u'cn'] + + def test_del_attr(self): + e = self.entry + del e['CN'] + assert 'CN' not in e + assert 'CN' not in e.keys() + assert u'cn' not in e + assert u'cn' not in e.keys() + assert 'commonName' not in e + assert 'commonName' not in e.keys() + + def test_popitem(self): + e = self.entry + assert e.popitem() == ('cn', self.cn1) + e.keys() == [] + + def test_setdefault(self): + e = self.entry + assert e.setdefault('cn', self.cn2) == self.cn1 + assert e['cn'] == self.cn1 + assert e.setdefault('xyz', self.cn2) == self.cn2 + assert e['xyz'] == self.cn2 + + def test_update(self): + e = self.entry + e.update({'cn': self.cn2}, xyz=self.cn2) + assert e['cn'] == self.cn2 + assert e['xyz'] == self.cn2 + + def test_pop(self): + e = self.entry + assert e.pop('cn') == self.cn1 + assert 'cn' not in e + assert e.pop('cn', 'default') is 'default' + with assert_raises(KeyError): + e.pop('cn') + + def test_clear(self): + e = self.entry + e.clear() + assert not e + assert 'cn' not in e + + def test_has_key(self): + e = self.entry + assert not e.has_key('xyz') + assert e.has_key('cn') + assert e.has_key('COMMONNAME') + + def test_get(self): + e = self.entry + assert e.get('cn') == self.cn1 + assert e.get('commonname') == self.cn1 + assert e.get('COMMONNAME', 'default') == self.cn1 + assert e.get('bad key', 'default') == 'default' + + def test_single_value(self): + e = self.entry + assert e.single_value('cn') == self.cn1[0] + assert e.single_value('commonname') == self.cn1[0] + assert e.single_value('COMMONNAME', 'default') == self.cn1[0] + assert e.single_value('bad key', 'default') == 'default' diff --git a/ipatests/test_ipaserver/test_rpcserver.py b/ipatests/test_ipaserver/test_rpcserver.py new file mode 100644 index 000000000..bd5673844 --- /dev/null +++ b/ipatests/test_ipaserver/test_rpcserver.py @@ -0,0 +1,247 @@ +# Authors: +# Jason Gerard DeRose <jderose@redhat.com> +# +# Copyright (C) 2008 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +""" +Test the `ipaserver.rpc` module. +""" + +import json + +from ipatests.util import create_test_api, assert_equal, raises, PluginTester +from ipatests.data import unicode_str +from ipalib import errors, Command +from ipaserver import rpcserver + + +class StartResponse(object): + def __init__(self): + self.reset() + + def reset(self): + self.status = None + self.headers = None + + def __call__(self, status, headers): + assert self.status is None + assert self.headers is None + assert isinstance(status, str) + assert isinstance(headers, list) + self.status = status + self.headers = headers + + +def test_not_found(): + f = rpcserver.HTTP_Status() + t = rpcserver._not_found_template + s = StartResponse() + + # Test with an innocent URL: + url = '/ipa/foo/stuff' + assert_equal( + f.not_found(None, s, url, None), + [t % dict(url='/ipa/foo/stuff')] + ) + assert s.status == '404 Not Found' + assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] + + # Test when URL contains any of '<>&' + s.reset() + url =' ' + '<script>do_bad_stuff();</script>' + assert_equal( + f.not_found(None, s, url, None), + [t % dict(url='&nbsp;<script>do_bad_stuff();</script>')] + ) + assert s.status == '404 Not Found' + assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] + + +def test_bad_request(): + f = rpcserver.HTTP_Status() + t = rpcserver._bad_request_template + s = StartResponse() + + assert_equal( + f.bad_request(None, s, 'illegal request'), + [t % dict(message='illegal request')] + ) + assert s.status == '400 Bad Request' + assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] + + +def test_internal_error(): + f = rpcserver.HTTP_Status() + t = rpcserver._internal_error_template + s = StartResponse() + + assert_equal( + f.internal_error(None, s, 'request failed'), + [t % dict(message='request failed')] + ) + assert s.status == '500 Internal Server Error' + assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] + + +def test_unauthorized_error(): + f = rpcserver.HTTP_Status() + t = rpcserver._unauthorized_template + s = StartResponse() + + assert_equal( + f.unauthorized(None, s, 'unauthorized', 'password-expired'), + [t % dict(message='unauthorized')] + ) + assert s.status == '401 Unauthorized' + assert s.headers == [('Content-Type', 'text/html; charset=utf-8'), + ('X-IPA-Rejection-Reason', 'password-expired')] + + +def test_params_2_args_options(): + """ + Test the `ipaserver.rpcserver.params_2_args_options` function. + """ + f = rpcserver.params_2_args_options + args = ('Hello', u'world!') + options = dict(one=1, two=u'Two', three='Three') + assert f(tuple()) == (tuple(), dict()) + assert f([args]) == (args, dict()) + assert f([args, options]) == (args, options) + + +class test_session(object): + klass = rpcserver.wsgi_dispatch + + def test_route(self): + def app1(environ, start_response): + return ( + 'from 1', + [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')] + ) + + def app2(environ, start_response): + return ( + 'from 2', + [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')] + ) + + inst = self.klass() + inst.mount(app1, '/foo/stuff') + inst.mount(app2, '/bar') + + d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/foo/stuff') + assert inst.route(d, None) == ('from 1', ['/ipa', '/foo/stuff']) + + d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/bar') + assert inst.route(d, None) == ('from 2', ['/ipa', '/bar']) + + def test_mount(self): + def app1(environ, start_response): + pass + + def app2(environ, start_response): + pass + + # Test that mount works: + inst = self.klass() + inst.mount(app1, 'foo') + assert inst['foo'] is app1 + assert list(inst) == ['foo'] + + # Test that StandardError is raise if trying override a mount: + e = raises(StandardError, inst.mount, app2, 'foo') + assert str(e) == '%s.mount(): cannot replace %r with %r at %r' % ( + 'wsgi_dispatch', app1, app2, 'foo' + ) + + # Test mounting a second app: + inst.mount(app2, 'bar') + assert inst['bar'] is app2 + assert list(inst) == ['bar', 'foo'] + + +class test_xmlserver(PluginTester): + """ + Test the `ipaserver.rpcserver.xmlserver` plugin. + """ + + _plugin = rpcserver.xmlserver + + def test_marshaled_dispatch(self): # FIXME + (o, api, home) = self.instance('Backend', in_server=True) + + +class test_jsonserver(PluginTester): + """ + Test the `ipaserver.rpcserver.jsonserver` plugin. + """ + + _plugin = rpcserver.jsonserver + + def test_unmarshal(self): + """ + Test the `ipaserver.rpcserver.jsonserver.unmarshal` method. + """ + (o, api, home) = self.instance('Backend', in_server=True) + + # Test with invalid JSON-data: + e = raises(errors.JSONError, o.unmarshal, 'this wont work') + assert isinstance(e.error, ValueError) + assert unicode(e.error) == 'No JSON object could be decoded' + + # Test with non-dict type: + e = raises(errors.JSONError, o.unmarshal, json.dumps([1, 2, 3])) + assert unicode(e.error) == 'Request must be a dict' + + params = [[1, 2], dict(three=3, four=4)] + # Test with missing method: + d = dict(params=params, id=18) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'Request is missing "method"' + + # Test with missing params: + d = dict(method='echo', id=18) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'Request is missing "params"' + + # Test with non-list params: + for p in ('hello', dict(args=tuple(), options=dict())): + d = dict(method='echo', id=18, params=p) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'params must be a list' + + # Test with other than 2 params: + for p in ([], [tuple()], [None, dict(), tuple()]): + d = dict(method='echo', id=18, params=p) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'params must contain [args, options]' + + # Test when args is not a list: + d = dict(method='echo', id=18, params=['args', dict()]) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'params[0] (aka args) must be a list' + + # Test when options is not a dict: + d = dict(method='echo', id=18, params=[('hello', 'world'), 'options']) + e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) + assert unicode(e.error) == 'params[1] (aka options) must be a dict' + + # Test with valid values: + args = [u'jdoe'] + options = dict(givenname=u'John', sn='Doe') + d = dict(method=u'user_add', params=[args, options], id=18) + assert o.unmarshal(json.dumps(d)) == (u'user_add', args, options, 18) |