Add support for SSH certificates using ecdsa-sk or ed25519-sk public keys

Fixes #796
pull/813/head
Joakim Nohlgård 2024-10-25 14:52:39 +02:00
parent 21e344e283
commit b28f247cd4
2 changed files with 108 additions and 33 deletions

View File

@ -5,6 +5,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
# Protocol References # Protocol References
@ -14,6 +15,7 @@ __metaclass__ = type
# https://datatracker.ietf.org/doc/html/rfc5656 # https://datatracker.ietf.org/doc/html/rfc5656
# https://datatracker.ietf.org/doc/html/rfc8032 # https://datatracker.ietf.org/doc/html/rfc8032
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD # https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
# https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.u2f?annotate=HEAD
# #
# Inspired by: # Inspired by:
# ------------ # ------------
@ -52,8 +54,13 @@ _SSH_TYPE_STRINGS = {
'ecdsa-nistp384': b"ecdsa-sha2-nistp384", 'ecdsa-nistp384': b"ecdsa-sha2-nistp384",
'ecdsa-nistp521': b"ecdsa-sha2-nistp521", 'ecdsa-nistp521': b"ecdsa-sha2-nistp521",
'ed25519': b"ssh-ed25519", 'ed25519': b"ssh-ed25519",
# FIDO2 hardware keys
'ecdsa-sk': b"sk-ecdsa-sha2-nistp256",
'ed25519-sk': b"sk-ssh-ed25519",
} }
_CERT_SUFFIX_V01 = b"-cert-v01@openssh.com" _CERT_SUFFIX_V01 = b"-cert-v01@openssh.com"
_SK_SUFFIX = b"@openssh.com"
_SK_PREFIX = b"sk-"
# See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1 # See https://datatracker.ietf.org/doc/html/rfc5656#section-6.1
_ECDSA_CURVE_IDENTIFIERS = { _ECDSA_CURVE_IDENTIFIERS = {
@ -69,7 +76,6 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = {
_USE_TIMEZONE = sys.version_info >= (3, 6) _USE_TIMEZONE = sys.version_info >= (3, 6)
_ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE) _ALWAYS = _add_or_remove_timezone(datetime(1970, 1, 1), with_timezone=_USE_TIMEZONE)
_FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max _FOREVER = datetime(9999, 12, 31, 23, 59, 59, 999999, _UTC) if _USE_TIMEZONE else datetime.max
@ -145,7 +151,8 @@ class OpensshCertificateTimeParameters(object):
elif dt == _FOREVER: elif dt == _FOREVER:
result = 'forever' result = 'forever'
else: else:
result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S") result = dt.isoformat().replace('+00:00', '') if date_format == 'human_readable' else dt.strftime(
"%Y%m%d%H%M%S")
elif date_format == 'timestamp': elif date_format == 'timestamp':
td = dt - _ALWAYS td = dt - _ALWAYS
result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6) result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6)
@ -196,7 +203,8 @@ class OpensshCertificateTimeParameters(object):
else: else:
for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try: try:
result = _add_or_remove_timezone(datetime.strptime(time_string, time_format), with_timezone=_USE_TIMEZONE) result = _add_or_remove_timezone(datetime.strptime(time_string, time_format),
with_timezone=_USE_TIMEZONE)
except ValueError: except ValueError:
pass pass
if result is None: if result is None:
@ -279,6 +287,7 @@ class OpensshCertificateOption(object):
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo: class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key""" """Encapsulates all certificate information which is signed by a CA key"""
def __init__(self, def __init__(self,
nonce=None, nonce=None,
serial=None, serial=None,
@ -304,6 +313,7 @@ class OpensshCertificateInfo:
self.signing_key = signing_key self.signing_key = signing_key
self.type_string = None self.type_string = None
self.public_key_type_string = None
@property @property
def cert_type(self): def cert_type(self):
@ -326,8 +336,17 @@ class OpensshCertificateInfo:
def signing_key_fingerprint(self): def signing_key_fingerprint(self):
return fingerprint(self.signing_key) return fingerprint(self.signing_key)
@abc.abstractmethod
def public_key_fingerprint(self): def public_key_fingerprint(self):
if self.public_key_type_string is None:
return b''
writer = _OpensshWriter()
writer.string(self.public_key_type_string)
self.write_public_key_params(writer)
return fingerprint(writer.bytes())
@abc.abstractmethod
def write_public_key_params(self, writer):
pass pass
@abc.abstractmethod @abc.abstractmethod
@ -339,21 +358,18 @@ class OpensshRSACertificateInfo(OpensshCertificateInfo):
def __init__(self, e=None, n=None, **kwargs): def __init__(self, e=None, n=None, **kwargs):
super(OpensshRSACertificateInfo, self).__init__(**kwargs) super(OpensshRSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01 self.type_string = _SSH_TYPE_STRINGS['rsa'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['rsa']
self.e = e self.e = e
self.n = n self.n = n
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self): def write_public_key_params(self, writer):
if any([self.e is None, self.n is None]): if any([self.e is None, self.n is None]):
return b'' return
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['rsa'])
writer.mpint(self.e) writer.mpint(self.e)
writer.mpint(self.n) writer.mpint(self.n)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser): def parse_public_numbers(self, parser):
self.e = parser.mpint() self.e = parser.mpint()
self.n = parser.mpint() self.n = parser.mpint()
@ -363,25 +379,22 @@ class OpensshDSACertificateInfo(OpensshCertificateInfo):
def __init__(self, p=None, q=None, g=None, y=None, **kwargs): def __init__(self, p=None, q=None, g=None, y=None, **kwargs):
super(OpensshDSACertificateInfo, self).__init__(**kwargs) super(OpensshDSACertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01 self.type_string = _SSH_TYPE_STRINGS['dsa'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['dsa']
self.p = p self.p = p
self.q = q self.q = q
self.g = g self.g = g
self.y = y self.y = y
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self): def write_public_key_params(self, writer):
if any([self.p is None, self.q is None, self.g is None, self.y is None]): if any([self.p is None, self.q is None, self.g is None, self.y is None]):
return b'' return
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['dsa'])
writer.mpint(self.p) writer.mpint(self.p)
writer.mpint(self.q) writer.mpint(self.q)
writer.mpint(self.g) writer.mpint(self.g)
writer.mpint(self.y) writer.mpint(self.y)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser): def parse_public_numbers(self, parser):
self.p = parser.mpint() self.p = parser.mpint()
self.q = parser.mpint() self.q = parser.mpint()
@ -406,6 +419,7 @@ class OpensshECDSACertificateInfo(OpensshCertificateInfo):
def curve(self, curve): def curve(self, curve):
if curve in _ECDSA_CURVE_IDENTIFIERS.values(): if curve in _ECDSA_CURVE_IDENTIFIERS.values():
self._curve = curve self._curve = curve
self.public_key_type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]]
self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01 self.type_string = _SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[curve]] + _CERT_SUFFIX_V01
else: else:
raise ValueError( raise ValueError(
@ -413,17 +427,13 @@ class OpensshECDSACertificateInfo(OpensshCertificateInfo):
) )
# See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # See https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
def public_key_fingerprint(self): def write_public_key_params(self, writer):
if any([self.curve is None, self.public_key is None]): if any([self.curve is None, self.public_key is None]):
return b'' return
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS[_ECDSA_CURVE_IDENTIFIERS_LOOKUP[self.curve]])
writer.string(self.curve) writer.string(self.curve)
writer.string(self.public_key) writer.string(self.public_key)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser): def parse_public_numbers(self, parser):
self.curve = parser.string() self.curve = parser.string()
self.public_key = parser.string() self.public_key = parser.string()
@ -433,25 +443,74 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo):
def __init__(self, pk=None, **kwargs): def __init__(self, pk=None, **kwargs):
super(OpensshED25519CertificateInfo, self).__init__(**kwargs) super(OpensshED25519CertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01 self.type_string = _SSH_TYPE_STRINGS['ed25519'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519']
self.pk = pk self.pk = pk
def public_key_fingerprint(self): def write_public_key_params(self, writer):
if self.pk is None: if self.pk is None:
return b'' return
writer = _OpensshWriter()
writer.string(_SSH_TYPE_STRINGS['ed25519'])
writer.string(self.pk) writer.string(self.pk)
return fingerprint(writer.bytes())
def parse_public_numbers(self, parser): def parse_public_numbers(self, parser):
self.pk = parser.string() self.pk = parser.string()
class OpensshSKECDSACertificateInfo(OpensshECDSACertificateInfo):
def __init__(self, application=b"ssh:", curve=None, **kwargs):
if curve is None:
curve = b'nistp256'
super(OpensshSKECDSACertificateInfo, self).__init__(curve=curve, **kwargs)
self.type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ecdsa-sk'] + _SK_SUFFIX
self.application = application
def write_public_key_params(self, writer):
if self.application is None:
return
super(OpensshSKECDSACertificateInfo, self).write_public_key_params(writer)
writer.string(self.application)
def parse_public_numbers(self, parser):
super(OpensshSKECDSACertificateInfo, self).parse_public_numbers(parser)
self.application = parser.string()
@OpensshECDSACertificateInfo.curve.setter
def curve(self, curve):
if curve != b'nistp256':
raise ValueError(
"ecdsa-sk only supports curve nistp256"
)
self._curve = curve
class OpensshSKED25519CertificateInfo(OpensshED25519CertificateInfo):
def __init__(self, application=b"ssh:", **kwargs):
super(OpensshSKED25519CertificateInfo, self).__init__(**kwargs)
self.type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _CERT_SUFFIX_V01
self.public_key_type_string = _SSH_TYPE_STRINGS['ed25519-sk'] + _SK_SUFFIX
self.application = application
def write_public_key_params(self, writer):
if self.application is None:
return
super(OpensshSKED25519CertificateInfo, self).write_public_key_params(writer)
writer.string(self.application)
def parse_public_numbers(self, parser):
super(OpensshSKED25519CertificateInfo, self).parse_public_numbers(parser)
self.application = parser.string()
# See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD # See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD
class OpensshCertificate(object): class OpensshCertificate(object):
"""Encapsulates a formatted OpenSSH certificate including signature and signing key""" """Encapsulates a formatted OpenSSH certificate including signature and signing key"""
def __init__(self, cert_info, signature): def __init__(self, cert_info, signature):
self._cert_info = cert_info self._cert_info = cert_info
@ -635,10 +694,14 @@ def get_cert_info_object(key_type):
cert_info = OpensshRSACertificateInfo() cert_info = OpensshRSACertificateInfo()
elif key_type == 'dsa': elif key_type == 'dsa':
cert_info = OpensshDSACertificateInfo() cert_info = OpensshDSACertificateInfo()
elif key_type in ('ecdsa-nistp256', 'ecdsa-nistp384', 'ecdsa-nistp521'): elif key_type in _ECDSA_CURVE_IDENTIFIERS:
cert_info = OpensshECDSACertificateInfo() cert_info = OpensshECDSACertificateInfo()
elif key_type == 'ed25519': elif key_type == 'ed25519':
cert_info = OpensshED25519CertificateInfo() cert_info = OpensshED25519CertificateInfo()
elif key_type == 'ecdsa-sk':
cert_info = OpensshSKECDSACertificateInfo()
elif key_type == 'ed25519-sk':
cert_info = OpensshSKED25519CertificateInfo()
else: else:
raise ValueError("%s is not a valid key type" % key_type) raise ValueError("%s is not a valid key type" % key_type)

View File

@ -87,6 +87,7 @@ def secure_write(path, mode, content):
class OpensshParser(object): class OpensshParser(object):
"""Parser for OpenSSH encoded objects""" """Parser for OpenSSH encoded objects"""
BOOLEAN_OFFSET = 1 BOOLEAN_OFFSET = 1
UINT8_OFFSET = 1
UINT32_OFFSET = 4 UINT32_OFFSET = 4
UINT64_OFFSET = 8 UINT64_OFFSET = 8
@ -104,6 +105,13 @@ class OpensshParser(object):
self._pos = next_pos self._pos = next_pos
return value return value
def uint8(self):
next_pos = self._check_position(self.UINT8_OFFSET)
value = _UBYTE.unpack(self._data[self._pos:next_pos])[0]
self._pos = next_pos
return value
def uint32(self): def uint32(self):
next_pos = self._check_position(self.UINT32_OFFSET) next_pos = self._check_position(self.UINT32_OFFSET)
@ -189,7 +197,6 @@ class OpensshParser(object):
signature_type = parser.string() signature_type = parser.string()
signature_blob = parser.string() signature_blob = parser.string()
blob_parser = cls(signature_blob)
if signature_type in (b'ssh-rsa', b'rsa-sha2-256', b'rsa-sha2-512'): if signature_type in (b'ssh-rsa', b'rsa-sha2-256', b'rsa-sha2-512'):
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
# https://datatracker.ietf.org/doc/html/rfc8332#section-3 # https://datatracker.ietf.org/doc/html/rfc8332#section-3
@ -198,17 +205,22 @@ class OpensshParser(object):
# https://datatracker.ietf.org/doc/html/rfc4253#section-6.6 # https://datatracker.ietf.org/doc/html/rfc4253#section-6.6
signature_data['r'] = cls._big_int(signature_blob[:20], "big") signature_data['r'] = cls._big_int(signature_blob[:20], "big")
signature_data['s'] = cls._big_int(signature_blob[20:], "big") signature_data['s'] = cls._big_int(signature_blob[20:], "big")
elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521'): elif signature_type in (b'ecdsa-sha2-nistp256', b'ecdsa-sha2-nistp384', b'ecdsa-sha2-nistp521', b'sk-ecdsa-sha2-nistp256@openssh.com'):
# https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2 # https://datatracker.ietf.org/doc/html/rfc5656#section-3.1.2
blob_parser = cls(signature_blob)
signature_data['r'] = blob_parser.mpint() signature_data['r'] = blob_parser.mpint()
signature_data['s'] = blob_parser.mpint() signature_data['s'] = blob_parser.mpint()
elif signature_type == b'ssh-ed25519': elif signature_type in (b'ssh-ed25519', b'sk-ssh-ed25519@openssh.com'):
# https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2 # https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.2
signature_data['R'] = cls._big_int(signature_blob[:32], "little") signature_data['R'] = cls._big_int(signature_blob[:32], "little")
signature_data['S'] = cls._big_int(signature_blob[32:], "little") signature_data['S'] = cls._big_int(signature_blob[32:], "little")
else: else:
raise ValueError("%s is not a valid signature type" % signature_type) raise ValueError("%s is not a valid signature type" % signature_type)
if signature_type.startswith(b'sk-'):
signature_data['signature_flags'] = parser.uint8()
signature_data['signature_counter'] = parser.uint32()
signature_data['signature_type'] = signature_type signature_data['signature_type'] = signature_type
return signature_data return signature_data