diff options
-rw-r--r-- | install/conf/ipa.conf | 8 | ||||
-rw-r--r-- | ipaserver/plugins/xmlserver.py | 3 | ||||
-rw-r--r-- | ipaserver/rpcserver.py | 108 | ||||
-rw-r--r-- | tests/test_ipaserver/httptest.py | 52 | ||||
-rw-r--r-- | tests/test_ipaserver/test_changepw.py | 109 |
5 files changed, 277 insertions, 3 deletions
diff --git a/install/conf/ipa.conf b/install/conf/ipa.conf index 89c9849ca..b52d9d2ff 100644 --- a/install/conf/ipa.conf +++ b/install/conf/ipa.conf @@ -1,5 +1,5 @@ # -# VERSION 4 - DO NOT REMOVE THIS LINE +# VERSION 5 - DO NOT REMOVE THIS LINE # # LoadModule auth_kerb_module modules/mod_auth_kerb.so @@ -72,6 +72,12 @@ KrbConstrainedDelegationLock ipa Allow from all </Location> +<Location "/ipa/session/change_password"> + Satisfy Any + Order Deny,Allow + Allow from all +</Location> + # This is where we redirect on failed auth Alias /ipa/errors "/usr/share/ipa/html" diff --git a/ipaserver/plugins/xmlserver.py b/ipaserver/plugins/xmlserver.py index 4ae914950..bd9eb1fdf 100644 --- a/ipaserver/plugins/xmlserver.py +++ b/ipaserver/plugins/xmlserver.py @@ -25,10 +25,11 @@ Loads WSGI server plugins. from ipalib import api if 'in_server' in api.env and api.env.in_server is True: - from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password + from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password api.register(wsgi_dispatch) api.register(xmlserver) api.register(jsonserver_kerb) api.register(jsonserver_session) api.register(login_kerberos) api.register(login_password) + api.register(change_password) diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py index f9a549f4e..5abbaf1a6 100644 --- a/ipaserver/rpcserver.py +++ b/ipaserver/rpcserver.py @@ -28,7 +28,7 @@ from xml.sax.saxutils import escape from xmlrpclib import Fault from ipalib import plugable from ipalib.backend import Executioner -from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword +from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword, NotFound, ACIError, ExecutionError from ipalib.request import context, Connection, destroy_context from ipalib.rpc import xml_dumps, xml_loads from ipalib.util import parse_time_duration @@ -100,6 +100,18 @@ _unauthorized_template = """<html> </body> </html>""" +_pwchange_template = """<html> +<head> +<title>200 Success</title> +</head> +<body> +<h1>%(title)s</h1> +<p> +<strong>%(message)s</strong> +</p> +</body> +</html>""" + class HTTP_Status(plugable.Plugin): def not_found(self, environ, start_response, url, message): """ @@ -992,3 +1004,97 @@ class login_password(Backend, KerberosSession, HTTP_Status): if returncode != 0: raise InvalidSessionPassword(principal=principal, message=unicode(stderr)) +class change_password(Backend, HTTP_Status): + + content_type = 'text/plain' + key = '/session/change_password' + + def __init__(self): + super(change_password, self).__init__() + + def _on_finalize(self): + super(change_password, self)._on_finalize() + self.api.Backend.wsgi_dispatch.mount(self, self.key) + + def __call__(self, environ, start_response): + self.info('WSGI change_password.__call__:') + + # Get the user and password parameters from the request + content_type = environ.get('CONTENT_TYPE', '').lower() + if not content_type.startswith('application/x-www-form-urlencoded'): + return self.bad_request(environ, start_response, "Content-Type must be application/x-www-form-urlencoded") + + method = environ.get('REQUEST_METHOD', '').upper() + if method == 'POST': + query_string = read_input(environ) + else: + return self.bad_request(environ, start_response, "HTTP request method must be POST") + + try: + query_dict = urlparse.parse_qs(query_string) + except Exception, e: + return self.bad_request(environ, start_response, "cannot parse query data") + + data = {} + for field in ('user', 'old_password', 'new_password'): + value = query_dict.get(field, None) + if value is not None: + if len(value) == 1: + data[field] = value[0] + else: + return self.bad_request(environ, start_response, "more than one %s parameter" + % field) + else: + return self.bad_request(environ, start_response, "no %s specified" % field) + + # start building the response + self.info("WSGI change_password: start password change of user '%s'", data['user']) + status = HTTP_STATUS_SUCCESS + response_headers = [('Content-Type', 'text/html; charset=utf-8')] + title = 'Password change rejected' + result = 'error' + policy_error = None + + bind_dn = str(DN((self.api.Object.user.primary_key.name, data['user']), + self.api.env.container_user, self.api.env.basedn)) + + try: + conn = ldap2(shared_instance=False, + ldap_uri=self.api.env.ldap_uri) + conn.connect(bind_dn=bind_dn, bind_pw=data['old_password']) + except (NotFound, ACIError): + result = 'invalid-password' + message = 'The old password or username is not correct.' + except Exception, e: + message = "Could not connect to LDAP server." + self.error("change_password: cannot authenticate '%s' to LDAP server: %s", + data['user'], str(e)) + else: + try: + conn.modify_password(bind_dn, data['new_password'], data['old_password']) + except ExecutionError, e: + result = 'policy-error' + policy_error = escape(str(e)) + message = "Password change was rejected: %s" % escape(str(e)) + except Exception, e: + message = "Could not change the password" + self.error("change_password: cannot change password of '%s': %s", + data['user'], str(e)) + else: + result = 'ok' + title = "Password change successful" + message = "Password was changed." + finally: + if conn.isconnected(): + conn.destroy_connection() + + self.info('%s: %s', status, message) + + response_headers.append(('X-IPA-Pwchange-Result', result)) + if policy_error: + response_headers.append(('X-IPA-Pwchange-Policy-Error', policy_error)) + + start_response(status, response_headers) + output = _pwchange_template % dict(title=str(title), + message=str(message)) + return [output] diff --git a/tests/test_ipaserver/httptest.py b/tests/test_ipaserver/httptest.py new file mode 100644 index 000000000..7f1b5b136 --- /dev/null +++ b/tests/test_ipaserver/httptest.py @@ -0,0 +1,52 @@ +# Authors: +# Martin Kosek <mkosek@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Base class for HTTP request tests +""" + +import urllib +import httplib + +from ipalib import api + +class Unauthorized_HTTP_test(object): + """ + Base class for simple HTTP request tests executed against URI + with no required authorization + """ + app_uri = '' + host = api.env.host + content_type = 'application/x-www-form-urlencoded' + + def send_request(self, method='POST', params=None): + """ + Send a request to HTTP server + + :param key When not None, overrides default app_uri + """ + if params is not None: + params = urllib.urlencode(params, True) + url = 'https://' + self.host + self.app_uri + + headers = {'Content-Type' : self.content_type, + 'Referer' : url} + + conn = httplib.HTTPSConnection(self.host) + conn.request(method, self.app_uri, params, headers) + return conn.getresponse() diff --git a/tests/test_ipaserver/test_changepw.py b/tests/test_ipaserver/test_changepw.py new file mode 100644 index 000000000..035766855 --- /dev/null +++ b/tests/test_ipaserver/test_changepw.py @@ -0,0 +1,109 @@ +# Authors: +# Martin Kosek <mkosek@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import nose + +from httptest import Unauthorized_HTTP_test +from tests.test_xmlrpc.xmlrpc_test import XMLRPC_test +from tests.util import assert_equal, assert_not_equal +from ipalib import api, errors +from ipalib.dn import DN +import ldap + +testuser = u'tuser' +old_password = u'old_password' +new_password = u'new_password' + +class test_changepw(XMLRPC_test, Unauthorized_HTTP_test): + app_uri = '/ipa/session/change_password' + + def setUp(self): + super(test_changepw, self).setUp() + try: + api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User') + api.Command['passwd'](testuser, password=u'old_password') + except errors.ExecutionError, e: + raise nose.SkipTest( + 'Cannot set up test user: %s' % e + ) + + def tearDown(self): + try: + api.Command['user_del']([testuser]) + except errors.NotFound: + pass + super(test_changepw, self).tearDown() + + def _changepw(self, user, old_password, new_password): + return self.send_request(params={'user': str(user), + 'old_password' : str(old_password), + 'new_password' : str(new_password)}, + ) + + def _checkpw(self, user, password): + dn = str(DN(('uid', user), + api.env.container_user, + api.env.basedn)) + conn = ldap.initialize(api.env.ldap_uri) + try: + conn.simple_bind_s(dn, password) + finally: + conn.unbind_s() + + def test_bad_options(self): + for params in (None, # no params + {'user': 'foo'}, # missing options + {'user': 'foo', + 'old_password' : 'old'}, # missing option + {'user': 'foo', + 'old_password' : 'old', + 'new_password' : ''}, # empty option + ): + response = self.send_request(params=params) + assert_equal(response.status, 400) + assert_equal(response.reason, 'Bad Request') + + def test_invalid_auth(self): + response = self._changepw(testuser, 'wrongpassword', 'new_password') + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password') + + # make sure that password is NOT changed + self._checkpw(testuser, old_password) + + def test_pwpolicy_error(self): + response = self._changepw(testuser, old_password, '1') + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error') + assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'), + 'Constraint violation: Password is too short') + + # make sure that password is NOT changed + self._checkpw(testuser, old_password) + + def test_pwpolicy_success(self): + response = self._changepw(testuser, old_password, new_password) + + assert_equal(response.status, 200) + assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok') + + # make sure that password IS changed + self._checkpw(testuser, new_password) |