summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimo Sorce <simo@redhat.com>2015-06-05 13:00:50 -0400
committerSimo Sorce <simo@redhat.com>2015-06-05 13:00:50 -0400
commit1b55bc36f4176c0d4c6ea8ac5196b48776c08eeb (patch)
treeab5c9cac9eab793007d1859366710b5c67863ae0
parenteff99ee7a065e6e122dbd7cee136a2651073d224 (diff)
downloadcustodia-1b55bc36f4176c0d4c6ea8ac5196b48776c08eeb.tar.gz
custodia-1b55bc36f4176c0d4c6ea8ac5196b48776c08eeb.tar.xz
custodia-1b55bc36f4176c0d4c6ea8ac5196b48776c08eeb.zip
Change KEMClient to be able to encrypt requests
Also fix errors in handling encrypted requests, as well errors in the test suite. Signed-off-by: Simo Sorce <simo@redhat.com>
-rw-r--r--custodia/message/kem.py80
1 files changed, 50 insertions, 30 deletions
diff --git a/custodia/message/kem.py b/custodia/message/kem.py
index 343cb90..ebe2a47 100644
--- a/custodia/message/kem.py
+++ b/custodia/message/kem.py
@@ -128,25 +128,26 @@ class KEMHandler(MessageHandler):
try:
token = jtok.token
- if isinstance(token, JWS):
- skey = self._get_key(token.jose_header, KEY_USAGE_SIG)
- ekey = self._get_key(token.jose_header, KEY_USAGE_ENC)
- self.client_keys = (JWK(**skey), JWK(**ekey))
- token.verify(self.client_keys[0])
- claims = json_decode(token.payload)
- elif isinstance(token, JWE):
- token.decrypt(self.kkstore.server_keys[1])
- # If an ecnrypted payload is received then there must be
- # a nestd signed payload to verify the provenance.
- nested = JWS()
- nested.deserialize(token.payload)
- skey = self._get_key(nested.jose_header, KEY_USAGE_SIG)
- ekey = self._get_key(nested.jose_header, KEY_USAGE_ENC)
- self.client_keys = (JWK(**skey), JWK(**ekey))
- nested.verify(self.client_keys[0])
- claims = json_decode(nested.payload)
+ if isinstance(token, JWE):
+ token.decrypt(self.kkstore.server_keys[KEY_USAGE_ENC])
+ # If an encrypted payload is received then there must be
+ # a nested signed payload to verify the provenance.
+ payload = token.payload
+ token = JWS()
+ token.deserialize(payload)
+ elif isinstance(token, JWS):
+ pass
else:
raise TypeError("Invalid Token type: %s" % type(jtok))
+
+ # Retrieve client keys for later use
+ self.client_keys = [
+ JWK(**self._get_key(token.jose_header, KEY_USAGE_SIG)),
+ JWK(**self._get_key(token.jose_header, KEY_USAGE_ENC))]
+
+ # verify token and get payload
+ token.verify(self.client_keys[KEY_USAGE_SIG])
+ claims = json_decode(token.payload)
except Exception as e:
raise InvalidMessage('Failed to validate message: %s' % str(e))
@@ -186,18 +187,24 @@ class KEMHandler(MessageHandler):
class KEMClient(object):
- def __init__(self, server_key, client_keys):
- self.server_key = server_key
+ def __init__(self, server_keys, client_keys):
+ self.server_keys = server_keys
self.client_keys = client_keys
- def make_request(self, name, value = None, alg = "RS256"):
- return make_sig_kem(name, value, self.client_keys[KEY_USAGE_SIG], alg)
+ def make_request(self, name, value = None, alg = "RS256", encalg = None):
+ if encalg is None:
+ return make_sig_kem(name, value,
+ self.client_keys[KEY_USAGE_SIG], alg)
+ else:
+ return make_enc_kem(name, value,
+ self.client_keys[KEY_USAGE_SIG], alg,
+ self.server_keys[KEY_USAGE_ENC], encalg)
def parse_reply(self, message):
E = JWT(jwt=message,
key=self.client_keys[KEY_USAGE_ENC])
S = JWT(jwt=E.claims,
- key=self.server_key)
+ key=self.server_keys[KEY_USAGE_SIG])
return S.claims
@@ -224,7 +231,7 @@ import unittest
from custodia.store.sqlite import SqliteStore
-server_keys = ({
+test_keys = ({
"kty": "RSA",
"kid": "65d64463-7448-499e-8acc-55db2ce67039",
"use": "sig",
@@ -315,7 +322,7 @@ class KEMTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
config = {
- 'server_keys': server_keys[0]['kid'],
+ 'server_keys': test_keys[0]['kid'],
'signing_algorithm': 'RS256',
'encryption_algorithms': 'RSA1_5 A128CBC-HS256'}
with open('examples/client_enc.key') as f:
@@ -323,10 +330,10 @@ class KEMTests(unittest.TestCase):
cls.client_keys = json_decode(data)
cls.kk = KEMKeysStore(config)
cls.kk.store = SqliteStore({'dburi': 'kemtests.db'})
- _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys)
- _store_keys(cls.kk.store, KEY_USAGE_SIG, server_keys)
- _store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys)
+ _store_keys(cls.kk.store, KEY_USAGE_SIG, test_keys)
+ _store_keys(cls.kk.store, KEY_USAGE_ENC, test_keys)
_store_keys(cls.kk.store, KEY_USAGE_SIG, cls.client_keys)
+ _store_keys(cls.kk.store, KEY_USAGE_ENC, cls.client_keys)
@classmethod
def AtearDownClass(self):
@@ -357,18 +364,31 @@ class KEMTests(unittest.TestCase):
jtok.token.decrypt(cli_ekey)
nested = jtok.token.payload
jtok = JWT(jwt=nested)
- jtok.token.verify(JWK(**server_keys[0]))
+ jtok.token.verify(JWK(**test_keys[0]))
payload = json_decode(jtok.token.payload)['value']
self.assertEqual(payload, 'output')
def test_2_KEMClient(self):
- server_key = JWK(**server_keys[KEY_USAGE_SIG])
+ server_keys = [JWK(**test_keys[KEY_USAGE_SIG]), None]
client_keys = [JWK(**self.client_keys[KEY_USAGE_SIG]),
JWK(**self.client_keys[KEY_USAGE_ENC])]
- cli = KEMClient(server_key, client_keys)
+ cli = KEMClient(server_keys, client_keys)
kem = KEMHandler({'KEMKeysStore': self.kk})
req = cli.make_request("key name")
kem.parse(req, "key name")
msg = json_decode(kem.reply('key value'))
rep = json_decode(cli.parse_reply(msg['value']))
self.assertEqual(rep['value'], 'key value')
+
+ def test_3_KEMClient(self):
+ server_keys = [JWK(**test_keys[KEY_USAGE_SIG]),
+ JWK(**test_keys[KEY_USAGE_ENC])]
+ client_keys = [JWK(**self.client_keys[KEY_USAGE_SIG]),
+ JWK(**self.client_keys[KEY_USAGE_ENC])]
+ cli = KEMClient(server_keys, client_keys)
+ kem = KEMHandler({'KEMKeysStore': self.kk})
+ req = cli.make_request("key name", encalg=('RSA1_5', 'A256CBC-HS512'))
+ kem.parse(req, "key name")
+ msg = json_decode(kem.reply('key value'))
+ rep = json_decode(cli.parse_reply(msg['value']))
+ self.assertEqual(rep['value'], 'key value')