summaryrefslogtreecommitdiffstats
path: root/custodia/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'custodia/client.py')
-rw-r--r--custodia/client.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/custodia/client.py b/custodia/client.py
index 221080a..9647d68 100644
--- a/custodia/client.py
+++ b/custodia/client.py
@@ -2,6 +2,9 @@
import socket
+from jwcrypto.common import json_decode
+from jwcrypto.jwk import JWK
+
import requests
from requests.adapters import HTTPAdapter
@@ -10,6 +13,10 @@ from requests.compat import unquote, urlparse
from requests.packages.urllib3.connection import HTTPConnection
from requests.packages.urllib3.connectionpool import HTTPConnectionPool
+from custodia.message.kem import (
+ check_kem_claims, decode_enc_kem, make_enc_kem
+)
+
class HTTPUnixConnection(HTTPConnection):
@@ -150,3 +157,120 @@ class CustodiaSimpleClient(CustodiaHTTPClient):
def del_secret(self, name):
r = self.delete(name)
r.raise_for_status()
+
+
+class CustodiaKEMClient(CustodiaHTTPClient):
+ def __init__(self, *args, **kwargs):
+ super(CustodiaKEMClient, self).__init__(*args, **kwargs)
+ self._cli_signing_key = None
+ self._cli_decryption_key = None
+ self._srv_verifying_key = None
+ self._srv_encryption_key = None
+ self._sig_alg = None
+ self._enc_alg = None
+
+ def _decode_key(self, key):
+ if key is None:
+ return None
+ elif isinstance(key, JWK):
+ return key
+ elif isinstance(key, dict):
+ return JWK(**key)
+ elif isinstance(key, str):
+ return JWK(**(json_decode(key)))
+ else:
+ raise TypeError("Invalid key type")
+
+ def set_server_public_keys(self, sig, enc):
+ self._srv_verifying_key = self._decode_key(sig)
+ self._srv_encryption_key = self._decode_key(enc)
+
+ def set_client_keys(self, sig, enc):
+ self._cli_signing_key = self._decode_key(sig)
+ self._cli_decryption_key = self._decode_key(enc)
+
+ def set_algorithms(self, sig, enc):
+ self._sig_alg = sig
+ self._enc_alg = enc
+
+ def _signing_algorithm(self, key):
+ if self._sig_alg is not None:
+ return self._sig_alg
+ elif key.key_type == 'RSA':
+ return 'RS256'
+ elif key.key_type == 'EC':
+ return 'ES256'
+ else:
+ raise ValueError('Unsupported key type')
+
+ def _encryption_algorithm(self, key):
+ if self._enc_alg is not None:
+ return self._enc_alg
+ elif key.key_type == 'RSA':
+ return ('RSA1_5', 'A256CBC-HS512')
+ elif key.key_type == 'EC':
+ return ('ECDH-ES+A256KW', 'A256CBC-HS512')
+ else:
+ raise ValueError('Unsupported key type')
+
+ def _kem_wrap(self, name, value):
+ if self._cli_signing_key is None:
+ raise KeyError("Client Signing key is not available")
+ if self._srv_encryption_key is None:
+ raise KeyError("Server Encryption key is not available")
+ sig_alg = self._signing_algorithm(self._cli_signing_key)
+ enc_alg = self._encryption_algorithm(self._srv_encryption_key)
+ return make_enc_kem(name, value,
+ self._cli_signing_key, sig_alg,
+ self._srv_encryption_key, enc_alg)
+
+ def _kem_unwrap(self, name, message):
+ if message.get("type", None) != "kem":
+ raise TypeError("Invalid token type, expected 'kem', got %s" % (
+ message.get("type", None),))
+
+ if self._cli_decryption_key is None:
+ raise KeyError("Client Decryption key is not available")
+ if self._srv_verifying_key is None:
+ raise KeyError("Server Verifying key is not available")
+ claims = decode_enc_kem(message["value"],
+ self._cli_decryption_key,
+ self._srv_verifying_key)
+ check_kem_claims(claims, name)
+ return claims
+
+ def create_container(self, name):
+ cname = self.container_name(name)
+ message = self._kem_wrap(cname, None)
+ r = self.post(cname, json={"type": "kem", "value": message})
+ r.raise_for_status()
+ self._kem_unwrap(cname, r.json())
+
+ def delete_container(self, name):
+ cname = self.container_name(name)
+ message = self._kem_wrap(cname, None)
+ r = self.delete(cname, json={"type": "kem", "value": message})
+ r.raise_for_status()
+ self._kem_unwrap(cname, r.json())
+
+ def list_container(self, name):
+ return json_decode(self.get_secret(self.container_name(name)))
+
+ def get_secret(self, name):
+ message = self._kem_wrap(name, None)
+ r = self.get(name, params={"type": "kem", "value": message})
+ r.raise_for_status()
+ claims = self._kem_unwrap(name, r.json())
+ return claims['value']
+
+ def set_secret(self, name, value):
+ message = self._kem_wrap(name, value)
+ r = self.put(name, json={"type": "kem", "value": message})
+ r.raise_for_status()
+ self._kem_unwrap(name, r.json())
+
+ def del_secret(self, name):
+ message = self._kem_wrap(name, None)
+ r = self.delete(name, json={"type": "kem", "value": message})
+ r.raise_for_status()
+ self._kem_unwrap(name, r.json())