Revert "Revert all non-bugfixes merged since the last release."

This reverts commit 82251c2d80.
pull/755/head
Felix Fontein 2024-05-11 17:05:03 +02:00
parent 3d8c68e189
commit 00d23753ca
66 changed files with 2908 additions and 299 deletions

View File

@ -66,7 +66,9 @@ If you use the Ansible package and do not update collections independently, use
- ACME modules and plugins:
- acme_account_info module
- acme_account module
- acme_ari_info module
- acme_certificate module
- acme_certificate_deactivate_authz module
- acme_certificate_revoke module
- acme_challenge_cert_helper module
- acme_inspect module

View File

@ -0,0 +1,2 @@
deprecated_features:
- "acme documentation fragment - the default ``community.crypto.acme[.documentation]`` docs fragment is deprecated and will be removed from community.crypto 3.0.0. Replace it with both the new ``community.crypto.acme.basic`` and ``community.crypto.acme.account`` fragments (https://github.com/ansible-collections/community.crypto/pull/735)."

View File

@ -0,0 +1,2 @@
deprecated_features:
- "acme.backends module utils - the ``get_cert_information()`` method for a ACME crypto backend must be implemented from community.crypto 3.0.0 on (https://github.com/ansible-collections/community.crypto/pull/736)."

View File

@ -0,0 +1,2 @@
minor_changes:
- "acme_certificate - add ``include_renewal_cert_id`` option to allow requesting renewal of a specific certificate according to the current ACME Renewal Information specification draft (https://github.com/ansible-collections/community.crypto/pull/739)."

View File

@ -0,0 +1,2 @@
bugfixes:
- "x509_crl, x509_certificate, x509_certificate_info - when parsing absolute timestamps which omitted the second count, the first digit of the minutes was used as a one-digit minutes count, and the second digit of the minutes as a one-digit second count (https://github.com/ansible-collections/community.crypto/pull/745)."

View File

@ -0,0 +1,2 @@
deprecated_features:
- "crypto.module_backends.common module utils - the ``crypto.module_backends.common`` module utils is deprecated and will be removed from community.crypto 3.0.0. Use the improved ``argspec`` module util instead (https://github.com/ansible-collections/community.crypto/pull/749)."

View File

@ -8,6 +8,7 @@ requires_ansible: '>=2.9.10'
action_groups:
acme:
- acme_inspect
- acme_certificate_deactivate_authz
- acme_certificate_revoke
- acme_certificate
- acme_account

View File

@ -11,6 +11,9 @@ __metaclass__ = type
class ModuleDocFragment(object):
# Standard files documentation fragment
#
# NOTE: This document fragment is DEPRECATED and will be removed from community.crypto 3.0.0.
# Use both the BASIC and ACCOUNT fragments as a replacement.
DOCUMENTATION = r'''
notes:
- "If a new enough version of the C(cryptography) library
@ -137,3 +140,178 @@ options:
default: 10
version_added: 2.3.0
'''
# Basic documentation fragment without account data
BASIC = r'''
notes:
- "Although the defaults are chosen so that the module can be used with
the L(Let's Encrypt,https://letsencrypt.org/) CA, the module can in
principle be used with any CA providing an ACME endpoint, such as
L(Buypass Go SSL,https://www.buypass.com/ssl/products/acme)."
- "So far, the ACME modules have only been tested by the developers against
Let's Encrypt (staging and production), Buypass (staging and production), ZeroSSL (production),
and L(Pebble testing server,https://github.com/letsencrypt/Pebble). We have got
community feedback that they also work with Sectigo ACME Service for InCommon.
If you experience problems with another ACME server, please
L(create an issue,https://github.com/ansible-collections/community.crypto/issues/new/choose)
to help us supporting it. Feedback that an ACME server not mentioned does work
is also appreciated."
requirements:
- either openssl or L(cryptography,https://cryptography.io/) >= 1.5
- ipaddress
options:
acme_version:
description:
- "The ACME version of the endpoint."
- "Must be V(1) for the classic Let's Encrypt and Buypass ACME endpoints,
or V(2) for standardized ACME v2 endpoints."
- "The value V(1) is deprecated since community.crypto 2.0.0 and will be
removed from community.crypto 3.0.0."
required: true
type: int
choices: [ 1, 2 ]
acme_directory:
description:
- "The ACME directory to use. This is the entry point URL to access
the ACME CA server API."
- "For safety reasons the default is set to the Let's Encrypt staging
server (for the ACME v1 protocol). This will create technically correct,
but untrusted certificates."
- "For Let's Encrypt, all staging endpoints can be found here:
U(https://letsencrypt.org/docs/staging-environment/). For Buypass, all
endpoints can be found here:
U(https://community.buypass.com/t/63d4ay/buypass-go-ssl-endpoints)"
- "For B(Let's Encrypt), the production directory URL for ACME v2 is
U(https://acme-v02.api.letsencrypt.org/directory)."
- "For B(Buypass), the production directory URL for ACME v2 and v1 is
U(https://api.buypass.com/acme/directory)."
- "For B(ZeroSSL), the production directory URL for ACME v2 is
U(https://acme.zerossl.com/v2/DV90)."
- "For B(Sectigo), the production directory URL for ACME v2 is
U(https://acme-qa.secure.trust-provider.com/v2/DV)."
- The notes for this module contain a list of ACME services this module has
been tested against.
required: true
type: str
validate_certs:
description:
- Whether calls to the ACME directory will validate TLS certificates.
- "B(Warning:) Should B(only ever) be set to V(false) for testing purposes,
for example when testing against a local Pebble server."
type: bool
default: true
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is V(auto), which tries to use C(cryptography) if available, and falls back to
C(openssl).
- If set to V(openssl), will try to use the C(openssl) binary.
- If set to V(cryptography), will try to use the
L(cryptography,https://cryptography.io/) library.
type: str
default: auto
choices: [ auto, cryptography, openssl ]
request_timeout:
description:
- The time Ansible should wait for a response from the ACME API.
- This timeout is applied to all HTTP(S) requests (HEAD, GET, POST).
type: int
default: 10
version_added: 2.3.0
'''
# Account data documentation fragment
ACCOUNT = r'''
notes:
- "If a new enough version of the C(cryptography) library
is available (see Requirements for details), it will be used
instead of the C(openssl) binary. This can be explicitly disabled
or enabled with the O(select_crypto_backend) option. Note that using
the C(openssl) binary will be slower and less secure, as private key
contents always have to be stored on disk (see
O(account_key_content))."
options:
account_key_src:
description:
- "Path to a file containing the ACME account RSA or Elliptic Curve
key."
- "Private keys can be created with the
M(community.crypto.openssl_privatekey) or M(community.crypto.openssl_privatekey_pipe)
modules. If the requisite (cryptography) is not available,
keys can also be created directly with the C(openssl) command line tool:
RSA keys can be created with C(openssl genrsa ...). Elliptic curve keys
can be created with C(openssl ecparam -genkey ...). Any other tool creating
private keys in PEM format can be used as well."
- "Mutually exclusive with O(account_key_content)."
- "Required if O(account_key_content) is not used."
type: path
aliases: [ account_key ]
account_key_content:
description:
- "Content of the ACME account RSA or Elliptic Curve key."
- "Mutually exclusive with O(account_key_src)."
- "Required if O(account_key_src) is not used."
- "B(Warning:) the content will be written into a temporary file, which will
be deleted by Ansible when the module completes. Since this is an
important private key it can be used to change the account key,
or to revoke your certificates without knowing their private keys
, this might not be acceptable."
- "In case C(cryptography) is used, the content is not written into a
temporary file. It can still happen that it is written to disk by
Ansible in the process of moving the module with its argument to
the node where it is executed."
type: str
account_key_passphrase:
description:
- Phassphrase to use to decode the account key.
- "B(Note:) this is not supported by the C(openssl) backend, only by the C(cryptography) backend."
type: str
version_added: 1.6.0
account_uri:
description:
- "If specified, assumes that the account URI is as given. If the
account key does not match this account, or an account with this
URI does not exist, the module fails."
type: str
'''
# No account data documentation fragment
NO_ACCOUNT = r'''
notes:
- "If a new enough version of the C(cryptography) library
is available (see Requirements for details), it will be used
instead of the C(openssl) binary. This can be explicitly disabled
or enabled with the O(select_crypto_backend) option. Note that using
the C(openssl) binary will be slower."
options: {}
'''
CERTIFICATE = r'''
options:
csr:
description:
- "File containing the CSR for the new certificate."
- "Can be created with M(community.crypto.openssl_csr)."
- "The CSR may contain multiple Subject Alternate Names, but each one
will lead to an individual challenge that must be fulfilled for the
CSR to be signed."
- "B(Note): the private key used to create the CSR B(must not) be the
account key. This is a bad idea from a security point of view, and
the CA should not accept the CSR. The ACME server should return an
error in this case."
- Precisely one of O(csr) or O(csr_content) must be specified.
type: path
csr_content:
description:
- "Content of the CSR for the new certificate."
- "Can be created with M(community.crypto.openssl_csr_pipe)."
- "The CSR may contain multiple Subject Alternate Names, but each one
will lead to an individual challenge that must be fulfilled for the
CSR to be signed."
- "B(Note): the private key used to create the CSR B(must not) be the
account key. This is a bad idea from a security point of view, and
the CA should not accept the CSR. The ACME server should return an
error in this case."
- Precisely one of O(csr) or O(csr_content) must be specified.
type: str
'''

View File

@ -21,6 +21,8 @@ from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.urls import fetch_url
from ansible.module_utils.six import PY3
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_openssl_cli import (
OpenSSLCLIBackend,
)
@ -42,7 +44,9 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
compute_cert_id,
nopad_b64,
parse_retry_after,
)
try:
@ -153,6 +157,9 @@ class ACMEDirectory(object):
self.module, msg='Was not able to obtain nonce, giving up after 5 retries', info=info, response=response)
retry_count += 1
def has_renewal_info_endpoint(self):
return 'renewalInfo' in self.directory
class ACMEClient(object):
'''
@ -168,9 +175,9 @@ class ACMEClient(object):
self.backend = backend
self.version = module.params['acme_version']
# account_key path and content are mutually exclusive
self.account_key_file = module.params['account_key_src']
self.account_key_content = module.params['account_key_content']
self.account_key_passphrase = module.params['account_key_passphrase']
self.account_key_file = module.params.get('account_key_src')
self.account_key_content = module.params.get('account_key_content')
self.account_key_passphrase = module.params.get('account_key_passphrase')
# Grab account URI from module parameters.
# Make sure empty string is treated as None.
@ -383,24 +390,94 @@ class ACMEClient(object):
self.module, msg=error_msg, info=info, content=content, content_json=result if parsed_json_result else None)
return result, info
def get_renewal_info(
self,
cert_id=None,
cert_info=None,
cert_filename=None,
cert_content=None,
include_retry_after=False,
retry_after_relative_with_timezone=True,
):
if not self.directory.has_renewal_info_endpoint():
raise ModuleFailException('The ACME endpoint does not support ACME Renewal Information retrieval')
if cert_id is None:
cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content)
url = '{base}{cert_id}'.format(base=self.directory.directory['renewalInfo'], cert_id=cert_id)
data, info = self.get_request(url, parse_json_result=True, fail_on_error=True, get_only=True)
# Include Retry-After header if asked for
if include_retry_after and 'retry-after' in info:
try:
data['retryAfter'] = parse_retry_after(
info['retry-after'],
relative_with_timezone=retry_after_relative_with_timezone,
)
except ValueError:
pass
return data
def get_default_argspec():
'''
Provides default argument spec for the options documented in the acme doc fragment.
DEPRECATED: will be removed in community.crypto 3.0.0
'''
return dict(
account_key_src=dict(type='path', aliases=['account_key']),
account_key_content=dict(type='str', no_log=True),
account_key_passphrase=dict(type='str', no_log=True),
account_uri=dict(type='str'),
acme_directory=dict(type='str', required=True),
acme_version=dict(type='int', required=True, choices=[1, 2]),
validate_certs=dict(type='bool', default=True),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
request_timeout=dict(type='int', default=10),
account_key_src=dict(type='path', aliases=['account_key']),
account_key_content=dict(type='str', no_log=True),
account_key_passphrase=dict(type='str', no_log=True),
account_uri=dict(type='str'),
)
def create_default_argspec(
with_account=True,
require_account_key=True,
with_certificate=False,
):
'''
Provides default argument spec for the options documented in the acme doc fragment.
'''
result = ArgumentSpec(
argument_spec=dict(
acme_directory=dict(type='str', required=True),
acme_version=dict(type='int', required=True, choices=[1, 2]),
validate_certs=dict(type='bool', default=True),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'openssl', 'cryptography']),
request_timeout=dict(type='int', default=10),
),
)
if with_account:
result.update_argspec(
account_key_src=dict(type='path', aliases=['account_key']),
account_key_content=dict(type='str', no_log=True),
account_key_passphrase=dict(type='str', no_log=True),
account_uri=dict(type='str'),
)
if require_account_key:
result.update(required_one_of=[['account_key_src', 'account_key_content']])
result.update(mutually_exclusive=[['account_key_src', 'account_key_content']])
if with_certificate:
result.update_argspec(
csr=dict(type='path'),
csr_content=dict(type='str'),
)
result.update(
required_one_of=[['csr', 'csr_content']],
mutually_exclusive=[['csr', 'csr_content']],
)
return result
def create_backend(module, needs_acme_v2):
if not HAS_IPADDRESS:
module.fail_json(msg=missing_required_lib('ipaddress'), exception=IPADDRESS_IMPORT_ERROR)

View File

@ -11,6 +11,7 @@ __metaclass__ = type
import base64
import binascii
import datetime
import os
import traceback
@ -19,7 +20,9 @@ from ansible.module_utils.common.text.converters import to_bytes, to_native, to_
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
_parse_acme_timestamp,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
@ -35,27 +38,40 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import re
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
convert_int_to_bytes,
convert_int_to_hex,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_now_datetime,
ensure_utc_timezone,
parse_name_field,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_name_to_oid,
cryptography_serial_number_of_cert,
get_not_valid_after,
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
extract_first_pem,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
parse_name_field,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
get_relative_time_option,
UTC,
)
CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'
CRYPTOGRAPHY_ERROR = None
@ -170,6 +186,32 @@ class CryptographyBackend(CryptoBackend):
def __init__(self, module):
super(CryptographyBackend, self).__init__(module)
def get_now(self):
return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
def parse_acme_timestamp(self, timestamp_str):
return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE)
def parse_module_parameter(self, value, name):
try:
return get_relative_time_option(value, name, backend='cryptography', with_timezone=CRYPTOGRAPHY_TIMEZONE)
except OpenSSLObjectError as exc:
raise BackendException(to_native(exc))
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE)
def get_utc_datetime(self, *args, **kwargs):
kwargs_ext = dict(kwargs)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8):
kwargs_ext['tzinfo'] = UTC
result = datetime.datetime(*args, **kwargs_ext)
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8):
result = ensure_utc_timezone(result)
return result
def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
@ -376,7 +418,7 @@ class CryptographyBackend(CryptoBackend):
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
if now is None:
now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
now = self.get_now()
elif CRYPTOGRAPHY_TIMEZONE:
now = ensure_utc_timezone(now)
return (get_not_valid_after(cert) - now).days
@ -386,3 +428,44 @@ class CryptographyBackend(CryptoBackend):
Given a Criterium object, creates a ChainMatcher object.
'''
return CryptographyChainMatcher(criterium, self.module)
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
if cert_filename is not None:
cert_content = read_file(cert_filename)
else:
cert_content = to_bytes(cert_content)
# Make sure we have at most one PEM. Otherwise cryptography 36.0.0 will barf.
cert_content = to_bytes(extract_first_pem(to_text(cert_content)) or '')
try:
cert = cryptography.x509.load_pem_x509_certificate(cert_content, _cryptography_backend)
except Exception as e:
if cert_filename is None:
raise BackendException('Cannot parse certificate: {0}'.format(e))
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
ski = None
try:
ext = cert.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier)
ski = ext.value.digest
except cryptography.x509.ExtensionNotFound:
pass
aki = None
try:
ext = cert.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier)
aki = ext.value.key_identifier
except cryptography.x509.ExtensionNotFound:
pass
return CertificateInformation(
not_valid_after=get_not_valid_after(cert),
not_valid_before=get_not_valid_before(cert),
serial_number=cryptography_serial_number_of_cert(cert),
subject_key_identifier=ski,
authority_key_identifier=aki,
)

