summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimo Sorce <simo@redhat.com>2015-03-18 12:45:45 -0400
committerSimo Sorce <simo@redhat.com>2015-03-18 13:44:24 -0400
commit33f36ea10c1db2aaa74818c60933a20a9abe672f (patch)
treeb094dc4b961255ed92db9bbbb6ce76c5f1735d87
parent3302e14058a7ecbe39c7f403a3b0c4aa66d1f87d (diff)
downloadjwcrypto-33f36ea10c1db2aaa74818c60933a20a9abe672f.tar.gz
jwcrypto-33f36ea10c1db2aaa74818c60933a20a9abe672f.tar.xz
jwcrypto-33f36ea10c1db2aaa74818c60933a20a9abe672f.zip
The protected header is optional in some cases
Allow the use of a JWE without protected headers. Thanks to Jan Rusnacko for pointing out this flaw. Signed-off-by: Simo Sorce <simo@redhat.com>
-rw-r--r--jwcrypto/jwe.py50
-rw-r--r--jwcrypto/tests.py5
2 files changed, 30 insertions, 25 deletions
diff --git a/jwcrypto/jwe.py b/jwcrypto/jwe.py
index 40f8edb..a44b7fe 100644
--- a/jwcrypto/jwe.py
+++ b/jwcrypto/jwe.py
@@ -399,8 +399,8 @@ class JWE(object):
return _aes_gcm(256)
def _jwa(self, name):
- attr = '_jwa_%s' % name.replace('-', '_').replace('+', '_')
try:
+ attr = '_jwa_%s' % name.replace('-', '_').replace('+', '_')
return getattr(self, attr)()
except (KeyError, AttributeError):
raise InvalidJWAAlgorithm()
@@ -412,11 +412,24 @@ class JWE(object):
h1.update(h2)
return h1
+ def get_jose_header(self, header=None):
+ jh = dict()
+ if 'protected' in self.objects:
+ ph = json.loads(self.objects['protected'])
+ jh = self.merge_headers(jh, ph)
+ if 'unprotected' in self.objects:
+ uh = json.loads(self.objects['unprotected'])
+ jh = self.merge_headers(jh, uh)
+ if header:
+ rh = json.loads(header)
+ jh = self.merge_headers(jh, rh)
+ return jh
+
def add_recipient(self, key, header=None):
""" Encrypt the provided payload with the given key.
:param key: A JWK key of appropriate type for the "alg"
- provided in the 'protected' json string.
+ provided in the JOSE Headers.
See draft-ietf-jose-json-web-key-41
:param header: A JSON string representing the per-recipient header.
@@ -426,16 +439,9 @@ class JWE(object):
if not isinstance(key, JWK):
raise ValueError('key is not a JWK object')
- ph = json.loads(self.objects['protected'])
- if 'unprotected' in self.objects:
- uh = json.loads(self.objects['unprotected'])
- ph = self.merge_headers(ph, uh)
- if header:
- rh = json.loads(header)
- ph = self.merge_headers(ph, rh)
-
- alg = self._jwa(ph.get('alg', None))
- enc = self._jwa(ph.get('enc', None))
+ jh = self.get_jose_header(header)
+ alg = self._jwa(jh.get('alg', None))
+ enc = self._jwa(jh.get('enc', None))
rec = dict()
if header:
@@ -450,7 +456,7 @@ class JWE(object):
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
- compress = ph.get('zip', None)
+ compress = jh.get('zip', None)
if compress == 'DEF':
data = zlib.compress(self.plaintext)[2:-4]
elif compress is None:
@@ -537,19 +543,13 @@ class JWE(object):
# FIXME: allow to specify which algorithms to accept as valid
def decrypt(self, key, ppe):
- ph = json.loads(self.objects['protected'])
- if 'unprotected' in self.objects:
- uh = json.loads(self.objects['unprotected'])
- ph = self.merge_headers(ph, uh)
- if 'header' in ppe:
- rh = json.loads(ppe['header'])
- ph = self.merge_headers(ph, rh)
+ jh = self.get_jose_header(ppe.get('header', None))
+
# TODO: allow caller to specify list of headers it understands
- if 'crit' in ph:
- self.check_crit(ph['crit'])
+ self.check_crit(jh.get('crit', dict()))
- alg = self._jwa(ph.get('alg', None))
- enc = self._jwa(ph.get('enc', None))
+ alg = self._jwa(jh.get('alg', None))
+ enc = self._jwa(jh.get('enc', None))
aad = base64url_encode(self.objects.get('protected', ''))
if 'aad' in self.objects:
@@ -563,7 +563,7 @@ class JWE(object):
self.decryptlog.append('Success')
self.cek = cek
- compress = ph.get('zip', None)
+ compress = jh.get('zip', None)
if compress == 'DEF':
self.plaintext = zlib.decompress(data, -zlib.MAX_WBITS)
elif compress is None:
diff --git a/jwcrypto/tests.py b/jwcrypto/tests.py
index 13dfeb7..d66a5b4 100644
--- a/jwcrypto/tests.py
+++ b/jwcrypto/tests.py
@@ -648,3 +648,8 @@ class ConformanceTests(unittest.TestCase):
kty='RSA', n=1, key_ops=['sign'], use='enc')
self.assertRaises(jwk.InvalidJWKValue, jwk.JWK,
kty='RSA', n=1, key_ops=['sign', 'sign'])
+
+ def test_jwe_no_protected_header(self):
+ enc = jwe.JWE(plaintext='plain')
+ enc.add_recipient(jwk.JWK(kty='oct', k='A'*32),
+ '{"alg":"A128KW","enc":"A128GCM"}')