summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--install/conf/ipa.conf8
-rw-r--r--ipaserver/plugins/xmlserver.py3
-rw-r--r--ipaserver/rpcserver.py108
-rw-r--r--tests/test_ipaserver/httptest.py52
-rw-r--r--tests/test_ipaserver/test_changepw.py109
5 files changed, 277 insertions, 3 deletions
diff --git a/install/conf/ipa.conf b/install/conf/ipa.conf
index 89c9849ca..b52d9d2ff 100644
--- a/install/conf/ipa.conf
+++ b/install/conf/ipa.conf
@@ -1,5 +1,5 @@
#
-# VERSION 4 - DO NOT REMOVE THIS LINE
+# VERSION 5 - DO NOT REMOVE THIS LINE
#
# LoadModule auth_kerb_module modules/mod_auth_kerb.so
@@ -72,6 +72,12 @@ KrbConstrainedDelegationLock ipa
Allow from all
</Location>
+<Location "/ipa/session/change_password">
+ Satisfy Any
+ Order Deny,Allow
+ Allow from all
+</Location>
+
# This is where we redirect on failed auth
Alias /ipa/errors "/usr/share/ipa/html"
diff --git a/ipaserver/plugins/xmlserver.py b/ipaserver/plugins/xmlserver.py
index 4ae914950..bd9eb1fdf 100644
--- a/ipaserver/plugins/xmlserver.py
+++ b/ipaserver/plugins/xmlserver.py
@@ -25,10 +25,11 @@ Loads WSGI server plugins.
from ipalib import api
if 'in_server' in api.env and api.env.in_server is True:
- from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password
+ from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password
api.register(wsgi_dispatch)
api.register(xmlserver)
api.register(jsonserver_kerb)
api.register(jsonserver_session)
api.register(login_kerberos)
api.register(login_password)
+ api.register(change_password)
diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py
index f9a549f4e..5abbaf1a6 100644
--- a/ipaserver/rpcserver.py
+++ b/ipaserver/rpcserver.py
@@ -28,7 +28,7 @@ from xml.sax.saxutils import escape
from xmlrpclib import Fault
from ipalib import plugable
from ipalib.backend import Executioner
-from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword
+from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword, NotFound, ACIError, ExecutionError
from ipalib.request import context, Connection, destroy_context
from ipalib.rpc import xml_dumps, xml_loads
from ipalib.util import parse_time_duration
@@ -100,6 +100,18 @@ _unauthorized_template = """<html>
</body>
</html>"""
+_pwchange_template = """<html>
+<head>
+<title>200 Success</title>
+</head>
+<body>
+<h1>%(title)s</h1>
+<p>
+<strong>%(message)s</strong>
+</p>
+</body>
+</html>"""
+
class HTTP_Status(plugable.Plugin):
def not_found(self, environ, start_response, url, message):
"""
@@ -992,3 +1004,97 @@ class login_password(Backend, KerberosSession, HTTP_Status):
if returncode != 0:
raise InvalidSessionPassword(principal=principal, message=unicode(stderr))
+class change_password(Backend, HTTP_Status):
+
+ content_type = 'text/plain'
+ key = '/session/change_password'
+
+ def __init__(self):
+ super(change_password, self).__init__()
+
+ def _on_finalize(self):
+ super(change_password, self)._on_finalize()
+ self.api.Backend.wsgi_dispatch.mount(self, self.key)
+
+ def __call__(self, environ, start_response):
+ self.info('WSGI change_password.__call__:')
+
+ # Get the user and password parameters from the request
+ content_type = environ.get('CONTENT_TYPE', '').lower()
+ if not content_type.startswith('application/x-www-form-urlencoded'):
+ return self.bad_request(environ, start_response, "Content-Type must be application/x-www-form-urlencoded")
+
+ method = environ.get('REQUEST_METHOD', '').upper()
+ if method == 'POST':
+ query_string = read_input(environ)
+ else:
+ return self.bad_request(environ, start_response, "HTTP request method must be POST")
+
+ try:
+ query_dict = urlparse.parse_qs(query_string)
+ except Exception, e:
+ return self.bad_request(environ, start_response, "cannot parse query data")
+
+ data = {}
+ for field in ('user', 'old_password', 'new_password'):
+ value = query_dict.get(field, None)
+ if value is not None:
+ if len(value) == 1:
+ data[field] = value[0]
+ else:
+ return self.bad_request(environ, start_response, "more than one %s parameter"
+ % field)
+ else:
+ return self.bad_request(environ, start_response, "no %s specified" % field)
+
+ # start building the response
+ self.info("WSGI change_password: start password change of user '%s'", data['user'])
+ status = HTTP_STATUS_SUCCESS
+ response_headers = [('Content-Type', 'text/html; charset=utf-8')]
+ title = 'Password change rejected'
+ result = 'error'
+ policy_error = None
+
+ bind_dn = str(DN((self.api.Object.user.primary_key.name, data['user']),
+ self.api.env.container_user, self.api.env.basedn))
+
+ try:
+ conn = ldap2(shared_instance=False,
+ ldap_uri=self.api.env.ldap_uri)
+ conn.connect(bind_dn=bind_dn, bind_pw=data['old_password'])
+ except (NotFound, ACIError):
+ result = 'invalid-password'
+ message = 'The old password or username is not correct.'
+ except Exception, e:
+ message = "Could not connect to LDAP server."
+ self.error("change_password: cannot authenticate '%s' to LDAP server: %s",
+ data['user'], str(e))
+ else:
+ try:
+ conn.modify_password(bind_dn, data['new_password'], data['old_password'])
+ except ExecutionError, e:
+ result = 'policy-error'
+ policy_error = escape(str(e))
+ message = "Password change was rejected: %s" % escape(str(e))
+ except Exception, e:
+ message = "Could not change the password"
+ self.error("change_password: cannot change password of '%s': %s",
+ data['user'], str(e))
+ else:
+ result = 'ok'
+ title = "Password change successful"
+ message = "Password was changed."
+ finally:
+ if conn.isconnected():
+ conn.destroy_connection()
+
+ self.info('%s: %s', status, message)
+
+ response_headers.append(('X-IPA-Pwchange-Result', result))
+ if policy_error:
+ response_headers.append(('X-IPA-Pwchange-Policy-Error', policy_error))
+
+ start_response(status, response_headers)
+ output = _pwchange_template % dict(title=str(title),
+ message=str(message))
+ return [output]
diff --git a/tests/test_ipaserver/httptest.py b/tests/test_ipaserver/httptest.py
new file mode 100644
index 000000000..7f1b5b136
--- /dev/null
+++ b/tests/test_ipaserver/httptest.py
@@ -0,0 +1,52 @@
+# Authors:
+# Martin Kosek <mkosek@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""
+Base class for HTTP request tests
+"""
+
+import urllib
+import httplib
+
+from ipalib import api
+
+class Unauthorized_HTTP_test(object):
+ """
+ Base class for simple HTTP request tests executed against URI
+ with no required authorization
+ """
+ app_uri = ''
+ host = api.env.host
+ content_type = 'application/x-www-form-urlencoded'
+
+ def send_request(self, method='POST', params=None):
+ """
+ Send a request to HTTP server
+
+ :param key When not None, overrides default app_uri
+ """
+ if params is not None:
+ params = urllib.urlencode(params, True)
+ url = 'https://' + self.host + self.app_uri
+
+ headers = {'Content-Type' : self.content_type,
+ 'Referer' : url}
+
+ conn = httplib.HTTPSConnection(self.host)
+ conn.request(method, self.app_uri, params, headers)
+ return conn.getresponse()
diff --git a/tests/test_ipaserver/test_changepw.py b/tests/test_ipaserver/test_changepw.py
new file mode 100644
index 000000000..035766855
--- /dev/null
+++ b/tests/test_ipaserver/test_changepw.py
@@ -0,0 +1,109 @@
+# Authors:
+# Martin Kosek <mkosek@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import nose
+
+from httptest import Unauthorized_HTTP_test
+from tests.test_xmlrpc.xmlrpc_test import XMLRPC_test
+from tests.util import assert_equal, assert_not_equal
+from ipalib import api, errors
+from ipalib.dn import DN
+import ldap
+
+testuser = u'tuser'
+old_password = u'old_password'
+new_password = u'new_password'
+
+class test_changepw(XMLRPC_test, Unauthorized_HTTP_test):
+ app_uri = '/ipa/session/change_password'
+
+ def setUp(self):
+ super(test_changepw, self).setUp()
+ try:
+ api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User')
+ api.Command['passwd'](testuser, password=u'old_password')
+ except errors.ExecutionError, e:
+ raise nose.SkipTest(
+ 'Cannot set up test user: %s' % e
+ )
+
+ def tearDown(self):
+ try:
+ api.Command['user_del']([testuser])
+ except errors.NotFound:
+ pass
+ super(test_changepw, self).tearDown()
+
+ def _changepw(self, user, old_password, new_password):
+ return self.send_request(params={'user': str(user),
+ 'old_password' : str(old_password),
+ 'new_password' : str(new_password)},
+ )
+
+ def _checkpw(self, user, password):
+ dn = str(DN(('uid', user),
+ api.env.container_user,
+ api.env.basedn))
+ conn = ldap.initialize(api.env.ldap_uri)
+ try:
+ conn.simple_bind_s(dn, password)
+ finally:
+ conn.unbind_s()
+
+ def test_bad_options(self):
+ for params in (None, # no params
+ {'user': 'foo'}, # missing options
+ {'user': 'foo',
+ 'old_password' : 'old'}, # missing option
+ {'user': 'foo',
+ 'old_password' : 'old',
+ 'new_password' : ''}, # empty option
+ ):
+ response = self.send_request(params=params)
+ assert_equal(response.status, 400)
+ assert_equal(response.reason, 'Bad Request')
+
+ def test_invalid_auth(self):
+ response = self._changepw(testuser, 'wrongpassword', 'new_password')
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password')
+
+ # make sure that password is NOT changed
+ self._checkpw(testuser, old_password)
+
+ def test_pwpolicy_error(self):
+ response = self._changepw(testuser, old_password, '1')
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error')
+ assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'),
+ 'Constraint violation: Password is too short')
+
+ # make sure that password is NOT changed
+ self._checkpw(testuser, old_password)
+
+ def test_pwpolicy_success(self):
+ response = self._changepw(testuser, old_password, new_password)
+
+ assert_equal(response.status, 200)
+ assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok')
+
+ # make sure that password IS changed
+ self._checkpw(testuser, new_password)