View File

@ -20,6 +20,7 @@ import traceback
from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
)
@ -30,6 +31,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int
try:
import ipaddress
except ImportError:
@ -39,6 +42,33 @@ except ImportError:
_OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C')
def _extract_date(out_text, name, cert_filename_suffix=""):
try:
date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1)
return datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
except AttributeError:
raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix))
except ValueError as exc:
raise BackendException("Failed to parse '{0}' date{1}: {2}".format(name, cert_filename_suffix, exc))
def _decode_octets(octets_text):
return binascii.unhexlify(re.sub(r"(\s|:)", "", octets_text).encode("utf-8"))
def _extract_octets(out_text, name, required=True, potential_prefixes=None):
regexp = r"\s+%s:\s*\n\s+%s([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % (
name,
('(?:%s)' % '|'.join(re.escape(pp) for pp in potential_prefixes)) if potential_prefixes else '',
)
match = re.search(regexp, out_text, re.MULTILINE | re.DOTALL)
if match is not None:
return _decode_octets(match.group(1))
if not required:
return None
raise BackendException("No '{0}' octet string found".format(name))
class OpenSSLCLIBackend(CryptoBackend):
def __init__(self, module, openssl_binary=None):
super(OpenSSLCLIBackend, self).__init__(module)
@ -89,10 +119,12 @@ class OpenSSLCLIBackend(CryptoBackend):
dummy, out, dummy = self.module.run_command(
openssl_keydump_cmd, check_rc=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
out_text = to_text(out, errors='surrogate_or_strict')
if account_key_type == 'rsa':
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL).groups()
pub_hex = re.search(r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent", out_text, re.MULTILINE | re.DOTALL).group(1)
pub_exp = re.search(r"\npublicExponent: ([0-9]+)", out_text, re.MULTILINE | re.DOTALL).group(1)
pub_exp = "{0:x}".format(int(pub_exp))
if len(pub_exp) % 2:
pub_exp = "0{0}".format(pub_exp)
@ -104,17 +136,19 @@ class OpenSSLCLIBackend(CryptoBackend):
'jwk': {
"kty": "RSA",
"e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"n": nopad_b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
"n": nopad_b64(_decode_octets(pub_hex)),
},
'hash': 'sha256',
}
elif account_key_type == 'ec':
pub_data = re.search(
r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
out_text,
re.MULTILINE | re.DOTALL,
)
if pub_data is None:
raise KeyParsingError('cannot parse elliptic curve key')
pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8"))
pub_hex = _decode_octets(pub_data.group(1))
asn1_oid_curve = pub_data.group(2).lower()
nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None
if asn1_oid_curve == 'prime256v1' or nist_curve == 'p-256':
@ -303,13 +337,8 @@ class OpenSSLCLIBackend(CryptoBackend):
openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
try:
not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", to_text(out, errors='surrogate_or_strict')).group(1)
not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z')
except AttributeError:
raise BackendException("No 'Not after' date found{0}".format(cert_filename_suffix))
except ValueError:
raise BackendException("Failed to parse 'Not after' date{0}".format(cert_filename_suffix))
out_text = to_text(out, errors='surrogate_or_strict')
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
if now is None:
now = datetime.datetime.now()
return (not_after - now).days
@ -319,3 +348,43 @@ class OpenSSLCLIBackend(CryptoBackend):
Given a Criterium object, creates a ChainMatcher object.
'''
raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.')
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
filename = cert_filename
data = None
if cert_filename is not None:
cert_filename_suffix = ' in {0}'.format(cert_filename)
else:
filename = '/dev/stdin'
data = to_bytes(cert_content)
cert_filename_suffix = ''
openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
out_text = to_text(out, errors='surrogate_or_strict')
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
not_before = _extract_date(out_text, 'Not Before', cert_filename_suffix=cert_filename_suffix)
sn = re.search(
r" Serial Number: ([0-9]+)",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if sn:
serial = int(sn.group(1))
else:
serial = convert_bytes_to_int(_extract_octets(out_text, 'Serial Number', required=True))
ski = _extract_octets(out_text, 'X509v3 Subject Key Identifier', required=False)
aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False, potential_prefixes=['keyid:', ''])
return CertificateInformation(
not_valid_after=not_after,
not_valid_before=not_before,
serial_number=serial,
subject_key_identifier=ski,
authority_key_identifier=aki,
)

View File

@ -9,9 +9,78 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
from collections import namedtuple
import abc
import datetime
import re
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_now_datetime,
get_relative_time_option,
remove_timezone,
)
CertificateInformation = namedtuple(
'CertificateInformation',
(
'not_valid_after',
'not_valid_before',
'serial_number',
'subject_key_identifier',
'authority_key_identifier',
),
)
_FRACTIONAL_MATCHER = re.compile(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(|\.\d+)(Z|[+-]\d{2}:?\d{2}.*)$')
def _reduce_fractional_digits(timestamp_str):
"""
Given a RFC 3339 timestamp that includes too many digits for the fractional seconds part, reduces these to at most 6.
"""
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
m = _FRACTIONAL_MATCHER.match(timestamp_str)
if not m:
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))
timestamp, fractional, timezone = m.groups()
if len(fractional) > 7:
# Python does not support anything smaller than microseconds
# (Golang supports nanoseconds, Boulder often emits more fractional digits, which Python chokes on)
fractional = fractional[:7]
return '%s%s%s' % (timestamp, fractional, timezone)
def _parse_acme_timestamp(timestamp_str, with_timezone):
"""
Parses a RFC 3339 timestamp.
"""
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
timestamp_str = _reduce_fractional_digits(timestamp_str)
for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'):
# Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491
try:
result = datetime.datetime.strptime(timestamp_str, format)
except ValueError:
pass
else:
return ensure_utc_timezone(result) if with_timezone else remove_timezone(result)
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))
@six.add_metaclass(abc.ABCMeta)
@ -19,6 +88,30 @@ class CryptoBackend(object):
def __init__(self, module):
self.module = module
def get_now(self):
return get_now_datetime(with_timezone=False)
def parse_acme_timestamp(self, timestamp_str):
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
return _parse_acme_timestamp(timestamp_str, with_timezone=False)
def parse_module_parameter(self, value, name):
try:
return get_relative_time_option(value, name, backend='cryptography', with_timezone=False)
except OpenSSLObjectError as exc:
raise BackendException(to_native(exc))
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
start = get_epoch_seconds(timestamp_start)
end = get_epoch_seconds(timestamp_end)
return from_epoch_seconds(start + percentage * (end - start), with_timezone=False)
def get_utc_datetime(self, *args, **kwargs):
result = datetime.datetime(*args, **kwargs)
if 'tzinfo' in kwargs or len(args) >= 8:
result = remove_timezone(result)
return result
@abc.abstractmethod
def parse_key(self, key_file=None, key_content=None, passphrase=None):
'''
@ -74,3 +167,12 @@ class CryptoBackend(object):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
# Not implementing this method in a backend is DEPRECATED and will be
# disallowed in community.crypto 3.0.0. This method will be marked as
# @abstractmethod by then.
raise BackendException('This backend does not support get_cert_information()')

View File

