acme module utils: add functions for parsing Retry-After header values and computation of ARI certificate IDs (#737)

* Implement Retry-After value parse.

* Add cert ID computation function.

* Add tests and links to MDN.
pull/738/head
Felix Fontein 2024-04-29 23:06:35 +02:00 committed by GitHub
parent c6fbe58382
commit af5f4b57f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 147 additions and 0 deletions

View File

@ -10,6 +10,7 @@ __metaclass__ = type
import base64 import base64
import datetime
import re import re
import textwrap import textwrap
import traceback import traceback
@ -19,6 +20,10 @@ from ansible.module_utils.six.moves.urllib.parse import unquote
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_int_to_bytes
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import get_now_datetime
def nopad_b64(data): def nopad_b64(data):
return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "") return base64.urlsafe_b64encode(data).decode('utf8').replace("=", "")
@ -65,8 +70,53 @@ def pem_to_der(pem_filename=None, pem_content=None):
def process_links(info, callback): def process_links(info, callback):
''' '''
Process link header, calls callback for every link header with the URL and relation as options. Process link header, calls callback for every link header with the URL and relation as options.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
''' '''
if 'link' in info: if 'link' in info:
link = info['link'] link = info['link']
for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link): for url, relation in re.findall(r'<([^>]+)>;\s*rel="(\w+)"', link):
callback(unquote(url), relation) callback(unquote(url), relation)
def parse_retry_after(value, relative_with_timezone=True, now=None):
'''
Parse the value of a Retry-After header and return a timestamp.
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
'''
# First try a number of seconds
try:
delta = datetime.timedelta(seconds=int(value))
if now is None:
now = get_now_datetime(relative_with_timezone)
return now + delta
except ValueError:
pass
try:
return datetime.datetime.strptime(value, '%a, %d %b %Y %H:%M:%S GMT')
except ValueError:
pass
raise ValueError('Cannot parse Retry-After header value %s' % repr(value))
def compute_cert_id(backend, cert_info=None, cert_filename=None, cert_content=None):
# Obtain certificate info if not provided
if cert_info is None:
cert_info = backend.get_cert_information(cert_filename=cert_filename, cert_content=cert_content)
# Convert Authority Key Identifier to string
if cert_info.authority_key_identifier is None:
raise ModuleFailException('Module has no Authority Key Identifier extension')
aki = to_native(base64.urlsafe_b64encode(cert_info.authority_key_identifier)).replace('=', '')
# Convert serial number to string
serial_bytes = convert_int_to_bytes(cert_info.serial_number)
if ord(serial_bytes[:1]) >= 128:
serial_bytes = b'\x00' + serial_bytes
serial = to_native(base64.urlsafe_b64encode(serial_bytes)).replace('=', '')
# Compose cert ID
return '{aki}.{serial}'.format(aki=aki, serial=serial)

View File

@ -6,12 +6,20 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
import datetime
import pytest import pytest
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import ( from ansible_collections.community.crypto.plugins.module_utils.acme.utils import (
nopad_b64, nopad_b64,
pem_to_der, pem_to_der,
process_links,
parse_retry_after,
compute_cert_id,
) )
from .backend_data import ( from .backend_data import (
@ -27,6 +35,73 @@ NOPAD_B64 = [
] ]
TEST_LINKS_HEADER = [
(
{},
[],
),
(
{
'link': '<foo>; rel="bar"'
},
[
('foo', 'bar'),
],
),
(
{
'link': '<foo>; rel="bar", <baz>; rel="bam"'
},
[
('foo', 'bar'),
('baz', 'bam'),
],
),
(
{
'link': '<https://one.example.com>; rel="preconnect", <https://two.example.com>; rel="preconnect", <https://three.example.com>; rel="preconnect"'
},
[
('https://one.example.com', 'preconnect'),
('https://two.example.com', 'preconnect'),
('https://three.example.com', 'preconnect'),
],
),
]
TEST_RETRY_AFTER_HEADER = [
('120', datetime.datetime(2024, 4, 29, 0, 2, 0)),
('Wed, 21 Oct 2015 07:28:00 GMT', datetime.datetime(2015, 10, 21, 7, 28, 0)),
]
TEST_COMPUTE_CERT_ID = [
(
CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=1,
subject_key_identifier=None,
authority_key_identifier=b'\x00\xff',
),
'AP8.AQ',
),
(
# AKI, serial number, and expected result taken from
# https://letsencrypt.org/2024/04/25/guide-to-integrating-ari-into-existing-acme-clients.html#step-3-constructing-the-ari-certid
CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=0x87654321,
subject_key_identifier=None,
authority_key_identifier=b'\x69\x88\x5B\x6B\x87\x46\x40\x41\xE1\xB3\x7B\x84\x7B\xA0\xAE\x2C\xDE\x01\xC8\xD4',
),
'aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE',
),
]
@pytest.mark.parametrize("value, result", NOPAD_B64) @pytest.mark.parametrize("value, result", NOPAD_B64)
def test_nopad_b64(value, result): def test_nopad_b64(value, result):
assert nopad_b64(value.encode('utf-8')) == result assert nopad_b64(value.encode('utf-8')) == result
@ -37,3 +112,25 @@ def test_pem_to_der(pem, der, tmpdir):
fn = tmpdir / 'test.pem' fn = tmpdir / 'test.pem'
fn.write(pem) fn.write(pem)
assert pem_to_der(str(fn)) == der assert pem_to_der(str(fn)) == der
@pytest.mark.parametrize("value, expected_result", TEST_LINKS_HEADER)
def test_process_links(value, expected_result):
data = []
def callback(url, rel):
data.append((url, rel))
process_links(value, callback)
assert expected_result == data
@pytest.mark.parametrize("value, expected_result", TEST_RETRY_AFTER_HEADER)
def test_parse_retry_after(value, expected_result):
assert expected_result == parse_retry_after(value, now=datetime.datetime(2024, 4, 29, 0, 0, 0))
@pytest.mark.parametrize("cert_info, expected_result", TEST_COMPUTE_CERT_ID)
def test_compute_cert_id(cert_info, expected_result):
assert expected_result == compute_cert_id(backend=None, cert_info=cert_info)