diff options
Diffstat (limited to 'ipapython/p11helper.py')
| -rw-r--r-- | ipapython/p11helper.py | 1772 |
1 files changed, 0 insertions, 1772 deletions
diff --git a/ipapython/p11helper.py b/ipapython/p11helper.py deleted file mode 100644 index 5963c6d71..000000000 --- a/ipapython/p11helper.py +++ /dev/null @@ -1,1772 +0,0 @@ -# -# Copyright (C) 2014 FreeIPA Contributors see COPYING for license -# - -import random -import ctypes.util -import binascii - -import six -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa -from cffi import FFI - -if six.PY3: - unicode = str - - -_ffi = FFI() - -_ffi.cdef(''' -/* p11-kit/pkcs11.h */ - -typedef unsigned long CK_FLAGS; - -struct _CK_VERSION -{ - unsigned char major; - unsigned char minor; -}; - -typedef unsigned long CK_SLOT_ID; - -typedef unsigned long CK_SESSION_HANDLE; - -typedef unsigned long CK_USER_TYPE; - -typedef unsigned long CK_OBJECT_HANDLE; - -typedef unsigned long CK_OBJECT_CLASS; - -typedef unsigned long CK_KEY_TYPE; - -typedef unsigned long CK_ATTRIBUTE_TYPE; - -struct _CK_ATTRIBUTE -{ - CK_ATTRIBUTE_TYPE type; - void *pValue; - unsigned long ulValueLen; -}; - -typedef unsigned long CK_MECHANISM_TYPE; - -struct _CK_MECHANISM -{ - CK_MECHANISM_TYPE mechanism; - void *pParameter; - unsigned long ulParameterLen; -}; - -typedef unsigned long CK_RV; - -typedef ... *CK_NOTIFY; - -struct _CK_FUNCTION_LIST; - -typedef CK_RV (*CK_C_Initialize) (void *init_args); -typedef CK_RV (*CK_C_Finalize) (void *pReserved); -typedef ... *CK_C_GetInfo; -typedef ... *CK_C_GetFunctionList; -CK_RV C_GetFunctionList (struct _CK_FUNCTION_LIST **function_list); -typedef ... *CK_C_GetSlotList; -typedef ... *CK_C_GetSlotInfo; -typedef ... *CK_C_GetTokenInfo; -typedef ... *CK_C_WaitForSlotEvent; -typedef ... *CK_C_GetMechanismList; -typedef ... *CK_C_GetMechanismInfo; -typedef ... *CK_C_InitToken; -typedef ... *CK_C_InitPIN; -typedef ... *CK_C_SetPIN; -typedef CK_RV (*CK_C_OpenSession) (CK_SLOT_ID slotID, CK_FLAGS flags, - void *application, CK_NOTIFY notify, - CK_SESSION_HANDLE *session); -typedef CK_RV (*CK_C_CloseSession) (CK_SESSION_HANDLE session); -typedef ... *CK_C_CloseAllSessions; -typedef ... *CK_C_GetSessionInfo; -typedef ... *CK_C_GetOperationState; -typedef ... *CK_C_SetOperationState; -typedef CK_RV (*CK_C_Login) (CK_SESSION_HANDLE session, CK_USER_TYPE user_type, - unsigned char *pin, unsigned long pin_len); -typedef CK_RV (*CK_C_Logout) (CK_SESSION_HANDLE session); -typedef CK_RV (*CK_C_CreateObject) (CK_SESSION_HANDLE session, - struct _CK_ATTRIBUTE *templ, - unsigned long count, - CK_OBJECT_HANDLE *object); -typedef ... *CK_C_CopyObject; -typedef CK_RV (*CK_C_DestroyObject) (CK_SESSION_HANDLE session, - CK_OBJECT_HANDLE object); -typedef ... *CK_C_GetObjectSize; -typedef CK_RV (*CK_C_GetAttributeValue) (CK_SESSION_HANDLE session, - CK_OBJECT_HANDLE object, - struct _CK_ATTRIBUTE *templ, - unsigned long count); -typedef CK_RV (*CK_C_SetAttributeValue) (CK_SESSION_HANDLE session, - CK_OBJECT_HANDLE object, - struct _CK_ATTRIBUTE *templ, - unsigned long count); -typedef CK_RV (*CK_C_FindObjectsInit) (CK_SESSION_HANDLE session, - struct _CK_ATTRIBUTE *templ, - unsigned long count); -typedef CK_RV (*CK_C_FindObjects) (CK_SESSION_HANDLE session, - CK_OBJECT_HANDLE *object, - unsigned long max_object_count, - unsigned long *object_count); -typedef CK_RV (*CK_C_FindObjectsFinal) (CK_SESSION_HANDLE session); -typedef ... *CK_C_EncryptInit; -typedef ... *CK_C_Encrypt; -typedef ... *CK_C_EncryptUpdate; -typedef ... *CK_C_EncryptFinal; -typedef ... *CK_C_DecryptInit; -typedef ... *CK_C_Decrypt; -typedef ... *CK_C_DecryptUpdate; -typedef ... *CK_C_DecryptFinal; -typedef ... *CK_C_DigestInit; -typedef ... *CK_C_Digest; -typedef ... *CK_C_DigestUpdate; -typedef ... *CK_C_DigestKey; -typedef ... *CK_C_DigestFinal; -typedef ... *CK_C_SignInit; -typedef ... *CK_C_Sign; -typedef ... *CK_C_SignUpdate; -typedef ... *CK_C_SignFinal; -typedef ... *CK_C_SignRecoverInit; -typedef ... *CK_C_SignRecover; -typedef ... *CK_C_VerifyInit; -typedef ... *CK_C_Verify; -typedef ... *CK_C_VerifyUpdate; -typedef ... *CK_C_VerifyFinal; -typedef ... *CK_C_VerifyRecoverInit; -typedef ... *CK_C_VerifyRecover; -typedef ... *CK_C_DigestEncryptUpdate; -typedef ... *CK_C_DecryptDigestUpdate; -typedef ... *CK_C_SignEncryptUpdate; -typedef ... *CK_C_DecryptVerifyUpdate; -typedef CK_RV (*CK_C_GenerateKey) (CK_SESSION_HANDLE session, - struct _CK_MECHANISM *mechanism, - struct _CK_ATTRIBUTE *templ, - unsigned long count, - CK_OBJECT_HANDLE *key); -typedef CK_RV (*CK_C_GenerateKeyPair) (CK_SESSION_HANDLE session, - struct _CK_MECHANISM *mechanism, - struct _CK_ATTRIBUTE * - public_key_template, - unsigned long - public_key_attribute_count, - struct _CK_ATTRIBUTE * - private_key_template, - unsigned long - private_key_attribute_count, - CK_OBJECT_HANDLE *public_key, - CK_OBJECT_HANDLE *private_key); -typedef CK_RV (*CK_C_WrapKey) (CK_SESSION_HANDLE session, - struct _CK_MECHANISM *mechanism, - CK_OBJECT_HANDLE wrapping_key, - CK_OBJECT_HANDLE key, - unsigned char *wrapped_key, - unsigned long *wrapped_key_len); -typedef CK_RV (*CK_C_UnwrapKey) (CK_SESSION_HANDLE session, - struct _CK_MECHANISM *mechanism, - CK_OBJECT_HANDLE unwrapping_key, - unsigned char *wrapped_key, - unsigned long wrapped_key_len, - struct _CK_ATTRIBUTE *templ, - unsigned long attribute_count, - CK_OBJECT_HANDLE *key); -typedef ... *CK_C_DeriveKey; -typedef ... *CK_C_SeedRandom; -typedef ... *CK_C_GenerateRandom; -typedef ... *CK_C_GetFunctionStatus; -typedef ... *CK_C_CancelFunction; - -struct _CK_FUNCTION_LIST -{ - struct _CK_VERSION version; - CK_C_Initialize C_Initialize; - CK_C_Finalize C_Finalize; - CK_C_GetInfo C_GetInfo; - CK_C_GetFunctionList C_GetFunctionList; - CK_C_GetSlotList C_GetSlotList; - CK_C_GetSlotInfo C_GetSlotInfo; - CK_C_GetTokenInfo C_GetTokenInfo; - CK_C_GetMechanismList C_GetMechanismList; - CK_C_GetMechanismInfo C_GetMechanismInfo; - CK_C_InitToken C_InitToken; - CK_C_InitPIN C_InitPIN; - CK_C_SetPIN C_SetPIN; - CK_C_OpenSession C_OpenSession; - CK_C_CloseSession C_CloseSession; - CK_C_CloseAllSessions C_CloseAllSessions; - CK_C_GetSessionInfo C_GetSessionInfo; - CK_C_GetOperationState C_GetOperationState; - CK_C_SetOperationState C_SetOperationState; - CK_C_Login C_Login; - CK_C_Logout C_Logout; - CK_C_CreateObject C_CreateObject; - CK_C_CopyObject C_CopyObject; - CK_C_DestroyObject C_DestroyObject; - CK_C_GetObjectSize C_GetObjectSize; - CK_C_GetAttributeValue C_GetAttributeValue; - CK_C_SetAttributeValue C_SetAttributeValue; - CK_C_FindObjectsInit C_FindObjectsInit; - CK_C_FindObjects C_FindObjects; - CK_C_FindObjectsFinal C_FindObjectsFinal; - CK_C_EncryptInit C_EncryptInit; - CK_C_Encrypt C_Encrypt; - CK_C_EncryptUpdate C_EncryptUpdate; - CK_C_EncryptFinal C_EncryptFinal; - CK_C_DecryptInit C_DecryptInit; - CK_C_Decrypt C_Decrypt; - CK_C_DecryptUpdate C_DecryptUpdate; - CK_C_DecryptFinal C_DecryptFinal; - CK_C_DigestInit C_DigestInit; - CK_C_Digest C_Digest; - CK_C_DigestUpdate C_DigestUpdate; - CK_C_DigestKey C_DigestKey; - CK_C_DigestFinal C_DigestFinal; - CK_C_SignInit C_SignInit; - CK_C_Sign C_Sign; - CK_C_SignUpdate C_SignUpdate; - CK_C_SignFinal C_SignFinal; - CK_C_SignRecoverInit C_SignRecoverInit; - CK_C_SignRecover C_SignRecover; - CK_C_VerifyInit C_VerifyInit; - CK_C_Verify C_Verify; - CK_C_VerifyUpdate C_VerifyUpdate; - CK_C_VerifyFinal C_VerifyFinal; - CK_C_VerifyRecoverInit C_VerifyRecoverInit; - CK_C_VerifyRecover C_VerifyRecover; - CK_C_DigestEncryptUpdate C_DigestEncryptUpdate; - CK_C_DecryptDigestUpdate C_DecryptDigestUpdate; - CK_C_SignEncryptUpdate C_SignEncryptUpdate; - CK_C_DecryptVerifyUpdate C_DecryptVerifyUpdate; - CK_C_GenerateKey C_GenerateKey; - CK_C_GenerateKeyPair C_GenerateKeyPair; - CK_C_WrapKey C_WrapKey; - CK_C_UnwrapKey C_UnwrapKey; - CK_C_DeriveKey C_DeriveKey; - CK_C_SeedRandom C_SeedRandom; - CK_C_GenerateRandom C_GenerateRandom; - CK_C_GetFunctionStatus C_GetFunctionStatus; - CK_C_CancelFunction C_CancelFunction; - CK_C_WaitForSlotEvent C_WaitForSlotEvent; -}; - -typedef unsigned char CK_BYTE; -typedef unsigned char CK_UTF8CHAR; -typedef unsigned char CK_BBOOL; -typedef unsigned long int CK_ULONG; -typedef CK_BYTE *CK_BYTE_PTR; -typedef CK_ULONG *CK_ULONG_PTR; - -typedef CK_OBJECT_HANDLE *CK_OBJECT_HANDLE_PTR; - -typedef struct _CK_ATTRIBUTE CK_ATTRIBUTE; -typedef struct _CK_ATTRIBUTE *CK_ATTRIBUTE_PTR; - -typedef struct _CK_MECHANISM CK_MECHANISM; - -typedef struct _CK_FUNCTION_LIST *CK_FUNCTION_LIST_PTR; - - -/* p11-kit/uri.h */ - -typedef enum { - DUMMY /* ..., */ -} P11KitUriType; - -typedef ... P11KitUri; - -CK_ATTRIBUTE_PTR p11_kit_uri_get_attributes (P11KitUri *uri, - CK_ULONG *n_attrs); - -int p11_kit_uri_any_unrecognized (P11KitUri *uri); - -P11KitUri* p11_kit_uri_new (void); - -int p11_kit_uri_parse (const char *string, - P11KitUriType uri_type, - P11KitUri *uri); - -void p11_kit_uri_free (P11KitUri *uri); - - -/* p11helper.c */ - -struct ck_rsa_pkcs_oaep_params { - CK_MECHANISM_TYPE hash_alg; - unsigned long mgf; - unsigned long source; - void *source_data; - unsigned long source_data_len; -}; - -typedef struct ck_rsa_pkcs_oaep_params CK_RSA_PKCS_OAEP_PARAMS; -''') - -_libp11_kit = _ffi.dlopen(ctypes.util.find_library('p11-kit')) - - -# utility - -NULL = _ffi.NULL - -unsigned_char = _ffi.typeof('unsigned char') -unsigned_long = _ffi.typeof('unsigned long') - -sizeof = _ffi.sizeof - - -def new_ptr(ctype, *args): - return _ffi.new(_ffi.getctype(ctype, '*'), *args) - - -def new_array(ctype, *args): - return _ffi.new(_ffi.getctype(ctype, '[]'), *args) - - -# p11-kit/pkcs11.h - -CK_SESSION_HANDLE = _ffi.typeof('CK_SESSION_HANDLE') - -CK_OBJECT_HANDLE = _ffi.typeof('CK_OBJECT_HANDLE') - -CKU_USER = 1 - -CKF_RW_SESSION = 0x2 -CKF_SERIAL_SESSION = 0x4 - -CK_OBJECT_CLASS = _ffi.typeof('CK_OBJECT_CLASS') - -CKO_PUBLIC_KEY = 2 -CKO_PRIVATE_KEY = 3 -CKO_SECRET_KEY = 4 -CKO_VENDOR_DEFINED = 0x80000000 - -CK_KEY_TYPE = _ffi.typeof('CK_KEY_TYPE') - -CKK_RSA = 0 -CKK_AES = 0x1f - -CKA_CLASS = 0 -CKA_TOKEN = 1 -CKA_PRIVATE = 2 -CKA_LABEL = 3 -CKA_TRUSTED = 0x86 -CKA_KEY_TYPE = 0x100 -CKA_ID = 0x102 -CKA_SENSITIVE = 0x103 -CKA_ENCRYPT = 0x104 -CKA_DECRYPT = 0x105 -CKA_WRAP = 0x106 -CKA_UNWRAP = 0x107 -CKA_SIGN = 0x108 -CKA_SIGN_RECOVER = 0x109 -CKA_VERIFY = 0x10a -CKA_VERIFY_RECOVER = 0x10b -CKA_DERIVE = 0x10c -CKA_MODULUS = 0x120 -CKA_MODULUS_BITS = 0x121 -CKA_PUBLIC_EXPONENT = 0x122 -CKA_VALUE_LEN = 0x161 -CKA_EXTRACTABLE = 0x162 -CKA_LOCAL = 0x163 -CKA_NEVER_EXTRACTABLE = 0x164 -CKA_ALWAYS_SENSITIVE = 0x165 -CKA_MODIFIABLE = 0x170 -CKA_ALWAYS_AUTHENTICATE = 0x202 -CKA_WRAP_WITH_TRUSTED = 0x210 - -CKM_RSA_PKCS_KEY_PAIR_GEN = 0 -CKM_RSA_PKCS = 1 -CKM_RSA_PKCS_OAEP = 9 -CKM_SHA_1 = 0x220 -CKM_AES_KEY_GEN = 0x1080 - -CKR_OK = 0 -CKR_ATTRIBUTE_TYPE_INVALID = 0x12 -CKR_USER_NOT_LOGGED_IN = 0x101 - -CK_BYTE = _ffi.typeof('CK_BYTE') -CK_BBOOL = _ffi.typeof('CK_BBOOL') -CK_ULONG = _ffi.typeof('CK_ULONG') -CK_BYTE_PTR = _ffi.typeof('CK_BYTE_PTR') -CK_FALSE = 0 -CK_TRUE = 1 - -CK_OBJECT_HANDLE_PTR = _ffi.typeof('CK_OBJECT_HANDLE_PTR') - -CK_ATTRIBUTE = _ffi.typeof('CK_ATTRIBUTE') - -CK_MECHANISM = _ffi.typeof('CK_MECHANISM') - -CK_FUNCTION_LIST_PTR = _ffi.typeof('CK_FUNCTION_LIST_PTR') - -NULL_PTR = NULL - - -# p11-kit/uri.h - -P11_KIT_URI_OK = 0 - -P11_KIT_URI_FOR_OBJECT = 2 - -p11_kit_uri_get_attributes = _libp11_kit.p11_kit_uri_get_attributes - -p11_kit_uri_any_unrecognized = _libp11_kit.p11_kit_uri_any_unrecognized - -p11_kit_uri_new = _libp11_kit.p11_kit_uri_new - -p11_kit_uri_parse = _libp11_kit.p11_kit_uri_parse - -p11_kit_uri_free = _libp11_kit.p11_kit_uri_free - - -# library.c - -def loadLibrary(module): - """Load the PKCS#11 library""" - # Load PKCS #11 library - try: - if module: - # pylint: disable=no-member - pDynLib = _ffi.dlopen(module, _ffi.RTLD_NOW | _ffi.RTLD_LOCAL) - else: - raise Exception() - - except Exception: - # Failed to load the PKCS #11 library - raise - - # Retrieve the entry point for C_GetFunctionList - pGetFunctionList = pDynLib.C_GetFunctionList - if pGetFunctionList == NULL: - raise Exception() - - # Store the handle so we can dlclose it later - - return pGetFunctionList, pDynLib - - -# p11helper.c - -# compat TODO -CKM_AES_KEY_WRAP = 0x2109 -CKM_AES_KEY_WRAP_PAD = 0x210a - -# TODO -CKA_COPYABLE = 0x0017 - -CKG_MGF1_SHA1 = 0x00000001 - -CKZ_DATA_SPECIFIED = 0x00000001 - -CK_RSA_PKCS_OAEP_PARAMS = _ffi.typeof('CK_RSA_PKCS_OAEP_PARAMS') - - -true_ptr = new_ptr(CK_BBOOL, CK_TRUE) -false_ptr = new_ptr(CK_BBOOL, CK_FALSE) - -MAX_TEMPLATE_LEN = 32 - -# -# Constants -# -CONST_RSA_PKCS_OAEP_PARAMS_ptr = new_ptr(CK_RSA_PKCS_OAEP_PARAMS, dict( - hash_alg=CKM_SHA_1, - mgf=CKG_MGF1_SHA1, - source=CKZ_DATA_SPECIFIED, - source_data=NULL, - source_data_len=0, -)) - - -# -# ipap11helper Exceptions -# -class P11HelperException(Exception): - """parent class for all exceptions""" - pass -P11HelperException.__name__ = 'Exception' - - -class Error(P11HelperException): - """general error""" - pass - - -class NotFound(P11HelperException): - """key not found""" - pass - - -class DuplicationError(P11HelperException): - """key already exists""" - pass - - -######################################################################## -# Support functions -# - -def pyobj_to_bool(pyobj): - if pyobj: - return true_ptr - return false_ptr - - -def convert_py2bool(mapping): - return tuple(pyobj_to_bool(py_obj) for py_obj in mapping) - - -def string_to_pybytes_or_none(str, len): - if str == NULL: - return None - return _ffi.buffer(str, len)[:] - - -def unicode_to_char_array(unicode): - """ - Convert a unicode string to the utf8 encoded char array - :param unicode: input python unicode object - """ - try: - utf8_str = unicode.encode('utf-8') - except Exception: - raise Error("Unable to encode UTF-8") - try: - result = new_array(unsigned_char, utf8_str) - except Exception: - raise Error("Unable to get bytes from string") - l = len(utf8_str) - return result, l - - -def char_array_to_unicode(array, l): - """ - Convert utf-8 encoded char array to unicode object - """ - return _ffi.buffer(array, l)[:].decode('utf-8') - - -def int_to_bytes(value): - try: - return binascii.unhexlify('{0:x}'.format(value)) - except (TypeError, binascii.Error): - return binascii.unhexlify('0{0:x}'.format(value)) - - -def bytes_to_int(value): - return int(binascii.hexlify(value), 16) - - -def check_return_value(rv, message): - """ - Tests result value of pkc11 operations - """ - if rv != CKR_OK: - try: - errmsg = "Error at %s: 0x%x\n" % (message, rv) - except Exception: - raise Error("An error occured during error message generation. " - "Please report this problem. Developers will use " - "a crystal ball to find out the root cause.") - else: - raise Error(errmsg) - - -def _fill_template_from_parts(attr, template_len, id, id_len, label, label_len, - class_, cka_wrap, cka_unwrap): - """ - Fill template structure with pointers to attributes passed as independent - variables. - Variables with NULL values will be omitted from template. - - @warning input variables should not be modified when template is in use - """ - cnt = 0 - if label != NULL: - attr[0].type = CKA_LABEL - attr[0].pValue = label - attr[0].ulValueLen = label_len - attr += 1 - cnt += 1 - assert cnt < template_len[0] - if id != NULL: - attr[0].type = CKA_ID - attr[0].pValue = id - attr[0].ulValueLen = id_len - attr += 1 - cnt += 1 - assert cnt < template_len[0] - if cka_wrap != NULL: - attr[0].type = CKA_WRAP - attr[0].pValue = cka_wrap - attr[0].ulValueLen = sizeof(CK_BBOOL) - attr += 1 - cnt += 1 - assert cnt < template_len[0] - if cka_unwrap != NULL: - attr[0].type = CKA_UNWRAP - attr[0].pValue = cka_unwrap - attr[0].ulValueLen = sizeof(CK_BBOOL) - attr += 1 - cnt += 1 - assert cnt < template_len[0] - - if class_ != NULL: - attr[0].type = CKA_CLASS - attr[0].pValue = class_ - attr[0].ulValueLen = sizeof(CK_OBJECT_CLASS) - attr += 1 - cnt += 1 - assert cnt < template_len[0] - template_len[0] = cnt - - -def _parse_uri(uri_str): - """ - Parse string to P11-kit representation of PKCS#11 URI. - """ - uri = p11_kit_uri_new() - if not uri: - raise Error("Cannot initialize URI parser") - - try: - result = p11_kit_uri_parse(uri_str, P11_KIT_URI_FOR_OBJECT, uri) - if result != P11_KIT_URI_OK: - raise Error("Cannot parse URI") - - if p11_kit_uri_any_unrecognized(uri): - raise Error("PKCS#11 URI contains unsupported attributes") - except Error: - p11_kit_uri_free(uri) - raise - - return uri - - -def _set_wrapping_mech_parameters(mech_type, mech): - """ - Function set default param values for wrapping mechanism - :param mech_type: mechanism type - :param mech: filled structure with params based on mech type - - Warning: do not dealloc param values, it is static variables - """ - if mech_type in (CKM_RSA_PKCS, CKM_AES_KEY_WRAP, CKM_AES_KEY_WRAP_PAD): - mech.pParameter = NULL - mech.ulParameterLen = 0 - elif mech_type == CKM_RSA_PKCS_OAEP: - # Use the same configuration as openSSL - # https://www.openssl.org/docs/crypto/RSA_public_encrypt.html - mech.pParameter = CONST_RSA_PKCS_OAEP_PARAMS_ptr - mech.ulParameterLen = sizeof(CK_RSA_PKCS_OAEP_PARAMS) - else: - raise Error("Unsupported wrapping mechanism") - mech.mechanism = mech_type - - -######################################################################## -# P11_Helper object -# -class P11_Helper(object): - @property - def p11(self): - return self.p11_ptr[0] - - @property - def session(self): - return self.session_ptr[0] - - def _find_key(self, template, template_len): - """ - Find keys matching specified template. - Function returns list of key handles via objects parameter. - - :param template: PKCS#11 template for attribute matching - """ - result_objects = [] - result_object_ptr = new_ptr(CK_OBJECT_HANDLE) - objectCount_ptr = new_ptr(CK_ULONG) - - rv = self.p11.C_FindObjectsInit(self.session, template, template_len) - check_return_value(rv, "Find key init") - - rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1, - objectCount_ptr) - check_return_value(rv, "Find key") - - while objectCount_ptr[0] > 0: - result_objects.append(result_object_ptr[0]) - - rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1, - objectCount_ptr) - check_return_value(rv, "Check for duplicated key") - - rv = self.p11.C_FindObjectsFinal(self.session) - check_return_value(rv, "Find objects final") - - return result_objects - - def _id_exists(self, id, id_len, class_): - """ - Test if object with specified label, id and class exists - - :param id: key ID, (if value is NULL, will not be used to find key) - :param id_len: key ID length - :param class_ key: class - - :return: True if object was found, False if object doesnt exists - """ - object_count_ptr = new_ptr(CK_ULONG) - result_object_ptr = new_ptr(CK_OBJECT_HANDLE) - class_ptr = new_ptr(CK_OBJECT_CLASS, class_) - class_sec_ptr = new_ptr(CK_OBJECT_CLASS, CKO_SECRET_KEY) - - template_pub_priv = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id, id_len), - (CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)), - )) - - template_sec = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id, id_len), - (CKA_CLASS, class_sec_ptr, sizeof(CK_OBJECT_CLASS)), - )) - - template_id = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id, id_len), - )) - - # - # Only one secret key with same ID is allowed - # - if class_ == CKO_SECRET_KEY: - rv = self.p11.C_FindObjectsInit(self.session, template_id, 1) - check_return_value(rv, "id, label exists init") - - rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1, - object_count_ptr) - check_return_value(rv, "id, label exists") - - rv = self.p11.C_FindObjectsFinal(self.session) - check_return_value(rv, "id, label exists final") - - if object_count_ptr[0] > 0: - return True - return False - - # - # Public and private keys can share one ID, but - # - - # test if secret key with same ID exists - rv = self.p11.C_FindObjectsInit(self.session, template_sec, 2) - check_return_value(rv, "id, label exists init") - - rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1, - object_count_ptr) - check_return_value(rv, "id, label exists") - - rv = self.p11.C_FindObjectsFinal(self.session) - check_return_value(rv, "id, label exists final") - - if object_count_ptr[0] > 0: - # object found - return True - - # test if pub/private key with same id exists - object_count_ptr[0] = 0 - - rv = self.p11.C_FindObjectsInit(self.session, template_pub_priv, 2) - check_return_value(rv, "id, label exists init") - - rv = self.p11.C_FindObjects(self.session, result_object_ptr, 1, - object_count_ptr) - check_return_value(rv, "id, label exists") - - rv = self.p11.C_FindObjectsFinal(self.session) - check_return_value(rv, "id, label exists final") - - if object_count_ptr[0] > 0: - # Object found - return True - - # Object not found - return False - - def __init__(self, slot, user_pin, library_path): - self.p11_ptr = new_ptr(CK_FUNCTION_LIST_PTR) - self.session_ptr = new_ptr(CK_SESSION_HANDLE) - - self.slot = 0 - self.session_ptr[0] = 0 - self.p11_ptr[0] = NULL - self.module_handle = None - - # Parse method args - if isinstance(user_pin, unicode): - user_pin = user_pin.encode() - self.slot = slot - - try: - pGetFunctionList, module_handle = loadLibrary(library_path) - except Exception: - raise Error("Could not load the library.") - - self.module_handle = module_handle - - # - # Load the function list - # - pGetFunctionList(self.p11_ptr) - - # - # Initialize - # - rv = self.p11.C_Initialize(NULL) - check_return_value(rv, "initialize") - - # - # Start session - # - rv = self.p11.C_OpenSession(self.slot, - CKF_SERIAL_SESSION | CKF_RW_SESSION, NULL, - NULL, self.session_ptr) - check_return_value(rv, "open session") - - # - # Login - # - rv = self.p11.C_Login(self.session, CKU_USER, user_pin, len(user_pin)) - check_return_value(rv, "log in") - - def finalize(self): - """ - Finalize operations with pkcs11 library - """ - if self.p11 == NULL: - return - - # - # Logout - # - rv = self.p11.C_Logout(self.session) - check_return_value(rv, "log out") - - # - # End session - # - rv = self.p11.C_CloseSession(self.session) - check_return_value(rv, "close session") - - # - # Finalize - # - self.p11.C_Finalize(NULL) - - self.p11_ptr[0] = NULL - self.session_ptr[0] = 0 - self.slot = 0 - self.module_handle = None - - ################################################################# - # Methods working with keys - # - - def generate_master_key(self, label, id, key_length=16, cka_copyable=True, - cka_decrypt=False, cka_derive=False, - cka_encrypt=False, cka_extractable=True, - cka_modifiable=True, cka_private=True, - cka_sensitive=True, cka_sign=False, - cka_unwrap=True, cka_verify=False, cka_wrap=True, - cka_wrap_with_trusted=False): - """ - Generate master key - - :return: master key handle - """ - if isinstance(id, unicode): - id = id.encode() - - attrs = ( - cka_copyable, - cka_decrypt, - cka_derive, - cka_encrypt, - cka_extractable, - cka_modifiable, - cka_private, - cka_sensitive, - cka_sign, - cka_unwrap, - cka_verify, - cka_wrap, - cka_wrap_with_trusted, - ) - - key_length_ptr = new_ptr(CK_ULONG, key_length) - master_key_ptr = new_ptr(CK_OBJECT_HANDLE) - - label_unicode = label - id_length = len(id) - id_ = new_array(CK_BYTE, id) - # TODO check long overflow - - label, label_length = unicode_to_char_array(label_unicode) - - # TODO param? - mechanism_ptr = new_ptr(CK_MECHANISM, ( - CKM_AES_KEY_GEN, NULL_PTR, 0 - )) - - if key_length not in (16, 24, 32): - raise Error("generate_master_key: key length allowed values are: " - "16, 24 and 32") - - if self._id_exists(id_, id_length, CKO_SECRET_KEY): - raise DuplicationError("Master key with same ID already exists") - - # Process keyword boolean arguments - (_cka_copyable_ptr, cka_decrypt_ptr, cka_derive_ptr, cka_encrypt_ptr, - cka_extractable_ptr, cka_modifiable_ptr, cka_private_ptr, - cka_sensitive_ptr, cka_sign_ptr, cka_unwrap_ptr, cka_verify_ptr, - cka_wrap_ptr, cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs) - - symKeyTemplate = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id_, id_length), - (CKA_LABEL, label, label_length), - (CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)), - (CKA_VALUE_LEN, key_length_ptr, sizeof(CK_ULONG)), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)), - (CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)), - (CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)), - (CKA_ENCRYPT, cka_encrypt_ptr, sizeof(CK_BBOOL)), - (CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)), - (CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)), - (CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)), - (CKA_VERIFY, cka_verify_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP, cka_wrap_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr, - sizeof(CK_BBOOL)), - )) - - rv = self.p11.C_GenerateKey(self.session, mechanism_ptr, - symKeyTemplate, - (sizeof(symKeyTemplate) // - sizeof(CK_ATTRIBUTE)), master_key_ptr) - check_return_value(rv, "generate master key") - - return master_key_ptr[0] - - def generate_replica_key_pair(self, label, id, modulus_bits=2048, - pub_cka_copyable=True, pub_cka_derive=False, - pub_cka_encrypt=False, - pub_cka_modifiable=True, - pub_cka_private=True, pub_cka_trusted=False, - pub_cka_verify=False, - pub_cka_verify_recover=False, - pub_cka_wrap=True, - priv_cka_always_authenticate=False, - priv_cka_copyable=True, - priv_cka_decrypt=False, - priv_cka_derive=False, - priv_cka_extractable=False, - priv_cka_modifiable=True, - priv_cka_private=True, - priv_cka_sensitive=True, - priv_cka_sign=False, - priv_cka_sign_recover=False, - priv_cka_unwrap=True, - priv_cka_wrap_with_trusted=False): - """ - Generate replica keys - - :returns: tuple (public_key_handle, private_key_handle) - """ - if isinstance(id, unicode): - id = id.encode() - - attrs_pub = ( - pub_cka_copyable, - pub_cka_derive, - pub_cka_encrypt, - pub_cka_modifiable, - pub_cka_private, - pub_cka_trusted, - pub_cka_verify, - pub_cka_verify_recover, - pub_cka_wrap, - ) - - attrs_priv = ( - priv_cka_always_authenticate, - priv_cka_copyable, - priv_cka_decrypt, - priv_cka_derive, - priv_cka_extractable, - priv_cka_modifiable, - priv_cka_private, - priv_cka_sensitive, - priv_cka_sign, - priv_cka_sign_recover, - priv_cka_unwrap, - priv_cka_wrap_with_trusted, - ) - - label_unicode = label - id_ = new_array(CK_BYTE, id) - id_length = len(id) - - label, label_length = unicode_to_char_array(label_unicode) - - public_key_ptr = new_ptr(CK_OBJECT_HANDLE) - private_key_ptr = new_ptr(CK_OBJECT_HANDLE) - mechanism_ptr = new_ptr(CK_MECHANISM, - (CKM_RSA_PKCS_KEY_PAIR_GEN, NULL_PTR, 0)) - - if self._id_exists(id_, id_length, CKO_PRIVATE_KEY): - raise DuplicationError("Private key with same ID already exists") - - if self._id_exists(id_, id_length, CKO_PUBLIC_KEY): - raise DuplicationError("Public key with same ID already exists") - - modulus_bits_ptr = new_ptr(CK_ULONG, modulus_bits) - - # Process keyword boolean arguments - (_pub_cka_copyable_ptr, pub_cka_derive_ptr, pub_cka_encrypt_ptr, - pub_cka_modifiable_ptr, pub_cka_private_ptr, pub_cka_trusted_ptr, - pub_cka_verify_ptr, pub_cka_verify_recover_ptr, pub_cka_wrap_ptr, - ) = convert_py2bool(attrs_pub) - (priv_cka_always_authenticate_ptr, _priv_cka_copyable_ptr, - priv_cka_decrypt_ptr, priv_cka_derive_ptr, priv_cka_extractable_ptr, - priv_cka_modifiable_ptr, priv_cka_private_ptr, priv_cka_sensitive_ptr, - priv_cka_sign_ptr, _priv_cka_sign_recover_ptr, priv_cka_unwrap_ptr, - priv_cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs_priv) - - # 65537 (RFC 6376 section 3.3.1) - public_exponent = new_array(CK_BYTE, (1, 0, 1)) - publicKeyTemplate = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id_, id_length), - (CKA_LABEL, label, label_length), - (CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)), - (CKA_MODULUS_BITS, modulus_bits_ptr, sizeof(CK_ULONG)), - (CKA_PUBLIC_EXPONENT, public_exponent, 3), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, pub_cka_copyable_p, sizeof(CK_BBOOL)), - (CKA_DERIVE, pub_cka_derive_ptr, sizeof(CK_BBOOL)), - (CKA_ENCRYPT, pub_cka_encrypt_ptr, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, pub_cka_modifiable_ptr, sizeof(CK_BBOOL)), - (CKA_PRIVATE, pub_cka_private_ptr, sizeof(CK_BBOOL)), - (CKA_TRUSTED, pub_cka_trusted_ptr, sizeof(CK_BBOOL)), - (CKA_VERIFY, pub_cka_verify_ptr, sizeof(CK_BBOOL)), - (CKA_VERIFY_RECOVER, pub_cka_verify_recover_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP, pub_cka_wrap_ptr, sizeof(CK_BBOOL)), - )) - - privateKeyTemplate = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id_, id_length), - (CKA_LABEL, label, label_length), - (CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)), - (CKA_ALWAYS_AUTHENTICATE, priv_cka_always_authenticate_ptr, - sizeof(CK_BBOOL)), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, priv_cka_copyable_ptr, sizeof(CK_BBOOL)), - (CKA_DECRYPT, priv_cka_decrypt_ptr, sizeof(CK_BBOOL)), - (CKA_DERIVE, priv_cka_derive_ptr, sizeof(CK_BBOOL)), - (CKA_EXTRACTABLE, priv_cka_extractable_ptr, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, priv_cka_modifiable_ptr, sizeof(CK_BBOOL)), - (CKA_PRIVATE, priv_cka_private_ptr, sizeof(CK_BBOOL)), - (CKA_SENSITIVE, priv_cka_sensitive_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN, priv_cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN_RECOVER, priv_cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_UNWRAP, priv_cka_unwrap_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP_WITH_TRUSTED, priv_cka_wrap_with_trusted_ptr, - sizeof(CK_BBOOL)), - )) - - rv = self.p11.C_GenerateKeyPair(self.session, mechanism_ptr, - publicKeyTemplate, - (sizeof(publicKeyTemplate) // - sizeof(CK_ATTRIBUTE)), - privateKeyTemplate, - (sizeof(privateKeyTemplate) // - sizeof(CK_ATTRIBUTE)), - public_key_ptr, - private_key_ptr) - check_return_value(rv, "generate key pair") - - return public_key_ptr[0], private_key_ptr[0] - - def find_keys(self, objclass=CKO_VENDOR_DEFINED, label=None, id=None, - cka_wrap=None, cka_unwrap=None, uri=None): - """ - Find key - """ - if isinstance(id, unicode): - id = id.encode() - if isinstance(uri, unicode): - uri = uri.encode() - - class_ = objclass - class_ptr = new_ptr(CK_OBJECT_CLASS, class_) - ckawrap = NULL - ckaunwrap = NULL - if id is not None: - id_ = new_array(CK_BYTE, id) - id_length = len(id) - else: - id_ = NULL - id_length = 0 - label_unicode, label = label, NULL - cka_wrap_bool = cka_wrap - cka_unwrap_bool = cka_unwrap - label_length = 0 - uri_str = uri - uri = NULL - template = new_array(CK_ATTRIBUTE, MAX_TEMPLATE_LEN) - template_len_ptr = new_ptr(CK_ULONG, MAX_TEMPLATE_LEN) - - # TODO check long overflow - - if label_unicode is not None: - label, label_length = unicode_to_char_array(label_unicode) - - if cka_wrap_bool is not None: - if cka_wrap_bool: - ckawrap = true_ptr - else: - ckawrap = false_ptr - - if cka_unwrap_bool is not None: - if cka_unwrap_bool: - ckaunwrap = true_ptr - else: - ckaunwrap = false_ptr - - if class_ == CKO_VENDOR_DEFINED: - class_ptr = NULL - - try: - if uri_str is None: - _fill_template_from_parts(template, template_len_ptr, id_, - id_length, label, label_length, - class_ptr, ckawrap, ckaunwrap) - else: - uri = _parse_uri(uri_str) - template = (p11_kit_uri_get_attributes(uri, template_len_ptr)) - # Do not deallocate URI while you are using the template. - # Template contains pointers to values inside URI! - - result_list = self._find_key(template, template_len_ptr[0]) - - return result_list - finally: - if uri != NULL: - p11_kit_uri_free(uri) - - def delete_key(self, key_handle): - """ - delete key - """ - # TODO check long overflow - rv = self.p11.C_DestroyObject(self.session, key_handle) - check_return_value(rv, "object deletion") - - def _export_RSA_public_key(self, object): - """ - export RSA public key - """ - class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY) - key_type_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA) - - obj_template = new_array(CK_ATTRIBUTE, ( - (CKA_MODULUS, NULL_PTR, 0), - (CKA_PUBLIC_EXPONENT, NULL_PTR, 0), - (CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)), - (CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)), - )) - - rv = self.p11.C_GetAttributeValue(self.session, object, obj_template, - (sizeof(obj_template) // - sizeof(CK_ATTRIBUTE))) - check_return_value(rv, "get RSA public key values - prepare") - - # Set proper size for attributes - modulus = new_array(CK_BYTE, - obj_template[0].ulValueLen * sizeof(CK_BYTE)) - obj_template[0].pValue = modulus - exponent = new_array(CK_BYTE, - obj_template[1].ulValueLen * sizeof(CK_BYTE)) - obj_template[1].pValue = exponent - - rv = self.p11.C_GetAttributeValue(self.session, object, obj_template, - (sizeof(obj_template) // - sizeof(CK_ATTRIBUTE))) - check_return_value(rv, "get RSA public key values") - - # Check if the key is RSA public key - if class_ptr[0] != CKO_PUBLIC_KEY: - raise Error("export_RSA_public_key: required public key class") - - if key_type_ptr[0] != CKK_RSA: - raise Error("export_RSA_public_key: required RSA key type") - - try: - n = bytes_to_int(string_to_pybytes_or_none( - modulus, obj_template[0].ulValueLen)) - except Exception: - raise Error("export_RSA_public_key: internal error: unable to " - "convert modulus") - - try: - e = bytes_to_int(string_to_pybytes_or_none( - exponent, obj_template[1].ulValueLen)) - except Exception: - raise Error("export_RSA_public_key: internal error: unable to " - "convert exponent") - - # set modulus and exponent - rsa_ = rsa.RSAPublicNumbers(e, n) - - try: - pkey = rsa_.public_key(default_backend()) - except Exception: - raise Error("export_RSA_public_key: internal error: " - "EVP_PKEY_set1_RSA failed") - - try: - ret = pkey.public_bytes( - format=serialization.PublicFormat.SubjectPublicKeyInfo, - encoding=serialization.Encoding.DER, - ) - except Exception: - ret = None - - return ret - - def export_public_key(self, key_handle): - """ - Export public key - - Export public key in SubjectPublicKeyInfo (RFC5280) DER encoded format - """ - object = key_handle - class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY) - key_type_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA) - # TODO check long overflow - - obj_template = new_array(CK_ATTRIBUTE, ( - (CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)), - (CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)), - )) - - rv = self.p11.C_GetAttributeValue(self.session, object, obj_template, - (sizeof(obj_template) // - sizeof(CK_ATTRIBUTE))) - check_return_value(rv, "export_public_key: get RSA public key values") - - if class_ptr[0] != CKO_PUBLIC_KEY: - raise Error("export_public_key: required public key class") - - if key_type_ptr[0] == CKK_RSA: - return self._export_RSA_public_key(object) - else: - raise Error("export_public_key: unsupported key type") - - def _import_RSA_public_key(self, label, label_length, id, id_length, pkey, - cka_copyable, cka_derive, cka_encrypt, - cka_modifiable, cka_private, cka_trusted, - cka_verify, cka_verify_recover, cka_wrap): - """ - Import RSA public key - """ - class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PUBLIC_KEY) - keyType_ptr = new_ptr(CK_KEY_TYPE, CKK_RSA) - cka_token = true_ptr - - if not isinstance(pkey, rsa.RSAPublicKey): - raise Error("Required RSA public key") - - rsa_ = pkey.public_numbers() - - # convert BIGNUM to binary array - modulus = new_array(CK_BYTE, int_to_bytes(rsa_.n)) - modulus_len = sizeof(modulus) - 1 - if modulus_len == 0: - raise Error("import_RSA_public_key: BN_bn2bin modulus error") - - exponent = new_array(CK_BYTE, int_to_bytes(rsa_.e)) - exponent_len = sizeof(exponent) - 1 - if exponent_len == 0: - raise Error("import_RSA_public_key: BN_bn2bin exponent error") - - template = new_array(CK_ATTRIBUTE, ( - (CKA_ID, id, id_length), - (CKA_CLASS, class_ptr, sizeof(CK_OBJECT_CLASS)), - (CKA_KEY_TYPE, keyType_ptr, sizeof(CK_KEY_TYPE)), - (CKA_TOKEN, cka_token, sizeof(CK_BBOOL)), - (CKA_LABEL, label, label_length), - (CKA_MODULUS, modulus, modulus_len), - (CKA_PUBLIC_EXPONENT, exponent, exponent_len), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, cka_copyable, sizeof(CK_BBOOL)), - (CKA_DERIVE, cka_derive, sizeof(CK_BBOOL)), - (CKA_ENCRYPT, cka_encrypt, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, cka_modifiable, sizeof(CK_BBOOL)), - (CKA_PRIVATE, cka_private, sizeof(CK_BBOOL)), - (CKA_TRUSTED, cka_trusted, sizeof(CK_BBOOL)), - (CKA_VERIFY, cka_verify, sizeof(CK_BBOOL)), - (CKA_VERIFY_RECOVER, cka_verify_recover, sizeof(CK_BBOOL)), - (CKA_WRAP, cka_wrap, sizeof(CK_BBOOL)), - )) - object_ptr = new_ptr(CK_OBJECT_HANDLE) - - rv = self.p11.C_CreateObject(self.session, template, - (sizeof(template) // - sizeof(CK_ATTRIBUTE)), object_ptr) - check_return_value(rv, "create public key object") - - return object_ptr[0] - - def import_public_key(self, label, id, data, cka_copyable=True, - cka_derive=False, cka_encrypt=False, - cka_modifiable=True, cka_private=True, - cka_trusted=False, cka_verify=True, - cka_verify_recover=True, cka_wrap=False): - """ - Import RSA public key - """ - if isinstance(id, unicode): - id = id.encode() - if isinstance(data, unicode): - data = data.encode() - - label_unicode = label - id_ = new_array(CK_BYTE, id) - id_length = len(id) - - attrs_pub = ( - cka_copyable, - cka_derive, - cka_encrypt, - cka_modifiable, - cka_private, - cka_trusted, - cka_verify, - cka_verify_recover, - cka_wrap, - ) - - label, label_length = unicode_to_char_array(label_unicode) - - if self._id_exists(id_, id_length, CKO_PUBLIC_KEY): - raise DuplicationError("Public key with same ID already exists") - - # Process keyword boolean arguments - (cka_copyable_ptr, cka_derive_ptr, cka_encrypt_ptr, cka_modifiable_ptr, - cka_private_ptr, cka_trusted_ptr, cka_verify_ptr, - cka_verify_recover_ptr, cka_wrap_ptr,) = convert_py2bool(attrs_pub) - - # decode from ASN1 DER - try: - pkey = serialization.load_der_public_key(data, default_backend()) - except Exception: - raise Error("import_public_key: d2i_PUBKEY error") - if isinstance(pkey, rsa.RSAPublicKey): - ret = self._import_RSA_public_key(label, label_length, id_, - id_length, pkey, - cka_copyable_ptr, - cka_derive_ptr, - cka_encrypt_ptr, - cka_modifiable_ptr, - cka_private_ptr, - cka_trusted_ptr, - cka_verify_ptr, - cka_verify_recover_ptr, - cka_wrap_ptr) - elif isinstance(pkey, dsa.DSAPublicKey): - raise Error("DSA is not supported") - elif isinstance(pkey, ec.EllipticCurvePublicKey): - raise Error("EC is not supported") - else: - raise Error("Unsupported key type") - - return ret - - def export_wrapped_key(self, key, wrapping_key, wrapping_mech): - """ - Export wrapped key - """ - object_key = key - object_wrapping_key = wrapping_key - wrapped_key_len_ptr = new_ptr(CK_ULONG, 0) - wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0)) - # currently we don't support parameter in mechanism - - # TODO check long overflow - # TODO export method - - # fill mech parameters - _set_wrapping_mech_parameters(wrapping_mech_ptr.mechanism, - wrapping_mech_ptr) - - rv = self.p11.C_WrapKey(self.session, wrapping_mech_ptr, - object_wrapping_key, object_key, NULL, - wrapped_key_len_ptr) - check_return_value(rv, "key wrapping: get buffer length") - - wrapped_key = new_array(CK_BYTE, wrapped_key_len_ptr[0]) - - rv = self.p11.C_WrapKey(self.session, wrapping_mech_ptr, - object_wrapping_key, object_key, wrapped_key, - wrapped_key_len_ptr) - check_return_value(rv, "key wrapping: wrapping") - - result = string_to_pybytes_or_none(wrapped_key, wrapped_key_len_ptr[0]) - - return result - - def import_wrapped_secret_key(self, label, id, data, unwrapping_key, - wrapping_mech, key_type, cka_copyable=True, - cka_decrypt=False, cka_derive=False, - cka_encrypt=False, cka_extractable=True, - cka_modifiable=True, cka_private=True, - cka_sensitive=True, cka_sign=False, - cka_unwrap=True, cka_verify=False, - cka_wrap=True, cka_wrap_with_trusted=False): - """ - Import wrapped secret key - """ - if isinstance(id, unicode): - id = id.encode() - if isinstance(data, unicode): - data = data.encode() - - wrapped_key = new_array(CK_BYTE, data) - wrapped_key_len = len(data) - unwrapping_key_object = unwrapping_key - unwrapped_key_object_ptr = new_ptr(CK_OBJECT_HANDLE, 0) - label_unicode = label - id_ = new_array(CK_BYTE, id) - id_length = len(id) - wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0)) - key_class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_SECRET_KEY) - key_type_ptr = new_ptr(CK_KEY_TYPE, key_type) - - attrs = ( - cka_copyable, - cka_decrypt, - cka_derive, - cka_encrypt, - cka_extractable, - cka_modifiable, - cka_private, - cka_sensitive, - cka_sign, - cka_unwrap, - cka_verify, - cka_wrap, - cka_wrap_with_trusted, - ) - - _set_wrapping_mech_parameters(wrapping_mech_ptr.mechanism, - wrapping_mech_ptr) - - label, label_length = unicode_to_char_array(label_unicode) - - if self._id_exists(id_, id_length, key_class_ptr[0]): - raise DuplicationError("Secret key with same ID already exists") - - # Process keyword boolean arguments - (_cka_copyable_ptr, cka_decrypt_ptr, cka_derive_ptr, cka_encrypt_ptr, - cka_extractable_ptr, cka_modifiable_ptr, cka_private_ptr, - cka_sensitive_ptr, cka_sign_ptr, cka_unwrap_ptr, cka_verify_ptr, - cka_wrap_ptr, cka_wrap_with_trusted_ptr,) = convert_py2bool(attrs) - - template = new_array(CK_ATTRIBUTE, ( - (CKA_CLASS, key_class_ptr, sizeof(CK_OBJECT_CLASS)), - (CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)), - (CKA_ID, id_, id_length), - (CKA_LABEL, label, label_length), - (CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)), - (CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)), - (CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)), - (CKA_ENCRYPT, cka_encrypt_ptr, sizeof(CK_BBOOL)), - (CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)), - (CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)), - (CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)), - (CKA_VERIFY, cka_verify_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP, cka_wrap_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr, - sizeof(CK_BBOOL)), - )) - - rv = self.p11.C_UnwrapKey(self.session, wrapping_mech_ptr, - unwrapping_key_object, wrapped_key, - wrapped_key_len, template, - sizeof(template) // sizeof(CK_ATTRIBUTE), - unwrapped_key_object_ptr) - check_return_value(rv, "import_wrapped_key: key unwrapping") - - return unwrapped_key_object_ptr[0] - - def import_wrapped_private_key(self, label, id, data, unwrapping_key, - wrapping_mech, key_type, - cka_always_authenticate=False, - cka_copyable=True, cka_decrypt=False, - cka_derive=False, cka_extractable=True, - cka_modifiable=True, cka_private=True, - cka_sensitive=True, cka_sign=True, - cka_sign_recover=True, cka_unwrap=False, - cka_wrap_with_trusted=False): - """ - Import wrapped private key - """ - if isinstance(id, unicode): - id = id.encode() - if isinstance(data, unicode): - data = data.encode() - - wrapped_key = new_array(CK_BYTE, data) - wrapped_key_len = len(data) - unwrapping_key_object = unwrapping_key - unwrapped_key_object_ptr = new_ptr(CK_OBJECT_HANDLE, 0) - label_unicode = label - id_ = new_array(CK_BYTE, id) - id_length = len(id) - wrapping_mech_ptr = new_ptr(CK_MECHANISM, (wrapping_mech, NULL, 0)) - key_class_ptr = new_ptr(CK_OBJECT_CLASS, CKO_PRIVATE_KEY) - key_type_ptr = new_ptr(CK_KEY_TYPE, key_type) - - attrs_priv = ( - cka_always_authenticate, - cka_copyable, - cka_decrypt, - cka_derive, - cka_extractable, - cka_modifiable, - cka_private, - cka_sensitive, - cka_sign, - cka_sign_recover, - cka_unwrap, - cka_wrap_with_trusted, - ) - - label, label_length = unicode_to_char_array(label_unicode) - - if self._id_exists(id_, id_length, CKO_SECRET_KEY): - raise DuplicationError("Secret key with same ID already exists") - - # Process keyword boolean arguments - (cka_always_authenticate_ptr, _cka_copyable_ptr, cka_decrypt_ptr, - cka_derive_ptr, cka_extractable_ptr, cka_modifiable_ptr, - cka_private_ptr, cka_sensitive_ptr, cka_sign_ptr, - _cka_sign_recover_ptr, cka_unwrap_ptr, cka_wrap_with_trusted_ptr, - ) = convert_py2bool(attrs_priv) - - template = new_array(CK_ATTRIBUTE, ( - (CKA_CLASS, key_class_ptr, sizeof(CK_OBJECT_CLASS)), - (CKA_KEY_TYPE, key_type_ptr, sizeof(CK_KEY_TYPE)), - (CKA_ID, id_, id_length), - (CKA_LABEL, label, label_length), - (CKA_TOKEN, true_ptr, sizeof(CK_BBOOL)), - (CKA_ALWAYS_AUTHENTICATE, cka_always_authenticate_ptr, - sizeof(CK_BBOOL)), - # TODO Softhsm doesn't support it - # (CKA_COPYABLE, cka_copyable_ptr, sizeof(CK_BBOOL)), - (CKA_DECRYPT, cka_decrypt_ptr, sizeof(CK_BBOOL)), - (CKA_DERIVE, cka_derive_ptr, sizeof(CK_BBOOL)), - (CKA_EXTRACTABLE, cka_extractable_ptr, sizeof(CK_BBOOL)), - (CKA_MODIFIABLE, cka_modifiable_ptr, sizeof(CK_BBOOL)), - (CKA_PRIVATE, cka_private_ptr, sizeof(CK_BBOOL)), - (CKA_SENSITIVE, cka_sensitive_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN, cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_SIGN_RECOVER, cka_sign_ptr, sizeof(CK_BBOOL)), - (CKA_UNWRAP, cka_unwrap_ptr, sizeof(CK_BBOOL)), - (CKA_WRAP_WITH_TRUSTED, cka_wrap_with_trusted_ptr, - sizeof(CK_BBOOL)), - )) - - rv = self.p11.C_UnwrapKey(self.session, wrapping_mech_ptr, - unwrapping_key_object, wrapped_key, - wrapped_key_len, template, - sizeof(template) // sizeof(CK_ATTRIBUTE), - unwrapped_key_object_ptr) - check_return_value(rv, "import_wrapped_key: key unwrapping") - - return unwrapped_key_object_ptr[0] - - def set_attribute(self, key_object, attr, value): - """ - Set object attributes - """ - object = key_object - attribute_ptr = new_ptr(CK_ATTRIBUTE) - - attribute_ptr.type = attr - if attr in (CKA_ALWAYS_AUTHENTICATE, - CKA_ALWAYS_SENSITIVE, - CKA_COPYABLE, - CKA_ENCRYPT, - CKA_EXTRACTABLE, - CKA_DECRYPT, - CKA_DERIVE, - CKA_LOCAL, - CKA_MODIFIABLE, - CKA_NEVER_EXTRACTABLE, - CKA_PRIVATE, - CKA_SENSITIVE, - CKA_SIGN, - CKA_SIGN_RECOVER, - CKA_TOKEN, - CKA_TRUSTED, - CKA_UNWRAP, - CKA_VERIFY, - CKA_VERIFY_RECOVER, - CKA_WRAP, - CKA_WRAP_WITH_TRUSTED): - attribute_ptr.pValue = true_ptr if value else false_ptr - attribute_ptr.ulValueLen = sizeof(CK_BBOOL) - elif attr == CKA_ID: - if not isinstance(value, bytes): - raise Error("Bytestring value expected") - attribute_ptr.pValue = new_array(CK_BYTE, value) - attribute_ptr.ulValueLen = len(value) - elif attr == CKA_LABEL: - if not isinstance(value, unicode): - raise Error("Unicode value expected") - label, label_length = unicode_to_char_array(value) - attribute_ptr.pValue = label - attribute_ptr.ulValueLen = label_length - elif attr == CKA_KEY_TYPE: - if not isinstance(value, int): - raise Error("Integer value expected") - attribute_ptr.pValue = new_ptr(unsigned_long, value) - attribute_ptr.ulValueLen = sizeof(unsigned_long) - else: - raise Error("Unknown attribute") - - template = new_array(CK_ATTRIBUTE, (attribute_ptr[0],)) - - rv = self.p11.C_SetAttributeValue(self.session, object, template, - (sizeof(template) // - sizeof(CK_ATTRIBUTE))) - check_return_value(rv, "set_attribute") - - def get_attribute(self, key_object, attr): - object = key_object - attribute_ptr = new_ptr(CK_ATTRIBUTE) - - attribute_ptr.type = attr - attribute_ptr.pValue = NULL_PTR - attribute_ptr.ulValueLen = 0 - template = new_array(CK_ATTRIBUTE, (attribute_ptr[0],)) - - rv = self.p11.C_GetAttributeValue(self.session, object, template, - (sizeof(template) // - sizeof(CK_ATTRIBUTE))) - if rv == CKR_ATTRIBUTE_TYPE_INVALID or template[0].ulValueLen == -1: - raise NotFound("attribute does not exist") - check_return_value(rv, "get_attribute init") - value = new_array(unsigned_char, template[0].ulValueLen) - template[0].pValue = value - - rv = self.p11.C_GetAttributeValue(self.session, object, template, - (sizeof(template) // - sizeof(CK_ATTRIBUTE))) - check_return_value(rv, "get_attribute") - - if attr in (CKA_ALWAYS_AUTHENTICATE, - CKA_ALWAYS_SENSITIVE, - CKA_COPYABLE, - CKA_ENCRYPT, - CKA_EXTRACTABLE, - CKA_DECRYPT, - CKA_DERIVE, - CKA_LOCAL, - CKA_MODIFIABLE, - CKA_NEVER_EXTRACTABLE, - CKA_PRIVATE, - CKA_SENSITIVE, - CKA_SIGN, - CKA_SIGN_RECOVER, - CKA_TOKEN, - CKA_TRUSTED, - CKA_UNWRAP, - CKA_VERIFY, - CKA_VERIFY_RECOVER, - CKA_WRAP, - CKA_WRAP_WITH_TRUSTED): - ret = bool(_ffi.cast(_ffi.getctype(CK_BBOOL, '*'), value)[0]) - elif attr == CKA_LABEL: - ret = char_array_to_unicode(value, template[0].ulValueLen) - elif attr in (CKA_MODULUS, CKA_PUBLIC_EXPONENT, CKA_ID): - ret = string_to_pybytes_or_none(value, template[0].ulValueLen) - elif attr == CKA_KEY_TYPE: - ret = _ffi.cast(_ffi.getctype(unsigned_long, '*'), value)[0] - else: - raise Error("Unknown attribute") - - return ret - - -# Key Classes -KEY_CLASS_PUBLIC_KEY = CKO_PUBLIC_KEY -KEY_CLASS_PRIVATE_KEY = CKO_PRIVATE_KEY -KEY_CLASS_SECRET_KEY = CKO_SECRET_KEY - -# Key types -KEY_TYPE_RSA = CKK_RSA -KEY_TYPE_AES = CKK_AES - -# Wrapping mech type -MECH_RSA_PKCS = CKM_RSA_PKCS -MECH_RSA_PKCS_OAEP = CKM_RSA_PKCS_OAEP -MECH_AES_KEY_WRAP = CKM_AES_KEY_WRAP -MECH_AES_KEY_WRAP_PAD = CKM_AES_KEY_WRAP_PAD - - -def generate_master_key(p11, keylabel=u"dnssec-master", key_length=16, - disable_old_keys=True): - assert isinstance(p11, P11_Helper) - - key_id = None - while True: - # check if key with this ID exist in LDAP or softHSM - # id is 16 Bytes long - key_id = "".join(chr(random.randint(0, 255)) for _ in range(0, 16)) - keys = p11.find_keys(KEY_CLASS_SECRET_KEY, - label=keylabel, - id=key_id) - if not keys: - break # we found unique id - - p11.generate_master_key(keylabel, - key_id, - key_length=key_length, - cka_wrap=True, - cka_unwrap=True) - - if disable_old_keys: - # set CKA_WRAP=False for old master keys - master_keys = p11.find_keys(KEY_CLASS_SECRET_KEY, - label=keylabel, - cka_wrap=True) - - for handle in master_keys: - # don't disable wrapping for new key - # compare IDs not handle - if key_id != p11.get_attribute(handle, CKA_ID): - p11.set_attribute(handle, CKA_WRAP, False) |
