diff options
Diffstat (limited to 'tests/custodia.py')
-rw-r--r-- | tests/custodia.py | 57 |
1 files changed, 53 insertions, 4 deletions
diff --git a/tests/custodia.py b/tests/custodia.py index 9db0e4b..2109e38 100644 --- a/tests/custodia.py +++ b/tests/custodia.py @@ -14,7 +14,8 @@ from jwcrypto import jwk from requests.exceptions import HTTPError -from custodia.client import CustodiaSimpleClient +from custodia.client import CustodiaKEMClient, CustodiaSimpleClient +from custodia.store.sqlite import SqliteStore TEST_CUSTODIA_CONF = """ @@ -83,8 +84,15 @@ master_enctype = A128CBC-HS256 handler = custodia.httpd.authorizers.SimplePathAuthz paths = /enc +[authz:enc_kem] +handler = custodia.message.kem.KEMKeysStore +server_keys = srvkid +store = simple +paths = /enc/kem + [/enc] handler = custodia.secrets.Secrets +allowed_keytypes = simple kem store = encgen """ @@ -100,11 +108,27 @@ def unlink_if_exists(filename): raise -def generate_key(filename): +def generate_all_keys(filename): key = jwk.JWK(generate='oct', size=256) - with (open(filename, 'w+')) as keyfile: + with open(filename, 'w+') as keyfile: keyfile.write(key.export()) + srv_kid = "srvkid" + cli_kid = "clikid" + ss_key = jwk.JWK(generate='RSA', kid=srv_kid, use="sig") + se_key = jwk.JWK(generate='RSA', kid=srv_kid, use="enc") + store = SqliteStore({'dburi': 'test_secrets.db', 'table': 'secrets'}) + store.set('kemkeys/sig/%s' % srv_kid, ss_key.export()) + store.set('kemkeys/enc/%s' % srv_kid, se_key.export()) + + cs_key = jwk.JWK(generate='RSA', kid=cli_kid, use="sig") + ce_key = jwk.JWK(generate='RSA', kid=cli_kid, use="enc") + store = SqliteStore({'dburi': 'test_secrets.db', 'table': 'secrets'}) + store.set('kemkeys/sig/%s' % cli_kid, cs_key.export_public()) + store.set('kemkeys/enc/%s' % cli_kid, ce_key.export_public()) + return ([ss_key.export_public(), se_key.export_public()], + [cs_key.export(), ce_key.export()]) + class CustodiaTests(unittest.TestCase): @@ -122,7 +146,7 @@ class CustodiaTests(unittest.TestCase): cls.socket_url = TEST_SOCKET_URL cls.test_auth_id = "test_user" cls.test_auth_key = "cd54b735-e756-4f12-aa18-d85509baef36" - generate_key('test_mkey.conf') + (srvkeys, clikeys) = generate_all_keys('test_mkey.conf') with (open('test_custodia.conf', 'w+')) as conffile: t = Template(TEST_CUSTODIA_CONF) conf = t.substitute({'SOCKET_URL': cls.socket_url, @@ -149,6 +173,11 @@ class CustodiaTests(unittest.TestCase): cls.enc = CustodiaSimpleClient(cls.socket_url + '/enc') cls.enc.headers['REMOTE_USER'] = 'enc' + cls.kem = CustodiaKEMClient(cls.socket_url + '/enc') + cls.kem.headers['REMOTE_USER'] = 'kem' + cls.kem.set_server_public_keys(*srvkeys) + cls.kem.set_client_keys(*clikeys) + @classmethod def tearDownClass(cls): cls.custodia_process.kill() @@ -223,3 +252,23 @@ class CustodiaTests(unittest.TestCase): self.assertNotEqual(key, 'simple') key = self.enc.get_secret('enc/key') self.assertEqual(key, 'simple') + + def test_B_1_kem_create_container(self): + self.kem.create_container('kem') + cl = self.kem.list_container('kem') + self.assertEqual(cl, []) + self.kem.set_secret('kem/key', 'Protected') + cl = self.kem.list_container('kem') + self.assertEqual(cl, ['key']) + value = self.kem.get_secret('kem/key') + self.assertEqual(value, 'Protected') + self.kem.del_secret('kem/key') + try: + self.kem.get_secret('kem/key') + except HTTPError: + self.assertEqual(self.kem.last_response.status_code, 404) + self.kem.delete_container('kem') + try: + self.kem.list_container('kem') + except HTTPError: + self.assertEqual(self.kem.last_response.status_code, 404) |