From b09b92831f8b8cda26dcf01a04113ae6547b355c Mon Sep 17 00:00:00 2001 From: Jan Cholasta Date: Wed, 27 Apr 2016 15:41:12 +0200 Subject: vault: move client-side code to the module level Move client-side code from the vault class to module-level functions. This will make it possible to move the code to ipaclient without the vault class bits. https://fedorahosted.org/freeipa/ticket/4739 Reviewed-By: David Kupka --- ipalib/plugins/vault.py | 190 ++++++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 94 deletions(-) (limited to 'ipalib') diff --git a/ipalib/plugins/vault.py b/ipalib/plugins/vault.py index f495aa1cb..8b1eef5f9 100644 --- a/ipalib/plugins/vault.py +++ b/ipalib/plugins/vault.py @@ -789,55 +789,99 @@ class vault(LDAPObject): return 'ipa:' + id - def get_new_password(self): - """ - Gets new password from user and verify it. - """ - while True: - password = getpass.getpass('New password: ').decode( - sys.stdin.encoding) - password2 = getpass.getpass('Verify password: ').decode( - sys.stdin.encoding) + def get_container_attribute(self, entry, options): + if options.get('raw', False): + return + container_dn = DN(self.container_dn, self.api.env.basedn) + if entry.dn.endswith(DN(('cn', 'services'), container_dn)): + entry['service'] = entry.dn[1]['cn'] + elif entry.dn.endswith(DN(('cn', 'shared'), container_dn)): + entry['shared'] = True + elif entry.dn.endswith(DN(('cn', 'users'), container_dn)): + entry['username'] = entry.dn[1]['cn'] - if password == password2: - return password - print(' ** Passwords do not match! **') +def get_new_password(): + """ + Gets new password from user and verify it. + """ + while True: + password = getpass.getpass('New password: ').decode( + sys.stdin.encoding) + password2 = getpass.getpass('Verify password: ').decode( + sys.stdin.encoding) - def get_existing_password(self): - """ - Gets existing password from user. - """ - return getpass.getpass('Password: ').decode(sys.stdin.encoding) + if password == password2: + return password - def generate_symmetric_key(self, password, salt): - """ - Generates symmetric key from password and salt. - """ - kdf = PBKDF2HMAC( - algorithm=hashes.SHA256(), - length=32, - salt=salt, - iterations=100000, + print(' ** Passwords do not match! **') + + +def get_existing_password(): + """ + Gets existing password from user. + """ + return getpass.getpass('Password: ').decode(sys.stdin.encoding) + + +def generate_symmetric_key(password, salt): + """ + Generates symmetric key from password and salt. + """ + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + backend=default_backend() + ) + + return base64.b64encode(kdf.derive(password.encode('utf-8'))) + + +def encrypt(data, symmetric_key=None, public_key=None): + """ + Encrypts data with symmetric key or public key. + """ + if symmetric_key: + fernet = Fernet(symmetric_key) + return fernet.encrypt(data) + + elif public_key: + public_key_obj = load_pem_public_key( + data=public_key, backend=default_backend() ) + return public_key_obj.encrypt( + data, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA1()), + algorithm=hashes.SHA1(), + label=None + ) + ) - return base64.b64encode(kdf.derive(password.encode('utf-8'))) - def encrypt(self, data, symmetric_key=None, public_key=None): - """ - Encrypts data with symmetric key or public key. - """ - if symmetric_key: +def decrypt(data, symmetric_key=None, private_key=None): + """ + Decrypts data with symmetric key or public key. + """ + if symmetric_key: + try: fernet = Fernet(symmetric_key) - return fernet.encrypt(data) + return fernet.decrypt(data) + except InvalidToken: + raise errors.AuthenticationError( + message=_('Invalid credentials')) - elif public_key: - public_key_obj = load_pem_public_key( - data=public_key, + elif private_key: + try: + private_key_obj = load_pem_private_key( + data=private_key, + password=None, backend=default_backend() ) - return public_key_obj.encrypt( + return private_key_obj.decrypt( data, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), @@ -845,48 +889,9 @@ class vault(LDAPObject): label=None ) ) - - def decrypt(self, data, symmetric_key=None, private_key=None): - """ - Decrypts data with symmetric key or public key. - """ - if symmetric_key: - try: - fernet = Fernet(symmetric_key) - return fernet.decrypt(data) - except InvalidToken: - raise errors.AuthenticationError( - message=_('Invalid credentials')) - - elif private_key: - try: - private_key_obj = load_pem_private_key( - data=private_key, - password=None, - backend=default_backend() - ) - return private_key_obj.decrypt( - data, - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA1()), - algorithm=hashes.SHA1(), - label=None - ) - ) - except AssertionError: - raise errors.AuthenticationError( - message=_('Invalid credentials')) - - def get_container_attribute(self, entry, options): - if options.get('raw', False): - return - container_dn = DN(self.container_dn, self.api.env.basedn) - if entry.dn.endswith(DN(('cn', 'services'), container_dn)): - entry['service'] = entry.dn[1]['cn'] - elif entry.dn.endswith(DN(('cn', 'shared'), container_dn)): - entry['shared'] = True - elif entry.dn.endswith(DN(('cn', 'users'), container_dn)): - entry['username'] = entry.dn[1]['cn'] + except AssertionError: + raise errors.AuthenticationError( + message=_('Invalid credentials')) @register() @@ -988,7 +993,7 @@ class vault_add(PKQuery, Local): password = password.rstrip('\n') else: - password = self.obj.get_new_password() + password = get_new_password() # generate vault salt options['ipavaultsalt'] = os.urandom(16) @@ -1592,9 +1597,9 @@ class vault_archive(PKQuery, Local): else: if override_password: - password = self.obj.get_new_password() + password = get_new_password() else: - password = self.obj.get_existing_password() + password = get_existing_password() if not override_password: # verify password by retrieving existing data @@ -1608,11 +1613,10 @@ class vault_archive(PKQuery, Local): salt = vault['ipavaultsalt'][0] # generate encryption key from vault password - encryption_key = self.obj.generate_symmetric_key( - password, salt) + encryption_key = generate_symmetric_key(password, salt) # encrypt data with encryption key - data = self.obj.encrypt(data, symmetric_key=encryption_key) + data = encrypt(data, symmetric_key=encryption_key) encrypted_key = None @@ -1624,11 +1628,10 @@ class vault_archive(PKQuery, Local): encryption_key = base64.b64encode(os.urandom(32)) # encrypt data with encryption key - data = self.obj.encrypt(data, symmetric_key=encryption_key) + data = encrypt(data, symmetric_key=encryption_key) # encrypt encryption key with public key - encrypted_key = self.obj.encrypt( - encryption_key, public_key=public_key) + encrypted_key = encrypt(encryption_key, public_key=public_key) else: raise errors.ValidationError( @@ -1918,13 +1921,13 @@ class vault_retrieve(PKQuery, Local): password = password.rstrip('\n') else: - password = self.obj.get_existing_password() + password = get_existing_password() # generate encryption key from password - encryption_key = self.obj.generate_symmetric_key(password, salt) + encryption_key = generate_symmetric_key(password, salt) # decrypt data with encryption key - data = self.obj.decrypt(data, symmetric_key=encryption_key) + data = decrypt(data, symmetric_key=encryption_key) elif vault_type == u'asymmetric': @@ -1947,11 +1950,10 @@ class vault_retrieve(PKQuery, Local): error=_('Missing vault private key')) # decrypt encryption key with private key - encryption_key = self.obj.decrypt( - encrypted_key, private_key=private_key) + encryption_key = decrypt(encrypted_key, private_key=private_key) # decrypt data with encryption key - data = self.obj.decrypt(data, symmetric_key=encryption_key) + data = decrypt(data, symmetric_key=encryption_key) else: raise errors.ValidationError( -- cgit