openssl_csr_info: move main code to module_utils to allow easier implementation of diff mode (#204)
* Move openssl_csr_info code to module_utils. * Add changelog fragment. * Apply suggestions from code review Co-authored-by: Ajpantuso <ajpantuso@gmail.com> Co-authored-by: Ajpantuso <ajpantuso@gmail.com>pull/235/head
parent
6c5a0c6df1
commit
c0edfb46bb
|
@ -0,0 +1,2 @@
|
||||||
|
minor_changes:
|
||||||
|
- "openssl_csr_info - refactor module to allow code re-use for diff mode (https://github.com/ansible-collections/community.crypto/pull/204)."
|
|
@ -0,0 +1,461 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
|
||||||
|
# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
|
||||||
|
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
|
||||||
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||||
|
|
||||||
|
from __future__ import absolute_import, division, print_function
|
||||||
|
__metaclass__ = type
|
||||||
|
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import binascii
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
from distutils.version import LooseVersion
|
||||||
|
|
||||||
|
from ansible.module_utils import six
|
||||||
|
from ansible.module_utils.basic import missing_required_lib
|
||||||
|
from ansible.module_utils._text import to_native, to_text, to_bytes
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||||
|
load_certificate_request,
|
||||||
|
get_fingerprint_of_bytes,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||||
|
cryptography_decode_name,
|
||||||
|
cryptography_get_extensions_from_csr,
|
||||||
|
cryptography_oid_to_name,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
|
||||||
|
pyopenssl_get_extensions_from_csr,
|
||||||
|
pyopenssl_normalize_name,
|
||||||
|
pyopenssl_normalize_name_attribute,
|
||||||
|
pyopenssl_parse_name_constraints,
|
||||||
|
)
|
||||||
|
|
||||||
|
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
|
||||||
|
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||||
|
|
||||||
|
PYOPENSSL_IMP_ERR = None
|
||||||
|
try:
|
||||||
|
import OpenSSL
|
||||||
|
from OpenSSL import crypto
|
||||||
|
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
||||||
|
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||||
|
# OpenSSL 1.1.0 or newer
|
||||||
|
OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
|
||||||
|
OPENSSL_MUST_STAPLE_VALUE = b"status_request"
|
||||||
|
else:
|
||||||
|
# OpenSSL 1.0.x or older
|
||||||
|
OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
|
||||||
|
OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
|
||||||
|
except ImportError:
|
||||||
|
PYOPENSSL_IMP_ERR = traceback.format_exc()
|
||||||
|
PYOPENSSL_FOUND = False
|
||||||
|
else:
|
||||||
|
PYOPENSSL_FOUND = True
|
||||||
|
|
||||||
|
CRYPTOGRAPHY_IMP_ERR = None
|
||||||
|
try:
|
||||||
|
import cryptography
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||||
|
except ImportError:
|
||||||
|
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||||
|
CRYPTOGRAPHY_FOUND = False
|
||||||
|
else:
|
||||||
|
CRYPTOGRAPHY_FOUND = True
|
||||||
|
|
||||||
|
|
||||||
|
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class CSRInfoRetrieval(object):
|
||||||
|
def __init__(self, module, backend, content, validate_signature):
|
||||||
|
# content must be a bytes string
|
||||||
|
self.module = module
|
||||||
|
self.backend = backend
|
||||||
|
self.content = content
|
||||||
|
self.validate_signature = validate_signature
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_subject_ordered(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_key_usage(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_extended_key_usage(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_basic_constraints(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_ocsp_must_staple(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_subject_alt_name(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_name_constraints(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_subject_key_identifier(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_authority_key_identifier(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_all_extensions(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _is_signature_valid(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_info(self):
|
||||||
|
result = dict()
|
||||||
|
self.csr = load_certificate_request(None, content=self.content, backend=self.backend)
|
||||||
|
|
||||||
|
subject = self._get_subject_ordered()
|
||||||
|
result['subject'] = dict()
|
||||||
|
for k, v in subject:
|
||||||
|
result['subject'][k] = v
|
||||||
|
result['subject_ordered'] = subject
|
||||||
|
result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
|
||||||
|
result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
|
||||||
|
result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
|
||||||
|
result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
|
||||||
|
result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
|
||||||
|
(
|
||||||
|
result['name_constraints_permitted'],
|
||||||
|
result['name_constraints_excluded'],
|
||||||
|
result['name_constraints_critical'],
|
||||||
|
) = self._get_name_constraints()
|
||||||
|
|
||||||
|
result['public_key'] = self._get_public_key(binary=False)
|
||||||
|
pk = self._get_public_key(binary=True)
|
||||||
|
result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
|
||||||
|
|
||||||
|
if self.backend != 'pyopenssl':
|
||||||
|
ski = self._get_subject_key_identifier()
|
||||||
|
if ski is not None:
|
||||||
|
ski = to_native(binascii.hexlify(ski))
|
||||||
|
ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)])
|
||||||
|
result['subject_key_identifier'] = ski
|
||||||
|
|
||||||
|
aki, aci, acsn = self._get_authority_key_identifier()
|
||||||
|
if aki is not None:
|
||||||
|
aki = to_native(binascii.hexlify(aki))
|
||||||
|
aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)])
|
||||||
|
result['authority_key_identifier'] = aki
|
||||||
|
result['authority_cert_issuer'] = aci
|
||||||
|
result['authority_cert_serial_number'] = acsn
|
||||||
|
|
||||||
|
result['extensions_by_oid'] = self._get_all_extensions()
|
||||||
|
|
||||||
|
result['signature_valid'] = self._is_signature_valid()
|
||||||
|
if self.validate_signature and not result['signature_valid']:
|
||||||
|
self.module.fail_json(
|
||||||
|
msg='CSR signature is invalid!',
|
||||||
|
**result
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class CSRInfoRetrievalCryptography(CSRInfoRetrieval):
|
||||||
|
"""Validate the supplied CSR, using the cryptography backend"""
|
||||||
|
def __init__(self, module, content, validate_signature):
|
||||||
|
super(CSRInfoRetrievalCryptography, self).__init__(module, 'cryptography', content, validate_signature)
|
||||||
|
|
||||||
|
def _get_subject_ordered(self):
|
||||||
|
result = []
|
||||||
|
for attribute in self.csr.subject:
|
||||||
|
result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _get_key_usage(self):
|
||||||
|
try:
|
||||||
|
current_key_ext = self.csr.extensions.get_extension_for_class(x509.KeyUsage)
|
||||||
|
current_key_usage = current_key_ext.value
|
||||||
|
key_usage = dict(
|
||||||
|
digital_signature=current_key_usage.digital_signature,
|
||||||
|
content_commitment=current_key_usage.content_commitment,
|
||||||
|
key_encipherment=current_key_usage.key_encipherment,
|
||||||
|
data_encipherment=current_key_usage.data_encipherment,
|
||||||
|
key_agreement=current_key_usage.key_agreement,
|
||||||
|
key_cert_sign=current_key_usage.key_cert_sign,
|
||||||
|
crl_sign=current_key_usage.crl_sign,
|
||||||
|
encipher_only=False,
|
||||||
|
decipher_only=False,
|
||||||
|
)
|
||||||
|
if key_usage['key_agreement']:
|
||||||
|
key_usage.update(dict(
|
||||||
|
encipher_only=current_key_usage.encipher_only,
|
||||||
|
decipher_only=current_key_usage.decipher_only
|
||||||
|
))
|
||||||
|
|
||||||
|
key_usage_names = dict(
|
||||||
|
digital_signature='Digital Signature',
|
||||||
|
content_commitment='Non Repudiation',
|
||||||
|
key_encipherment='Key Encipherment',
|
||||||
|
data_encipherment='Data Encipherment',
|
||||||
|
key_agreement='Key Agreement',
|
||||||
|
key_cert_sign='Certificate Sign',
|
||||||
|
crl_sign='CRL Sign',
|
||||||
|
encipher_only='Encipher Only',
|
||||||
|
decipher_only='Decipher Only',
|
||||||
|
)
|
||||||
|
return sorted([
|
||||||
|
key_usage_names[name] for name, value in key_usage.items() if value
|
||||||
|
]), current_key_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_extended_key_usage(self):
|
||||||
|
try:
|
||||||
|
ext_keyusage_ext = self.csr.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
|
||||||
|
return sorted([
|
||||||
|
cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value
|
||||||
|
]), ext_keyusage_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_basic_constraints(self):
|
||||||
|
try:
|
||||||
|
ext_keyusage_ext = self.csr.extensions.get_extension_for_class(x509.BasicConstraints)
|
||||||
|
result = ['CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE')]
|
||||||
|
if ext_keyusage_ext.value.path_length is not None:
|
||||||
|
result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
|
||||||
|
return sorted(result), ext_keyusage_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_ocsp_must_staple(self):
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
# This only works with cryptography >= 2.1
|
||||||
|
tlsfeature_ext = self.csr.extensions.get_extension_for_class(x509.TLSFeature)
|
||||||
|
value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
|
||||||
|
except AttributeError:
|
||||||
|
# Fallback for cryptography < 2.1
|
||||||
|
oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
|
||||||
|
tlsfeature_ext = self.csr.extensions.get_extension_for_oid(oid)
|
||||||
|
value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
|
||||||
|
return value, tlsfeature_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_subject_alt_name(self):
|
||||||
|
try:
|
||||||
|
san_ext = self.csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
||||||
|
result = [cryptography_decode_name(san) for san in san_ext.value]
|
||||||
|
return result, san_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_name_constraints(self):
|
||||||
|
try:
|
||||||
|
nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints)
|
||||||
|
permitted = [cryptography_decode_name(san) for san in nc_ext.value.permitted_subtrees or []]
|
||||||
|
excluded = [cryptography_decode_name(san) for san in nc_ext.value.excluded_subtrees or []]
|
||||||
|
return permitted, excluded, nc_ext.critical
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, None, False
|
||||||
|
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
return self.csr.public_key().public_bytes(
|
||||||
|
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
|
||||||
|
serialization.PublicFormat.SubjectPublicKeyInfo
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_subject_key_identifier(self):
|
||||||
|
try:
|
||||||
|
ext = self.csr.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
|
||||||
|
return ext.value.digest
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_authority_key_identifier(self):
|
||||||
|
try:
|
||||||
|
ext = self.csr.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
|
||||||
|
issuer = None
|
||||||
|
if ext.value.authority_cert_issuer is not None:
|
||||||
|
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
|
||||||
|
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
|
||||||
|
except cryptography.x509.ExtensionNotFound:
|
||||||
|
return None, None, None
|
||||||
|
|
||||||
|
def _get_all_extensions(self):
|
||||||
|
return cryptography_get_extensions_from_csr(self.csr)
|
||||||
|
|
||||||
|
def _is_signature_valid(self):
|
||||||
|
return self.csr.is_signature_valid
|
||||||
|
|
||||||
|
|
||||||
|
class CSRInfoRetrievalPyOpenSSL(CSRInfoRetrieval):
|
||||||
|
"""validate the supplied CSR."""
|
||||||
|
|
||||||
|
def __init__(self, module, content, validate_signature):
|
||||||
|
super(CSRInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content, validate_signature)
|
||||||
|
|
||||||
|
def __get_name(self, name):
|
||||||
|
result = []
|
||||||
|
for sub in name.get_components():
|
||||||
|
result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])])
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _get_subject_ordered(self):
|
||||||
|
return self.__get_name(self.csr.get_subject())
|
||||||
|
|
||||||
|
def _get_extension(self, short_name):
|
||||||
|
for extension in self.csr.get_extensions():
|
||||||
|
if extension.get_short_name() == short_name:
|
||||||
|
result = [
|
||||||
|
pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
|
||||||
|
]
|
||||||
|
return sorted(result), bool(extension.get_critical())
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_key_usage(self):
|
||||||
|
return self._get_extension(b'keyUsage')
|
||||||
|
|
||||||
|
def _get_extended_key_usage(self):
|
||||||
|
return self._get_extension(b'extendedKeyUsage')
|
||||||
|
|
||||||
|
def _get_basic_constraints(self):
|
||||||
|
return self._get_extension(b'basicConstraints')
|
||||||
|
|
||||||
|
def _get_ocsp_must_staple(self):
|
||||||
|
extensions = self.csr.get_extensions()
|
||||||
|
oms_ext = [
|
||||||
|
ext for ext in extensions
|
||||||
|
if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
|
||||||
|
]
|
||||||
|
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
|
||||||
|
# Older versions of libssl don't know about OCSP Must Staple
|
||||||
|
oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
|
||||||
|
if oms_ext:
|
||||||
|
return True, bool(oms_ext[0].get_critical())
|
||||||
|
else:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_subject_alt_name(self):
|
||||||
|
for extension in self.csr.get_extensions():
|
||||||
|
if extension.get_short_name() == b'subjectAltName':
|
||||||
|
result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
|
||||||
|
to_text(extension, errors='surrogate_or_strict').split(', ')]
|
||||||
|
return result, bool(extension.get_critical())
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
def _get_name_constraints(self):
|
||||||
|
for extension in self.csr.get_extensions():
|
||||||
|
if extension.get_short_name() == b'nameConstraints':
|
||||||
|
permitted, excluded = pyopenssl_parse_name_constraints(extension)
|
||||||
|
return permitted, excluded, bool(extension.get_critical())
|
||||||
|
return None, None, False
|
||||||
|
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
try:
|
||||||
|
return crypto.dump_publickey(
|
||||||
|
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
|
||||||
|
self.csr.get_pubkey()
|
||||||
|
)
|
||||||
|
except AttributeError:
|
||||||
|
try:
|
||||||
|
bio = crypto._new_mem_buf()
|
||||||
|
if binary:
|
||||||
|
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.csr.get_pubkey()._pkey)
|
||||||
|
else:
|
||||||
|
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.csr.get_pubkey()._pkey)
|
||||||
|
if rc != 1:
|
||||||
|
crypto._raise_current_error()
|
||||||
|
return crypto._bio_to_string(bio)
|
||||||
|
except AttributeError:
|
||||||
|
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
|
||||||
|
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
|
||||||
|
|
||||||
|
def _get_subject_key_identifier(self):
|
||||||
|
# Won't be implemented
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_authority_key_identifier(self):
|
||||||
|
# Won't be implemented
|
||||||
|
return None, None, None
|
||||||
|
|
||||||
|
def _get_all_extensions(self):
|
||||||
|
return pyopenssl_get_extensions_from_csr(self.csr)
|
||||||
|
|
||||||
|
def _is_signature_valid(self):
|
||||||
|
try:
|
||||||
|
return bool(self.csr.verify(self.csr.get_pubkey()))
|
||||||
|
except crypto.Error:
|
||||||
|
# OpenSSL error means that key is not consistent
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_csr_info(module, backend, content, validate_signature=True):
|
||||||
|
if backend == 'cryptography':
|
||||||
|
info = CSRInfoRetrievalCryptography(module, content, validate_signature=validate_signature)
|
||||||
|
elif backend == 'pyopenssl':
|
||||||
|
info = CSRInfoRetrievalPyOpenSSL(module, content, validate_signature=validate_signature)
|
||||||
|
return info.get_info()
|
||||||
|
|
||||||
|
|
||||||
|
def select_backend(module, backend, content, validate_signature=True):
|
||||||
|
if backend == 'auto':
|
||||||
|
# Detection what is possible
|
||||||
|
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||||
|
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
||||||
|
|
||||||
|
# First try cryptography, then pyOpenSSL
|
||||||
|
if can_use_cryptography:
|
||||||
|
backend = 'cryptography'
|
||||||
|
elif can_use_pyopenssl:
|
||||||
|
backend = 'pyopenssl'
|
||||||
|
|
||||||
|
# Success?
|
||||||
|
if backend == 'auto':
|
||||||
|
module.fail_json(msg=("Can't detect any of the required Python libraries "
|
||||||
|
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
||||||
|
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||||
|
MINIMAL_PYOPENSSL_VERSION))
|
||||||
|
|
||||||
|
if backend == 'pyopenssl':
|
||||||
|
if not PYOPENSSL_FOUND:
|
||||||
|
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
|
||||||
|
exception=PYOPENSSL_IMP_ERR)
|
||||||
|
try:
|
||||||
|
getattr(crypto.X509Req, 'get_extensions')
|
||||||
|
except AttributeError:
|
||||||
|
module.fail_json(msg='You need to have PyOpenSSL>=0.15 to generate CSRs')
|
||||||
|
|
||||||
|
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
|
||||||
|
version='2.0.0', collection_name='community.crypto')
|
||||||
|
return backend, CSRInfoRetrievalPyOpenSSL(module, content, validate_signature=validate_signature)
|
||||||
|
elif backend == 'cryptography':
|
||||||
|
if not CRYPTOGRAPHY_FOUND:
|
||||||
|
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
|
||||||
|
exception=CRYPTOGRAPHY_IMP_ERR)
|
||||||
|
return backend, CSRInfoRetrievalCryptography(module, content, validate_signature=validate_signature)
|
||||||
|
else:
|
||||||
|
raise ValueError('Unsupported value for backend: {0}'.format(backend))
|
|
@ -225,428 +225,17 @@ authority_cert_serial_number:
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
import abc
|
from ansible.module_utils.basic import AnsibleModule
|
||||||
import binascii
|
from ansible.module_utils._text import to_native
|
||||||
import os
|
|
||||||
import traceback
|
|
||||||
|
|
||||||
from distutils.version import LooseVersion
|
|
||||||
|
|
||||||
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
|
|
||||||
from ansible.module_utils._text import to_native, to_text, to_bytes
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||||
OpenSSLObjectError,
|
OpenSSLObjectError,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.csr_info import (
|
||||||
OpenSSLObject,
|
select_backend,
|
||||||
load_certificate_request,
|
|
||||||
get_fingerprint_of_bytes,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
|
||||||
cryptography_decode_name,
|
|
||||||
cryptography_get_extensions_from_csr,
|
|
||||||
cryptography_oid_to_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.pyopenssl_support import (
|
|
||||||
pyopenssl_get_extensions_from_csr,
|
|
||||||
pyopenssl_normalize_name,
|
|
||||||
pyopenssl_normalize_name_attribute,
|
|
||||||
pyopenssl_parse_name_constraints,
|
|
||||||
)
|
|
||||||
|
|
||||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'
|
|
||||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
|
||||||
|
|
||||||
PYOPENSSL_IMP_ERR = None
|
|
||||||
try:
|
|
||||||
import OpenSSL
|
|
||||||
from OpenSSL import crypto
|
|
||||||
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
|
||||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
|
||||||
# OpenSSL 1.1.0 or newer
|
|
||||||
OPENSSL_MUST_STAPLE_NAME = b"tlsfeature"
|
|
||||||
OPENSSL_MUST_STAPLE_VALUE = b"status_request"
|
|
||||||
else:
|
|
||||||
# OpenSSL 1.0.x or older
|
|
||||||
OPENSSL_MUST_STAPLE_NAME = b"1.3.6.1.5.5.7.1.24"
|
|
||||||
OPENSSL_MUST_STAPLE_VALUE = b"DER:30:03:02:01:05"
|
|
||||||
except ImportError:
|
|
||||||
PYOPENSSL_IMP_ERR = traceback.format_exc()
|
|
||||||
PYOPENSSL_FOUND = False
|
|
||||||
else:
|
|
||||||
PYOPENSSL_FOUND = True
|
|
||||||
|
|
||||||
CRYPTOGRAPHY_IMP_ERR = None
|
|
||||||
try:
|
|
||||||
import cryptography
|
|
||||||
from cryptography import x509
|
|
||||||
from cryptography.hazmat.primitives import serialization
|
|
||||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
|
||||||
except ImportError:
|
|
||||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
|
||||||
CRYPTOGRAPHY_FOUND = False
|
|
||||||
else:
|
|
||||||
CRYPTOGRAPHY_FOUND = True
|
|
||||||
|
|
||||||
|
|
||||||
TIMESTAMP_FORMAT = "%Y%m%d%H%M%SZ"
|
|
||||||
|
|
||||||
|
|
||||||
class CertificateSigningRequestInfo(OpenSSLObject):
|
|
||||||
def __init__(self, module, backend):
|
|
||||||
super(CertificateSigningRequestInfo, self).__init__(
|
|
||||||
module.params['path'] or '',
|
|
||||||
'present',
|
|
||||||
False,
|
|
||||||
module.check_mode,
|
|
||||||
)
|
|
||||||
self.backend = backend
|
|
||||||
self.module = module
|
|
||||||
self.content = module.params['content']
|
|
||||||
if self.content is not None:
|
|
||||||
self.content = self.content.encode('utf-8')
|
|
||||||
|
|
||||||
def generate(self):
|
|
||||||
# Empty method because OpenSSLObject wants this
|
|
||||||
pass
|
|
||||||
|
|
||||||
def dump(self):
|
|
||||||
# Empty method because OpenSSLObject wants this
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_subject_ordered(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_key_usage(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_extended_key_usage(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_basic_constraints(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_ocsp_must_staple(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_subject_alt_name(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_name_constraints(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_subject_key_identifier(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_authority_key_identifier(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_all_extensions(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _is_signature_valid(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_info(self):
|
|
||||||
result = dict()
|
|
||||||
self.csr = load_certificate_request(self.path, content=self.content, backend=self.backend)
|
|
||||||
|
|
||||||
subject = self._get_subject_ordered()
|
|
||||||
result['subject'] = dict()
|
|
||||||
for k, v in subject:
|
|
||||||
result['subject'][k] = v
|
|
||||||
result['subject_ordered'] = subject
|
|
||||||
result['key_usage'], result['key_usage_critical'] = self._get_key_usage()
|
|
||||||
result['extended_key_usage'], result['extended_key_usage_critical'] = self._get_extended_key_usage()
|
|
||||||
result['basic_constraints'], result['basic_constraints_critical'] = self._get_basic_constraints()
|
|
||||||
result['ocsp_must_staple'], result['ocsp_must_staple_critical'] = self._get_ocsp_must_staple()
|
|
||||||
result['subject_alt_name'], result['subject_alt_name_critical'] = self._get_subject_alt_name()
|
|
||||||
(
|
|
||||||
result['name_constraints_permitted'],
|
|
||||||
result['name_constraints_excluded'],
|
|
||||||
result['name_constraints_critical'],
|
|
||||||
) = self._get_name_constraints()
|
|
||||||
|
|
||||||
result['public_key'] = self._get_public_key(binary=False)
|
|
||||||
pk = self._get_public_key(binary=True)
|
|
||||||
result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
|
|
||||||
|
|
||||||
if self.backend != 'pyopenssl':
|
|
||||||
ski = self._get_subject_key_identifier()
|
|
||||||
if ski is not None:
|
|
||||||
ski = to_native(binascii.hexlify(ski))
|
|
||||||
ski = ':'.join([ski[i:i + 2] for i in range(0, len(ski), 2)])
|
|
||||||
result['subject_key_identifier'] = ski
|
|
||||||
|
|
||||||
aki, aci, acsn = self._get_authority_key_identifier()
|
|
||||||
if aki is not None:
|
|
||||||
aki = to_native(binascii.hexlify(aki))
|
|
||||||
aki = ':'.join([aki[i:i + 2] for i in range(0, len(aki), 2)])
|
|
||||||
result['authority_key_identifier'] = aki
|
|
||||||
result['authority_cert_issuer'] = aci
|
|
||||||
result['authority_cert_serial_number'] = acsn
|
|
||||||
|
|
||||||
result['extensions_by_oid'] = self._get_all_extensions()
|
|
||||||
|
|
||||||
result['signature_valid'] = self._is_signature_valid()
|
|
||||||
if not result['signature_valid']:
|
|
||||||
self.module.fail_json(
|
|
||||||
msg='CSR signature is invalid!',
|
|
||||||
**result
|
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class CertificateSigningRequestInfoCryptography(CertificateSigningRequestInfo):
|
|
||||||
"""Validate the supplied CSR, using the cryptography backend"""
|
|
||||||
def __init__(self, module):
|
|
||||||
super(CertificateSigningRequestInfoCryptography, self).__init__(module, 'cryptography')
|
|
||||||
|
|
||||||
def _get_subject_ordered(self):
|
|
||||||
result = []
|
|
||||||
for attribute in self.csr.subject:
|
|
||||||
result.append([cryptography_oid_to_name(attribute.oid), attribute.value])
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _get_key_usage(self):
|
|
||||||
try:
|
|
||||||
current_key_ext = self.csr.extensions.get_extension_for_class(x509.KeyUsage)
|
|
||||||
current_key_usage = current_key_ext.value
|
|
||||||
key_usage = dict(
|
|
||||||
digital_signature=current_key_usage.digital_signature,
|
|
||||||
content_commitment=current_key_usage.content_commitment,
|
|
||||||
key_encipherment=current_key_usage.key_encipherment,
|
|
||||||
data_encipherment=current_key_usage.data_encipherment,
|
|
||||||
key_agreement=current_key_usage.key_agreement,
|
|
||||||
key_cert_sign=current_key_usage.key_cert_sign,
|
|
||||||
crl_sign=current_key_usage.crl_sign,
|
|
||||||
encipher_only=False,
|
|
||||||
decipher_only=False,
|
|
||||||
)
|
|
||||||
if key_usage['key_agreement']:
|
|
||||||
key_usage.update(dict(
|
|
||||||
encipher_only=current_key_usage.encipher_only,
|
|
||||||
decipher_only=current_key_usage.decipher_only
|
|
||||||
))
|
|
||||||
|
|
||||||
key_usage_names = dict(
|
|
||||||
digital_signature='Digital Signature',
|
|
||||||
content_commitment='Non Repudiation',
|
|
||||||
key_encipherment='Key Encipherment',
|
|
||||||
data_encipherment='Data Encipherment',
|
|
||||||
key_agreement='Key Agreement',
|
|
||||||
key_cert_sign='Certificate Sign',
|
|
||||||
crl_sign='CRL Sign',
|
|
||||||
encipher_only='Encipher Only',
|
|
||||||
decipher_only='Decipher Only',
|
|
||||||
)
|
|
||||||
return sorted([
|
|
||||||
key_usage_names[name] for name, value in key_usage.items() if value
|
|
||||||
]), current_key_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_extended_key_usage(self):
|
|
||||||
try:
|
|
||||||
ext_keyusage_ext = self.csr.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
|
|
||||||
return sorted([
|
|
||||||
cryptography_oid_to_name(eku) for eku in ext_keyusage_ext.value
|
|
||||||
]), ext_keyusage_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_basic_constraints(self):
|
|
||||||
try:
|
|
||||||
ext_keyusage_ext = self.csr.extensions.get_extension_for_class(x509.BasicConstraints)
|
|
||||||
result = []
|
|
||||||
result.append('CA:{0}'.format('TRUE' if ext_keyusage_ext.value.ca else 'FALSE'))
|
|
||||||
if ext_keyusage_ext.value.path_length is not None:
|
|
||||||
result.append('pathlen:{0}'.format(ext_keyusage_ext.value.path_length))
|
|
||||||
return sorted(result), ext_keyusage_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_ocsp_must_staple(self):
|
|
||||||
try:
|
|
||||||
try:
|
|
||||||
# This only works with cryptography >= 2.1
|
|
||||||
tlsfeature_ext = self.csr.extensions.get_extension_for_class(x509.TLSFeature)
|
|
||||||
value = cryptography.x509.TLSFeatureType.status_request in tlsfeature_ext.value
|
|
||||||
except AttributeError as dummy:
|
|
||||||
# Fallback for cryptography < 2.1
|
|
||||||
oid = x509.oid.ObjectIdentifier("1.3.6.1.5.5.7.1.24")
|
|
||||||
tlsfeature_ext = self.csr.extensions.get_extension_for_oid(oid)
|
|
||||||
value = tlsfeature_ext.value.value == b"\x30\x03\x02\x01\x05"
|
|
||||||
return value, tlsfeature_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_subject_alt_name(self):
|
|
||||||
try:
|
|
||||||
san_ext = self.csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
|
||||||
result = [cryptography_decode_name(san) for san in san_ext.value]
|
|
||||||
return result, san_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_name_constraints(self):
|
|
||||||
try:
|
|
||||||
nc_ext = self.csr.extensions.get_extension_for_class(x509.NameConstraints)
|
|
||||||
permitted = [cryptography_decode_name(san) for san in nc_ext.value.permitted_subtrees or []]
|
|
||||||
excluded = [cryptography_decode_name(san) for san in nc_ext.value.excluded_subtrees or []]
|
|
||||||
return permitted, excluded, nc_ext.critical
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, None, False
|
|
||||||
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
return self.csr.public_key().public_bytes(
|
|
||||||
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
|
|
||||||
serialization.PublicFormat.SubjectPublicKeyInfo
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_subject_key_identifier(self):
|
|
||||||
try:
|
|
||||||
ext = self.csr.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
|
|
||||||
return ext.value.digest
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _get_authority_key_identifier(self):
|
|
||||||
try:
|
|
||||||
ext = self.csr.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
|
|
||||||
issuer = None
|
|
||||||
if ext.value.authority_cert_issuer is not None:
|
|
||||||
issuer = [cryptography_decode_name(san) for san in ext.value.authority_cert_issuer]
|
|
||||||
return ext.value.key_identifier, issuer, ext.value.authority_cert_serial_number
|
|
||||||
except cryptography.x509.ExtensionNotFound:
|
|
||||||
return None, None, None
|
|
||||||
|
|
||||||
def _get_all_extensions(self):
|
|
||||||
return cryptography_get_extensions_from_csr(self.csr)
|
|
||||||
|
|
||||||
def _is_signature_valid(self):
|
|
||||||
return self.csr.is_signature_valid
|
|
||||||
|
|
||||||
|
|
||||||
class CertificateSigningRequestInfoPyOpenSSL(CertificateSigningRequestInfo):
|
|
||||||
"""validate the supplied CSR."""
|
|
||||||
|
|
||||||
def __init__(self, module):
|
|
||||||
super(CertificateSigningRequestInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
|
|
||||||
|
|
||||||
def __get_name(self, name):
|
|
||||||
result = []
|
|
||||||
for sub in name.get_components():
|
|
||||||
result.append([pyopenssl_normalize_name(sub[0]), to_text(sub[1])])
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _get_subject_ordered(self):
|
|
||||||
return self.__get_name(self.csr.get_subject())
|
|
||||||
|
|
||||||
def _get_extension(self, short_name):
|
|
||||||
for extension in self.csr.get_extensions():
|
|
||||||
if extension.get_short_name() == short_name:
|
|
||||||
result = [
|
|
||||||
pyopenssl_normalize_name(usage.strip()) for usage in to_text(extension, errors='surrogate_or_strict').split(',')
|
|
||||||
]
|
|
||||||
return sorted(result), bool(extension.get_critical())
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_key_usage(self):
|
|
||||||
return self._get_extension(b'keyUsage')
|
|
||||||
|
|
||||||
def _get_extended_key_usage(self):
|
|
||||||
return self._get_extension(b'extendedKeyUsage')
|
|
||||||
|
|
||||||
def _get_basic_constraints(self):
|
|
||||||
return self._get_extension(b'basicConstraints')
|
|
||||||
|
|
||||||
def _get_ocsp_must_staple(self):
|
|
||||||
extensions = self.csr.get_extensions()
|
|
||||||
oms_ext = [
|
|
||||||
ext for ext in extensions
|
|
||||||
if to_bytes(ext.get_short_name()) == OPENSSL_MUST_STAPLE_NAME and to_bytes(ext) == OPENSSL_MUST_STAPLE_VALUE
|
|
||||||
]
|
|
||||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER < 0x10100000:
|
|
||||||
# Older versions of libssl don't know about OCSP Must Staple
|
|
||||||
oms_ext.extend([ext for ext in extensions if ext.get_short_name() == b'UNDEF' and ext.get_data() == b'\x30\x03\x02\x01\x05'])
|
|
||||||
if oms_ext:
|
|
||||||
return True, bool(oms_ext[0].get_critical())
|
|
||||||
else:
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_subject_alt_name(self):
|
|
||||||
for extension in self.csr.get_extensions():
|
|
||||||
if extension.get_short_name() == b'subjectAltName':
|
|
||||||
result = [pyopenssl_normalize_name_attribute(altname.strip()) for altname in
|
|
||||||
to_text(extension, errors='surrogate_or_strict').split(', ')]
|
|
||||||
return result, bool(extension.get_critical())
|
|
||||||
return None, False
|
|
||||||
|
|
||||||
def _get_name_constraints(self):
|
|
||||||
for extension in self.csr.get_extensions():
|
|
||||||
if extension.get_short_name() == b'nameConstraints':
|
|
||||||
permitted, excluded = pyopenssl_parse_name_constraints(extension)
|
|
||||||
return permitted, excluded, bool(extension.get_critical())
|
|
||||||
return None, None, False
|
|
||||||
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
try:
|
|
||||||
return crypto.dump_publickey(
|
|
||||||
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
|
|
||||||
self.csr.get_pubkey()
|
|
||||||
)
|
|
||||||
except AttributeError:
|
|
||||||
try:
|
|
||||||
bio = crypto._new_mem_buf()
|
|
||||||
if binary:
|
|
||||||
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.csr.get_pubkey()._pkey)
|
|
||||||
else:
|
|
||||||
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.csr.get_pubkey()._pkey)
|
|
||||||
if rc != 1:
|
|
||||||
crypto._raise_current_error()
|
|
||||||
return crypto._bio_to_string(bio)
|
|
||||||
except AttributeError:
|
|
||||||
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
|
|
||||||
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
|
|
||||||
|
|
||||||
def _get_subject_key_identifier(self):
|
|
||||||
# Won't be implemented
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _get_authority_key_identifier(self):
|
|
||||||
# Won't be implemented
|
|
||||||
return None, None, None
|
|
||||||
|
|
||||||
def _get_all_extensions(self):
|
|
||||||
return pyopenssl_get_extensions_from_csr(self.csr)
|
|
||||||
|
|
||||||
def _is_signature_valid(self):
|
|
||||||
try:
|
|
||||||
return bool(self.csr.verify(self.csr.get_pubkey()))
|
|
||||||
except crypto.Error:
|
|
||||||
# OpenSSL error means that key is not consistent
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
module = AnsibleModule(
|
module = AnsibleModule(
|
||||||
|
@ -664,53 +253,19 @@ def main():
|
||||||
supports_check_mode=True,
|
supports_check_mode=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if module.params['content'] is not None:
|
||||||
|
data = module.params['content'].encode('utf-8')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
with open(module.params['path'], 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
except (IOError, OSError) as e:
|
||||||
|
module.fail_json(msg='Error while reading CSR file from disk: {0}'.format(e))
|
||||||
|
|
||||||
|
backend, module_backend = select_backend(module, module.params['select_crypto_backend'], data, validate_signature=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if module.params['path'] is not None:
|
result = module_backend.get_info()
|
||||||
base_dir = os.path.dirname(module.params['path']) or '.'
|
|
||||||
if not os.path.isdir(base_dir):
|
|
||||||
module.fail_json(
|
|
||||||
name=base_dir,
|
|
||||||
msg='The directory %s does not exist or the file is not a directory' % base_dir
|
|
||||||
)
|
|
||||||
|
|
||||||
backend = module.params['select_crypto_backend']
|
|
||||||
if backend == 'auto':
|
|
||||||
# Detect what backend we can use
|
|
||||||
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
|
||||||
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
|
||||||
|
|
||||||
# If cryptography is available we'll use it
|
|
||||||
if can_use_cryptography:
|
|
||||||
backend = 'cryptography'
|
|
||||||
elif can_use_pyopenssl:
|
|
||||||
backend = 'pyopenssl'
|
|
||||||
|
|
||||||
# Fail if no backend has been found
|
|
||||||
if backend == 'auto':
|
|
||||||
module.fail_json(msg=("Can't detect any of the required Python libraries "
|
|
||||||
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
|
||||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
|
||||||
MINIMAL_PYOPENSSL_VERSION))
|
|
||||||
|
|
||||||
if backend == 'pyopenssl':
|
|
||||||
if not PYOPENSSL_FOUND:
|
|
||||||
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
|
|
||||||
exception=PYOPENSSL_IMP_ERR)
|
|
||||||
try:
|
|
||||||
getattr(crypto.X509Req, 'get_extensions')
|
|
||||||
except AttributeError:
|
|
||||||
module.fail_json(msg='You need to have PyOpenSSL>=0.15')
|
|
||||||
|
|
||||||
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
|
|
||||||
version='2.0.0', collection_name='community.crypto')
|
|
||||||
certificate = CertificateSigningRequestInfoPyOpenSSL(module)
|
|
||||||
elif backend == 'cryptography':
|
|
||||||
if not CRYPTOGRAPHY_FOUND:
|
|
||||||
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
|
|
||||||
exception=CRYPTOGRAPHY_IMP_ERR)
|
|
||||||
certificate = CertificateSigningRequestInfoCryptography(module)
|
|
||||||
|
|
||||||
result = certificate.get_info()
|
|
||||||
module.exit_json(**result)
|
module.exit_json(**result)
|
||||||
except OpenSSLObjectError as exc:
|
except OpenSSLObjectError as exc:
|
||||||
module.fail_json(msg=to_native(exc))
|
module.fail_json(msg=to_native(exc))
|
||||||
|
|
Loading…
Reference in New Issue