Add and use CryptoBackend.get_ordered_csr_identifiers(). (#725)
parent
7e33398d5c
commit
1b75f1aa9c
|
@ -0,0 +1,7 @@
|
||||||
|
bugfixes:
|
||||||
|
- "acme_certificate - respect the order of the CNAME and SAN identifiers that are passed on when creating an ACME order
|
||||||
|
(https://github.com/ansible-collections/community.crypto/issues/723, https://github.com/ansible-collections/community.crypto/pull/725)."
|
||||||
|
deprecated_features:
|
||||||
|
- "acme.backends module utils - from community.crypto on, all implementations of ``CryptoBackend`` must override ``get_ordered_csr_identifiers()``.
|
||||||
|
The current default implementation, which simply sorts the result of ``get_csr_identifiers()``, will then be removed
|
||||||
|
(https://github.com/ansible-collections/community.crypto/pull/725)."
|
|
@ -298,31 +298,51 @@ class CryptographyBackend(CryptoBackend):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
|
'''
|
||||||
|
Return a list of requested identifiers (CN and SANs) for the CSR.
|
||||||
|
Each identifier is a pair (type, identifier), where type is either
|
||||||
|
'dns' or 'ip'.
|
||||||
|
|
||||||
|
The list is deduplicated, and if a CNAME is present, it will be returned
|
||||||
|
as the first element in the result.
|
||||||
|
'''
|
||||||
|
if csr_content is None:
|
||||||
|
csr_content = read_file(csr_filename)
|
||||||
|
else:
|
||||||
|
csr_content = to_bytes(csr_content)
|
||||||
|
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)
|
||||||
|
|
||||||
|
identifiers = set()
|
||||||
|
result = []
|
||||||
|
|
||||||
|
def add_identifier(identifier):
|
||||||
|
if identifier in identifiers:
|
||||||
|
return
|
||||||
|
identifiers.add(identifier)
|
||||||
|
result.append(identifier)
|
||||||
|
|
||||||
|
for sub in csr.subject:
|
||||||
|
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
|
||||||
|
add_identifier(('dns', sub.value))
|
||||||
|
for extension in csr.extensions:
|
||||||
|
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
||||||
|
for name in extension.value:
|
||||||
|
if isinstance(name, cryptography.x509.DNSName):
|
||||||
|
add_identifier(('dns', name.value))
|
||||||
|
elif isinstance(name, cryptography.x509.IPAddress):
|
||||||
|
add_identifier(('ip', name.value.compressed))
|
||||||
|
else:
|
||||||
|
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
|
||||||
|
return result
|
||||||
|
|
||||||
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
'''
|
'''
|
||||||
Return a set of requested identifiers (CN and SANs) for the CSR.
|
Return a set of requested identifiers (CN and SANs) for the CSR.
|
||||||
Each identifier is a pair (type, identifier), where type is either
|
Each identifier is a pair (type, identifier), where type is either
|
||||||
'dns' or 'ip'.
|
'dns' or 'ip'.
|
||||||
'''
|
'''
|
||||||
identifiers = set([])
|
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
|
||||||
if csr_content is None:
|
|
||||||
csr_content = read_file(csr_filename)
|
|
||||||
else:
|
|
||||||
csr_content = to_bytes(csr_content)
|
|
||||||
csr = cryptography.x509.load_pem_x509_csr(csr_content, _cryptography_backend)
|
|
||||||
for sub in csr.subject:
|
|
||||||
if sub.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
|
|
||||||
identifiers.add(('dns', sub.value))
|
|
||||||
for extension in csr.extensions:
|
|
||||||
if extension.oid == cryptography.x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME:
|
|
||||||
for name in extension.value:
|
|
||||||
if isinstance(name, cryptography.x509.DNSName):
|
|
||||||
identifiers.add(('dns', name.value))
|
|
||||||
elif isinstance(name, cryptography.x509.IPAddress):
|
|
||||||
identifiers.add(('ip', name.value.compressed))
|
|
||||||
else:
|
|
||||||
raise BackendException('Found unsupported SAN identifier {0}'.format(name))
|
|
||||||
return identifiers
|
|
||||||
|
|
||||||
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
|
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -225,11 +225,14 @@ class OpenSSLCLIBackend(CryptoBackend):
|
||||||
# We do not want to error out on something IPAddress() cannot parse
|
# We do not want to error out on something IPAddress() cannot parse
|
||||||
return ip
|
return ip
|
||||||
|
|
||||||
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
'''
|
'''
|
||||||
Return a set of requested identifiers (CN and SANs) for the CSR.
|
Return a list of requested identifiers (CN and SANs) for the CSR.
|
||||||
Each identifier is a pair (type, identifier), where type is either
|
Each identifier is a pair (type, identifier), where type is either
|
||||||
'dns' or 'ip'.
|
'dns' or 'ip'.
|
||||||
|
|
||||||
|
The list is deduplicated, and if a CNAME is present, it will be returned
|
||||||
|
as the first element in the result.
|
||||||
'''
|
'''
|
||||||
filename = csr_filename
|
filename = csr_filename
|
||||||
data = None
|
data = None
|
||||||
|
@ -241,24 +244,40 @@ class OpenSSLCLIBackend(CryptoBackend):
|
||||||
dummy, out, dummy = self.module.run_command(
|
dummy, out, dummy = self.module.run_command(
|
||||||
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
|
openssl_csr_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
|
||||||
|
|
||||||
identifiers = set([])
|
identifiers = set()
|
||||||
|
result = []
|
||||||
|
|
||||||
|
def add_identifier(identifier):
|
||||||
|
if identifier in identifiers:
|
||||||
|
return
|
||||||
|
identifiers.add(identifier)
|
||||||
|
result.append(identifier)
|
||||||
|
|
||||||
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
|
common_name = re.search(r"Subject:.* CN\s?=\s?([^\s,;/]+)", to_text(out, errors='surrogate_or_strict'))
|
||||||
if common_name is not None:
|
if common_name is not None:
|
||||||
identifiers.add(('dns', common_name.group(1)))
|
add_identifier(('dns', common_name.group(1)))
|
||||||
subject_alt_names = re.search(
|
subject_alt_names = re.search(
|
||||||
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
|
r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n",
|
||||||
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
|
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
|
||||||
if subject_alt_names is not None:
|
if subject_alt_names is not None:
|
||||||
for san in subject_alt_names.group(1).split(", "):
|
for san in subject_alt_names.group(1).split(", "):
|
||||||
if san.lower().startswith("dns:"):
|
if san.lower().startswith("dns:"):
|
||||||
identifiers.add(('dns', san[4:]))
|
add_identifier(('dns', san[4:]))
|
||||||
elif san.lower().startswith("ip:"):
|
elif san.lower().startswith("ip:"):
|
||||||
identifiers.add(('ip', self._normalize_ip(san[3:])))
|
add_identifier(('ip', self._normalize_ip(san[3:])))
|
||||||
elif san.lower().startswith("ip address:"):
|
elif san.lower().startswith("ip address:"):
|
||||||
identifiers.add(('ip', self._normalize_ip(san[11:])))
|
add_identifier(('ip', self._normalize_ip(san[11:])))
|
||||||
else:
|
else:
|
||||||
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
|
raise BackendException('Found unsupported SAN identifier "{0}"'.format(san))
|
||||||
return identifiers
|
return result
|
||||||
|
|
||||||
|
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
|
'''
|
||||||
|
Return a set of requested identifiers (CN and SANs) for the CSR.
|
||||||
|
Each identifier is a pair (type, identifier), where type is either
|
||||||
|
'dns' or 'ip'.
|
||||||
|
'''
|
||||||
|
return set(self.get_ordered_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
|
||||||
|
|
||||||
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
|
def get_cert_days(self, cert_filename=None, cert_content=None, now=None):
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -34,6 +34,23 @@ class CryptoBackend(object):
|
||||||
def create_mac_key(self, alg, key):
|
def create_mac_key(self, alg, key):
|
||||||
'''Create a MAC key.'''
|
'''Create a MAC key.'''
|
||||||
|
|
||||||
|
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
|
'''
|
||||||
|
Return a list of requested identifiers (CN and SANs) for the CSR.
|
||||||
|
Each identifier is a pair (type, identifier), where type is either
|
||||||
|
'dns' or 'ip'.
|
||||||
|
|
||||||
|
The list is deduplicated, and if a CNAME is present, it will be returned
|
||||||
|
as the first element in the result.
|
||||||
|
'''
|
||||||
|
self.module.deprecate(
|
||||||
|
"Every backend must override the get_ordered_csr_identifiers() method."
|
||||||
|
" The default implementation will be removed in 3.0.0 and this method will be marked as `abstractmethod` by then.",
|
||||||
|
version='3.0.0',
|
||||||
|
collection_name='community.crypto',
|
||||||
|
)
|
||||||
|
return sorted(self.get_csr_identifiers(csr_filename=csr_filename, csr_content=csr_content))
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
|
||||||
'''
|
'''
|
||||||
|
|
|
@ -661,7 +661,7 @@ class ACMECertificateClient(object):
|
||||||
raise ModuleFailException("CSR %s not found" % (self.csr))
|
raise ModuleFailException("CSR %s not found" % (self.csr))
|
||||||
|
|
||||||
# Extract list of identifiers from CSR
|
# Extract list of identifiers from CSR
|
||||||
self.identifiers = self.client.backend.get_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
|
self.identifiers = self.client.backend.get_ordered_csr_identifiers(csr_filename=self.csr, csr_content=self.csr_content)
|
||||||
|
|
||||||
def is_first_step(self):
|
def is_first_step(self):
|
||||||
'''
|
'''
|
||||||
|
|
Loading…
Reference in New Issue