diff --git a/plugins/module_utils/openssh/certificate.py b/plugins/module_utils/openssh/certificate.py index 8efb2ad9..e3908ff7 100644 --- a/plugins/module_utils/openssh/certificate.py +++ b/plugins/module_utils/openssh/certificate.py @@ -5,6 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function + __metaclass__ = type # Protocol References @@ -14,6 +15,7 @@ __metaclass__ = type # https://datatracker.ietf.org/doc/html/rfc5656 # https://datatracker.ietf.org/doc/html/rfc8032 # https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD +# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.u2f?annotate=HEAD # # Inspired by: # ------------ @@ -52,8 +54,13 @@ _SSH_TYPE_STRINGS = { 'ecdsa-nistp384': b"ecdsa-sha2-nistp384", 'ecdsa-nistp521': b"ecdsa-sha2-nistp521", 'ed25519': b"ssh-ed25519", + # FIDO2 hardware keys + 'ecdsa-sk': b"sk-ecdsa-sha2-nistp256", + 'ed25519-sk': b"sk-ssh-ed25519", } _CERT_SUFFIX_V01 = b"-cert-v01@openssh.com" +_SK_SUFFIX = b"@openssh.com" +_SK_PREFIX = b"sk-" # See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1 _ECDSA_CURVE_IDENTIFIERS = { @@ -69,7 +76,6 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = { _USE_TIMEZONE = sys.version_info >= (3, 6) - _ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE) _FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max @@ -145,7 +151,8 @@ class OpensshCertificateTimeParameters(object): elif dt == _FOREVER: result = 'forever' else: - result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S") + result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime( + "%Y%m%d%H%M%S") elif date_format == 'timestamp': td = dt - _ALWAYS result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6) @@ -196,7 +203,8 @@ class OpensshCertificateTimeParameters(object): else: for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: - result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE) + result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), + with_timezone=_USE_TIMEZONE) except ValueError: pass if result is None: @@ -279,6 +287,7 @@ class OpensshCertificateOption(object): @six.add_metaclass(abc.ABCMeta) class OpensshCertificateInfo: """Encapsulates all certificate information which is signed by a CA key""" + def __init__(self, nonce=None, serial=None, @@ -304,6 +313,7 @@ class OpensshCertificateInfo: self.signing_key = signing_key self.type_string = None + self.public_key_type_string = None @property def cert_type(self): @@ -326,8 +336,17 @@ class OpensshCertificateInfo: def signing_key_fingerprint(self): return fingerprint(self.signing_key) - @abc.abstractmethod def public_key_fingerprint(self): + if self.public_key_type_string is None: + return b'' + writer = _OpensshWriter() + + writer.string(self.public_key_type_string) + self.write_public_key_params(writer) + return fingerprint(writer.bytes()) + + @abc.abstractmethod + def write_public_key_params(self, writer): pass @abc.abstractmethod @@ -339,21 +358,18 @@ class OpensshRSACertificateInfo(OpensshCertificateInfo): def __init__(self, e=None, n=None, **kwargs): super(OpensshRSACertificateInfo, self).__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01 + self.public_key_type_string = _SSH_TYPE_STRINGS['rsa'] self.e = e self.n = n # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 - def public_key_fingerprint(self): + def write_public_key_params(self, writer): if any([self.e is None, self.n is None]): - return b'' + return - writer = _OpensshWriter() - writer.string(_SSH_TYPE_STRINGS['rsa']) writer.mpint(self.e) writer.mpint(self.n) - return fingerprint(writer.bytes()) - def parse_public_numbers(self, parser): self.e = parser.mpint() self.n = parser.mpint() @@ -363,25 +379,22 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo): def __init__(self, p=None, q=None, g=None, y=None, **kwargs): super(OpensshDSACertificateInfo, self).__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01 + self.public_key_type_string = _SSH_TYPE_STRINGS['dsa'] self.p = p self.q = q self.g = g self.y = y # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 - def public_key_fingerprint(self): + def write_public_key_params(self, writer): if any([self.p is None, self.q is None, self.g is None, self.y is None]): - return b'' + return - writer = _OpensshWriter() - writer.string(_SSH_TYPE_STRINGS['dsa']) writer.mpint(self.p) writer.mpint(self.q) writer.mpint(self.g) writer.mpint(self.y) - return fingerprint(writer.bytes()) - def parse_public_numbers(self, parser): self.p = parser.mpint() self.q = parser.mpint() @@ -406,6 +419,7 @@ class OpensshECDSACertificateInfo(OpensshCertificateInfo): def curve(self, curve): if curve in _ECDSA_CURVE_IDENTIFIERS.values(): self._curve = curve + self.public_key_type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01 else: raise ValueError( @@ -413,17 +427,13 @@ class OpensshECDSACertificateInfo(OpensshCertificateInfo): ) # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 - def public_key_fingerprint(self): + def write_public_key_params(self, writer): if any([self.curve is None, self.public_key is None]): - return b'' + return - writer = _OpensshWriter() - writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]]) writer.string(self.curve) writer.string(self.public_key) - return fingerprint(writer.bytes()) - def parse_public_numbers(self, parser): self.curve = parser.string() self.public_key = parser.string() @@ -433,25 +443,74 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo): def __init__(self, pk=None, **kwargs): super(OpensshED25519CertificateInfo, self).__init__(**kwargs) self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01 + self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519'] self.pk = pk - def public_key_fingerprint(self): + def write_public_key_params(self, writer): if self.pk is None: - return b'' - - writer = _OpensshWriter() - writer.string(_SSH_TYPE_STRINGS['ed25519']) + return writer.string(self.pk) - return fingerprint(writer.bytes()) - def parse_public_numbers(self, parser): self.pk = parser.string() +class OpensshSKECDSACertificateInfo(OpensshECDSACertificateInfo): + def __init__(self, application=b"ssh:", curve=None, **kwargs): + if curve is None: + curve = b'nistp256' + super(OpensshSKECDSACertificateInfo, self).__init__(curve=curve, **kwargs) + + self.type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _CERT_SUFFIX_V01 + self.public_key_type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _SK_SUFFIX + self.application = application + + def write_public_key_params(self, writer): + if self.application is None: + return + + super(OpensshSKECDSACertificateInfo, self).write_public_key_params(writer) + + writer.string(self.application) + + def parse_public_numbers(self, parser): + super(OpensshSKECDSACertificateInfo, self).parse_public_numbers(parser) + + self.application = parser.string() + + @OpensshECDSACertificateInfo.curve.setter + def curve(self, curve): + if curve != b'nistp256': + raise ValueError( + "ecdsa-sk only supports curve nistp256" + ) + self._curve = curve + +class OpensshSKED25519CertificateInfo(OpensshED25519CertificateInfo): + def __init__(self, application=b"ssh:", **kwargs): + super(OpensshSKED25519CertificateInfo, self).__init__(**kwargs) + self.type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _CERT_SUFFIX_V01 + self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _SK_SUFFIX + self.application = application + + def write_public_key_params(self, writer): + if self.application is None: + return + + super(OpensshSKED25519CertificateInfo, self).write_public_key_params(writer) + + writer.string(self.application) + + def parse_public_numbers(self, parser): + super(OpensshSKED25519CertificateInfo, self).parse_public_numbers(parser) + + self.application = parser.string() + + # See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD class OpensshCertificate(object): """Encapsulates a formatted OpenSSH certificate including signature and signing key""" + def __init__(self, cert_info, signature): self._cert_info = cert_info @@ -635,10 +694,14 @@ def get_cert_info_object(key_type): cert_info = OpensshRSACertificateInfo() elif key_type == 'dsa': cert_info = OpensshDSACertificateInfo() - elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'): + elif key_type in _ECDSA_CURVE_IDENTIFIERS: cert_info = OpensshECDSACertificateInfo() elif key_type == 'ed25519': cert_info = OpensshED25519CertificateInfo() + elif key_type == 'ecdsa-sk': + cert_info = OpensshSKECDSACertificateInfo() + elif key_type == 'ed25519-sk': + cert_info = OpensshSKED25519CertificateInfo() else: raise ValueError("%s is not a valid key type" % key_type) diff --git a/plugins/module_utils/openssh/utils.py b/plugins/module_utils/openssh/utils.py index 0c3af8f2..683998aa 100644 --- a/plugins/module_utils/openssh/utils.py +++ b/plugins/module_utils/openssh/utils.py @@ -87,6 +87,7 @@ def secure_write(path, mode, content): class OpensshParser(object): """Parser for OpenSSH encoded objects""" BOOLEAN_OFFSET = 1 + UINT8_OFFSET = 1 UINT32_OFFSET = 4 UINT64_OFFSET = 8 @@ -104,6 +105,13 @@ class OpensshParser(object): self._pos = next_pos return value + def uint8(self): + next_pos = self._check_position(self.UINT8_OFFSET) + + value = _UBYTE.unpack(self._data[self._pos:next_pos])[0] + self._pos = next_pos + return value + def uint32(self): next_pos = self._check_position(self.UINT32_OFFSET) @@ -189,7 +197,6 @@ class OpensshParser(object): signature_type = parser.string() signature_blob = parser.string() - blob_parser = cls(signature_blob) if signature_type in (b'ssh-rsa', b'rsa-sha2-256', b'rsa-sha2-512'): # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # https://datatracker.ietf.org/doc/html/rfc8332#section-3 @@ -198,17 +205,22 @@ class OpensshParser(object): # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 signature_data['r'] = cls._big_int(signature_blob[:20], "big") signature_data['s'] = cls._big_int(signature_blob[20:], "big") - elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'): + elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521', b'sk-ecdsa-sha2-nistp256@openssh.com'): # https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2 + blob_parser = cls(signature_blob) signature_data['r'] = blob_parser.mpint() signature_data['s'] = blob_parser.mpint() - elif signature_type == b'ssh-ed25519': + elif signature_type in (b'ssh-ed25519', b'sk-ssh-ed25519@openssh.com'): # https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2 signature_data['R'] = cls._big_int(signature_blob[:32], "little") signature_data['S'] = cls._big_int(signature_blob[32:], "little") else: raise ValueError("%s is not a valid signature type" % signature_type) + if signature_type.startswith(b'sk-'): + signature_data['signature_flags'] = parser.uint8() + signature_data['signature_counter'] = parser.uint32() + signature_data['signature_type'] = signature_type return signature_data