New module utils openssh.certificate (#246)

* Initial commit

* Adding informational comments

* Adding changelog fragment

* Fixing CRLF changelog fragment

* Refactoring public number parsing and added chaining for writer methods

* Adding more descriptive error for invalid certificate data

* Fixing signature data parsing

* Correcting ed25519 signature type to binary

* Applying initial review suggestions and fixing option-list writer

* Applying review suggestions

* Making OpensshWriter private
pull/254/head
Ajpantuso 2021-06-22 06:54:56 -04:00 committed by GitHub
parent 2ba77e015c
commit 5d153e05ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1018 additions and 1 deletions

View File

@ -0,0 +1,2 @@
minor_changes:
- openssh certificate module utils - new module_utils for parsing OpenSSH certificates (https://github.com/ansible-collections/community.crypto/pull/246).

View File

@ -0,0 +1,337 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
# Protocol References
# -------------------
# https://datatracker.ietf.org/doc/html/rfc4251
# https://datatracker.ietf.org/doc/html/rfc4253
# https://datatracker.ietf.org/doc/html/rfc5656
# https://datatracker.ietf.org/doc/html/rfc8032
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
#
# Inspired by:
# ------------
# https://github.com/pyca/cryptography/blob/main/src/cryptography/hazmat/primitives/serialization/ssh.py
# https://github.com/paramiko/paramiko/blob/master/paramiko/message.py
import abc
import binascii
import os
from base64 import b64encode
from hashlib import sha256
from ansible.module_utils import six
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
OpensshParser,
_OpensshWriter,
)
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
_USER_TYPE = 1
_HOST_TYPE = 2
_SSH_TYPE_STRINGS = {
'rsa': b"ssh-rsa",
'dsa': b"ssh-dss",
'ecdsa-nistp256': b"ecdsa-sha2-nistp256",
'ecdsa-nistp384': b"ecdsa-sha2-nistp384",
'ecdsa-nistp521': b"ecdsa-sha2-nistp521",
'ed25519': b"ssh-ed25519",
}
_CERT_SUFFIX_V01 = b"-cert-v01@openssh.com"
# See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1
_ECDSA_CURVE_IDENTIFIERS = {
'ecdsa-nistp256': b'nistp256',
'ecdsa-nistp384': b'nistp384',
'ecdsa-nistp521': b'nistp521',
}
_ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
b'nistp256': 'ecdsa-nistp256',
b'nistp384': 'ecdsa-nistp384',
b'nistp521': 'ecdsa-nistp521',
}
@six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key"""
def __init__(self,
nonce=None,
serial=None,
cert_type=None,
key_id=None,
principals=None,
valid_after=None,
valid_before=None,
critical_options=None,
extensions=None,
reserved=None):
self.nonce = nonce
self.serial = serial
self._cert_type = cert_type
self.key_id = key_id
self.principals = principals
self.valid_after = valid_after
self.valid_before = valid_before
self.critical_options = critical_options
self.extensions = extensions
self.reserved = reserved
self.type_string = None
@property
def cert_type(self):
if self._cert_type == _USER_TYPE:
return 'user'
elif self._cert_type == _HOST_TYPE:
return 'host'
else:
return ''
@cert_type.setter
def cert_type(self, cert_type):
if cert_type == 'user' or cert_type == _USER_TYPE:
self._cert_type = _USER_TYPE
elif cert_type == 'host' or cert_type == _HOST_TYPE:
self._cert_type = _HOST_TYPE
else:
raise ValueError("%s is not a valid certificate type" % cert_type)
@abc.abstractmethod
def public_key_fingerprint(self):
pass
@abc.abstractmethod
def parse_public_numbers(self, parser):
pass
class OpensshRSACertificateInfo(OpensshCertificateInfo):
def __init__(self, e=None, n=None, **kwargs):
super(OpensshRSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01
self.e = e
self.n = n
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
if any([self.e is None, self.n is None]):
return b''
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['rsa'])
writer.mpint(self.e)
writer.mpint(self.n)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser):
self.e = parser.mpint()
self.n = parser.mpint()
class OpensshDSACertificateInfo(OpensshCertificateInfo):
def __init__(self, p=None, q=None, g=None, y=None, **kwargs):
super(OpensshDSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01
self.p = p
self.q = q
self.g = g
self.y = y
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
if any([self.p is None, self.q is None, self.g is None, self.y is None]):
return b''
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['dsa'])
writer.mpint(self.p)
writer.mpint(self.q)
writer.mpint(self.g)
writer.mpint(self.y)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser):
self.p = parser.mpint()
self.q = parser.mpint()
self.g = parser.mpint()
self.y = parser.mpint()
class OpensshECDSACertificateInfo(OpensshCertificateInfo):
def __init__(self, curve=None, public_key=None, **kwargs):
super(OpensshECDSACertificateInfo, self).__init__(**kwargs)
self._curve = None
if curve is not None:
self.curve = curve
self.public_key = public_key
@property
def curve(self):
return self._curve
@curve.setter
def curve(self, curve):
if curve in _ECDSA_CURVE_IDENTIFIERS.values():
self._curve = curve
self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01
else:
raise ValueError(
"Curve must be one of %s" % (b','.join(list(_ECDSA_CURVE_IDENTIFIERS.values()))).decode('UTF-8')
)
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self):
if any([self.curve is None, self.public_key is None]):
return b''
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]])
writer.string(self.curve)
writer.string(self.public_key)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser):
self.curve = parser.string()
self.public_key = parser.string()
class OpensshED25519CertificateInfo(OpensshCertificateInfo):
def __init__(self, pk=None, **kwargs):
super(OpensshED25519CertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01
self.pk = pk
def public_key_fingerprint(self):
if self.pk is None:
return b''
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['ed25519'])
writer.string(self.pk)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser):
self.pk = parser.string()
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
class OpensshCertificate(object):
"""Encapsulates a formatted OpenSSH certificate including signature and signing key"""
def __init__(self, cert_info, signing_key, signature):
self.cert_info = cert_info
self.signing_key = signing_key
self.signature = signature
@classmethod
def load(cls, path):
if not os.path.exists(path):
raise ValueError("%s is not a valid path." % path)
try:
with open(path, 'rb') as cert_file:
data = cert_file.read()
except (IOError, OSError) as e:
raise ValueError("%s cannot be opened for reading: %s" % (path, e))
try:
format_identifier, b64_cert = data.split(b' ')[:2]
cert = binascii.a2b_base64(b64_cert)
except (binascii.Error, ValueError):
raise ValueError("Certificate not in OpenSSH format")
for key_type, string in _SSH_TYPE_STRINGS.items():
if format_identifier == string + _CERT_SUFFIX_V01:
pub_key_type = key_type
break
else:
raise ValueError("Invalid certificate format identifier: %s" % format_identifier)
parser = OpensshParser(cert)
if format_identifier != parser.string():
raise ValueError("Certificate formats do not match")
try:
cert_info = cls._parse_cert_info(pub_key_type, parser)
signing_key = parser.string()
signature = parser.string()
except (TypeError, ValueError) as e:
raise ValueError("Invalid certificate data: %s" % e)
if parser.remaining_bytes():
raise ValueError(
"%s bytes of additional data was not parsed while loading %s" % (parser.remaining_bytes(), path)
)
return cls(
cert_info=cert_info,
signing_key=signing_key,
signature=signature,
)
def signing_key_fingerprint(self):
return fingerprint(self.signing_key)
@staticmethod
def _parse_cert_info(pub_key_type, parser):
cert_info = get_cert_info_object(pub_key_type)
cert_info.nonce = parser.string()
cert_info.parse_public_numbers(parser)
cert_info.serial = parser.uint64()
cert_info.cert_type = parser.uint32()
cert_info.key_id = parser.string()
cert_info.principals = parser.string_list()
cert_info.valid_after = parser.uint64()
cert_info.valid_before = parser.uint64()
cert_info.critical_options = parser.option_list()
cert_info.extensions = parser.option_list()
cert_info.reserved = parser.string()
return cert_info
def fingerprint(public_key):
"""Generates a SHA256 hash and formats output to resemble ``ssh-keygen``"""
h = sha256()
h.update(public_key)
return b'SHA256:' + b64encode(h.digest()).rstrip(b'=')
def get_cert_info_object(key_type):
if key_type == 'rsa':
cert_info = OpensshRSACertificateInfo()
elif key_type == 'dsa':
cert_info = OpensshDSACertificateInfo()
elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'):
cert_info = OpensshECDSACertificateInfo()
elif key_type == 'ed25519':
cert_info = OpensshED25519CertificateInfo()
else:
raise ValueError("%s is not a valid key type" % key_type)
return cert_info

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
#
# (c) 2020, Doug Stanley <doug+ansible@technologixllc.com>
# Copyright: (c) 2020, Doug Stanley <doug+ansible@technologixllc.com>
# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -19,6 +20,38 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import re
from struct import Struct
from ansible.module_utils.six import PY3
# Protocol References
# -------------------
# https://datatracker.ietf.org/doc/html/rfc4251
# https://datatracker.ietf.org/doc/html/rfc4253
# https://datatracker.ietf.org/doc/html/rfc5656
# https://datatracker.ietf.org/doc/html/rfc8032
#
# Inspired by:
# ------------
# https://github.com/pyca/cryptography/blob/main/src/cryptography/hazmat/primitives/serialization/ssh.py
# https://github.com/paramiko/paramiko/blob/master/paramiko/message.py
if PY3:
long = int
# 0 (False) or 1 (True) encoded as a single byte
_BOOLEAN = Struct(b'?')
# Unsigned 8-bit integer in network-byte-order
_UBYTE = Struct(b'!B')
_UBYTE_MAX = 0xFF
# Unsigned 32-bit integer in network-byte-order
_UINT32 = Struct(b'!I')
# Unsigned 32-bit little endian integer
_UINT32_LE = Struct(b'<I')
_UINT32_MAX = 0xFFFFFFFF
# Unsigned 64-bit integer in network-byte-order
_UINT64 = Struct(b'!Q')
_UINT64_MAX = 0xFFFFFFFFFFFFFFFF
def parse_openssh_version(version_string):
@ -33,3 +66,311 @@ def parse_openssh_version(version_string):
version = None
return version
# See https://datatracker.ietf.org/doc/html/rfc4251#section-5 for SSH data types
class OpensshParser(object):
"""Parser for OpenSSH encoded objects"""
BOOLEAN_OFFSET = 1
UINT32_OFFSET = 4
UINT64_OFFSET = 8
def __init__(self, data):
if not isinstance(data, (bytes, bytearray)):
raise TypeError("Data must be bytes-like not %s" % type(data))
self._data = memoryview(data) if PY3 else data
self._pos = 0
def boolean(self):
next_pos = self._check_position(self.BOOLEAN_OFFSET)
value = _BOOLEAN.unpack(self._data[self._pos:next_pos])[0]
self._pos = next_pos
return value
def uint32(self):
next_pos = self._check_position(self.UINT32_OFFSET)
value = _UINT32.unpack(self._data[self._pos:next_pos])[0]
self._pos = next_pos
return value
def uint64(self):
next_pos = self._check_position(self.UINT64_OFFSET)
value = _UINT64.unpack(self._data[self._pos:next_pos])[0]
self._pos = next_pos
return value
def string(self):
length = self.uint32()
next_pos = self._check_position(length)
value = self._data[self._pos:next_pos]
self._pos = next_pos
# Cast to bytes is required as a memoryview slice is itself a memoryview
return value if not PY3 else bytes(value)
def mpint(self):
return self._big_int(self.string(), "big", signed=True)
def name_list(self):
raw_string = self.string()
return raw_string.decode('ASCII').split(',')
# Convenience function, but not an official data type from SSH
def string_list(self):
result = []
raw_string = self.string()
if raw_string:
parser = OpensshParser(raw_string)
while parser.remaining_bytes():
result.append(parser.string())
return result
# Convenience function, but not an official data type from SSH
def option_list(self):
result = []
raw_string = self.string()
if raw_string:
parser = OpensshParser(raw_string)
while parser.remaining_bytes():
name = parser.string()
data = parser.string()
if data:
# data is doubly-encoded
data = OpensshParser(data).string()
result.append((name, data))
return result
def seek(self, offset):
self._pos = self._check_position(offset)
return self._pos
def remaining_bytes(self):
return len(self._data) - self._pos
def _check_position(self, offset):
if self._pos + offset > len(self._data):
raise ValueError("Insufficient data remaining at position: %s" % self._pos)
elif self._pos + offset < 0:
raise ValueError("Position cannot be less than zero.")
else:
return self._pos + offset
@classmethod
def signature_data(cls, signature_string):
signature_data = {}
parser = cls(signature_string)
signature_type = parser.string()
signature_blob = parser.string()
blob_parser = cls(signature_blob)
if signature_type == b'ssh-rsa':
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
signature_data['s'] = cls._big_int(signature_blob, "big")
elif signature_type == b'ssh-dss':
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
signature_data['r'] = cls._big_int(signature_blob[:20], "big")
signature_data['s'] = cls._big_int(signature_blob[20:], "big")
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'):
# https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2
signature_data['r'] = blob_parser.mpint()
signature_data['s'] = blob_parser.mpint()
elif signature_type == b'ssh-ed25519':
# https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2
signature_data['R'] = cls._big_int(signature_blob[:32], "little")
signature_data['S'] = cls._big_int(signature_blob[32:], "little")
else:
raise ValueError("%s is not a valid signature type" % signature_type)
signature_data['signature_type'] = signature_type
return signature_data
@classmethod
def _big_int(cls, raw_string, byte_order, signed=False):
if byte_order not in ("big", "little"):
raise ValueError("Byte_order must be one of (big, little) not %s" % byte_order)
if PY3:
return int.from_bytes(raw_string, byte_order, signed=signed)
result = 0
byte_length = len(raw_string)
if byte_length > 0:
# Check sign-bit
msb = raw_string[0] if byte_order == "big" else raw_string[-1]
negative = bool(ord(msb) & 0x80)
# Match pad value for two's complement
pad = b'\xFF' if signed and negative else b'\x00'
# The definition of ``mpint`` enforces that unnecessary bytes are not encoded so they are added back
pad_length = (4 - byte_length % 4)
if pad_length < 4:
raw_string = pad * pad_length + raw_string if byte_order == "big" else raw_string + pad * pad_length
byte_length += pad_length
# Accumulate arbitrary precision integer bytes in the appropriate order
if byte_order == "big":
for i in range(0, byte_length, cls.UINT32_OFFSET):
left_shift = result << cls.UINT32_OFFSET * 8
result = left_shift + _UINT32.unpack(raw_string[i:i + cls.UINT32_OFFSET])[0]
else:
for i in range(byte_length, 0, -cls.UINT32_OFFSET):
left_shift = result << cls.UINT32_OFFSET * 8
result = left_shift + _UINT32_LE.unpack(raw_string[i - cls.UINT32_OFFSET:i])[0]
# Adjust for two's complement
if signed and negative:
result -= 1 << (8 * byte_length)
return result
class _OpensshWriter(object):
"""Writes SSH encoded values to a bytes-like buffer
.. warning::
This class is a private API and must not be exported outside of the openssh module_utils.
It is not to be used to construct Openssh objects, but rather as a utility to assist
in validating parsed material.
"""
def __init__(self, buffer=None):
if buffer is not None:
if not isinstance(buffer, (bytes, bytearray)):
raise TypeError("Buffer must be a bytes-like object not %s" % type(buffer))
else:
buffer = bytearray()
self._buff = buffer
def boolean(self, value):
if not isinstance(value, bool):
raise TypeError("Value must be of type bool not %s" % type(value))
self._buff.extend(_BOOLEAN.pack(value))
return self
def uint32(self, value):
if not isinstance(value, int):
raise TypeError("Value must be of type int not %s" % type(value))
if value < 0 or value > _UINT32_MAX:
raise ValueError("Value must be a positive integer less than %s" % _UINT32_MAX)
self._buff.extend(_UINT32.pack(value))
return self
def uint64(self, value):
if not isinstance(value, (long, int)):
raise TypeError("Value must be of type (long, int) not %s" % type(value))
if value < 0 or value > _UINT64_MAX:
raise ValueError("Value must be a positive integer less than %s" % _UINT64_MAX)
self._buff.extend(_UINT64.pack(value))
return self
def string(self, value):
if not isinstance(value, (bytes, bytearray)):
raise TypeError("Value must be bytes-like not %s" % type(value))
self.uint32(len(value))
self._buff.extend(value)
return self
def mpint(self, value):
if not isinstance(value, (int, long)):
raise TypeError("Value must be of type (long, int) not %s" % type(value))
self.string(self._int_to_mpint(value))
return self
def name_list(self, value):
if not isinstance(value, list):
raise TypeError("Value must be a list of byte strings not %s" % type(value))
try:
self.string(','.join(value).encode('ASCII'))
except UnicodeEncodeError as e:
raise ValueError("Name-list's must consist of US-ASCII characters: %s" % e)
return self
def string_list(self, value):
if not isinstance(value, list):
raise TypeError("Value must be a list of byte string not %s" % type(value))
writer = _OpensshWriter()
for s in value:
writer.string(s)
self.string(writer.bytes())
return self
def option_list(self, value):
if not isinstance(value, list) or (value and not isinstance(value[0], tuple)):
raise TypeError("Value must be a list of tuples")
writer = _OpensshWriter()
for name, data in value:
writer.string(name)
# SSH option data is encoded twice though this behavior is not documented
writer.string(_OpensshWriter().string(data).bytes() if data else bytes())
self.string(writer.bytes())
return self
@staticmethod
def _int_to_mpint(num):
if PY3:
byte_length = (num.bit_length() + 7) // 8
try:
result = num.to_bytes(byte_length, "big", signed=True)
# Handles values which require \x00 or \xFF to pad sign-bit
except OverflowError:
result = num.to_bytes(byte_length + 1, "big", signed=True)
else:
result = bytes()
# 0 and -1 are treated as special cases since they are used as sentinels for all other values
if num == 0:
result += b'\x00'
elif num == -1:
result += b'\xFF'
elif num > 0:
while num >> 32:
result = _UINT32.pack(num & _UINT32_MAX) + result
num = num >> 32
# Pack last 4 bytes individually to discard insignificant bytes
while num:
result = _UBYTE.pack(num & _UBYTE_MAX) + result
num = num >> 8
# Zero pad final byte if most-significant bit is 1 as per mpint definition
if ord(result[0]) & 0x80:
result = b'\x00' + result
else:
while (num >> 32) < -1:
result = _UINT32.pack(num & _UINT32_MAX) + result
num = num >> 32
while num < -1:
result = _UBYTE.pack(num & _UBYTE_MAX) + result
num = num >> 8
if not ord(result[0]) & 0x80:
result = b'\xFF' + result
return result
def bytes(self):
return bytes(self._buff)

View File

@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import (
OpensshCertificate
)
# Type: ssh-rsa-cert-v01@openssh.com user certificate
# Public key: RSA-CERT SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030
# Signing CA: DSA SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8
# Key ID: "test"
# Serial: 0
# Valid: forever
# Principals: (none)
# Critical Options: (none)
# Extensions:
# permit-X11-forwarding
# permit-agent-forwarding
# permit-port-forwarding
# permit-pty
# permit-user-rc
RSA_CERT_SIGNED_BY_DSA = (
b'ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgY9CvhGpyvBB611Lmx6hHPD+CmeJ0oW' +
b'SSK1q6K3h5CS4AAAADAQABAAABAQDKYIJtpFaWpTNNifmuV3DM9BBdngMG28jWPy4C/SoZg4EP7mkYUsG6hN+LgjOL17YEF7bKDEWPl9sQS' +
b'92iD+AuAPrjnHVQ9VG5hbTYiQAaicj6hxqBoNqGQWxDzhZL4B35MgqmoUOBGnzYA/fKgqhRVzOXbWFxKLtzSJzB+Z+kmeoBzq+4MazL4Bko' +
b'yPZMrIMnvxiluv+kqE9SWeJ/5e7WXdtbYTnSR4WN3gW/BMKEoKQk/UGwuPvCiRq+y8LorJP4B1Wfwlm/meqtbTidXyCcQPR9xWpce3rRjLL' +
b'T6cimUjWrbx7Q1SlsypdkclgPSTu9Jg457am8tnQUgnL7VdetAAAAAAAAAAAAAAABAAAABHRlc3QAAAAAAAAAAAAAAAD//////////wAAAA' +
b'AAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0L' +
b'WZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAAGxAAAAB3NzaC1kc3MAAACBAPV/' +
b'b5FknU8e56TWAGLRQ0v3c3f5jAS0txcwqtYLHLulTqyMcLL0MyzWxXv77MpjTMwEjWXLbfNWdk/qmsjfBynzs2nSZ7clVsqt/ZOadcBFEhq' +
b'ZM0l+1ZCPkhQiqsD2aodGbkVcJgqL5Z5krzB5MTey7c8rlAAxKOjfs70Bg8MPAAAAFQCW466dSEu2Pf0u8AA5SHgH0i/xuwAAAIBc23gfmv' +
b'GC+oaUAXiak17kH6NvOSJXZBdk/8CyGK6yL+CHKrKyffe6BbiVXwC6sUIa9j4YsFeyYwPFGBtfLuNUmgyKYTJcCM2zJLBykmTIvjSdRaYGN' +
b'Rkyi8GnzVV2lWxQ+4m4UGeTPbPN/OG4B0NwDbBJGbVJv0xJPq2EBKoUdgAAAIAyrFxGDLtOZFZ2fgONVaKaapEpJ5f3qPhLDXxVQ/BKVUkU' +
b'RA4AHHyXF2AMiiOOiHLrO5xsEGUyW+OISFm+6m17cEPNixA7G1fBniLvyVv2woyYW3kaY4J9z266kAFzFWVNgwr+T7MY0hEvct8VFA97JMR' +
b'Q7c8c/tNDaL7uqV46QQAAADcAAAAHc3NoLWRzcwAAAChaQ94wqca+KhkHtbkLpjvGsfu0Gy03SAb0+o11Shk/BXnK7N/cwEVD ' +
b'ansible@ansible-host'
)
RSA_FINGERPRINT = b'SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030'
# Type: ssh-dss-cert-v01@openssh.com user certificate
# Public key: DSA-CERT SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8
# Signing CA: ECDSA SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w
# Key ID: "test"
# Serial: 0
# Valid: forever
# Principals: (none)
# Critical Options: (none)
# Extensions: (none)
DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS = (
b'ssh-dss-cert-v01@openssh.com AAAAHHNzaC1kc3MtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsKvMxIv4viCNQX7z8K4/R5jronpZGf' +
b'ydpoBoh2Cx5dgAAACBAPV/b5FknU8e56TWAGLRQ0v3c3f5jAS0txcwqtYLHLulTqyMcLL0MyzWxXv77MpjTMwEjWXLbfNWdk/qmsjfBynzs' +
b'2nSZ7clVsqt/ZOadcBFEhqZM0l+1ZCPkhQiqsD2aodGbkVcJgqL5Z5krzB5MTey7c8rlAAxKOjfs70Bg8MPAAAAFQCW466dSEu2Pf0u8AA5' +
b'SHgH0i/xuwAAAIBc23gfmvGC+oaUAXiak17kH6NvOSJXZBdk/8CyGK6yL+CHKrKyffe6BbiVXwC6sUIa9j4YsFeyYwPFGBtfLuNUmgyKYTJ' +
b'cCM2zJLBykmTIvjSdRaYGNRkyi8GnzVV2lWxQ+4m4UGeTPbPN/OG4B0NwDbBJGbVJv0xJPq2EBKoUdgAAAIAyrFxGDLtOZFZ2fgONVaKaap' +
b'EpJ5f3qPhLDXxVQ/BKVUkURA4AHHyXF2AMiiOOiHLrO5xsEGUyW+OISFm+6m17cEPNixA7G1fBniLvyVv2woyYW3kaY4J9z266kAFzFWVNg' +
b'wr+T7MY0hEvct8VFA97JMRQ7c8c/tNDaL7uqV46QQAAAAAAAAAAAAAAAQAAAAR0ZXN0AAAAAAAAAAAAAAAA//////////8AAAAAAAAAAAAA' +
b'AAAAAABoAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBOf55Wc0yzaJPtxXxBGZKmAUozbYXwxZGFS1c/FaJbwLpq/' +
b'wvanQKM01uU73swNIt+ZFra9kRSi21xjzgMPn7U0AAABkAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAABJAAAAIGmlKa/riG7+EpoW6dTJY6' +
b'0N8BrEcniKgOxdRM1EPJ2DAAAAIQDnK4stvbvS+Bn0/42Was7uOfJtnLYXs5EuB2L3uejPcQ== ansible@ansible-host'
)
DSA_FINGERPRINT = b'SHA256:YCdJ2lYU+FSkWUud7zg1SJszprXoRGNU/GVcqXUjgC8'
# Type: ecdsa-sha2-nistp256-cert-v01@openssh.com user certificate
# Public key: ECDSA-CERT SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w
# Signing CA: ED25519 SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40
# Key ID: "test"
# Serial: 0
# Valid: forever
# Principals: (none)
# Critical Options:
# force-command /usr/bin/csh
# Extensions:
# permit-X11-forwarding
# permit-agent-forwarding
# permit-port-forwarding
# permit-pty
# permit-user-rc
ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS = (
b'ecdsa-sha2-nistp256-cert-v01@openssh.com AAAAKGVjZHNhLXNoYTItbmlzdHAyNTYtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgtC' +
b'ips7/sOOOTAgiawGlQhM6pb26t0FfQ1jG60m+tOg0AAAAIbmlzdHAyNTYAAABBBOf55Wc0yzaJPtxXxBGZKmAUozbYXwxZGFS1c/FaJbwLp' +
b'q/wvanQKM01uU73swNIt+ZFra9kRSi21xjzgMPn7U0AAAAAAAAAAAAAAAEAAAAEdGVzdAAAAAAAAAAAAAAAAP//////////AAAAJQAAAA1m' +
b'b3JjZS1jb21tYW5kAAAAEAAAAAwvdXNyL2Jpbi9jc2gAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW5' +
b'0LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLX' +
b'JjAAAAAAAAAAAAAAAzAAAAC3NzaC1lZDI1NTE5AAAAII3qYBforim0x87UXpaTDNFnhFTyb+TPCJVQpEAOHTL6AAAAUwAAAAtzc2gtZWQyN' +
b'TUxOQAAAEAdp3eOLRN5t2wW29TBWbz604uuXg88jH4RA4HDhbRupa/x2rN3j6iZQ4VXPLA4JtdfIslHFkH6HUlxU8XsoJwP ' +
b'ansible@ansible-host'
)
ECDSA_FINGERPRINT = b'SHA256:w9lp4zGRJShhm4DzO3ulVm0BEcR0PMjrM6VanQo4C0w'
# Type: ssh-ed25519-cert-v01@openssh.com user certificate
# Public key: ED25519-CERT SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40
# Signing CA: RSA SHA256:SvUwwUer4AwsdePYseJR3LcZS8lnKi6BqiL51Dop030
# Key ID: "test"
# Serial: 0
# Valid: forever
# Principals: (none)
# Critical Options:
# test UNKNOWN OPTION (len 13)
# Extensions:
# test UNKNOWN OPTION (len 0)
ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS = (
b'ssh-ed25519-cert-v01@openssh.com AAAAIHNzaC1lZDI1NTE5LWNlcnQtdjAxQG9wZW5zc2guY29tAAAAIP034YpKn6BDcwxqFnVrKt' +
b'kNX7k6X7hxZ7lADp5LAxHrAAAAII3qYBforim0x87UXpaTDNFnhFTyb+TPCJVQpEAOHTL6AAAAAAAAAAAAAAABAAAABHRlc3QAAAAAAAAAA' +
b'AAAAAD//////////wAAABkAAAAEdGVzdAAAAA0AAAAJdW5kZWZpbmVkAAAADAAAAAR0ZXN0AAAAAAAAAAAAAAEXAAAAB3NzaC1yc2EAAAAD' +
b'AQABAAABAQDKYIJtpFaWpTNNifmuV3DM9BBdngMG28jWPy4C/SoZg4EP7mkYUsG6hN+LgjOL17YEF7bKDEWPl9sQS92iD+AuAPrjnHVQ9VG' +
b'5hbTYiQAaicj6hxqBoNqGQWxDzhZL4B35MgqmoUOBGnzYA/fKgqhRVzOXbWFxKLtzSJzB+Z+kmeoBzq+4MazL4BkoyPZMrIMnvxiluv+kqE' +
b'9SWeJ/5e7WXdtbYTnSR4WN3gW/BMKEoKQk/UGwuPvCiRq+y8LorJP4B1Wfwlm/meqtbTidXyCcQPR9xWpce3rRjLLT6cimUjWrbx7Q1Slsy' +
b'pdkclgPSTu9Jg457am8tnQUgnL7VdetAAABDwAAAAdzc2gtcnNhAAABAMZLNacwOMNexYUaFK1nU0JPQTv4fM73QDG3xURtDsIbI6DAcA1y' +
b'KkvgjJcxlZHx0APJ+i1lWNAvPeOmuPTioymjIEuwxi0VGuAoVKgjmIy6aXH2z3YMxy9cGOq6LNfI4c58iBHR5ejVHAzvIg3rowypVsCGugL' +
b'7WJpz3eypBJt4TglwRTJpp54IMN2CyDQm0N97x9ris8jQQHlCF2EgZp1u4aOiZJTSJ5d4hapO0uZwXOI9AIWy/lmx0/6jX07MWrs4iXpfiF' +
b'5T4s6kEn7YW4SaJ0Z7xGp3V0vDOxh+jwHZGD5GM449Il6QxQwDY5BSJq+iMR467yaIjw2g8Kt4ZiU= ansible@ansible-host'
)
ED25519_FINGERPRINT = b'SHA256:NP4JdfkCopbjwMepq0aPrpMz13cNmEd+uDOxC/j9N40'
# garbage
INVALID_DATA = b'yDspTN+BJzvIK2Q+CRD3qBDVSi+YqSxwyz432VEaHKlXbuLURirY0QpuBCqgR6tCtWW5vEGkXKZ3'
VALID_OPTS = [(b'force-command', b'/usr/bin/csh')]
INVALID_OPTS = [(b'test', b'undefined')]
VALID_EXTENSIONS = [
(b'permit-X11-forwarding', b''),
(b'permit-agent-forwarding', b''),
(b'permit-port-forwarding', b''),
(b'permit-pty', b''),
(b'permit-user-rc', b''),
]
INVALID_EXTENSIONS = [(b'test', b'')]
def test_rsa_certificate(tmpdir):
cert_file = tmpdir / 'id_rsa-cert.pub'
cert_file.write(RSA_CERT_SIGNED_BY_DSA, mode='wb')
cert = OpensshCertificate.load(str(cert_file))
assert cert.cert_info.key_id == b'test'
assert cert.cert_info.serial == 0
assert cert.cert_info.type_string == b'ssh-rsa-cert-v01@openssh.com'
assert cert.cert_info.public_key_fingerprint() == RSA_FINGERPRINT
assert cert.signing_key_fingerprint() == DSA_FINGERPRINT
def test_dsa_certificate(tmpdir):
cert_file = tmpdir / 'id_dsa-cert.pub'
cert_file.write(DSA_CERT_SIGNED_BY_ECDSA_NO_OPTS)
cert = OpensshCertificate.load(str(cert_file))
assert cert.cert_info.type_string == b'ssh-dss-cert-v01@openssh.com'
assert cert.cert_info.public_key_fingerprint() == DSA_FINGERPRINT
assert cert.signing_key_fingerprint() == ECDSA_FINGERPRINT
assert cert.cert_info.critical_options == []
assert cert.cert_info.extensions == []
def test_ecdsa_certificate(tmpdir):
cert_file = tmpdir / 'id_ecdsa-cert.pub'
cert_file.write(ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS)
cert = OpensshCertificate.load(str(cert_file))
assert cert.cert_info.type_string == b'ecdsa-sha2-nistp256-cert-v01@openssh.com'
assert cert.cert_info.public_key_fingerprint() == ECDSA_FINGERPRINT
assert cert.signing_key_fingerprint() == ED25519_FINGERPRINT
assert cert.cert_info.critical_options == VALID_OPTS
assert cert.cert_info.extensions == VALID_EXTENSIONS
def test_ed25519_certificate(tmpdir):
cert_file = tmpdir / 'id_ed25519-cert.pub'
cert_file.write(ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS)
cert = OpensshCertificate.load(str(cert_file))
assert cert.cert_info.type_string == b'ssh-ed25519-cert-v01@openssh.com'
assert cert.cert_info.public_key_fingerprint() == ED25519_FINGERPRINT
assert cert.signing_key_fingerprint() == RSA_FINGERPRINT
assert cert.cert_info.critical_options == INVALID_OPTS
assert cert.cert_info.extensions == INVALID_EXTENSIONS
def test_invalid_data(tmpdir):
result = False
cert_file = tmpdir / 'invalid-cert.pub'
cert_file.write(INVALID_DATA)
try:
OpensshCertificate.load(str(cert_file))
except ValueError:
result = True
assert result

View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
parse_openssh_version,
OpensshParser,
_OpensshWriter
)
SSH_VERSION_STRING = "OpenSSH_7.9p1, OpenSSL 1.1.0i-fips 14 Aug 2018"
SSH_VERSION_NUMBER = "7.9"
VALID_BOOLEAN = [
True,
False
]
INVALID_BOOLEAN = [
0x02
]
VALID_UINT32 = [
0x00,
0x01,
0x01234567,
0xFFFFFFFF,
]
INVALID_UINT32 = [
0xFFFFFFFFF,
-1,
]
VALID_UINT64 = [
0x00,
0x01,
0x0123456789ABCDEF,
0xFFFFFFFFFFFFFFFF,
]
INVALID_UINT64 = [
0xFFFFFFFFFFFFFFFFF,
-1,
]
VALID_STRING = [
b'test string',
]
INVALID_STRING = [
[],
]
# See https://datatracker.ietf.org/doc/html/rfc4251#section-5 for examples source
VALID_MPINT = [
0x00,
0x9a378f9b2e332a7,
0x80,
-0x1234,
-0xdeadbeef,
# Additional large int test
0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,
]
INVALID_MPINT = [
[],
]
def test_parse_openssh_version():
assert parse_openssh_version(SSH_VERSION_STRING) == SSH_VERSION_NUMBER
@pytest.mark.parametrize("boolean", VALID_BOOLEAN)
def test_valid_boolean(boolean):
assert OpensshParser(_OpensshWriter().boolean(boolean).bytes()).boolean() == boolean
@pytest.mark.parametrize("boolean", INVALID_BOOLEAN)
def test_invalid_boolean(boolean):
with pytest.raises(TypeError):
_OpensshWriter().boolean(boolean)
@pytest.mark.parametrize("uint32", VALID_UINT32)
def test_valid_uint32(uint32):
assert OpensshParser(_OpensshWriter().uint32(uint32).bytes()).uint32() == uint32
@pytest.mark.parametrize("uint32", INVALID_UINT32)
def test_invalid_uint32(uint32):
with pytest.raises(ValueError):
_OpensshWriter().uint32(uint32)
@pytest.mark.parametrize("uint64", VALID_UINT64)
def test_valid_uint64(uint64):
assert OpensshParser(_OpensshWriter().uint64(uint64).bytes()).uint64() == uint64
@pytest.mark.parametrize("uint64", INVALID_UINT64)
def test_invalid_uint64(uint64):
with pytest.raises(ValueError):
_OpensshWriter().uint64(uint64)
@pytest.mark.parametrize("ssh_string", VALID_STRING)
def test_valid_string(ssh_string):
assert OpensshParser(_OpensshWriter().string(ssh_string).bytes()).string() == ssh_string
@pytest.mark.parametrize("ssh_string", INVALID_STRING)
def test_invalid_string(ssh_string):
with pytest.raises(TypeError):
_OpensshWriter().string(ssh_string)
@pytest.mark.parametrize("mpint", VALID_MPINT)
def test_valid_mpint(mpint):
assert OpensshParser(_OpensshWriter().mpint(mpint).bytes()).mpint() == mpint
@pytest.mark.parametrize("mpint", INVALID_MPINT)
def test_invalid_mpint(mpint):
with pytest.raises(TypeError):
_OpensshWriter().mpint(mpint)
def test_valid_seek():
buffer = bytearray(b'buffer')
parser = OpensshParser(buffer)
parser.seek(len(buffer))
assert parser.remaining_bytes() == 0
parser.seek(-len(buffer))
assert parser.remaining_bytes() == len(buffer)
def test_invalid_seek():
result = False
buffer = b'buffer'
parser = OpensshParser(buffer)
with pytest.raises(ValueError):
parser.seek(len(buffer) + 1)
with pytest.raises(ValueError):
parser.seek(-1)
def test_writer_bytes():
buffer = bytearray(b'buffer')
assert _OpensshWriter(buffer).bytes() == buffer