1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
from __future__ import print_function, absolute_import
import contextlib
import os
import secrets
from base64 import b64encode
# pylint: disable=relative-import
from custodia.message.kem import KEMClient, KEY_USAGE_SIG, KEY_USAGE_ENC
# pylint: enable=relative-import
from jwcrypto.common import json_decode
from jwcrypto.jwk import JWK
from ipalib.krb_utils import krb5_format_service_principal_name
from ipaserver.secrets.kem import IPAKEMKeys
from ipaserver.secrets.store import IPASecStore
from ipaplatform.paths import paths
import gssapi
import requests
@contextlib.contextmanager
def ccache_env(ccache):
    """Run the ``with`` body with ``KRB5CCNAME`` pointing at *ccache*.

    The value the variable had on entry is remembered. On exit, whether the
    body finished normally or raised, that value is put back; if the
    variable was not set on entry it is removed again.
    """
    environ = os.environ
    previous = environ.get('KRB5CCNAME')
    environ['KRB5CCNAME'] = ccache
    try:
        yield
    finally:
        if previous is None:
            # Variable did not exist before: leave no trace behind.
            environ.pop('KRB5CCNAME', None)
        else:
            environ['KRB5CCNAME'] = previous
class CustodiaClient:
    """Fetch keys from the ``/ipa/keys/`` endpoint of a remote server.

    Requests are signed/encrypted through a ``KEMClient`` and authenticated
    with a GSSAPI ``Negotiate`` header built from keytab-based credentials
    that are kept in a per-instance ``MEMORY:`` credential cache.
    """

    # Credentials with less than this many seconds left are re-acquired
    # before they are used to build an Authorization header.
    _MIN_CREDS_LIFETIME = 300

    def __init__(self, client_service, keyfile, keytab, server, realm,
                 ldap_uri=None, auth_type=None):
        """Set up credentials and KEM keys for talking to *server*.

        :param client_service: GSS host-based service name of the client
            (``service@host``); it must contain ``@`` and must not end
            with *realm*.
        :param keyfile: passed to ``IPAKEMKeys`` as ``server_keys``.
        :param keytab: client keytab used to acquire credentials.
        :param server: host name of the remote server; used for the
            ``HTTP@server`` target name, the ``host`` principal whose
            public keys are looked up, and the request URL.
        :param realm: Kerberos realm used to build the server principal.
        :param ldap_uri: passed to ``IPAKEMKeys`` as ``ldap_uri``.
        :param auth_type: stored on the instance as ``auth_type``.
        :raises ValueError: if *client_service* is not in
            ``service@host`` form.
        """
        if client_service.endswith(realm) or "@" not in client_service:
            raise ValueError(
                "Client service name must be a GSS name (service@host), "
                "not '{}'.".format(client_service)
            )
        self.client_service = client_service
        self.keytab = keytab
        self.server = server
        self.realm = realm
        self.ldap_uri = ldap_uri
        self.auth_type = auth_type
        self.service_name = gssapi.Name(
            'HTTP@{}'.format(server), gssapi.NameType.hostbased_service
        )
        self.keystore = IPASecStore()
        # Use an in-process MEMORY ccache. Handler processes don't need
        # a TGT.
        self.ccache = 'MEMORY:Custodia_{}'.format(secrets.token_hex())

        with ccache_env(self.ccache):
            # Init creds immediately to make sure they are valid. Creds
            # can also be re-inited by _auth_header to avoid expiry.
            self.creds = self._init_creds()

            self.ikk = IPAKEMKeys(
                {'server_keys': keyfile, 'ldap_uri': ldap_uri}
            )
            self.kemcli = KEMClient(
                self._server_keys(), self._client_keys()
            )

    def _client_keys(self):
        """Return the ``server_keys`` attribute of the ``IPAKEMKeys`` object."""
        return self.ikk.server_keys

    def _server_keys(self):
        """Look up the signing and encryption keys of the remote host.

        :return: ``(signing_key, encryption_key)`` as ``JWK`` objects
            for the ``host`` principal of ``self.server``.
        """
        principal = krb5_format_service_principal_name(
            'host', self.server, self.realm
        )
        sk = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_SIG)))
        ek = JWK(**json_decode(self.ikk.find_key(principal, KEY_USAGE_ENC)))
        return sk, ek

    def _init_creds(self):
        """Acquire initiator credentials for the client service.

        The keytab is used as ``client_keytab`` and the credentials are
        stored in this instance's ccache.
        """
        name = gssapi.Name(
            self.client_service, gssapi.NameType.hostbased_service
        )
        store = {
            'client_keytab': self.keytab,
            'ccache': self.ccache
        }
        return gssapi.Credentials(name=name, store=store, usage='initiate')

    def _auth_header(self):
        """Build the ``Authorization: Negotiate`` header for one request.

        Credentials that are about to expire, or that have already
        expired, are re-acquired first.
        """
        try:
            refresh = self.creds.lifetime < self._MIN_CREDS_LIFETIME
        except gssapi.exceptions.ExpiredCredentialsError:
            # Querying the lifetime of fully expired credentials can raise
            # instead of returning 0; that is exactly the case in which
            # new credentials are needed.
            refresh = True
        if refresh:
            self.creds = self._init_creds()

        ctx = gssapi.SecurityContext(
            name=self.service_name,
            creds=self.creds
        )
        authtok = ctx.step()
        return {'Authorization': 'Negotiate %s' % b64encode(
            authtok).decode('ascii')}

    @staticmethod
    def _reply_value(reply):
        """Return ``reply['value']`` after checking the reply type.

        :raises RuntimeError: if *reply* has no ``type`` member or the
            type is not ``kem``.
        """
        if 'type' not in reply or reply['type'] != 'kem':
            raise RuntimeError('Invalid JSON response type')
        return reply['value']

    def fetch_key(self, keyname, store=True, timeout=None):
        """Request *keyname* from the remote server.

        :param keyname: name of the key; appended to the
            ``https://<server>/ipa/keys/`` URL.
        :param store: when true, save the parsed value in the key store
            under ``keys/<keyname>``; when false, return it instead.
        :param timeout: forwarded to ``requests.get``. The default
            ``None`` means no timeout, as before this parameter existed.
        :return: the parsed value if *store* is false, otherwise ``None``.
        :raises RuntimeError: if the JSON reply is not of type ``kem``.
        """
        # Prepare URL
        url = 'https://%s/ipa/keys/%s' % (self.server, keyname)

        # Prepare signed/encrypted request
        encalg = ('RSA-OAEP', 'A256CBC-HS512')
        request = self.kemcli.make_request(keyname, encalg=encalg)

        # Prepare Authentication header
        headers = self._auth_header()

        # Perform request
        r = requests.get(
            url, headers=headers,
            verify=paths.IPA_CA_CRT,
            params={'type': 'kem', 'value': request},
            timeout=timeout
        )
        r.raise_for_status()

        value = self.kemcli.parse_reply(keyname, self._reply_value(r.json()))

        if not store:
            return value
        self.keystore.set('keys/%s' % keyname, value)
        return None
|