Implement certificate information retrieval code in the ACME backends. (#736)

pull/738/head
Felix Fontein 2024-04-29 22:29:43 +02:00 committed by GitHub
parent afe7f7522c
commit c6fbe58382
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 309 additions and 13 deletions

View File

@ -0,0 +1,2 @@
deprecated_features:
- "acme.backends module utils - the ``get_cert_information()`` method for a ACME crypto backend must be implemented from community.crypto 3.0.0 on (https://github.com/ansible-collections/community.crypto/pull/736)."

View File

@ -19,6 +19,7 @@ from ansible.module_utils.common.text.converters import to_bytes, to_native, to_
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
)
@ -49,7 +50,9 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.support im
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
cryptography_name_to_oid,
cryptography_serial_number_of_cert,
get_not_valid_after,
get_not_valid_before,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
@ -386,3 +389,44 @@ class CryptographyBackend(CryptoBackend):
Given a Criterium object, creates a ChainMatcher object.
'''
return CryptographyChainMatcher(criterium, self.module)
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
if cert_filename is not None:
cert_content = read_file(cert_filename)
else:
cert_content = to_bytes(cert_content)
# Make sure we have at most one PEM. Otherwise cryptography 36.0.0 will barf.
cert_content = to_bytes(extract_first_pem(to_text(cert_content)) or '')
try:
cert = cryptography.x509.load_pem_x509_certificate(cert_content, _cryptography_backend)
except Exception as e:
if cert_filename is None:
raise BackendException('Cannot parse certificate: {0}'.format(e))
raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e))
ski = None
try:
ext = cert.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier)
ski = ext.value.digest
except cryptography.x509.ExtensionNotFound:
pass
aki = None
try:
ext = cert.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier)
aki = ext.value.key_identifier
except cryptography.x509.ExtensionNotFound:
pass
return CertificateInformation(
not_valid_after=get_not_valid_after(cert),
not_valid_before=get_not_valid_before(cert),
serial_number=cryptography_serial_number_of_cert(cert),
subject_key_identifier=ski,
authority_key_identifier=aki,
)

View File

@ -20,6 +20,7 @@ import traceback
from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
)
@ -30,6 +31,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.errors impor
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int
try:
import ipaddress
except ImportError:
@ -39,6 +42,33 @@ except ImportError:
_OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C')
def _extract_date(out_text, name, cert_filename_suffix=""):
try:
date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1)
return datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
except AttributeError:
raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix))
except ValueError as exc:
raise BackendException("Failed to parse '{0}' date{1}: {2}".format(name, cert_filename_suffix, exc))
def _decode_octets(octets_text):
return binascii.unhexlify(re.sub(r"(\s|:)", "", octets_text).encode("utf-8"))
def _extract_octets(out_text, name, required=True):
match = re.search(
r"\s+%s:\s*\n\s+([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % name,
out_text,
re.MULTILINE | re.DOTALL,
)
if match is not None:
return _decode_octets(match.group(1))
if not required:
return None
raise BackendException("No '{0}' octet string found".format(name))
class OpenSSLCLIBackend(CryptoBackend):
def __init__(self, module, openssl_binary=None):
super(OpenSSLCLIBackend, self).__init__(module)
@ -89,10 +119,12 @@ class OpenSSLCLIBackend(CryptoBackend):
dummy, out, dummy = self.module.run_command(
openssl_keydump_cmd, check_rc=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
out_text = to_text(out, errors='surrogate_or_strict')
if account_key_type == 'rsa':
pub_hex, pub_exp = re.search(
r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL).groups()
pub_hex = re.search(r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent", out_text, re.MULTILINE | re.DOTALL).group(1)
pub_exp = re.search(r"\npublicExponent: ([0-9]+)", out_text, re.MULTILINE | re.DOTALL).group(1)
pub_exp = "{0:x}".format(int(pub_exp))
if len(pub_exp) % 2:
pub_exp = "0{0}".format(pub_exp)
@ -104,17 +136,19 @@ class OpenSSLCLIBackend(CryptoBackend):
'jwk': {
"kty": "RSA",
"e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))),
"n": nopad_b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))),
"n": nopad_b64(_decode_octets(pub_hex)),
},
'hash': 'sha256',
}
elif account_key_type == 'ec':
pub_data = re.search(
r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
out_text,
re.MULTILINE | re.DOTALL,
)
if pub_data is None:
raise KeyParsingError('cannot parse elliptic curve key')
pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8"))
pub_hex = _decode_octets(pub_data.group(1))
asn1_oid_curve = pub_data.group(2).lower()
nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None
if asn1_oid_curve == 'prime256v1' or nist_curve == 'p-256':
@ -303,13 +337,8 @@ class OpenSSLCLIBackend(CryptoBackend):
openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
try:
not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", to_text(out, errors='surrogate_or_strict')).group(1)
not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z')
except AttributeError:
raise BackendException("No 'Not after' date found{0}".format(cert_filename_suffix))
except ValueError:
raise BackendException("Failed to parse 'Not after' date{0}".format(cert_filename_suffix))
out_text = to_text(out, errors='surrogate_or_strict')
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
if now is None:
now = datetime.datetime.now()
return (not_after - now).days
@ -319,3 +348,43 @@ class OpenSSLCLIBackend(CryptoBackend):
Given a Criterium object, creates a ChainMatcher object.
'''
raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.')
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
filename = cert_filename
data = None
if cert_filename is not None:
cert_filename_suffix = ' in {0}'.format(cert_filename)
else:
filename = '/dev/stdin'
data = to_bytes(cert_content)
cert_filename_suffix = ''
openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"]
dummy, out, dummy = self.module.run_command(
openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE)
out_text = to_text(out, errors='surrogate_or_strict')
not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix)
not_before = _extract_date(out_text, 'Not Before', cert_filename_suffix=cert_filename_suffix)
sn = re.search(
r" Serial Number: ([0-9]+)",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if sn:
serial = int(sn.group(1))
else:
serial = convert_bytes_to_int(_extract_octets(out_text, 'Serial Number', required=True))
ski = _extract_octets(out_text, 'X509v3 Subject Key Identifier', required=False)
aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False)
return CertificateInformation(
not_valid_after=not_after,
not_valid_before=not_before,
serial_number=serial,
subject_key_identifier=ski,
authority_key_identifier=aki,
)

