summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--keystone/common/wsgi.py57
-rw-r--r--tests/test_wsgi.py31
2 files changed, 75 insertions, 13 deletions
diff --git a/keystone/common/wsgi.py b/keystone/common/wsgi.py
index 08540b6a..15d23525 100644
--- a/keystone/common/wsgi.py
+++ b/keystone/common/wsgi.py
@@ -20,6 +20,7 @@
"""Utility methods for working with WSGI servers."""
+import re
import socket
import sys
@@ -389,21 +390,23 @@ class Debug(Middleware):
@webob.dec.wsgify(RequestClass=Request)
def __call__(self, req):
- LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20))
- for key, value in req.environ.items():
- LOG.debug('%s = %s', key, value)
- LOG.debug('')
- LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20))
- for line in req.body_file:
- LOG.debug(line)
- LOG.debug('')
+ if LOG.isEnabledFor(logging.DEBUG):
+ LOG.debug('%s %s %s', ('*' * 20), 'REQUEST ENVIRON', ('*' * 20))
+ for key, value in req.environ.items():
+ LOG.debug('%s = %s', key, mask_password(value,
+ is_unicode=True))
+ LOG.debug('')
+ LOG.debug('%s %s %s', ('*' * 20), 'REQUEST BODY', ('*' * 20))
+ for line in req.body_file:
+ LOG.debug(mask_password(line))
+ LOG.debug('')
resp = req.get_response(self.application)
-
- LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20))
- for (key, value) in resp.headers.iteritems():
- LOG.debug('%s = %s', key, value)
- LOG.debug('')
+ if LOG.isEnabledFor(logging.DEBUG):
+ LOG.debug('%s %s %s', ('*' * 20), 'RESPONSE HEADERS', ('*' * 20))
+ for (key, value) in resp.headers.iteritems():
+ LOG.debug('%s = %s', key, value)
+ LOG.debug('')
resp.app_iter = self.print_generator(resp.app_iter)
@@ -580,3 +583,31 @@ def render_exception(error):
if isinstance(error, exception.AuthPluginException):
body['error']['identity'] = error.authentication
return render_response(status=(error.code, error.title), body=body)
+
+
+_RE_PASS = re.compile(r'([\'"].*?password[\'"]\s*:\s*u?[\'"]).*?([\'"])',
+ re.DOTALL)
+
+
+def mask_password(message, is_unicode=False, secret="***"):
+ """Replace password with 'secret' in message.
+
+ :param message: The string which include security information.
+ :param is_unicode: Is unicode string ?
+ :param secret: substitution string default to "***".
+ :returns: The string
+
+ For example:
+ >>> mask_password('"password" : "aaaaa"')
+ '"password" : "***"'
+ >>> mask_password("'original_password' : 'aaaaa'")
+ "'original_password' : '***'"
+ >>> mask_password("u'original_password' : u'aaaaa'")
+ "u'original_password' : u'***'"
+ """
+ if is_unicode:
+ message = unicode(message)
+ # Match the group 1,2 and replace all others with 'secret'
+ secret = r"\g<1>" + secret + r"\g<2>"
+ result = _RE_PASS.sub(secret, message)
+ return result
diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py
index 9bc26017..87d9929d 100644
--- a/tests/test_wsgi.py
+++ b/tests/test_wsgi.py
@@ -38,6 +38,37 @@ class BaseWSGITest(test.TestCase):
req.environ['wsgiorg.routing_args'] = [None, args]
return req
+ def test_mask_password(self):
+ message = ("test = 'password': 'aaaaaa', 'param1': 'value1', "
+ "\"new_password\": 'bbbbbb'")
+ self.assertEqual(wsgi.mask_password(message, True),
+ u"test = 'password': '***', 'param1': 'value1', "
+ "\"new_password\": '***'")
+
+ message = "test = 'password' : 'aaaaaa'"
+ self.assertEqual(wsgi.mask_password(message, False, '111'),
+ "test = 'password' : '111'")
+
+ message = u"test = u'password' : u'aaaaaa'"
+ self.assertEqual(wsgi.mask_password(message, True),
+ u"test = u'password' : u'***'")
+
+ message = 'test = "password" : "aaaaaaaaa"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "password" : "***"')
+
+ message = 'test = "original_password" : "aaaaaaaaa"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "original_password" : "***"')
+
+ message = 'test = "original_password" : ""'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "original_password" : "***"')
+
+ message = 'test = "param1" : "value"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "param1" : "value"')
+
class ApplicationTest(BaseWSGITest):
def test_response_content_type(self):