diff --git a/changelogs/fragments/246-openssh-certificate-module-utils.yml b/changelogs/fragments/246-openssh-certificate-module-utils.yml new file mode 100644 index 00000000..6f6d1802 --- /dev/null +++ b/changelogs/fragments/246-openssh-certificate-module-utils.yml @@ -0,0 +1,2 @@ +minor_changes: + - openssh certificate module utils - new module_utils for parsing OpenSSH certificates (https://github.com/ansible-collections/community.crypto/pull/246). diff --git a/plugins/module_utils/openssh/certificate.py b/plugins/module_utils/openssh/certificate.py new file mode 100644 index 00000000..297eedc7 --- /dev/null +++ b/plugins/module_utils/openssh/certificate.py @@ -0,0 +1,337 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +# Protocol References +# ------------------- +# https://datatracker.ietf.org/doc/html/rfc4251 +# https://datatracker.ietf.org/doc/html/rfc4253 +# https://datatracker.ietf.org/doc/html/rfc5656 +# https://datatracker.ietf.org/doc/html/rfc8032 +# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD +# +# Inspired by: +# ------------ +# https://github.com/pyca/cryptography/blob/main/src/cryptography/hazmat/primitives/serialization/ssh.py +# https://github.com/paramiko/paramiko/blob/master/paramiko/message.py + +import abc +import binascii +import os +from base64 import b64encode +from hashlib import sha256 + +from ansible.module_utils import six +from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( + OpensshParser, + _OpensshWriter, +) + +# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD +_USER_TYPE = 1 +_HOST_TYPE = 2 + +_SSH_TYPE_STRINGS = { + 'rsa': b"ssh-rsa", + 'dsa': b"ssh-dss", + 'ecdsa-nistp256': b"ecdsa-sha2-nistp256", + 'ecdsa-nistp384': b"ecdsa-sha2-nistp384", + 'ecdsa-nistp521': b"ecdsa-sha2-nistp521", + 'ed25519': b"ssh-ed25519", +} +_CERT_SUFFIX_V01 = b"-cert-v01@openssh.com" + +# See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1 +_ECDSA_CURVE_IDENTIFIERS = { + 'ecdsa-nistp256': b'nistp256', + 'ecdsa-nistp384': b'nistp384', + 'ecdsa-nistp521': b'nistp521', +} +_ECDSA_CURVE_IDENTIFIERS_LOOKUP = { + b'nistp256': 'ecdsa-nistp256', + b'nistp384': 'ecdsa-nistp384', + b'nistp521': 'ecdsa-nistp521', +} + + +@six.add_metaclass(abc.ABCMeta) +class OpensshCertificateInfo: + """Encapsulates all certificate information which is signed by a CA key""" + def __init__(self, + nonce=None, + serial=None, + cert_type=None, + key_id=None, + principals=None, + valid_after=None, + valid_before=None, + critical_options=None, + extensions=None, + reserved=None): + self.nonce = nonce + self.serial = serial + self._cert_type = cert_type + self.key_id = key_id + self.principals = principals + self.valid_after = valid_after + self.valid_before = valid_before + self.critical_options = critical_options + self.extensions = extensions + self.reserved = reserved + + self.type_string = None + + @property + def cert_type(self): + if self._cert_type == _USER_TYPE: + return 'user' + elif self._cert_type == _HOST_TYPE: + return 'host' + else: + return '' + + @cert_type.setter + def cert_type(self, cert_type): + if cert_type == 'user' or cert_type == _USER_TYPE: + self._cert_type = _USER_TYPE + elif cert_type == 'host' or cert_type == _HOST_TYPE: + self._cert_type = _HOST_TYPE + else: + raise ValueError("%s is not a valid certificate type" % cert_type) + + @abc.abstractmethod + def public_key_fingerprint(self): + pass + + @abc.abstractmethod + def parse_public_numbers(self, parser): + pass + + +class OpensshRSACertificateInfo(OpensshCertificateInfo): + def __init__(self, e=None, n=None, **kwargs): + super(OpensshRSACertificateInfo, self).__init__(**kwargs) + self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01 + self.e = e + self.n = n + + # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 + def public_key_fingerprint(self): + if any([self.e is None, self.n is None]): + return b'' + + writer = _OpensshWriter() + writer.string(_SSH_TYPE_STRINGS['rsa']) + writer.mpint(self.e) + writer.mpint(self.n) + + return fingerprint(writer.bytes()) + + def parse_public_numbers(self, parser): + self.e = parser.mpint() + self.n = parser.mpint() + + +class OpensshDSACertificateInfo(OpensshCertificateInfo): + def __init__(self, p=None, q=None, g=None, y=None, **kwargs): + super(OpensshDSACertificateInfo, self).__init__(**kwargs) + self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01 + self.p = p + self.q = q + self.g = g + self.y = y + + # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 + def public_key_fingerprint(self): + if any([self.p is None, self.q is None, self.g is None, self.y is None]): + return b'' + + writer = _OpensshWriter() + writer.string(_SSH_TYPE_STRINGS['dsa']) + writer.mpint(self.p) + writer.mpint(self.q) + writer.mpint(self.g) + writer.mpint(self.y) + + return fingerprint(writer.bytes()) + + def parse_public_numbers(self, parser): + self.p = parser.mpint() + self.q = parser.mpint() + self.g = parser.mpint() + self.y = parser.mpint() + + +class OpensshECDSACertificateInfo(OpensshCertificateInfo): + def __init__(self, curve=None, public_key=None, **kwargs): + super(OpensshECDSACertificateInfo, self).__init__(**kwargs) + self._curve = None + if curve is not None: + self.curve = curve + + self.public_key = public_key + + @property + def curve(self): + return self._curve + + @curve.setter + def curve(self, curve): + if curve in _ECDSA_CURVE_IDENTIFIERS.values(): + self._curve = curve + self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01 + else: + raise ValueError( + "Curve must be one of %s" % (b','.join(list(_ECDSA_CURVE_IDENTIFIERS.values()))).decode('UTF-8') + ) + + # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 + def public_key_fingerprint(self): + if any([self.curve is None, self.public_key is None]): + return b'' + + writer = _OpensshWriter() + writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]]) + writer.string(self.curve) + writer.string(self.public_key) + + return fingerprint(writer.bytes()) + + def parse_public_numbers(self, parser): + self.curve = parser.string() + self.public_key = parser.string() + + +class OpensshED25519CertificateInfo(OpensshCertificateInfo): + def __init__(self, pk=None, **kwargs): + super(OpensshED25519CertificateInfo, self).__init__(**kwargs) + self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01 + self.pk = pk + + def public_key_fingerprint(self): + if self.pk is None: + return b'' + + writer = _OpensshWriter() + writer.string(_SSH_TYPE_STRINGS['ed25519']) + writer.string(self.pk) + + return fingerprint(writer.bytes()) + + def parse_public_numbers(self, parser): + self.pk = parser.string() + + +# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD +class OpensshCertificate(object): + """Encapsulates a formatted OpenSSH certificate including signature and signing key""" + def __init__(self, cert_info, signing_key, signature): + + self.cert_info = cert_info + self.signing_key = signing_key + self.signature = signature + + @classmethod + def load(cls, path): + if not os.path.exists(path): + raise ValueError("%s is not a valid path." % path) + + try: + with open(path, 'rb') as cert_file: + data = cert_file.read() + except (IOError, OSError) as e: + raise ValueError("%s cannot be opened for reading: %s" % (path, e)) + + try: + format_identifier, b64_cert = data.split(b' ')[:2] + cert = binascii.a2b_base64(b64_cert) + except (binascii.Error, ValueError): + raise ValueError("Certificate not in OpenSSH format") + + for key_type, string in _SSH_TYPE_STRINGS.items(): + if format_identifier == string + _CERT_SUFFIX_V01: + pub_key_type = key_type + break + else: + raise ValueError("Invalid certificate format identifier: %s" % format_identifier) + + parser = OpensshParser(cert) + + if format_identifier != parser.string(): + raise ValueError("Certificate formats do not match") + + try: + cert_info = cls._parse_cert_info(pub_key_type, parser) + signing_key = parser.string() + signature = parser.string() + except (TypeError, ValueError) as e: + raise ValueError("Invalid certificate data: %s" % e) + + if parser.remaining_bytes(): + raise ValueError( + "%s bytes of additional data was not parsed while loading %s" % (parser.remaining_bytes(), path) + ) + + return cls( + cert_info=cert_info, + signing_key=signing_key, + signature=signature, + ) + + def signing_key_fingerprint(self): + return fingerprint(self.signing_key) + + @staticmethod + def _parse_cert_info(pub_key_type, parser): + cert_info = get_cert_info_object(pub_key_type) + cert_info.nonce = parser.string() + cert_info.parse_public_numbers(parser) + cert_info.serial = parser.uint64() + cert_info.cert_type = parser.uint32() + cert_info.key_id = parser.string() + cert_info.principals = parser.string_list() + cert_info.valid_after = parser.uint64() + cert_info.valid_before = parser.uint64() + cert_info.critical_options = parser.option_list() + cert_info.extensions = parser.option_list() + cert_info.reserved = parser.string() + + return cert_info + + +def fingerprint(public_key): + """Generates a SHA256 hash and formats output to resemble ``ssh-keygen``""" + h = sha256() + h.update(public_key) + return b'SHA256:' + b64encode(h.digest()).rstrip(b'=') + + +def get_cert_info_object(key_type): + if key_type == 'rsa': + cert_info = OpensshRSACertificateInfo() + elif key_type == 'dsa': + cert_info = OpensshDSACertificateInfo() + elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'): + cert_info = OpensshECDSACertificateInfo() + elif key_type == 'ed25519': + cert_info = OpensshED25519CertificateInfo() + else: + raise ValueError("%s is not a valid key type" % key_type) + + return cert_info diff --git a/plugins/module_utils/openssh/utils.py b/plugins/module_utils/openssh/utils.py index a512bb72..03aab56d 100644 --- a/plugins/module_utils/openssh/utils.py +++ b/plugins/module_utils/openssh/utils.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # -# (c) 2020, Doug Stanley +# Copyright: (c) 2020, Doug Stanley +# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,6 +20,38 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type import re +from struct import Struct + +from ansible.module_utils.six import PY3 + +# Protocol References +# ------------------- +# https://datatracker.ietf.org/doc/html/rfc4251 +# https://datatracker.ietf.org/doc/html/rfc4253 +# https://datatracker.ietf.org/doc/html/rfc5656 +# https://datatracker.ietf.org/doc/html/rfc8032 +# +# Inspired by: +# ------------ +# https://github.com/pyca/cryptography/blob/main/src/cryptography/hazmat/primitives/serialization/ssh.py +# https://github.com/paramiko/paramiko/blob/master/paramiko/message.py + +if PY3: + long = int + +# 0 (False) or 1 (True) encoded as a single byte +_BOOLEAN = Struct(b'?') +# Unsigned 8-bit integer in network-byte-order +_UBYTE = Struct(b'!B') +_UBYTE_MAX = 0xFF +# Unsigned 32-bit integer in network-byte-order +_UINT32 = Struct(b'!I') +# Unsigned 32-bit little endian integer +_UINT32_LE = Struct(b' len(self._data): + raise ValueError("Insufficient data remaining at position: %s" % self._pos) + elif self._pos + offset < 0: + raise ValueError("Position cannot be less than zero.") + else: + return self._pos + offset + + @classmethod + def signature_data(cls, signature_string): + signature_data = {} + + parser = cls(signature_string) + signature_type = parser.string() + signature_blob = parser.string() + + blob_parser = cls(signature_blob) + if signature_type == b'ssh-rsa': + # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 + signature_data['s'] = cls._big_int(signature_blob, "big") + elif signature_type == b'ssh-dss': + # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 + signature_data['r'] = cls._big_int(signature_blob[:20], "big") + signature_data['s'] = cls._big_int(signature_blob[20:], "big") + elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'): + # https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2 + signature_data['r'] = blob_parser.mpint() + signature_data['s'] = blob_parser.mpint() + elif signature_type == b'ssh-ed25519': + # https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2 + signature_data['R'] = cls._big_int(signature_blob[:32], "little") + signature_data['S'] = cls._big_int(signature_blob[32:], "little") + else: + raise ValueError("%s is not a valid signature type" % signature_type) + + signature_data['signature_type'] = signature_type + + return signature_data + + @classmethod + def _big_int(cls, raw_string, byte_order, signed=False): + if byte_order not in ("big", "little"): + raise ValueError("Byte_order must be one of (big, little) not %s" % byte_order) + + if PY3: + return int.from_bytes(raw_string, byte_order, signed=signed) + + result = 0 + byte_length = len(raw_string) + + if byte_length > 0: + # Check sign-bit + msb = raw_string[0] if byte_order == "big" else raw_string[-1] + negative = bool(ord(msb) & 0x80) + # Match pad value for two's complement + pad = b'\xFF' if signed and negative else b'\x00' + # The definition of ``mpint`` enforces that unnecessary bytes are not encoded so they are added back + pad_length = (4 - byte_length % 4) + if pad_length < 4: + raw_string = pad * pad_length + raw_string if byte_order == "big" else raw_string + pad * pad_length + byte_length += pad_length + # Accumulate arbitrary precision integer bytes in the appropriate order + if byte_order == "big": + for i in range(0, byte_length, cls.UINT32_OFFSET): + left_shift = result << cls.UINT32_OFFSET * 8 + result = left_shift + _UINT32.unpack(raw_string[i:i + cls.UINT32_OFFSET])[0] + else: + for i in range(byte_length, 0, -cls.UINT32_OFFSET): + left_shift = result << cls.UINT32_OFFSET * 8 + result = left_shift + _UINT32_LE.unpack(raw_string[i - cls.UINT32_OFFSET:i])[0] + # Adjust for two's complement + if signed and negative: + result -= 1 << (8 * byte_length) + + return result + + +class _OpensshWriter(object): + """Writes SSH encoded values to a bytes-like buffer + + .. warning:: + This class is a private API and must not be exported outside of the openssh module_utils. + It is not to be used to construct Openssh objects, but rather as a utility to assist + in validating parsed material. + """ + def __init__(self, buffer=None): + if buffer is not None: + if not isinstance(buffer, (bytes, bytearray)): + raise TypeError("Buffer must be a bytes-like object not %s" % type(buffer)) + else: + buffer = bytearray() + + self._buff = buffer + + def boolean(self, value): + if not isinstance(value, bool): + raise TypeError("Value must be of type bool not %s" % type(value)) + + self._buff.extend(_BOOLEAN.pack(value)) + + return self + + def uint32(self, value): + if not isinstance(value, int): + raise TypeError("Value must be of type int not %s" % type(value)) + if value < 0 or value > _UINT32_MAX: + raise ValueError("Value must be a positive integer less than %s" % _UINT32_MAX) + + self._buff.extend(_UINT32.pack(value)) + + return self + + def uint64(self, value): + if not isinstance(value, (long, int)): + raise TypeError("Value must be of type (long, int) not %s" % type(value)) + if value < 0 or value > _UINT64_MAX: + raise ValueError("Value must be a positive integer less than %s" % _UINT64_MAX) + + self._buff.extend(_UINT64.pack(value)) + + return self + + def string(self, value): + if not isinstance(value, (bytes, bytearray)): + raise TypeError("Value must be bytes-like not %s" % type(value)) + self.uint32(len(value)) + self._buff.extend(value) + + return self + + def mpint(self, value): + if not isinstance(value, (int, long)): + raise TypeError("Value must be of type (long, int) not %s" % type(value)) + + self.string(self._int_to_mpint(value)) + + return self + + def name_list(self, value): + if not isinstance(value, list): + raise TypeError("Value must be a list of byte strings not %s" % type(value)) + + try: + self.string(','.join(value).encode('ASCII')) + except UnicodeEncodeError as e: + raise ValueError("Name-list's must consist of US-ASCII characters: %s" % e) + + return self + + def string_list(self, value): + if not isinstance(value, list): + raise TypeError("Value must be a list of byte string not %s" % type(value)) + + writer = _OpensshWriter() + for s in value: + writer.string(s) + + self.string(writer.bytes()) + + return self + + def option_list(self, value): + if not isinstance(value, list) or (value and not isinstance(value[0], tuple)): + raise TypeError("Value must be a list of tuples") + + writer = _OpensshWriter() + for name, data in value: + writer.string(name) + # SSH option data is encoded twice though this behavior is not documented + writer.string(_OpensshWriter().string(data).bytes() if data else bytes()) + + self.string(writer.bytes()) + + return self + + @staticmethod + def _int_to_mpint(num): + if PY3: + byte_length = (num.bit_length() + 7) // 8 + try: + result = num.to_bytes(byte_length, "big", signed=True) + # Handles values which require \x00 or \xFF to pad sign-bit + except OverflowError: + result = num.to_bytes(byte_length + 1, "big", signed=True) + else: + result = bytes() + # 0 and -1 are treated as special cases since they are used as sentinels for all other values + if num == 0: + result += b'\x00' + elif num == -1: + result += b'\xFF' + elif num > 0: + while num >> 32: + result = _UINT32.pack(num & _UINT32_MAX) + result + num = num >> 32 + # Pack last 4 bytes individually to discard insignificant bytes + while num: + result = _UBYTE.pack(num & _UBYTE_MAX) + result + num = num >> 8 + # Zero pad final byte if most-significant bit is 1 as per mpint definition + if ord(result[0]) & 0x80: + result = b'\x00' + result + else: + while (num >> 32) < -1: + result = _UINT32.pack(num & _UINT32_MAX) + result + num = num >> 32 + while num < -1: + result = _UBYTE.pack(num & _UBYTE_MAX) + result + num = num >> 8 + if not ord(result[0]) & 0x80: + result = b'\xFF' + result + + return result + + def bytes(self): + return bytes(self._buff) diff --git a/tests/unit/plugins/module_utils/openssh/test_certificate.py b/tests/unit/plugins/module_utils/openssh/test_certificate.py new file mode 100644 index 00000000..6a215d53 --- /dev/null +++ b/tests/unit/plugins/module_utils/openssh/test_certificate.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- + +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import ( + OpensshCertificate +) + +# Type: ssh-rsa-cert-v01@openssh.com user certificate +# Public key: RSA-CERT SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030 +# Signing CA: DSA SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8 +# Key ID: "test" +# Serial: 0 +# Valid: forever +# Principals: (none) +# Critical Options: (none) +# Extensions: +# permit-X11-forwarding +# permit-agent-forwarding +# permit-port-forwarding +# permit-pty +# permit-user-rc +RSA_CERT_SIGNED_BY_DSA = ( + b'ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgY9CvhGpyvBB611Lmx6hHPD+CmeJ0oW' + + b'SSK1q6K3h5CS4AAAADAQABAAABAQDKYIJtpFaWpTNNifmuV3DM9BBdngMG28jWPy4C/SoZg4EP7mkYUsG6hN+LgjOL17YEF7bKDEWPl9sQS' + + b'92iD+AuAPrjnHVQ9VG5hbTYiQAaicj6hxqBoNqGQWxDzhZL4B35MgqmoUOBGnzYA/fKgqhRVzOXbWFxKLtzSJzB+Z+kmeoBzq+4MazL4Bko' + + b'yPZMrIMnvxiluv+kqE9SWeJ/5e7WXdtbYTnSR4WN3gW/BMKEoKQk/UGwuPvCiRq+y8LorJP4B1Wfwlm/meqtbTidXyCcQPR9xWpce3rRjLL' + + b'T6cimUjWrbx7Q1SlsypdkclgPSTu9Jg457am8tnQUgnL7VdetAAAAAAAAAAAAAAABAAAABHRlc3QAAAAAAAAAAAAAAAD//////////wAAAA' + + b'AAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0L' + + b'WZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAAGxAAAAB3NzaC1kc3MAAACBAPV/' + + b'b5FknU8e56TWAGLRQ0v3c3f5jAS0txcwqtYLHLulTqyMcLL0MyzWxXv77MpjTMwEjWXLbfNWdk/qmsjfBynzs2nSZ7clVsqt/ZOadcBFEhq' + + b'ZM0l+1ZCPkhQiqsD2aodGbkVcJgqL5Z5krzB5MTey7c8rlAAxKOjfs70Bg8MPAAAAFQCW466dSEu2Pf0u8AA5SHgH0i/xuwAAAIBc23gfmv' + + b'GC+oaUAXiak17kH6NvOSJXZBdk/8CyGK6yL+CHKrKyffe6BbiVXwC6sUIa9j4YsFeyYwPFGBtfLuNUmgyKYTJcCM2zJLBykmTIvjSdRaYGN' + + b'Rkyi8GnzVV2lWxQ+4m4UGeTPbPN/OG4B0NwDbBJGbVJv0xJPq2EBKoUdgAAAIAyrFxGDLtOZFZ2fgONVaKaapEpJ5f3qPhLDXxVQ/BKVUkU' + + b'RA4AHHyXF2AMiiOOiHLrO5xsEGUyW+OISFm+6m17cEPNixA7G1fBniLvyVv2woyYW3kaY4J9z266kAFzFWVNgwr+T7MY0hEvct8VFA97JMR' + + b'Q7c8c/tNDaL7uqV46QQAAADcAAAAHc3NoLWRzcwAAAChaQ94wqca+KhkHtbkLpjvGsfu0Gy03SAb0+o11Shk/BXnK7N/cwEVD ' + + b'ansible@ansible-host' +) +RSA_FINGERPRINT = b'SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030' +# Type: ssh-dss-cert-v01@openssh.com user certificate +# Public key: DSA-CERT SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8 +# Signing CA: ECDSA SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w +# Key ID: "test" +# Serial: 0 +# Valid: forever +# Principals: (none) +# Critical Options: (none) +# Extensions: (none) +DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS = ( + b'ssh-dss-cert-v01@openssh.com AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsKvMxIv4viCNQX7z8K4/R5jronpZGf' + + b'ydpoBoh2Cx5dgAAACBAPV/b5FknU8e56TWAGLRQ0v3c3f5jAS0txcwqtYLHLulTqyMcLL0MyzWxXv77MpjTMwEjWXLbfNWdk/qmsjfBynzs' + + b'2nSZ7clVsqt/ZOadcBFEhqZM0l+1ZCPkhQiqsD2aodGbkVcJgqL5Z5krzB5MTey7c8rlAAxKOjfs70Bg8MPAAAAFQCW466dSEu2Pf0u8AA5' + + b'SHgH0i/xuwAAAIBc23gfmvGC+oaUAXiak17kH6NvOSJXZBdk/8CyGK6yL+CHKrKyffe6BbiVXwC6sUIa9j4YsFeyYwPFGBtfLuNUmgyKYTJ' + + b'cCM2zJLBykmTIvjSdRaYGNRkyi8GnzVV2lWxQ+4m4UGeTPbPN/OG4B0NwDbBJGbVJv0xJPq2EBKoUdgAAAIAyrFxGDLtOZFZ2fgONVaKaap' + + b'EpJ5f3qPhLDXxVQ/BKVUkURA4AHHyXF2AMiiOOiHLrO5xsEGUyW+OISFm+6m17cEPNixA7G1fBniLvyVv2woyYW3kaY4J9z266kAFzFWVNg' + + b'wr+T7MY0hEvct8VFA97JMRQ7c8c/tNDaL7uqV46QQAAAAAAAAAAAAAAAQAAAAR0ZXN0AAAAAAAAAAAAAAAA//////////8AAAAAAAAAAAAA' + + b'AAAAAABoAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBOf55Wc0yzaJPtxXxBGZKmAUozbYXwxZGFS1c/FaJbwLpq/' + + b'wvanQKM01uU73swNIt+ZFra9kRSi21xjzgMPn7U0AAABkAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAABJAAAAIGmlKa/riG7+EpoW6dTJY6' + + b'0N8BrEcniKgOxdRM1EPJ2DAAAAIQDnK4stvbvS+Bn0/42Was7uOfJtnLYXs5EuB2L3uejPcQ== ansible@ansible-host' +) +DSA_FINGERPRINT = b'SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8' +# Type: ecdsa-sha2-nistp256-cert-v01@openssh.com user certificate +# Public key: ECDSA-CERT SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w +# Signing CA: ED25519 SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40 +# Key ID: "test" +# Serial: 0 +# Valid: forever +# Principals: (none) +# Critical Options: +# force-command /usr/bin/csh +# Extensions: +# permit-X11-forwarding +# permit-agent-forwarding +# permit-port-forwarding +# permit-pty +# permit-user-rc +ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS = ( + b'ecdsa-sha2-nistp256-cert-v01@openssh.com AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgtC' + + b'ips7/sOOOTAgiawGlQhM6pb26t0FfQ1jG60m+tOg0AAAAIbmlzdHAyNTYAAABBBOf55Wc0yzaJPtxXxBGZKmAUozbYXwxZGFS1c/FaJbwLp' + + b'q/wvanQKM01uU73swNIt+ZFra9kRSi21xjzgMPn7U0AAAAAAAAAAAAAAAEAAAAEdGVzdAAAAAAAAAAAAAAAAP//////////AAAAJQAAAA1m' + + b'b3JjZS1jb21tYW5kAAAAEAAAAAwvdXNyL2Jpbi9jc2gAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW5' + + b'0LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLX' + + b'JjAAAAAAAAAAAAAAAzAAAAC3NzaC1lZDI1NTE5AAAAII3qYBforim0x87UXpaTDNFnhFTyb+TPCJVQpEAOHTL6AAAAUwAAAAtzc2gtZWQyN' + + b'TUxOQAAAEAdp3eOLRN5t2wW29TBWbz604uuXg88jH4RA4HDhbRupa/x2rN3j6iZQ4VXPLA4JtdfIslHFkH6HUlxU8XsoJwP ' + + b'ansible@ansible-host' +) +ECDSA_FINGERPRINT = b'SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w' +# Type: ssh-ed25519-cert-v01@openssh.com user certificate +# Public key: ED25519-CERT SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40 +# Signing CA: RSA SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030 +# Key ID: "test" +# Serial: 0 +# Valid: forever +# Principals: (none) +# Critical Options: +# test UNKNOWN OPTION (len 13) +# Extensions: +# test UNKNOWN OPTION (len 0) +ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS = ( + b'ssh-ed25519-cert-v01@openssh.com AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIP034YpKn6BDcwxqFnVrKt' + + b'kNX7k6X7hxZ7lADp5LAxHrAAAAII3qYBforim0x87UXpaTDNFnhFTyb+TPCJVQpEAOHTL6AAAAAAAAAAAAAAABAAAABHRlc3QAAAAAAAAAA' + + b'AAAAAD//////////wAAABkAAAAEdGVzdAAAAA0AAAAJdW5kZWZpbmVkAAAADAAAAAR0ZXN0AAAAAAAAAAAAAAEXAAAAB3NzaC1yc2EAAAAD' + + b'AQABAAABAQDKYIJtpFaWpTNNifmuV3DM9BBdngMG28jWPy4C/SoZg4EP7mkYUsG6hN+LgjOL17YEF7bKDEWPl9sQS92iD+AuAPrjnHVQ9VG' + + b'5hbTYiQAaicj6hxqBoNqGQWxDzhZL4B35MgqmoUOBGnzYA/fKgqhRVzOXbWFxKLtzSJzB+Z+kmeoBzq+4MazL4BkoyPZMrIMnvxiluv+kqE' + + b'9SWeJ/5e7WXdtbYTnSR4WN3gW/BMKEoKQk/UGwuPvCiRq+y8LorJP4B1Wfwlm/meqtbTidXyCcQPR9xWpce3rRjLLT6cimUjWrbx7Q1Slsy' + + b'pdkclgPSTu9Jg457am8tnQUgnL7VdetAAABDwAAAAdzc2gtcnNhAAABAMZLNacwOMNexYUaFK1nU0JPQTv4fM73QDG3xURtDsIbI6DAcA1y' + + b'KkvgjJcxlZHx0APJ+i1lWNAvPeOmuPTioymjIEuwxi0VGuAoVKgjmIy6aXH2z3YMxy9cGOq6LNfI4c58iBHR5ejVHAzvIg3rowypVsCGugL' + + b'7WJpz3eypBJt4TglwRTJpp54IMN2CyDQm0N97x9ris8jQQHlCF2EgZp1u4aOiZJTSJ5d4hapO0uZwXOI9AIWy/lmx0/6jX07MWrs4iXpfiF' + + b'5T4s6kEn7YW4SaJ0Z7xGp3V0vDOxh+jwHZGD5GM449Il6QxQwDY5BSJq+iMR467yaIjw2g8Kt4ZiU= ansible@ansible-host' +) +ED25519_FINGERPRINT = b'SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40' +# garbage +INVALID_DATA = b'yDspTN+BJzvIK2Q+CRD3qBDVSi+YqSxwyz432VEaHKlXbuLURirY0QpuBCqgR6tCtWW5vEGkXKZ3' + +VALID_OPTS = [(b'force-command', b'/usr/bin/csh')] +INVALID_OPTS = [(b'test', b'undefined')] +VALID_EXTENSIONS = [ + (b'permit-X11-forwarding', b''), + (b'permit-agent-forwarding', b''), + (b'permit-port-forwarding', b''), + (b'permit-pty', b''), + (b'permit-user-rc', b''), +] +INVALID_EXTENSIONS = [(b'test', b'')] + + +def test_rsa_certificate(tmpdir): + cert_file = tmpdir / 'id_rsa-cert.pub' + cert_file.write(RSA_CERT_SIGNED_BY_DSA, mode='wb') + + cert = OpensshCertificate.load(str(cert_file)) + assert cert.cert_info.key_id == b'test' + assert cert.cert_info.serial == 0 + assert cert.cert_info.type_string == b'ssh-rsa-cert-v01@openssh.com' + assert cert.cert_info.public_key_fingerprint() == RSA_FINGERPRINT + assert cert.signing_key_fingerprint() == DSA_FINGERPRINT + + +def test_dsa_certificate(tmpdir): + cert_file = tmpdir / 'id_dsa-cert.pub' + cert_file.write(DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS) + + cert = OpensshCertificate.load(str(cert_file)) + + assert cert.cert_info.type_string == b'ssh-dss-cert-v01@openssh.com' + assert cert.cert_info.public_key_fingerprint() == DSA_FINGERPRINT + assert cert.signing_key_fingerprint() == ECDSA_FINGERPRINT + assert cert.cert_info.critical_options == [] + assert cert.cert_info.extensions == [] + + +def test_ecdsa_certificate(tmpdir): + cert_file = tmpdir / 'id_ecdsa-cert.pub' + cert_file.write(ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS) + + cert = OpensshCertificate.load(str(cert_file)) + assert cert.cert_info.type_string == b'ecdsa-sha2-nistp256-cert-v01@openssh.com' + assert cert.cert_info.public_key_fingerprint() == ECDSA_FINGERPRINT + assert cert.signing_key_fingerprint() == ED25519_FINGERPRINT + assert cert.cert_info.critical_options == VALID_OPTS + assert cert.cert_info.extensions == VALID_EXTENSIONS + + +def test_ed25519_certificate(tmpdir): + cert_file = tmpdir / 'id_ed25519-cert.pub' + cert_file.write(ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS) + + cert = OpensshCertificate.load(str(cert_file)) + assert cert.cert_info.type_string == b'ssh-ed25519-cert-v01@openssh.com' + assert cert.cert_info.public_key_fingerprint() == ED25519_FINGERPRINT + assert cert.signing_key_fingerprint() == RSA_FINGERPRINT + assert cert.cert_info.critical_options == INVALID_OPTS + assert cert.cert_info.extensions == INVALID_EXTENSIONS + + +def test_invalid_data(tmpdir): + result = False + cert_file = tmpdir / 'invalid-cert.pub' + cert_file.write(INVALID_DATA) + + try: + OpensshCertificate.load(str(cert_file)) + except ValueError: + result = True + assert result diff --git a/tests/unit/plugins/module_utils/openssh/test_utils.py b/tests/unit/plugins/module_utils/openssh/test_utils.py new file mode 100644 index 00000000..4b82e1a0 --- /dev/null +++ b/tests/unit/plugins/module_utils/openssh/test_utils.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- + +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + +import pytest + +from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( + parse_openssh_version, + OpensshParser, + _OpensshWriter +) + +SSH_VERSION_STRING = "OpenSSH_7.9p1, OpenSSL 1.1.0i-fips 14 Aug 2018" +SSH_VERSION_NUMBER = "7.9" + +VALID_BOOLEAN = [ + True, + False +] +INVALID_BOOLEAN = [ + 0x02 +] +VALID_UINT32 = [ + 0x00, + 0x01, + 0x01234567, + 0xFFFFFFFF, +] +INVALID_UINT32 = [ + 0xFFFFFFFFF, + -1, +] +VALID_UINT64 = [ + 0x00, + 0x01, + 0x0123456789ABCDEF, + 0xFFFFFFFFFFFFFFFF, +] +INVALID_UINT64 = [ + 0xFFFFFFFFFFFFFFFFF, + -1, +] +VALID_STRING = [ + b'test string', +] +INVALID_STRING = [ + [], +] +# See https://datatracker.ietf.org/doc/html/rfc4251#section-5 for examples source +VALID_MPINT = [ + 0x00, + 0x9a378f9b2e332a7, + 0x80, + -0x1234, + -0xdeadbeef, + # Additional large int test + 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF, +] +INVALID_MPINT = [ + [], +] + + +def test_parse_openssh_version(): + assert parse_openssh_version(SSH_VERSION_STRING) == SSH_VERSION_NUMBER + + +@pytest.mark.parametrize("boolean", VALID_BOOLEAN) +def test_valid_boolean(boolean): + assert OpensshParser(_OpensshWriter().boolean(boolean).bytes()).boolean() == boolean + + +@pytest.mark.parametrize("boolean", INVALID_BOOLEAN) +def test_invalid_boolean(boolean): + with pytest.raises(TypeError): + _OpensshWriter().boolean(boolean) + + +@pytest.mark.parametrize("uint32", VALID_UINT32) +def test_valid_uint32(uint32): + assert OpensshParser(_OpensshWriter().uint32(uint32).bytes()).uint32() == uint32 + + +@pytest.mark.parametrize("uint32", INVALID_UINT32) +def test_invalid_uint32(uint32): + with pytest.raises(ValueError): + _OpensshWriter().uint32(uint32) + + +@pytest.mark.parametrize("uint64", VALID_UINT64) +def test_valid_uint64(uint64): + assert OpensshParser(_OpensshWriter().uint64(uint64).bytes()).uint64() == uint64 + + +@pytest.mark.parametrize("uint64", INVALID_UINT64) +def test_invalid_uint64(uint64): + with pytest.raises(ValueError): + _OpensshWriter().uint64(uint64) + + +@pytest.mark.parametrize("ssh_string", VALID_STRING) +def test_valid_string(ssh_string): + assert OpensshParser(_OpensshWriter().string(ssh_string).bytes()).string() == ssh_string + + +@pytest.mark.parametrize("ssh_string", INVALID_STRING) +def test_invalid_string(ssh_string): + with pytest.raises(TypeError): + _OpensshWriter().string(ssh_string) + + +@pytest.mark.parametrize("mpint", VALID_MPINT) +def test_valid_mpint(mpint): + assert OpensshParser(_OpensshWriter().mpint(mpint).bytes()).mpint() == mpint + + +@pytest.mark.parametrize("mpint", INVALID_MPINT) +def test_invalid_mpint(mpint): + with pytest.raises(TypeError): + _OpensshWriter().mpint(mpint) + + +def test_valid_seek(): + buffer = bytearray(b'buffer') + parser = OpensshParser(buffer) + parser.seek(len(buffer)) + assert parser.remaining_bytes() == 0 + parser.seek(-len(buffer)) + assert parser.remaining_bytes() == len(buffer) + + +def test_invalid_seek(): + result = False + buffer = b'buffer' + parser = OpensshParser(buffer) + + with pytest.raises(ValueError): + parser.seek(len(buffer) + 1) + + with pytest.raises(ValueError): + parser.seek(-1) + + +def test_writer_bytes(): + buffer = bytearray(b'buffer') + assert _OpensshWriter(buffer).bytes() == buffer