diff --git a/changelogs/fragments/150-diff.yml b/changelogs/fragments/150-diff.yml new file mode 100644 index 00000000..ed491173 --- /dev/null +++ b/changelogs/fragments/150-diff.yml @@ -0,0 +1,9 @@ +minor_changes: +- openssl_csr - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- openssl_csr_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- openssl_privatekey - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- openssl_privatekey_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- openssl_publickey - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- x509_certificate - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- x509_certificate_pipe - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). +- x509_crl - add diff mode (https://github.com/ansible-collections/community.crypto/issues/38, https://github.com/ansible-collections/community.crypto/pull/150). diff --git a/plugins/module_utils/crypto/module_backends/certificate.py b/plugins/module_utils/crypto/module_backends/certificate.py index e82cf886..dbf2936c 100644 --- a/plugins/module_utils/crypto/module_backends/certificate.py +++ b/plugins/module_utils/crypto/module_backends/certificate.py @@ -33,6 +33,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp cryptography_compare_public_keys, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.certificate_info import ( + get_certificate_info, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.6' MINIMAL_PYOPENSSL_VERSION = '0.15' @@ -95,6 +99,19 @@ class CertificateBackend(object): self.check_csr_subject = True self.check_csr_extensions = True + self.diff_before = self._get_info(None) + self.diff_after = self._get_info(None) + + def _get_info(self, data): + if data is None: + return dict() + try: + result = get_certificate_info(self.module, self.backend, data, prefer_one_fingerprint=True) + result['can_parse_certificate'] = True + return result + except Exception as exc: + return dict(can_parse_certificate=False) + @abc.abstractmethod def generate_certificate(self): """(Re-)Generate certificate.""" @@ -108,6 +125,7 @@ class CertificateBackend(object): def set_existing(self, certificate_bytes): """Set existing certificate bytes. None indicates that the key does not exist.""" self.existing_certificate_bytes = certificate_bytes + self.diff_after = self.diff_before = self._get_info(self.existing_certificate_bytes) def has_existing(self): """Query whether an existing certificate is/has been there.""" @@ -284,13 +302,19 @@ class CertificateBackend(object): 'privatekey': self.privatekey_path, 'csr': self.csr_path } + # Get hold of certificate bytes + certificate_bytes = self.existing_certificate_bytes + if self.cert is not None: + certificate_bytes = self.get_certificate_data() + self.diff_after = self._get_info(certificate_bytes) if include_certificate: - # Get hold of certificate bytes - certificate_bytes = self.existing_certificate_bytes - if self.cert is not None: - certificate_bytes = self.get_certificate_data() # Store result result['certificate'] = certificate_bytes.decode('utf-8') if certificate_bytes else None + + result['diff'] = dict( + before=self.diff_before, + after=self.diff_after, + ) return result diff --git a/plugins/module_utils/crypto/module_backends/certificate_info.py b/plugins/module_utils/crypto/module_backends/certificate_info.py index f329d168..29b788a4 100644 --- a/plugins/module_utils/crypto/module_backends/certificate_info.py +++ b/plugins/module_utils/crypto/module_backends/certificate_info.py @@ -165,7 +165,7 @@ class CertificateInfoRetrieval(object): def _get_ocsp_uri(self): pass - def get_info(self): + def get_info(self, prefer_one_fingerprint=False): result = dict() self.cert = load_certificate(None, content=self.content, backend=self.backend) @@ -195,14 +195,19 @@ class CertificateInfoRetrieval(object): result['public_key'] = self._get_public_key_pem() - public_key_info = get_publickey_info(self.module, self.backend, key=self._get_public_key_object()) + public_key_info = get_publickey_info( + self.module, + self.backend, + key=self._get_public_key_object(), + prefer_one_fingerprint=prefer_one_fingerprint) result.update({ 'public_key_type': public_key_info['type'], 'public_key_data': public_key_info['public_data'], 'public_key_fingerprints': public_key_info['fingerprints'], }) - result['fingerprints'] = get_fingerprint_of_bytes(self._get_der_bytes()) + result['fingerprints'] = get_fingerprint_of_bytes( + self._get_der_bytes(), prefer_one=prefer_one_fingerprint) if self.backend != 'pyopenssl': ski = self._get_subject_key_identifier() @@ -512,12 +517,12 @@ class CertificateInfoRetrievalPyOpenSSL(CertificateInfoRetrieval): return None -def get_certificate_info(module, backend, content): +def get_certificate_info(module, backend, content, prefer_one_fingerprint=False): if backend == 'cryptography': info = CertificateInfoRetrievalCryptography(module, content) elif backend == 'pyopenssl': info = CertificateInfoRetrievalPyOpenSSL(module, content) - return info.get_info() + return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, backend, content): diff --git a/plugins/module_utils/crypto/module_backends/csr.py b/plugins/module_utils/crypto/module_backends/csr.py index 75560f8c..acdc0a1f 100644 --- a/plugins/module_utils/crypto/module_backends/csr.py +++ b/plugins/module_utils/crypto/module_backends/csr.py @@ -48,6 +48,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_ pyopenssl_parse_name_constraints, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr_info import ( + get_csr_info, +) + from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec @@ -177,6 +181,20 @@ class CertificateSigningRequestBackend(object): self.existing_csr = None self.existing_csr_bytes = None + self.diff_before = self._get_info(None) + self.diff_after = self._get_info(None) + + def _get_info(self, data): + if data is None: + return dict() + try: + result = get_csr_info( + self.module, self.backend, data, validate_signature=False, prefer_one_fingerprint=True) + result['can_parse_csr'] = True + return result + except Exception as exc: + return dict(can_parse_csr=False) + @abc.abstractmethod def generate_csr(self): """(Re-)Generate CSR.""" @@ -190,6 +208,7 @@ class CertificateSigningRequestBackend(object): def set_existing(self, csr_bytes): """Set existing CSR bytes. None indicates that the CSR does not exist.""" self.existing_csr_bytes = csr_bytes + self.diff_after = self.diff_before = self._get_info(self.existing_csr_bytes) def has_existing(self): """Query whether an existing CSR is/has been there.""" @@ -238,13 +257,19 @@ class CertificateSigningRequestBackend(object): 'name_constraints_permitted': self.name_constraints_permitted, 'name_constraints_excluded': self.name_constraints_excluded, } + # Get hold of CSR bytes + csr_bytes = self.existing_csr_bytes + if self.csr is not None: + csr_bytes = self.get_csr_data() + self.diff_after = self._get_info(csr_bytes) if include_csr: - # Get hold of CSR bytes - csr_bytes = self.existing_csr_bytes - if self.csr is not None: - csr_bytes = self.get_csr_data() # Store result result['csr'] = csr_bytes.decode('utf-8') if csr_bytes else None + + result['diff'] = dict( + before=self.diff_before, + after=self.diff_after, + ) return result diff --git a/plugins/module_utils/crypto/module_backends/csr_info.py b/plugins/module_utils/crypto/module_backends/csr_info.py index 7fc8fcef..3ea76a9b 100644 --- a/plugins/module_utils/crypto/module_backends/csr_info.py +++ b/plugins/module_utils/crypto/module_backends/csr_info.py @@ -140,7 +140,7 @@ class CSRInfoRetrieval(object): def _is_signature_valid(self): pass - def get_info(self): + def get_info(self, prefer_one_fingerprint=False): result = dict() self.csr = load_certificate_request(None, content=self.content, backend=self.backend) @@ -162,7 +162,11 @@ class CSRInfoRetrieval(object): result['public_key'] = self._get_public_key_pem() - public_key_info = get_publickey_info(self.module, self.backend, key=self._get_public_key_object()) + public_key_info = get_publickey_info( + self.module, + self.backend, + key=self._get_public_key_object(), + prefer_one_fingerprint=prefer_one_fingerprint) result.update({ 'public_key_type': public_key_info['type'], 'public_key_data': public_key_info['public_data'], @@ -429,12 +433,12 @@ class CSRInfoRetrievalPyOpenSSL(CSRInfoRetrieval): return False -def get_csr_info(module, backend, content, validate_signature=True): +def get_csr_info(module, backend, content, validate_signature=True, prefer_one_fingerprint=False): if backend == 'cryptography': info = CSRInfoRetrievalCryptography(module, content, validate_signature=validate_signature) elif backend == 'pyopenssl': info = CSRInfoRetrievalPyOpenSSL(module, content, validate_signature=validate_signature) - return info.get_info() + return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, backend, content, validate_signature=True): diff --git a/plugins/module_utils/crypto/module_backends/privatekey.py b/plugins/module_utils/crypto/module_backends/privatekey.py index 0c8809a3..82d6bf81 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey.py +++ b/plugins/module_utils/crypto/module_backends/privatekey.py @@ -37,6 +37,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import identify_private_key_format, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey_info import ( + PrivateKeyConsistencyError, + PrivateKeyParseError, + get_privatekey_info, +) + from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec @@ -102,6 +108,25 @@ class PrivateKeyBackend: self.existing_private_key = None self.existing_private_key_bytes = None + self.diff_before = self._get_info(None) + self.diff_after = self._get_info(None) + + def _get_info(self, data): + if data is None: + return dict() + result = dict(can_parse_key=False) + try: + result.update(get_privatekey_info( + self.module, self.backend, data, passphrase=self.passphrase, + return_private_key_data=False, prefer_one_fingerprint=True)) + except PrivateKeyConsistencyError as exc: + result.update(exc.result) + except PrivateKeyParseError as exc: + result.update(exc.result) + except Exception as exc: + pass + return result + @abc.abstractmethod def generate_private_key(self): """(Re-)Generate private key.""" @@ -125,6 +150,7 @@ class PrivateKeyBackend: def set_existing(self, privatekey_bytes): """Set existing private key bytes. None indicates that the key does not exist.""" self.existing_private_key_bytes = privatekey_bytes + self.diff_after = self.diff_before = self._get_info(self.existing_private_key_bytes) def has_existing(self): """Query whether an existing private key is/has been there.""" @@ -215,11 +241,12 @@ class PrivateKeyBackend: } if self.type == 'ECC': result['curve'] = self.curve + # Get hold of private key bytes + pk_bytes = self.existing_private_key_bytes + if self.private_key is not None: + pk_bytes = self.get_private_key_data() + self.diff_after = self._get_info(pk_bytes) if include_key: - # Get hold of private key bytes - pk_bytes = self.existing_private_key_bytes - if self.private_key is not None: - pk_bytes = self.get_private_key_data() # Store result if pk_bytes: if identify_private_key_format(pk_bytes) == 'raw': @@ -229,6 +256,10 @@ class PrivateKeyBackend: else: result['privatekey'] = None + result['diff'] = dict( + before=self.diff_before, + after=self.diff_after, + ) return result diff --git a/plugins/module_utils/crypto/module_backends/privatekey_info.py b/plugins/module_utils/crypto/module_backends/privatekey_info.py index 95c683e2..47c686de 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey_info.py +++ b/plugins/module_utils/crypto/module_backends/privatekey_info.py @@ -208,7 +208,7 @@ class PrivateKeyInfoRetrieval(object): def _is_key_consistent(self, key_public_data, key_private_data): pass - def get_info(self): + def get_info(self, prefer_one_fingerprint=False): result = dict( can_parse_key=False, key_is_consistent=None, @@ -227,7 +227,8 @@ class PrivateKeyInfoRetrieval(object): result['public_key'] = self._get_public_key(binary=False) pk = self._get_public_key(binary=True) - result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict() + result['public_key_fingerprints'] = get_fingerprint_of_bytes( + pk, prefer_one=prefer_one_fingerprint) if pk is not None else dict() key_type, key_public_data, key_private_data = self._get_key_info() result['type'] = key_type @@ -386,14 +387,14 @@ class PrivateKeyInfoRetrievalPyOpenSSL(PrivateKeyInfoRetrieval): return None -def get_privatekey_info(module, backend, content, passphrase=None, return_private_key_data=False): +def get_privatekey_info(module, backend, content, passphrase=None, return_private_key_data=False, prefer_one_fingerprint=False): if backend == 'cryptography': info = PrivateKeyInfoRetrievalCryptography( module, content, passphrase=passphrase, return_private_key_data=return_private_key_data) elif backend == 'pyopenssl': info = PrivateKeyInfoRetrievalPyOpenSSL( module, content, passphrase=passphrase, return_private_key_data=return_private_key_data) - return info.get_info() + return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, backend, content, passphrase=None, return_private_key_data=False): diff --git a/plugins/module_utils/crypto/module_backends/publickey_info.py b/plugins/module_utils/crypto/module_backends/publickey_info.py index ae12acb4..f249dff4 100644 --- a/plugins/module_utils/crypto/module_backends/publickey_info.py +++ b/plugins/module_utils/crypto/module_backends/publickey_info.py @@ -209,7 +209,7 @@ class PublicKeyInfoRetrieval(object): def _get_key_info(self): pass - def get_info(self): + def get_info(self, prefer_one_fingerprint=False): result = dict() if self.key is None: try: @@ -218,7 +218,8 @@ class PublicKeyInfoRetrieval(object): raise PublicKeyParseError(to_native(e)) pk = self._get_public_key(binary=True) - result['fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict() + result['fingerprints'] = get_fingerprint_of_bytes( + pk, prefer_one=prefer_one_fingerprint) if pk is not None else dict() key_type, key_public_data = self._get_key_info() result['type'] = key_type @@ -276,12 +277,12 @@ class PublicKeyInfoRetrievalPyOpenSSL(PublicKeyInfoRetrieval): return key_type, key_public_data -def get_publickey_info(module, backend, content=None, key=None): +def get_publickey_info(module, backend, content=None, key=None, prefer_one_fingerprint=False): if backend == 'cryptography': info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key) elif backend == 'pyopenssl': info = PublicKeyInfoRetrievalPyOpenSSL(module, content=content, key=key) - return info.get_info() + return info.get_info(prefer_one_fingerprint=prefer_one_fingerprint) def select_backend(module, backend, content=None, key=None): diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 355786da..0a3828e7 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -52,7 +52,14 @@ from .basic import ( ) -def get_fingerprint_of_bytes(source): +# This list of preferred fingerprints is used when prefer_one=True is supplied to the +# fingerprinting methods. +PREFERRED_FINGERPRINTS = ( + 'sha256', 'sha3_256', 'sha512', 'sha3_512', 'sha384', 'sha3_384', 'sha1', 'md5' +) + + +def get_fingerprint_of_bytes(source, prefer_one=False): """Generate the fingerprint of the given bytes.""" fingerprint = {} @@ -65,6 +72,12 @@ def get_fingerprint_of_bytes(source): except AttributeError: return None + if prefer_one: + # Sort algorithms to have the ones in PREFERRED_FINGERPRINTS at the beginning + prefered_algorithms = [algorithm for algorithm in PREFERRED_FINGERPRINTS if algorithm in algorithms] + prefered_algorithms += sorted([algorithm for algorithm in algorithms if algorithm not in PREFERRED_FINGERPRINTS]) + algorithms = prefered_algorithms + for algo in algorithms: f = getattr(hashlib, algo) try: @@ -79,11 +92,13 @@ def get_fingerprint_of_bytes(source): except TypeError: pubkey_digest = h.hexdigest(32) fingerprint[algo] = ':'.join(pubkey_digest[i:i + 2] for i in range(0, len(pubkey_digest), 2)) + if prefer_one: + break return fingerprint -def get_fingerprint_of_privatekey(privatekey, backend='pyopenssl'): +def get_fingerprint_of_privatekey(privatekey, backend='pyopenssl', prefer_one=False): """Generate the fingerprint of the public key. """ if backend == 'pyopenssl': @@ -107,15 +122,15 @@ def get_fingerprint_of_privatekey(privatekey, backend='pyopenssl'): serialization.PublicFormat.SubjectPublicKeyInfo ) - return get_fingerprint_of_bytes(publickey) + return get_fingerprint_of_bytes(publickey, prefer_one=prefer_one) -def get_fingerprint(path, passphrase=None, content=None, backend='pyopenssl'): +def get_fingerprint(path, passphrase=None, content=None, backend='pyopenssl', prefer_one=False): """Generate the fingerprint of the public key. """ privatekey = load_privatekey(path, passphrase=passphrase, content=content, check_passphrase=False, backend=backend) - return get_fingerprint_of_privatekey(privatekey, backend=backend) + return get_fingerprint_of_privatekey(privatekey, backend=backend, prefer_one=prefer_one) def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, backend='pyopenssl'): diff --git a/plugins/modules/openssl_publickey.py b/plugins/modules/openssl_publickey.py index 524e870c..a9b1c596 100644 --- a/plugins/modules/openssl_publickey.py +++ b/plugins/modules/openssl_publickey.py @@ -203,6 +203,11 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im get_fingerprint, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( + PublicKeyParseError, + get_publickey_info, +) + MINIMAL_PYOPENSSL_VERSION = '16.0.0' MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' MINIMAL_CRYPTOGRAPHY_VERSION_OPENSSH = '1.4' @@ -244,6 +249,7 @@ class PublicKey(OpenSSLObject): module.params['force'], module.check_mode ) + self.module = module self.format = module.params['format'] self.privatekey_path = module.params['privatekey_path'] self.privatekey_content = module.params['privatekey_content'] @@ -259,6 +265,23 @@ class PublicKey(OpenSSLObject): self.backup = module.params['backup'] self.backup_file = None + self.diff_before = self._get_info(None) + self.diff_after = self._get_info(None) + + def _get_info(self, data): + if data is None: + return dict() + result = dict(can_parse_key=False) + try: + result.update(get_publickey_info( + self.module, self.backend, content=data, prefer_one_fingerprint=True)) + result['can_parse_key'] = True + except PublicKeyParseError as exc: + result.update(exc.result) + except Exception as exc: + pass + return result + def _create_publickey(self, module): self.privatekey = load_privatekey( path=self.privatekey_path, @@ -294,6 +317,7 @@ class PublicKey(OpenSSLObject): if not self.check(module, perms_required=False) or self.force: try: publickey_content = self._create_publickey(module) + self.diff_after = self._get_info(publickey_content) if self.return_content: self.publickey_bytes = publickey_content @@ -329,6 +353,7 @@ class PublicKey(OpenSSLObject): try: with open(self.path, 'rb') as public_key_fh: publickey_content = public_key_fh.read() + self.diff_before = self.diff_after = self._get_info(publickey_content) if self.return_content: self.publickey_bytes = publickey_content if self.backend == 'cryptography': @@ -387,6 +412,11 @@ class PublicKey(OpenSSLObject): self.publickey_bytes = load_file_if_exists(self.path, ignore_errors=True) result['publickey'] = self.publickey_bytes.decode('utf-8') if self.publickey_bytes else None + result['diff'] = dict( + before=self.diff_before, + after=self.diff_after, + ) + return result diff --git a/plugins/modules/x509_crl.py b/plugins/modules/x509_crl.py index 62286c14..eeb54e39 100644 --- a/plugins/modules/x509_crl.py +++ b/plugins/modules/x509_crl.py @@ -409,6 +409,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import identify_pem_format, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.crl_info import ( + get_crl_info, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.2' CRYPTOGRAPHY_IMP_ERR = None @@ -550,6 +554,19 @@ class CRL(OpenSSLObject): except Exception as dummy: self.crl_content = None self.actual_format = self.format + data = None + + self.diff_after = self.diff_before = self._get_info(data) + + def _get_info(self, data): + if data is None: + return dict() + try: + result = get_crl_info(self.module, data) + result['can_parse_crl'] = True + return result + except Exception as exc: + return dict(can_parse_crl=False) def remove(self): if self.backup: @@ -681,6 +698,7 @@ class CRL(OpenSSLObject): result = self.crl.public_bytes(Encoding.DER) if result is not None: + self.diff_after = self._get_info(result) if self.return_content: if self.format == 'pem': self.crl_content = result @@ -742,6 +760,10 @@ class CRL(OpenSSLObject): if self.return_content: result['crl'] = self.crl_content + result['diff'] = dict( + before=self.diff_before, + after=self.diff_after, + ) return result