summaryrefslogtreecommitdiffstats
path: root/jwcrypto/jwk.py
diff options
context:
space:
mode:
Diffstat (limited to 'jwcrypto/jwk.py')
-rw-r--r--jwcrypto/jwk.py254
1 files changed, 254 insertions, 0 deletions
diff --git a/jwcrypto/jwk.py b/jwcrypto/jwk.py
new file mode 100644
index 0000000..25744ef
--- /dev/null
+++ b/jwcrypto/jwk.py
@@ -0,0 +1,254 @@
+# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric import ec
+from jwcrypto.common import base64url_decode
+import json
+
+# draft-ietf-jose-json-web-algorithms-24 - 7.4
+JWKTypesRegistry = {'EC': 'Elliptic Curve',
+ 'RSA': 'RSA',
+ 'oct': 'Octet sequence'}
+
+# draft-ietf-jose-json-web-algorithms-24 - 7.5
+# It is part of the JWK Parameters Registry, but we want a more
+# specific map for internal usage
+JWKValuesRegistry = {'EC': {'crv': ('Curve', 'Public'),
+ 'x': ('X Coordinate', 'Public'),
+ 'y': ('Y Coordinate', 'Public'),
+ 'd': ('ECC Private Key', 'Private')},
+ 'RSA': {'n': ('Modulus', 'Public'),
+ 'e': ('Exponent', 'Public'),
+ 'd': ('Private Exponent', 'Private'),
+ 'p': ('First Prime Factor', 'Private'),
+ 'q': ('Second Prime Factor', 'Private'),
+ 'dp': ('First Factor CRT Exponent', 'Private'),
+ 'dq': ('Second Factor CRT Exponent', 'Private'),
+ 'qi': ('First CRT Coefficient', 'Private')},
+ 'oct': {'k': ('Key Value', 'Private')}}
+
+JWKParamsRegistry = {'kty': ('Key Type', 'Public', ),
+ 'use': ('Public Key Use', 'Public'),
+ 'key_ops': ('Key Operations', 'Public'),
+ 'alg': ('Algorithm', 'Public'),
+ 'kid': ('Key ID', 'Public'),
+ 'x5u': ('X.509 URL', 'Public'),
+ 'x5c': ('X.509 Certificate Chain', 'Public'),
+ 'x5t': ('X.509 Certificate SHA-1 Thumbprint', 'Public'),
+ 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint',
+ 'Public')}
+
+# draft-ietf-jose-json-web-algorithms-24 - 7.6
+JWKEllipticCurveRegistry = {'P-256': 'P-256 curve',
+ 'P-384': 'P-384 curve',
+ 'P-521': 'P-521 curve'}
+
+# draft-ietf-jose-json-web-key-41 - 8.2
+JWKUseRegistry = {'sig': 'Digital Signature or MAC',
+ 'enc': 'Encryption'}
+
+# draft-ietf-jose-json-web-key-41 - 8.2
+JWKOperationsRegistry = {'sign': 'Compute digital Signature or MAC',
+ 'verify': 'Verify digital signature or MAC',
+ 'encrypt': 'Encrypt content',
+ 'decrypt': 'Decrypt content and validate'
+ ' decryption, if applicable',
+ 'wrapKey': 'Encrypt key',
+ 'unwrapKey': 'Decrypt key and validate'
+ ' decryption, if applicable',
+ 'deriveKey': 'Derive key',
+ 'deriveBits': 'Derive bits not to be used as a key'}
+
+
+class InvalidJWKType(Exception):
+
+ def __init__(self, value=None):
+ super(InvalidJWKType, self).__init__()
+ self.value = value
+
+ def __str__(self):
+ return 'Unknown type "%s", valid types are: %s' % (
+ self.value, JWKTypesRegistry.keys())
+
+
+class InvalidJWKUsage(Exception):
+
+ def __init__(self, use, value):
+ super(InvalidJWKUsage, self).__init__()
+ self.value = value
+ self.use = use
+
+ def __str__(self):
+ if self.use in JWKUseRegistry.keys():
+ usage = JWKUseRegistry[self.use]
+ else:
+ usage = 'Unknown(%s)' % self.use
+ if self.value in JWKUseRegistry.keys():
+ valid = JWKUseRegistry[self.value]
+ else:
+ valid = 'Unknown(%s)' % self.value
+ return 'Invalid usage requested: "%s". Valid for: "%s"' % (usage,
+ valid)
+
+
+class InvalidJWKOperation(Exception):
+
+ def __init__(self, operation, values):
+ super(InvalidJWKOperation, self).__init__()
+ self.op = operation
+ self.values = values
+
+ def __str__(self):
+ if self.op in JWKOperationsRegistry.keys():
+ op = JWKOperationsRegistry[self.op]
+ else:
+ op = 'Unknown(%s)' % self.op
+ valid = list()
+ for v in self.values:
+ if v in JWKOperationsRegistry.keys():
+ valid.append(JWKOperationsRegistry[v])
+ else:
+ valid.append('Unknown(%s)' % v)
+ return 'Invalid operation requested: "%s". Valid for: "%s"' % (op,
+ valid)
+
+
+class InvalidJWKValue(Exception):
+ pass
+
+
+class JWK(object):
+
+ def __init__(self, **kwargs):
+
+ names = kwargs.keys()
+
+ self._params = dict()
+ for name in JWKParamsRegistry.keys():
+ if name in kwargs:
+ self._params[name] = kwargs[name]
+ while name in names:
+ names.remove(name)
+
+ kty = self._params.get('kty', None)
+ if kty not in JWKTypesRegistry:
+ raise InvalidJWKType(kty)
+
+ self._key = dict()
+ for name in JWKValuesRegistry[kty].keys():
+ if name in kwargs:
+ self._key[name] = kwargs[name]
+ while name in names:
+ names.remove(name)
+
+ if len(names) != 0:
+ raise InvalidJWKValue('Unknown key parameters: %s' % names)
+
+ if len(self._key) == 0:
+ raise InvalidJWKValue('No Key Values found')
+
+ def export(self):
+ d = dict()
+ d.update(self._params)
+ d.update(self._key)
+ return json.dumps(d)
+
+ @property
+ def key_id(self):
+ return self._params.get('kid', None)
+
+ def get_curve(self, arg):
+ k = self._key
+ if self._params['kty'] != 'EC':
+ raise InvalidJWKType('Not an EC key')
+ if arg and k['crv'] != arg:
+ raise InvalidJWKValue('Curve requested is "%s", but '
+ 'key curve is "%s"' % (arg, k['crv']))
+ if k['crv'] == 'P-256':
+ return ec.SECP256R1()
+ elif k['crv'] == 'P-384':
+ return ec.SECP384R1()
+ elif k['crv'] == 'P-521':
+ return ec.SECP521R1()
+ else:
+ raise InvalidJWKValue('Unknown Elliptic Curve Type')
+
+ def _check_constraints(self, usage, operation):
+ use = self._params.get('use', None)
+ if use and use != usage:
+ raise InvalidJWKUsage(usage, use)
+ ops = self._params.get('key_ops', None)
+ if ops:
+ if not isinstance(ops, list):
+ ops = [ops]
+ if operation not in ops:
+ raise InvalidJWKOperation(operation, ops)
+ # TODO: check alg ?
+
+ def _decode_int(self, n):
+ return int(base64url_decode(n).encode('hex'), 16)
+
+ def sign_key(self, arg=None):
+ self._check_constraints('sig', 'sign')
+ if self._params['kty'] == 'oct':
+ return self._key['k']
+ elif self._params['kty'] == 'RSA':
+ k = self._key
+ pub = rsa.RSAPublicNumbers(self._decode_int(k['e']),
+ self._decode_int(k['n']))
+ pri = rsa.RSAPrivateNumbers(self._decode_int(k['p']),
+ self._decode_int(k['q']),
+ self._decode_int(k['d']),
+ self._decode_int(k['dp']),
+ self._decode_int(k['dq']),
+ self._decode_int(k['qi']), pub)
+ return pri.private_key(default_backend())
+ elif self._params['kty'] == 'EC':
+ k = self._key
+ pub = ec.EllipticCurvePublicNumbers(self._decode_int(k['x']),
+ self._decode_int(k['y']),
+ self.get_curve(arg))
+ pri = ec.EllipticCurvePrivateNumbers(self._decode_int(k['d']),
+ pub)
+ return pri.private_key(default_backend())
+ else:
+ raise NotImplementedError
+
+ def verify_key(self, arg=None):
+ self._check_constraints('sig', 'verify')
+ if self._params['kty'] == 'oct':
+ return self._key['k']
+ elif self._params['kty'] == 'RSA':
+ k = self._key
+ pub = rsa.RSAPublicNumbers(self._decode_int(k['e']),
+ self._decode_int(k['n']))
+ return pub.public_key(default_backend())
+ elif self._params['kty'] == 'EC':
+ k = self._key
+ pub = ec.EllipticCurvePublicNumbers(self._decode_int(k['x']),
+ self._decode_int(k['y']),
+ self.get_curve(arg))
+ return pub.public_key(default_backend())
+ else:
+ raise NotImplementedError
+
+
+class JWKSet(set):
+
+ def add(self, elem):
+ if not isinstance(elem, JWK):
+ raise TypeError('Only JWK objects are valid elements')
+ set.add(self, elem)
+
+ def export(self):
+ keys = list()
+ for jwk in self:
+ keys.append(json.loads(jwk.export()))
+ return json.dumps({'keys': keys})
+
+ def get_key(self, kid):
+ for jwk in self:
+ if jwk.key_id == kid:
+ return jwk
+ return None