Refactor openssl_csr module, add openssl_csr_pipe module (#123)
* Extract doc fragment from openssl_csr. * Refactor openssl_csr module into backend + module. * Add openssl_csr_pipe module. * Add seealso references. * ... * Use /dev/stdin instead of -, which seems to be only supported by newer openssl versions. * Bump version. * DRY: use select_message_digest. * Fix deprecation version in docs. * Docs improvements. * Improve argument spec handling for module backends. * Linting. * Fix linting problems by using kwargs.pull/131/head
parent
fd7871ae7d
commit
9792188b0e
|
@ -0,0 +1,267 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyrigt: (c) 2017, Yanis Guenane <yanis+ansible@guenane.org>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
class ModuleDocFragment(object):
|
||||
|
||||
# Standard files documentation fragment
|
||||
DOCUMENTATION = r'''
|
||||
description:
|
||||
- This module allows one to (re)generate OpenSSL certificate signing requests.
|
||||
- This module supports the subjectAltName, keyUsage, extendedKeyUsage, basicConstraints and OCSP Must Staple
|
||||
extensions.
|
||||
- "The module can use the cryptography Python library, or the pyOpenSSL Python
|
||||
library. By default, it tries to detect which one is available. This can be
|
||||
overridden with the I(select_crypto_backend) option. Please note that the
|
||||
PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in community.crypto 2.0.0."
|
||||
requirements:
|
||||
- Either cryptography >= 1.3
|
||||
- Or pyOpenSSL >= 0.15
|
||||
options:
|
||||
digest:
|
||||
description:
|
||||
- The digest used when signing the certificate signing request with the private key.
|
||||
type: str
|
||||
default: sha256
|
||||
privatekey_path:
|
||||
description:
|
||||
- The path to the private key to use when signing the certificate signing request.
|
||||
- Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
|
||||
type: path
|
||||
privatekey_content:
|
||||
description:
|
||||
- The content of the private key to use when signing the certificate signing request.
|
||||
- Either I(privatekey_path) or I(privatekey_content) must be specified if I(state) is C(present), but not both.
|
||||
type: str
|
||||
privatekey_passphrase:
|
||||
description:
|
||||
- The passphrase for the private key.
|
||||
- This is required if the private key is password protected.
|
||||
type: str
|
||||
version:
|
||||
description:
|
||||
- The version of the certificate signing request.
|
||||
- "The only allowed value according to L(RFC 2986,https://tools.ietf.org/html/rfc2986#section-4.1)
|
||||
is 1."
|
||||
- This option will no longer accept unsupported values from community.crypto 2.0.0 on.
|
||||
type: int
|
||||
default: 1
|
||||
subject:
|
||||
description:
|
||||
- Key/value pairs that will be present in the subject name field of the certificate signing request.
|
||||
- If you need to specify more than one value with the same key, use a list as value.
|
||||
type: dict
|
||||
country_name:
|
||||
description:
|
||||
- The countryName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ C, countryName ]
|
||||
state_or_province_name:
|
||||
description:
|
||||
- The stateOrProvinceName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ ST, stateOrProvinceName ]
|
||||
locality_name:
|
||||
description:
|
||||
- The localityName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ L, localityName ]
|
||||
organization_name:
|
||||
description:
|
||||
- The organizationName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ O, organizationName ]
|
||||
organizational_unit_name:
|
||||
description:
|
||||
- The organizationalUnitName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ OU, organizationalUnitName ]
|
||||
common_name:
|
||||
description:
|
||||
- The commonName field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ CN, commonName ]
|
||||
email_address:
|
||||
description:
|
||||
- The emailAddress field of the certificate signing request subject.
|
||||
type: str
|
||||
aliases: [ E, emailAddress ]
|
||||
subject_alt_name:
|
||||
description:
|
||||
- Subject Alternative Name (SAN) extension to attach to the certificate signing request.
|
||||
- This can either be a 'comma separated string' or a YAML list.
|
||||
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
|
||||
C(otherName) and the ones specific to your CA).
|
||||
- Note that if no SAN is specified, but a common name, the common
|
||||
name will be added as a SAN except if C(useCommonNameForSAN) is
|
||||
set to I(false).
|
||||
- More at U(https://tools.ietf.org/html/rfc5280#section-4.2.1.6).
|
||||
type: list
|
||||
elements: str
|
||||
aliases: [ subjectAltName ]
|
||||
subject_alt_name_critical:
|
||||
description:
|
||||
- Should the subjectAltName extension be considered as critical.
|
||||
type: bool
|
||||
aliases: [ subjectAltName_critical ]
|
||||
use_common_name_for_san:
|
||||
description:
|
||||
- If set to C(yes), the module will fill the common name in for
|
||||
C(subject_alt_name) with C(DNS:) prefix if no SAN is specified.
|
||||
type: bool
|
||||
default: yes
|
||||
aliases: [ useCommonNameForSAN ]
|
||||
key_usage:
|
||||
description:
|
||||
- This defines the purpose (e.g. encipherment, signature, certificate signing)
|
||||
of the key contained in the certificate.
|
||||
type: list
|
||||
elements: str
|
||||
aliases: [ keyUsage ]
|
||||
key_usage_critical:
|
||||
description:
|
||||
- Should the keyUsage extension be considered as critical.
|
||||
type: bool
|
||||
aliases: [ keyUsage_critical ]
|
||||
extended_key_usage:
|
||||
description:
|
||||
- Additional restrictions (e.g. client authentication, server authentication)
|
||||
on the allowed purposes for which the public key may be used.
|
||||
type: list
|
||||
elements: str
|
||||
aliases: [ extKeyUsage, extendedKeyUsage ]
|
||||
extended_key_usage_critical:
|
||||
description:
|
||||
- Should the extkeyUsage extension be considered as critical.
|
||||
type: bool
|
||||
aliases: [ extKeyUsage_critical, extendedKeyUsage_critical ]
|
||||
basic_constraints:
|
||||
description:
|
||||
- Indicates basic constraints, such as if the certificate is a CA.
|
||||
type: list
|
||||
elements: str
|
||||
aliases: [ basicConstraints ]
|
||||
basic_constraints_critical:
|
||||
description:
|
||||
- Should the basicConstraints extension be considered as critical.
|
||||
type: bool
|
||||
aliases: [ basicConstraints_critical ]
|
||||
ocsp_must_staple:
|
||||
description:
|
||||
- Indicates that the certificate should contain the OCSP Must Staple
|
||||
extension (U(https://tools.ietf.org/html/rfc7633)).
|
||||
type: bool
|
||||
aliases: [ ocspMustStaple ]
|
||||
ocsp_must_staple_critical:
|
||||
description:
|
||||
- Should the OCSP Must Staple extension be considered as critical.
|
||||
- Note that according to the RFC, this extension should not be marked
|
||||
as critical, as old clients not knowing about OCSP Must Staple
|
||||
are required to reject such certificates
|
||||
(see U(https://tools.ietf.org/html/rfc7633#section-4)).
|
||||
type: bool
|
||||
aliases: [ ocspMustStaple_critical ]
|
||||
name_constraints_permitted:
|
||||
description:
|
||||
- For CA certificates, this specifies a list of identifiers which describe
|
||||
subtrees of names that this CA is allowed to issue certificates for.
|
||||
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
|
||||
C(otherName) and the ones specific to your CA).
|
||||
type: list
|
||||
elements: str
|
||||
name_constraints_excluded:
|
||||
description:
|
||||
- For CA certificates, this specifies a list of identifiers which describe
|
||||
subtrees of names that this CA is *not* allowed to issue certificates for.
|
||||
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
|
||||
C(otherName) and the ones specific to your CA).
|
||||
type: list
|
||||
elements: str
|
||||
name_constraints_critical:
|
||||
description:
|
||||
- Should the Name Constraints extension be considered as critical.
|
||||
type: bool
|
||||
select_crypto_backend:
|
||||
description:
|
||||
- Determines which crypto backend to use.
|
||||
- The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl).
|
||||
- If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library.
|
||||
- If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
|
||||
- Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0.
|
||||
From that point on, only the C(cryptography) backend will be available.
|
||||
type: str
|
||||
default: auto
|
||||
choices: [ auto, cryptography, pyopenssl ]
|
||||
create_subject_key_identifier:
|
||||
description:
|
||||
- Create the Subject Key Identifier from the public key.
|
||||
- "Please note that commercial CAs can ignore the value, respectively use a value of
|
||||
their own choice instead. Specifying this option is mostly useful for self-signed
|
||||
certificates or for own CAs."
|
||||
- Note that this is only supported if the C(cryptography) backend is used!
|
||||
type: bool
|
||||
default: no
|
||||
subject_key_identifier:
|
||||
description:
|
||||
- The subject key identifier as a hex string, where two bytes are separated by colons.
|
||||
- "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)"
|
||||
- "Please note that commercial CAs ignore this value, respectively use a value of their
|
||||
own choice. Specifying this option is mostly useful for self-signed certificates
|
||||
or for own CAs."
|
||||
- Note that this option can only be used if I(create_subject_key_identifier) is C(no).
|
||||
- Note that this is only supported if the C(cryptography) backend is used!
|
||||
type: str
|
||||
authority_key_identifier:
|
||||
description:
|
||||
- The authority key identifier as a hex string, where two bytes are separated by colons.
|
||||
- "Example: C(00:11:22:33:44:55:66:77:88:99:aa:bb:cc:dd:ee:ff:00:11:22:33)"
|
||||
- If specified, I(authority_cert_issuer) must also be specified.
|
||||
- "Please note that commercial CAs ignore this value, respectively use a value of their
|
||||
own choice. Specifying this option is mostly useful for self-signed certificates
|
||||
or for own CAs."
|
||||
- Note that this is only supported if the C(cryptography) backend is used!
|
||||
- The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
|
||||
I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
|
||||
type: str
|
||||
authority_cert_issuer:
|
||||
description:
|
||||
- Names that will be present in the authority cert issuer field of the certificate signing request.
|
||||
- Values must be prefixed by their options. (i.e., C(email), C(URI), C(DNS), C(RID), C(IP), C(dirName),
|
||||
C(otherName) and the ones specific to your CA)
|
||||
- "Example: C(DNS:ca.example.org)"
|
||||
- If specified, I(authority_key_identifier) must also be specified.
|
||||
- "Please note that commercial CAs ignore this value, respectively use a value of their
|
||||
own choice. Specifying this option is mostly useful for self-signed certificates
|
||||
or for own CAs."
|
||||
- Note that this is only supported if the C(cryptography) backend is used!
|
||||
- The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
|
||||
I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
|
||||
type: list
|
||||
elements: str
|
||||
authority_cert_serial_number:
|
||||
description:
|
||||
- The authority cert serial number.
|
||||
- Note that this is only supported if the C(cryptography) backend is used!
|
||||
- "Please note that commercial CAs ignore this value, respectively use a value of their
|
||||
own choice. Specifying this option is mostly useful for self-signed certificates
|
||||
or for own CAs."
|
||||
- The C(AuthorityKeyIdentifier) will only be added if at least one of I(authority_key_identifier),
|
||||
I(authority_cert_issuer) and I(authority_cert_serial_number) is specified.
|
||||
type: int
|
||||
notes:
|
||||
- If the certificate signing request already exists it will be checked whether subjectAltName,
|
||||
keyUsage, extendedKeyUsage and basicConstraints only contain the requested values, whether
|
||||
OCSP Must Staple is as requested, and if the request was signed by the given private key.
|
||||
seealso:
|
||||
- module: community.crypto.x509_certificate
|
||||
- module: community.crypto.openssl_dhparam
|
||||
- module: community.crypto.openssl_pkcs12
|
||||
- module: community.crypto.openssl_privatekey
|
||||
- module: community.crypto.openssl_publickey
|
||||
- module: community.crypto.openssl_csr_info
|
||||
'''
|
|
@ -0,0 +1,30 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
from ansible.module_utils.basic import AnsibleModule
|
||||
|
||||
|
||||
class ArgumentSpec:
|
||||
def __init__(self, argument_spec, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None):
|
||||
self.argument_spec = argument_spec
|
||||
self.mutually_exclusive = mutually_exclusive or []
|
||||
self.required_together = required_together or []
|
||||
self.required_one_of = required_one_of or []
|
||||
self.required_if = required_if or []
|
||||
self.required_by = required_by or {}
|
||||
|
||||
def create_ansible_module(self, **kwargs):
|
||||
return AnsibleModule(
|
||||
argument_spec=self.argument_spec,
|
||||
mutually_exclusive=self.mutually_exclusive,
|
||||
required_together=self.required_together,
|
||||
required_one_of=self.required_one_of,
|
||||
required_if=self.required_if,
|
||||
required_by=self.required_by,
|
||||
**kwargs)
|
|
@ -0,0 +1,764 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright: (c) 2016, Yanis Guenane <yanis+ansible@guenane.org>
|
||||
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
import abc
|
||||
import binascii
|
||||
import traceback
|
||||
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible.module_utils._text import to_bytes, to_text
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
OpenSSLBadPassphraseError,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
load_privatekey,
|
||||
load_certificate_request,
|
||||
parse_name_field,
|
||||
select_message_digest,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
cryptography_get_basic_constraints,
|
||||
cryptography_get_name,
|
||||
cryptography_name_to_oid,
|
||||
cryptography_key_needs_digest_for_signing,
|
||||
cryptography_parse_key_usage_params,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
|
||||
pyopenssl_normalize_name_attribute,
|
||||
pyopenssl_parse_name_constraints,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec
|
||||
|
||||
|
||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
|
||||
|
||||
PYOPENSSL_IMP_ERR = None
|
||||
try:
|
||||
import OpenSSL
|
||||
from OpenSSL import crypto
|
||||
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
||||
except ImportError:
|
||||
PYOPENSSL_IMP_ERR = traceback.format_exc()
|
||||
PYOPENSSL_FOUND = False
|
||||
else:
|
||||
PYOPENSSL_FOUND = True
|
||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||
# OpenSSL 1.1.0 or newer
|
||||
OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
|
||||
OPENSSL_MUST_STAPLE_VALUE = b"status_request"
|
||||
else:
|
||||
# OpenSSL 1.0.x or older
|
||||
OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
|
||||
OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
import cryptography.x509
|
||||
import cryptography.x509.oid
|
||||
import cryptography.exceptions
|
||||
import cryptography.hazmat.backends
|
||||
import cryptography.hazmat.primitives.serialization
|
||||
import cryptography.hazmat.primitives.hashes
|
||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||
except ImportError:
|
||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||
CRYPTOGRAPHY_FOUND = False
|
||||
else:
|
||||
CRYPTOGRAPHY_FOUND = True
|
||||
CRYPTOGRAPHY_MUST_STAPLE_NAME = cryptography.x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
|
||||
CRYPTOGRAPHY_MUST_STAPLE_VALUE = b"\x30\x03\x02\x01\x05"
|
||||
|
||||
|
||||
class CertificateSigningRequestError(OpenSSLObjectError):
|
||||
pass
|
||||
|
||||
|
||||
# From the object called `module`, only the following properties are used:
|
||||
#
|
||||
# - module.params[]
|
||||
# - module.warn(msg: str)
|
||||
# - module.fail_json(msg: str, **kwargs)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CertificateSigningRequestBackend(object):
|
||||
def __init__(self, module, backend):
|
||||
self.module = module
|
||||
self.backend = backend
|
||||
self.digest = module.params['digest']
|
||||
self.privatekey_path = module.params['privatekey_path']
|
||||
self.privatekey_content = module.params['privatekey_content']
|
||||
if self.privatekey_content is not None:
|
||||
self.privatekey_content = self.privatekey_content.encode('utf-8')
|
||||
self.privatekey_passphrase = module.params['privatekey_passphrase']
|
||||
self.version = module.params['version']
|
||||
self.subjectAltName = module.params['subject_alt_name']
|
||||
self.subjectAltName_critical = module.params['subject_alt_name_critical']
|
||||
self.keyUsage = module.params['key_usage']
|
||||
self.keyUsage_critical = module.params['key_usage_critical']
|
||||
self.extendedKeyUsage = module.params['extended_key_usage']
|
||||
self.extendedKeyUsage_critical = module.params['extended_key_usage_critical']
|
||||
self.basicConstraints = module.params['basic_constraints']
|
||||
self.basicConstraints_critical = module.params['basic_constraints_critical']
|
||||
self.ocspMustStaple = module.params['ocsp_must_staple']
|
||||
self.ocspMustStaple_critical = module.params['ocsp_must_staple_critical']
|
||||
self.name_constraints_permitted = module.params['name_constraints_permitted'] or []
|
||||
self.name_constraints_excluded = module.params['name_constraints_excluded'] or []
|
||||
self.name_constraints_critical = module.params['name_constraints_critical']
|
||||
self.create_subject_key_identifier = module.params['create_subject_key_identifier']
|
||||
self.subject_key_identifier = module.params['subject_key_identifier']
|
||||
self.authority_key_identifier = module.params['authority_key_identifier']
|
||||
self.authority_cert_issuer = module.params['authority_cert_issuer']
|
||||
self.authority_cert_serial_number = module.params['authority_cert_serial_number']
|
||||
self.csr = None
|
||||
self.privatekey = None
|
||||
|
||||
if self.create_subject_key_identifier and self.subject_key_identifier is not None:
|
||||
module.fail_json(msg='subject_key_identifier cannot be specified if create_subject_key_identifier is true')
|
||||
|
||||
self.subject = [
|
||||
('C', module.params['country_name']),
|
||||
('ST', module.params['state_or_province_name']),
|
||||
('L', module.params['locality_name']),
|
||||
('O', module.params['organization_name']),
|
||||
('OU', module.params['organizational_unit_name']),
|
||||
('CN', module.params['common_name']),
|
||||
('emailAddress', module.params['email_address']),
|
||||
]
|
||||
|
||||
if module.params['subject']:
|
||||
self.subject = self.subject + parse_name_field(module.params['subject'])
|
||||
self.subject = [(entry[0], entry[1]) for entry in self.subject if entry[1]]
|
||||
|
||||
self.using_common_name_for_san = False
|
||||
if not self.subjectAltName and module.params['use_common_name_for_san']:
|
||||
for sub in self.subject:
|
||||
if sub[0] in ('commonName', 'CN'):
|
||||
self.subjectAltName = ['DNS:%s' % sub[1]]
|
||||
self.using_common_name_for_san = True
|
||||
break
|
||||
|
||||
if self.subject_key_identifier is not None:
|
||||
try:
|
||||
self.subject_key_identifier = binascii.unhexlify(self.subject_key_identifier.replace(':', ''))
|
||||
except Exception as e:
|
||||
raise CertificateSigningRequestError('Cannot parse subject_key_identifier: {0}'.format(e))
|
||||
|
||||
if self.authority_key_identifier is not None:
|
||||
try:
|
||||
self.authority_key_identifier = binascii.unhexlify(self.authority_key_identifier.replace(':', ''))
|
||||
except Exception as e:
|
||||
raise CertificateSigningRequestError('Cannot parse authority_key_identifier: {0}'.format(e))
|
||||
|
||||
self.existing_csr = None
|
||||
self.existing_csr_bytes = None
|
||||
|
||||
@abc.abstractmethod
|
||||
def generate_csr(self):
|
||||
"""(Re-)Generate private key."""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_csr_data(self):
|
||||
"""Return bytes for self.csr."""
|
||||
pass
|
||||
|
||||
def set_existing(self, csr_bytes):
|
||||
"""Set existing private key bytes. None indicates that the key does not exist."""
|
||||
self.existing_csr_bytes = csr_bytes
|
||||
|
||||
def has_existing(self):
|
||||
"""Query whether an existing private key is/has been there."""
|
||||
return self.existing_csr_bytes is not None
|
||||
|
||||
def _ensure_private_key_loaded(self):
|
||||
"""Load the provided private key into self.privatekey."""
|
||||
if self.privatekey is not None:
|
||||
return
|
||||
try:
|
||||
self.privatekey = load_privatekey(
|
||||
path=self.privatekey_path,
|
||||
content=self.privatekey_content,
|
||||
passphrase=self.privatekey_passphrase,
|
||||
backend=self.backend,
|
||||
)
|
||||
except OpenSSLBadPassphraseError as exc:
|
||||
raise CertificateSigningRequestError(exc)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _check_csr(self):
|
||||
"""Check whether provided parameters, assuming self.existing_csr and self.privatekey have been populated."""
|
||||
pass
|
||||
|
||||
def needs_regeneration(self):
|
||||
"""Check whether a regeneration is necessary."""
|
||||
if self.existing_csr_bytes is None:
|
||||
return True
|
||||
try:
|
||||
self.existing_csr = load_certificate_request(None, content=self.existing_csr_bytes, backend=self.backend)
|
||||
except Exception as dummy:
|
||||
return True
|
||||
self._ensure_private_key_loaded()
|
||||
return not self._check_csr()
|
||||
|
||||
def dump(self, include_csr):
|
||||
"""Serialize the object into a dictionary."""
|
||||
result = {
|
||||
'privatekey': self.privatekey_path,
|
||||
'subject': self.subject,
|
||||
'subjectAltName': self.subjectAltName,
|
||||
'keyUsage': self.keyUsage,
|
||||
'extendedKeyUsage': self.extendedKeyUsage,
|
||||
'basicConstraints': self.basicConstraints,
|
||||
'ocspMustStaple': self.ocspMustStaple,
|
||||
'name_constraints_permitted': self.name_constraints_permitted,
|
||||
'name_constraints_excluded': self.name_constraints_excluded,
|
||||
}
|
||||
if include_csr:
|
||||
# Get hold of CSR bytes
|
||||
csr_bytes = self.existing_csr_bytes
|
||||
if self.csr is not None:
|
||||
csr_bytes = self.get_csr_data()
|
||||
# Store result
|
||||
result['csr'] = csr_bytes.decode('utf-8') if csr_bytes else None
|
||||
return result
|
||||
|
||||
|
||||
# Implementation with using pyOpenSSL
|
||||
class CertificateSigningRequestPyOpenSSLBackend(CertificateSigningRequestBackend):
|
||||
def __init__(self, module):
|
||||
if module.params['create_subject_key_identifier']:
|
||||
module.fail_json(msg='You cannot use create_subject_key_identifier with the pyOpenSSL backend!')
|
||||
for o in ('subject_key_identifier', 'authority_key_identifier', 'authority_cert_issuer', 'authority_cert_serial_number'):
|
||||
if module.params[o] is not None:
|
||||
module.fail_json(msg='You cannot use {0} with the pyOpenSSL backend!'.format(o))
|
||||
super(CertificateSigningRequestPyOpenSSLBackend, self).__init__(module, 'pyopenssl')
|
||||
|
||||
def generate_csr(self):
|
||||
"""(Re-)Generate private key."""
|
||||
self._ensure_private_key_loaded()
|
||||
|
||||
req = crypto.X509Req()
|
||||
req.set_version(self.version - 1)
|
||||
subject = req.get_subject()
|
||||
for entry in self.subject:
|
||||
if entry[1] is not None:
|
||||
# Workaround for https://github.com/pyca/pyopenssl/issues/165
|
||||
nid = OpenSSL._util.lib.OBJ_txt2nid(to_bytes(entry[0]))
|
||||
if nid == 0:
|
||||
raise CertificateSigningRequestError('Unknown subject field identifier "{0}"'.format(entry[0]))
|
||||
res = OpenSSL._util.lib.X509_NAME_add_entry_by_NID(subject._name, nid, OpenSSL._util.lib.MBSTRING_UTF8, to_bytes(entry[1]), -1, -1, 0)
|
||||
if res == 0:
|
||||
raise CertificateSigningRequestError('Invalid value for subject field identifier "{0}": {1}'.format(entry[0], entry[1]))
|
||||
|
||||
extensions = []
|
||||
if self.subjectAltName:
|
||||
altnames = ', '.join(self.subjectAltName)
|
||||
try:
|
||||
extensions.append(crypto.X509Extension(b"subjectAltName", self.subjectAltName_critical, altnames.encode('ascii')))
|
||||
except OpenSSL.crypto.Error as e:
|
||||
raise CertificateSigningRequestError(
|
||||
'Error while parsing Subject Alternative Names {0} (check for missing type prefix, such as "DNS:"!): {1}'.format(
|
||||
', '.join(["{0}".format(san) for san in self.subjectAltName]), str(e)
|
||||
)
|
||||
)
|
||||
|
||||
if self.keyUsage:
|
||||
usages = ', '.join(self.keyUsage)
|
||||
extensions.append(crypto.X509Extension(b"keyUsage", self.keyUsage_critical, usages.encode('ascii')))
|
||||
|
||||
if self.extendedKeyUsage:
|
||||
usages = ', '.join(self.extendedKeyUsage)
|
||||
extensions.append(crypto.X509Extension(b"extendedKeyUsage", self.extendedKeyUsage_critical, usages.encode('ascii')))
|
||||
|
||||
if self.basicConstraints:
|
||||
usages = ', '.join(self.basicConstraints)
|
||||
extensions.append(crypto.X509Extension(b"basicConstraints", self.basicConstraints_critical, usages.encode('ascii')))
|
||||
|
||||
if self.name_constraints_permitted or self.name_constraints_excluded:
|
||||
usages = ', '.join(
|
||||
['permitted;{0}'.format(name) for name in self.name_constraints_permitted] +
|
||||
['excluded;{0}'.format(name) for name in self.name_constraints_excluded]
|
||||
)
|
||||
extensions.append(crypto.X509Extension(b"nameConstraints", self.name_constraints_critical, usages.encode('ascii')))
|
||||
|
||||
if self.ocspMustStaple:
|
||||
extensions.append(crypto.X509Extension(OPENSSL_MUST_STAPLE_NAME, self.ocspMustStaple_critical, OPENSSL_MUST_STAPLE_VALUE))
|
||||
|
||||
if extensions:
|
||||
req.add_extensions(extensions)
|
||||
|
||||
req.set_pubkey(self.privatekey)
|
||||
req.sign(self.privatekey, self.digest)
|
||||
self.csr = req
|
||||
|
||||
def get_csr_data(self):
|
||||
"""Return bytes for self.csr."""
|
||||
return crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.csr)
|
||||
|
||||
def _check_csr(self):
|
||||
def _check_subject(csr):
|
||||
subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in self.subject]
|
||||
current_subject = [(OpenSSL._util.lib.OBJ_txt2nid(to_bytes(sub[0])), to_bytes(sub[1])) for sub in csr.get_subject().get_components()]
|
||||
if not set(subject) == set(current_subject):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _check_subjectAltName(extensions):
|
||||
altnames_ext = next((ext for ext in extensions if ext.get_short_name() == b'subjectAltName'), '')
|
||||
altnames = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
|
||||
to_text(altnames_ext, errors='surrogate_or_strict').split(',') if altname.strip()]
|
||||
if self.subjectAltName:
|
||||
if (set(altnames) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.subjectAltName]) or
|
||||
altnames_ext.get_critical() != self.subjectAltName_critical):
|
||||
return False
|
||||
else:
|
||||
if altnames:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _check_keyUsage_(extensions, extName, expected, critical):
|
||||
usages_ext = [ext for ext in extensions if ext.get_short_name() == extName]
|
||||
if (not usages_ext and expected) or (usages_ext and not expected):
|
||||
return False
|
||||
elif not usages_ext and not expected:
|
||||
return True
|
||||
else:
|
||||
current = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage.strip())) for usage in str(usages_ext[0]).split(',')]
|
||||
expected = [OpenSSL._util.lib.OBJ_txt2nid(to_bytes(usage)) for usage in expected]
|
||||
return set(current) == set(expected) and usages_ext[0].get_critical() == critical
|
||||
|
||||
def _check_keyUsage(extensions):
|
||||
usages_ext = [ext for ext in extensions if ext.get_short_name() == b'keyUsage']
|
||||
if (not usages_ext and self.keyUsage) or (usages_ext and not self.keyUsage):
|
||||
return False
|
||||
elif not usages_ext and not self.keyUsage:
|
||||
return True
|
||||
else:
|
||||
# OpenSSL._util.lib.OBJ_txt2nid() always returns 0 for all keyUsage values
|
||||
# (since keyUsage has a fixed bitfield for these values and is not extensible).
|
||||
# Therefore, we create an extension for the wanted values, and compare the
|
||||
# data of the extensions (which is the serialized bitfield).
|
||||
expected_ext = crypto.X509Extension(b"keyUsage", False, ', '.join(self.keyUsage).encode('ascii'))
|
||||
return usages_ext[0].get_data() == expected_ext.get_data() and usages_ext[0].get_critical() == self.keyUsage_critical
|
||||
|
||||
def _check_extenededKeyUsage(extensions):
|
||||
return _check_keyUsage_(extensions, b'extendedKeyUsage', self.extendedKeyUsage, self.extendedKeyUsage_critical)
|
||||
|
||||
def _check_basicConstraints(extensions):
|
||||
return _check_keyUsage_(extensions, b'basicConstraints', self.basicConstraints, self.basicConstraints_critical)
|
||||
|
||||
def _check_nameConstraints(extensions):
|
||||
nc_ext = next((ext for ext in extensions if ext.get_short_name() == b'nameConstraints'), '')
|
||||
permitted, excluded = pyopenssl_parse_name_constraints(nc_ext)
|
||||
if self.name_constraints_permitted or self.name_constraints_excluded:
|
||||
if set(permitted) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_permitted]):
|
||||
return False
|
||||
if set(excluded) != set([pyopenssl_normalize_name_attribute(to_text(name)) for name in self.name_constraints_excluded]):
|
||||
return False
|
||||
if nc_ext.get_critical() != self.name_constraints_critical:
|
||||
return False
|
||||
else:
|
||||
if permitted or excluded:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _check_ocspMustStaple(extensions):
|
||||
oms_ext = [ext for ext in extensions if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE]
|
||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
|
||||
# Older versions of libssl don't know about OCSP Must Staple
|
||||
oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
|
||||
if self.ocspMustStaple:
|
||||
return len(oms_ext) > 0 and oms_ext[0].get_critical() == self.ocspMustStaple_critical
|
||||
else:
|
||||
return len(oms_ext) == 0
|
||||
|
||||
def _check_extensions(csr):
|
||||
extensions = csr.get_extensions()
|
||||
return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
|
||||
_check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
|
||||
_check_ocspMustStaple(extensions) and _check_nameConstraints(extensions))
|
||||
|
||||
def _check_signature(csr):
|
||||
try:
|
||||
return csr.verify(self.privatekey)
|
||||
except crypto.Error:
|
||||
return False
|
||||
|
||||
return _check_subject(self.existing_csr) and _check_extensions(self.existing_csr) and _check_signature(self.existing_csr)
|
||||
|
||||
|
||||
# Implementation with using cryptography
|
||||
class CertificateSigningRequestCryptographyBackend(CertificateSigningRequestBackend):
|
||||
def __init__(self, module):
|
||||
super(CertificateSigningRequestCryptographyBackend, self).__init__(module, 'cryptography')
|
||||
self.cryptography_backend = cryptography.hazmat.backends.default_backend()
|
||||
if self.version != 1:
|
||||
module.warn('The cryptography backend only supports version 1. (The only valid value according to RFC 2986.)')
|
||||
|
||||
def generate_csr(self):
|
||||
"""(Re-)Generate private key."""
|
||||
self._ensure_private_key_loaded()
|
||||
|
||||
csr = cryptography.x509.CertificateSigningRequestBuilder()
|
||||
try:
|
||||
csr = csr.subject_name(cryptography.x509.Name([
|
||||
cryptography.x509.NameAttribute(cryptography_name_to_oid(entry[0]), to_text(entry[1])) for entry in self.subject
|
||||
]))
|
||||
except ValueError as e:
|
||||
raise CertificateSigningRequestError(e)
|
||||
|
||||
if self.subjectAltName:
|
||||
csr = csr.add_extension(cryptography.x509.SubjectAlternativeName([
|
||||
cryptography_get_name(name) for name in self.subjectAltName
|
||||
]), critical=self.subjectAltName_critical)
|
||||
|
||||
if self.keyUsage:
|
||||
params = cryptography_parse_key_usage_params(self.keyUsage)
|
||||
csr = csr.add_extension(cryptography.x509.KeyUsage(**params), critical=self.keyUsage_critical)
|
||||
|
||||
if self.extendedKeyUsage:
|
||||
usages = [cryptography_name_to_oid(usage) for usage in self.extendedKeyUsage]
|
||||
csr = csr.add_extension(cryptography.x509.ExtendedKeyUsage(usages), critical=self.extendedKeyUsage_critical)
|
||||
|
||||
if self.basicConstraints:
|
||||
params = {}
|
||||
ca, path_length = cryptography_get_basic_constraints(self.basicConstraints)
|
||||
csr = csr.add_extension(cryptography.x509.BasicConstraints(ca, path_length), critical=self.basicConstraints_critical)
|
||||
|
||||
if self.ocspMustStaple:
|
||||
try:
|
||||
# This only works with cryptography >= 2.1
|
||||
csr = csr.add_extension(cryptography.x509.TLSFeature([cryptography.x509.TLSFeatureType.status_request]), critical=self.ocspMustStaple_critical)
|
||||
except AttributeError as dummy:
|
||||
csr = csr.add_extension(
|
||||
cryptography.x509.UnrecognizedExtension(CRYPTOGRAPHY_MUST_STAPLE_NAME, CRYPTOGRAPHY_MUST_STAPLE_VALUE),
|
||||
critical=self.ocspMustStaple_critical
|
||||
)
|
||||
|
||||
if self.name_constraints_permitted or self.name_constraints_excluded:
|
||||
try:
|
||||
csr = csr.add_extension(cryptography.x509.NameConstraints(
|
||||
[cryptography_get_name(name) for name in self.name_constraints_permitted],
|
||||
[cryptography_get_name(name) for name in self.name_constraints_excluded],
|
||||
), critical=self.name_constraints_critical)
|
||||
except TypeError as e:
|
||||
raise OpenSSLObjectError('Error while parsing name constraint: {0}'.format(e))
|
||||
|
||||
if self.create_subject_key_identifier:
|
||||
csr = csr.add_extension(
|
||||
cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()),
|
||||
critical=False
|
||||
)
|
||||
elif self.subject_key_identifier is not None:
|
||||
csr = csr.add_extension(cryptography.x509.SubjectKeyIdentifier(self.subject_key_identifier), critical=False)
|
||||
|
||||
if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None:
|
||||
issuers = None
|
||||
if self.authority_cert_issuer is not None:
|
||||
issuers = [cryptography_get_name(n) for n in self.authority_cert_issuer]
|
||||
csr = csr.add_extension(
|
||||
cryptography.x509.AuthorityKeyIdentifier(self.authority_key_identifier, issuers, self.authority_cert_serial_number),
|
||||
critical=False
|
||||
)
|
||||
|
||||
digest = None
|
||||
if cryptography_key_needs_digest_for_signing(self.privatekey):
|
||||
digest = select_message_digest(self.digest)
|
||||
if digest is None:
|
||||
raise CertificateSigningRequestError('Unsupported digest "{0}"'.format(self.digest))
|
||||
try:
|
||||
self.csr = csr.sign(self.privatekey, digest, self.cryptography_backend)
|
||||
except TypeError as e:
|
||||
if str(e) == 'Algorithm must be a registered hash algorithm.' and digest is None:
|
||||
self.module.fail_json(msg='Signing with Ed25519 and Ed448 keys requires cryptography 2.8 or newer.')
|
||||
raise
|
||||
except UnicodeError as e:
|
||||
# This catches IDNAErrors, which happens when a bad name is passed as a SAN
|
||||
# (https://github.com/ansible-collections/community.crypto/issues/105).
|
||||
# For older cryptography versions, this is handled by idna, which raises
|
||||
# an idna.core.IDNAError. Later versions of cryptography deprecated and stopped
|
||||
# requiring idna, whence we cannot easily handle this error. Fortunately, in
|
||||
# most versions of idna, IDNAError extends UnicodeError. There is only version
|
||||
# 2.3 where it extends Exception instead (see
|
||||
# https://github.com/kjd/idna/commit/ebefacd3134d0f5da4745878620a6a1cba86d130
|
||||
# and then
|
||||
# https://github.com/kjd/idna/commit/ea03c7b5db7d2a99af082e0239da2b68aeea702a).
|
||||
msg = 'Error while creating CSR: {0}\n'.format(e)
|
||||
if self.using_common_name_for_san:
|
||||
self.module.fail_json(msg=msg + 'This is probably caused because the Common Name is used as a SAN.'
|
||||
' Specifying use_common_name_for_san=false might fix this.')
|
||||
self.module.fail_json(msg=msg + 'This is probably caused by an invalid Subject Alternative DNS Name.')
|
||||
|
||||
def get_csr_data(self):
|
||||
"""Return bytes for self.csr."""
|
||||
return self.csr.public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM)
|
||||
|
||||
def _check_csr(self):
|
||||
"""Check whether provided parameters, assuming self.existing_csr and self.privatekey have been populated."""
|
||||
def _check_subject(csr):
|
||||
subject = [(cryptography_name_to_oid(entry[0]), entry[1]) for entry in self.subject]
|
||||
current_subject = [(sub.oid, sub.value) for sub in csr.subject]
|
||||
return set(subject) == set(current_subject)
|
||||
|
||||
def _find_extension(extensions, exttype):
|
||||
return next(
|
||||
(ext for ext in extensions if isinstance(ext.value, exttype)),
|
||||
None
|
||||
)
|
||||
|
||||
def _check_subjectAltName(extensions):
|
||||
current_altnames_ext = _find_extension(extensions, cryptography.x509.SubjectAlternativeName)
|
||||
current_altnames = [str(altname) for altname in current_altnames_ext.value] if current_altnames_ext else []
|
||||
altnames = [str(cryptography_get_name(altname)) for altname in self.subjectAltName] if self.subjectAltName else []
|
||||
if set(altnames) != set(current_altnames):
|
||||
return False
|
||||
if altnames:
|
||||
if current_altnames_ext.critical != self.subjectAltName_critical:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_keyUsage(extensions):
|
||||
current_keyusage_ext = _find_extension(extensions, cryptography.x509.KeyUsage)
|
||||
if not self.keyUsage:
|
||||
return current_keyusage_ext is None
|
||||
elif current_keyusage_ext is None:
|
||||
return False
|
||||
params = cryptography_parse_key_usage_params(self.keyUsage)
|
||||
for param in params:
|
||||
if getattr(current_keyusage_ext.value, '_' + param) != params[param]:
|
||||
return False
|
||||
if current_keyusage_ext.critical != self.keyUsage_critical:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_extenededKeyUsage(extensions):
|
||||
current_usages_ext = _find_extension(extensions, cryptography.x509.ExtendedKeyUsage)
|
||||
current_usages = [str(usage) for usage in current_usages_ext.value] if current_usages_ext else []
|
||||
usages = [str(cryptography_name_to_oid(usage)) for usage in self.extendedKeyUsage] if self.extendedKeyUsage else []
|
||||
if set(current_usages) != set(usages):
|
||||
return False
|
||||
if usages:
|
||||
if current_usages_ext.critical != self.extendedKeyUsage_critical:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_basicConstraints(extensions):
|
||||
bc_ext = _find_extension(extensions, cryptography.x509.BasicConstraints)
|
||||
current_ca = bc_ext.value.ca if bc_ext else False
|
||||
current_path_length = bc_ext.value.path_length if bc_ext else None
|
||||
ca, path_length = cryptography_get_basic_constraints(self.basicConstraints)
|
||||
# Check CA flag
|
||||
if ca != current_ca:
|
||||
return False
|
||||
# Check path length
|
||||
if path_length != current_path_length:
|
||||
return False
|
||||
# Check criticality
|
||||
if self.basicConstraints:
|
||||
if bc_ext.critical != self.basicConstraints_critical:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_ocspMustStaple(extensions):
|
||||
try:
|
||||
# This only works with cryptography >= 2.1
|
||||
tlsfeature_ext = _find_extension(extensions, cryptography.x509.TLSFeature)
|
||||
has_tlsfeature = True
|
||||
except AttributeError as dummy:
|
||||
tlsfeature_ext = next(
|
||||
(ext for ext in extensions if ext.value.oid == CRYPTOGRAPHY_MUST_STAPLE_NAME),
|
||||
None
|
||||
)
|
||||
has_tlsfeature = False
|
||||
if self.ocspMustStaple:
|
||||
if not tlsfeature_ext or tlsfeature_ext.critical != self.ocspMustStaple_critical:
|
||||
return False
|
||||
if has_tlsfeature:
|
||||
return cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
|
||||
else:
|
||||
return tlsfeature_ext.value.value == CRYPTOGRAPHY_MUST_STAPLE_VALUE
|
||||
else:
|
||||
return tlsfeature_ext is None
|
||||
|
||||
def _check_nameConstraints(extensions):
|
||||
current_nc_ext = _find_extension(extensions, cryptography.x509.NameConstraints)
|
||||
current_nc_perm = [str(altname) for altname in current_nc_ext.value.permitted_subtrees] if current_nc_ext else []
|
||||
current_nc_excl = [str(altname) for altname in current_nc_ext.value.excluded_subtrees] if current_nc_ext else []
|
||||
nc_perm = [str(cryptography_get_name(altname)) for altname in self.name_constraints_permitted]
|
||||
nc_excl = [str(cryptography_get_name(altname)) for altname in self.name_constraints_excluded]
|
||||
if set(nc_perm) != set(current_nc_perm) or set(nc_excl) != set(current_nc_excl):
|
||||
return False
|
||||
if nc_perm or nc_excl:
|
||||
if current_nc_ext.critical != self.name_constraints_critical:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _check_subject_key_identifier(extensions):
|
||||
ext = _find_extension(extensions, cryptography.x509.SubjectKeyIdentifier)
|
||||
if self.create_subject_key_identifier or self.subject_key_identifier is not None:
|
||||
if not ext or ext.critical:
|
||||
return False
|
||||
if self.create_subject_key_identifier:
|
||||
digest = cryptography.x509.SubjectKeyIdentifier.from_public_key(self.privatekey.public_key()).digest
|
||||
return ext.value.digest == digest
|
||||
else:
|
||||
return ext.value.digest == self.subject_key_identifier
|
||||
else:
|
||||
return ext is None
|
||||
|
||||
def _check_authority_key_identifier(extensions):
|
||||
ext = _find_extension(extensions, cryptography.x509.AuthorityKeyIdentifier)
|
||||
if self.authority_key_identifier is not None or self.authority_cert_issuer is not None or self.authority_cert_serial_number is not None:
|
||||
if not ext or ext.critical:
|
||||
return False
|
||||
aci = None
|
||||
csr_aci = None
|
||||
if self.authority_cert_issuer is not None:
|
||||
aci = [str(cryptography_get_name(n)) for n in self.authority_cert_issuer]
|
||||
if ext.value.authority_cert_issuer is not None:
|
||||
csr_aci = [str(n) for n in ext.value.authority_cert_issuer]
|
||||
return (ext.value.key_identifier == self.authority_key_identifier
|
||||
and csr_aci == aci
|
||||
and ext.value.authority_cert_serial_number == self.authority_cert_serial_number)
|
||||
else:
|
||||
return ext is None
|
||||
|
||||
def _check_extensions(csr):
|
||||
extensions = csr.extensions
|
||||
return (_check_subjectAltName(extensions) and _check_keyUsage(extensions) and
|
||||
_check_extenededKeyUsage(extensions) and _check_basicConstraints(extensions) and
|
||||
_check_ocspMustStaple(extensions) and _check_subject_key_identifier(extensions) and
|
||||
_check_authority_key_identifier(extensions) and _check_nameConstraints(extensions))
|
||||
|
||||
def _check_signature(csr):
|
||||
if not csr.is_signature_valid:
|
||||
return False
|
||||
# To check whether public key of CSR belongs to private key,
|
||||
# encode both public keys and compare PEMs.
|
||||
key_a = csr.public_key().public_bytes(
|
||||
cryptography.hazmat.primitives.serialization.Encoding.PEM,
|
||||
cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
key_b = self.privatekey.public_key().public_bytes(
|
||||
cryptography.hazmat.primitives.serialization.Encoding.PEM,
|
||||
cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
return key_a == key_b
|
||||
|
||||
return _check_subject(self.existing_csr) and _check_extensions(self.existing_csr) and _check_signature(self.existing_csr)
|
||||
|
||||
|
||||
def select_backend(module, backend):
|
||||
if module.params['version'] != 1:
|
||||
module.deprecate('The version option will only support allowed values from community.crypto 2.0.0 on. '
|
||||
'Currently, only the value 1 is allowed by RFC 2986',
|
||||
version='2.0.0', collection_name='community.crypto')
|
||||
|
||||
if backend == 'auto':
|
||||
# Detection what is possible
|
||||
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
||||
|
||||
# First try cryptography, then pyOpenSSL
|
||||
if can_use_cryptography:
|
||||
backend = 'cryptography'
|
||||
elif can_use_pyopenssl:
|
||||
backend = 'pyopenssl'
|
||||
|
||||
# Success?
|
||||
if backend == 'auto':
|
||||
module.fail_json(msg=("Can't detect any of the required Python libraries "
|
||||
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||
MINIMAL_PYOPENSSL_VERSION))
|
||||
|
||||
if backend == 'pyopenssl':
|
||||
if not PYOPENSSL_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
|
||||
exception=PYOPENSSL_IMP_ERR)
|
||||
try:
|
||||
getattr(crypto.X509Req, 'get_extensions')
|
||||
except AttributeError:
|
||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
|
||||
|
||||
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
|
||||
version='2.0.0', collection_name='community.crypto')
|
||||
return backend, CertificateSigningRequestPyOpenSSLBackend(module)
|
||||
elif backend == 'cryptography':
|
||||
if not CRYPTOGRAPHY_FOUND:
|
||||
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
|
||||
exception=CRYPTOGRAPHY_IMP_ERR)
|
||||
return backend, CertificateSigningRequestCryptographyBackend(module)
|
||||
else:
|
||||
raise Exception('Unsupported value for backend: {0}'.format(backend))
|
||||
|
||||
|
||||
def get_csr_argument_spec():
|
||||
return ArgumentSpec(
|
||||
argument_spec=dict(
|
||||
digest=dict(type='str', default='sha256'),
|
||||
privatekey_path=dict(type='path'),
|
||||
privatekey_content=dict(type='str', no_log=True),
|
||||
privatekey_passphrase=dict(type='str', no_log=True),
|
||||
version=dict(type='int', default=1),
|
||||
subject=dict(type='dict'),
|
||||
country_name=dict(type='str', aliases=['C', 'countryName']),
|
||||
state_or_province_name=dict(type='str', aliases=['ST', 'stateOrProvinceName']),
|
||||
locality_name=dict(type='str', aliases=['L', 'localityName']),
|
||||
organization_name=dict(type='str', aliases=['O', 'organizationName']),
|
||||
organizational_unit_name=dict(type='str', aliases=['OU', 'organizationalUnitName']),
|
||||
common_name=dict(type='str', aliases=['CN', 'commonName']),
|
||||
email_address=dict(type='str', aliases=['E', 'emailAddress']),
|
||||
subject_alt_name=dict(type='list', elements='str', aliases=['subjectAltName']),
|
||||
subject_alt_name_critical=dict(type='bool', default=False, aliases=['subjectAltName_critical']),
|
||||
use_common_name_for_san=dict(type='bool', default=True, aliases=['useCommonNameForSAN']),
|
||||
key_usage=dict(type='list', elements='str', aliases=['keyUsage']),
|
||||
key_usage_critical=dict(type='bool', default=False, aliases=['keyUsage_critical']),
|
||||
extended_key_usage=dict(type='list', elements='str', aliases=['extKeyUsage', 'extendedKeyUsage']),
|
||||
extended_key_usage_critical=dict(type='bool', default=False, aliases=['extKeyUsage_critical', 'extendedKeyUsage_critical']),
|
||||
basic_constraints=dict(type='list', elements='str', aliases=['basicConstraints']),
|
||||
basic_constraints_critical=dict(type='bool', default=False, aliases=['basicConstraints_critical']),
|
||||
ocsp_must_staple=dict(type='bool', default=False, aliases=['ocspMustStaple']),
|
||||
ocsp_must_staple_critical=dict(type='bool', default=False, aliases=['ocspMustStaple_critical']),
|
||||
name_constraints_permitted=dict(type='list', elements='str'),
|
||||
name_constraints_excluded=dict(type='list', elements='str'),
|
||||
name_constraints_critical=dict(type='bool', default=False),
|
||||
create_subject_key_identifier=dict(type='bool', default=False),
|
||||
subject_key_identifier=dict(type='str'),
|
||||
authority_key_identifier=dict(type='str'),
|
||||
authority_cert_issuer=dict(type='list', elements='str'),
|
||||
authority_cert_serial_number=dict(type='int'),
|
||||
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
|
||||
),
|
||||
required_together=[
|
||||
['authority_cert_issuer', 'authority_cert_serial_number'],
|
||||
],
|
||||
mutually_exclusive=[
|
||||
['privatekey_path', 'privatekey_content'],
|
||||
],
|
||||
required_one_of=[
|
||||
['privatekey_path', 'privatekey_content'],
|
||||
],
|
||||
)
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,175 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright: (c) 2017, Yanis Guenane <yanis+ansible@guenane.org>
|
||||
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
DOCUMENTATION = r'''
|
||||
---
|
||||
module: openssl_csr_pipe
|
||||
short_description: Generate OpenSSL Certificate Signing Request (CSR)
|
||||
version_added: 1.3.0
|
||||
description:
|
||||
- "Please note that the module regenerates an existing CSR if it doesn't match the module's
|
||||
options, or if it seems to be corrupt."
|
||||
author:
|
||||
- Yanis Guenane (@Spredzy)
|
||||
- Felix Fontein (@felixfontein)
|
||||
options:
|
||||
content:
|
||||
description:
|
||||
- The existing CSR.
|
||||
type: str
|
||||
extends_documentation_fragment:
|
||||
- community.crypto.module_csr
|
||||
seealso:
|
||||
- module: community.crypto.openssl_csr
|
||||
'''
|
||||
|
||||
EXAMPLES = r'''
|
||||
- name: Generate an OpenSSL Certificate Signing Request
|
||||
community.crypto.openssl_csr_pipe:
|
||||
privatekey_path: /etc/ssl/private/ansible.com.pem
|
||||
common_name: www.ansible.com
|
||||
register: result
|
||||
- debug:
|
||||
var: result.csr
|
||||
|
||||
- name: Generate an OpenSSL Certificate Signing Request with an inline key
|
||||
community.crypto.openssl_csr:
|
||||
content: "{{ lookup('file', '/etc/ssl/csr/www.ansible.com.csr') }}"
|
||||
privatekey_content: "{{ private_key_content }}"
|
||||
common_name: www.ansible.com
|
||||
register: result
|
||||
- name: Store CSR
|
||||
ansible.builtin.copy:
|
||||
path: /etc/ssl/csr/www.ansible.com.csr
|
||||
content: "{{ result.csr }}"
|
||||
when: result is changed
|
||||
'''
|
||||
|
||||
RETURN = r'''
|
||||
privatekey:
|
||||
description:
|
||||
- Path to the TLS/SSL private key the CSR was generated for
|
||||
- Will be C(none) if the private key has been provided in I(privatekey_content).
|
||||
returned: changed or success
|
||||
type: str
|
||||
sample: /etc/ssl/private/ansible.com.pem
|
||||
subject:
|
||||
description: A list of the subject tuples attached to the CSR
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: list
|
||||
sample: "[('CN', 'www.ansible.com'), ('O', 'Ansible')]"
|
||||
subjectAltName:
|
||||
description: The alternative names this CSR is valid for
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: [ 'DNS:www.ansible.com', 'DNS:m.ansible.com' ]
|
||||
keyUsage:
|
||||
description: Purpose for which the public key may be used
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: [ 'digitalSignature', 'keyAgreement' ]
|
||||
extendedKeyUsage:
|
||||
description: Additional restriction on the public key purposes
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: [ 'clientAuth' ]
|
||||
basicConstraints:
|
||||
description: Indicates if the certificate belongs to a CA
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: ['CA:TRUE', 'pathLenConstraint:0']
|
||||
ocsp_must_staple:
|
||||
description: Indicates whether the certificate has the OCSP
|
||||
Must Staple feature enabled
|
||||
returned: changed or success
|
||||
type: bool
|
||||
sample: false
|
||||
name_constraints_permitted:
|
||||
description: List of permitted subtrees to sign certificates for.
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: ['email:.somedomain.com']
|
||||
name_constraints_excluded:
|
||||
description: List of excluded subtrees the CA cannot sign certificates for.
|
||||
returned: changed or success
|
||||
type: list
|
||||
elements: str
|
||||
sample: ['email:.com']
|
||||
csr:
|
||||
description: The (current or generated) CSR's content.
|
||||
returned: if I(state) is C(present)
|
||||
type: str
|
||||
'''
|
||||
|
||||
from ansible.module_utils._text import to_native
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr import (
|
||||
select_backend,
|
||||
get_csr_argument_spec,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
|
||||
|
||||
class CertificateSigningRequestModule(object):
|
||||
def __init__(self, module, module_backend):
|
||||
self.check_mode = module.check_mode
|
||||
self.module_backend = module_backend
|
||||
self.changed = False
|
||||
if module.params['content'] is not None:
|
||||
self.module_backend.set_existing(module.params['content'].encode('utf-8'))
|
||||
|
||||
def generate(self, module):
|
||||
'''Generate the certificate signing request.'''
|
||||
if self.module_backend.needs_regeneration():
|
||||
if not self.check_mode:
|
||||
self.module_backend.generate_csr()
|
||||
self.changed = True
|
||||
|
||||
def dump(self):
|
||||
'''Serialize the object into a dictionary.'''
|
||||
result = self.module_backend.dump(include_csr=True)
|
||||
result.update({
|
||||
'changed': self.changed,
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
argument_spec = get_csr_argument_spec()
|
||||
argument_spec.argument_spec.update(dict(
|
||||
content=dict(type='str'),
|
||||
))
|
||||
module = argument_spec.create_ansible_module(
|
||||
supports_check_mode=True,
|
||||
)
|
||||
|
||||
backend = module.params['select_crypto_backend']
|
||||
backend, module_backend = select_backend(module, backend)
|
||||
try:
|
||||
csr = CertificateSigningRequestModule(module, module_backend)
|
||||
csr.generate(module)
|
||||
result = csr.dump()
|
||||
module.exit_json(**result)
|
||||
except OpenSSLObjectError as exc:
|
||||
module.fail_json(msg=to_native(exc))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -599,6 +599,7 @@ notes:
|
|||
certificate can be moved to the target machine.
|
||||
seealso:
|
||||
- module: community.crypto.openssl_csr
|
||||
- module: community.crypto.openssl_csr_pipe
|
||||
- module: community.crypto.openssl_dhparam
|
||||
- module: community.crypto.openssl_pkcs12
|
||||
- module: community.crypto.openssl_privatekey
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
shippable/posix/group1
|
||||
destructive
|
|
@ -0,0 +1,2 @@
|
|||
dependencies:
|
||||
- setup_openssl
|
|
@ -0,0 +1,91 @@
|
|||
---
|
||||
- name: "({{ select_crypto_backend }}) Generate privatekey"
|
||||
openssl_privatekey:
|
||||
path: '{{ output_dir }}/privatekey.pem'
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR (check mode)"
|
||||
openssl_csr_pipe:
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: www.ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
check_mode: yes
|
||||
register: generate_csr_check
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR"
|
||||
openssl_csr_pipe:
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: www.ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
register: generate_csr
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR (idempotent)"
|
||||
openssl_csr_pipe:
|
||||
content: "{{ generate_csr.csr }}"
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: www.ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
register: generate_csr_idempotent
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR (idempotent, check mode)"
|
||||
openssl_csr_pipe:
|
||||
content: "{{ generate_csr.csr }}"
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: www.ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
check_mode: yes
|
||||
register: generate_csr_idempotent_check
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR (changed)"
|
||||
openssl_csr_pipe:
|
||||
content: "{{ generate_csr.csr }}"
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
register: generate_csr_changed
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Generate CSR (changed, check mode)"
|
||||
openssl_csr_pipe:
|
||||
content: "{{ generate_csr.csr }}"
|
||||
privatekey_path: '{{ output_dir }}/privatekey.pem'
|
||||
subject:
|
||||
commonName: ansible.com
|
||||
select_crypto_backend: '{{ select_crypto_backend }}'
|
||||
check_mode: yes
|
||||
register: generate_csr_changed_check
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Validate CSR (test - privatekey modulus)"
|
||||
shell: 'openssl rsa -noout -modulus -in {{ output_dir }}/privatekey.pem'
|
||||
register: privatekey_modulus
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Validate CSR (test - Common Name)"
|
||||
shell: "openssl req -noout -subject -in /dev/stdin -nameopt oneline,-space_eq"
|
||||
args:
|
||||
stdin: "{{ generate_csr.csr }}"
|
||||
register: csr_cn
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Validate CSR (test - csr modulus)"
|
||||
shell: 'openssl req -noout -modulus -in /dev/stdin'
|
||||
args:
|
||||
stdin: "{{ generate_csr.csr }}"
|
||||
register: csr_modulus
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Validate CSR (assert)"
|
||||
assert:
|
||||
that:
|
||||
- csr_cn.stdout.split('=')[-1] == 'www.ansible.com'
|
||||
- csr_modulus.stdout == privatekey_modulus.stdout
|
||||
|
||||
- name: "({{ select_crypto_backend }}) Validate CSR (check mode, idempotency)"
|
||||
assert:
|
||||
that:
|
||||
- generate_csr_check is changed
|
||||
- generate_csr is changed
|
||||
- generate_csr_idempotent is not changed
|
||||
- generate_csr_idempotent_check is not changed
|
||||
- generate_csr_changed is changed
|
||||
- generate_csr_changed_check is changed
|
|
@ -0,0 +1,40 @@
|
|||
---
|
||||
####################################################################
|
||||
# WARNING: These are designed specifically for Ansible tests #
|
||||
# and should not be used as examples of how to write Ansible roles #
|
||||
####################################################################
|
||||
|
||||
- name: Prepare private key for backend autodetection test
|
||||
openssl_privatekey:
|
||||
path: '{{ output_dir }}/privatekey_backend_selection.pem'
|
||||
- name: Run module with backend autodetection
|
||||
openssl_csr_pipe:
|
||||
privatekey_path: '{{ output_dir }}/privatekey_backend_selection.pem'
|
||||
subject:
|
||||
commonName: www.ansible.com
|
||||
|
||||
- block:
|
||||
- name: Running tests with pyOpenSSL backend
|
||||
include_tasks: impl.yml
|
||||
vars:
|
||||
select_crypto_backend: pyopenssl
|
||||
|
||||
when: pyopenssl_version.stdout is version('0.15', '>=')
|
||||
|
||||
- name: Remove output directory
|
||||
file:
|
||||
path: "{{ output_dir }}"
|
||||
state: absent
|
||||
|
||||
- name: Re-create output directory
|
||||
file:
|
||||
path: "{{ output_dir }}"
|
||||
state: directory
|
||||
|
||||
- block:
|
||||
- name: Running tests with cryptography backend
|
||||
include_tasks: impl.yml
|
||||
vars:
|
||||
select_crypto_backend: cryptography
|
||||
|
||||
when: cryptography_version.stdout is version('1.3', '>=')
|
Loading…
Reference in New Issue