From 3b7eed15c3f9da7381d240a762b0e557dd18ce96 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Tue, 27 Oct 2015 14:47:35 -0400 Subject: Add support in the client for the kem message type This allows to easily use end-to-end encrypted requests and replies to fetch secrets. Signed-off-by: Simo Sorce --- custodia/client.py | 124 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) (limited to 'custodia/client.py') diff --git a/custodia/client.py b/custodia/client.py index 221080a..9647d68 100644 --- a/custodia/client.py +++ b/custodia/client.py @@ -2,6 +2,9 @@ import socket +from jwcrypto.common import json_decode +from jwcrypto.jwk import JWK + import requests from requests.adapters import HTTPAdapter @@ -10,6 +13,10 @@ from requests.compat import unquote, urlparse from requests.packages.urllib3.connection import HTTPConnection from requests.packages.urllib3.connectionpool import HTTPConnectionPool +from custodia.message.kem import ( + check_kem_claims, decode_enc_kem, make_enc_kem +) + class HTTPUnixConnection(HTTPConnection): @@ -150,3 +157,120 @@ class CustodiaSimpleClient(CustodiaHTTPClient): def del_secret(self, name): r = self.delete(name) r.raise_for_status() + + +class CustodiaKEMClient(CustodiaHTTPClient): + def __init__(self, *args, **kwargs): + super(CustodiaKEMClient, self).__init__(*args, **kwargs) + self._cli_signing_key = None + self._cli_decryption_key = None + self._srv_verifying_key = None + self._srv_encryption_key = None + self._sig_alg = None + self._enc_alg = None + + def _decode_key(self, key): + if key is None: + return None + elif isinstance(key, JWK): + return key + elif isinstance(key, dict): + return JWK(**key) + elif isinstance(key, str): + return JWK(**(json_decode(key))) + else: + raise TypeError("Invalid key type") + + def set_server_public_keys(self, sig, enc): + self._srv_verifying_key = self._decode_key(sig) + self._srv_encryption_key = self._decode_key(enc) + + def set_client_keys(self, sig, enc): + self._cli_signing_key = self._decode_key(sig) + self._cli_decryption_key = self._decode_key(enc) + + def set_algorithms(self, sig, enc): + self._sig_alg = sig + self._enc_alg = enc + + def _signing_algorithm(self, key): + if self._sig_alg is not None: + return self._sig_alg + elif key.key_type == 'RSA': + return 'RS256' + elif key.key_type == 'EC': + return 'ES256' + else: + raise ValueError('Unsupported key type') + + def _encryption_algorithm(self, key): + if self._enc_alg is not None: + return self._enc_alg + elif key.key_type == 'RSA': + return ('RSA1_5', 'A256CBC-HS512') + elif key.key_type == 'EC': + return ('ECDH-ES+A256KW', 'A256CBC-HS512') + else: + raise ValueError('Unsupported key type') + + def _kem_wrap(self, name, value): + if self._cli_signing_key is None: + raise KeyError("Client Signing key is not available") + if self._srv_encryption_key is None: + raise KeyError("Server Encryption key is not available") + sig_alg = self._signing_algorithm(self._cli_signing_key) + enc_alg = self._encryption_algorithm(self._srv_encryption_key) + return make_enc_kem(name, value, + self._cli_signing_key, sig_alg, + self._srv_encryption_key, enc_alg) + + def _kem_unwrap(self, name, message): + if message.get("type", None) != "kem": + raise TypeError("Invalid token type, expected 'kem', got %s" % ( + message.get("type", None),)) + + if self._cli_decryption_key is None: + raise KeyError("Client Decryption key is not available") + if self._srv_verifying_key is None: + raise KeyError("Server Verifying key is not available") + claims = decode_enc_kem(message["value"], + self._cli_decryption_key, + self._srv_verifying_key) + check_kem_claims(claims, name) + return claims + + def create_container(self, name): + cname = self.container_name(name) + message = self._kem_wrap(cname, None) + r = self.post(cname, json={"type": "kem", "value": message}) + r.raise_for_status() + self._kem_unwrap(cname, r.json()) + + def delete_container(self, name): + cname = self.container_name(name) + message = self._kem_wrap(cname, None) + r = self.delete(cname, json={"type": "kem", "value": message}) + r.raise_for_status() + self._kem_unwrap(cname, r.json()) + + def list_container(self, name): + return json_decode(self.get_secret(self.container_name(name))) + + def get_secret(self, name): + message = self._kem_wrap(name, None) + r = self.get(name, params={"type": "kem", "value": message}) + r.raise_for_status() + claims = self._kem_unwrap(name, r.json()) + return claims['value'] + + def set_secret(self, name, value): + message = self._kem_wrap(name, value) + r = self.put(name, json={"type": "kem", "value": message}) + r.raise_for_status() + self._kem_unwrap(name, r.json()) + + def del_secret(self, name): + message = self._kem_wrap(name, None) + r = self.delete(name, json={"type": "kem", "value": message}) + r.raise_for_status() + self._kem_unwrap(name, r.json()) -- cgit