#!/usr/bin/python3.4 """Docker Auth Service in Python PoC for Docker Registry v2 authentication via central service https://docs.docker.com/registry/spec/auth/token/ https://docs.docker.com/registry/spec/auth/jwt/ Authors: Christian Heimes Copyright (C) 2015 Red Hat, Inc. All rights reserved. """ from http.server import HTTPServer, BaseHTTPRequestHandler import urllib.parse import time import os import base64 import json import struct import pprint from jwcrypto.common import base64url_encode from jwcrypto.jwk import JWK from jwcrypto.jws import JWS from jwcrypto.jwt import JWT from cryptography.hazmat.backends import default_backend from cryptography.x509 import load_pem_x509_certificate from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat ISSUER = "C=XX, L=Default City, O=Default Company Ltd, CN=auth.example.org" def n2b64(n, endian='big'): """Arbitrary number to base64 """ b = n.to_bytes((n.bit_length() + 7) // 8, endian) return base64.urlsafe_b64encode(b).rstrip(b'=').decode('ascii') def private_key(filename, kid, password=None): """Get JWK from private key PEM file """ with open(filename, 'rb') as f: data = f.read() pkey = default_backend().load_pem_private_key(data, password) priv = pkey.private_numbers() pub = priv.public_numbers return JWK( kty='RSA', kid=kid, use='sig', n=n2b64(pub.n), e=n2b64(pub.e), d=n2b64(priv.d), p=n2b64(priv.p), q=n2b64(priv.q), dp=n2b64(priv.dmp1), dq=n2b64(priv.dmq1), qi=n2b64(priv.iqmp), ) def get_fingerprint(pemfile): """docker/libtrust compatible SPKI fingerprint https://github.com/docker/libtrust/blob/9cbd2a1374f46905c68a4eb3694a130610adc62a/util.go#L194 keyIDFromCryptoKey """ with open(pemfile, 'rb') as f: data = f.read() x509 = load_pem_x509_certificate(data, default_backend()) pubkey = x509.public_key() public_bytes = pubkey.public_bytes( Encoding.DER, PublicFormat.SubjectPublicKeyInfo) digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) digest.update(public_bytes) fingerprint = digest.finalize() # strip of last two bytes fingerprint = fingerprint[:30] # base32 encode and add ':' every four chars fb32 = base64.b32encode(fingerprint).decode('ascii').rstrip('=') result = [] for i, c in enumerate(fb32): result.append(c) if i % 4 == 3 and i != len(fb32) - 1: result.append(':') return ''.join(result) def bearer_token(claims): kid = get_fingerprint('ssl/server.pem') jwk = private_key('ssl/server.key', kid) header = { "typ": "JWT", "alg": 'RS256', "kid": kid } token = JWT(header, claims) token.make_signed_token(key) return token.serialize() class ClaimSet: def __init__(self, issuer, subject, audience): now = int(time.time()) self.claims = { 'access': [] 'aud': audience, 'exp': now + 600 'jti': base64url_encode(os.urandom(15)) 'iat': now, 'iss': issuer, 'nbf': now, 'sub': subject, } def add_access(self, typ, name, actions): self.claims['access'].append(dict( type=typ, name=name, actions=actions, )) def __repr__(self): return "<%s %r>" % (type(self).__name__, json.dumps(self.claims) class DockerAuthHandler(BaseHTTPRequestHandler): def do_GET(self): parts = urllib.parse.urlsplit(self.path) qs = urllib.parse.parse_qs(parts.query) if 0: pprint.pprint([vars(self), dict(self.headers), parts, qs]) subject = qs.get('account', [''])[0] audience = qs.get('service', [''])[0] scopes = qs.get('scope', []) claimset = ClaimSet(ISSUER, subject, audience) for scope in scopes: typ, name, actions = scope.split(':') actions = list(a.strip() for a in actions.split(',')) claimset.add_access(typ, name, actions) print(qs, claim) token = bearer_token(claimset.claims) token = b'{"token":"' + token + b'"}' self.send_response(200) self.send_header("Content-type", 'application/json') self.send_header("Content-Length", str(len(token))) self.end_headers() self.wfile.write(token) def main(): server_address = ('localhost', 5001) httpd = HTTPServer(server_address, DockerAuthHandler) httpd.serve_forever() def test(): kid = get_fingerprint('ssl/server.pem') jwk = private_key('ssl/server.key', kid) jws = JWS(b'data') jws.add_signature(jwk, alg="RS256") print(jws.serialize()) bearer_token(ClaimSet('iss', 'subject', 'audience')) if __name__ == '__main__': main()