Refactor time code, add tests, fix bug when parsing absolute timestamps that omit seconds (#745)
* Add time module utils. * Add time helpers to ACME backend. * Add changelog fragment. * ACME timestamp parser: do not choke on nanoseconds.pull/748/head
parent
9501a28a93
commit
0a15be1017
|
@ -0,0 +1,2 @@
|
|||
bugfixes:
|
||||
- "x509_crl, x509_certificate, x509_certificate_info - when parsing absolute timestamps which omitted the second count, the first digit of the minutes was used as a one-digit minutes count, and the second digit of the minutes as a one-digit second count (https://github.com/ansible-collections/community.crypto/pull/745)."
|
|
@ -11,6 +11,7 @@ __metaclass__ = type
|
|||
|
||||
import base64
|
||||
import binascii
|
||||
import datetime
|
||||
import os
|
||||
import traceback
|
||||
|
||||
|
@ -21,6 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
|
|||
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
|
||||
CertificateInformation,
|
||||
CryptoBackend,
|
||||
_parse_acme_timestamp,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.acme.certificates import (
|
||||
|
@ -41,12 +43,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor
|
|||
convert_int_to_hex,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
get_now_datetime,
|
||||
ensure_utc_timezone,
|
||||
parse_name_field,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
cryptography_name_to_oid,
|
||||
|
@ -59,6 +55,18 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import
|
|||
extract_first_pem,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
parse_name_field,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
ensure_utc_timezone,
|
||||
from_epoch_seconds,
|
||||
get_epoch_seconds,
|
||||
get_now_datetime,
|
||||
UTC,
|
||||
)
|
||||
|
||||
CRYPTOGRAPHY_MINIMAL_VERSION = '1.5'
|
||||
|
||||
CRYPTOGRAPHY_ERROR = None
|
||||
|
@ -173,6 +181,26 @@ class CryptographyBackend(CryptoBackend):
|
|||
def __init__(self, module):
|
||||
super(CryptographyBackend, self).__init__(module)
|
||||
|
||||
def get_now(self):
|
||||
return get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||
|
||||
def parse_acme_timestamp(self, timestamp_str):
|
||||
return _parse_acme_timestamp(timestamp_str, with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||
|
||||
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
|
||||
start = get_epoch_seconds(timestamp_start)
|
||||
end = get_epoch_seconds(timestamp_end)
|
||||
return from_epoch_seconds(start + percentage * (end - start), with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||
|
||||
def get_utc_datetime(self, *args, **kwargs):
|
||||
kwargs_ext = dict(kwargs)
|
||||
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' not in kwargs_ext and len(args) < 8):
|
||||
kwargs_ext['tzinfo'] = UTC
|
||||
result = datetime.datetime(*args, **kwargs_ext)
|
||||
if CRYPTOGRAPHY_TIMEZONE and ('tzinfo' in kwargs or len(args) >= 8):
|
||||
result = ensure_utc_timezone(result)
|
||||
return result
|
||||
|
||||
def parse_key(self, key_file=None, key_content=None, passphrase=None):
|
||||
'''
|
||||
Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
|
||||
|
@ -379,7 +407,7 @@ class CryptographyBackend(CryptoBackend):
|
|||
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
|
||||
|
||||
if now is None:
|
||||
now = get_now_datetime(with_timezone=CRYPTOGRAPHY_TIMEZONE)
|
||||
now = self.get_now()
|
||||
elif CRYPTOGRAPHY_TIMEZONE:
|
||||
now = ensure_utc_timezone(now)
|
||||
return (get_not_valid_after(cert) - now).days
|
||||
|
|
|
@ -11,6 +11,8 @@ __metaclass__ = type
|
|||
|
||||
from collections import namedtuple
|
||||
import abc
|
||||
import datetime
|
||||
import re
|
||||
|
||||
from ansible.module_utils import six
|
||||
|
||||
|
@ -18,6 +20,14 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
|
|||
BackendException,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
ensure_utc_timezone,
|
||||
from_epoch_seconds,
|
||||
get_epoch_seconds,
|
||||
get_now_datetime,
|
||||
remove_timezone,
|
||||
)
|
||||
|
||||
|
||||
CertificateInformation = namedtuple(
|
||||
'CertificateInformation',
|
||||
|
@ -31,11 +41,65 @@ CertificateInformation = namedtuple(
|
|||
)
|
||||
|
||||
|
||||
_FRACTIONAL_MATCHER = re.compile(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(|\.\d+)(Z|[+-]\d{2}:?\d{2}.*)$')
|
||||
|
||||
|
||||
def _reduce_fractional_digits(timestamp_str):
|
||||
"""
|
||||
Given a RFC 3339 timestamp that includes too many digits for the fractional seconds part, reduces these to at most 6.
|
||||
"""
|
||||
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
|
||||
m = _FRACTIONAL_MATCHER.match(timestamp_str)
|
||||
if not m:
|
||||
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))
|
||||
timestamp, fractional, timezone = m.groups()
|
||||
if len(fractional) > 7:
|
||||
# Python does not support anything smaller than microseconds
|
||||
# (Golang supports nanoseconds, Boulder often emits more fractional digits, which Python chokes on)
|
||||
fractional = fractional[:7]
|
||||
return '%s%s%s' % (timestamp, fractional, timezone)
|
||||
|
||||
|
||||
def _parse_acme_timestamp(timestamp_str, with_timezone):
|
||||
"""
|
||||
Parses a RFC 3339 timestamp.
|
||||
"""
|
||||
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
|
||||
timestamp_str = _reduce_fractional_digits(timestamp_str)
|
||||
for format in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%dT%H:%M:%S.%f%z'):
|
||||
# Note that %z won't work with Python 2... https://stackoverflow.com/a/27829491
|
||||
try:
|
||||
result = datetime.datetime.strptime(timestamp_str, format)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
return ensure_utc_timezone(result) if with_timezone else remove_timezone(result)
|
||||
raise BackendException('Cannot parse ISO 8601 timestamp {0!r}'.format(timestamp_str))
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CryptoBackend(object):
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
|
||||
def get_now(self):
|
||||
return get_now_datetime(with_timezone=False)
|
||||
|
||||
def parse_acme_timestamp(self, timestamp_str):
|
||||
# RFC 3339 (https://www.rfc-editor.org/info/rfc3339)
|
||||
return _parse_acme_timestamp(timestamp_str, with_timezone=False)
|
||||
|
||||
def interpolate_timestamp(self, timestamp_start, timestamp_end, percentage):
|
||||
start = get_epoch_seconds(timestamp_start)
|
||||
end = get_epoch_seconds(timestamp_end)
|
||||
return from_epoch_seconds(start + percentage * (end - start), with_timezone=False)
|
||||
|
||||
def get_utc_datetime(self, *args, **kwargs):
|
||||
result = datetime.datetime(*args, **kwargs)
|
||||
if 'tzinfo' in kwargs or len(args) >= 8:
|
||||
result = remove_timezone(result)
|
||||
return result
|
||||
|
||||
@abc.abstractmethod
|
||||
def parse_key(self, key_file=None, key_content=None, passphrase=None):
|
||||
'''
|
||||
|
|
|
@ -22,7 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
|
|||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import get_now_datetime
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import get_now_datetime
|
||||
|
||||
|
||||
def nopad_b64(data):
|
||||
|
|
|
@ -18,8 +18,6 @@ from ansible_collections.community.crypto.plugins.module_utils.ecs.api import EC
|
|||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
load_certificate,
|
||||
get_now_datetime,
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
|
@ -34,6 +32,11 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
CertificateProvider,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_now_datetime,
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
try:
|
||||
from cryptography.x509.oid import NameOID
|
||||
except ImportError:
|
||||
|
|
|
@ -23,7 +23,6 @@ from ansible_collections.community.crypto.plugins.module_utils.version import Lo
|
|||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
load_certificate,
|
||||
get_fingerprint_of_bytes,
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
|
@ -40,6 +39,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
get_publickey_info,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
|
|
|
@ -22,7 +22,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
|
|||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
load_privatekey,
|
||||
load_certificate,
|
||||
get_relative_time_option,
|
||||
select_message_digest,
|
||||
)
|
||||
|
||||
|
@ -44,6 +43,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
CertificateProvider,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
|
|
|
@ -14,7 +14,6 @@ import os
|
|||
from random import randrange
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
get_relative_time_option,
|
||||
select_message_digest,
|
||||
)
|
||||
|
||||
|
@ -34,6 +33,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
CertificateProvider,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
try:
|
||||
import cryptography
|
||||
from cryptography import x509
|
||||
|
|
|
@ -9,19 +9,25 @@ __metaclass__ = type
|
|||
|
||||
|
||||
import abc
|
||||
import datetime
|
||||
import errno
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.common.text.converters import to_native, to_bytes
|
||||
from ansible.module_utils.common.text.converters import to_bytes
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
|
||||
identify_pem_format,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import ( # noqa: F401, pylint: disable=unused-import
|
||||
# These imports are for backwards compatibility
|
||||
get_now_datetime,
|
||||
ensure_utc_timezone,
|
||||
convert_relative_to_datetime,
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
try:
|
||||
from OpenSSL import crypto
|
||||
HAS_PYOPENSSL = True
|
||||
|
@ -279,86 +285,6 @@ def parse_ordered_name_field(input_list, name_field_name):
|
|||
return result
|
||||
|
||||
|
||||
def get_now_datetime(with_timezone):
|
||||
if with_timezone:
|
||||
return datetime.datetime.now(tz=datetime.timezone.utc)
|
||||
return datetime.datetime.utcnow()
|
||||
|
||||
|
||||
def ensure_utc_timezone(timestamp):
|
||||
if timestamp.tzinfo is not None:
|
||||
return timestamp
|
||||
return timestamp.astimezone(datetime.timezone.utc)
|
||||
|
||||
|
||||
def convert_relative_to_datetime(relative_time_string, with_timezone=False):
|
||||
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
|
||||
|
||||
parsed_result = re.match(
|
||||
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
|
||||
relative_time_string)
|
||||
|
||||
if parsed_result is None or len(relative_time_string) == 1:
|
||||
# not matched or only a single "+" or "-"
|
||||
return None
|
||||
|
||||
offset = datetime.timedelta(0)
|
||||
if parsed_result.group("weeks") is not None:
|
||||
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
|
||||
if parsed_result.group("days") is not None:
|
||||
offset += datetime.timedelta(days=int(parsed_result.group("days")))
|
||||
if parsed_result.group("hours") is not None:
|
||||
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
|
||||
if parsed_result.group("minutes") is not None:
|
||||
offset += datetime.timedelta(
|
||||
minutes=int(parsed_result.group("minutes")))
|
||||
if parsed_result.group("seconds") is not None:
|
||||
offset += datetime.timedelta(
|
||||
seconds=int(parsed_result.group("seconds")))
|
||||
|
||||
now = get_now_datetime(with_timezone=with_timezone)
|
||||
if parsed_result.group("prefix") == "+":
|
||||
return now + offset
|
||||
else:
|
||||
return now - offset
|
||||
|
||||
|
||||
def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False):
|
||||
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
|
||||
string is provided.
|
||||
|
||||
The return value will be a datetime object for the cryptography backend,
|
||||
and a ASN1 formatted string for the pyopenssl backend."""
|
||||
result = to_native(input_string)
|
||||
if result is None:
|
||||
raise OpenSSLObjectError(
|
||||
'The timespec "%s" for %s is not valid' %
|
||||
input_string, input_name)
|
||||
# Relative time
|
||||
if result.startswith("+") or result.startswith("-"):
|
||||
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone)
|
||||
if backend == 'pyopenssl':
|
||||
return result_datetime.strftime("%Y%m%d%H%M%SZ")
|
||||
elif backend == 'cryptography':
|
||||
return result_datetime
|
||||
# Absolute time
|
||||
if backend == 'cryptography':
|
||||
for date_fmt in ['%Y%m%d%H%M%SZ', '%Y%m%d%H%MZ', '%Y%m%d%H%M%S%z', '%Y%m%d%H%M%z']:
|
||||
try:
|
||||
res = datetime.datetime.strptime(result, date_fmt)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if with_timezone:
|
||||
res = res.astimezone(datetime.timezone.utc)
|
||||
return res
|
||||
|
||||
raise OpenSSLObjectError(
|
||||
'The time spec "%s" for %s is invalid' %
|
||||
(input_string, input_name)
|
||||
)
|
||||
|
||||
|
||||
def select_message_digest(digest_string):
|
||||
digest = None
|
||||
if digest_string == 'sha256':
|
||||
|
|
|
@ -31,11 +31,15 @@ from hashlib import sha256
|
|||
|
||||
from ansible.module_utils import six
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
|
||||
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
|
||||
OpensshParser,
|
||||
_OpensshWriter,
|
||||
)
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
add_or_remove_timezone as _add_or_remove_timezone,
|
||||
convert_relative_to_datetime,
|
||||
UTC as _UTC,
|
||||
)
|
||||
|
||||
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
|
||||
_USER_TYPE = 1
|
||||
|
@ -66,14 +70,8 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
|
|||
_USE_TIMEZONE = sys.version_info >= (3, 6)
|
||||
|
||||
|
||||
def _ensure_utc_timezone_if_use_timezone(value):
|
||||
if not _USE_TIMEZONE or value.tzinfo is not None:
|
||||
return value
|
||||
return value.astimezone(_datetime.timezone.utc)
|
||||
|
||||
|
||||
_ALWAYS = _ensure_utc_timezone_if_use_timezone(datetime(1970, 1, 1))
|
||||
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _datetime.timezone.utc) if _USE_TIMEZONE else datetime.max
|
||||
_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE)
|
||||
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max
|
||||
|
||||
_CRITICAL_OPTIONS = (
|
||||
'force-command',
|
||||
|
@ -198,7 +196,7 @@ class OpensshCertificateTimeParameters(object):
|
|||
else:
|
||||
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
|
||||
try:
|
||||
result = _ensure_utc_timezone_if_use_timezone(datetime.strptime(time_string, time_format))
|
||||
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE)
|
||||
except ValueError:
|
||||
pass
|
||||
if result is None:
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (c) 2024, Felix Fontein <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
import datetime
|
||||
import re
|
||||
import sys
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_native
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||
OpenSSLObjectError,
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
UTC = datetime.timezone.utc
|
||||
except AttributeError:
|
||||
_DURATION_ZERO = datetime.timedelta(0)
|
||||
|
||||
class _UTCClass(datetime.tzinfo):
|
||||
def utcoffset(self, dt):
|
||||
return _DURATION_ZERO
|
||||
|
||||
def dst(self, dt):
|
||||
return _DURATION_ZERO
|
||||
|
||||
def tzname(self, dt):
|
||||
return 'UTC'
|
||||
|
||||
def fromutc(self, dt):
|
||||
return dt
|
||||
|
||||
def __repr__(self):
|
||||
return 'UTC'
|
||||
|
||||
UTC = _UTCClass()
|
||||
|
||||
|
||||
def get_now_datetime(with_timezone):
|
||||
if with_timezone:
|
||||
return datetime.datetime.now(tz=UTC)
|
||||
return datetime.datetime.utcnow()
|
||||
|
||||
|
||||
def ensure_utc_timezone(timestamp):
|
||||
if timestamp.tzinfo is UTC:
|
||||
return timestamp
|
||||
if timestamp.tzinfo is None:
|
||||
# We assume that naive datetime objects use timezone UTC!
|
||||
return timestamp.replace(tzinfo=UTC)
|
||||
return timestamp.astimezone(UTC)
|
||||
|
||||
|
||||
def remove_timezone(timestamp):
|
||||
# Convert to native datetime object
|
||||
if timestamp.tzinfo is None:
|
||||
return timestamp
|
||||
if timestamp.tzinfo is not UTC:
|
||||
timestamp = timestamp.astimezone(UTC)
|
||||
return timestamp.replace(tzinfo=None)
|
||||
|
||||
|
||||
def add_or_remove_timezone(timestamp, with_timezone):
|
||||
return ensure_utc_timezone(timestamp) if with_timezone else remove_timezone(timestamp)
|
||||
|
||||
|
||||
if sys.version_info < (3, 3):
|
||||
def get_epoch_seconds(timestamp):
|
||||
epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC if timestamp.tzinfo is not None else None)
|
||||
delta = timestamp - epoch
|
||||
try:
|
||||
return delta.total_seconds()
|
||||
except AttributeError:
|
||||
# Python 2.6 and earlier: total_seconds() does not yet exist, so we use the formula from
|
||||
# https://docs.python.org/2/library/datetime.html#datetime.timedelta.total_seconds
|
||||
return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6
|
||||
else:
|
||||
def get_epoch_seconds(timestamp):
|
||||
return timestamp.timestamp()
|
||||
|
||||
|
||||
def from_epoch_seconds(timestamp, with_timezone):
|
||||
if with_timezone:
|
||||
return datetime.datetime.fromtimestamp(timestamp, UTC)
|
||||
return datetime.datetime.utcfromtimestamp(timestamp)
|
||||
|
||||
|
||||
def convert_relative_to_datetime(relative_time_string, with_timezone=False, now=None):
|
||||
"""Get a datetime.datetime or None from a string in the time format described in sshd_config(5)"""
|
||||
|
||||
parsed_result = re.match(
|
||||
r"^(?P<prefix>[+-])((?P<weeks>\d+)[wW])?((?P<days>\d+)[dD])?((?P<hours>\d+)[hH])?((?P<minutes>\d+)[mM])?((?P<seconds>\d+)[sS]?)?$",
|
||||
relative_time_string)
|
||||
|
||||
if parsed_result is None or len(relative_time_string) == 1:
|
||||
# not matched or only a single "+" or "-"
|
||||
return None
|
||||
|
||||
offset = datetime.timedelta(0)
|
||||
if parsed_result.group("weeks") is not None:
|
||||
offset += datetime.timedelta(weeks=int(parsed_result.group("weeks")))
|
||||
if parsed_result.group("days") is not None:
|
||||
offset += datetime.timedelta(days=int(parsed_result.group("days")))
|
||||
if parsed_result.group("hours") is not None:
|
||||
offset += datetime.timedelta(hours=int(parsed_result.group("hours")))
|
||||
if parsed_result.group("minutes") is not None:
|
||||
offset += datetime.timedelta(
|
||||
minutes=int(parsed_result.group("minutes")))
|
||||
if parsed_result.group("seconds") is not None:
|
||||
offset += datetime.timedelta(
|
||||
seconds=int(parsed_result.group("seconds")))
|
||||
|
||||
if now is None:
|
||||
now = get_now_datetime(with_timezone=with_timezone)
|
||||
else:
|
||||
now = add_or_remove_timezone(now, with_timezone=with_timezone)
|
||||
|
||||
if parsed_result.group("prefix") == "+":
|
||||
return now + offset
|
||||
else:
|
||||
return now - offset
|
||||
|
||||
|
||||
def get_relative_time_option(input_string, input_name, backend='cryptography', with_timezone=False, now=None):
|
||||
"""Return an absolute timespec if a relative timespec or an ASN1 formatted
|
||||
string is provided.
|
||||
|
||||
The return value will be a datetime object for the cryptography backend,
|
||||
and a ASN1 formatted string for the pyopenssl backend."""
|
||||
result = to_native(input_string)
|
||||
if result is None:
|
||||
raise OpenSSLObjectError(
|
||||
'The timespec "%s" for %s is not valid' %
|
||||
input_string, input_name)
|
||||
# Relative time
|
||||
if result.startswith("+") or result.startswith("-"):
|
||||
result_datetime = convert_relative_to_datetime(result, with_timezone=with_timezone, now=now)
|
||||
if backend == 'pyopenssl':
|
||||
return result_datetime.strftime("%Y%m%d%H%M%SZ")
|
||||
elif backend == 'cryptography':
|
||||
return result_datetime
|
||||
# Absolute time
|
||||
if backend == 'pyopenssl':
|
||||
return input_string
|
||||
elif backend == 'cryptography':
|
||||
for date_fmt, length in [
|
||||
('%Y%m%d%H%M%SZ', 15), # this also parses '202401020304Z', but as datetime(2024, 1, 2, 3, 0, 4)
|
||||
('%Y%m%d%H%MZ', 13),
|
||||
('%Y%m%d%H%M%S%z', 14 + 5), # this also parses '202401020304+0000', but as datetime(2024, 1, 2, 3, 0, 4, tzinfo=...)
|
||||
('%Y%m%d%H%M%z', 12 + 5),
|
||||
]:
|
||||
if len(result) != length:
|
||||
continue
|
||||
try:
|
||||
res = datetime.datetime.strptime(result, date_fmt)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
return add_or_remove_timezone(res, with_timezone=with_timezone)
|
||||
|
||||
raise OpenSSLObjectError(
|
||||
'The time spec "%s" for %s is invalid' %
|
||||
(input_string, input_name)
|
||||
)
|
|
@ -165,16 +165,16 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.io import (
|
|||
read_file,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
set_not_valid_after,
|
||||
set_not_valid_before,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
try:
|
||||
import cryptography
|
||||
|
|
|
@ -220,10 +220,6 @@ from ansible.module_utils.common.text.converters import to_bytes
|
|||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
cryptography_oid_to_name,
|
||||
|
@ -232,6 +228,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptograp
|
|||
get_not_valid_before,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_now_datetime,
|
||||
)
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.6'
|
||||
|
||||
CREATE_DEFAULT_CONTEXT_IMP_ERR = None
|
||||
|
|
|
@ -406,10 +406,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.basic impo
|
|||
OpenSSLObjectError,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
|
||||
CRYPTOGRAPHY_TIMEZONE,
|
||||
)
|
||||
|
@ -418,6 +414,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
select_backend,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
module = AnsibleModule(
|
||||
|
|
|
@ -470,7 +470,6 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
|
|||
load_certificate,
|
||||
parse_name_field,
|
||||
parse_ordered_name_field,
|
||||
get_relative_time_option,
|
||||
select_message_digest,
|
||||
)
|
||||
|
||||
|
@ -506,6 +505,10 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.module_bac
|
|||
get_crl_info,
|
||||
)
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
get_relative_time_option,
|
||||
)
|
||||
|
||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'
|
||||
|
||||
CRYPTOGRAPHY_IMP_ERR = None
|
||||
|
|
|
@ -9,6 +9,7 @@ __metaclass__ = type
|
|||
import base64
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
|
||||
CertificateInformation,
|
||||
|
@ -107,6 +108,56 @@ TEST_CERT_INFO = [
|
|||
]
|
||||
|
||||
|
||||
TEST_PARSE_ACME_TIMESTAMP = [
|
||||
(
|
||||
'2024-01-01T00:11:22Z',
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22),
|
||||
),
|
||||
(
|
||||
'2024-01-01T00:11:22.123Z',
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=11, second=22, microsecond=123000),
|
||||
),
|
||||
(
|
||||
'2024-04-17T06:54:13.333333334Z',
|
||||
dict(year=2024, month=4, day=17, hour=6, minute=54, second=13, microsecond=333333),
|
||||
),
|
||||
]
|
||||
|
||||
if sys.version_info >= (3, 5):
|
||||
TEST_PARSE_ACME_TIMESTAMP.extend([
|
||||
(
|
||||
'2024-01-01T00:11:22+0100',
|
||||
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22),
|
||||
),
|
||||
(
|
||||
'2024-01-01T00:11:22.123+0100',
|
||||
dict(year=2023, month=12, day=31, hour=23, minute=11, second=22, microsecond=123000),
|
||||
),
|
||||
])
|
||||
|
||||
|
||||
TEST_INTERPOLATE_TIMESTAMP = [
|
||||
(
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
|
||||
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
|
||||
0.0,
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
|
||||
),
|
||||
(
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
|
||||
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
|
||||
0.5,
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=30, second=0),
|
||||
),
|
||||
(
|
||||
dict(year=2024, month=1, day=1, hour=0, minute=0, second=0),
|
||||
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
|
||||
1.0,
|
||||
dict(year=2024, month=1, day=1, hour=1, minute=0, second=0),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class FakeBackend(CryptoBackend):
|
||||
def parse_key(self, key_file=None, key_content=None, passphrase=None):
|
||||
raise BackendException('Not implemented in fake backend')
|
||||
|
|
|
@ -30,6 +30,8 @@ from .backend_data import (
|
|||
TEST_CERT,
|
||||
TEST_CERT_DAYS,
|
||||
TEST_CERT_INFO,
|
||||
TEST_PARSE_ACME_TIMESTAMP,
|
||||
TEST_INTERPOLATE_TIMESTAMP,
|
||||
)
|
||||
|
||||
|
||||
|
@ -92,3 +94,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output,
|
|||
assert cert_info == expected_cert_info
|
||||
cert_info = backend.get_cert_information(cert_content=cert_content)
|
||||
assert cert_info == expected_cert_info
|
||||
|
||||
|
||||
def test_now():
|
||||
module = MagicMock()
|
||||
backend = CryptographyBackend(module)
|
||||
now = backend.get_now()
|
||||
assert CRYPTOGRAPHY_TIMEZONE == (now.tzinfo is not None)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
|
||||
def test_parse_acme_timestamp(input, expected):
|
||||
module = MagicMock()
|
||||
backend = CryptographyBackend(module)
|
||||
ts_expected = backend.get_utc_datetime(**expected)
|
||||
timestamp = backend.parse_acme_timestamp(input)
|
||||
assert ts_expected == timestamp
|
||||
|
||||
|
||||
@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
|
||||
def test_interpolate_timestamp(start, end, percentage, expected):
|
||||
module = MagicMock()
|
||||
backend = CryptographyBackend(module)
|
||||
ts_start = backend.get_utc_datetime(**start)
|
||||
ts_end = backend.get_utc_datetime(**end)
|
||||
ts_expected = backend.get_utc_datetime(**expected)
|
||||
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
|
||||
assert ts_expected == timestamp
|
||||
|
|
|
@ -22,6 +22,8 @@ from .backend_data import (
|
|||
TEST_CERT_OPENSSL_OUTPUT,
|
||||
TEST_CERT_DAYS,
|
||||
TEST_CERT_INFO,
|
||||
TEST_PARSE_ACME_TIMESTAMP,
|
||||
TEST_INTERPOLATE_TIMESTAMP,
|
||||
)
|
||||
|
||||
|
||||
|
@ -91,3 +93,30 @@ def test_get_cert_information(cert_content, expected_cert_info, openssl_output,
|
|||
assert cert_info == expected_cert_info
|
||||
cert_info = backend.get_cert_information(cert_content=cert_content)
|
||||
assert cert_info == expected_cert_info
|
||||
|
||||
|
||||
def test_now():
|
||||
module = MagicMock()
|
||||
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
||||
now = backend.get_now()
|
||||
assert now.tzinfo is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input, expected", TEST_PARSE_ACME_TIMESTAMP)
|
||||
def test_parse_acme_timestamp(input, expected):
|
||||
module = MagicMock()
|
||||
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
||||
ts_expected = backend.get_utc_datetime(**expected)
|
||||
timestamp = backend.parse_acme_timestamp(input)
|
||||
assert ts_expected == timestamp
|
||||
|
||||
|
||||
@pytest.mark.parametrize("start, end, percentage, expected", TEST_INTERPOLATE_TIMESTAMP)
|
||||
def test_interpolate_timestamp(start, end, percentage, expected):
|
||||
module = MagicMock()
|
||||
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
|
||||
ts_start = backend.get_utc_datetime(**start)
|
||||
ts_end = backend.get_utc_datetime(**end)
|
||||
ts_expected = backend.get_utc_datetime(**expected)
|
||||
timestamp = backend.interpolate_timestamp(ts_start, ts_end, percentage)
|
||||
assert ts_expected == timestamp
|
||||
|
|
|
@ -0,0 +1,323 @@
|
|||
# Copyright (c) Ansible Project
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
import datetime
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
from ansible_collections.community.crypto.plugins.module_utils.time import (
|
||||
add_or_remove_timezone,
|
||||
get_now_datetime,
|
||||
convert_relative_to_datetime,
|
||||
ensure_utc_timezone,
|
||||
from_epoch_seconds,
|
||||
get_epoch_seconds,
|
||||
get_relative_time_option,
|
||||
remove_timezone,
|
||||
UTC,
|
||||
)
|
||||
|
||||
|
||||
TEST_REMOVE_TIMEZONE = [
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2),
|
||||
),
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2),
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2),
|
||||
),
|
||||
]
|
||||
|
||||
TEST_UTC_TIMEZONE = [
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2),
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=UTC),
|
||||
),
|
||||
]
|
||||
|
||||
TEST_EPOCH_SECONDS = [
|
||||
(0, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=0)),
|
||||
(1E-6, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1)),
|
||||
(1E-3, dict(year=1970, day=1, month=1, hour=0, minute=0, second=0, microsecond=1000)),
|
||||
(3691.2, dict(year=1970, day=1, month=1, hour=1, minute=1, second=31, microsecond=200000)),
|
||||
]
|
||||
|
||||
TEST_EPOCH_TO_SECONDS = [
|
||||
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0), 62),
|
||||
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=UTC), 62),
|
||||
]
|
||||
|
||||
TEST_CONVERT_RELATIVE_TO_DATETIME = [
|
||||
(
|
||||
'+0',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
),
|
||||
(
|
||||
'+1s',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 1),
|
||||
),
|
||||
(
|
||||
'-10w20d30h40m50s',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
|
||||
datetime.datetime(2023, 10, 1, 17, 19, 10),
|
||||
),
|
||||
(
|
||||
'+0',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'+1s',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 1, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'-10w20d30h40m50s',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2023, 10, 1, 17, 19, 10, tzinfo=UTC),
|
||||
),
|
||||
]
|
||||
|
||||
TEST_GET_RELATIVE_TIME_OPTION = [
|
||||
(
|
||||
'+1d2h3m4s',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 2, 3, 4),
|
||||
),
|
||||
(
|
||||
'-1w10d24h',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2023, 12, 14, 0, 0, 0),
|
||||
),
|
||||
(
|
||||
'20240102040506Z',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 4, 5, 6),
|
||||
),
|
||||
(
|
||||
'202401020405Z',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 4, 5, 0),
|
||||
),
|
||||
(
|
||||
'+1d2h3m4s',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 2, 3, 4, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'-1w10d24h',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2023, 12, 14, 0, 0, 0, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'20240102040506Z',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 4, 5, 6, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'202401020405Z',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 4, 5, 0, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'+1d2h3m4s',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'20240102020304Z',
|
||||
),
|
||||
(
|
||||
'-1w10d24h',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'20231214000000Z',
|
||||
),
|
||||
(
|
||||
'20240102040506Z',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'20240102040506Z',
|
||||
),
|
||||
(
|
||||
'202401020405Z',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'202401020405Z',
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
if sys.version_info >= (3, 5):
|
||||
ONE_HOUR_PLUS = datetime.timezone(datetime.timedelta(hours=1))
|
||||
|
||||
TEST_REMOVE_TIMEZONE.extend([
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS),
|
||||
datetime.datetime(2023, 12, 31, 23, 1, 2),
|
||||
),
|
||||
])
|
||||
TEST_UTC_TIMEZONE.extend([
|
||||
(
|
||||
datetime.datetime(2024, 1, 1, 0, 1, 2, tzinfo=ONE_HOUR_PLUS),
|
||||
datetime.datetime(2023, 12, 31, 23, 1, 2, tzinfo=UTC),
|
||||
),
|
||||
])
|
||||
TEST_EPOCH_TO_SECONDS.extend([
|
||||
(datetime.datetime(1970, 1, 1, 0, 1, 2, 0, tzinfo=ONE_HOUR_PLUS), 62 - 3600),
|
||||
])
|
||||
TEST_GET_RELATIVE_TIME_OPTION.extend([
|
||||
(
|
||||
'20240102040506+0100',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 3, 5, 6),
|
||||
),
|
||||
(
|
||||
'202401020405+0100',
|
||||
'foo',
|
||||
'cryptography',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 3, 5, 0),
|
||||
),
|
||||
(
|
||||
'20240102040506+0100',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 3, 5, 6, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'202401020405+0100',
|
||||
'foo',
|
||||
'cryptography',
|
||||
True,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
datetime.datetime(2024, 1, 2, 3, 5, 0, tzinfo=UTC),
|
||||
),
|
||||
(
|
||||
'20240102040506+0100',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'20240102040506+0100',
|
||||
),
|
||||
(
|
||||
'202401020405+0100',
|
||||
'foo',
|
||||
'pyopenssl',
|
||||
False,
|
||||
datetime.datetime(2024, 1, 1, 0, 0, 0),
|
||||
'202401020405+0100',
|
||||
),
|
||||
])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input, expected", TEST_REMOVE_TIMEZONE)
|
||||
def test_remove_timezone(input, expected):
|
||||
output_1 = remove_timezone(input)
|
||||
assert expected == output_1
|
||||
output_2 = add_or_remove_timezone(input, with_timezone=False)
|
||||
assert expected == output_2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input, expected", TEST_UTC_TIMEZONE)
|
||||
def test_utc_timezone(input, expected):
|
||||
output_1 = ensure_utc_timezone(input)
|
||||
assert expected == output_1
|
||||
output_2 = add_or_remove_timezone(input, with_timezone=True)
|
||||
assert expected == output_2
|
||||
|
||||
|
||||
def test_get_now_datetime():
|
||||
output_1 = get_now_datetime(with_timezone=False)
|
||||
assert output_1.tzinfo is None
|
||||
output_2 = get_now_datetime(with_timezone=True)
|
||||
assert output_2.tzinfo is not None
|
||||
assert output_2.tzinfo == UTC
|
||||
|
||||
|
||||
@pytest.mark.parametrize("seconds, timestamp", TEST_EPOCH_SECONDS)
|
||||
def test_epoch_seconds(seconds, timestamp):
|
||||
ts_wo_tz = datetime.datetime(**timestamp)
|
||||
assert seconds == get_epoch_seconds(ts_wo_tz)
|
||||
timestamp_w_tz = dict(timestamp)
|
||||
timestamp_w_tz['tzinfo'] = UTC
|
||||
ts_w_tz = datetime.datetime(**timestamp_w_tz)
|
||||
assert seconds == get_epoch_seconds(ts_w_tz)
|
||||
output_1 = from_epoch_seconds(seconds, with_timezone=False)
|
||||
assert ts_wo_tz == output_1
|
||||
output_2 = from_epoch_seconds(seconds, with_timezone=True)
|
||||
assert ts_w_tz == output_2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("timestamp, expected_seconds", TEST_EPOCH_TO_SECONDS)
|
||||
def test_epoch_to_seconds(timestamp, expected_seconds):
|
||||
assert expected_seconds == get_epoch_seconds(timestamp)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("relative_time_string, with_timezone, now, expected", TEST_CONVERT_RELATIVE_TO_DATETIME)
|
||||
def test_convert_relative_to_datetime(relative_time_string, with_timezone, now, expected):
|
||||
output = convert_relative_to_datetime(relative_time_string, with_timezone=with_timezone, now=now)
|
||||
assert expected == output
|
||||
|
||||
|
||||
@pytest.mark.parametrize("input_string, input_name, backend, with_timezone, now, expected", TEST_GET_RELATIVE_TIME_OPTION)
|
||||
def test_get_relative_time_option(input_string, input_name, backend, with_timezone, now, expected):
|
||||
output = get_relative_time_option(input_string, input_name, backend=backend, with_timezone=with_timezone, now=now)
|
||||
assert expected == output
|
Loading…
Reference in New Issue