Explicitly use UTC timezone in ACME OpenSSL backend (#811)
* Allow abstract backend class to handle both with and without timezone. * Explicitly use UTC timezone in OpenSSL backend code.pull/815/head
parent
feee571bc8
commit
6731b38baa
|
@ -0,0 +1,2 @@
|
||||||
|
bugfixes:
|
||||||
|
- "acme_* modules - when using the OpenSSL backend, explicitly use the UTC timezone in Python code (https://github.com/ansible-collections/community.crypto/pull/811)."
|
|
@ -11,7 +11,6 @@ __metaclass__ = type
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
import binascii
|
import binascii
|
||||||
import datetime
|
|
||||||
import os
|
import os
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
@ -22,7 +21,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
|
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
|
||||||
CertificateInformation,
|
CertificateInformation,
|
||||||
CryptoBackend,
|
CryptoBackend,
|
||||||
_parse_acme_timestamp,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
|
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
|
||||||
|
@ -38,10 +36,6 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import re
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
|
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
|
||||||
OpenSSLObjectError,
|
|
||||||
)
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
|
||||||
convert_int_to_bytes,
|
convert_int_to_bytes,
|
||||||
convert_int_to_hex,
|
convert_int_to_hex,
|
||||||
|
@ -64,12 +58,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||||
ensure_utc_timezone,
|
add_or_remove_timezone,
|
||||||
from_epoch_seconds,
|
|
||||||
get_epoch_seconds,
|
|
||||||
get_now_datetime,
|
|
||||||
get_relative_time_option,
|
|
||||||
UTC,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'
|
CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'
|
||||||
|
@ -184,33 +173,7 @@ class CryptographyChainMatcher(ChainMatcher):
|
||||||
|
|
||||||
class CryptographyBackend(CryptoBackend):
|
class CryptographyBackend(CryptoBackend):
|
||||||
def __init__(self, module):
|
def __init__(self, module):
|
||||||
super(CryptographyBackend, self).__init__(module)
|
super(CryptographyBackend, self).__init__(module, with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||||
|
|
||||||
def get_now(self):
|
|
||||||
return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
|
||||||
|
|
||||||
def parse_acme_timestamp(self, timestamp_str):
|
|
||||||
return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
|
||||||
|
|
||||||
def parse_module_parameter(self, value, name):
|
|
||||||
try:
|
|
||||||
return get_relative_time_option(value, name, backend='cryptography', with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
|
||||||
except OpenSSLObjectError as exc:
|
|
||||||
raise BackendException(to_native(exc))
|
|
||||||
|
|
||||||
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
|
|
||||||
start = get_epoch_seconds(timestamp_start)
|
|
||||||
end = get_epoch_seconds(timestamp_end)
|
|
||||||
return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
|
||||||
|
|
||||||
def get_utc_datetime(self, *args, **kwargs):
|
|
||||||
kwargs_ext = dict(kwargs)
|
|
||||||
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8):
|
|
||||||
kwargs_ext['tzinfo'] = UTC
|
|
||||||
result = datetime.datetime(*args, **kwargs_ext)
|
|
||||||
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8):
|
|
||||||
result = ensure_utc_timezone(result)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def parse_key(self, key_file=None, key_content=None, passphrase=None):
|
def parse_key(self, key_file=None, key_content=None, passphrase=None):
|
||||||
'''
|
'''
|
||||||
|
@ -419,8 +382,8 @@ class CryptographyBackend(CryptoBackend):
|
||||||
|
|
||||||
if now is None:
|
if now is None:
|
||||||
now = self.get_now()
|
now = self.get_now()
|
||||||
elif CRYPTOGRAPHY_TIMEZONE:
|
else:
|
||||||
now = ensure_utc_timezone(now)
|
now = add_or_remove_timezone(now, with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||||
return (get_not_valid_after(cert) - now).days
|
return (get_not_valid_after(cert) - now).days
|
||||||
|
|
||||||
def create_chain_matcher(self, criterium):
|
def create_chain_matcher(self, criterium):
|
||||||
|
|
|
@ -33,6 +33,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.utils import
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.time import ensure_utc_timezone
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import ipaddress
|
import ipaddress
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -45,7 +47,11 @@ _OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTY
|
||||||
def _extract_date(out_text, name, cert_filename_suffix=""):
|
def _extract_date(out_text, name, cert_filename_suffix=""):
|
||||||
try:
|
try:
|
||||||
date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1)
|
date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1)
|
||||||
return datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
|
# For some reason Python's strptime() doesn't return any timezone information,
|
||||||
|
# even though the information is there and a supported timezone for all supported
|
||||||
|
# Python implementations (GMT). So we have to modify the datetime object by
|
||||||
|
# replacing it by UTC.
|
||||||
|
return ensure_utc_timezone(datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z'))
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix))
|
raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix))
|
||||||
except ValueError as exc:
|
except ValueError as exc:
|
||||||
|
@ -71,7 +77,7 @@ def _extract_octets(out_text, name, required=True, potential_prefixes=None):
|
||||||
|
|
||||||
class OpenSSLCLIBackend(CryptoBackend):
|
class OpenSSLCLIBackend(CryptoBackend):
|
||||||
def __init__(self, module, openssl_binary=None):
|
def __init__(self, module, openssl_binary=None):
|
||||||
super(OpenSSLCLIBackend, self).__init__(module)
|
super(OpenSSLCLIBackend, self).__init__(module, with_timezone=True)
|
||||||
if openssl_binary is None:
|
if openssl_binary is None:
|
||||||
openssl_binary = module.get_bin_path('openssl', True)
|
openssl_binary = module.get_bin_path('openssl', True)
|
||||||
self.openssl_binary = openssl_binary
|
self.openssl_binary = openssl_binary
|
||||||
|
@ -340,7 +346,9 @@ class OpenSSLCLIBackend(CryptoBackend):
|
||||||
out_text = to_text(out, errors='surrogate_or_strict')
|
out_text = to_text(out, errors='surrogate_or_strict')
|
||||||
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
|
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
|
||||||
if now is None:
|
if now is None:
|
||||||
now = datetime.datetime.now()
|
now = self.get_now()
|
||||||
|
else:
|
||||||
|
now = ensure_utc_timezone(now)
|
||||||
return (not_after - now).days
|
return (not_after - now).days
|
||||||
|
|
||||||
def create_chain_matcher(self, criterium):
|
def create_chain_matcher(self, criterium):
|
||||||
|
|
|
@ -32,6 +32,7 @@ from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||||
get_now_datetime,
|
get_now_datetime,
|
||||||
get_relative_time_option,
|
get_relative_time_option,
|
||||||
remove_timezone,
|
remove_timezone,
|
||||||
|
UTC,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -85,31 +86,35 @@ def _parse_acme_timestamp(timestamp_str, with_timezone):
|
||||||
|
|
||||||
@six.add_metaclass(abc.ABCMeta)
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
class CryptoBackend(object):
|
class CryptoBackend(object):
|
||||||
def __init__(self, module):
|
def __init__(self, module, with_timezone=False):
|
||||||
self.module = module
|
self.module = module
|
||||||
|
self._with_timezone = with_timezone
|
||||||
|
|
||||||
def get_now(self):
|
def get_now(self):
|
||||||
return get_now_datetime(with_timezone=False)
|
return get_now_datetime(with_timezone=self._with_timezone)
|
||||||
|
|
||||||
def parse_acme_timestamp(self, timestamp_str):
|
def parse_acme_timestamp(self, timestamp_str):
|
||||||
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
|
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
|
||||||
return _parse_acme_timestamp(timestamp_str, with_timezone=False)
|
return _parse_acme_timestamp(timestamp_str, with_timezone=self._with_timezone)
|
||||||
|
|
||||||
def parse_module_parameter(self, value, name):
|
def parse_module_parameter(self, value, name):
|
||||||
try:
|
try:
|
||||||
return get_relative_time_option(value, name, backend='cryptography', with_timezone=False)
|
return get_relative_time_option(value, name, backend='cryptography', with_timezone=self._with_timezone)
|
||||||
except OpenSSLObjectError as exc:
|
except OpenSSLObjectError as exc:
|
||||||
raise BackendException(to_native(exc))
|
raise BackendException(to_native(exc))
|
||||||
|
|
||||||
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
|
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
|
||||||
start = get_epoch_seconds(timestamp_start)
|
start = get_epoch_seconds(timestamp_start)
|
||||||
end = get_epoch_seconds(timestamp_end)
|
end = get_epoch_seconds(timestamp_end)
|
||||||
return from_epoch_seconds(start + percentage * (end - start), with_timezone=False)
|
return from_epoch_seconds(start + percentage * (end - start), with_timezone=self._with_timezone)
|
||||||
|
|
||||||
def get_utc_datetime(self, *args, **kwargs):
|
def get_utc_datetime(self, *args, **kwargs):
|
||||||
result = datetime.datetime(*args, **kwargs)
|
kwargs_ext = dict(kwargs)
|
||||||
if 'tzinfo' in kwargs or len(args) >= 8:
|
if self._with_timezone and ('tzinfo' not in kwargs_ext and len(args) < 8):
|
||||||
result = remove_timezone(result)
|
kwargs_ext['tzinfo'] = UTC
|
||||||
|
result = datetime.datetime(*args, **kwargs_ext)
|
||||||
|
if self._with_timezone and ('tzinfo' in kwargs or len(args) >= 8):
|
||||||
|
result = ensure_utc_timezone(result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
|
|
|
@ -12,21 +12,20 @@ from freezegun import freeze_time
|
||||||
|
|
||||||
from ansible_collections.community.crypto.tests.unit.compat.mock import MagicMock
|
from ansible_collections.community.crypto.tests.unit.compat.mock import MagicMock
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.time import UTC
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
|
from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryptography import (
|
||||||
HAS_CURRENT_CRYPTOGRAPHY,
|
HAS_CURRENT_CRYPTOGRAPHY,
|
||||||
CryptographyBackend,
|
CryptographyBackend,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
|
||||||
ensure_utc_timezone,
|
|
||||||
)
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||||
CRYPTOGRAPHY_TIMEZONE,
|
CRYPTOGRAPHY_TIMEZONE,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||||
|
ensure_utc_timezone,
|
||||||
|
UTC,
|
||||||
|
)
|
||||||
|
|
||||||
from .backend_data import (
|
from .backend_data import (
|
||||||
TEST_KEYS,
|
TEST_KEYS,
|
||||||
TEST_CSRS,
|
TEST_CSRS,
|
||||||
|
|
|
@ -17,6 +17,11 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backend_open
|
||||||
OpenSSLCLIBackend,
|
OpenSSLCLIBackend,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||||
|
ensure_utc_timezone,
|
||||||
|
UTC,
|
||||||
|
)
|
||||||
|
|
||||||
from .backend_data import (
|
from .backend_data import (
|
||||||
TEST_KEYS,
|
TEST_KEYS,
|
||||||
TEST_CSRS,
|
TEST_CSRS,
|
||||||
|
@ -28,7 +33,7 @@ from .backend_data import (
|
||||||
TEST_INTERPOLATE_TIMESTAMP,
|
TEST_INTERPOLATE_TIMESTAMP,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ..test_time import TIMEZONES
|
# from ..test_time import TIMEZONES
|
||||||
|
|
||||||
|
|
||||||
TEST_IPS = [
|
TEST_IPS = [
|
||||||
|
@ -94,20 +99,29 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output,
|
||||||
module = MagicMock()
|
module = MagicMock()
|
||||||
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
|
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
|
||||||
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
||||||
|
|
||||||
|
expected_cert_info = expected_cert_info._replace(
|
||||||
|
not_valid_after=ensure_utc_timezone(expected_cert_info.not_valid_after),
|
||||||
|
not_valid_before=ensure_utc_timezone(expected_cert_info.not_valid_before),
|
||||||
|
)
|
||||||
|
|
||||||
cert_info = backend.get_cert_information(cert_filename=str(fn))
|
cert_info = backend.get_cert_information(cert_filename=str(fn))
|
||||||
assert cert_info == expected_cert_info
|
assert cert_info == expected_cert_info
|
||||||
cert_info = backend.get_cert_information(cert_content=cert_content)
|
cert_info = backend.get_cert_information(cert_content=cert_content)
|
||||||
assert cert_info == expected_cert_info
|
assert cert_info == expected_cert_info
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("timezone", TIMEZONES)
|
# @pytest.mark.parametrize("timezone", TIMEZONES)
|
||||||
|
# Due to a bug in freezegun (https://github.com/spulec/freezegun/issues/348, https://github.com/spulec/freezegun/issues/553)
|
||||||
|
# this only works with timezone = UTC if CRYPTOGRAPHY_TIMEZONE is truish
|
||||||
|
@pytest.mark.parametrize("timezone", [datetime.timedelta(hours=0)])
|
||||||
def test_now(timezone):
|
def test_now(timezone):
|
||||||
with freeze_time("2024-02-03 04:05:06", tz_offset=timezone):
|
with freeze_time("2024-02-03 04:05:06", tz_offset=timezone):
|
||||||
module = MagicMock()
|
module = MagicMock()
|
||||||
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
||||||
now = backend.get_now()
|
now = backend.get_now()
|
||||||
assert now.tzinfo is None
|
assert now.tzinfo is not None
|
||||||
assert now == datetime.datetime(2024, 2, 3, 4, 5, 6)
|
assert now == datetime.datetime(2024, 2, 3, 4, 5, 6, tzinfo=UTC)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("timezone, input, expected", TEST_PARSE_ACME_TIMESTAMP)
|
@pytest.mark.parametrize("timezone, input, expected", TEST_PARSE_ACME_TIMESTAMP)
|
||||||
|
|
Loading…
Reference in New Issue