From af5f4b57f8afa61082e9a24b6218cbc970939e80 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 29 Apr 2024 23:06:35 +0200 Subject: [PATCH] acme module utils: add functions for parsing Retry-After header values and computation of ARI certificate IDs (#737) * Implement Retry-After value parse. * Add cert ID computation function. * Add tests and links to MDN. --- plugins/module_utils/acme/utils.py | 50 ++++++++++ .../plugins/module_utils/acme/test_utils.py | 97 +++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/plugins/module_utils/acme/utils.py b/plugins/module_utils/acme/utils.py index 217b6de4..9179da19 100644 --- a/plugins/module_utils/acme/utils.py +++ b/plugins/module_utils/acme/utils.py @@ -10,6 +10,7 @@ __metaclass__ = type import base64 +import datetime import re import textwrap import traceback @@ -19,6 +20,10 @@ from ansible.module_utils.six.moves.urllib.parse import unquote from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException +from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import get_now_datetime + def nopad_b64(data): return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "") @@ -65,8 +70,53 @@ def pem_to_der(pem_filename=None, pem_content=None): def process_links(info, callback): ''' Process link header, calls callback for every link header with the URL and relation as options. + + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link ''' if 'link' in info: link = info['link'] for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link): callback(unquote(url), relation) + + +def parse_retry_after(value, relative_with_timezone=True, now=None): + ''' + Parse the value of a Retry-After header and return a timestamp. + + https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + ''' + # First try a number of seconds + try: + delta = datetime.timedelta(seconds=int(value)) + if now is None: + now = get_now_datetime(relative_with_timezone) + return now + delta + except ValueError: + pass + + try: + return datetime.datetime.strptime(value, '%a, %d %b %Y %H:%M:%S GMT') + except ValueError: + pass + + raise ValueError('Cannot parse Retry-After header value %s' % repr(value)) + + +def compute_cert_id(backend, cert_info=None, cert_filename=None, cert_content=None): + # Obtain certificate info if not provided + if cert_info is None: + cert_info = backend.get_cert_information(cert_filename=cert_filename, cert_content=cert_content) + + # Convert Authority Key Identifier to string + if cert_info.authority_key_identifier is None: + raise ModuleFailException('Module has no Authority Key Identifier extension') + aki = to_native(base64.urlsafe_b64encode(cert_info.authority_key_identifier)).replace('=', '') + + # Convert serial number to string + serial_bytes = convert_int_to_bytes(cert_info.serial_number) + if ord(serial_bytes[:1]) >= 128: + serial_bytes = b'\x00' + serial_bytes + serial = to_native(base64.urlsafe_b64encode(serial_bytes)).replace('=', '') + + # Compose cert ID + return '{aki}.{serial}'.format(aki=aki, serial=serial) diff --git a/tests/unit/plugins/module_utils/acme/test_utils.py b/tests/unit/plugins/module_utils/acme/test_utils.py index 9bdd8eb6..5cc318ac 100644 --- a/tests/unit/plugins/module_utils/acme/test_utils.py +++ b/tests/unit/plugins/module_utils/acme/test_utils.py @@ -6,12 +6,20 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import datetime + import pytest +from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, +) from ansible_collections.community.crypto.plugins.module_utils.acme.utils import ( nopad_b64, pem_to_der, + process_links, + parse_retry_after, + compute_cert_id, ) from .backend_data import ( @@ -27,6 +35,73 @@ NOPAD_B64 = [ ] +TEST_LINKS_HEADER = [ + ( + {}, + [], + ), + ( + { + 'link': '; rel="bar"' + }, + [ + ('foo', 'bar'), + ], + ), + ( + { + 'link': '; rel="bar", ; rel="bam"' + }, + [ + ('foo', 'bar'), + ('baz', 'bam'), + ], + ), + ( + { + 'link': '; rel="preconnect", ; rel="preconnect", ; rel="preconnect"' + }, + [ + ('https://one.example.com', 'preconnect'), + ('https://two.example.com', 'preconnect'), + ('https://three.example.com', 'preconnect'), + ], + ), +] + + +TEST_RETRY_AFTER_HEADER = [ + ('120', datetime.datetime(2024, 4, 29, 0, 2, 0)), + ('Wed, 21 Oct 2015 07:28:00 GMT', datetime.datetime(2015, 10, 21, 7, 28, 0)), +] + + +TEST_COMPUTE_CERT_ID = [ + ( + CertificateInformation( + not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24), + not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23), + serial_number=1, + subject_key_identifier=None, + authority_key_identifier=b'\x00\xff', + ), + 'AP8.AQ', + ), + ( + # AKI, serial number, and expected result taken from + # https://letsencrypt.org/2024/04/25/guide-to-integrating-ari-into-existing-acme-clients.html#step-3-constructing-the-ari-certid + CertificateInformation( + not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24), + not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23), + serial_number=0x87654321, + subject_key_identifier=None, + authority_key_identifier=b'\x69\x88\x5B\x6B\x87\x46\x40\x41\xE1\xB3\x7B\x84\x7B\xA0\xAE\x2C\xDE\x01\xC8\xD4', + ), + 'aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE', + ), +] + + @pytest.mark.parametrize("value, result", NOPAD_B64) def test_nopad_b64(value, result): assert nopad_b64(value.encode('utf-8')) == result @@ -37,3 +112,25 @@ def test_pem_to_der(pem, der, tmpdir): fn = tmpdir / 'test.pem' fn.write(pem) assert pem_to_der(str(fn)) == der + + +@pytest.mark.parametrize("value, expected_result", TEST_LINKS_HEADER) +def test_process_links(value, expected_result): + data = [] + + def callback(url, rel): + data.append((url, rel)) + + process_links(value, callback) + + assert expected_result == data + + +@pytest.mark.parametrize("value, expected_result", TEST_RETRY_AFTER_HEADER) +def test_parse_retry_after(value, expected_result): + assert expected_result == parse_retry_after(value, now=datetime.datetime(2024, 4, 29, 0, 0, 0)) + + +@pytest.mark.parametrize("cert_info, expected_result", TEST_COMPUTE_CERT_ID) +def test_compute_cert_id(cert_info, expected_result): + assert expected_result == compute_cert_id(backend=None, cert_info=cert_info)