From ba0358065939093e177e87c7daf22b64d1545e42 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Sat, 15 May 2021 22:48:08 +0200 Subject: [PATCH] x509_certificate_info: move main code to module_utils to allow easier implementation of diff mode (#206) * Move x509_certificate_info code to module_utils. * Add changelog fragment. * Apply suggestions from code review Co-authored-by: Ajpantuso Co-authored-by: Ajpantuso --- .../fragments/206-x509_certificate_info.yml | 2 + .../module_backends/certificate_info.py | 544 ++++++++++++++++ plugins/modules/x509_certificate_info.py | 581 +----------------- 3 files changed, 575 insertions(+), 552 deletions(-) create mode 100644 changelogs/fragments/206-x509_certificate_info.yml create mode 100644 plugins/module_utils/crypto/module_backends/certificate_info.py diff --git a/changelogs/fragments/206-x509_certificate_info.yml b/changelogs/fragments/206-x509_certificate_info.yml new file mode 100644 index 00000000..031736b9 --- /dev/null +++ b/changelogs/fragments/206-x509_certificate_info.yml @@ -0,0 +1,2 @@ +minor_changes: +- "x509_certificate_info - refactor module to allow code re-use for diff mode (https://github.com/ansible-collections/community.crypto/pull/206)." diff --git a/plugins/module_utils/crypto/module_backends/certificate_info.py b/plugins/module_utils/crypto/module_backends/certificate_info.py new file mode 100644 index 00000000..42c136d1 --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/certificate_info.py @@ -0,0 +1,544 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2016-2017, Yanis Guenane +# Copyright: (c) 2017, Markus Teufelberger +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import binascii +import datetime +import re +import traceback + +from distutils.version import LooseVersion + +from ansible.module_utils import six +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils._text import to_native, to_text, to_bytes + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + load_certificate, + get_fingerprint_of_bytes, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + cryptography_decode_name, + cryptography_get_extensions_from_cert, + cryptography_oid_to_name, + cryptography_serial_number_of_cert, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import ( + pyopenssl_get_extensions_from_cert, + pyopenssl_normalize_name, + pyopenssl_normalize_name_attribute, +) + +MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' +MINIMAL_PYOPENSSL_VERSION = '0.15' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # OpenSSL 1.1.0 or newer + OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" + OPENSSL_MUST_STAPLE_VALUE = b"status_request" + else: + # OpenSSL 1.0.x or older + OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" + OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography import x509 + from cryptography.hazmat.primitives import serialization + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + + +TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" + + +@six.add_metaclass(abc.ABCMeta) +class CertificateInfoRetrieval(object): + def __init__(self, module, backend, content): + # content must be a bytes string + self.module = module + self.backend = backend + self.content = content + + @abc.abstractmethod + def _get_der_bytes(self): + pass + + @abc.abstractmethod + def _get_signature_algorithm(self): + pass + + @abc.abstractmethod + def _get_subject_ordered(self): + pass + + @abc.abstractmethod + def _get_issuer_ordered(self): + pass + + @abc.abstractmethod + def _get_version(self): + pass + + @abc.abstractmethod + def _get_key_usage(self): + pass + + @abc.abstractmethod + def _get_extended_key_usage(self): + pass + + @abc.abstractmethod + def _get_basic_constraints(self): + pass + + @abc.abstractmethod + def _get_ocsp_must_staple(self): + pass + + @abc.abstractmethod + def _get_subject_alt_name(self): + pass + + @abc.abstractmethod + def get_not_before(self): + pass + + @abc.abstractmethod + def get_not_after(self): + pass + + @abc.abstractmethod + def _get_public_key(self, binary): + pass + + @abc.abstractmethod + def _get_subject_key_identifier(self): + pass + + @abc.abstractmethod + def _get_authority_key_identifier(self): + pass + + @abc.abstractmethod + def _get_serial_number(self): + pass + + @abc.abstractmethod + def _get_all_extensions(self): + pass + + @abc.abstractmethod + def _get_ocsp_uri(self): + pass + + def get_info(self): + result = dict() + self.cert = load_certificate(None, content=self.content, backend=self.backend) + + result['signature_algorithm'] = self._get_signature_algorithm() + subject = self._get_subject_ordered() + issuer = self._get_issuer_ordered() + result['subject'] = dict() + for k, v in subject: + result['subject'][k] = v + result['subject_ordered'] = subject + result['issuer'] = dict() + for k, v in issuer: + result['issuer'][k] = v + result['issuer_ordered'] = issuer + result['version'] = self._get_version() + result['key_usage'], result['key_usage_critical'] = self._get_key_usage() + result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage() + result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints() + result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple() + result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name() + + not_before = self.get_not_before() + not_after = self.get_not_after() + result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT) + result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT) + result['expired'] = not_after < datetime.datetime.utcnow() + + result['public_key'] = self._get_public_key(binary=False) + pk = self._get_public_key(binary=True) + result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict() + + result['fingerprints'] = get_fingerprint_of_bytes(self._get_der_bytes()) + + if self.backend != 'pyopenssl': + ski = self._get_subject_key_identifier() + if ski is not None: + ski = to_native(binascii.hexlify(ski)) + ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)]) + result['subject_key_identifier'] = ski + + aki, aci, acsn = self._get_authority_key_identifier() + if aki is not None: + aki = to_native(binascii.hexlify(aki)) + aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)]) + result['authority_key_identifier'] = aki + result['authority_cert_issuer'] = aci + result['authority_cert_serial_number'] = acsn + + result['serial_number'] = self._get_serial_number() + result['extensions_by_oid'] = self._get_all_extensions() + result['ocsp_uri'] = self._get_ocsp_uri() + + return result + + +class CertificateInfoRetrievalCryptography(CertificateInfoRetrieval): + """Validate the supplied cert, using the cryptography backend""" + def __init__(self, module, content): + super(CertificateInfoRetrievalCryptography, self).__init__(module, 'cryptography', content) + + def _get_der_bytes(self): + return self.cert.public_bytes(serialization.Encoding.DER) + + def _get_signature_algorithm(self): + return cryptography_oid_to_name(self.cert.signature_algorithm_oid) + + def _get_subject_ordered(self): + result = [] + for attribute in self.cert.subject: + result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) + return result + + def _get_issuer_ordered(self): + result = [] + for attribute in self.cert.issuer: + result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) + return result + + def _get_version(self): + if self.cert.version == x509.Version.v1: + return 1 + if self.cert.version == x509.Version.v3: + return 3 + return "unknown" + + def _get_key_usage(self): + try: + current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage) + current_key_usage = current_key_ext.value + key_usage = dict( + digital_signature=current_key_usage.digital_signature, + content_commitment=current_key_usage.content_commitment, + key_encipherment=current_key_usage.key_encipherment, + data_encipherment=current_key_usage.data_encipherment, + key_agreement=current_key_usage.key_agreement, + key_cert_sign=current_key_usage.key_cert_sign, + crl_sign=current_key_usage.crl_sign, + encipher_only=False, + decipher_only=False, + ) + if key_usage['key_agreement']: + key_usage.update(dict( + encipher_only=current_key_usage.encipher_only, + decipher_only=current_key_usage.decipher_only + )) + + key_usage_names = dict( + digital_signature='Digital Signature', + content_commitment='Non Repudiation', + key_encipherment='Key Encipherment', + data_encipherment='Data Encipherment', + key_agreement='Key Agreement', + key_cert_sign='Certificate Sign', + crl_sign='CRL Sign', + encipher_only='Encipher Only', + decipher_only='Decipher Only', + ) + return sorted([ + key_usage_names[name] for name, value in key_usage.items() if value + ]), current_key_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_extended_key_usage(self): + try: + ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) + return sorted([ + cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value + ]), ext_keyusage_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_basic_constraints(self): + try: + ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints) + result = [] + result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')) + if ext_keyusage_ext.value.path_length is not None: + result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length)) + return sorted(result), ext_keyusage_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_ocsp_must_staple(self): + try: + try: + # This only works with cryptography >= 2.1 + tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature) + value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value + except AttributeError: + # Fallback for cryptography < 2.1 + oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") + tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid) + value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" + return value, tlsfeature_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def _get_subject_alt_name(self): + try: + san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) + result = [cryptography_decode_name(san) for san in san_ext.value] + return result, san_ext.critical + except cryptography.x509.ExtensionNotFound: + return None, False + + def get_not_before(self): + return self.cert.not_valid_before + + def get_not_after(self): + return self.cert.not_valid_after + + def _get_public_key(self, binary): + return self.cert.public_key().public_bytes( + serialization.Encoding.DER if binary else serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + def _get_subject_key_identifier(self): + try: + ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) + return ext.value.digest + except cryptography.x509.ExtensionNotFound: + return None + + def _get_authority_key_identifier(self): + try: + ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier) + issuer = None + if ext.value.authority_cert_issuer is not None: + issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer] + return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number + except cryptography.x509.ExtensionNotFound: + return None, None, None + + def _get_serial_number(self): + return cryptography_serial_number_of_cert(self.cert) + + def _get_all_extensions(self): + return cryptography_get_extensions_from_cert(self.cert) + + def _get_ocsp_uri(self): + try: + ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess) + for desc in ext.value: + if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP: + if isinstance(desc.access_location, x509.UniformResourceIdentifier): + return desc.access_location.value + except x509.ExtensionNotFound as dummy: + pass + return None + + +class CertificateInfoRetrievalPyOpenSSL(CertificateInfoRetrieval): + """validate the supplied certificate.""" + + def __init__(self, module, content): + super(CertificateInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content) + + def _get_der_bytes(self): + return crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert) + + def _get_signature_algorithm(self): + return to_text(self.cert.get_signature_algorithm()) + + def __get_name(self, name): + result = [] + for sub in name.get_components(): + result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])]) + return result + + def _get_subject_ordered(self): + return self.__get_name(self.cert.get_subject()) + + def _get_issuer_ordered(self): + return self.__get_name(self.cert.get_issuer()) + + def _get_version(self): + # Version numbers in certs are off by one: + # v1: 0, v2: 1, v3: 2 ... + return self.cert.get_version() + 1 + + def _get_extension(self, short_name): + for extension_idx in range(0, self.cert.get_extension_count()): + extension = self.cert.get_extension(extension_idx) + if extension.get_short_name() == short_name: + result = [ + pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',') + ] + return sorted(result), bool(extension.get_critical()) + return None, False + + def _get_key_usage(self): + return self._get_extension(b'keyUsage') + + def _get_extended_key_usage(self): + return self._get_extension(b'extendedKeyUsage') + + def _get_basic_constraints(self): + return self._get_extension(b'basicConstraints') + + def _get_ocsp_must_staple(self): + extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())] + oms_ext = [ + ext for ext in extensions + if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE + ] + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: + # Older versions of libssl don't know about OCSP Must Staple + oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) + if oms_ext: + return True, bool(oms_ext[0].get_critical()) + else: + return None, False + + def _get_subject_alt_name(self): + for extension_idx in range(0, self.cert.get_extension_count()): + extension = self.cert.get_extension(extension_idx) + if extension.get_short_name() == b'subjectAltName': + result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in + to_text(extension, errors='surrogate_or_strict').split(', ')] + return result, bool(extension.get_critical()) + return None, False + + def get_not_before(self): + time_string = to_native(self.cert.get_notBefore()) + return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") + + def get_not_after(self): + time_string = to_native(self.cert.get_notAfter()) + return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") + + def _get_public_key(self, binary): + try: + return crypto.dump_publickey( + crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, + self.cert.get_pubkey() + ) + except AttributeError: + try: + # pyOpenSSL < 16.0: + bio = crypto._new_mem_buf() + if binary: + rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey) + else: + rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey) + if rc != 1: + crypto._raise_current_error() + return crypto._bio_to_string(bio) + except AttributeError: + self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' + 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') + + def _get_subject_key_identifier(self): + # Won't be implemented + return None + + def _get_authority_key_identifier(self): + # Won't be implemented + return None, None, None + + def _get_serial_number(self): + return self.cert.get_serial_number() + + def _get_all_extensions(self): + return pyopenssl_get_extensions_from_cert(self.cert) + + def _get_ocsp_uri(self): + for i in range(self.cert.get_extension_count()): + ext = self.cert.get_extension(i) + if ext.get_short_name() == b'authorityInfoAccess': + v = str(ext) + m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE) + if m: + return m.group(1) + return None + + +def get_certificate_info(module, backend, content): + if backend == 'cryptography': + info = CertificateInfoRetrievalCryptography(module, content) + elif backend == 'pyopenssl': + info = CertificateInfoRetrievalPyOpenSSL(module, content) + return info.get_info() + + +def select_backend(module, backend, content): + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # First try cryptography, then pyOpenSSL + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), + exception=PYOPENSSL_IMP_ERR) + try: + getattr(crypto.X509Req, 'get_extensions') + except AttributeError: + module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') + + module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', + version='2.0.0', collection_name='community.crypto') + return backend, CertificateInfoRetrievalPyOpenSSL(module, content) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + return backend, CertificateInfoRetrievalCryptography(module, content) + else: + raise ValueError('Unsupported value for backend: {0}'.format(backend)) diff --git a/plugins/modules/x509_certificate_info.py b/plugins/modules/x509_certificate_info.py index 3e7bfc71..18aaf4c0 100644 --- a/plugins/modules/x509_certificate_info.py +++ b/plugins/modules/x509_certificate_info.py @@ -304,529 +304,22 @@ ocsp_uri: ''' -import abc -import binascii -import datetime -import os -import re -import traceback - -from distutils.version import LooseVersion - -from ansible.module_utils.basic import AnsibleModule, missing_required_lib +from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.six import string_types -from ansible.module_utils._text import to_native, to_text, to_bytes - -from ansible_collections.community.crypto.plugins.module_utils.compat import ipaddress as compat_ipaddress +from ansible.module_utils._text import to_native from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( - OpenSSLObject, get_relative_time_option, - load_certificate, - get_fingerprint_of_bytes, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( - cryptography_decode_name, - cryptography_get_extensions_from_cert, - cryptography_oid_to_name, - cryptography_serial_number_of_cert, +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import ( + select_backend, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import ( - pyopenssl_get_extensions_from_cert, - pyopenssl_normalize_name, - pyopenssl_normalize_name_attribute, -) - -MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' -MINIMAL_PYOPENSSL_VERSION = '0.15' - -PYOPENSSL_IMP_ERR = None -try: - import OpenSSL - from OpenSSL import crypto - PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) - if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: - # OpenSSL 1.1.0 or newer - OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" - OPENSSL_MUST_STAPLE_VALUE = b"status_request" - else: - # OpenSSL 1.0.x or older - OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" - OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" -except ImportError: - PYOPENSSL_IMP_ERR = traceback.format_exc() - PYOPENSSL_FOUND = False -else: - PYOPENSSL_FOUND = True - -CRYPTOGRAPHY_IMP_ERR = None -try: - import cryptography - from cryptography import x509 - from cryptography.hazmat.primitives import serialization - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - - -TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ" - - -class CertificateInfo(OpenSSLObject): - def __init__(self, module, backend): - super(CertificateInfo, self).__init__( - module.params['path'] or '', - 'present', - False, - module.check_mode, - ) - self.backend = backend - self.module = module - self.content = module.params['content'] - if self.content is not None: - self.content = self.content.encode('utf-8') - - self.valid_at = module.params['valid_at'] - if self.valid_at: - for k, v in self.valid_at.items(): - if not isinstance(v, string_types): - self.module.fail_json( - msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) - ) - self.valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) - - def generate(self): - # Empty method because OpenSSLObject wants this - pass - - def dump(self): - # Empty method because OpenSSLObject wants this - pass - - @abc.abstractmethod - def _get_der_bytes(self): - pass - - @abc.abstractmethod - def _get_signature_algorithm(self): - pass - - @abc.abstractmethod - def _get_subject_ordered(self): - pass - - @abc.abstractmethod - def _get_issuer_ordered(self): - pass - - @abc.abstractmethod - def _get_version(self): - pass - - @abc.abstractmethod - def _get_key_usage(self): - pass - - @abc.abstractmethod - def _get_extended_key_usage(self): - pass - - @abc.abstractmethod - def _get_basic_constraints(self): - pass - - @abc.abstractmethod - def _get_ocsp_must_staple(self): - pass - - @abc.abstractmethod - def _get_subject_alt_name(self): - pass - - @abc.abstractmethod - def _get_not_before(self): - pass - - @abc.abstractmethod - def _get_not_after(self): - pass - - @abc.abstractmethod - def _get_public_key(self, binary): - pass - - @abc.abstractmethod - def _get_subject_key_identifier(self): - pass - - @abc.abstractmethod - def _get_authority_key_identifier(self): - pass - - @abc.abstractmethod - def _get_serial_number(self): - pass - - @abc.abstractmethod - def _get_all_extensions(self): - pass - - @abc.abstractmethod - def _get_ocsp_uri(self): - pass - - def get_info(self): - result = dict() - self.cert = load_certificate(self.path, content=self.content, backend=self.backend) - - result['signature_algorithm'] = self._get_signature_algorithm() - subject = self._get_subject_ordered() - issuer = self._get_issuer_ordered() - result['subject'] = dict() - for k, v in subject: - result['subject'][k] = v - result['subject_ordered'] = subject - result['issuer'] = dict() - for k, v in issuer: - result['issuer'][k] = v - result['issuer_ordered'] = issuer - result['version'] = self._get_version() - result['key_usage'], result['key_usage_critical'] = self._get_key_usage() - result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage() - result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints() - result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple() - result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name() - - not_before = self._get_not_before() - not_after = self._get_not_after() - result['not_before'] = not_before.strftime(TIMESTAMP_FORMAT) - result['not_after'] = not_after.strftime(TIMESTAMP_FORMAT) - result['expired'] = not_after < datetime.datetime.utcnow() - - result['valid_at'] = dict() - if self.valid_at: - for k, v in self.valid_at.items(): - result['valid_at'][k] = not_before <= v <= not_after - - result['public_key'] = self._get_public_key(binary=False) - pk = self._get_public_key(binary=True) - result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict() - - result['fingerprints'] = get_fingerprint_of_bytes(self._get_der_bytes()) - - if self.backend != 'pyopenssl': - ski = self._get_subject_key_identifier() - if ski is not None: - ski = to_native(binascii.hexlify(ski)) - ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)]) - result['subject_key_identifier'] = ski - - aki, aci, acsn = self._get_authority_key_identifier() - if aki is not None: - aki = to_native(binascii.hexlify(aki)) - aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)]) - result['authority_key_identifier'] = aki - result['authority_cert_issuer'] = aci - result['authority_cert_serial_number'] = acsn - - result['serial_number'] = self._get_serial_number() - result['extensions_by_oid'] = self._get_all_extensions() - result['ocsp_uri'] = self._get_ocsp_uri() - - return result - - -class CertificateInfoCryptography(CertificateInfo): - """Validate the supplied cert, using the cryptography backend""" - def __init__(self, module): - super(CertificateInfoCryptography, self).__init__(module, 'cryptography') - - def _get_der_bytes(self): - return self.cert.public_bytes(serialization.Encoding.DER) - - def _get_signature_algorithm(self): - return cryptography_oid_to_name(self.cert.signature_algorithm_oid) - - def _get_subject_ordered(self): - result = [] - for attribute in self.cert.subject: - result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) - return result - - def _get_issuer_ordered(self): - result = [] - for attribute in self.cert.issuer: - result.append([cryptography_oid_to_name(attribute.oid), attribute.value]) - return result - - def _get_version(self): - if self.cert.version == x509.Version.v1: - return 1 - if self.cert.version == x509.Version.v3: - return 3 - return "unknown" - - def _get_key_usage(self): - try: - current_key_ext = self.cert.extensions.get_extension_for_class(x509.KeyUsage) - current_key_usage = current_key_ext.value - key_usage = dict( - digital_signature=current_key_usage.digital_signature, - content_commitment=current_key_usage.content_commitment, - key_encipherment=current_key_usage.key_encipherment, - data_encipherment=current_key_usage.data_encipherment, - key_agreement=current_key_usage.key_agreement, - key_cert_sign=current_key_usage.key_cert_sign, - crl_sign=current_key_usage.crl_sign, - encipher_only=False, - decipher_only=False, - ) - if key_usage['key_agreement']: - key_usage.update(dict( - encipher_only=current_key_usage.encipher_only, - decipher_only=current_key_usage.decipher_only - )) - - key_usage_names = dict( - digital_signature='Digital Signature', - content_commitment='Non Repudiation', - key_encipherment='Key Encipherment', - data_encipherment='Data Encipherment', - key_agreement='Key Agreement', - key_cert_sign='Certificate Sign', - crl_sign='CRL Sign', - encipher_only='Encipher Only', - decipher_only='Decipher Only', - ) - return sorted([ - key_usage_names[name] for name, value in key_usage.items() if value - ]), current_key_ext.critical - except cryptography.x509.ExtensionNotFound: - return None, False - - def _get_extended_key_usage(self): - try: - ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage) - return sorted([ - cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value - ]), ext_keyusage_ext.critical - except cryptography.x509.ExtensionNotFound: - return None, False - - def _get_basic_constraints(self): - try: - ext_keyusage_ext = self.cert.extensions.get_extension_for_class(x509.BasicConstraints) - result = [] - result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')) - if ext_keyusage_ext.value.path_length is not None: - result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length)) - return sorted(result), ext_keyusage_ext.critical - except cryptography.x509.ExtensionNotFound: - return None, False - - def _get_ocsp_must_staple(self): - try: - try: - # This only works with cryptography >= 2.1 - tlsfeature_ext = self.cert.extensions.get_extension_for_class(x509.TLSFeature) - value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value - except AttributeError as dummy: - # Fallback for cryptography < 2.1 - oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") - tlsfeature_ext = self.cert.extensions.get_extension_for_oid(oid) - value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05" - return value, tlsfeature_ext.critical - except cryptography.x509.ExtensionNotFound: - return None, False - - def _get_subject_alt_name(self): - try: - san_ext = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) - result = [cryptography_decode_name(san) for san in san_ext.value] - return result, san_ext.critical - except cryptography.x509.ExtensionNotFound: - return None, False - - def _get_not_before(self): - return self.cert.not_valid_before - - def _get_not_after(self): - return self.cert.not_valid_after - - def _get_public_key(self, binary): - return self.cert.public_key().public_bytes( - serialization.Encoding.DER if binary else serialization.Encoding.PEM, - serialization.PublicFormat.SubjectPublicKeyInfo - ) - - def _get_subject_key_identifier(self): - try: - ext = self.cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) - return ext.value.digest - except cryptography.x509.ExtensionNotFound: - return None - - def _get_authority_key_identifier(self): - try: - ext = self.cert.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier) - issuer = None - if ext.value.authority_cert_issuer is not None: - issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer] - return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number - except cryptography.x509.ExtensionNotFound: - return None, None, None - - def _get_serial_number(self): - return cryptography_serial_number_of_cert(self.cert) - - def _get_all_extensions(self): - return cryptography_get_extensions_from_cert(self.cert) - - def _get_ocsp_uri(self): - try: - ext = self.cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess) - for desc in ext.value: - if desc.access_method == x509.oid.AuthorityInformationAccessOID.OCSP: - if isinstance(desc.access_location, x509.UniformResourceIdentifier): - return desc.access_location.value - except x509.ExtensionNotFound as dummy: - pass - return None - - -class CertificateInfoPyOpenSSL(CertificateInfo): - """validate the supplied certificate.""" - - def __init__(self, module): - super(CertificateInfoPyOpenSSL, self).__init__(module, 'pyopenssl') - - def _get_der_bytes(self): - return crypto.dump_certificate(crypto.FILETYPE_ASN1, self.cert) - - def _get_signature_algorithm(self): - return to_text(self.cert.get_signature_algorithm()) - - def __get_name(self, name): - result = [] - for sub in name.get_components(): - result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])]) - return result - - def _get_subject_ordered(self): - return self.__get_name(self.cert.get_subject()) - - def _get_issuer_ordered(self): - return self.__get_name(self.cert.get_issuer()) - - def _get_version(self): - # Version numbers in certs are off by one: - # v1: 0, v2: 1, v3: 2 ... - return self.cert.get_version() + 1 - - def _get_extension(self, short_name): - for extension_idx in range(0, self.cert.get_extension_count()): - extension = self.cert.get_extension(extension_idx) - if extension.get_short_name() == short_name: - result = [ - pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',') - ] - return sorted(result), bool(extension.get_critical()) - return None, False - - def _get_key_usage(self): - return self._get_extension(b'keyUsage') - - def _get_extended_key_usage(self): - return self._get_extension(b'extendedKeyUsage') - - def _get_basic_constraints(self): - return self._get_extension(b'basicConstraints') - - def _get_ocsp_must_staple(self): - extensions = [self.cert.get_extension(i) for i in range(0, self.cert.get_extension_count())] - oms_ext = [ - ext for ext in extensions - if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE - ] - if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: - # Older versions of libssl don't know about OCSP Must Staple - oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) - if oms_ext: - return True, bool(oms_ext[0].get_critical()) - else: - return None, False - - def _get_subject_alt_name(self): - for extension_idx in range(0, self.cert.get_extension_count()): - extension = self.cert.get_extension(extension_idx) - if extension.get_short_name() == b'subjectAltName': - result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in - to_text(extension, errors='surrogate_or_strict').split(', ')] - return result, bool(extension.get_critical()) - return None, False - - def _get_not_before(self): - time_string = to_native(self.cert.get_notBefore()) - return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") - - def _get_not_after(self): - time_string = to_native(self.cert.get_notAfter()) - return datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") - - def _get_public_key(self, binary): - try: - return crypto.dump_publickey( - crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, - self.cert.get_pubkey() - ) - except AttributeError: - try: - # pyOpenSSL < 16.0: - bio = crypto._new_mem_buf() - if binary: - rc = crypto._lib.i2d_PUBKEY_bio(bio, self.cert.get_pubkey()._pkey) - else: - rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.cert.get_pubkey()._pkey) - if rc != 1: - crypto._raise_current_error() - return crypto._bio_to_string(bio) - except AttributeError: - self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' - 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') - - def _get_subject_key_identifier(self): - # Won't be implemented - return None - - def _get_authority_key_identifier(self): - # Won't be implemented - return None, None, None - - def _get_serial_number(self): - return self.cert.get_serial_number() - - def _get_all_extensions(self): - return pyopenssl_get_extensions_from_cert(self.cert) - - def _get_ocsp_uri(self): - for i in range(self.cert.get_extension_count()): - ext = self.cert.get_extension(i) - if ext.get_short_name() == b'authorityInfoAccess': - v = str(ext) - m = re.search('^OCSP - URI:(.*)$', v, flags=re.MULTILINE) - if m: - return m.group(1) - return None - def main(): module = AnsibleModule( @@ -848,53 +341,37 @@ def main(): module.deprecate("The 'community.crypto.openssl_certificate_info' module has been renamed to 'community.crypto.x509_certificate_info'", version='2.0.0', collection_name='community.crypto') - try: - if module.params['path'] is not None: - base_dir = os.path.dirname(module.params['path']) or '.' - if not os.path.isdir(base_dir): + if module.params['content'] is not None: + data = module.params['content'].encode('utf-8') + else: + try: + with open(module.params['path'], 'rb') as f: + data = f.read() + except (IOError, OSError) as e: + module.fail_json(msg='Error while reading certificate file from disk: {0}'.format(e)) + + backend, module_backend = select_backend(module, module.params['select_crypto_backend'], data) + + valid_at = module.params['valid_at'] + if valid_at: + for k, v in valid_at.items(): + if not isinstance(v, string_types): module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir + msg='The value for valid_at.{0} must be of type string (got {1})'.format(k, type(v)) ) + valid_at[k] = get_relative_time_option(v, 'valid_at.{0}'.format(k)) - backend = module.params['select_crypto_backend'] - if backend == 'auto': - # Detect what backend we can use - can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + try: + result = module_backend.get_info() - # If cryptography is available we'll use it - if can_use_cryptography: - backend = 'cryptography' - elif can_use_pyopenssl: - backend = 'pyopenssl' + not_before = module_backend.get_not_before() + not_after = module_backend.get_not_after() - # Fail if no backend has been found - if backend == 'auto': - module.fail_json(msg=("Can't detect any of the required Python libraries " - "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( - MINIMAL_CRYPTOGRAPHY_VERSION, - MINIMAL_PYOPENSSL_VERSION)) + result['valid_at'] = dict() + if valid_at: + for k, v in valid_at.items(): + result['valid_at'][k] = not_before <= v <= not_after - if backend == 'pyopenssl': - if not PYOPENSSL_FOUND: - module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), - exception=PYOPENSSL_IMP_ERR) - try: - getattr(crypto.X509Req, 'get_extensions') - except AttributeError: - module.fail_json(msg='You need to have PyOpenSSL>=0.15') - - module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', - version='2.0.0', collection_name='community.crypto') - certificate = CertificateInfoPyOpenSSL(module) - elif backend == 'cryptography': - if not CRYPTOGRAPHY_FOUND: - module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), - exception=CRYPTOGRAPHY_IMP_ERR) - certificate = CertificateInfoCryptography(module) - - result = certificate.get_info() module.exit_json(**result) except OpenSSLObjectError as exc: module.fail_json(msg=to_native(exc))