From 1b55bc36f4176c0d4c6ea8ac5196b48776c08eeb Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Fri, 5 Jun 2015 13:00:50 -0400 Subject: Change KEMClient to be able to encrypt requests Also fix errors in handling encrypted requests, as well errors in the test suite. Signed-off-by: Simo Sorce --- custodia/message/kem.py | 80 ++++++++++++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/custodia/message/kem.py b/custodia/message/kem.py index 343cb90..ebe2a47 100644 --- a/custodia/message/kem.py +++ b/custodia/message/kem.py @@ -128,25 +128,26 @@ class KEMHandler(MessageHandler): try: token = jtok.token - if isinstance(token, JWS): - skey = self._get_key(token.jose_header, KEY_USAGE_SIG) - ekey = self._get_key(token.jose_header, KEY_USAGE_ENC) - self.client_keys = (JWK(**skey), JWK(**ekey)) - token.verify(self.client_keys[0]) - claims = json_decode(token.payload) - elif isinstance(token, JWE): - token.decrypt(self.kkstore.server_keys[1]) - # If an ecnrypted payload is received then there must be - # a nestd signed payload to verify the provenance. - nested = JWS() - nested.deserialize(token.payload) - skey = self._get_key(nested.jose_header, KEY_USAGE_SIG) - ekey = self._get_key(nested.jose_header, KEY_USAGE_ENC) - self.client_keys = (JWK(**skey), JWK(**ekey)) - nested.verify(self.client_keys[0]) - claims = json_decode(nested.payload) + if isinstance(token, JWE): + token.decrypt(self.kkstore.server_keys[KEY_USAGE_ENC]) + # If an encrypted payload is received then there must be + # a nested signed payload to verify the provenance. + payload = token.payload + token = JWS() + token.deserialize(payload) + elif isinstance(token, JWS): + pass else: raise TypeError("Invalid Token type: %s" % type(jtok)) + + # Retrieve client keys for later use + self.client_keys = [ + JWK(**self._get_key(token.jose_header, KEY_USAGE_SIG)), + JWK(**self._get_key(token.jose_header, KEY_USAGE_ENC))] + + # verify token and get payload + token.verify(self.client_keys[KEY_USAGE_SIG]) + claims = json_decode(token.payload) except Exception as e: raise InvalidMessage('Failed to validate message: %s' % str(e)) @@ -186,18 +187,24 @@ class KEMHandler(MessageHandler): class KEMClient(object): - def __init__(self, server_key, client_keys): - self.server_key = server_key + def __init__(self, server_keys, client_keys): + self.server_keys = server_keys self.client_keys = client_keys - def make_request(self, name, value = None, alg = "RS256"): - return make_sig_kem(name, value, self.client_keys[KEY_USAGE_SIG], alg) + def make_request(self, name, value = None, alg = "RS256", encalg = None): + if encalg is None: + return make_sig_kem(name, value, + self.client_keys[KEY_USAGE_SIG], alg) + else: + return make_enc_kem(name, value, + self.client_keys[KEY_USAGE_SIG], alg, + self.server_keys[KEY_USAGE_ENC], encalg) def parse_reply(self, message): E = JWT(jwt=message, key=self.client_keys[KEY_USAGE_ENC]) S = JWT(jwt=E.claims, - key=self.server_key) + key=self.server_keys[KEY_USAGE_SIG]) return S.claims @@ -224,7 +231,7 @@ import unittest from custodia.store.sqlite import SqliteStore -server_keys = ({ +test_keys = ({ "kty": "RSA", "kid": "65d64463-7448-499e-8acc-55db2ce67039", "use": "sig", @@ -315,7 +322,7 @@ class KEMTests(unittest.TestCase): @classmethod def setUpClass(cls): config = { - 'server_keys': server_keys[0]['kid'], + 'server_keys': test_keys[0]['kid'], 'signing_algorithm': 'RS256', 'encryption_algorithms': 'RSA1_5 A128CBC-HS256'} with open('examples/client_enc.key') as f: @@ -323,10 +330,10 @@ class KEMTests(unittest.TestCase): cls.client_keys = json_decode(data) cls.kk = KEMKeysStore(config) cls.kk.store = SqliteStore({'dburi': 'kemtests.db'}) - _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys) - _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys) - _store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys) + _store_keys(cls.kk.store, KEY_USAGE_SIG, test_keys) + _store_keys(cls.kk.store, KEY_USAGE_ENC, test_keys) _store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys) + _store_keys(cls.kk.store, KEY_USAGE_ENC, cls.client_keys) @classmethod def AtearDownClass(self): @@ -357,18 +364,31 @@ class KEMTests(unittest.TestCase): jtok.token.decrypt(cli_ekey) nested = jtok.token.payload jtok = JWT(jwt=nested) - jtok.token.verify(JWK(**server_keys[0])) + jtok.token.verify(JWK(**test_keys[0])) payload = json_decode(jtok.token.payload)['value'] self.assertEqual(payload, 'output') def test_2_KEMClient(self): - server_key = JWK(**server_keys[KEY_USAGE_SIG]) + server_keys = [JWK(**test_keys[KEY_USAGE_SIG]), None] client_keys = [JWK(**self.client_keys[KEY_USAGE_SIG]), JWK(**self.client_keys[KEY_USAGE_ENC])] - cli = KEMClient(server_key, client_keys) + cli = KEMClient(server_keys, client_keys) kem = KEMHandler({'KEMKeysStore': self.kk}) req = cli.make_request("key name") kem.parse(req, "key name") msg = json_decode(kem.reply('key value')) rep = json_decode(cli.parse_reply(msg['value'])) self.assertEqual(rep['value'], 'key value') + + def test_3_KEMClient(self): + server_keys = [JWK(**test_keys[KEY_USAGE_SIG]), + JWK(**test_keys[KEY_USAGE_ENC])] + client_keys = [JWK(**self.client_keys[KEY_USAGE_SIG]), + JWK(**self.client_keys[KEY_USAGE_ENC])] + cli = KEMClient(server_keys, client_keys) + kem = KEMHandler({'KEMKeysStore': self.kk}) + req = cli.make_request("key name", encalg=('RSA1_5', 'A256CBC-HS512')) + kem.parse(req, "key name") + msg = json_decode(kem.reply('key value')) + rep = json_decode(cli.parse_reply(msg['value'])) + self.assertEqual(rep['value'], 'key value') -- cgit