View File

@ -9,10 +9,27 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
from collections import namedtuple
import abc
from ansible.module_utils import six
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
)
CertificateInformation = namedtuple(
'CertificateInformation',
(
'not_valid_after',
'not_valid_before',
'serial_number',
'subject_key_identifier',
'authority_key_identifier',
),
)
@six.add_metaclass(abc.ABCMeta)
class CryptoBackend(object):
@ -74,3 +91,12 @@ class CryptoBackend(object):
'''
Given a Criterium object, creates a ChainMatcher object.
'''
def get_cert_information(self, cert_filename=None, cert_content=None):
'''
Return some information on a X.509 certificate as a CertificateInformation object.
'''
# Not implementing this method in a backend is DEPRECATED and will be
# disallowed in community.crypto 3.0.0. This method will be marked as
# @abstractmethod by then.
raise BackendException('This backend does not support get_cert_information()')

View File

@ -110,6 +110,9 @@ if sys.version_info[0] >= 3:
def _convert_int_to_bytes(count, no):
return no.to_bytes(count, byteorder='big')
def _convert_bytes_to_int(data):
return int.from_bytes(data, byteorder='big', signed=False)
def _to_hex(no):
return hex(no)[2:]
else:
@ -122,6 +125,12 @@ else:
raise Exception('Number {1} needs more than {0} bytes!'.format(count, n))
return ('0' * (2 * count - len(h)) + h).decode('hex')
def _convert_bytes_to_int(data):
v = 0
for x in data:
v = (v << 8) | ord(x)
return v
def _to_hex(no):
return '%x' % no
@ -155,3 +164,10 @@ def convert_int_to_hex(no, digits=None):
if digits is not None and len(value) < digits:
value = '0' * (digits - len(value)) + value
return value
def convert_bytes_to_int(data):
"""
Convert a byte string to an unsigned integer in network byte order.
"""
return _convert_bytes_to_int(data)

View File

@ -11,6 +11,7 @@ import datetime
import os
from ansible_collections.community.crypto.plugins.module_utils.acme.backends import (
CertificateInformation,
CryptoBackend,
)
@ -81,6 +82,9 @@ TEST_CSRS = [
TEST_CERT = load_fixture("cert_1.pem")
TEST_CERT_OPENSSL_OUTPUT = load_fixture("cert_1.txt")
TEST_CERT_DAYS = [
(datetime.datetime(2018, 11, 15, 1, 2, 3), 11),
(datetime.datetime(2018, 11, 25, 15, 20, 0), 1),
@ -88,6 +92,21 @@ TEST_CERT_DAYS = [
]
TEST_CERT_INFO = [
(
TEST_CERT,
CertificateInformation(
not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24),
not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23),
serial_number=1,
subject_key_identifier=b'\x98\xD2\xFD\x3C\xCC\xCD\x69\x45\xFB\xE2\x8C\x30\x2C\x54\x62\x18\x34\xB7\x07\x73',
authority_key_identifier=None,
),
TEST_CERT_OPENSSL_OUTPUT,
),
]
class FakeBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None):
raise BackendException('Not implemented in fake backend')
@ -98,6 +117,9 @@ class FakeBackend(CryptoBackend):
def create_mac_key(self, alg, key):
raise BackendException('Not implemented in fake backend')
def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None):
raise BackendException('Not implemented in fake backend')
def get_csr_identifiers(self, csr_filename=None, csr_content=None):
raise BackendException('Not implemented in fake backend')
@ -106,3 +128,6 @@ class FakeBackend(CryptoBackend):
def create_chain_matcher(self, criterium):
raise BackendException('Not implemented in fake backend')
def get_cert_information(self, cert_filename=None, cert_content=None):
raise BackendException('Not implemented in fake backend')

View File

