diff --git a/changelogs/fragments/302-openssl_pkcs12-cryptography-36.0.0.yml b/changelogs/fragments/302-openssl_pkcs12-cryptography-36.0.0.yml new file mode 100644 index 00000000..87c25662 --- /dev/null +++ b/changelogs/fragments/302-openssl_pkcs12-cryptography-36.0.0.yml @@ -0,0 +1,2 @@ +bugfixes: + - "openssl_pkcs12 - use new PKCS#12 deserialization infrastructure from cryptography 36.0.0 if available (https://github.com/ansible-collections/community.crypto/pull/302)." diff --git a/plugins/module_utils/crypto/cryptography_support.py b/plugins/module_utils/crypto/cryptography_support.py index 29e6624e..05f49886 100644 --- a/plugins/module_utils/crypto/cryptography_support.py +++ b/plugins/module_utils/crypto/cryptography_support.py @@ -48,6 +48,15 @@ except ImportError: # Error handled in the calling module. _load_key_and_certificates = None +try: + # This is a separate try/except since this is only present in cryptography 36.0.0 or newer + from cryptography.hazmat.primitives.serialization.pkcs12 import ( + load_pkcs12 as _load_pkcs12, + ) +except ImportError: + # Error handled in the calling module. + _load_pkcs12 = None + from .basic import ( CRYPTOGRAPHY_HAS_ED25519, CRYPTOGRAPHY_HAS_ED448, @@ -512,43 +521,74 @@ def cryptography_serial_number_of_cert(cert): def parse_pkcs12(pkcs12_bytes, passphrase=None): '''Returns a tuple (private_key, certificate, additional_certificates, friendly_name). ''' - if _load_key_and_certificates is None: - raise ValueError('load_key_and_certificates() not present in the current cryptography version') - private_key, certificate, additional_certificates = _load_key_and_certificates( - pkcs12_bytes, to_bytes(passphrase) if passphrase is not None else None) + if _load_pkcs12 is None and _load_key_and_certificates is None: + raise ValueError('neither load_pkcs12() nor load_key_and_certificates() present in the current cryptography version') + + if passphrase is not None: + passphrase = to_bytes(passphrase) + + # Main code for cryptography 36.0.0 and forward + if _load_pkcs12 is not None: + return _parse_pkcs12_36_0_0(pkcs12_bytes, passphrase) + + if LooseVersion(cryptography.__version__) >= LooseVersion('35.0'): + return _parse_pkcs12_35_0_0(pkcs12_bytes, passphrase) + + return _parse_pkcs12_legacy(pkcs12_bytes, passphrase) + + +def _parse_pkcs12_36_0_0(pkcs12_bytes, passphrase=None): + # Requires cryptography 36.0.0 or newer + pkcs12 = _load_pkcs12(pkcs12_bytes, passphrase) + additional_certificates = [cert.certificate for cert in pkcs12.additional_certs] + private_key = pkcs12.key + certificate = None + friendly_name = None + if pkcs12.cert: + certificate = pkcs12.cert.certificate + friendly_name = pkcs12.cert.friendly_name + return private_key, certificate, additional_certificates, friendly_name + + +def _parse_pkcs12_35_0_0(pkcs12_bytes, passphrase=None): + # Backwards compatibility code for cryptography 35.x + private_key, certificate, additional_certificates = _load_key_and_certificates(pkcs12_bytes, passphrase) friendly_name = None if certificate: # See https://github.com/pyca/cryptography/issues/5760#issuecomment-842687238 backend = default_backend() - try: - # For certain old versions of cryptography, backend is a MultiBackend object, - # which has no _lib attribute. In that case, revert to the old approach. - backend._lib - except AttributeError: - backend = certificate._backend - if LooseVersion(cryptography.__version__) >= LooseVersion('35.0'): - # This code basically does what load_key_and_certificates() does, but without error-checking. - # Since load_key_and_certificates succeeded, it should not fail. - pkcs12 = backend._ffi.gc( - backend._lib.d2i_PKCS12_bio(backend._bytes_to_bio(pkcs12_bytes).bio, backend._ffi.NULL), - backend._lib.PKCS12_free) - certificate_x509_ptr = backend._ffi.new("X509 **") - with backend._zeroed_null_terminated_buf(to_bytes(passphrase) if passphrase is not None else None) as passphrase_buffer: - backend._lib.PKCS12_parse( - pkcs12, - passphrase_buffer, - backend._ffi.new("EVP_PKEY **"), - certificate_x509_ptr, - backend._ffi.new("Cryptography_STACK_OF_X509 **")) - if certificate_x509_ptr[0] != backend._ffi.NULL: - maybe_name = backend._lib.X509_alias_get0(certificate_x509_ptr[0], backend._ffi.NULL) - if maybe_name != backend._ffi.NULL: - friendly_name = backend._ffi.string(maybe_name) - else: - # cryptography < 35.0.0 - maybe_name = backend._lib.X509_alias_get0(certificate._x509, backend._ffi.NULL) + # This code basically does what load_key_and_certificates() does, but without error-checking. + # Since load_key_and_certificates succeeded, it should not fail. + pkcs12 = backend._ffi.gc( + backend._lib.d2i_PKCS12_bio(backend._bytes_to_bio(pkcs12_bytes).bio, backend._ffi.NULL), + backend._lib.PKCS12_free) + certificate_x509_ptr = backend._ffi.new("X509 **") + with backend._zeroed_null_terminated_buf(to_bytes(passphrase) if passphrase is not None else None) as passphrase_buffer: + backend._lib.PKCS12_parse( + pkcs12, + passphrase_buffer, + backend._ffi.new("EVP_PKEY **"), + certificate_x509_ptr, + backend._ffi.new("Cryptography_STACK_OF_X509 **")) + if certificate_x509_ptr[0] != backend._ffi.NULL: + maybe_name = backend._lib.X509_alias_get0(certificate_x509_ptr[0], backend._ffi.NULL) if maybe_name != backend._ffi.NULL: friendly_name = backend._ffi.string(maybe_name) + + return private_key, certificate, additional_certificates, friendly_name + + +def _parse_pkcs12_legacy(pkcs12_bytes, passphrase=None): + # Backwards compatibility code for cryptography < 35.0.0 + private_key, certificate, additional_certificates = _load_key_and_certificates(pkcs12_bytes, passphrase) + + friendly_name = None + if certificate: + # See https://github.com/pyca/cryptography/issues/5760#issuecomment-842687238 + backend = certificate._backend + maybe_name = backend._lib.X509_alias_get0(certificate._x509, backend._ffi.NULL) + if maybe_name != backend._ffi.NULL: + friendly_name = backend._ffi.string(maybe_name) return private_key, certificate, additional_certificates, friendly_name