@ -103,7 +103,7 @@ class Challenge(object):
# https://tools.ietf.org/html/rfc8555#section-8.4
resource = '_acme-challenge'
value = nopad_b64(hashlib.sha256(to_bytes(key_authorization)).digest())
record = (resource + identifier[1:]) if identifier.startswith('*.') else '{0}.{1}'.format(resource, identifier)
record = '{0}.{1}'.format(resource, identifier[2:] if identifier.startswith('*.') else identifier)
return {
'resource': resource,
'resource_value': value,
@ -283,13 +283,21 @@ class Authorization(object):
return self.status == 'valid'
return self.wait_for_validation(client, challenge_type)
def can_deactivate(self):
'''
Deactivates this authorization.
https://community.letsencrypt.org/t/authorization-deactivation/19860/2
https://tools.ietf.org/html/rfc8555#section-7.5.2
'''
return self.status in ('valid', 'pending')
def deactivate(self, client):
'''
Deactivates this authorization.
https://community.letsencrypt.org/t/authorization-deactivation/19860/2
https://tools.ietf.org/html/rfc8555#section-7.5.2
'''
if self.status != 'valid':
if not self.can_deactivate():
return
authz_deactivate = {
'status': 'deactivated'

View File

@ -32,6 +32,7 @@ class Order(object):
self.identifiers = []
for identifier in data['identifiers']:
self.identifiers.append((identifier['type'], identifier['value']))
self.replaces_cert_id = data.get('replaces')
self.finalize_uri = data.get('finalize')
self.certificate_uri = data.get('certificate')
self.authorization_uris = data['authorizations']
@ -44,6 +45,7 @@ class Order(object):
self.status = None
self.identifiers = []
self.replaces_cert_id = None
self.finalize_uri = None
self.certificate_uri = None
self.authorization_uris = []
@ -62,7 +64,7 @@ class Order(object):
return result
@classmethod
def create(cls, client, identifiers):
def create(cls, client, identifiers, replaces_cert_id=None):
'''
Start a new certificate order (ACME v2 protocol).
https://tools.ietf.org/html/rfc8555#section-7.4
@ -76,6 +78,8 @@ class Order(object):
new_order = {
"identifiers": acme_identifiers
}
if replaces_cert_id is not None:
new_order["replaces"] = replaces_cert_id
result, info = client.send_signed_request(
client.directory['newOrder'], new_order, error_msg='Failed to start new order', expected_status_codes=[201])
return cls.from_json(client, result, info['location'])

View File

@ -10,6 +10,7 @@ __metaclass__ = type
import base64
import datetime
import re
import textwrap
import traceback
@ -19,6 +20,10 @@ from ansible.module_utils.six.moves.urllib.parse import unquote
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes
from ansible_collections.community.crypto.plugins.module_utils.time import get_now_datetime
def nopad_b64(data):
return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "")
@ -65,8 +70,61 @@ def pem_to_der(pem_filename=None, pem_content=None):
def process_links(info, callback):
'''
Process link header, calls callback for every link header with the URL and relation as options.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
'''
if 'link' in info:
link = info['link']
for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link):
callback(unquote(url), relation)
def parse_retry_after(value, relative_with_timezone=True, now=None):
'''
Parse the value of a Retry-After header and return a timestamp.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
'''
# First try a number of seconds
try:
delta = datetime.timedelta(seconds=int(value))
if now is None:
now = get_now_datetime(relative_with_timezone)
return now + delta
except ValueError:
pass
try:
return datetime.datetime.strptime(value, '%a, %d %b %Y %H:%M:%S GMT')
except ValueError:
pass
raise ValueError('Cannot parse Retry-After header value %s' % repr(value))
def compute_cert_id(
backend,
cert_info=None,
cert_filename=None,
cert_content=None,
none_if_required_information_is_missing=False,
):
# Obtain certificate info if not provided
if cert_info is None:
cert_info = backend.get_cert_information(cert_filename=cert_filename, cert_content=cert_content)
# Convert Authority Key Identifier to string
if cert_info.authority_key_identifier is None:
if none_if_required_information_is_missing:
return None
raise ModuleFailException('Certificate has no Authority Key Identifier extension')
aki = to_native(base64.urlsafe_b64encode(cert_info.authority_key_identifier)).replace('=', '')
# Convert serial number to string
serial_bytes = convert_int_to_bytes(cert_info.serial_number)
if ord(serial_bytes[:1]) >= 128:
serial_bytes = b'\x00' + serial_bytes
serial = to_native(base64.urlsafe_b64encode(serial_bytes)).replace('=', '')
# Compose cert ID
return '{aki}.{serial}'.format(aki=aki, serial=serial)

View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.module_utils.basic import AnsibleModule
def _ensure_list(value):
if value is None:
return []
return list(value)
class ArgumentSpec:
def __init__(self, argument_spec=None, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None):
self.argument_spec = argument_spec or {}
self.mutually_exclusive = _ensure_list(mutually_exclusive)
self.required_together = _ensure_list(required_together)
self.required_one_of = _ensure_list(required_one_of)
self.required_if = _ensure_list(required_if)
self.required_by = required_by or {}
def update_argspec(self, **kwargs):
self.argument_spec.update(kwargs)
return self
def update(self, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None):
if mutually_exclusive:
self.mutually_exclusive.extend(mutually_exclusive)
if required_together:
self.required_together.extend(required_together)
if required_one_of:
self.required_one_of.extend(required_one_of)
if required_if:
self.required_if.extend(required_if)
if required_by:
for k, v in required_by.items():
if k in self.required_by:
v = list(self.required_by[k]) + list(v)
self.required_by[k] = v
return self
def merge(self, other):
self.update_argspec(**other.argument_spec)
self.update(
mutually_exclusive=other.mutually_exclusive,
required_together=other.required_together,
required_one_of=other.required_one_of,
required_if=other.required_if,
required_by=other.required_by,
)
return self
def create_ansible_module_helper(self, clazz, args, **kwargs):
return clazz(
*args,
argument_spec=self.argument_spec,
mutually_exclusive=self.mutually_exclusive,
required_together=self.required_together,
required_one_of=self.required_one_of,
required_if=self.required_if,
required_by=self.required_by,
**kwargs)
def create_ansible_module(self, **kwargs):
return self.create_ansible_module_helper(AnsibleModule, (), **kwargs)
__all__ = ('ArgumentSpec', )

View File

@ -110,6 +110,9 @@ if sys.version_info[0] >= 3:
def _convert_int_to_bytes(count, no):
return no.to_bytes(count, byteorder='big')
def _convert_bytes_to_int(data):
return int.from_bytes(data, byteorder='big', signed=False)
def _to_hex(no):
return hex(no)[2:]
else:
@ -122,6 +125,12 @@ else:
raise Exception('Number {1} needs more than {0} bytes!'.format(count, n))
return ('0' * (2 * count - len(h)) + h).decode('hex')
def _convert_bytes_to_int(data):
v = 0
for x in data:
v = (v << 8) | ord(x)
return v
def _to_hex(no):
return '%x' % no
@ -155,3 +164,10 @@ def convert_int_to_hex(no, digits=None):
if digits is not None and len(value) < digits:
value = '0' * (digits - len(value)) + value
return value
def convert_bytes_to_int(data):
"""
Convert a byte string to an unsigned integer in network byte order.
"""
return _convert_bytes_to_int(data)

View File

@ -15,9 +15,9 @@ import traceback
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,

View File

@ -18,8 +18,6 @@ from ansible_collections.community.crypto.plugins.module_utils.ecs.api import EC
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_now_datetime,
get_relative_time_option,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
@ -34,6 +32,11 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
CertificateProvider,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
get_relative_time_option,
)
try:
from cryptography.x509.oid import NameOID
except ImportError:

View File

@ -23,7 +23,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_certificate,
get_fingerprint_of_bytes,
get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
@ -40,6 +39,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
get_publickey_info,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
CRYPTOGRAPHY_IMP_ERR = None

View File

@ -22,7 +22,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
load_privatekey,
load_certificate,
get_relative_time_option,
select_message_digest,
)
@ -45,6 +44,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
CertificateProvider,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)
try:
import cryptography
from cryptography import x509

View File

@ -14,7 +14,6 @@ import os
from random import randrange
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_relative_time_option,
select_message_digest,
)
@ -35,6 +34,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
CertificateProvider,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)
try:
import cryptography
from cryptography import x509

View File

@ -10,26 +10,19 @@ __metaclass__ = type
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec as _ArgumentSpec
class ArgumentSpec:
def __init__(self, argument_spec, mutually_exclusive=None, required_together=None, required_one_of=None, required_if=None, required_by=None):
self.argument_spec = argument_spec
self.mutually_exclusive = mutually_exclusive or []
self.required_together = required_together or []
self.required_one_of = required_one_of or []
self.required_if = required_if or []
self.required_by = required_by or {}
class ArgumentSpec(_ArgumentSpec):
def create_ansible_module_helper(self, clazz, args, **kwargs):
return clazz(
*args,
argument_spec=self.argument_spec,
mutually_exclusive=self.mutually_exclusive,
required_together=self.required_together,
required_one_of=self.required_one_of,
required_if=self.required_if,
required_by=self.required_by,
**kwargs)
result = super(ArgumentSpec, self).create_ansible_module_helper(clazz, args, **kwargs)
result.deprecate(
"The crypto.module_backends.common module utils is deprecated and will be removed from community.crypto 3.0.0."
" Use the argspec module utils from community.crypto instead.",
version='3.0.0',
collection_name='community.crypto',
)
return result
def create_ansible_module(self, **kwargs):
return self.create_ansible_module_helper(AnsibleModule, (), **kwargs)
__all__ = ('AnsibleModule', 'ArgumentSpec')

View File

@ -17,6 +17,8 @@ from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native, to_text
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
@ -49,8 +51,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
get_csr_info,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec
MINIMAL_CRYPTOGRAPHY_VERSION = '1.3'

View File

@ -17,6 +17,8 @@ from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
@ -42,8 +44,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
get_privatekey_info,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'

View File

@ -15,12 +15,14 @@ from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.argspec import ArgumentSpec
from ansible_collections.community.crypto.plugins.module_utils.io import (
load_file,
)
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
CRYPTOGRAPHY_HAS_X25519,
CRYPTOGRAPHY_HAS_X448,
@ -37,8 +39,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import
identify_private_key_format,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'

View File

@ -9,19 +9,25 @@ __metaclass__ = type
import abc
import datetime
import errno
import hashlib
import os
import re
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_native, to_bytes
from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
identify_pem_format,
)
from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import
# These imports are for backwards compatibility
get_now_datetime,
ensure_utc_timezone,
convert_relative_to_datetime,
get_relative_time_option,
)
try:
from OpenSSL import crypto
HAS_PYOPENSSL = True
@ -279,86 +285,6 @@ def parse_ordered_name_field(input_list, name_field_name):
return result
def get_now_datetime(with_timezone):
if with_timezone:
return datetime.datetime.now(tz=datetime.timezone.utc)
return datetime.datetime.utcnow()
def ensure_utc_timezone(timestamp):
if timestamp.tzinfo is not None:
return timestamp
return timestamp.astimezone(datetime.timezone.utc)
def convert_relative_to_datetime(relative_time_string, with_timezone=False):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
parsed_result = re.match(
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
relative_time_string)
if parsed_result is None or len(relative_time_string) == 1:
# not matched or only a single "+" or "-"
return None
offset = datetime.timedelta(0)
if parsed_result.group("weeks") is not None:
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
if parsed_result.group("days") is not None:
offset += datetime.timedelta(days=int(parsed_result.group("days")))
if parsed_result.group("hours") is not None:
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
if parsed_result.group("minutes") is not None:
offset += datetime.timedelta(
minutes=int(parsed_result.group("minutes")))
if parsed_result.group("seconds") is not None:
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))
now = get_now_datetime(with_timezone=with_timezone)
if parsed_result.group("prefix") == "+":
return now + offset
else:
return now - offset
def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
The return value will be a datetime object for the cryptography backend,
and a ASN1 formatted string for the pyopenssl backend."""
result = to_native(input_string)
if result is None:
raise OpenSSLObjectError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
return result_datetime
# Absolute time
if backend == 'cryptography':
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
try:
res = datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
else:
if with_timezone:
res = res.astimezone(datetime.timezone.utc)
return res
raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)
def select_message_digest(digest_string):
digest = None
if digest_string == 'sha256':

View File

@ -31,11 +31,15 @@ from hashlib import sha256
from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
OpensshParser,
_OpensshWriter,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
add_or_remove_timezone as _add_or_remove_timezone,
convert_relative_to_datetime,
UTC as _UTC,
)
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
_USER_TYPE = 1
@ -66,14 +70,8 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
_USE_TIMEZONE = sys.version_info >= (3, 6)
def _ensure_utc_timezone_if_use_timezone(value):
if not _USE_TIMEZONE or value.tzinfo is not None:
return value
return value.astimezone(_datetime.timezone.utc)
_ALWAYS = _ensure_utc_timezone_if_use_timezone(datetime(1970, 1, 1))
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _datetime.timezone.utc) if _USE_TIMEZONE else datetime.max
_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE)
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max
_CRITICAL_OPTIONS = (
'force-command',
@ -198,7 +196,7 @@ class OpensshCertificateTimeParameters(object):
else:
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
result = _ensure_utc_timezone_if_use_timezone(datetime.strptime(time_string, time_format))
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE)
except ValueError:
pass
if result is None:

View File

@ -0,0 +1,171 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import datetime
import re
import sys
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
OpenSSLObjectError,
)
try:
UTC = datetime.timezone.utc
except AttributeError:
_DURATION_ZERO = datetime.timedelta(0)
class _UTCClass(datetime.tzinfo):
def utcoffset(self, dt):
return _DURATION_ZERO
def dst(self, dt):
return _DURATION_ZERO
def tzname(self, dt):
return 'UTC'
def fromutc(self, dt):
return dt
def __repr__(self):
return 'UTC'
UTC = _UTCClass()
def get_now_datetime(with_timezone):
if with_timezone:
return datetime.datetime.now(tz=UTC)
return datetime.datetime.utcnow()
def ensure_utc_timezone(timestamp):
if timestamp.tzinfo is UTC:
return timestamp
if timestamp.tzinfo is None:
# We assume that naive datetime objects use timezone UTC!
return timestamp.replace(tzinfo=UTC)
return timestamp.astimezone(UTC)
def remove_timezone(timestamp):
# Convert to native datetime object
if timestamp.tzinfo is None:
return timestamp
if timestamp.tzinfo is not UTC:
timestamp = timestamp.astimezone(UTC)
return timestamp.replace(tzinfo=None)
def add_or_remove_timezone(timestamp, with_timezone):
return ensure_utc_timezone(timestamp) if with_timezone else remove_timezone(timestamp)
if sys.version_info < (3, 3):
def get_epoch_seconds(timestamp):
epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None)
delta = timestamp - epoch
try:
return delta.total_seconds()
except AttributeError:
# Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from
# https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds
return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6
else:
def get_epoch_seconds(timestamp):
return timestamp.timestamp()
def from_epoch_seconds(timestamp, with_timezone):
if with_timezone:
return datetime.datetime.fromtimestamp(timestamp, UTC)
return datetime.datetime.utcfromtimestamp(timestamp)
def convert_relative_to_datetime(relative_time_string, with_timezone=False, now=None):
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
parsed_result = re.match(
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
relative_time_string)
if parsed_result is None or len(relative_time_string) == 1:
# not matched or only a single "+" or "-"
return None
offset = datetime.timedelta(0)
if parsed_result.group("weeks") is not None:
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
if parsed_result.group("days") is not None:
offset += datetime.timedelta(days=int(parsed_result.group("days")))
if parsed_result.group("hours") is not None:
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
if parsed_result.group("minutes") is not None:
offset += datetime.timedelta(
minutes=int(parsed_result.group("minutes")))
if parsed_result.group("seconds") is not None:
offset += datetime.timedelta(
seconds=int(parsed_result.group("seconds")))
if now is None:
now = get_now_datetime(with_timezone=with_timezone)
else:
now = add_or_remove_timezone(now, with_timezone=with_timezone)
if parsed_result.group("prefix") == "+":
return now + offset
else:
return now - offset
def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False, now=None):
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
string is provided.
The return value will be a datetime object for the cryptography backend,
and a ASN1 formatted string for the pyopenssl backend."""
result = to_native(input_string)
if result is None:
raise OpenSSLObjectError(
'The timespec "%s" for %s is not valid' %
input_string, input_name)
# Relative time
if result.startswith("+") or result.startswith("-"):
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone, now=now)
if backend == 'pyopenssl':
return result_datetime.strftime("%Y%m%d%H%M%SZ")
elif backend == 'cryptography':
return result_datetime
# Absolute time
if backend == 'pyopenssl':
return input_string
elif backend == 'cryptography':
for date_fmt, length in [
('%Y%m%d%H%M%SZ', 15), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4)
('%Y%m%d%H%MZ', 13),
('%Y%m%d%H%M%S%z', 14 + 5), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...)
('%Y%m%d%H%M%z', 12 + 5),
]:
if len(result) != length:
continue
try:
res = datetime.datetime.strptime(result, date_fmt)
except ValueError:
pass
else:
return add_or_remove_timezone(res, with_timezone=with_timezone)
raise OpenSSLObjectError(
'The time spec "%s" for %s is invalid' %
(input_string, input_name)
)

View File

@ -37,7 +37,8 @@ seealso:
- module: community.crypto.acme_inspect
description: Allows to debug problems.
extends_documentation_fragment:
- community.crypto.acme
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.attributes
- community.crypto.attributes.actiongroup_acme
attributes:
@ -169,11 +170,9 @@ account_uri:
import base64
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
get_default_argspec,
create_default_argspec,
ACMEClient,
)
@ -188,8 +187,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
def main():
argument_spec = get_default_argspec()
argument_spec.update(dict(
argument_spec = create_default_argspec()
argument_spec.update_argspec(
terms_agreed=dict(type='bool', default=False),
state=dict(type='str', required=True, choices=['absent', 'present', 'changed_key']),
allow_creation=dict(type='bool', default=True),
@ -202,14 +201,9 @@ def main():
alg=dict(type='str', required=True, choices=['HS256', 'HS384', 'HS512']),
key=dict(type='str', required=True, no_log=True),
))
))
module = AnsibleModule(
argument_spec=argument_spec,
required_one_of=(
['account_key_src', 'account_key_content'],
),
)
argument_spec.update(
mutually_exclusive=(
['account_key_src', 'account_key_content'],
['new_account_key_src', 'new_account_key_content'],
),
required_if=(
@ -217,8 +211,8 @@ def main():
# new_account_key_src and new_account_key_content are specified
['state', 'changed_key', ['new_account_key_src', 'new_account_key_content'], True],
),
supports_check_mode=True,
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
backend = create_backend(module, True)
if module.params['external_account_binding']:

View File

@ -25,7 +25,8 @@ notes:
- "This module was called C(acme_account_facts) before Ansible 2.8. The usage
did not change."
extends_documentation_fragment:
- community.crypto.acme
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.attributes
- community.crypto.attributes.actiongroup_acme
- community.crypto.attributes.info_module
@ -213,11 +214,9 @@ order_uris:
version_added: 1.5.0
'''
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
get_default_argspec,
create_default_argspec,
ACMEClient,
)
@ -270,20 +269,11 @@ def get_order(client, order_url):
def main():
argument_spec = get_default_argspec()
argument_spec.update(dict(
argument_spec = create_default_argspec()
argument_spec.update_argspec(
retrieve_orders=dict(type='str', default='ignore', choices=['ignore', 'url_list', 'object_list']),
))
module = AnsibleModule(
argument_spec=argument_spec,
required_one_of=(
['account_key_src', 'account_key_content'],
),
mutually_exclusive=(
['account_key_src', 'account_key_content'],
),
supports_check_mode=True,
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
backend = create_backend(module, True)
try:

View File

@ -0,0 +1,142 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: acme_ari_info
author: "Felix Fontein (@felixfontein)"
version_added: 2.20.0
short_description: Retrieves ACME Renewal Information (ARI) for a certificate
description:
- "Allows to retrieve renewal information on a certificate obtained with the
L(ACME protocol,https://tools.ietf.org/html/rfc8555)."
- "This module only works with the ACME v2 protocol, and requires the ACME server
to support the ARI extension (U(https://datatracker.ietf.org/doc/draft-ietf-acme-ari/)).
This module implements version 3 of the ARI draft."
extends_documentation_fragment:
- community.crypto.acme.basic
- community.crypto.acme.no_account
- community.crypto.attributes
- community.crypto.attributes.info_module
options:
certificate_path:
description:
- A path to the X.509 certificate to request information for.
- Exactly one of O(certificate_path) and O(certificate_content) must be provided.
type: path
certificate_content:
description:
- The content of the X.509 certificate to request information for.
- Exactly one of O(certificate_path) and O(certificate_content) must be provided.
type: str
seealso:
- module: community.crypto.acme_certificate
description: Allows to obtain a certificate using the ACME protocol
- module: community.crypto.acme_certificate_revoke
description: Allows to revoke a certificate using the ACME protocol
'''
EXAMPLES = '''
- name: Retrieve renewal information for a certificate
community.crypto.acme_ari_info:
certificate_path: /etc/httpd/ssl/sample.com.crt
register: cert_data
- name: Show the certificate renewal information
ansible.builtin.debug:
var: cert_data.renewal_info
'''
RETURN = '''
renewal_info:
description: The ARI renewal info object (U(https://www.ietf.org/archive/id/draft-ietf-acme-ari-03.html#section-4.2)).
returned: success
type: dict
contains:
suggestedWindow:
description:
- Describes the window during which the certificate should be renewed.
type: dict
returned: always
contains:
start:
description:
- The start of the window during which the certificate should be renewed.
- The format is specified in L(RFC 3339,https://www.rfc-editor.org/info/rfc3339).
returned: always
type: str
sample: '2021-01-03T00:00:00Z'
end:
description:
- The end of the window during which the certificate should be renewed.
- The format is specified in L(RFC 3339,https://www.rfc-editor.org/info/rfc3339).
returned: always
type: str
sample: '2021-01-03T00:00:00Z'
explanationURL:
description:
- A URL pointing to a page which may explain why the suggested renewal window is what it is.
- For example, it may be a page explaining the CA's dynamic load-balancing strategy, or a
page documenting which certificates are affected by a mass revocation event. Should be shown
to the user.
returned: depends on the ACME server
type: str
sample: https://example.com/docs/ari
retryAfter:
description:
- A timestamp before the next retry to ask for this information should not be made.
returned: depends on the ACME server
type: str
sample: '2024-04-29T01:17:10.236921+00:00'
'''
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
create_default_argspec,
ACMEClient,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
def main():
argument_spec = create_default_argspec(with_account=False)
argument_spec.update_argspec(
certificate_path=dict(type='path'),
certificate_content=dict(type='str'),
)
argument_spec.update(
required_one_of=(
['certificate_path', 'certificate_content'],
),
mutually_exclusive=(
['certificate_path', 'certificate_content'],
),
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
backend = create_backend(module, True)
try:
client = ACMEClient(module, backend)
if not client.directory.has_renewal_info_endpoint():
module.fail_json(msg='The ACME endpoint does not support ACME Renewal Information retrieval')
renewal_info = client.get_renewal_info(
cert_filename=module.params['certificate_path'],
cert_content=module.params['certificate_content'],
include_retry_after=True,
)
module.exit_json(renewal_info=renewal_info)
except ModuleFailException as e:
e.do_fail(module)
if __name__ == '__main__':
main()

View File

@ -58,7 +58,7 @@ seealso:
link: https://tools.ietf.org/html/rfc8555
- name: ACME TLS ALPN Challenge Extension
description: The specification of the V(tls-alpn-01) challenge (RFC 8737).
link: https://www.rfc-editor.org/rfc/rfc8737.html-05
link: https://www.rfc-editor.org/rfc/rfc8737.html
- module: community.crypto.acme_challenge_cert_helper
description: Helps preparing V(tls-alpn-01) challenges.
- module: community.crypto.openssl_privatekey
@ -77,8 +77,12 @@ seealso:
description: Allows to create, modify or delete an ACME account.
- module: community.crypto.acme_inspect
description: Allows to debug problems.
- module: community.crypto.acme_certificate_deactivate_authz
description: Allows to deactivate (invalidate) ACME v2 orders.
extends_documentation_fragment:
- community.crypto.acme
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.acme.certificate
- community.crypto.attributes
- community.crypto.attributes.files
- community.crypto.attributes.actiongroup_acme
@ -138,32 +142,8 @@ options:
- 'tls-alpn-01'
- 'no challenge'
csr:
description:
- "File containing the CSR for the new certificate."
- "Can be created with M(community.crypto.openssl_csr) or C(openssl req ...)."
- "The CSR may contain multiple Subject Alternate Names, but each one
will lead to an individual challenge that must be fulfilled for the
CSR to be signed."
- "I(Note): the private key used to create the CSR I(must not) be the
account key. This is a bad idea from a security point of view, and
the CA should not accept the CSR. The ACME server should return an
error in this case."
- Precisely one of O(csr) or O(csr_content) must be specified.
type: path
aliases: ['src']
csr_content:
description:
- "Content of the CSR for the new certificate."
- "Can be created with M(community.crypto.openssl_csr_pipe) or C(openssl req ...)."
- "The CSR may contain multiple Subject Alternate Names, but each one
will lead to an individual challenge that must be fulfilled for the
CSR to be signed."
- "I(Note): the private key used to create the CSR I(must not) be the
account key. This is a bad idea from a security point of view, and
the CA should not accept the CSR. The ACME server should return an
error in this case."
- Precisely one of O(csr) or O(csr_content) must be specified.
type: str
version_added: 1.2.0
data:
description:
@ -292,6 +272,32 @@ options:
- "The identifier must be of the form
V(C4:A7:B1:A4:7B:2C:71:FA:DB:E1:4B:90:75:FF:C4:15:60:85:89:10)."
type: str
include_renewal_cert_id:
description:
- Determines whether to request renewal of an existing certificate according to
L(the ACME ARI draft 3, https://www.ietf.org/archive/id/draft-ietf-acme-ari-03.html#section-5).
- This is only used when the certificate specified in O(dest) or O(fullchain_dest) already exists.
- V(never) never sends the certificate ID of the certificate to renew. V(always) will always send it.
- V(when_ari_supported) only sends the certificate ID if the ARI endpoint is found in the ACME directory.
- Generally you should use V(when_ari_supported) if you know that the ACME service supports a compatible
draft (or final version, once it is out) of the ARI extension. V(always) should never be necessary.
If you are not sure, or if you receive strange errors on invalid C(replaces) values in order objects,
use V(never), which also happens to be the default.
- ACME servers might refuse to create new orders with C(replaces) for certificates that already have an
existing order. This can happen if this module is used to create an order, and then the playbook/role
fails in case the challenges cannot be set up. If the playbook/role does not record the order data to
continue with the existing order, but tries to create a new one on the next run, creating the new order
might fail. For this reason, this option should only be set to a value different from V(never) if the
role/playbook using it keeps track of order data accross restarts, or if it takes care to deactivate
orders whose processing is aborted. Orders can be deactivated with the
M(community.crypto.acme_certificate_deactivate_authz) module.
type: str
choices:
- never
- when_ari_supported
- always
default: never
version_added: 2.20.0
'''
EXAMPLES = r'''
@ -375,7 +381,7 @@ EXAMPLES = r'''
# state: present
# wait: true
# # Note: route53 requires TXT entries to be enclosed in quotes
# value: "{{ sample_com_challenge.challenge_data['sample.com']['dns-01'].resource_value | regex_replace('^(.*)$', '\"\\1\"') }}"
# value: "{{ sample_com_challenge.challenge_data['sample.com']['dns-01'].resource_value | community.dns.quote_txt(always_quote=true) }}"
# when: sample_com_challenge is changed and 'sample.com' in sample_com_challenge.challenge_data
#
# Alternative way:
@ -390,7 +396,7 @@ EXAMPLES = r'''
# wait: true
# # Note: item.value is a list of TXT entries, and route53
# # requires every entry to be enclosed in quotes
# value: "{{ item.value | map('regex_replace', '^(.*)$', '\"\\1\"' ) | list }}"
# value: "{{ item.value | map('community.dns.quote_txt', always_quote=true) | list }}"
# loop: "{{ sample_com_challenge.challenge_data_dns | dict2items }}"
# when: sample_com_challenge is changed
@ -446,39 +452,55 @@ challenge_data:
- Per identifier / challenge type challenge data.
- Since Ansible 2.8.5, only challenges which are not yet valid are returned.
returned: changed
type: list
elements: dict
type: dict
contains:
resource:
description: The challenge resource that must be created for validation.
returned: changed
type: str
sample: .well-known/acme-challenge/evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA
resource_original:
identifier:
description:
- The original challenge resource including type identifier for V(tls-alpn-01)
challenges.
returned: changed and O(challenge) is V(tls-alpn-01)
type: str
sample: DNS:example.com
resource_value:
description:
- The value the resource has to produce for the validation.
- For V(http-01) and V(dns-01) challenges, the value can be used as-is.
- "For V(tls-alpn-01) challenges, note that this return value contains a
Base64 encoded version of the correct binary blob which has to be put
into the acmeValidation x509 extension; see
U(https://www.rfc-editor.org/rfc/rfc8737.html#section-3)
for details. To do this, you might need the P(ansible.builtin.b64decode#filter) Jinja filter
to extract the binary blob from this return value."
- For every identifier, provides a dictionary of challenge types mapping to challenge data.
- The keys in this dictionary are the identifiers. C(identifier) is a placeholder used in the documentation.
- Note that the keys are not valid Jinja2 identifiers.
returned: changed
type: str
sample: IlirfxKKXA...17Dt3juxGJ-PCt92wr-oA
record:
description: The full DNS record's name for the challenge.
returned: changed and challenge is V(dns-01)
type: str
sample: _acme-challenge.example.com
type: dict
contains:
challenge-type:
description:
- Data for every challenge type.
- The keys in this dictionary are the challenge types. C(challenge-type) is a placeholder used in the documentation.
Possible keys are V(http-01), V(dns-01), and V(tls-alpn-01).
- Note that the keys are not valid Jinja2 identifiers.
returned: changed
type: dict
contains:
resource:
description: The challenge resource that must be created for validation.
returned: changed
type: str
sample: .well-known/acme-challenge/evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA
resource_original:
description:
- The original challenge resource including type identifier for V(tls-alpn-01)
challenges.
returned: changed and O(challenge) is V(tls-alpn-01)
type: str
sample: DNS:example.com
resource_value:
description:
- The value the resource has to produce for the validation.
- For V(http-01) and V(dns-01) challenges, the value can be used as-is.
- "For V(tls-alpn-01) challenges, note that this return value contains a
Base64 encoded version of the correct binary blob which has to be put
into the acmeValidation x509 extension; see
U(https://www.rfc-editor.org/rfc/rfc8737.html#section-3)
for details. To do this, you might need the P(ansible.builtin.b64decode#filter) Jinja filter
to extract the binary blob from this return value."
returned: changed
type: str
sample: IlirfxKKXA...17Dt3juxGJ-PCt92wr-oA
record:
description: The full DNS record's name for the challenge.
returned: changed and challenge is V(dns-01)
type: str
sample: _acme-challenge.example.com
challenge_data_dns:
description:
- List of TXT values per DNS record, in case challenge is V(dns-01).
@ -547,11 +569,9 @@ all_chains:
import os
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
get_default_argspec,
create_default_argspec,
ACMEClient,
)
@ -585,6 +605,7 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.orders impor
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
compute_cert_id,
pem_to_der,
)
@ -621,6 +642,7 @@ class ACMECertificateClient(object):
self.order_uri = self.data.get('order_uri') if self.data else None
self.all_chains = None
self.select_chain_matcher = []
self.include_renewal_cert_id = module.params['include_renewal_cert_id']
if self.module.params['select_chain']:
for criterium_idx, criterium in enumerate(self.module.params['select_chain']):
@ -678,6 +700,15 @@ class ACMECertificateClient(object):
# stored in self.order_uri by the constructor).
return self.order_uri is None
def _get_cert_info_or_none(self):
if self.module.params.get('dest'):
filename = self.module.params['dest']
else:
filename = self.module.params['fullchain_dest']
if not os.path.exists(filename):
return None
return self.client.backend.get_cert_information(cert_filename=filename)
def start_challenges(self):
'''
Create new authorizations for all identifiers of the CSR,
@ -692,7 +723,19 @@ class ACMECertificateClient(object):
authz = Authorization.create(self.client, identifier_type, identifier)
self.authorizations[authz.combined_identifier] = authz
else:
self.order = Order.create(self.client, self.identifiers)
replaces_cert_id = None
if (
self.include_renewal_cert_id == 'always' or
(self.include_renewal_cert_id == 'when_ari_supported' and self.client.directory.has_renewal_info_endpoint())
):
cert_info = self._get_cert_info_or_none()
if cert_info is not None:
replaces_cert_id = compute_cert_id(
self.client.backend,
cert_info=cert_info,
none_if_required_information_is_missing=True,
)
self.order = Order.create(self.client, self.identifiers, replaces_cert_id)
self.order_uri = self.order.url
self.order.load_authorizations(self.client)
self.authorizations.update(self.order.authorizations)
@ -854,15 +897,14 @@ class ACMECertificateClient(object):
def main():
argument_spec = get_default_argspec()
argument_spec.update(dict(
argument_spec = create_default_argspec(with_certificate=True)
argument_spec.argument_spec['csr']['aliases'] = ['src']
argument_spec.update_argspec(
modify_account=dict(type='bool', default=True),
account_email=dict(type='str'),
agreement=dict(type='str'),
terms_agreed=dict(type='bool', default=False),
challenge=dict(type='str', default='http-01', choices=['http-01', 'dns-01', 'tls-alpn-01', NO_CHALLENGE]),
csr=dict(type='path', aliases=['src']),
csr_content=dict(type='str'),
data=dict(type='dict'),
dest=dict(type='path', aliases=['cert']),
fullchain_dest=dict(type='path', aliases=['fullchain']),
@ -878,20 +920,14 @@ def main():
subject_key_identifier=dict(type='str'),
authority_key_identifier=dict(type='str'),
)),
))
module = AnsibleModule(
argument_spec=argument_spec,
required_one_of=(
['account_key_src', 'account_key_content'],
['dest', 'fullchain_dest'],
['csr', 'csr_content'],
),
mutually_exclusive=(
['account_key_src', 'account_key_content'],
['csr', 'csr_content'],
),
supports_check_mode=True,
include_renewal_cert_id=dict(type='str', choices=['never', 'when_ari_supported', 'always'], default='never'),
)
argument_spec.update(
required_one_of=[
['dest', 'fullchain_dest'],
],
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
backend = create_backend(module, False)
try:

View File

@ -0,0 +1,119 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Michael Gruener <michael.gruener@chaosmoon.net>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: acme_certificate_deactivate_authz
author: "Felix Fontein (@felixfontein)"
version_added: 2.20.0
short_description: Deactivate all authz for an ACME v2 order
description:
- "Deactivate all authentication objects (authz) for an ACME v2 order,
which effectively deactivates (invalidates) the order itself."
- "Authentication objects are bound to an account key and remain valid
for a certain amount of time, and can be used to issue certificates
without having to re-authenticate the domain. This can be a security
concern."
- "Another reason to use this module is to deactivate an order whose
processing failed when using O(community.crypto.acme_certificate#module:include_renewal_cert_id)."
seealso:
- module: community.crypto.acme_certificate
extends_documentation_fragment:
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.attributes
- community.crypto.attributes.actiongroup_acme
attributes:
check_mode:
support: full
diff_mode:
support: none
options:
order_uri:
description:
- The ACME v2 order to deactivate.
- Can be obtained from RV(community.crypto.acme_certificate#module:order_uri).
type: str
required: true
'''
EXAMPLES = r'''
- name: Deactivate all authzs for an order
community.crypto.acme_certificate_deactivate_authz:
account_key_content: "{{ account_private_key }}"
order_uri: "{{ certificate_result.order_uri }}"
'''
RETURN = '''#'''
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
create_default_argspec,
ACMEClient,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.account import (
ACMEAccount,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
ModuleFailException,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.orders import (
Order,
)
def main():
argument_spec = create_default_argspec()
argument_spec.update_argspec(
order_uri=dict(type='str', required=True),
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
if module.params['acme_version'] == 1:
module.fail_json('The module does not support acme_version=1')
backend = create_backend(module, False)
try:
client = ACMEClient(module, backend)
account = ACMEAccount(client)
dummy, account_data = account.setup_account(allow_creation=False)
if account_data is None:
raise ModuleFailException(msg='Account does not exist or is deactivated.')
order = Order.from_url(client, module.params['order_uri'])
order.load_authorizations(client)
changed = False
for authz in order.authorizations.values():
if not authz.can_deactivate():
continue
changed = True
if module.check_mode:
continue
try:
authz.deactivate(client)
except Exception:
# ignore errors
pass
if authz.status != 'deactivated':
module.warn(warning='Could not deactivate authz object {0}.'.format(authz.url))
module.exit_json(changed=changed)
except ModuleFailException as e:
e.do_fail(module)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,245 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2018 Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = '''
---
module: acme_certificate_renewal_info
author: "Felix Fontein (@felixfontein)"
version_added: 2.20.0
short_description: Determine whether a certificate should be renewed or not
description:
- Uses various information to determine whether a certificate should be renewed or not.
- If available, the ARI extension (ACME Renewal Information, U(https://datatracker.ietf.org/doc/draft-ietf-acme-ari/))
is used. This module implements version 3 of the ARI draft."
extends_documentation_fragment:
- community.crypto.acme.basic
- community.crypto.acme.no_account
- community.crypto.attributes
- community.crypto.attributes.info_module
options:
certificate_path:
description:
- A path to the X.509 certificate to determine renewal of.
- In case the certificate does not exist, the module will always return RV(should_renew=true).
- O(certificate_path) and O(certificate_content) are mutually exclusive.
type: path
certificate_content:
description:
- The content of the X.509 certificate to determine renewal of.
- O(certificate_path) and O(certificate_content) are mutually exclusive.
type: str
use_ari:
description:
- Whether to use ARI information, if available.
- Set this to V(false) if the ACME server implements ARI in a way that is incompatible with this module.
type: bool
default: true
ari_algorithm:
description:
- If ARI information is used, selects which algorithm is used to determine whether to renew now.
- V(standard) selects the L(algorithm provided in the the ARI specification,
https://www.ietf.org/archive/id/draft-ietf-acme-ari-03.html#name-renewalinfo-objects).
- V(start) returns RV(should_renew=true) once the start of the renewal interval has been reached.
type: str
choices:
- standard
- start
default: standard
remaining_days:
description:
- The number of days the certificate must have left being valid.
- For example, if O(remaining_days=20), this check causes RV(should_renew=true) if the
certificate is valid for less than 20 days.
type: int
remaining_percentage:
description:
- The percentage of the certificate's validity period that should be left.
- For example, if O(remaining_percentage=0.1), and the certificate's validity period is 90 days,
this check causes RV(should_renew=true) if the certificate is valid for less than 9 days.
- Must be a value between 0 and 1.
type: float
now:
description:
- Use this timestamp instead of the current timestamp to determine whether a certificate should be renewed.
- Time can be specified either as relative time or as absolute timestamp.
- Time will always be interpreted as UTC.
- Valid format is C([+-]timespec | ASN.1 TIME) where timespec can be an integer
+ C([w | d | h | m | s]) (for example V(+32w1d2h)).
type: str
seealso:
- module: community.crypto.acme_certificate
description: Allows to obtain a certificate using the ACME protocol
- module: community.crypto.acme_ari_info
description: Obtain renewal information for a certificate
'''
EXAMPLES = '''
- name: Retrieve renewal information for a certificate
community.crypto.acme_certificate_renewal_info:
certificate_path: /etc/httpd/ssl/sample.com.crt
register: cert_data
- name: Should the certificate be renewed?
ansible.builtin.debug:
var: cert_data.should_renew
'''
RETURN = '''
should_renew:
description:
- Whether the certificate should be renewed.
- If no certificate is provided, or the certificate is expired, will always be V(true).
returned: success
type: bool
sample: true
msg:
description:
- Information on the reason for renewal.
- Should be shown to the user, as in case of ARI triggered renewal it can contain important
information, for example on forced revocations for misissued certificates.
type: str
returned: success
sample: The certificate does not exist.
supports_ari:
description:
- Whether ARI information was used to determine renewal. This can be used to determine whether to
specify O(community.crypto.acme_certificate#module:include_renewal_cert_id=when_ari_supported)
for the M(community.crypto.acme_certificate) module.
- If O(use_ari=false), this will always be V(false).
returned: success
type: bool
sample: true
cert_id:
description:
- The certificate ID according to the L(ARI specification, https://www.ietf.org/archive/id/draft-ietf-acme-ari-03.html#section-4.1).
returned: success, the certificate exists, and has an Authority Key Identifier X.509 extension
type: str
sample: aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE
'''
import os
import random
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
create_default_argspec,
ACMEClient,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import compute_cert_id
def main():
argument_spec = create_default_argspec(with_account=False)
argument_spec.update_argspec(
certificate_path=dict(type='path'),
certificate_content=dict(type='str'),
use_ari=dict(type='bool', default=True),
ari_algorithm=dict(type='str', choices=['standard', 'start'], default='standard'),
remaining_days=dict(type='int'),
remaining_percentage=dict(type='float'),
now=dict(type='str'),
)
argument_spec.update(
mutually_exclusive=(
['certificate_path', 'certificate_content'],
),
)
module = argument_spec.create_ansible_module(supports_check_mode=True)
backend = create_backend(module, True)
result = dict(
changed=False,
msg='The certificate is still valid and no condition was reached',
supports_ari=False,
)
def complete(should_renew, **kwargs):
result['should_renew'] = should_renew
result.update(kwargs)
module.exit_json(**result)
if not module.params['certificate_path'] and not module.params['certificate_content']:
complete(True, msg='No certificate was specified')
if module.params['certificate_path'] is not None and not os.path.exists(module.params['certificate_path']):
complete(True, msg='The certificate file does not exist')
try:
cert_info = backend.get_cert_information(
cert_filename=module.params['certificate_path'],
cert_content=module.params['certificate_content'],
)
cert_id = compute_cert_id(backend, cert_info=cert_info, none_if_required_information_is_missing=True)
if cert_id is not None:
result['cert_id'] = cert_id
if module.params['now']:
now = backend.parse_module_parameter(module.params['now'], 'now')
else:
now = backend.get_now()
if now >= cert_info.not_valid_after:
complete(True, msg='The certificate has already expired')
client = ACMEClient(module, backend)
if cert_id is not None and module.params['use_ari'] and client.directory.has_renewal_info_endpoint():
renewal_info = client.get_renewal_info(cert_id=cert_id)
window_start = backend.parse_acme_timestamp(renewal_info['suggestedWindow']['start'])
window_end = backend.parse_acme_timestamp(renewal_info['suggestedWindow']['end'])
msg_append = ''
if 'explanationURL' in renewal_info:
msg_append = '. Information on renewal interval: {0}'.format(renewal_info['explanationURL'])
result['supports_ari'] = True
if now > window_end:
complete(True, msg='The suggested renewal interval provided by ARI is in the past{0}'.format(msg_append))
if module.params['ari_algorithm'] == 'start':
if now > window_start:
complete(True, msg='The suggested renewal interval provided by ARI has begun{0}'.format(msg_append))
else:
random_time = backend.interpolate_timestamp(window_start, window_end, random.random())
if now > random_time:
complete(
True,
msg='The picked random renewal time {0} in sugested renewal internal provided by ARI is in the past{1}'.format(
random_time,
msg_append,
),
)
if module.params['remaining_days'] is not None:
remaining_days = (cert_info.not_valid_after - now).days
if remaining_days < module.params['remaining_days']:
complete(True, msg='The certificate expires in {0} days'.format(remaining_days))
if module.params['remaining_percentage'] is not None:
timestamp = backend.interpolate_timestamp(cert_info.not_valid_before, cert_info.not_valid_after, 1 - module.params['remaining_percentage'])
if timestamp < now:
complete(
True,
msg="The remaining percentage {0}% of the certificate's lifespan was reached on {1}".format(
module.params['remaining_percentage'] * 100,
timestamp,
),
)
complete(False)
except ModuleFailException as e:
e.do_fail(module)
if __name__ == '__main__':
main()

View File

@ -37,7 +37,8 @@ seealso:
- module: community.crypto.acme_inspect
description: Allows to debug problems.
extends_documentation_fragment:
- community.crypto.acme
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.attributes
- community.crypto.attributes.actiongroup_acme
attributes:
@ -127,11 +128,9 @@ EXAMPLES = '''
RETURN = '''#'''
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
get_default_argspec,
create_default_argspec,
ACMEClient,
)
@ -152,24 +151,23 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.utils import
def main():
argument_spec = get_default_argspec()
argument_spec.update(dict(
argument_spec = create_default_argspec(require_account_key=False)
argument_spec.update_argspec(
private_key_src=dict(type='path'),
private_key_content=dict(type='str', no_log=True),
private_key_passphrase=dict(type='str', no_log=True),
certificate=dict(type='path', required=True),
revoke_reason=dict(type='int'),
))
module = AnsibleModule(
argument_spec=argument_spec,
)
argument_spec.update(
required_one_of=(
['account_key_src', 'account_key_content', 'private_key_src', 'private_key_content'],
),
mutually_exclusive=(
['account_key_src', 'account_key_content', 'private_key_src', 'private_key_content'],
),
supports_check_mode=False,
)
module = argument_spec.create_ansible_module()
backend = create_backend(module, False)
try:

View File

@ -165,16 +165,16 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import (
read_file,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
set_not_valid_after,
set_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
)
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography

View File

@ -42,7 +42,8 @@ seealso:
description: The specification of the C(tls-alpn-01) challenge (RFC 8737).
link: https://www.rfc-editor.org/rfc/rfc8737.html
extends_documentation_fragment:
- community.crypto.acme
- community.crypto.acme.basic
- community.crypto.acme.account
- community.crypto.attributes
- community.crypto.attributes.actiongroup_acme
attributes:
@ -247,12 +248,11 @@ output_json:
- ...
'''
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native, to_bytes, to_text
from ansible_collections.community.crypto.plugins.module_utils.acme.acme import (
create_backend,
get_default_argspec,
create_default_argspec,
ACMEClient,
)
@ -263,18 +263,14 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
def main():
argument_spec = get_default_argspec()
argument_spec.update(dict(
argument_spec = create_default_argspec(require_account_key=False)
argument_spec.update_argspec(
url=dict(type='str'),
method=dict(type='str', choices=['get', 'post', 'directory-only'], default='get'),
content=dict(type='str'),
fail_on_acme_error=dict(type='bool', default=True),
))
module = AnsibleModule(
argument_spec=argument_spec,
mutually_exclusive=(
['account_key_src', 'account_key_content'],
),
)
argument_spec.update(
required_if=(
['method', 'get', ['url']],
['method', 'post', ['url', 'content']],
@ -282,6 +278,7 @@ def main():
['method', 'post', ['account_key_src', 'account_key_content'], True],
),
)
module = argument_spec.create_ansible_module()
backend = create_backend(module, False)
result = dict()

View File

@ -220,10 +220,6 @@ from ansible.module_utils.common.text.converters import to_bytes
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_now_datetime,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_oid_to_name,
@ -232,6 +228,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_now_datetime,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
CREATE_DEFAULT_CONTEXT_IMP_ERR = None

View File

@ -406,10 +406,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
OpenSSLObjectError,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
get_relative_time_option,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
)
@ -418,6 +414,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
select_backend,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)
def main():
module = AnsibleModule(

View File

@ -470,7 +470,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
load_certificate,
parse_name_field,
parse_ordered_name_field,
get_relative_time_option,
select_message_digest,
)
@ -506,6 +505,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
get_crl_info,
)
from ansible_collections.community.crypto.plugins.module_utils.time import (
get_relative_time_option,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'
CRYPTOGRAPHY_IMP_ERR = None

View File

@ -0,0 +1,10 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
azp/generic/1
azp/posix/1
cloud/acme
# For some reason connecting to helper containers does not work on the Alpine VMs
skip/alpine

View File

@ -0,0 +1,8 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
dependencies:
- setup_acme
- setup_remote_tmp_dir

View File

@ -0,0 +1,154 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
- vars:
certificate_name: cert-1
subject_alt_name: DNS:example.com
account_email: example@example.org
block:
- name: Generate account key
openssl_privatekey:
path: "{{ remote_tmp_dir }}/account-ec256.pem"
type: ECC
curve: secp256r1
force: true
- name: Create cert private key
openssl_privatekey:
path: "{{ remote_tmp_dir }}/{{ certificate_name }}.key"
type: ECC
curve: secp256r1
force: true
- name: Create cert CSR
openssl_csr:
path: "{{ remote_tmp_dir }}/{{ certificate_name }}.csr"
privatekey_path: "{{ remote_tmp_dir }}/{{ certificate_name }}.key"
subject_alt_name: "{{ subject_alt_name }}"
- name: Start process of obtaining certificate
acme_certificate:
select_crypto_backend: "{{ select_crypto_backend }}"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
modify_account: true
csr: "{{ remote_tmp_dir }}/{{ certificate_name }}.csr"
dest: "{{ remote_tmp_dir }}/{{ certificate_name }}.pem"
challenge: http-01
force: true
terms_agreed: true
account_email: "{{ account_email }}"
register: certificate_data
- name: Inspect order
acme_inspect:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
url: "{{ certificate_data.order_uri }}"
method: get
register: order_1
- name: Show order
debug:
var: order_1.output_json
- name: Deactivate order (check mode)
acme_certificate_deactivate_authz:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
order_uri: "{{ certificate_data.order_uri }}"
check_mode: true
register: deactivate_1
- name: Inspect order again
acme_inspect:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
url: "{{ certificate_data.order_uri }}"
method: get
register: order_2
- name: Show order
debug:
var: order_2.output_json
- name: Deactivate order
acme_certificate_deactivate_authz:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
order_uri: "{{ certificate_data.order_uri }}"
register: deactivate_2
- name: Inspect order again
acme_inspect:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
url: "{{ certificate_data.order_uri }}"
method: get
register: order_3
- name: Show order
debug:
var: order_3.output_json
- name: Deactivate order (check mode, idempotent)
acme_certificate_deactivate_authz:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
order_uri: "{{ certificate_data.order_uri }}"
check_mode: true
register: deactivate_3
- name: Inspect order again
acme_inspect:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
url: "{{ certificate_data.order_uri }}"
method: get
register: order_4
- name: Show order
debug:
var: order_4.output_json
- name: Deactivate order (idempotent)
acme_certificate_deactivate_authz:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
order_uri: "{{ certificate_data.order_uri }}"
register: deactivate_4
- name: Inspect order again
acme_inspect:
acme_directory: https://{{ acme_host }}:14000/dir
acme_version: 2
validate_certs: false
account_key_src: "{{ remote_tmp_dir }}/account-ec256.pem"
account_uri: "{{ certificate_data.account_uri }}"
url: "{{ certificate_data.order_uri }}"
method: get
register: order_5
- name: Show order
debug:
var: order_5.output_json

View File

@ -0,0 +1,40 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
####################################################################
# WARNING: These are designed specifically for Ansible tests #
# and should not be used as examples of how to write Ansible roles #
####################################################################
- block:
- name: Running tests with OpenSSL backend
include_tasks: impl.yml
vars:
select_crypto_backend: openssl
- import_tasks: ../tests/validate.yml
# Old 0.9.8 versions have insufficient CLI support for signing with EC keys
when: openssl_version.stdout is version('1.0.0', '>=')
- name: Remove output directory
file:
path: "{{ remote_tmp_dir }}"
state: absent
- name: Re-create output directory
file:
path: "{{ remote_tmp_dir }}"
state: directory
- block:
- name: Running tests with cryptography backend
include_tasks: impl.yml
vars:
select_crypto_backend: cryptography
- import_tasks: ../tests/validate.yml
when: cryptography_version.stdout is version('1.5', '>=')

View File

@ -0,0 +1,17 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
- name: Checks
assert:
that:
- order_1.output_json.status == 'pending'
- deactivate_1 is changed
- order_2.output_json.status == 'pending'
- deactivate_2 is changed
- order_3.output_json.status == 'deactivated'
- deactivate_3 is not changed
- order_4.output_json.status == 'deactivated'
- deactivate_4 is not changed
- order_5.output_json.status == 'deactivated'

View File

@ -0,0 +1,10 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
azp/generic/1
azp/posix/1
cloud/acme
# For some reason connecting to helper containers does not work on the Alpine VMs
skip/alpine

View File

@ -0,0 +1,8 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
dependencies:
- setup_acme
- setup_remote_tmp_dir

View File

@ -0,0 +1,145 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
## SET UP ACCOUNT KEYS ########################################################################
- block:
- name: Generate account keys
openssl_privatekey:
path: "{{ remote_tmp_dir }}/{{ item.name }}.pem"
type: "{{ item.type }}"
size: "{{ item.size | default(omit) }}"
curve: "{{ item.curve | default(omit) }}"
force: true
loop: "{{ account_keys }}"
vars:
account_keys:
- name: account-ec256
type: ECC
curve: secp256r1
## CREATE ACCOUNTS AND OBTAIN CERTIFICATES ####################################################
- name: Obtain cert 1
include_tasks: obtain-cert.yml
vars:
certgen_title: Certificate 1 for renewal check
certificate_name: cert-1
key_type: rsa
rsa_bits: "{{ default_rsa_key_size }}"
subject_alt_name: "DNS:example.com"
subject_alt_name_critical: false
account_key: account-ec256
challenge: http-01
modify_account: true
deactivate_authzs: false
force: true
remaining_days: "{{ omit }}"
terms_agreed: true
account_email: "example@example.org"
## OBTAIN CERTIFICATE INFOS ###################################################################
- name: Dump OpenSSL x509 info
command:
cmd: openssl x509 -in {{ remote_tmp_dir }}/cert-1.pem -noout -text
- name: Obtain certificate information
x509_certificate_info:
path: "{{ remote_tmp_dir }}/cert-1.pem"
register: cert_1_info
- name: Read certificate
slurp:
src: '{{ remote_tmp_dir }}/cert-1.pem'
register: slurp_cert_1
- name: Obtain certificate information (1/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
# Certificate is valid for ~1826 days
register: cert_1_renewal_1
- name: Obtain certificate information (2/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
# Certificate is valid for ~1826 days
remaining_days: 1000
remaining_percentage: 0.5
register: cert_1_renewal_2
- name: Obtain certificate information (3/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_content: "{{ slurp_cert_1.content | b64decode }}"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1800d
# Certificate is valid for ~26 days
register: cert_1_renewal_3
- name: Obtain certificate information (4/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1800d
# Certificate is valid for ~26 days
remaining_days: 30
remaining_percentage: 0.1
register: cert_1_renewal_4
- name: Obtain certificate information (5/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1800d
# Certificate is valid for ~26 days
remaining_days: 30
remaining_percentage: 0.01
register: cert_1_renewal_5
- name: Obtain certificate information (6/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1800d
# Certificate is valid for ~26 days
remaining_days: 10
remaining_percentage: 0.03
register: cert_1_renewal_6
- name: Obtain certificate information (7/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-1.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1830d
# Certificate is no longer valid
register: cert_1_renewal_7
- name: Obtain certificate information (8/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
now: +1830d
# Certificate is no longer valid
register: cert_1_renewal_8
- name: Obtain certificate information (9/9)
acme_certificate_renewal_info:
select_crypto_backend: "{{ select_crypto_backend }}"
certificate_path: "{{ remote_tmp_dir }}/cert-does-not-exist.pem"
acme_version: 2
acme_directory: https://{{ acme_host }}:14000/dir
validate_certs: false
# Certificate is no longer valid
register: cert_1_renewal_9

View File

@ -0,0 +1,40 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
####################################################################
# WARNING: These are designed specifically for Ansible tests #
# and should not be used as examples of how to write Ansible roles #
####################################################################
- block:
- name: Running tests with OpenSSL backend
include_tasks: impl.yml
vars:
select_crypto_backend: openssl
- import_tasks: ../tests/validate.yml
# Old 0.9.8 versions have insufficient CLI support for signing with EC keys
when: openssl_version.stdout is version('1.0.0', '>=')
- name: Remove output directory
file:
path: "{{ remote_tmp_dir }}"
state: absent
- name: Re-create output directory
file:
path: "{{ remote_tmp_dir }}"
state: directory
- block:
- name: Running tests with cryptography backend
include_tasks: impl.yml
vars:
select_crypto_backend: cryptography
- import_tasks: ../tests/validate.yml
when: cryptography_version.stdout is version('1.5', '>=')

View File

@ -0,0 +1 @@
../../setup_acme/tasks/obtain-cert.yml

View File

@ -0,0 +1,47 @@
---
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
- name: Validate results
assert:
that:
- cert_1_renewal_1.should_renew == false
- cert_1_renewal_1.msg == 'The certificate is still valid and no condition was reached'
- cert_1_renewal_1.supports_ari == supports_ari
- cert_1_renewal_1.cert_id is string or not can_have_cert_id
- cert_1_renewal_2.should_renew == false
- cert_1_renewal_2.msg == 'The certificate is still valid and no condition was reached'
- cert_1_renewal_2.supports_ari == supports_ari
- cert_1_renewal_2.cert_id is string or not can_have_cert_id
- cert_1_renewal_3.should_renew == false
- cert_1_renewal_3.msg == 'The certificate is still valid and no condition was reached'
- cert_1_renewal_3.supports_ari == supports_ari
- cert_1_renewal_3.cert_id is string or not can_have_cert_id
- cert_1_renewal_4.should_renew == true
- cert_1_renewal_4.msg == 'The certificate expires in 25 days'
- cert_1_renewal_4.supports_ari == supports_ari
- cert_1_renewal_4.cert_id is string or not can_have_cert_id
- cert_1_renewal_5.should_renew == true
- cert_1_renewal_5.msg == 'The certificate expires in 25 days'
- cert_1_renewal_5.supports_ari == supports_ari
- cert_1_renewal_5.cert_id is string or not can_have_cert_id
- cert_1_renewal_6.should_renew == true
- cert_1_renewal_6.msg.startswith("The remaining percentage 3.0% of the certificate's lifespan was reached on ")
- cert_1_renewal_6.supports_ari == supports_ari
- cert_1_renewal_6.cert_id is string or not can_have_cert_id
- cert_1_renewal_7.should_renew == true
- cert_1_renewal_7.msg == 'The certificate has already expired'
- cert_1_renewal_7.supports_ari == false
- cert_1_renewal_7.cert_id is string or not can_have_cert_id
- cert_1_renewal_8.should_renew == true
- cert_1_renewal_8.msg == 'No certificate was specified'
- cert_1_renewal_8.supports_ari == false
- cert_1_renewal_8.cert_id is not defined
- cert_1_renewal_9.should_renew == true
- cert_1_renewal_9.msg == 'The certificate file does not exist'
- cert_1_renewal_9.supports_ari == false
- cert_1_renewal_9.cert_id is not defined
vars:
can_have_cert_id: cert_1_info.authority_key_identifier is string
supports_ari: false

View File

@ -9,8 +9,10 @@ __metaclass__ = type
import base64
import datetime
import os
import sys
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
)
@ -79,6 +81,12 @@ TEST_CSRS = [
TEST_CERT = load_fixture("cert_1.pem")
TEST_CERT_2 = load_fixture("cert_2.pem")
TEST_CERT_OPENSSL_OUTPUT = load_fixture("cert_1.txt") # OpenSSL 3.3.0 output
TEST_CERT_OPENSSL_OUTPUT_2 = load_fixture("cert_2.txt") # OpenSSL 3.3.0 output
TEST_CERT_OPENSSL_OUTPUT_2B = load_fixture("cert_2-b.txt") # OpenSSL 1.1.1f output
TEST_CERT_DAYS = [
@ -88,6 +96,81 @@ TEST_CERT_DAYS = [
]
TEST_CERT_INFO = CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=1,
subject_key_identifier=b'\x98\xD2\xFD\x3C\xCC\xCD\x69\x45\xFB\xE2\x8C\x30\x2C\x54\x62\x18\x34\xB7\x07\x73',
authority_key_identifier=None,
)
TEST_CERT_INFO_2 = CertificateInformation(
not_valid_before=datetime.datetime(2024, 5, 4, 20, 42, 21),
not_valid_after=datetime.datetime(2029, 5, 4, 20, 42, 20),
serial_number=4218235397573492796,
subject_key_identifier=b'\x17\xE5\x83\x22\x14\xEF\x74\xD3\xBE\x7E\x30\x76\x56\x1F\x51\x74\x65\x1F\xE9\xF0',
authority_key_identifier=b'\x13\xC3\x4C\x3E\x59\x45\xDD\xE3\x63\x51\xA3\x46\x80\xC4\x08\xC7\x14\xC0\x64\x4E',
)
TEST_CERT_INFO = [
(TEST_CERT, TEST_CERT_INFO, TEST_CERT_OPENSSL_OUTPUT),
(TEST_CERT_2, TEST_CERT_INFO_2, TEST_CERT_OPENSSL_OUTPUT_2),
(TEST_CERT_2, TEST_CERT_INFO_2, TEST_CERT_OPENSSL_OUTPUT_2B),
]
TEST_PARSE_ACME_TIMESTAMP = [
(
'2024-01-01T00:11:22Z',
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22),
),
(
'2024-01-01T00:11:22.123Z',
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22, microsecond=123000),
),
(
'2024-04-17T06:54:13.333333334Z',
dict(year=2024, month=4, day=17, hour=6, minute=54, second=13, microsecond=333333),
),
]
if sys.version_info >= (3, 5):
TEST_PARSE_ACME_TIMESTAMP.extend([
(
'2024-01-01T00:11:22+0100',
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22),
),
(
'2024-01-01T00:11:22.123+0100',
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22, microsecond=123000),
),
])
TEST_INTERPOLATE_TIMESTAMP = [
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
0.0,
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
),
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
0.5,
dict(year=2024, month=1, day=1, hour=0, minute=30, second=0),
),
(
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
1.0,
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
),
]
class FakeBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None):
raise BackendException('Not implemented in fake backend')
@ -98,6 +181,9 @@ class FakeBackend(CryptoBackend):
def create_mac_key(self, alg, key):
raise BackendException('Not implemented in fake backend')
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
raise BackendException('Not implemented in fake backend')
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
raise BackendException('Not implemented in fake backend')
@ -106,3 +192,6 @@ class FakeBackend(CryptoBackend):
def create_chain_matcher(self, criterium):
raise BackendException('Not implemented in fake backend')
def get_cert_information(self, cert_filename=None, cert_content=None):
raise BackendException('Not implemented in fake backend')

View File

@ -0,0 +1,38 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number: 1 (0x1)
Signature Algorithm: ecdsa-with-SHA256
Issuer: CN=ansible.com
Validity
Not Before: Nov 25 15:28:23 2018 GMT
Not After : Nov 26 15:28:24 2018 GMT
Subject: CN=ansible.com
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
pub:
04:00:9c:f4:c8:00:17:03:01:26:3a:14:d1:92:35:
f1:c2:07:9d:6d:63:ba:82:86:d8:33:79:56:b3:3a:
d2:eb:c1:bc:41:2c:e1:5d:1e:80:99:0d:c8:cd:90:
e2:9a:74:d3:5c:ee:d7:85:5c:a5:0d:3f:12:2f:31:
38:e3:f1:29:9b
ASN1 OID: prime256v1
NIST CURVE: P-256
X509v3 extensions:
X509v3 Subject Alternative Name:
DNS:example.com, DNS:example.org
X509v3 Basic Constraints: critical
CA:FALSE
X509v3 Key Usage: critical
Digital Signature
X509v3 Extended Key Usage:
TLS Web Server Authentication
X509v3 Subject Key Identifier:
98:D2:FD:3C:CC:CD:69:45:FB:E2:8C:30:2C:54:62:18:34:B7:07:73
Signature Algorithm: ecdsa-with-SHA256
Signature Value:
30:46:02:21:00:bc:fb:52:bf:7a:93:2d:0e:7c:ce:43:f4:cc:
05:98:28:36:8d:c7:2a:9b:f5:20:94:62:3d:fb:82:9e:38:42:
32:02:21:00:c0:55:f8:b5:d9:65:41:2a:dd:d4:76:3f:8c:cb:
07:c1:d2:b9:c0:7d:c9:90:af:fd:f9:f1:b0:c9:13:f5:d5:52

View File

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: Ansible Project

View File

@ -0,0 +1,57 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number: 4218235397573492796 (0x3a8a2ebeb358c03c)
Signature Algorithm: sha256WithRSAEncryption
Issuer: CN = Pebble Intermediate CA 734609
Validity
Not Before: May 4 20:42:21 2024 GMT
Not After : May 4 20:42:20 2029 GMT
Subject: CN = example.com
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
RSA Public-Key: (1024 bit)
Modulus:
00:c1:43:a5:f9:ad:00:b7:bb:1b:73:27:00:b3:a2:
4e:27:0d:ff:ae:64:3e:a0:7e:f9:28:56:48:47:21:
9e:0f:d8:fb:69:b5:21:e8:98:84:60:6c:aa:73:b9:
6e:d9:f6:19:ad:85:e0:c2:f6:80:d3:22:b8:5a:d6:
3a:89:3e:2a:7a:fc:1d:bf:fc:69:20:e5:91:b8:34:
52:26:c8:15:74:e1:36:0c:cd:ab:01:4a:ad:83:f5:
0b:77:96:31:cf:1c:ea:6f:88:75:23:ac:51:a6:d8:
77:43:1b:b3:44:93:2c:8d:05:25:fb:77:41:36:94:
81:d5:ca:56:ff:b5:23:b2:a5
Exponent: 65537 (0x10001)
X509v3 extensions:
X509v3 Key Usage: critical
Digital Signature, Key Encipherment
X509v3 Extended Key Usage:
TLS Web Server Authentication, TLS Web Client Authentication
X509v3 Basic Constraints: critical
CA:FALSE
X509v3 Subject Key Identifier:
17:E5:83:22:14:EF:74:D3:BE:7E:30:76:56:1F:51:74:65:1F:E9:F0
X509v3 Authority Key Identifier:
keyid:13:C3:4C:3E:59:45:DD:E3:63:51:A3:46:80:C4:08:C7:14:C0:64:4E
Authority Information Access:
OCSP - URI:http://10.88.0.74:5000/ocsp
X509v3 Subject Alternative Name:
DNS:example.com
Signature Algorithm: sha256WithRSAEncryption
31:43:de:b6:48:f4:b8:30:46:25:65:e6:91:22:33:1b:d1:ba:
3f:60:f8:c3:18:32:72:e9:f8:d1:88:11:5a:0a:86:dc:1d:6d:
a5:ea:58:cd:05:ea:cd:5e:40:86:c1:ae:d5:cd:2e:8a:ca:50:
ee:df:bd:cf:6c:d9:20:3b:4b:49:f8:d5:8a:e3:be:f3:dd:24:
b2:7f:3f:3b:bf:e6:8d:7a:f8:8f:4b:6e:25:60:80:33:6f:0f:
53:b7:7d:94:2a:d2:4a:db:3a:2f:70:79:d7:bf:05:ed:df:10:
61:e7:24:ac:b2:fc:03:bd:ad:8c:e1:f3:1d:cc:78:99:e3:22:
59:bf:c5:92:57:95:92:56:35:fc:05:8b:26:10:c5:1b:87:17:
64:0b:bd:33:a9:54:d5:c0:2b:43:56:1b:52:d3:4f:8b:6f:25:
06:58:7f:6f:aa:27:35:05:d5:57:6d:83:a0:73:de:40:3f:67:
1c:5a:92:c6:37:e6:8f:c7:b8:91:d7:50:b9:4d:d4:f2:92:1f:
8b:93:0c:e2:b4:b8:d7:1d:8e:ce:6d:19:dc:8f:12:8e:c0:f2:
92:3b:95:5a:8c:c8:69:0e:0b:f7:fa:1f:55:62:80:7c:e2:f6:
41:3f:7d:69:36:9e:7c:90:7e:d7:3b:e6:a3:15:de:a4:7d:95:
13:46:c6:1a

View File

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: Ansible Project

View File

@ -0,0 +1,19 @@
-----BEGIN CERTIFICATE-----
MIIDDjCCAfagAwIBAgIIOoouvrNYwDwwDQYJKoZIhvcNAQELBQAwKDEmMCQGA1UE
AxMdUGViYmxlIEludGVybWVkaWF0ZSBDQSA3MzQ2MDkwHhcNMjQwNTA0MjA0MjIx
WhcNMjkwNTA0MjA0MjIwWjAWMRQwEgYDVQQDEwtleGFtcGxlLmNvbTCBnzANBgkq
hkiG9w0BAQEFAAOBjQAwgYkCgYEAwUOl+a0At7sbcycAs6JOJw3/rmQ+oH75KFZI
RyGeD9j7abUh6JiEYGyqc7lu2fYZrYXgwvaA0yK4WtY6iT4qevwdv/xpIOWRuDRS
JsgVdOE2DM2rAUqtg/ULd5Yxzxzqb4h1I6xRpth3QxuzRJMsjQUl+3dBNpSB1cpW
/7UjsqUCAwEAAaOB0TCBzjAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYB
BQUHAwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFBflgyIU73TT
vn4wdlYfUXRlH+nwMB8GA1UdIwQYMBaAFBPDTD5ZRd3jY1GjRoDECMcUwGROMDcG
CCsGAQUFBwEBBCswKTAnBggrBgEFBQcwAYYbaHR0cDovLzEwLjg4LjAuNzQ6NTAw
MC9vY3NwMBYGA1UdEQQPMA2CC2V4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA4IB
AQAxQ962SPS4MEYlZeaRIjMb0bo/YPjDGDJy6fjRiBFaCobcHW2l6ljNBerNXkCG
wa7VzS6KylDu373PbNkgO0tJ+NWK477z3SSyfz87v+aNeviPS24lYIAzbw9Tt32U
KtJK2zovcHnXvwXt3xBh5ySssvwDva2M4fMdzHiZ4yJZv8WSV5WSVjX8BYsmEMUb
hxdkC70zqVTVwCtDVhtS00+LbyUGWH9vqic1BdVXbYOgc95AP2ccWpLGN+aPx7iR
11C5TdTykh+LkwzitLjXHY7ObRncjxKOwPKSO5VajMhpDgv3+h9VYoB84vZBP31p
Np58kH7XO+ajFd6kfZUTRsYa
-----END CERTIFICATE-----

View File

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: Ansible Project

View File

@ -0,0 +1,56 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number: 4218235397573492796 (0x3a8a2ebeb358c03c)
Signature Algorithm: sha256WithRSAEncryption
Issuer: CN=Pebble Intermediate CA 734609
Validity
Not Before: May 4 20:42:21 2024 GMT
Not After : May 4 20:42:20 2029 GMT
Subject: CN=example.com
Subject Public Key Info:
Public Key Algorithm: rsaEncryption
Public-Key: (1024 bit)
Modulus:
00:c1:43:a5:f9:ad:00:b7:bb:1b:73:27:00:b3:a2:
4e:27:0d:ff:ae:64:3e:a0:7e:f9:28:56:48:47:21:
9e:0f:d8:fb:69:b5:21:e8:98:84:60:6c:aa:73:b9:
6e:d9:f6:19:ad:85:e0:c2:f6:80:d3:22:b8:5a:d6:
3a:89:3e:2a:7a:fc:1d:bf:fc:69:20:e5:91:b8:34:
52:26:c8:15:74:e1:36:0c:cd:ab:01:4a:ad:83:f5:
0b:77:96:31:cf:1c:ea:6f:88:75:23:ac:51:a6:d8:
77:43:1b:b3:44:93:2c:8d:05:25:fb:77:41:36:94:
81:d5:ca:56:ff:b5:23:b2:a5
Exponent: 65537 (0x10001)
X509v3 extensions:
X509v3 Key Usage: critical
Digital Signature, Key Encipherment
X509v3 Extended Key Usage:
TLS Web Server Authentication, TLS Web Client Authentication
X509v3 Basic Constraints: critical
CA:FALSE
X509v3 Subject Key Identifier:
17:E5:83:22:14:EF:74:D3:BE:7E:30:76:56:1F:51:74:65:1F:E9:F0
X509v3 Authority Key Identifier:
13:C3:4C:3E:59:45:DD:E3:63:51:A3:46:80:C4:08:C7:14:C0:64:4E
Authority Information Access:
OCSP - URI:http://10.88.0.74:5000/ocsp
X509v3 Subject Alternative Name:
DNS:example.com
Signature Algorithm: sha256WithRSAEncryption
Signature Value:
31:43:de:b6:48:f4:b8:30:46:25:65:e6:91:22:33:1b:d1:ba:
3f:60:f8:c3:18:32:72:e9:f8:d1:88:11:5a:0a:86:dc:1d:6d:
a5:ea:58:cd:05:ea:cd:5e:40:86:c1:ae:d5:cd:2e:8a:ca:50:
ee:df:bd:cf:6c:d9:20:3b:4b:49:f8:d5:8a:e3:be:f3:dd:24:
b2:7f:3f:3b:bf:e6:8d:7a:f8:8f:4b:6e:25:60:80:33:6f:0f:
53:b7:7d:94:2a:d2:4a:db:3a:2f:70:79:d7:bf:05:ed:df:10:
61:e7:24:ac:b2:fc:03:bd:ad:8c:e1:f3:1d:cc:78:99:e3:22:
59:bf:c5:92:57:95:92:56:35:fc:05:8b:26:10:c5:1b:87:17:
64:0b:bd:33:a9:54:d5:c0:2b:43:56:1b:52:d3:4f:8b:6f:25:
06:58:7f:6f:aa:27:35:05:d5:57:6d:83:a0:73:de:40:3f:67:
1c:5a:92:c6:37:e6:8f:c7:b8:91:d7:50:b9:4d:d4:f2:92:1f:
8b:93:0c:e2:b4:b8:d7:1d:8e:ce:6d:19:dc:8f:12:8e:c0:f2:
92:3b:95:5a:8c:c8:69:0e:0b:f7:fa:1f:55:62:80:7c:e2:f6:
41:3f:7d:69:36:9e:7c:90:7e:d7:3b:e6:a3:15:de:a4:7d:95:
13:46:c6:1a

View File

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: Ansible Project

View File

@ -16,11 +16,22 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryp
CryptographyBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
ensure_utc_timezone,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
)
from .backend_data import (
TEST_KEYS,
TEST_CSRS,
TEST_CERT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
TEST_PARSE_ACME_TIMESTAMP,
TEST_INTERPOLATE_TIMESTAMP,
)
@ -64,3 +75,49 @@ def test_certdays_cryptography(now, expected_days, tmpdir):
assert days == expected_days
days = backend.get_cert_days(cert_content=TEST_CERT, now=now)
assert days == expected_days
@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO)
def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(cert_content)
module = MagicMock()
backend = CryptographyBackend(module)
if CRYPTOGRAPHY_TIMEZONE:
expected_cert_info = expected_cert_info._replace(
not_valid_after=ensure_utc_timezone(expected_cert_info.not_valid_after),
not_valid_before=ensure_utc_timezone(expected_cert_info.not_valid_before),
)
cert_info = backend.get_cert_information(cert_filename=str(fn))
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info
def test_now():
module = MagicMock()
backend = CryptographyBackend(module)
now = backend.get_now()
assert CRYPTOGRAPHY_TIMEZONE == (now.tzinfo is not None)
@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
def test_parse_acme_timestamp(input, expected):
module = MagicMock()
backend = CryptographyBackend(module)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.parse_acme_timestamp(input)
assert ts_expected == timestamp
@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
def test_interpolate_timestamp(start, end, percentage, expected):
module = MagicMock()
backend = CryptographyBackend(module)
ts_start = backend.get_utc_datetime(**start)
ts_end = backend.get_utc_datetime(**end)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
assert ts_expected == timestamp

View File

@ -18,6 +18,12 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backend_open
from .backend_data import (
TEST_KEYS,
TEST_CSRS,
TEST_CERT,
TEST_CERT_OPENSSL_OUTPUT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
TEST_PARSE_ACME_TIMESTAMP,
TEST_INTERPOLATE_TIMESTAMP,
)
@ -61,3 +67,56 @@ def test_normalize_ip(ip, result):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
assert backend._normalize_ip(ip) == result
@pytest.mark.parametrize("now, expected_days", TEST_CERT_DAYS)
def test_certdays_cryptography(now, expected_days, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(TEST_CERT)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, TEST_CERT_OPENSSL_OUTPUT, 0))
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
days = backend.get_cert_days(cert_filename=str(fn), now=now)
assert days == expected_days
days = backend.get_cert_days(cert_content=TEST_CERT, now=now)
assert days == expected_days
@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO)
def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(cert_content)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
cert_info = backend.get_cert_information(cert_filename=str(fn))
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info
def test_now():
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
now = backend.get_now()
assert now.tzinfo is None
@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
def test_parse_acme_timestamp(input, expected):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.parse_acme_timestamp(input)
assert ts_expected == timestamp
@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
def test_interpolate_timestamp(start, end, percentage, expected):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
ts_start = backend.get_utc_datetime(**start)
ts_end = backend.get_utc_datetime(**end)
ts_expected = backend.get_utc_datetime(**expected)
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
assert ts_expected == timestamp

View File

@ -6,12 +6,20 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import datetime
import pytest
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64,
pem_to_der,
process_links,
parse_retry_after,
compute_cert_id,
)
from .backend_data import (
@ -27,6 +35,73 @@ NOPAD_B64 = [
]
TEST_LINKS_HEADER = [
(
{},
[],
),
(
{
'link': '<foo>; rel="bar"'
},
[
('foo', 'bar'),
],
),
(
{
'link': '<foo>; rel="bar", <baz>; rel="bam"'
},
[
('foo', 'bar'),
('baz', 'bam'),
],
),
(
{
'link': '<https://one.example.com>; rel="preconnect", <https://two.example.com>; rel="preconnect", <https://three.example.com>; rel="preconnect"'
},
[
('https://one.example.com', 'preconnect'),
('https://two.example.com', 'preconnect'),
('https://three.example.com', 'preconnect'),
],
),
]
TEST_RETRY_AFTER_HEADER = [
('120', datetime.datetime(2024, 4, 29, 0, 2, 0)),
('Wed, 21 Oct 2015 07:28:00 GMT', datetime.datetime(2015, 10, 21, 7, 28, 0)),
]
TEST_COMPUTE_CERT_ID = [
(
CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=1,
subject_key_identifier=None,
authority_key_identifier=b'\x00\xff',
),
'AP8.AQ',
),
(
# AKI, serial number, and expected result taken from
# https://letsencrypt.org/2024/04/25/guide-to-integrating-ari-into-existing-acme-clients.html#step-3-constructing-the-ari-certid
CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=0x87654321,
subject_key_identifier=None,
authority_key_identifier=b'\x69\x88\x5B\x6B\x87\x46\x40\x41\xE1\xB3\x7B\x84\x7B\xA0\xAE\x2C\xDE\x01\xC8\xD4',
),
'aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE',
),
]
@pytest.mark.parametrize("value, result", NOPAD_B64)
def test_nopad_b64(value, result):
assert nopad_b64(value.encode('utf-8')) == result
@ -37,3 +112,25 @@ def test_pem_to_der(pem, der, tmpdir):
fn = tmpdir / 'test.pem'
fn.write(pem)
assert pem_to_der(str(fn)) == der
@pytest.mark.parametrize("value, expected_result", TEST_LINKS_HEADER)
def test_process_links(value, expected_result):
data = []
def callback(url, rel):
data.append((url, rel))
process_links(value, callback)
assert expected_result == data
@pytest.mark.parametrize("value, expected_result", TEST_RETRY_AFTER_HEADER)
def test_parse_retry_after(value, expected_result):
assert expected_result == parse_retry_after(value, now=datetime.datetime(2024, 4, 29, 0, 0, 0))
@pytest.mark.parametrize("cert_info, expected_result", TEST_COMPUTE_CERT_ID)
def test_compute_cert_id(cert_info, expected_result):
assert expected_result == compute_cert_id(backend=None, cert_info=cert_info)

View File

@ -15,6 +15,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor
quick_is_not_prime,
convert_int_to_bytes,
convert_int_to_hex,
convert_bytes_to_int,
)
@ -100,3 +101,17 @@ def test_convert_int_to_hex(no, digits, result):
value = convert_int_to_hex(no, digits=digits)
print(value)
assert value == result
@pytest.mark.parametrize('data, result', [
(b'', 0),
(b'\x00', 0),
(b'\x00\x01', 1),
(b'\x01', 1),
(b'\xff', 255),
(b'\x01\x00', 256),
])
def test_convert_bytes_to_int(data, result):
value = convert_bytes_to_int(data)
print(value)
assert value == result

View File

@ -0,0 +1,323 @@
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import datetime
import sys
import pytest
from ansible_collections.community.crypto.plugins.module_utils.time import (
add_or_remove_timezone,
get_now_datetime,
convert_relative_to_datetime,
ensure_utc_timezone,
from_epoch_seconds,
get_epoch_seconds,
get_relative_time_option,
remove_timezone,
UTC,
)
TEST_REMOVE_TIMEZONE = [
(
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
datetime.datetime(2024, 1, 1, 0, 1, 2),
),
(
datetime.datetime(2024, 1, 1, 0, 1, 2),
datetime.datetime(2024, 1, 1, 0, 1, 2),
),
]
TEST_UTC_TIMEZONE = [
(
datetime.datetime(2024, 1, 1, 0, 1, 2),
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
),
(
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
),
]
TEST_EPOCH_SECONDS = [
(0, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=0)),
(1E-6, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1)),
(1E-3, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1000)),
(3691.2, dict(year=1970, day=1, month=1, hour=1, minute=1, second=31, microsecond=200000)),
]
TEST_EPOCH_TO_SECONDS = [
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0), 62),
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=UTC), 62),
]
TEST_CONVERT_RELATIVE_TO_DATETIME = [
(
'+0',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 1, 0, 0, 0),
),
(
'+1s',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
datetime.datetime(2024, 1, 1, 0, 0, 1),
),
(
'-10w20d30h40m50s',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
datetime.datetime(2023, 10, 1, 17, 19, 10),
),
(
'+0',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
),
(
'+1s',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
datetime.datetime(2024, 1, 1, 0, 0, 1, tzinfo=UTC),
),
(
'-10w20d30h40m50s',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2023, 10, 1, 17, 19, 10, tzinfo=UTC),
),
]
TEST_GET_RELATIVE_TIME_OPTION = [
(
'+1d2h3m4s',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 2, 3, 4),
),
(
'-1w10d24h',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2023, 12, 14, 0, 0, 0),
),
(
'20240102040506Z',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 4, 5, 6),
),
(
'202401020405Z',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 4, 5, 0),
),
(
'+1d2h3m4s',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 2, 3, 4, tzinfo=UTC),
),
(
'-1w10d24h',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2023, 12, 14, 0, 0, 0, tzinfo=UTC),
),
(
'20240102040506Z',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 4, 5, 6, tzinfo=UTC),
),
(
'202401020405Z',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 4, 5, 0, tzinfo=UTC),
),
(
'+1d2h3m4s',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'20240102020304Z',
),
(
'-1w10d24h',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'20231214000000Z',
),
(
'20240102040506Z',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'20240102040506Z',
),
(
'202401020405Z',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'202401020405Z',
),
]
if sys.version_info >= (3, 5):
ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1))
TEST_REMOVE_TIMEZONE.extend([
(
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS),
datetime.datetime(2023, 12, 31, 23, 1, 2),
),
])
TEST_UTC_TIMEZONE.extend([
(
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS),
datetime.datetime(2023, 12, 31, 23, 1, 2, tzinfo=UTC),
),
])
TEST_EPOCH_TO_SECONDS.extend([
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=ONE_HOUR_PLUS), 62 - 3600),
])
TEST_GET_RELATIVE_TIME_OPTION.extend([
(
'20240102040506+0100',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 3, 5, 6),
),
(
'202401020405+0100',
'foo',
'cryptography',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 3, 5, 0),
),
(
'20240102040506+0100',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC),
),
(
'202401020405+0100',
'foo',
'cryptography',
True,
datetime.datetime(2024, 1, 1, 0, 0, 0),
datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC),
),
(
'20240102040506+0100',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'20240102040506+0100',
),
(
'202401020405+0100',
'foo',
'pyopenssl',
False,
datetime.datetime(2024, 1, 1, 0, 0, 0),
'202401020405+0100',
),
])
@pytest.mark.parametrize("input, expected", TEST_REMOVE_TIMEZONE)
def test_remove_timezone(input, expected):
output_1 = remove_timezone(input)
assert expected == output_1
output_2 = add_or_remove_timezone(input, with_timezone=False)
assert expected == output_2
@pytest.mark.parametrize("input, expected", TEST_UTC_TIMEZONE)
def test_utc_timezone(input, expected):
output_1 = ensure_utc_timezone(input)
assert expected == output_1
output_2 = add_or_remove_timezone(input, with_timezone=True)
assert expected == output_2
def test_get_now_datetime():
output_1 = get_now_datetime(with_timezone=False)
assert output_1.tzinfo is None
output_2 = get_now_datetime(with_timezone=True)
assert output_2.tzinfo is not None
assert output_2.tzinfo == UTC
@pytest.mark.parametrize("seconds, timestamp", TEST_EPOCH_SECONDS)
def test_epoch_seconds(seconds, timestamp):
ts_wo_tz = datetime.datetime(**timestamp)
assert seconds == get_epoch_seconds(ts_wo_tz)
timestamp_w_tz = dict(timestamp)
timestamp_w_tz['tzinfo'] = UTC
ts_w_tz = datetime.datetime(**timestamp_w_tz)
assert seconds == get_epoch_seconds(ts_w_tz)
output_1 = from_epoch_seconds(seconds, with_timezone=False)
assert ts_wo_tz == output_1
output_2 = from_epoch_seconds(seconds, with_timezone=True)
assert ts_w_tz == output_2
@pytest.mark.parametrize("timestamp, expected_seconds", TEST_EPOCH_TO_SECONDS)
def test_epoch_to_seconds(timestamp, expected_seconds):
assert expected_seconds == get_epoch_seconds(timestamp)
@pytest.mark.parametrize("relative_time_string, with_timezone, now, expected", TEST_CONVERT_RELATIVE_TO_DATETIME)
def test_convert_relative_to_datetime(relative_time_string, with_timezone, now, expected):
output = convert_relative_to_datetime(relative_time_string, with_timezone=with_timezone, now=now)
assert expected == output
@pytest.mark.parametrize("input_string, input_name, backend, with_timezone, now, expected", TEST_GET_RELATIVE_TIME_OPTION)
def test_get_relative_time_option(input_string, input_name, backend, with_timezone, now, expected):
output = get_relative_time_option(input_string, input_name, backend=backend, with_timezone=with_timezone, now=now)
assert expected == output