summaryrefslogtreecommitdiffstats
path: root/tests/custodia.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/custodia.py')
-rw-r--r--tests/custodia.py57
1 files changed, 53 insertions, 4 deletions
diff --git a/tests/custodia.py b/tests/custodia.py
index 9db0e4b..2109e38 100644
--- a/tests/custodia.py
+++ b/tests/custodia.py
@@ -14,7 +14,8 @@ from jwcrypto import jwk
from requests.exceptions import HTTPError
-from custodia.client import CustodiaSimpleClient
+from custodia.client import CustodiaKEMClient, CustodiaSimpleClient
+from custodia.store.sqlite import SqliteStore
TEST_CUSTODIA_CONF = """
@@ -83,8 +84,15 @@ master_enctype = A128CBC-HS256
handler = custodia.httpd.authorizers.SimplePathAuthz
paths = /enc
+[authz:enc_kem]
+handler = custodia.message.kem.KEMKeysStore
+server_keys = srvkid
+store = simple
+paths = /enc/kem
+
[/enc]
handler = custodia.secrets.Secrets
+allowed_keytypes = simple kem
store = encgen
"""
@@ -100,11 +108,27 @@ def unlink_if_exists(filename):
raise
-def generate_key(filename):
+def generate_all_keys(filename):
key = jwk.JWK(generate='oct', size=256)
- with (open(filename, 'w+')) as keyfile:
+ with open(filename, 'w+') as keyfile:
keyfile.write(key.export())
+ srv_kid = "srvkid"
+ cli_kid = "clikid"
+ ss_key = jwk.JWK(generate='RSA', kid=srv_kid, use="sig")
+ se_key = jwk.JWK(generate='RSA', kid=srv_kid, use="enc")
+ store = SqliteStore({'dburi': 'test_secrets.db', 'table': 'secrets'})
+ store.set('kemkeys/sig/%s' % srv_kid, ss_key.export())
+ store.set('kemkeys/enc/%s' % srv_kid, se_key.export())
+
+ cs_key = jwk.JWK(generate='RSA', kid=cli_kid, use="sig")
+ ce_key = jwk.JWK(generate='RSA', kid=cli_kid, use="enc")
+ store = SqliteStore({'dburi': 'test_secrets.db', 'table': 'secrets'})
+ store.set('kemkeys/sig/%s' % cli_kid, cs_key.export_public())
+ store.set('kemkeys/enc/%s' % cli_kid, ce_key.export_public())
+ return ([ss_key.export_public(), se_key.export_public()],
+ [cs_key.export(), ce_key.export()])
+
class CustodiaTests(unittest.TestCase):
@@ -122,7 +146,7 @@ class CustodiaTests(unittest.TestCase):
cls.socket_url = TEST_SOCKET_URL
cls.test_auth_id = "test_user"
cls.test_auth_key = "cd54b735-e756-4f12-aa18-d85509baef36"
- generate_key('test_mkey.conf')
+ (srvkeys, clikeys) = generate_all_keys('test_mkey.conf')
with (open('test_custodia.conf', 'w+')) as conffile:
t = Template(TEST_CUSTODIA_CONF)
conf = t.substitute({'SOCKET_URL': cls.socket_url,
@@ -149,6 +173,11 @@ class CustodiaTests(unittest.TestCase):
cls.enc = CustodiaSimpleClient(cls.socket_url + '/enc')
cls.enc.headers['REMOTE_USER'] = 'enc'
+ cls.kem = CustodiaKEMClient(cls.socket_url + '/enc')
+ cls.kem.headers['REMOTE_USER'] = 'kem'
+ cls.kem.set_server_public_keys(*srvkeys)
+ cls.kem.set_client_keys(*clikeys)
+
@classmethod
def tearDownClass(cls):
cls.custodia_process.kill()
@@ -223,3 +252,23 @@ class CustodiaTests(unittest.TestCase):
self.assertNotEqual(key, 'simple')
key = self.enc.get_secret('enc/key')
self.assertEqual(key, 'simple')
+
+ def test_B_1_kem_create_container(self):
+ self.kem.create_container('kem')
+ cl = self.kem.list_container('kem')
+ self.assertEqual(cl, [])
+ self.kem.set_secret('kem/key', 'Protected')
+ cl = self.kem.list_container('kem')
+ self.assertEqual(cl, ['key'])
+ value = self.kem.get_secret('kem/key')
+ self.assertEqual(value, 'Protected')
+ self.kem.del_secret('kem/key')
+ try:
+ self.kem.get_secret('kem/key')
+ except HTTPError:
+ self.assertEqual(self.kem.last_response.status_code, 404)
+ self.kem.delete_container('kem')
+ try:
+ self.kem.list_container('kem')
+ except HTTPError:
+ self.assertEqual(self.kem.last_response.status_code, 404)