diff --git a/plugins/doc_fragments/module_csr.py b/plugins/doc_fragments/module_csr.py new file mode 100644 index 00000000..b8524985 --- /dev/null +++ b/plugins/doc_fragments/module_csr.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- + +# Copyrigt: (c) 2017, Yanis Guenane +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +class ModuleDocFragment(object): + + # Standard files documentation fragment + DOCUMENTATION = r''' +description: + - This module allows one to (re)generate OpenSSL certificate signing requests. + - This module supports the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple + extensions. + - "The module can use the cryptography Python library, or the pyOpenSSL Python + library. By default, it tries to detect which one is available. This can be + overridden with the I(select_crypto_backend) option. Please note that the + PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in community.crypto 2.0.0." +requirements: + - Either cryptography >= 1.3 + - Or pyOpenSSL >= 0.15 +options: + digest: + description: + - The digest used when signing the certificate signing request with the private key. + type: str + default: sha256 + privatekey_path: + description: + - The path to the private key to use when signing the certificate signing request. + - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. + type: path + privatekey_content: + description: + - The content of the private key to use when signing the certificate signing request. + - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. + type: str + privatekey_passphrase: + description: + - The passphrase for the private key. + - This is required if the private key is password protected. + type: str + version: + description: + - The version of the certificate signing request. + - "The only allowed value according to L(RFC 2986,https://tools.ietf.org/html/rfc2986#section-4.1) + is 1." + - This option will no longer accept unsupported values from community.crypto 2.0.0 on. + type: int + default: 1 + subject: + description: + - Key/value pairs that will be present in the subject name field of the certificate signing request. + - If you need to specify more than one value with the same key, use a list as value. + type: dict + country_name: + description: + - The countryName field of the certificate signing request subject. + type: str + aliases: [ C, countryName ] + state_or_province_name: + description: + - The stateOrProvinceName field of the certificate signing request subject. + type: str + aliases: [ ST, stateOrProvinceName ] + locality_name: + description: + - The localityName field of the certificate signing request subject. + type: str + aliases: [ L, localityName ] + organization_name: + description: + - The organizationName field of the certificate signing request subject. + type: str + aliases: [ O, organizationName ] + organizational_unit_name: + description: + - The organizationalUnitName field of the certificate signing request subject. + type: str + aliases: [ OU, organizationalUnitName ] + common_name: + description: + - The commonName field of the certificate signing request subject. + type: str + aliases: [ CN, commonName ] + email_address: + description: + - The emailAddress field of the certificate signing request subject. + type: str + aliases: [ E, emailAddress ] + subject_alt_name: + description: + - Subject Alternative Name (SAN) extension to attach to the certificate signing request. + - This can either be a 'comma separated string' or a YAML list. + - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), + C(otherName) and the ones specific to your CA). + - Note that if no SAN is specified, but a common name, the common + name will be added as a SAN except if C(useCommonNameForSAN) is + set to I(false). + - More at U(https://tools.ietf.org/html/rfc5280#section-4.2.1.6). + type: list + elements: str + aliases: [ subjectAltName ] + subject_alt_name_critical: + description: + - Should the subjectAltName extension be considered as critical. + type: bool + aliases: [ subjectAltName_critical ] + use_common_name_for_san: + description: + - If set to C(yes), the module will fill the common name in for + C(subject_alt_name) with C(DNS:) prefix if no SAN is specified. + type: bool + default: yes + aliases: [ useCommonNameForSAN ] + key_usage: + description: + - This defines the purpose (e.g. encipherment, signature, certificate signing) + of the key contained in the certificate. + type: list + elements: str + aliases: [ keyUsage ] + key_usage_critical: + description: + - Should the keyUsage extension be considered as critical. + type: bool + aliases: [ keyUsage_critical ] + extended_key_usage: + description: + - Additional restrictions (e.g. client authentication, server authentication) + on the allowed purposes for which the public key may be used. + type: list + elements: str + aliases: [ extKeyUsage, extendedKeyUsage ] + extended_key_usage_critical: + description: + - Should the extkeyUsage extension be considered as critical. + type: bool + aliases: [ extKeyUsage_critical, extendedKeyUsage_critical ] + basic_constraints: + description: + - Indicates basic constraints, such as if the certificate is a CA. + type: list + elements: str + aliases: [ basicConstraints ] + basic_constraints_critical: + description: + - Should the basicConstraints extension be considered as critical. + type: bool + aliases: [ basicConstraints_critical ] + ocsp_must_staple: + description: + - Indicates that the certificate should contain the OCSP Must Staple + extension (U(https://tools.ietf.org/html/rfc7633)). + type: bool + aliases: [ ocspMustStaple ] + ocsp_must_staple_critical: + description: + - Should the OCSP Must Staple extension be considered as critical. + - Note that according to the RFC, this extension should not be marked + as critical, as old clients not knowing about OCSP Must Staple + are required to reject such certificates + (see U(https://tools.ietf.org/html/rfc7633#section-4)). + type: bool + aliases: [ ocspMustStaple_critical ] + name_constraints_permitted: + description: + - For CA certificates, this specifies a list of identifiers which describe + subtrees of names that this CA is allowed to issue certificates for. + - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), + C(otherName) and the ones specific to your CA). + type: list + elements: str + name_constraints_excluded: + description: + - For CA certificates, this specifies a list of identifiers which describe + subtrees of names that this CA is *not* allowed to issue certificates for. + - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), + C(otherName) and the ones specific to your CA). + type: list + elements: str + name_constraints_critical: + description: + - Should the Name Constraints extension be considered as critical. + type: bool + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0. + From that point on, only the C(cryptography) backend will be available. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] + create_subject_key_identifier: + description: + - Create the Subject Key Identifier from the public key. + - "Please note that commercial CAs can ignore the value, respectively use a value of + their own choice instead. Specifying this option is mostly useful for self-signed + certificates or for own CAs." + - Note that this is only supported if the C(cryptography) backend is used! + type: bool + default: no + subject_key_identifier: + description: + - The subject key identifier as a hex string, where two bytes are separated by colons. + - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)" + - "Please note that commercial CAs ignore this value, respectively use a value of their + own choice. Specifying this option is mostly useful for self-signed certificates + or for own CAs." + - Note that this option can only be used if I(create_subject_key_identifier) is C(no). + - Note that this is only supported if the C(cryptography) backend is used! + type: str + authority_key_identifier: + description: + - The authority key identifier as a hex string, where two bytes are separated by colons. + - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)" + - If specified, I(authority_cert_issuer) must also be specified. + - "Please note that commercial CAs ignore this value, respectively use a value of their + own choice. Specifying this option is mostly useful for self-signed certificates + or for own CAs." + - Note that this is only supported if the C(cryptography) backend is used! + - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), + I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. + type: str + authority_cert_issuer: + description: + - Names that will be present in the authority cert issuer field of the certificate signing request. + - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), + C(otherName) and the ones specific to your CA) + - "Example: C(DNS:ca.example.org)" + - If specified, I(authority_key_identifier) must also be specified. + - "Please note that commercial CAs ignore this value, respectively use a value of their + own choice. Specifying this option is mostly useful for self-signed certificates + or for own CAs." + - Note that this is only supported if the C(cryptography) backend is used! + - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), + I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. + type: list + elements: str + authority_cert_serial_number: + description: + - The authority cert serial number. + - Note that this is only supported if the C(cryptography) backend is used! + - "Please note that commercial CAs ignore this value, respectively use a value of their + own choice. Specifying this option is mostly useful for self-signed certificates + or for own CAs." + - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), + I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. + type: int +notes: + - If the certificate signing request already exists it will be checked whether subjectAltName, + keyUsage, extendedKeyUsage and basicConstraints only contain the requested values, whether + OCSP Must Staple is as requested, and if the request was signed by the given private key. +seealso: +- module: community.crypto.x509_certificate +- module: community.crypto.openssl_dhparam +- module: community.crypto.openssl_pkcs12 +- module: community.crypto.openssl_privatekey +- module: community.crypto.openssl_publickey +- module: community.crypto.openssl_csr_info +''' diff --git a/plugins/module_utils/crypto/module_backends/common.py b/plugins/module_utils/crypto/module_backends/common.py new file mode 100644 index 00000000..7f22caff --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/common.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +from ansible.module_utils.basic import AnsibleModule + + +class ArgumentSpec: + def __init__(self, argument_spec, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None): + self.argument_spec = argument_spec + self.mutually_exclusive = mutually_exclusive or [] + self.required_together = required_together or [] + self.required_one_of = required_one_of or [] + self.required_if = required_if or [] + self.required_by = required_by or {} + + def create_ansible_module(self, **kwargs): + return AnsibleModule( + argument_spec=self.argument_spec, + mutually_exclusive=self.mutually_exclusive, + required_together=self.required_together, + required_one_of=self.required_one_of, + required_if=self.required_if, + required_by=self.required_by, + **kwargs) diff --git a/plugins/module_utils/crypto/module_backends/csr.py b/plugins/module_utils/crypto/module_backends/csr.py new file mode 100644 index 00000000..24a46a3c --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/csr.py @@ -0,0 +1,764 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2016, Yanis Guenane +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import binascii +import traceback + +from distutils.version import LooseVersion + +from ansible.module_utils import six +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils._text import to_bytes, to_text + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, + OpenSSLBadPassphraseError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + load_privatekey, + load_certificate_request, + parse_name_field, + select_message_digest, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + cryptography_get_basic_constraints, + cryptography_get_name, + cryptography_name_to_oid, + cryptography_key_needs_digest_for_signing, + cryptography_parse_key_usage_params, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import ( + pyopenssl_normalize_name_attribute, + pyopenssl_parse_name_constraints, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec + + +MINIMAL_PYOPENSSL_VERSION = '0.15' +MINIMAL_CRYPTOGRAPHY_VERSION = '1.3' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # OpenSSL 1.1.0 or newer + OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" + OPENSSL_MUST_STAPLE_VALUE = b"status_request" + else: + # OpenSSL 1.0.x or older + OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" + OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + import cryptography.x509 + import cryptography.x509.oid + import cryptography.exceptions + import cryptography.hazmat.backends + import cryptography.hazmat.primitives.serialization + import cryptography.hazmat.primitives.hashes + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") + CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05" + + +class CertificateSigningRequestError(OpenSSLObjectError): + pass + + +# From the object called `module`, only the following properties are used: +# +# - module.params[] +# - module.warn(msg: str) +# - module.fail_json(msg: str, **kwargs) + + +@six.add_metaclass(abc.ABCMeta) +class CertificateSigningRequestBackend(object): + def __init__(self, module, backend): + self.module = module + self.backend = backend + self.digest = module.params['digest'] + self.privatekey_path = module.params['privatekey_path'] + self.privatekey_content = module.params['privatekey_content'] + if self.privatekey_content is not None: + self.privatekey_content = self.privatekey_content.encode('utf-8') + self.privatekey_passphrase = module.params['privatekey_passphrase'] + self.version = module.params['version'] + self.subjectAltName = module.params['subject_alt_name'] + self.subjectAltName_critical = module.params['subject_alt_name_critical'] + self.keyUsage = module.params['key_usage'] + self.keyUsage_critical = module.params['key_usage_critical'] + self.extendedKeyUsage = module.params['extended_key_usage'] + self.extendedKeyUsage_critical = module.params['extended_key_usage_critical'] + self.basicConstraints = module.params['basic_constraints'] + self.basicConstraints_critical = module.params['basic_constraints_critical'] + self.ocspMustStaple = module.params['ocsp_must_staple'] + self.ocspMustStaple_critical = module.params['ocsp_must_staple_critical'] + self.name_constraints_permitted = module.params['name_constraints_permitted'] or [] + self.name_constraints_excluded = module.params['name_constraints_excluded'] or [] + self.name_constraints_critical = module.params['name_constraints_critical'] + self.create_subject_key_identifier = module.params['create_subject_key_identifier'] + self.subject_key_identifier = module.params['subject_key_identifier'] + self.authority_key_identifier = module.params['authority_key_identifier'] + self.authority_cert_issuer = module.params['authority_cert_issuer'] + self.authority_cert_serial_number = module.params['authority_cert_serial_number'] + self.csr = None + self.privatekey = None + + if self.create_subject_key_identifier and self.subject_key_identifier is not None: + module.fail_json(msg='subject_key_identifier cannot be specified if create_subject_key_identifier is true') + + self.subject = [ + ('C', module.params['country_name']), + ('ST', module.params['state_or_province_name']), + ('L', module.params['locality_name']), + ('O', module.params['organization_name']), + ('OU', module.params['organizational_unit_name']), + ('CN', module.params['common_name']), + ('emailAddress', module.params['email_address']), + ] + + if module.params['subject']: + self.subject = self.subject + parse_name_field(module.params['subject']) + self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]] + + self.using_common_name_for_san = False + if not self.subjectAltName and module.params['use_common_name_for_san']: + for sub in self.subject: + if sub[0] in ('commonName', 'CN'): + self.subjectAltName = ['DNS:%s' % sub[1]] + self.using_common_name_for_san = True + break + + if self.subject_key_identifier is not None: + try: + self.subject_key_identifier = binascii.unhexlify(self.subject_key_identifier.replace(':', '')) + except Exception as e: + raise CertificateSigningRequestError('Cannot parse subject_key_identifier: {0}'.format(e)) + + if self.authority_key_identifier is not None: + try: + self.authority_key_identifier = binascii.unhexlify(self.authority_key_identifier.replace(':', '')) + except Exception as e: + raise CertificateSigningRequestError('Cannot parse authority_key_identifier: {0}'.format(e)) + + self.existing_csr = None + self.existing_csr_bytes = None + + @abc.abstractmethod + def generate_csr(self): + """(Re-)Generate private key.""" + pass + + @abc.abstractmethod + def get_csr_data(self): + """Return bytes for self.csr.""" + pass + + def set_existing(self, csr_bytes): + """Set existing private key bytes. None indicates that the key does not exist.""" + self.existing_csr_bytes = csr_bytes + + def has_existing(self): + """Query whether an existing private key is/has been there.""" + return self.existing_csr_bytes is not None + + def _ensure_private_key_loaded(self): + """Load the provided private key into self.privatekey.""" + if self.privatekey is not None: + return + try: + self.privatekey = load_privatekey( + path=self.privatekey_path, + content=self.privatekey_content, + passphrase=self.privatekey_passphrase, + backend=self.backend, + ) + except OpenSSLBadPassphraseError as exc: + raise CertificateSigningRequestError(exc) + + @abc.abstractmethod + def _check_csr(self): + """Check whether provided parameters, assuming self.existing_csr and self.privatekey have been populated.""" + pass + + def needs_regeneration(self): + """Check whether a regeneration is necessary.""" + if self.existing_csr_bytes is None: + return True + try: + self.existing_csr = load_certificate_request(None, content=self.existing_csr_bytes, backend=self.backend) + except Exception as dummy: + return True + self._ensure_private_key_loaded() + return not self._check_csr() + + def dump(self, include_csr): + """Serialize the object into a dictionary.""" + result = { + 'privatekey': self.privatekey_path, + 'subject': self.subject, + 'subjectAltName': self.subjectAltName, + 'keyUsage': self.keyUsage, + 'extendedKeyUsage': self.extendedKeyUsage, + 'basicConstraints': self.basicConstraints, + 'ocspMustStaple': self.ocspMustStaple, + 'name_constraints_permitted': self.name_constraints_permitted, + 'name_constraints_excluded': self.name_constraints_excluded, + } + if include_csr: + # Get hold of CSR bytes + csr_bytes = self.existing_csr_bytes + if self.csr is not None: + csr_bytes = self.get_csr_data() + # Store result + result['csr'] = csr_bytes.decode('utf-8') if csr_bytes else None + return result + + +# Implementation with using pyOpenSSL +class CertificateSigningRequestPyOpenSSLBackend(CertificateSigningRequestBackend): + def __init__(self, module): + if module.params['create_subject_key_identifier']: + module.fail_json(msg='You cannot use create_subject_key_identifier with the pyOpenSSL backend!') + for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number'): + if module.params[o] is not None: + module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o)) + super(CertificateSigningRequestPyOpenSSLBackend, self).__init__(module, 'pyopenssl') + + def generate_csr(self): + """(Re-)Generate private key.""" + self._ensure_private_key_loaded() + + req = crypto.X509Req() + req.set_version(self.version - 1) + subject = req.get_subject() + for entry in self.subject: + if entry[1] is not None: + # Workaround for https://github.com/pyca/pyopenssl/issues/165 + nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) + if nid == 0: + raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(entry[0])) + res = OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) + if res == 0: + raise CertificateSigningRequestError('Invalid value for subject field identifier "{0}": {1}'.format(entry[0], entry[1])) + + extensions = [] + if self.subjectAltName: + altnames = ', '.join(self.subjectAltName) + try: + extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) + except OpenSSL.crypto.Error as e: + raise CertificateSigningRequestError( + 'Error while parsing Subject Alternative Names {0} (check for missing type prefix, such as "DNS:"!): {1}'.format( + ', '.join(["{0}".format(san) for san in self.subjectAltName]), str(e) + ) + ) + + if self.keyUsage: + usages = ', '.join(self.keyUsage) + extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) + + if self.extendedKeyUsage: + usages = ', '.join(self.extendedKeyUsage) + extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) + + if self.basicConstraints: + usages = ', '.join(self.basicConstraints) + extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) + + if self.name_constraints_permitted or self.name_constraints_excluded: + usages = ', '.join( + ['permitted;{0}'.format(name) for name in self.name_constraints_permitted] + + ['excluded;{0}'.format(name) for name in self.name_constraints_excluded] + ) + extensions.append(crypto.X509Extension(b"nameConstraints", self.name_constraints_critical, usages.encode('ascii'))) + + if self.ocspMustStaple: + extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE)) + + if extensions: + req.add_extensions(extensions) + + req.set_pubkey(self.privatekey) + req.sign(self.privatekey, self.digest) + self.csr = req + + def get_csr_data(self): + """Return bytes for self.csr.""" + return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.csr) + + def _check_csr(self): + def _check_subject(csr): + subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject] + current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()] + if not set(subject) == set(current_subject): + return False + + return True + + def _check_subjectAltName(extensions): + altnames_ext = next((ext for ext in extensions if ext.get_short_name() == b'subjectAltName'), '') + altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in + to_text(altnames_ext, errors='surrogate_or_strict').split(',') if altname.strip()] + if self.subjectAltName: + if (set(altnames) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.subjectAltName]) or + altnames_ext.get_critical() != self.subjectAltName_critical): + return False + else: + if altnames: + return False + + return True + + def _check_keyUsage_(extensions, extName, expected, critical): + usages_ext = [ext for ext in extensions if ext.get_short_name() == extName] + if (not usages_ext and expected) or (usages_ext and not expected): + return False + elif not usages_ext and not expected: + return True + else: + current = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage.strip())) for usage in str(usages_ext[0]).split(',')] + expected = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage)) for usage in expected] + return set(current) == set(expected) and usages_ext[0].get_critical() == critical + + def _check_keyUsage(extensions): + usages_ext = [ext for ext in extensions if ext.get_short_name() == b'keyUsage'] + if (not usages_ext and self.keyUsage) or (usages_ext and not self.keyUsage): + return False + elif not usages_ext and not self.keyUsage: + return True + else: + # OpenSSL._util.lib.OBJ_txt2nid() always returns 0 for all keyUsage values + # (since keyUsage has a fixed bitfield for these values and is not extensible). + # Therefore, we create an extension for the wanted values, and compare the + # data of the extensions (which is the serialized bitfield). + expected_ext = crypto.X509Extension(b"keyUsage", False, ', '.join(self.keyUsage).encode('ascii')) + return usages_ext[0].get_data() == expected_ext.get_data() and usages_ext[0].get_critical() == self.keyUsage_critical + + def _check_extenededKeyUsage(extensions): + return _check_keyUsage_(extensions, b'extendedKeyUsage', self.extendedKeyUsage, self.extendedKeyUsage_critical) + + def _check_basicConstraints(extensions): + return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical) + + def _check_nameConstraints(extensions): + nc_ext = next((ext for ext in extensions if ext.get_short_name() == b'nameConstraints'), '') + permitted, excluded = pyopenssl_parse_name_constraints(nc_ext) + if self.name_constraints_permitted or self.name_constraints_excluded: + if set(permitted) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_permitted]): + return False + if set(excluded) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_excluded]): + return False + if nc_ext.get_critical() != self.name_constraints_critical: + return False + else: + if permitted or excluded: + return False + + return True + + def _check_ocspMustStaple(extensions): + oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE] + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: + # Older versions of libssl don't know about OCSP Must Staple + oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) + if self.ocspMustStaple: + return len(oms_ext) > 0 and oms_ext[0].get_critical() == self.ocspMustStaple_critical + else: + return len(oms_ext) == 0 + + def _check_extensions(csr): + extensions = csr.get_extensions() + return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and + _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and + _check_ocspMustStaple(extensions) and _check_nameConstraints(extensions)) + + def _check_signature(csr): + try: + return csr.verify(self.privatekey) + except crypto.Error: + return False + + return _check_subject(self.existing_csr) and _check_extensions(self.existing_csr) and _check_signature(self.existing_csr) + + +# Implementation with using cryptography +class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBackend): + def __init__(self, module): + super(CertificateSigningRequestCryptographyBackend, self).__init__(module, 'cryptography') + self.cryptography_backend = cryptography.hazmat.backends.default_backend() + if self.version != 1: + module.warn('The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)') + + def generate_csr(self): + """(Re-)Generate private key.""" + self._ensure_private_key_loaded() + + csr = cryptography.x509.CertificateSigningRequestBuilder() + try: + csr = csr.subject_name(cryptography.x509.Name([ + cryptography.x509.NameAttribute(cryptography_name_to_oid(entry[0]), to_text(entry[1])) for entry in self.subject + ])) + except ValueError as e: + raise CertificateSigningRequestError(e) + + if self.subjectAltName: + csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([ + cryptography_get_name(name) for name in self.subjectAltName + ]), critical=self.subjectAltName_critical) + + if self.keyUsage: + params = cryptography_parse_key_usage_params(self.keyUsage) + csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical) + + if self.extendedKeyUsage: + usages = [cryptography_name_to_oid(usage) for usage in self.extendedKeyUsage] + csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical) + + if self.basicConstraints: + params = {} + ca, path_length = cryptography_get_basic_constraints(self.basicConstraints) + csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical) + + if self.ocspMustStaple: + try: + # This only works with cryptography >= 2.1 + csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical) + except AttributeError as dummy: + csr = csr.add_extension( + cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE), + critical=self.ocspMustStaple_critical + ) + + if self.name_constraints_permitted or self.name_constraints_excluded: + try: + csr = csr.add_extension(cryptography.x509.NameConstraints( + [cryptography_get_name(name) for name in self.name_constraints_permitted], + [cryptography_get_name(name) for name in self.name_constraints_excluded], + ), critical=self.name_constraints_critical) + except TypeError as e: + raise OpenSSLObjectError('Error while parsing name constraint: {0}'.format(e)) + + if self.create_subject_key_identifier: + csr = csr.add_extension( + cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()), + critical=False + ) + elif self.subject_key_identifier is not None: + csr = csr.add_extension(cryptography.x509.SubjectKeyIdentifier(self.subject_key_identifier), critical=False) + + if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None: + issuers = None + if self.authority_cert_issuer is not None: + issuers = [cryptography_get_name(n) for n in self.authority_cert_issuer] + csr = csr.add_extension( + cryptography.x509.AuthorityKeyIdentifier(self.authority_key_identifier, issuers, self.authority_cert_serial_number), + critical=False + ) + + digest = None + if cryptography_key_needs_digest_for_signing(self.privatekey): + digest = select_message_digest(self.digest) + if digest is None: + raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest)) + try: + self.csr = csr.sign(self.privatekey, digest, self.cryptography_backend) + except TypeError as e: + if str(e) == 'Algorithm must be a registered hash algorithm.' and digest is None: + self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.') + raise + except UnicodeError as e: + # This catches IDNAErrors, which happens when a bad name is passed as a SAN + # (https://github.com/ansible-collections/community.crypto/issues/105). + # For older cryptography versions, this is handled by idna, which raises + # an idna.core.IDNAError. Later versions of cryptography deprecated and stopped + # requiring idna, whence we cannot easily handle this error. Fortunately, in + # most versions of idna, IDNAError extends UnicodeError. There is only version + # 2.3 where it extends Exception instead (see + # https://github.com/kjd/idna/commit/ebefacd3134d0f5da4745878620a6a1cba86d130 + # and then + # https://github.com/kjd/idna/commit/ea03c7b5db7d2a99af082e0239da2b68aeea702a). + msg = 'Error while creating CSR: {0}\n'.format(e) + if self.using_common_name_for_san: + self.module.fail_json(msg=msg + 'This is probably caused because the Common Name is used as a SAN.' + ' Specifying use_common_name_for_san=false might fix this.') + self.module.fail_json(msg=msg + 'This is probably caused by an invalid Subject Alternative DNS Name.') + + def get_csr_data(self): + """Return bytes for self.csr.""" + return self.csr.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM) + + def _check_csr(self): + """Check whether provided parameters, assuming self.existing_csr and self.privatekey have been populated.""" + def _check_subject(csr): + subject = [(cryptography_name_to_oid(entry[0]), entry[1]) for entry in self.subject] + current_subject = [(sub.oid, sub.value) for sub in csr.subject] + return set(subject) == set(current_subject) + + def _find_extension(extensions, exttype): + return next( + (ext for ext in extensions if isinstance(ext.value, exttype)), + None + ) + + def _check_subjectAltName(extensions): + current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName) + current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else [] + altnames = [str(cryptography_get_name(altname)) for altname in self.subjectAltName] if self.subjectAltName else [] + if set(altnames) != set(current_altnames): + return False + if altnames: + if current_altnames_ext.critical != self.subjectAltName_critical: + return False + return True + + def _check_keyUsage(extensions): + current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage) + if not self.keyUsage: + return current_keyusage_ext is None + elif current_keyusage_ext is None: + return False + params = cryptography_parse_key_usage_params(self.keyUsage) + for param in params: + if getattr(current_keyusage_ext.value, '_' + param) != params[param]: + return False + if current_keyusage_ext.critical != self.keyUsage_critical: + return False + return True + + def _check_extenededKeyUsage(extensions): + current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage) + current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else [] + usages = [str(cryptography_name_to_oid(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else [] + if set(current_usages) != set(usages): + return False + if usages: + if current_usages_ext.critical != self.extendedKeyUsage_critical: + return False + return True + + def _check_basicConstraints(extensions): + bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints) + current_ca = bc_ext.value.ca if bc_ext else False + current_path_length = bc_ext.value.path_length if bc_ext else None + ca, path_length = cryptography_get_basic_constraints(self.basicConstraints) + # Check CA flag + if ca != current_ca: + return False + # Check path length + if path_length != current_path_length: + return False + # Check criticality + if self.basicConstraints: + if bc_ext.critical != self.basicConstraints_critical: + return False + return True + + def _check_ocspMustStaple(extensions): + try: + # This only works with cryptography >= 2.1 + tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature) + has_tlsfeature = True + except AttributeError as dummy: + tlsfeature_ext = next( + (ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME), + None + ) + has_tlsfeature = False + if self.ocspMustStaple: + if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical: + return False + if has_tlsfeature: + return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value + else: + return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE + else: + return tlsfeature_ext is None + + def _check_nameConstraints(extensions): + current_nc_ext = _find_extension(extensions, cryptography.x509.NameConstraints) + current_nc_perm = [str(altname) for altname in current_nc_ext.value.permitted_subtrees] if current_nc_ext else [] + current_nc_excl = [str(altname) for altname in current_nc_ext.value.excluded_subtrees] if current_nc_ext else [] + nc_perm = [str(cryptography_get_name(altname)) for altname in self.name_constraints_permitted] + nc_excl = [str(cryptography_get_name(altname)) for altname in self.name_constraints_excluded] + if set(nc_perm) != set(current_nc_perm) or set(nc_excl) != set(current_nc_excl): + return False + if nc_perm or nc_excl: + if current_nc_ext.critical != self.name_constraints_critical: + return False + return True + + def _check_subject_key_identifier(extensions): + ext = _find_extension(extensions, cryptography.x509.SubjectKeyIdentifier) + if self.create_subject_key_identifier or self.subject_key_identifier is not None: + if not ext or ext.critical: + return False + if self.create_subject_key_identifier: + digest = cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()).digest + return ext.value.digest == digest + else: + return ext.value.digest == self.subject_key_identifier + else: + return ext is None + + def _check_authority_key_identifier(extensions): + ext = _find_extension(extensions, cryptography.x509.AuthorityKeyIdentifier) + if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None: + if not ext or ext.critical: + return False + aci = None + csr_aci = None + if self.authority_cert_issuer is not None: + aci = [str(cryptography_get_name(n)) for n in self.authority_cert_issuer] + if ext.value.authority_cert_issuer is not None: + csr_aci = [str(n) for n in ext.value.authority_cert_issuer] + return (ext.value.key_identifier == self.authority_key_identifier + and csr_aci == aci + and ext.value.authority_cert_serial_number == self.authority_cert_serial_number) + else: + return ext is None + + def _check_extensions(csr): + extensions = csr.extensions + return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and + _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and + _check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and + _check_authority_key_identifier(extensions) and _check_nameConstraints(extensions)) + + def _check_signature(csr): + if not csr.is_signature_valid: + return False + # To check whether public key of CSR belongs to private key, + # encode both public keys and compare PEMs. + key_a = csr.public_key().public_bytes( + cryptography.hazmat.primitives.serialization.Encoding.PEM, + cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo + ) + key_b = self.privatekey.public_key().public_bytes( + cryptography.hazmat.primitives.serialization.Encoding.PEM, + cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo + ) + return key_a == key_b + + return _check_subject(self.existing_csr) and _check_extensions(self.existing_csr) and _check_signature(self.existing_csr) + + +def select_backend(module, backend): + if module.params['version'] != 1: + module.deprecate('The version option will only support allowed values from community.crypto 2.0.0 on. ' + 'Currently, only the value 1 is allowed by RFC 2986', + version='2.0.0', collection_name='community.crypto') + + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # First try cryptography, then pyOpenSSL + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), + exception=PYOPENSSL_IMP_ERR) + try: + getattr(crypto.X509Req, 'get_extensions') + except AttributeError: + module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') + + module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', + version='2.0.0', collection_name='community.crypto') + return backend, CertificateSigningRequestPyOpenSSLBackend(module) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + return backend, CertificateSigningRequestCryptographyBackend(module) + else: + raise Exception('Unsupported value for backend: {0}'.format(backend)) + + +def get_csr_argument_spec(): + return ArgumentSpec( + argument_spec=dict( + digest=dict(type='str', default='sha256'), + privatekey_path=dict(type='path'), + privatekey_content=dict(type='str', no_log=True), + privatekey_passphrase=dict(type='str', no_log=True), + version=dict(type='int', default=1), + subject=dict(type='dict'), + country_name=dict(type='str', aliases=['C', 'countryName']), + state_or_province_name=dict(type='str', aliases=['ST', 'stateOrProvinceName']), + locality_name=dict(type='str', aliases=['L', 'localityName']), + organization_name=dict(type='str', aliases=['O', 'organizationName']), + organizational_unit_name=dict(type='str', aliases=['OU', 'organizationalUnitName']), + common_name=dict(type='str', aliases=['CN', 'commonName']), + email_address=dict(type='str', aliases=['E', 'emailAddress']), + subject_alt_name=dict(type='list', elements='str', aliases=['subjectAltName']), + subject_alt_name_critical=dict(type='bool', default=False, aliases=['subjectAltName_critical']), + use_common_name_for_san=dict(type='bool', default=True, aliases=['useCommonNameForSAN']), + key_usage=dict(type='list', elements='str', aliases=['keyUsage']), + key_usage_critical=dict(type='bool', default=False, aliases=['keyUsage_critical']), + extended_key_usage=dict(type='list', elements='str', aliases=['extKeyUsage', 'extendedKeyUsage']), + extended_key_usage_critical=dict(type='bool', default=False, aliases=['extKeyUsage_critical', 'extendedKeyUsage_critical']), + basic_constraints=dict(type='list', elements='str', aliases=['basicConstraints']), + basic_constraints_critical=dict(type='bool', default=False, aliases=['basicConstraints_critical']), + ocsp_must_staple=dict(type='bool', default=False, aliases=['ocspMustStaple']), + ocsp_must_staple_critical=dict(type='bool', default=False, aliases=['ocspMustStaple_critical']), + name_constraints_permitted=dict(type='list', elements='str'), + name_constraints_excluded=dict(type='list', elements='str'), + name_constraints_critical=dict(type='bool', default=False), + create_subject_key_identifier=dict(type='bool', default=False), + subject_key_identifier=dict(type='str'), + authority_key_identifier=dict(type='str'), + authority_cert_issuer=dict(type='list', elements='str'), + authority_cert_serial_number=dict(type='int'), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), + ), + required_together=[ + ['authority_cert_issuer', 'authority_cert_serial_number'], + ], + mutually_exclusive=[ + ['privatekey_path', 'privatekey_content'], + ], + required_one_of=[ + ['privatekey_path', 'privatekey_content'], + ], + ) diff --git a/plugins/modules/openssl_csr.py b/plugins/modules/openssl_csr.py index dcbc5209..031fa962 100644 --- a/plugins/modules/openssl_csr.py +++ b/plugins/modules/openssl_csr.py @@ -1,7 +1,8 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# Copyrigt: (c) 2017, Yanis Guenane +# Copyright: (c) 2017, Yanis Guenane +# Copyright: (c) 2020, Felix Fontein # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function @@ -13,22 +14,12 @@ DOCUMENTATION = r''' module: openssl_csr short_description: Generate OpenSSL Certificate Signing Request (CSR) description: - - This module allows one to (re)generate OpenSSL certificate signing requests. - - It uses the pyOpenSSL python library to interact with openssl. This module supports - the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple - extensions. - - "Please note that the module regenerates existing CSR if it doesn't match the module's + - "Please note that the module regenerates an existing CSR if it doesn't match the module's options, or if it seems to be corrupt. If you are concerned that this could overwrite your existing CSR, consider using the I(backup) option." - - "The module can use the cryptography Python library, or the pyOpenSSL Python - library. By default, it tries to detect which one is available. This can be - overridden with the I(select_crypto_backend) option. Please note that the - PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in community.crypto 2.0.0." -requirements: - - Either cryptography >= 1.3 - - Or pyOpenSSL >= 0.15 author: - Yanis Guenane (@Spredzy) +- Felix Fontein (@felixfontein) options: state: description: @@ -36,35 +27,6 @@ options: type: str default: present choices: [ absent, present ] - digest: - description: - - The digest used when signing the certificate signing request with the private key. - type: str - default: sha256 - privatekey_path: - description: - - The path to the private key to use when signing the certificate signing request. - - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. - type: path - privatekey_content: - description: - - The content of the private key to use when signing the certificate signing request. - - Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both. - type: str - version_added: "1.0.0" - privatekey_passphrase: - description: - - The passphrase for the private key. - - This is required if the private key is password protected. - type: str - version: - description: - - The version of the certificate signing request. - - "The only allowed value according to L(RFC 2986,https://tools.ietf.org/html/rfc2986#section-4.1) - is 1." - - This option will no longer accept unsupported values from Ansible 2.14 on. - type: int - default: 1 force: description: - Should the certificate signing request be forced regenerated by this ansible module. @@ -75,235 +37,31 @@ options: - The name of the file into which the generated OpenSSL certificate signing request will be written. type: path required: true - subject: - description: - - Key/value pairs that will be present in the subject name field of the certificate signing request. - - If you need to specify more than one value with the same key, use a list as value. - type: dict - country_name: - description: - - The countryName field of the certificate signing request subject. - type: str - aliases: [ C, countryName ] - state_or_province_name: - description: - - The stateOrProvinceName field of the certificate signing request subject. - type: str - aliases: [ ST, stateOrProvinceName ] - locality_name: - description: - - The localityName field of the certificate signing request subject. - type: str - aliases: [ L, localityName ] - organization_name: - description: - - The organizationName field of the certificate signing request subject. - type: str - aliases: [ O, organizationName ] - organizational_unit_name: - description: - - The organizationalUnitName field of the certificate signing request subject. - type: str - aliases: [ OU, organizationalUnitName ] - common_name: - description: - - The commonName field of the certificate signing request subject. - type: str - aliases: [ CN, commonName ] - email_address: - description: - - The emailAddress field of the certificate signing request subject. - type: str - aliases: [ E, emailAddress ] - subject_alt_name: - description: - - SAN extension to attach to the certificate signing request. - - This can either be a 'comma separated string' or a YAML list. - - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), - C(otherName) and the ones specific to your CA). - - Note that if no SAN is specified, but a common name, the common - name will be added as a SAN except if C(useCommonNameForSAN) is - set to I(false). - - More at U(https://tools.ietf.org/html/rfc5280#section-4.2.1.6). - type: list - elements: str - aliases: [ subjectAltName ] - subject_alt_name_critical: - description: - - Should the subjectAltName extension be considered as critical. - type: bool - aliases: [ subjectAltName_critical ] - use_common_name_for_san: - description: - - If set to C(yes), the module will fill the common name in for - C(subject_alt_name) with C(DNS:) prefix if no SAN is specified. - type: bool - default: yes - aliases: [ useCommonNameForSAN ] - key_usage: - description: - - This defines the purpose (e.g. encipherment, signature, certificate signing) - of the key contained in the certificate. - type: list - elements: str - aliases: [ keyUsage ] - key_usage_critical: - description: - - Should the keyUsage extension be considered as critical. - type: bool - aliases: [ keyUsage_critical ] - extended_key_usage: - description: - - Additional restrictions (e.g. client authentication, server authentication) - on the allowed purposes for which the public key may be used. - type: list - elements: str - aliases: [ extKeyUsage, extendedKeyUsage ] - extended_key_usage_critical: - description: - - Should the extkeyUsage extension be considered as critical. - type: bool - aliases: [ extKeyUsage_critical, extendedKeyUsage_critical ] - basic_constraints: - description: - - Indicates basic constraints, such as if the certificate is a CA. - type: list - elements: str - aliases: [ basicConstraints ] - basic_constraints_critical: - description: - - Should the basicConstraints extension be considered as critical. - type: bool - aliases: [ basicConstraints_critical ] - ocsp_must_staple: - description: - - Indicates that the certificate should contain the OCSP Must Staple - extension (U(https://tools.ietf.org/html/rfc7633)). - type: bool - aliases: [ ocspMustStaple ] - ocsp_must_staple_critical: - description: - - Should the OCSP Must Staple extension be considered as critical. - - Note that according to the RFC, this extension should not be marked - as critical, as old clients not knowing about OCSP Must Staple - are required to reject such certificates - (see U(https://tools.ietf.org/html/rfc7633#section-4)). - type: bool - aliases: [ ocspMustStaple_critical ] - name_constraints_permitted: - description: - - For CA certificates, this specifies a list of identifiers which describe - subtrees of names that this CA is allowed to issue certificates for. - - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), - C(otherName) and the ones specific to your CA). - type: list - elements: str - version_added: 1.1.0 - name_constraints_excluded: - description: - - For CA certificates, this specifies a list of identifiers which describe - subtrees of names that this CA is *not* allowed to issue certificates for. - - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), - C(otherName) and the ones specific to your CA). - type: list - elements: str - version_added: 1.1.0 - name_constraints_critical: - description: - - Should the Name Constraints extension be considered as critical. - type: bool - version_added: 1.1.0 - select_crypto_backend: - description: - - Determines which crypto backend to use. - - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). - - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. - - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. - - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0. - From that point on, only the C(cryptography) backend will be available. - type: str - default: auto - choices: [ auto, cryptography, pyopenssl ] backup: description: - Create a backup file including a timestamp so you can get the original CSR back if you overwrote it with a new one by accident. type: bool default: no - create_subject_key_identifier: - description: - - Create the Subject Key Identifier from the public key. - - "Please note that commercial CAs can ignore the value, respectively use a value of - their own choice instead. Specifying this option is mostly useful for self-signed - certificates or for own CAs." - - Note that this is only supported if the C(cryptography) backend is used! - type: bool - default: no - subject_key_identifier: - description: - - The subject key identifier as a hex string, where two bytes are separated by colons. - - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)" - - "Please note that commercial CAs ignore this value, respectively use a value of their - own choice. Specifying this option is mostly useful for self-signed certificates - or for own CAs." - - Note that this option can only be used if I(create_subject_key_identifier) is C(no). - - Note that this is only supported if the C(cryptography) backend is used! - type: str - authority_key_identifier: - description: - - The authority key identifier as a hex string, where two bytes are separated by colons. - - "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)" - - If specified, I(authority_cert_issuer) must also be specified. - - "Please note that commercial CAs ignore this value, respectively use a value of their - own choice. Specifying this option is mostly useful for self-signed certificates - or for own CAs." - - Note that this is only supported if the C(cryptography) backend is used! - - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), - I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. - type: str - authority_cert_issuer: - description: - - Names that will be present in the authority cert issuer field of the certificate signing request. - - Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName), - C(otherName) and the ones specific to your CA) - - "Example: C(DNS:ca.example.org)" - - If specified, I(authority_key_identifier) must also be specified. - - "Please note that commercial CAs ignore this value, respectively use a value of their - own choice. Specifying this option is mostly useful for self-signed certificates - or for own CAs." - - Note that this is only supported if the C(cryptography) backend is used! - - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), - I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. - type: list - elements: str - authority_cert_serial_number: - description: - - The authority cert serial number. - - Note that this is only supported if the C(cryptography) backend is used! - - "Please note that commercial CAs ignore this value, respectively use a value of their - own choice. Specifying this option is mostly useful for self-signed certificates - or for own CAs." - - The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier), - I(authority_cert_issuer) and I(authority_cert_serial_number) is specified. - type: int return_content: description: - If set to C(yes), will return the (current or generated) CSR's content as I(csr). type: bool default: no version_added: "1.0.0" + privatekey_content: + version_added: "1.0.0" + name_constraints_permitted: + version_added: 1.1.0 + name_constraints_excluded: + version_added: 1.1.0 + name_constraints_critical: + version_added: 1.1.0 extends_documentation_fragment: -- files -notes: - - If the certificate signing request already exists it will be checked whether subjectAltName, - keyUsage, extendedKeyUsage and basicConstraints only contain the requested values, whether - OCSP Must Staple is as requested, and if the request was signed by the given private key. +- ansible.builtin.files +- community.crypto.module_csr seealso: -- module: community.crypto.x509_certificate -- module: community.crypto.openssl_dhparam -- module: community.crypto.openssl_pkcs12 -- module: community.crypto.openssl_privatekey -- module: community.crypto.openssl_publickey +- module: community.crypto.openssl_csr_pipe ''' EXAMPLES = r''' @@ -461,15 +219,14 @@ csr: version_added: "1.0.0" ''' -import abc -import binascii import os -import traceback -from distutils.version import LooseVersion +from ansible.module_utils._text import to_native -from ansible.module_utils.basic import AnsibleModule, missing_required_lib -from ansible.module_utils._text import to_native, to_bytes, to_text +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr import ( + select_backend, + get_csr_argument_spec, +) from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, @@ -478,796 +235,89 @@ from ansible_collections.community.crypto.plugins.module_utils.io import ( from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( OpenSSLObjectError, - OpenSSLBadPassphraseError, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( OpenSSLObject, - load_privatekey, - load_certificate_request, - parse_name_field, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( - cryptography_get_basic_constraints, - cryptography_get_name, - cryptography_name_to_oid, - cryptography_key_needs_digest_for_signing, - cryptography_parse_key_usage_params, -) -from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import ( - pyopenssl_normalize_name_attribute, - pyopenssl_parse_name_constraints, -) +class CertificateSigningRequestModule(OpenSSLObject): -MINIMAL_PYOPENSSL_VERSION = '0.15' -MINIMAL_CRYPTOGRAPHY_VERSION = '1.3' - -PYOPENSSL_IMP_ERR = None -try: - import OpenSSL - from OpenSSL import crypto - PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) -except ImportError: - PYOPENSSL_IMP_ERR = traceback.format_exc() - PYOPENSSL_FOUND = False -else: - PYOPENSSL_FOUND = True - if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: - # OpenSSL 1.1.0 or newer - OPENSSL_MUST_STAPLE_NAME = b"tlsfeature" - OPENSSL_MUST_STAPLE_VALUE = b"status_request" - else: - # OpenSSL 1.0.x or older - OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24" - OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05" - -CRYPTOGRAPHY_IMP_ERR = None -try: - import cryptography - import cryptography.x509 - import cryptography.x509.oid - import cryptography.exceptions - import cryptography.hazmat.backends - import cryptography.hazmat.primitives.serialization - import cryptography.hazmat.primitives.hashes - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24") - CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05" - - -class CertificateSigningRequestError(OpenSSLObjectError): - pass - - -class CertificateSigningRequestBase(OpenSSLObject): - - def __init__(self, module): - super(CertificateSigningRequestBase, self).__init__( + def __init__(self, module, module_backend): + super(CertificateSigningRequestModule, self).__init__( module.params['path'], module.params['state'], module.params['force'], module.check_mode ) - self.digest = module.params['digest'] - self.privatekey_path = module.params['privatekey_path'] - self.privatekey_content = module.params['privatekey_content'] - if self.privatekey_content is not None: - self.privatekey_content = self.privatekey_content.encode('utf-8') - self.privatekey_passphrase = module.params['privatekey_passphrase'] - self.version = module.params['version'] - self.subjectAltName = module.params['subject_alt_name'] - self.subjectAltName_critical = module.params['subject_alt_name_critical'] - self.keyUsage = module.params['key_usage'] - self.keyUsage_critical = module.params['key_usage_critical'] - self.extendedKeyUsage = module.params['extended_key_usage'] - self.extendedKeyUsage_critical = module.params['extended_key_usage_critical'] - self.basicConstraints = module.params['basic_constraints'] - self.basicConstraints_critical = module.params['basic_constraints_critical'] - self.ocspMustStaple = module.params['ocsp_must_staple'] - self.ocspMustStaple_critical = module.params['ocsp_must_staple_critical'] - self.name_constraints_permitted = module.params['name_constraints_permitted'] or [] - self.name_constraints_excluded = module.params['name_constraints_excluded'] or [] - self.name_constraints_critical = module.params['name_constraints_critical'] - self.create_subject_key_identifier = module.params['create_subject_key_identifier'] - self.subject_key_identifier = module.params['subject_key_identifier'] - self.authority_key_identifier = module.params['authority_key_identifier'] - self.authority_cert_issuer = module.params['authority_cert_issuer'] - self.authority_cert_serial_number = module.params['authority_cert_serial_number'] - self.request = None - self.privatekey = None - self.csr_bytes = None + self.module_backend = module_backend self.return_content = module.params['return_content'] - if self.create_subject_key_identifier and self.subject_key_identifier is not None: - module.fail_json(msg='subject_key_identifier cannot be specified if create_subject_key_identifier is true') - self.backup = module.params['backup'] self.backup_file = None - self.subject = [ - ('C', module.params['country_name']), - ('ST', module.params['state_or_province_name']), - ('L', module.params['locality_name']), - ('O', module.params['organization_name']), - ('OU', module.params['organizational_unit_name']), - ('CN', module.params['common_name']), - ('emailAddress', module.params['email_address']), - ] - - if module.params['subject']: - self.subject = self.subject + parse_name_field(module.params['subject']) - self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]] - - self.using_common_name_for_san = False - if not self.subjectAltName and module.params['use_common_name_for_san']: - for sub in self.subject: - if sub[0] in ('commonName', 'CN'): - self.subjectAltName = ['DNS:%s' % sub[1]] - self.using_common_name_for_san = True - break - - if self.subject_key_identifier is not None: - try: - self.subject_key_identifier = binascii.unhexlify(self.subject_key_identifier.replace(':', '')) - except Exception as e: - raise CertificateSigningRequestError('Cannot parse subject_key_identifier: {0}'.format(e)) - - if self.authority_key_identifier is not None: - try: - self.authority_key_identifier = binascii.unhexlify(self.authority_key_identifier.replace(':', '')) - except Exception as e: - raise CertificateSigningRequestError('Cannot parse authority_key_identifier: {0}'.format(e)) - - @abc.abstractmethod - def _generate_csr(self): - pass + self.module_backend.set_existing(load_file_if_exists(self.path, module)) def generate(self, module): '''Generate the certificate signing request.''' - if not self.check(module, perms_required=False) or self.force: - result = self._generate_csr() - if self.backup: - self.backup_file = module.backup_local(self.path) - if self.return_content: - self.csr_bytes = result - write_file(module, result) + if self.force or self.module_backend.needs_regeneration(): + if not self.check_mode: + self.module_backend.generate_csr() + result = self.module_backend.get_csr_data() + if self.backup: + self.backup_file = module.backup_local(self.path) + write_file(module, result) self.changed = True file_args = module.load_file_common_arguments(module.params) - if module.set_fs_attributes_if_different(file_args, False): - self.changed = True - - @abc.abstractmethod - def _load_private_key(self): - pass - - @abc.abstractmethod - def _check_csr(self): - pass - - def check(self, module, perms_required=True): - """Ensure the resource is in its desired state.""" - state_and_perms = super(CertificateSigningRequestBase, self).check(module, perms_required) - - self._load_private_key() - - if not state_and_perms: - return False - - return self._check_csr() + self.changed = module.set_fs_attributes_if_different(file_args, self.changed) def remove(self, module): - if self.backup: + self.module_backend.set_existing(None) + if self.backup and not self.check_mode: self.backup_file = module.backup_local(self.path) - super(CertificateSigningRequestBase, self).remove(module) + super(CertificateSigningRequestModule, self).remove(module) def dump(self): '''Serialize the object into a dictionary.''' - - result = { - 'privatekey': self.privatekey_path, + result = self.module_backend.dump(include_csr=self.return_content) + result.update({ 'filename': self.path, - 'subject': self.subject, - 'subjectAltName': self.subjectAltName, - 'keyUsage': self.keyUsage, - 'extendedKeyUsage': self.extendedKeyUsage, - 'basicConstraints': self.basicConstraints, - 'ocspMustStaple': self.ocspMustStaple, 'changed': self.changed, - 'name_constraints_permitted': self.name_constraints_permitted, - 'name_constraints_excluded': self.name_constraints_excluded, - } + }) if self.backup_file: result['backup_file'] = self.backup_file - if self.return_content: - if self.csr_bytes is None: - self.csr_bytes = load_file_if_exists(self.path, ignore_errors=True) - result['csr'] = self.csr_bytes.decode('utf-8') if self.csr_bytes else None - return result -class CertificateSigningRequestPyOpenSSL(CertificateSigningRequestBase): - - def __init__(self, module): - if module.params['create_subject_key_identifier']: - module.fail_json(msg='You cannot use create_subject_key_identifier with the pyOpenSSL backend!') - for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number'): - if module.params[o] is not None: - module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o)) - super(CertificateSigningRequestPyOpenSSL, self).__init__(module) - - def _generate_csr(self): - req = crypto.X509Req() - req.set_version(self.version - 1) - subject = req.get_subject() - for entry in self.subject: - if entry[1] is not None: - # Workaround for https://github.com/pyca/pyopenssl/issues/165 - nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0])) - if nid == 0: - raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(entry[0])) - res = OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0) - if res == 0: - raise CertificateSigningRequestError('Invalid value for subject field identifier "{0}": {1}'.format(entry[0], entry[1])) - - extensions = [] - if self.subjectAltName: - altnames = ', '.join(self.subjectAltName) - try: - extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii'))) - except OpenSSL.crypto.Error as e: - raise CertificateSigningRequestError( - 'Error while parsing Subject Alternative Names {0} (check for missing type prefix, such as "DNS:"!): {1}'.format( - ', '.join(["{0}".format(san) for san in self.subjectAltName]), str(e) - ) - ) - - if self.keyUsage: - usages = ', '.join(self.keyUsage) - extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii'))) - - if self.extendedKeyUsage: - usages = ', '.join(self.extendedKeyUsage) - extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii'))) - - if self.basicConstraints: - usages = ', '.join(self.basicConstraints) - extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii'))) - - if self.name_constraints_permitted or self.name_constraints_excluded: - usages = ', '.join( - ['permitted;{0}'.format(name) for name in self.name_constraints_permitted] + - ['excluded;{0}'.format(name) for name in self.name_constraints_excluded] - ) - extensions.append(crypto.X509Extension(b"nameConstraints", self.name_constraints_critical, usages.encode('ascii'))) - - if self.ocspMustStaple: - extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE)) - - if extensions: - req.add_extensions(extensions) - - req.set_pubkey(self.privatekey) - req.sign(self.privatekey, self.digest) - self.request = req - - return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request) - - def _load_private_key(self): - try: - self.privatekey = load_privatekey( - path=self.privatekey_path, - content=self.privatekey_content, - passphrase=self.privatekey_passphrase - ) - except OpenSSLBadPassphraseError as exc: - raise CertificateSigningRequestError(exc) - - def _check_csr(self): - def _check_subject(csr): - subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject] - current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()] - if not set(subject) == set(current_subject): - return False - - return True - - def _check_subjectAltName(extensions): - altnames_ext = next((ext for ext in extensions if ext.get_short_name() == b'subjectAltName'), '') - altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in - to_text(altnames_ext, errors='surrogate_or_strict').split(',') if altname.strip()] - if self.subjectAltName: - if (set(altnames) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.subjectAltName]) or - altnames_ext.get_critical() != self.subjectAltName_critical): - return False - else: - if altnames: - return False - - return True - - def _check_keyUsage_(extensions, extName, expected, critical): - usages_ext = [ext for ext in extensions if ext.get_short_name() == extName] - if (not usages_ext and expected) or (usages_ext and not expected): - return False - elif not usages_ext and not expected: - return True - else: - current = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage.strip())) for usage in str(usages_ext[0]).split(',')] - expected = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage)) for usage in expected] - return set(current) == set(expected) and usages_ext[0].get_critical() == critical - - def _check_keyUsage(extensions): - usages_ext = [ext for ext in extensions if ext.get_short_name() == b'keyUsage'] - if (not usages_ext and self.keyUsage) or (usages_ext and not self.keyUsage): - return False - elif not usages_ext and not self.keyUsage: - return True - else: - # OpenSSL._util.lib.OBJ_txt2nid() always returns 0 for all keyUsage values - # (since keyUsage has a fixed bitfield for these values and is not extensible). - # Therefore, we create an extension for the wanted values, and compare the - # data of the extensions (which is the serialized bitfield). - expected_ext = crypto.X509Extension(b"keyUsage", False, ', '.join(self.keyUsage).encode('ascii')) - return usages_ext[0].get_data() == expected_ext.get_data() and usages_ext[0].get_critical() == self.keyUsage_critical - - def _check_extenededKeyUsage(extensions): - return _check_keyUsage_(extensions, b'extendedKeyUsage', self.extendedKeyUsage, self.extendedKeyUsage_critical) - - def _check_basicConstraints(extensions): - return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical) - - def _check_nameConstraints(extensions): - nc_ext = next((ext for ext in extensions if ext.get_short_name() == b'nameConstraints'), '') - permitted, excluded = pyopenssl_parse_name_constraints(nc_ext) - if self.name_constraints_permitted or self.name_constraints_excluded: - if set(permitted) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_permitted]): - return False - if set(excluded) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_excluded]): - return False - if nc_ext.get_critical() != self.name_constraints_critical: - return False - else: - if permitted or excluded: - return False - - return True - - def _check_ocspMustStaple(extensions): - oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE] - if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000: - # Older versions of libssl don't know about OCSP Must Staple - oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05']) - if self.ocspMustStaple: - return len(oms_ext) > 0 and oms_ext[0].get_critical() == self.ocspMustStaple_critical - else: - return len(oms_ext) == 0 - - def _check_extensions(csr): - extensions = csr.get_extensions() - return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and - _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and - _check_ocspMustStaple(extensions) and _check_nameConstraints(extensions)) - - def _check_signature(csr): - try: - return csr.verify(self.privatekey) - except crypto.Error: - return False - - try: - csr = load_certificate_request(self.path, backend='pyopenssl') - except Exception as dummy: - return False - - return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) - - -class CertificateSigningRequestCryptography(CertificateSigningRequestBase): - - def __init__(self, module): - super(CertificateSigningRequestCryptography, self).__init__(module) - self.cryptography_backend = cryptography.hazmat.backends.default_backend() - self.module = module - if self.version != 1: - module.warn('The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)') - - def _generate_csr(self): - csr = cryptography.x509.CertificateSigningRequestBuilder() - try: - csr = csr.subject_name(cryptography.x509.Name([ - cryptography.x509.NameAttribute(cryptography_name_to_oid(entry[0]), to_text(entry[1])) for entry in self.subject - ])) - except ValueError as e: - raise CertificateSigningRequestError(e) - - if self.subjectAltName: - csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([ - cryptography_get_name(name) for name in self.subjectAltName - ]), critical=self.subjectAltName_critical) - - if self.keyUsage: - params = cryptography_parse_key_usage_params(self.keyUsage) - csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical) - - if self.extendedKeyUsage: - usages = [cryptography_name_to_oid(usage) for usage in self.extendedKeyUsage] - csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical) - - if self.basicConstraints: - params = {} - ca, path_length = cryptography_get_basic_constraints(self.basicConstraints) - csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical) - - if self.ocspMustStaple: - try: - # This only works with cryptography >= 2.1 - csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical) - except AttributeError as dummy: - csr = csr.add_extension( - cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE), - critical=self.ocspMustStaple_critical - ) - - if self.name_constraints_permitted or self.name_constraints_excluded: - try: - csr = csr.add_extension(cryptography.x509.NameConstraints( - [cryptography_get_name(name) for name in self.name_constraints_permitted], - [cryptography_get_name(name) for name in self.name_constraints_excluded], - ), critical=self.name_constraints_critical) - except TypeError as e: - raise OpenSSLObjectError('Error while parsing name constraint: {0}'.format(e)) - - if self.create_subject_key_identifier: - csr = csr.add_extension( - cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()), - critical=False - ) - elif self.subject_key_identifier is not None: - csr = csr.add_extension(cryptography.x509.SubjectKeyIdentifier(self.subject_key_identifier), critical=False) - - if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None: - issuers = None - if self.authority_cert_issuer is not None: - issuers = [cryptography_get_name(n) for n in self.authority_cert_issuer] - csr = csr.add_extension( - cryptography.x509.AuthorityKeyIdentifier(self.authority_key_identifier, issuers, self.authority_cert_serial_number), - critical=False - ) - - digest = None - if cryptography_key_needs_digest_for_signing(self.privatekey): - if self.digest == 'sha256': - digest = cryptography.hazmat.primitives.hashes.SHA256() - elif self.digest == 'sha384': - digest = cryptography.hazmat.primitives.hashes.SHA384() - elif self.digest == 'sha512': - digest = cryptography.hazmat.primitives.hashes.SHA512() - elif self.digest == 'sha1': - digest = cryptography.hazmat.primitives.hashes.SHA1() - elif self.digest == 'md5': - digest = cryptography.hazmat.primitives.hashes.MD5() - # FIXME - else: - raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest)) - try: - self.request = csr.sign(self.privatekey, digest, self.cryptography_backend) - except TypeError as e: - if str(e) == 'Algorithm must be a registered hash algorithm.' and digest is None: - self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.') - raise - except UnicodeError as e: - # This catches IDNAErrors, which happens when a bad name is passed as a SAN - # (https://github.com/ansible-collections/community.crypto/issues/105). - # For older cryptography versions, this is handled by idna, which raises - # an idna.core.IDNAError. Later versions of cryptography deprecated and stopped - # requiring idna, whence we cannot easily handle this error. Fortunately, in - # most versions of idna, IDNAError extends UnicodeError. There is only version - # 2.3 where it extends Exception instead (see - # https://github.com/kjd/idna/commit/ebefacd3134d0f5da4745878620a6a1cba86d130 - # and then - # https://github.com/kjd/idna/commit/ea03c7b5db7d2a99af082e0239da2b68aeea702a). - msg = 'Error while creating CSR: {0}\n'.format(e) - if self.using_common_name_for_san: - self.module.fail_json(msg=msg + 'This is probably caused because the Common Name is used as a SAN.' - ' Specifying use_common_name_for_san=false might fix this.') - self.module.fail_json(msg=msg + 'This is probably caused by an invalid Subject Alternative DNS Name.') - - return self.request.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM) - - def _load_private_key(self): - try: - if self.privatekey_content is not None: - content = self.privatekey_content - else: - with open(self.privatekey_path, 'rb') as f: - content = f.read() - self.privatekey = cryptography.hazmat.primitives.serialization.load_pem_private_key( - content, - None if self.privatekey_passphrase is None else to_bytes(self.privatekey_passphrase), - backend=self.cryptography_backend - ) - except Exception as e: - raise CertificateSigningRequestError(e) - - def _check_csr(self): - def _check_subject(csr): - subject = [(cryptography_name_to_oid(entry[0]), entry[1]) for entry in self.subject] - current_subject = [(sub.oid, sub.value) for sub in csr.subject] - return set(subject) == set(current_subject) - - def _find_extension(extensions, exttype): - return next( - (ext for ext in extensions if isinstance(ext.value, exttype)), - None - ) - - def _check_subjectAltName(extensions): - current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName) - current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else [] - altnames = [str(cryptography_get_name(altname)) for altname in self.subjectAltName] if self.subjectAltName else [] - if set(altnames) != set(current_altnames): - return False - if altnames: - if current_altnames_ext.critical != self.subjectAltName_critical: - return False - return True - - def _check_keyUsage(extensions): - current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage) - if not self.keyUsage: - return current_keyusage_ext is None - elif current_keyusage_ext is None: - return False - params = cryptography_parse_key_usage_params(self.keyUsage) - for param in params: - if getattr(current_keyusage_ext.value, '_' + param) != params[param]: - return False - if current_keyusage_ext.critical != self.keyUsage_critical: - return False - return True - - def _check_extenededKeyUsage(extensions): - current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage) - current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else [] - usages = [str(cryptography_name_to_oid(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else [] - if set(current_usages) != set(usages): - return False - if usages: - if current_usages_ext.critical != self.extendedKeyUsage_critical: - return False - return True - - def _check_basicConstraints(extensions): - bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints) - current_ca = bc_ext.value.ca if bc_ext else False - current_path_length = bc_ext.value.path_length if bc_ext else None - ca, path_length = cryptography_get_basic_constraints(self.basicConstraints) - # Check CA flag - if ca != current_ca: - return False - # Check path length - if path_length != current_path_length: - return False - # Check criticality - if self.basicConstraints: - if bc_ext.critical != self.basicConstraints_critical: - return False - return True - - def _check_ocspMustStaple(extensions): - try: - # This only works with cryptography >= 2.1 - tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature) - has_tlsfeature = True - except AttributeError as dummy: - tlsfeature_ext = next( - (ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME), - None - ) - has_tlsfeature = False - if self.ocspMustStaple: - if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical: - return False - if has_tlsfeature: - return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value - else: - return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE - else: - return tlsfeature_ext is None - - def _check_nameConstraints(extensions): - current_nc_ext = _find_extension(extensions, cryptography.x509.NameConstraints) - current_nc_perm = [str(altname) for altname in current_nc_ext.value.permitted_subtrees] if current_nc_ext else [] - current_nc_excl = [str(altname) for altname in current_nc_ext.value.excluded_subtrees] if current_nc_ext else [] - nc_perm = [str(cryptography_get_name(altname)) for altname in self.name_constraints_permitted] - nc_excl = [str(cryptography_get_name(altname)) for altname in self.name_constraints_excluded] - if set(nc_perm) != set(current_nc_perm) or set(nc_excl) != set(current_nc_excl): - return False - if nc_perm or nc_excl: - if current_nc_ext.critical != self.name_constraints_critical: - return False - return True - - def _check_subject_key_identifier(extensions): - ext = _find_extension(extensions, cryptography.x509.SubjectKeyIdentifier) - if self.create_subject_key_identifier or self.subject_key_identifier is not None: - if not ext or ext.critical: - return False - if self.create_subject_key_identifier: - digest = cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()).digest - return ext.value.digest == digest - else: - return ext.value.digest == self.subject_key_identifier - else: - return ext is None - - def _check_authority_key_identifier(extensions): - ext = _find_extension(extensions, cryptography.x509.AuthorityKeyIdentifier) - if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None: - if not ext or ext.critical: - return False - aci = None - csr_aci = None - if self.authority_cert_issuer is not None: - aci = [str(cryptography_get_name(n)) for n in self.authority_cert_issuer] - if ext.value.authority_cert_issuer is not None: - csr_aci = [str(n) for n in ext.value.authority_cert_issuer] - return (ext.value.key_identifier == self.authority_key_identifier - and csr_aci == aci - and ext.value.authority_cert_serial_number == self.authority_cert_serial_number) - else: - return ext is None - - def _check_extensions(csr): - extensions = csr.extensions - return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and - _check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and - _check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and - _check_authority_key_identifier(extensions) and _check_nameConstraints(extensions)) - - def _check_signature(csr): - if not csr.is_signature_valid: - return False - # To check whether public key of CSR belongs to private key, - # encode both public keys and compare PEMs. - key_a = csr.public_key().public_bytes( - cryptography.hazmat.primitives.serialization.Encoding.PEM, - cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo - ) - key_b = self.privatekey.public_key().public_bytes( - cryptography.hazmat.primitives.serialization.Encoding.PEM, - cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo - ) - return key_a == key_b - - try: - csr = load_certificate_request(self.path, backend='cryptography') - except Exception as dummy: - return False - - return _check_subject(csr) and _check_extensions(csr) and _check_signature(csr) - - def main(): - module = AnsibleModule( - argument_spec=dict( - state=dict(type='str', default='present', choices=['absent', 'present']), - digest=dict(type='str', default='sha256'), - privatekey_path=dict(type='path'), - privatekey_content=dict(type='str', no_log=True), - privatekey_passphrase=dict(type='str', no_log=True), - version=dict(type='int', default=1), - force=dict(type='bool', default=False), - path=dict(type='path', required=True), - subject=dict(type='dict'), - country_name=dict(type='str', aliases=['C', 'countryName']), - state_or_province_name=dict(type='str', aliases=['ST', 'stateOrProvinceName']), - locality_name=dict(type='str', aliases=['L', 'localityName']), - organization_name=dict(type='str', aliases=['O', 'organizationName']), - organizational_unit_name=dict(type='str', aliases=['OU', 'organizationalUnitName']), - common_name=dict(type='str', aliases=['CN', 'commonName']), - email_address=dict(type='str', aliases=['E', 'emailAddress']), - subject_alt_name=dict(type='list', elements='str', aliases=['subjectAltName']), - subject_alt_name_critical=dict(type='bool', default=False, aliases=['subjectAltName_critical']), - use_common_name_for_san=dict(type='bool', default=True, aliases=['useCommonNameForSAN']), - key_usage=dict(type='list', elements='str', aliases=['keyUsage']), - key_usage_critical=dict(type='bool', default=False, aliases=['keyUsage_critical']), - extended_key_usage=dict(type='list', elements='str', aliases=['extKeyUsage', 'extendedKeyUsage']), - extended_key_usage_critical=dict(type='bool', default=False, aliases=['extKeyUsage_critical', 'extendedKeyUsage_critical']), - basic_constraints=dict(type='list', elements='str', aliases=['basicConstraints']), - basic_constraints_critical=dict(type='bool', default=False, aliases=['basicConstraints_critical']), - ocsp_must_staple=dict(type='bool', default=False, aliases=['ocspMustStaple']), - ocsp_must_staple_critical=dict(type='bool', default=False, aliases=['ocspMustStaple_critical']), - name_constraints_permitted=dict(type='list', elements='str'), - name_constraints_excluded=dict(type='list', elements='str'), - name_constraints_critical=dict(type='bool', default=False), - backup=dict(type='bool', default=False), - create_subject_key_identifier=dict(type='bool', default=False), - subject_key_identifier=dict(type='str'), - authority_key_identifier=dict(type='str'), - authority_cert_issuer=dict(type='list', elements='str'), - authority_cert_serial_number=dict(type='int'), - select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), - return_content=dict(type='bool', default=False), - ), - required_together=[('authority_cert_issuer', 'authority_cert_serial_number')], - required_if=[('state', 'present', ['privatekey_path', 'privatekey_content'], True)], - mutually_exclusive=( - ['privatekey_path', 'privatekey_content'], - ), + argument_spec = get_csr_argument_spec() + argument_spec.argument_spec.update(dict( + state=dict(type='str', default='present', choices=['absent', 'present']), + force=dict(type='bool', default=False), + path=dict(type='path', required=True), + backup=dict(type='bool', default=False), + return_content=dict(type='bool', default=False), + )) + argument_spec.required_if.extend([('state', 'present', rof, True) for rof in argument_spec.required_one_of]) + argument_spec.required_one_of = [] + module = argument_spec.create_ansible_module( add_file_common_args=True, supports_check_mode=True, ) - if module.params['version'] != 1: - module.deprecate('The version option will only support allowed values from community.crypto 2.0.0 on. ' - 'Currently, only the value 1 is allowed by RFC 2986', - version='2.0.0', collection_name='community.crypto') - base_dir = os.path.dirname(module.params['path']) or '.' if not os.path.isdir(base_dir): module.fail_json(name=base_dir, msg='The directory %s does not exist or the file is not a directory' % base_dir) backend = module.params['select_crypto_backend'] - if backend == 'auto': - # Detection what is possible - can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) - - # First try cryptography, then pyOpenSSL - if can_use_cryptography: - backend = 'cryptography' - elif can_use_pyopenssl: - backend = 'pyopenssl' - - # Success? - if backend == 'auto': - module.fail_json(msg=("Can't detect any of the required Python libraries " - "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( - MINIMAL_CRYPTOGRAPHY_VERSION, - MINIMAL_PYOPENSSL_VERSION)) + backend, module_backend = select_backend(module, backend) try: - if backend == 'pyopenssl': - if not PYOPENSSL_FOUND: - module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), - exception=PYOPENSSL_IMP_ERR) - try: - getattr(crypto.X509Req, 'get_extensions') - except AttributeError: - module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs') - - module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', - version='2.0.0', collection_name='community.crypto') - csr = CertificateSigningRequestPyOpenSSL(module) - elif backend == 'cryptography': - if not CRYPTOGRAPHY_FOUND: - module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), - exception=CRYPTOGRAPHY_IMP_ERR) - csr = CertificateSigningRequestCryptography(module) - + csr = CertificateSigningRequestModule(module, module_backend) if module.params['state'] == 'present': - if module.check_mode: - result = csr.dump() - result['changed'] = module.params['force'] or not csr.check(module) - module.exit_json(**result) - csr.generate(module) - else: - if module.check_mode: - result = csr.dump() - result['changed'] = os.path.exists(module.params['path']) - module.exit_json(**result) - csr.remove(module) result = csr.dump() diff --git a/plugins/modules/openssl_csr_pipe.py b/plugins/modules/openssl_csr_pipe.py new file mode 100644 index 00000000..d8fc9644 --- /dev/null +++ b/plugins/modules/openssl_csr_pipe.py @@ -0,0 +1,175 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2017, Yanis Guenane +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: openssl_csr_pipe +short_description: Generate OpenSSL Certificate Signing Request (CSR) +version_added: 1.3.0 +description: + - "Please note that the module regenerates an existing CSR if it doesn't match the module's + options, or if it seems to be corrupt." +author: +- Yanis Guenane (@Spredzy) +- Felix Fontein (@felixfontein) +options: + content: + description: + - The existing CSR. + type: str +extends_documentation_fragment: +- community.crypto.module_csr +seealso: +- module: community.crypto.openssl_csr +''' + +EXAMPLES = r''' +- name: Generate an OpenSSL Certificate Signing Request + community.crypto.openssl_csr_pipe: + privatekey_path: /etc/ssl/private/ansible.com.pem + common_name: www.ansible.com + register: result +- debug: + var: result.csr + +- name: Generate an OpenSSL Certificate Signing Request with an inline key + community.crypto.openssl_csr: + content: "{{ lookup('file', '/etc/ssl/csr/www.ansible.com.csr') }}" + privatekey_content: "{{ private_key_content }}" + common_name: www.ansible.com + register: result +- name: Store CSR + ansible.builtin.copy: + path: /etc/ssl/csr/www.ansible.com.csr + content: "{{ result.csr }}" + when: result is changed +''' + +RETURN = r''' +privatekey: + description: + - Path to the TLS/SSL private key the CSR was generated for + - Will be C(none) if the private key has been provided in I(privatekey_content). + returned: changed or success + type: str + sample: /etc/ssl/private/ansible.com.pem +subject: + description: A list of the subject tuples attached to the CSR + returned: changed or success + type: list + elements: list + sample: "[('CN', 'www.ansible.com'), ('O', 'Ansible')]" +subjectAltName: + description: The alternative names this CSR is valid for + returned: changed or success + type: list + elements: str + sample: [ 'DNS:www.ansible.com', 'DNS:m.ansible.com' ] +keyUsage: + description: Purpose for which the public key may be used + returned: changed or success + type: list + elements: str + sample: [ 'digitalSignature', 'keyAgreement' ] +extendedKeyUsage: + description: Additional restriction on the public key purposes + returned: changed or success + type: list + elements: str + sample: [ 'clientAuth' ] +basicConstraints: + description: Indicates if the certificate belongs to a CA + returned: changed or success + type: list + elements: str + sample: ['CA:TRUE', 'pathLenConstraint:0'] +ocsp_must_staple: + description: Indicates whether the certificate has the OCSP + Must Staple feature enabled + returned: changed or success + type: bool + sample: false +name_constraints_permitted: + description: List of permitted subtrees to sign certificates for. + returned: changed or success + type: list + elements: str + sample: ['email:.somedomain.com'] +name_constraints_excluded: + description: List of excluded subtrees the CA cannot sign certificates for. + returned: changed or success + type: list + elements: str + sample: ['email:.com'] +csr: + description: The (current or generated) CSR's content. + returned: if I(state) is C(present) + type: str +''' + +from ansible.module_utils._text import to_native + +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr import ( + select_backend, + get_csr_argument_spec, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + + +class CertificateSigningRequestModule(object): + def __init__(self, module, module_backend): + self.check_mode = module.check_mode + self.module_backend = module_backend + self.changed = False + if module.params['content'] is not None: + self.module_backend.set_existing(module.params['content'].encode('utf-8')) + + def generate(self, module): + '''Generate the certificate signing request.''' + if self.module_backend.needs_regeneration(): + if not self.check_mode: + self.module_backend.generate_csr() + self.changed = True + + def dump(self): + '''Serialize the object into a dictionary.''' + result = self.module_backend.dump(include_csr=True) + result.update({ + 'changed': self.changed, + }) + return result + + +def main(): + argument_spec = get_csr_argument_spec() + argument_spec.argument_spec.update(dict( + content=dict(type='str'), + )) + module = argument_spec.create_ansible_module( + supports_check_mode=True, + ) + + backend = module.params['select_crypto_backend'] + backend, module_backend = select_backend(module, backend) + try: + csr = CertificateSigningRequestModule(module, module_backend) + csr.generate(module) + result = csr.dump() + module.exit_json(**result) + except OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == "__main__": + main() diff --git a/plugins/modules/x509_certificate.py b/plugins/modules/x509_certificate.py index add55bda..43d8a1bd 100644 --- a/plugins/modules/x509_certificate.py +++ b/plugins/modules/x509_certificate.py @@ -599,6 +599,7 @@ notes: certificate can be moved to the target machine. seealso: - module: community.crypto.openssl_csr +- module: community.crypto.openssl_csr_pipe - module: community.crypto.openssl_dhparam - module: community.crypto.openssl_pkcs12 - module: community.crypto.openssl_privatekey diff --git a/tests/integration/targets/openssl_csr_pipe/aliases b/tests/integration/targets/openssl_csr_pipe/aliases new file mode 100644 index 00000000..6eae8bd8 --- /dev/null +++ b/tests/integration/targets/openssl_csr_pipe/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +destructive diff --git a/tests/integration/targets/openssl_csr_pipe/meta/main.yml b/tests/integration/targets/openssl_csr_pipe/meta/main.yml new file mode 100644 index 00000000..800aff64 --- /dev/null +++ b/tests/integration/targets/openssl_csr_pipe/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_openssl diff --git a/tests/integration/targets/openssl_csr_pipe/tasks/impl.yml b/tests/integration/targets/openssl_csr_pipe/tasks/impl.yml new file mode 100644 index 00000000..89cd9b6c --- /dev/null +++ b/tests/integration/targets/openssl_csr_pipe/tasks/impl.yml @@ -0,0 +1,91 @@ +--- +- name: "({{ select_crypto_backend }}) Generate privatekey" + openssl_privatekey: + path: '{{ output_dir }}/privatekey.pem' + +- name: "({{ select_crypto_backend }}) Generate CSR (check mode)" + openssl_csr_pipe: + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + check_mode: yes + register: generate_csr_check + +- name: "({{ select_crypto_backend }}) Generate CSR" + openssl_csr_pipe: + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + register: generate_csr + +- name: "({{ select_crypto_backend }}) Generate CSR (idempotent)" + openssl_csr_pipe: + content: "{{ generate_csr.csr }}" + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + register: generate_csr_idempotent + +- name: "({{ select_crypto_backend }}) Generate CSR (idempotent, check mode)" + openssl_csr_pipe: + content: "{{ generate_csr.csr }}" + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: www.ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + check_mode: yes + register: generate_csr_idempotent_check + +- name: "({{ select_crypto_backend }}) Generate CSR (changed)" + openssl_csr_pipe: + content: "{{ generate_csr.csr }}" + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + register: generate_csr_changed + +- name: "({{ select_crypto_backend }}) Generate CSR (changed, check mode)" + openssl_csr_pipe: + content: "{{ generate_csr.csr }}" + privatekey_path: '{{ output_dir }}/privatekey.pem' + subject: + commonName: ansible.com + select_crypto_backend: '{{ select_crypto_backend }}' + check_mode: yes + register: generate_csr_changed_check + +- name: "({{ select_crypto_backend }}) Validate CSR (test - privatekey modulus)" + shell: 'openssl rsa -noout -modulus -in {{ output_dir }}/privatekey.pem' + register: privatekey_modulus + +- name: "({{ select_crypto_backend }}) Validate CSR (test - Common Name)" + shell: "openssl req -noout -subject -in /dev/stdin -nameopt oneline,-space_eq" + args: + stdin: "{{ generate_csr.csr }}" + register: csr_cn + +- name: "({{ select_crypto_backend }}) Validate CSR (test - csr modulus)" + shell: 'openssl req -noout -modulus -in /dev/stdin' + args: + stdin: "{{ generate_csr.csr }}" + register: csr_modulus + +- name: "({{ select_crypto_backend }}) Validate CSR (assert)" + assert: + that: + - csr_cn.stdout.split('=')[-1] == 'www.ansible.com' + - csr_modulus.stdout == privatekey_modulus.stdout + +- name: "({{ select_crypto_backend }}) Validate CSR (check mode, idempotency)" + assert: + that: + - generate_csr_check is changed + - generate_csr is changed + - generate_csr_idempotent is not changed + - generate_csr_idempotent_check is not changed + - generate_csr_changed is changed + - generate_csr_changed_check is changed diff --git a/tests/integration/targets/openssl_csr_pipe/tasks/main.yml b/tests/integration/targets/openssl_csr_pipe/tasks/main.yml new file mode 100644 index 00000000..330edb8f --- /dev/null +++ b/tests/integration/targets/openssl_csr_pipe/tasks/main.yml @@ -0,0 +1,40 @@ +--- +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- name: Prepare private key for backend autodetection test + openssl_privatekey: + path: '{{ output_dir }}/privatekey_backend_selection.pem' +- name: Run module with backend autodetection + openssl_csr_pipe: + privatekey_path: '{{ output_dir }}/privatekey_backend_selection.pem' + subject: + commonName: www.ansible.com + +- block: + - name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl + + when: pyopenssl_version.stdout is version('0.15', '>=') + +- name: Remove output directory + file: + path: "{{ output_dir }}" + state: absent + +- name: Re-create output directory + file: + path: "{{ output_dir }}" + state: directory + +- block: + - name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + + when: cryptography_version.stdout is version('1.3', '>=')