diff options
Diffstat (limited to 'custodia/message/kem.py')
-rw-r--r-- | custodia/message/kem.py | 131 |
1 files changed, 94 insertions, 37 deletions
diff --git a/custodia/message/kem.py b/custodia/message/kem.py index 9d58420..f8dbd3a 100644 --- a/custodia/message/kem.py +++ b/custodia/message/kem.py @@ -13,6 +13,11 @@ import os import time +KEY_USAGE_SIG = 0 +KEY_USAGE_ENC = 1 +KEY_USAGE_MAP = {KEY_USAGE_SIG: 'sig', KEY_USAGE_ENC: 'enc'} + + class UnknownPublicKey(Exception): pass @@ -40,7 +45,7 @@ class KEMKeysStore(SimplePathAuthz): self.paths = [] if 'paths' in self.config: self.paths = self.config['paths'].split() - self._server_key = None + self._server_keys = None self._alg = None self._enc = None @@ -53,21 +58,23 @@ class KEMKeysStore(SimplePathAuthz): request['KEMKeysStore'] = self return inpath - def find_key(self, kid): - dbkey = self._db_key(kid) + def find_key(self, kid, usage): + dbkey = self._db_key('%s/%s' % (KEY_USAGE_MAP[usage], kid)) pubkey = self.store.get(dbkey) if pubkey is None: raise UnknownPublicKey(kid) return pubkey @property - def server_key(self): - if self._server_key is None: - if 'server_key' not in self.config: - raise UnknownPublicKey("Server Key not defined") - key = self.find_key(self.config['server_key']) - self._server_key = JWK(**(json_decode(key))) - return self._server_key + def server_keys(self): + if self._server_keys is None: + if 'server_keys' not in self.config: + raise UnknownPublicKey("Server Keys not defined") + skey = self.find_key(self.config['server_keys'], KEY_USAGE_SIG) + ekey = self.find_key(self.config['server_keys'], KEY_USAGE_ENC) + self._server_keys = [JWK(**(json_decode(skey))), + JWK(**(json_decode(ekey)))] + return self._server_keys @property def alg(self): @@ -87,14 +94,14 @@ class KEMHandler(MessageHandler): self.kkstore = self.req.get('KEMKeysStore', None) if self.kkstore is None: raise Exception('KEM KeyStore not configured') - self.client_key = None + self.client_keys = None self.name = None - def _get_key(self, header): + def _get_key(self, header, usage): if 'kid' not in header: raise InvalidMessage("Missing key identifier") - key = self.kkstore.find_key(header['kid']) + key = self.kkstore.find_key(header['kid'], usage) if key is None: raise UnknownPublicKey('Key found [kid:%s]' % header['kid']) return json_decode(key) @@ -119,19 +126,21 @@ class KEMHandler(MessageHandler): try: token = jtok.token if isinstance(token, JWS): - key = self._get_key(token.jose_header) - self.client_key = JWK(**key) - token.verify(self.client_key) + skey = self._get_key(token.jose_header, KEY_USAGE_SIG) + ekey = self._get_key(token.jose_header, KEY_USAGE_ENC) + self.client_keys = (JWK(**skey), JWK(**ekey)) + token.verify(self.client_keys[0]) payload = token.payload elif isinstance(token, JWE): - token.decrypt(self.kkstore.server_key) + token.decrypt(self.kkstore.server_keys[1]) # If an ecnrypted payload is received then there must be # a nestd signed payload to verify the provenance. nested = JWS() nested.deserialize(token.payload) - key = self._get_key(nested.jose_header) - self.client_key = JWK(**key) - nested.verify(self.client_key) + skey = self._get_key(nested.jose_header, KEY_USAGE_SIG) + ekey = self._get_key(nested.jose_header, KEY_USAGE_ENC) + self.client_keys = (JWK(**skey), JWK(**ekey)) + nested.verify(self.client_keys[0]) payload = nested.payload else: raise TypeError("Invalid Token type: %s" % type(jtok)) @@ -141,23 +150,23 @@ class KEMHandler(MessageHandler): # FIXME: check name/time return {'type': 'kem', - 'value': {'kid': self.client_key.key_id, + 'value': {'kid': self.client_keys[0].key_id, 'payload': payload}} def reply(self, output): - if self.client_key is None: + if self.client_keys is None: raise UnknownPublicKey("Peer key not defined") - ktype = self.client_key.key_type + ktype = self.client_keys[1].key_type if ktype == 'RSA': enc = ('RSA1_5', 'A256CBC-HS512') else: raise ValueError("'%s' type not supported yet" % ktype) value = make_enc_kem(self.name, output, - self.kkstore.server_key, + self.kkstore.server_keys[0], self.kkstore.alg, - self.client_key, enc) + self.client_keys[1], enc) return json_encode({'type': 'kem', 'value': value}) @@ -185,9 +194,10 @@ import unittest from custodia.store.sqlite import SqliteStore -server_key = { +server_keys = ({ "kty": "RSA", "kid": "65d64463-7448-499e-8acc-55db2ce67039", + "use": "sig", "n": "maxhbsmBtdQ3CNrKvprUE6n9lYcregDMLYNeTAWcLj8NnPU9XIYegT" "HVHQjxKDSHP2l-F5jS7sppG1wgdAqZyhnWvXhYNvcM7RfgKxqNx_xAHx" "6f3yy7s-M9PSNCwPC2lh6UAkR4I00EhV9lrypM9Pi4lBUop9t5fS9W5U" @@ -222,25 +232,71 @@ server_key = { "qi": "kC-lzZOqoFaZCr5l0tOVtREKoVqaAYhQiqIRGL-MzS4sCmRkxm5vZ" "lXYx6RtE1n_AagjqajlkjieGlxTTThHD8Iga6foGBMaAr5uR1hGQpSc7" "Gl7CF1DZkBJMTQN6EshYzZfxW08mIO8M6Rzuh0beL6fG9mkDcIyPrBXx" - "2bQ_mM"} + "2bQ_mM"}, { + "kty": "RSA", + "kid": "65d64463-7448-499e-8acc-55db2ce67039", + "use": "enc", + "n": "t6Q8PWSi1dkJj9hTP8hNYFlvadM7DflW9mWepOJhJ66w7nyoK1gPNq" + "FMSQRyO125Gp-TEkodhWr0iujjHVx7BcV0llS4w5ACGgPrcAd6ZcSR" + "0-Iqom-QFcNP8Sjg086MwoqQU_LYywlAGZ21WSdS_PERyGFiNnj3QQ" + "lO8Yns5jCtLCRwLHL0Pb1fEv45AuRIuUfVcPySBWYnDyGxvjYGDSM-" + "AqWS9zIQ2ZilgT-GqUmipg0XOC0Cc20rgLe2ymLHjpHciCKVAbY5-L" + "32-lSeZO-Os6U15_aXrk9Gw8cPUaX1_I8sLGuSiVdt3C_Fn2PZ3Z8i" + "744FPFGGcG1qs2Wz-Q", + "e": "AQAB", + "d": "GRtbIQmhOZtyszfgKdg4u_N-R_mZGU_9k7JQ_jn1DnfTuMdSNprTea" + "STyWfSNkuaAwnOEbIQVy1IQbWVV25NY3ybc_IhUJtfri7bAXYEReWa" + "Cl3hdlPKXy9UvqPYGR0kIXTQRqns-dVJ7jahlI7LyckrpTmrM8dWBo" + "4_PMaenNnPiQgO0xnuToxutRZJfJvG4Ox4ka3GORQd9CsCZ2vsUDms" + "XOfUENOyMqADC6p1M3h33tsurY15k9qMSpG9OX_IJAXmxzAh_tWiZO" + "wk2K4yxH9tS3Lq1yX8C1EWmeRDkK2ahecG85-oLKQt5VEpWHKmjOi_" + "gJSdSgqcN96X52esAQ", + "p": "2rnSOV4hKSN8sS4CgcQHFbs08XboFDqKum3sc4h3GRxrTmQdl1ZK9u" + "w-PIHfQP0FkxXVrx-WE-ZEbrqivH_2iCLUS7wAl6XvARt1KkIaUxPP" + "SYB9yk31s0Q8UK96E3_OrADAYtAJs-M3JxCLfNgqh56HDnETTQhH3r" + "CT5T3yJws", + "q": "1u_RiFDP7LBYh3N4GXLT9OpSKYP0uQZyiaZwBtOCBNJgQxaj10RWjs" + "Zu0c6Iedis4S7B_coSKB0Kj9PaPaBzg-IySRvvcQuPamQu66riMhjV" + "tG6TlV8CLCYKrYl52ziqK0E_ym2QnkwsUX7eYTB7LbAHRK9GqocDE5" + "B0f808I4s", + "dp": "KkMTWqBUefVwZ2_Dbj1pPQqyHSHjj90L5x_MOzqYAJMcLMZtbUtwK" + "qvVDq3tbEo3ZIcohbDtt6SbfmWzggabpQxNxuBpoOOf_a_HgMXK_l" + "hqigI4y_kqS1wY52IwjUn5rgRrJ-yYo1h41KR-vz2pYhEAeYrhttW" + "txVqLCRViD6c", + "dq": "AvfS0-gRxvn0bwJoMSnFxYcK1WnuEjQFluMGfwGitQBWtfZ1Er7t1" + "xDkbN9GQTB9yqpDoYaN06H7CFtrkxhJIBQaj6nkF5KKS3TQtQ5qCz" + "kOkmxIe3KRbBymXxkb5qwUpX5ELD5xFc6FeiafWYY63TmmEAu_lRF" + "COJ3xDea-ots", + "qi": "lSQi-w9CpyUReMErP1RsBLk7wNtOvs5EQpPqmuMvqW57NBUczScEo" + "PwmUqqabu9V0-Py4dQ57_bapoKRu1R90bvuFnU63SHWEFglZQvJDM" + "eAvmj4sm-Fp0oYu_neotgQ0hzbI5gry7ajdYy9-2lNx_76aBZoOUu" + "9HCJ-UsfSOI8"}) + + +def _store_keys(keystore, usage, keys): + name = os.path.join('kemkeys', + KEY_USAGE_MAP[usage], + keys[usage]['kid']) + keystore.set(name, json_encode(keys[usage]), True) class KEMTests(unittest.TestCase): + @classmethod def setUpClass(cls): config = { - 'server_key': server_key['kid'], + 'server_keys': server_keys[0]['kid'], 'signing_algorithm': 'RS256', 'encryption_algorithms': 'RSA1_5 A128CBC-HS256'} with open('examples/client_enc.key') as f: data = f.read() - cls.client_key = json_decode(data) + cls.client_keys = json_decode(data) cls.kk = KEMKeysStore(config) cls.kk.store = SqliteStore({'dburi': 'kemtests.db'}) - cls.kk.store.set(os.path.join('kemkeys', server_key['kid']), - json_encode(server_key), True) - cls.kk.store.set(os.path.join('kemkeys', cls.client_key['kid']), - json_encode(cls.client_key), True) + _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys) + _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys) + _store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys) + _store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys) @classmethod def AtearDownClass(self): @@ -261,15 +317,16 @@ class KEMTests(unittest.TestCase): return S.serialize() def test_1_Parse_GET(self): - cli_key = JWK(**self.client_key) - jtok = make_sig_kem("mykey", None, cli_key, "RS256") + cli_skey = JWK(**self.client_keys[0]) + jtok = make_sig_kem("mykey", None, cli_skey, "RS256") kem = KEMHandler({'KEMKeysStore': self.kk}) kem.parse(jtok) out = kem.reply('output') jtok = JWT(jwt=json_decode(out)['value']) - jtok.token.decrypt(cli_key) + cli_ekey = JWK(**self.client_keys[1]) + jtok.token.decrypt(cli_ekey) nested = jtok.token.payload jtok = JWT(jwt=nested) - jtok.token.verify(JWK(**server_key)) + jtok.token.verify(JWK(**server_keys[0])) payload = json_decode(jtok.token.payload)['value'] self.assertEqual(payload, 'output') |