@ -0,0 +1,38 @@
Certificate:
Data:
Version: 3 (0x2)
Serial Number: 1 (0x1)
Signature Algorithm: ecdsa-with-SHA256
Issuer: CN=ansible.com
Validity
Not Before: Nov 25 15:28:23 2018 GMT
Not After : Nov 26 15:28:24 2018 GMT
Subject: CN=ansible.com
Subject Public Key Info:
Public Key Algorithm: id-ecPublicKey
Public-Key: (256 bit)
pub:
04:00:9c:f4:c8:00:17:03:01:26:3a:14:d1:92:35:
f1:c2:07:9d:6d:63:ba:82:86:d8:33:79:56:b3:3a:
d2:eb:c1:bc:41:2c:e1:5d:1e:80:99:0d:c8:cd:90:
e2:9a:74:d3:5c:ee:d7:85:5c:a5:0d:3f:12:2f:31:
38:e3:f1:29:9b
ASN1 OID: prime256v1
NIST CURVE: P-256
X509v3 extensions:
X509v3 Subject Alternative Name:
DNS:example.com, DNS:example.org
X509v3 Basic Constraints: critical
CA:FALSE
X509v3 Key Usage: critical
Digital Signature
X509v3 Extended Key Usage:
TLS Web Server Authentication
X509v3 Subject Key Identifier:
98:D2:FD:3C:CC:CD:69:45:FB:E2:8C:30:2C:54:62:18:34:B7:07:73
Signature Algorithm: ecdsa-with-SHA256
Signature Value:
30:46:02:21:00:bc:fb:52:bf:7a:93:2d:0e:7c:ce:43:f4:cc:
05:98:28:36:8d:c7:2a:9b:f5:20:94:62:3d:fb:82:9e:38:42:
32:02:21:00:c0:55:f8:b5:d9:65:41:2a:dd:d4:76:3f:8c:cb:
07:c1:d2:b9:c0:7d:c9:90:af:fd:f9:f1:b0:c9:13:f5:d5:52

View File

@ -0,0 +1,3 @@
GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
SPDX-License-Identifier: GPL-3.0-or-later
SPDX-FileCopyrightText: Ansible Project

View File

@ -16,11 +16,20 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backend_cryp
CryptographyBackend,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
ensure_utc_timezone,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
CRYPTOGRAPHY_TIMEZONE,
)
from .backend_data import (
TEST_KEYS,
TEST_CSRS,
TEST_CERT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
)
@ -64,3 +73,22 @@ def test_certdays_cryptography(now, expected_days, tmpdir):
assert days == expected_days
days = backend.get_cert_days(cert_content=TEST_CERT, now=now)
assert days == expected_days
@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO)
def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(cert_content)
module = MagicMock()
backend = CryptographyBackend(module)
if CRYPTOGRAPHY_TIMEZONE:
expected_cert_info = expected_cert_info._replace(
not_valid_after=ensure_utc_timezone(expected_cert_info.not_valid_after),
not_valid_before=ensure_utc_timezone(expected_cert_info.not_valid_before),
)
cert_info = backend.get_cert_information(cert_filename=str(fn))
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info

View File

@ -18,6 +18,10 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backend_open
from .backend_data import (
TEST_KEYS,
TEST_CSRS,
TEST_CERT,
TEST_CERT_OPENSSL_OUTPUT,
TEST_CERT_DAYS,
TEST_CERT_INFO,
)
@ -61,3 +65,29 @@ def test_normalize_ip(ip, result):
module = MagicMock()
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
assert backend._normalize_ip(ip) == result
@pytest.mark.parametrize("now, expected_days", TEST_CERT_DAYS)
def test_certdays_cryptography(now, expected_days, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(TEST_CERT)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, TEST_CERT_OPENSSL_OUTPUT, 0))
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
days = backend.get_cert_days(cert_filename=str(fn), now=now)
assert days == expected_days
days = backend.get_cert_days(cert_content=TEST_CERT, now=now)
assert days == expected_days
@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO)
def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir):
fn = tmpdir / 'test-cert.pem'
fn.write(cert_content)
module = MagicMock()
module.run_command = MagicMock(return_value=(0, openssl_output, 0))
backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
cert_info = backend.get_cert_information(cert_filename=str(fn))
assert cert_info == expected_cert_info
cert_info = backend.get_cert_information(cert_content=cert_content)
assert cert_info == expected_cert_info

View File

@ -15,6 +15,7 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor
quick_is_not_prime,
convert_int_to_bytes,
convert_int_to_hex,
convert_bytes_to_int,
)
@ -100,3 +101,17 @@ def test_convert_int_to_hex(no, digits, result):
value = convert_int_to_hex(no, digits=digits)
print(value)
assert value == result
@pytest.mark.parametrize('data, result', [
(b'', 0),
(b'\x00', 0),
(b'\x00\x01', 1),
(b'\x01', 1),
(b'\xff', 255),
(b'\x01\x00', 256),
])
def test_convert_bytes_to_int(data, result):
value = convert_bytes_to_int(data)
print(value)
assert value == result