diff options
Diffstat (limited to 'tests/test_ipaserver')
-rw-r--r-- | tests/test_ipaserver/__init__.py | 22 | ||||
-rw-r--r-- | tests/test_ipaserver/httptest.py | 52 | ||||
-rwxr-xr-x | tests/test_ipaserver/install/test_adtrustinstance.py | 59 | ||||
-rw-r--r-- | tests/test_ipaserver/test_changepw.py | 107 | ||||
-rw-r--r-- | tests/test_ipaserver/test_ldap.py | 259 | ||||
-rw-r--r-- | tests/test_ipaserver/test_rpcserver.py | 247 |
6 files changed, 0 insertions, 746 deletions
diff --git a/tests/test_ipaserver/__init__.py b/tests/test_ipaserver/__init__.py deleted file mode 100644 index 2192cc29..00000000 --- a/tests/test_ipaserver/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Authors: -# Jason Gerard DeRose <jderose@redhat.com> -# -# Copyright (C) 2008 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -Sub-package containing unit tests for `ipaserver` package. -""" diff --git a/tests/test_ipaserver/httptest.py b/tests/test_ipaserver/httptest.py deleted file mode 100644 index 7f1b5b13..00000000 --- a/tests/test_ipaserver/httptest.py +++ /dev/null @@ -1,52 +0,0 @@ -# Authors: -# Martin Kosek <mkosek@redhat.com> -# -# Copyright (C) 2012 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Base class for HTTP request tests -""" - -import urllib -import httplib - -from ipalib import api - -class Unauthorized_HTTP_test(object): - """ - Base class for simple HTTP request tests executed against URI - with no required authorization - """ - app_uri = '' - host = api.env.host - content_type = 'application/x-www-form-urlencoded' - - def send_request(self, method='POST', params=None): - """ - Send a request to HTTP server - - :param key When not None, overrides default app_uri - """ - if params is not None: - params = urllib.urlencode(params, True) - url = 'https://' + self.host + self.app_uri - - headers = {'Content-Type' : self.content_type, - 'Referer' : url} - - conn = httplib.HTTPSConnection(self.host) - conn.request(method, self.app_uri, params, headers) - return conn.getresponse() diff --git a/tests/test_ipaserver/install/test_adtrustinstance.py b/tests/test_ipaserver/install/test_adtrustinstance.py deleted file mode 100755 index 9a62f87c..00000000 --- a/tests/test_ipaserver/install/test_adtrustinstance.py +++ /dev/null @@ -1,59 +0,0 @@ -# Authors: -# Sumit Bose <sbose@redhat.com> -# -# Copyright (C) 2011 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -""" -Test `adtrustinstance` -""" - -import os -import nose - -from ipaserver.install import adtrustinstance - -class test_adtrustinstance: - """ - Test `adtrustinstance`. - """ - - def test_make_netbios_name(self): - s = adtrustinstance.make_netbios_name("ABCDEF") - assert s == 'ABCDEF' and isinstance(s, str) - s = adtrustinstance.make_netbios_name(U"ABCDEF") - assert s == 'ABCDEF' and isinstance(s, unicode) - s = adtrustinstance.make_netbios_name("abcdef") - assert s == 'ABCDEF' - s = adtrustinstance.make_netbios_name("abc.def") - assert s == 'ABC' - s = adtrustinstance.make_netbios_name("abcdefghijklmnopqr.def") - assert s == 'ABCDEFGHIJKLMNO' - s = adtrustinstance.make_netbios_name("A!$%B&/()C=?+*D") - assert s == 'ABCD' - s = adtrustinstance.make_netbios_name("!$%&/()=?+*") - assert not s - - def test_check_netbios_name(self): - assert adtrustinstance.check_netbios_name("ABCDEF") - assert not adtrustinstance.check_netbios_name("abcdef") - assert adtrustinstance.check_netbios_name("ABCDE12345ABCDE") - assert not adtrustinstance.check_netbios_name("ABCDE12345ABCDE1") - assert not adtrustinstance.check_netbios_name("") - - assert adtrustinstance.check_netbios_name(U"ABCDEF") - assert not adtrustinstance.check_netbios_name(U"abcdef") - assert adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE") - assert not adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE1") diff --git a/tests/test_ipaserver/test_changepw.py b/tests/test_ipaserver/test_changepw.py deleted file mode 100644 index 4491e0b6..00000000 --- a/tests/test_ipaserver/test_changepw.py +++ /dev/null @@ -1,107 +0,0 @@ -# Authors: -# Martin Kosek <mkosek@redhat.com> -# -# Copyright (C) 2012 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import nose - -from httptest import Unauthorized_HTTP_test -from tests.test_xmlrpc.xmlrpc_test import XMLRPC_test -from tests.util import assert_equal, assert_not_equal -from ipalib import api, errors -from ipapython.dn import DN -import ldap - -testuser = u'tuser' -old_password = u'old_password' -new_password = u'new_password' - -class test_changepw(XMLRPC_test, Unauthorized_HTTP_test): - app_uri = '/ipa/session/change_password' - - def setUp(self): - super(test_changepw, self).setUp() - try: - api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User') - api.Command['passwd'](testuser, password=u'old_password') - except errors.ExecutionError, e: - raise nose.SkipTest( - 'Cannot set up test user: %s' % e - ) - - def tearDown(self): - try: - api.Command['user_del']([testuser]) - except errors.NotFound: - pass - super(test_changepw, self).tearDown() - - def _changepw(self, user, old_password, new_password): - return self.send_request(params={'user': str(user), - 'old_password' : str(old_password), - 'new_password' : str(new_password)}, - ) - - def _checkpw(self, user, password): - dn = str(DN(('uid', user), api.env.container_user, api.env.basedn)) - conn = ldap.initialize(api.env.ldap_uri) - try: - conn.simple_bind_s(dn, password) - finally: - conn.unbind_s() - - def test_bad_options(self): - for params in (None, # no params - {'user': 'foo'}, # missing options - {'user': 'foo', - 'old_password' : 'old'}, # missing option - {'user': 'foo', - 'old_password' : 'old', - 'new_password' : ''}, # empty option - ): - response = self.send_request(params=params) - assert_equal(response.status, 400) - assert_equal(response.reason, 'Bad Request') - - def test_invalid_auth(self): - response = self._changepw(testuser, 'wrongpassword', 'new_password') - - assert_equal(response.status, 200) - assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password') - - # make sure that password is NOT changed - self._checkpw(testuser, old_password) - - def test_pwpolicy_error(self): - response = self._changepw(testuser, old_password, '1') - - assert_equal(response.status, 200) - assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error') - assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'), - 'Constraint violation: Password is too short') - - # make sure that password is NOT changed - self._checkpw(testuser, old_password) - - def test_pwpolicy_success(self): - response = self._changepw(testuser, old_password, new_password) - - assert_equal(response.status, 200) - assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok') - - # make sure that password IS changed - self._checkpw(testuser, new_password) diff --git a/tests/test_ipaserver/test_ldap.py b/tests/test_ipaserver/test_ldap.py deleted file mode 100644 index 21363f2e..00000000 --- a/tests/test_ipaserver/test_ldap.py +++ /dev/null @@ -1,259 +0,0 @@ -# Authors: -# Rob Crittenden <rcritten@redhat.com> -# -# Copyright (C) 2010 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -# Test some simple LDAP requests using the ldap2 backend - -# This fetches a certificate from a host principal so we can ensure that the -# schema is working properly. We know this because the schema will tell the -# encoder not to utf-8 encode binary attributes. - -# The DM password needs to be set in ~/.ipa/.dmpw - -import os - -import nose -from nose.tools import assert_raises # pylint: disable=E0611 -import nss.nss as nss - -from ipaserver.plugins.ldap2 import ldap2 -from ipalib.plugins.service import service, service_show -from ipalib.plugins.host import host -from ipalib import api, x509, create_api, errors -from ipapython import ipautil -from ipapython.dn import DN - -class test_ldap(object): - """ - Test various LDAP client bind methods. - """ - - def setUp(self): - self.conn = None - self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host) - self.ccache = '/tmp/krb5cc_%d' % os.getuid() - nss.nss_init_nodb() - self.dn = DN(('krbprincipalname','ldap/%s@%s' % (api.env.host, api.env.realm)), - ('cn','services'),('cn','accounts'),api.env.basedn) - - def tearDown(self): - if self.conn and self.conn.isconnected(): - self.conn.disconnect() - - def test_anonymous(self): - """ - Test an anonymous LDAP bind using ldap2 - """ - self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) - self.conn.connect() - (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = unicode(x509.get_serial_number(cert, x509.DER)) - assert serial is not None - - def test_GSSAPI(self): - """ - Test a GSSAPI LDAP bind using ldap2 - """ - if not ipautil.file_exists(self.ccache): - raise nose.SkipTest('Missing ccache %s' % self.ccache) - self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) - self.conn.connect(ccache='FILE:%s' % self.ccache) - (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = unicode(x509.get_serial_number(cert, x509.DER)) - assert serial is not None - - def test_simple(self): - """ - Test a simple LDAP bind using ldap2 - """ - pwfile = api.env.dot_ipa + os.sep + ".dmpw" - if ipautil.file_exists(pwfile): - fp = open(pwfile, "r") - dm_password = fp.read().rstrip() - fp.close() - else: - raise nose.SkipTest("No directory manager password in %s" % pwfile) - self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) - self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password) - (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = unicode(x509.get_serial_number(cert, x509.DER)) - assert serial is not None - - def test_Backend(self): - """ - Test using the ldap2 Backend directly (ala ipa-server-install) - """ - - # Create our own api because the one generated for the tests is - # a client-only api. Then we register in the commands and objects - # we need for the test. - myapi = create_api(mode=None) - myapi.bootstrap(context='cli', in_server=True, in_tree=True) - myapi.register(ldap2) - myapi.register(host) - myapi.register(service) - myapi.register(service_show) - myapi.finalize() - - pwfile = api.env.dot_ipa + os.sep + ".dmpw" - if ipautil.file_exists(pwfile): - fp = open(pwfile, "r") - dm_password = fp.read().rstrip() - fp.close() - else: - raise nose.SkipTest("No directory manager password in %s" % pwfile) - myapi.Backend.ldap2.connect(bind_dn=DN(('cn', 'Directory Manager')), bind_pw=dm_password) - - result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,)) - entry_attrs = result['result'] - cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = unicode(x509.get_serial_number(cert, x509.DER)) - assert serial is not None - - def test_autobind(self): - """ - Test an autobind LDAP bind using ldap2 - """ - ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-') - self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri) - try: - self.conn.connect(autobind=True) - except errors.ACIError: - raise nose.SkipTest("Only executed as root") - (dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate']) - cert = entry_attrs.get('usercertificate') - cert = cert[0] - serial = unicode(x509.get_serial_number(cert, x509.DER)) - assert serial is not None - - -class test_LDAPEntry(object): - """ - Test the LDAPEntry class - """ - cn1 = [u'test1'] - cn2 = [u'test2'] - dn1 = DN(('cn', cn1[0])) - dn2 = DN(('cn', cn2[0])) - - def setUp(self): - self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host) - self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri) - self.conn.connect() - - self.entry = self.conn.make_entry(self.dn1, cn=self.cn1) - - def tearDown(self): - if self.conn and self.conn.isconnected(): - self.conn.disconnect() - - def test_entry(self): - e = self.entry - assert e.dn is self.dn1 - assert u'cn' in e - assert u'cn' in e.keys() - assert 'CN' in e - assert 'CN' not in e.keys() - assert 'commonName' in e - assert 'commonName' not in e.keys() - assert e['CN'] is self.cn1 - assert e['CN'] is e[u'cn'] - - e.dn = self.dn2 - assert e.dn is self.dn2 - - def test_set_attr(self): - e = self.entry - e['commonName'] = self.cn2 - assert u'cn' in e - assert u'cn' not in e.keys() - assert 'CN' in e - assert 'CN' not in e.keys() - assert 'commonName' in e - assert 'commonName' in e.keys() - assert e['CN'] is self.cn2 - assert e['CN'] is e[u'cn'] - - def test_del_attr(self): - e = self.entry - del e['CN'] - assert 'CN' not in e - assert 'CN' not in e.keys() - assert u'cn' not in e - assert u'cn' not in e.keys() - assert 'commonName' not in e - assert 'commonName' not in e.keys() - - def test_popitem(self): - e = self.entry - assert e.popitem() == ('cn', self.cn1) - e.keys() == [] - - def test_setdefault(self): - e = self.entry - assert e.setdefault('cn', self.cn2) == self.cn1 - assert e['cn'] == self.cn1 - assert e.setdefault('xyz', self.cn2) == self.cn2 - assert e['xyz'] == self.cn2 - - def test_update(self): - e = self.entry - e.update({'cn': self.cn2}, xyz=self.cn2) - assert e['cn'] == self.cn2 - assert e['xyz'] == self.cn2 - - def test_pop(self): - e = self.entry - assert e.pop('cn') == self.cn1 - assert 'cn' not in e - assert e.pop('cn', 'default') is 'default' - with assert_raises(KeyError): - e.pop('cn') - - def test_clear(self): - e = self.entry - e.clear() - assert not e - assert 'cn' not in e - - def test_has_key(self): - e = self.entry - assert not e.has_key('xyz') - assert e.has_key('cn') - assert e.has_key('COMMONNAME') - - def test_get(self): - e = self.entry - assert e.get('cn') == self.cn1 - assert e.get('commonname') == self.cn1 - assert e.get('COMMONNAME', 'default') == self.cn1 - assert e.get('bad key', 'default') == 'default' - - def test_single_value(self): - e = self.entry - assert e.single_value('cn') == self.cn1[0] - assert e.single_value('commonname') == self.cn1[0] - assert e.single_value('COMMONNAME', 'default') == self.cn1[0] - assert e.single_value('bad key', 'default') == 'default' diff --git a/tests/test_ipaserver/test_rpcserver.py b/tests/test_ipaserver/test_rpcserver.py deleted file mode 100644 index a75a85e0..00000000 --- a/tests/test_ipaserver/test_rpcserver.py +++ /dev/null @@ -1,247 +0,0 @@ -# Authors: -# Jason Gerard DeRose <jderose@redhat.com> -# -# Copyright (C) 2008 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -""" -Test the `ipaserver.rpc` module. -""" - -import json - -from tests.util import create_test_api, assert_equal, raises, PluginTester -from tests.data import unicode_str -from ipalib import errors, Command -from ipaserver import rpcserver - - -class StartResponse(object): - def __init__(self): - self.reset() - - def reset(self): - self.status = None - self.headers = None - - def __call__(self, status, headers): - assert self.status is None - assert self.headers is None - assert isinstance(status, str) - assert isinstance(headers, list) - self.status = status - self.headers = headers - - -def test_not_found(): - f = rpcserver.HTTP_Status() - t = rpcserver._not_found_template - s = StartResponse() - - # Test with an innocent URL: - url = '/ipa/foo/stuff' - assert_equal( - f.not_found(None, s, url, None), - [t % dict(url='/ipa/foo/stuff')] - ) - assert s.status == '404 Not Found' - assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] - - # Test when URL contains any of '<>&' - s.reset() - url =' ' + '<script>do_bad_stuff();</script>' - assert_equal( - f.not_found(None, s, url, None), - [t % dict(url='&nbsp;<script>do_bad_stuff();</script>')] - ) - assert s.status == '404 Not Found' - assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] - - -def test_bad_request(): - f = rpcserver.HTTP_Status() - t = rpcserver._bad_request_template - s = StartResponse() - - assert_equal( - f.bad_request(None, s, 'illegal request'), - [t % dict(message='illegal request')] - ) - assert s.status == '400 Bad Request' - assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] - - -def test_internal_error(): - f = rpcserver.HTTP_Status() - t = rpcserver._internal_error_template - s = StartResponse() - - assert_equal( - f.internal_error(None, s, 'request failed'), - [t % dict(message='request failed')] - ) - assert s.status == '500 Internal Server Error' - assert s.headers == [('Content-Type', 'text/html; charset=utf-8')] - - -def test_unauthorized_error(): - f = rpcserver.HTTP_Status() - t = rpcserver._unauthorized_template - s = StartResponse() - - assert_equal( - f.unauthorized(None, s, 'unauthorized', 'password-expired'), - [t % dict(message='unauthorized')] - ) - assert s.status == '401 Unauthorized' - assert s.headers == [('Content-Type', 'text/html; charset=utf-8'), - ('X-IPA-Rejection-Reason', 'password-expired')] - - -def test_params_2_args_options(): - """ - Test the `ipaserver.rpcserver.params_2_args_options` function. - """ - f = rpcserver.params_2_args_options - args = ('Hello', u'world!') - options = dict(one=1, two=u'Two', three='Three') - assert f(tuple()) == (tuple(), dict()) - assert f([args]) == (args, dict()) - assert f([args, options]) == (args, options) - - -class test_session(object): - klass = rpcserver.wsgi_dispatch - - def test_route(self): - def app1(environ, start_response): - return ( - 'from 1', - [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')] - ) - - def app2(environ, start_response): - return ( - 'from 2', - [environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')] - ) - - inst = self.klass() - inst.mount(app1, '/foo/stuff') - inst.mount(app2, '/bar') - - d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/foo/stuff') - assert inst.route(d, None) == ('from 1', ['/ipa', '/foo/stuff']) - - d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/bar') - assert inst.route(d, None) == ('from 2', ['/ipa', '/bar']) - - def test_mount(self): - def app1(environ, start_response): - pass - - def app2(environ, start_response): - pass - - # Test that mount works: - inst = self.klass() - inst.mount(app1, 'foo') - assert inst['foo'] is app1 - assert list(inst) == ['foo'] - - # Test that StandardError is raise if trying override a mount: - e = raises(StandardError, inst.mount, app2, 'foo') - assert str(e) == '%s.mount(): cannot replace %r with %r at %r' % ( - 'wsgi_dispatch', app1, app2, 'foo' - ) - - # Test mounting a second app: - inst.mount(app2, 'bar') - assert inst['bar'] is app2 - assert list(inst) == ['bar', 'foo'] - - -class test_xmlserver(PluginTester): - """ - Test the `ipaserver.rpcserver.xmlserver` plugin. - """ - - _plugin = rpcserver.xmlserver - - def test_marshaled_dispatch(self): # FIXME - (o, api, home) = self.instance('Backend', in_server=True) - - -class test_jsonserver(PluginTester): - """ - Test the `ipaserver.rpcserver.jsonserver` plugin. - """ - - _plugin = rpcserver.jsonserver - - def test_unmarshal(self): - """ - Test the `ipaserver.rpcserver.jsonserver.unmarshal` method. - """ - (o, api, home) = self.instance('Backend', in_server=True) - - # Test with invalid JSON-data: - e = raises(errors.JSONError, o.unmarshal, 'this wont work') - assert isinstance(e.error, ValueError) - assert unicode(e.error) == 'No JSON object could be decoded' - - # Test with non-dict type: - e = raises(errors.JSONError, o.unmarshal, json.dumps([1, 2, 3])) - assert unicode(e.error) == 'Request must be a dict' - - params = [[1, 2], dict(three=3, four=4)] - # Test with missing method: - d = dict(params=params, id=18) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'Request is missing "method"' - - # Test with missing params: - d = dict(method='echo', id=18) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'Request is missing "params"' - - # Test with non-list params: - for p in ('hello', dict(args=tuple(), options=dict())): - d = dict(method='echo', id=18, params=p) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'params must be a list' - - # Test with other than 2 params: - for p in ([], [tuple()], [None, dict(), tuple()]): - d = dict(method='echo', id=18, params=p) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'params must contain [args, options]' - - # Test when args is not a list: - d = dict(method='echo', id=18, params=['args', dict()]) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'params[0] (aka args) must be a list' - - # Test when options is not a dict: - d = dict(method='echo', id=18, params=[('hello', 'world'), 'options']) - e = raises(errors.JSONError, o.unmarshal, json.dumps(d)) - assert unicode(e.error) == 'params[1] (aka options) must be a dict' - - # Test with valid values: - args = [u'jdoe'] - options = dict(givenname=u'John', sn='Doe') - d = dict(method=u'user_add', params=[args, options], id=18) - assert o.unmarshal(json.dumps(d)) == (u'user_add', args, options, 18) |