openssh_keypair - Adding backend option and refactoring backend code (#236)

* Refactoring openssh_keypair for multiple backends

* Fixing cryptography backend validations

* Simplifying conditionals and excess variable assignments

* Fixing docs and adding cleanup for integration tests

* Fixing docs and public key validation bugs in crypto backend

* Enhancing cryptogagraphy utils to raise OpenSSHErrors when file not found

* Adding missed copyright and cleanup for idempotency test keys

* Fixing doc style

* Readding crypto/openssh for backwards compatibility

* Adding changelog fragment and final simplifications of conditional statements

* Applied initial review suggestions
pull/239/head
Ajpantuso 2021-05-23 16:36:55 -04:00 committed by GitHub
parent 2bf0bb5fb3
commit c6483751b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1212 additions and 997 deletions

View File

@ -0,0 +1,2 @@
minor_changes:
- openssh_keypair - added ``backend`` parameter for selecting between the cryptography library or the OpenSSH binary for the execution of actions performed by ``openssh_keypair`` (https://github.com/ansible-collections/community.crypto/pull/236).

View File

@ -18,19 +18,7 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
# This import is only to maintain backwards compatibility
import re from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
parse_openssh_version
)
def parse_openssh_version(version_string):
"""Parse the version output of ssh -V and return version numbers that can be compared"""
parsed_result = re.match(
r"^.*openssh_(?P<version>[0-9.]+)(p?[0-9]+)[^0-9]*.*$", version_string.lower()
)
if parsed_result is not None:
version = parsed_result.group("version").strip()
else:
version = None
return version

View File

@ -0,0 +1,475 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2018, David Kainz <dkainz@mgit.at> <dave.jokain@gmx.at>
# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <ajpantuso@gmail.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import abc
import errno
import os
import stat
from distutils.version import LooseVersion
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import parse_openssh_version
from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import (
HAS_OPENSSH_SUPPORT,
HAS_OPENSSH_PRIVATE_FORMAT,
InvalidCommentError,
InvalidPassphraseError,
InvalidPrivateKeyFileError,
OpenSSHError,
OpensshKeypair,
)
@six.add_metaclass(abc.ABCMeta)
class KeypairBackend(object):
def __init__(self, module):
self.module = module
self.path = module.params['path']
self.force = module.params['force']
self.size = module.params['size']
self.type = module.params['type']
self.comment = module.params['comment']
self.passphrase = module.params['passphrase']
self.regenerate = module.params['regenerate']
self.changed = False
self.fingerprint = ''
self.public_key = {}
if self.regenerate == 'always':
self.force = True
if self.type in ('rsa', 'rsa1'):
self.size = 4096 if self.size is None else self.size
if self.size < 1024:
module.fail_json(msg=('For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. '
'Attempting to use bit lengths under 1024 will cause the module to fail.'))
elif self.type == 'dsa':
self.size = 1024 if self.size is None else self.size
if self.size != 1024:
module.fail_json(msg=('DSA keys must be exactly 1024 bits as specified by FIPS 186-2.'))
elif self.type == 'ecdsa':
self.size = 256 if self.size is None else self.size
if self.size not in (256, 384, 521):
module.fail_json(msg=('For ECDSA keys, size determines the key length by selecting from '
'one of three elliptic curve sizes: 256, 384 or 521 bits. '
'Attempting to use bit lengths other than these three values for '
'ECDSA keys will cause this module to fail. '))
elif self.type == 'ed25519':
# User input is ignored for `key size` when `key type` is ed25519
self.size = 256
else:
module.fail_json(msg="%s is not a valid value for key type" % self.type)
def generate(self):
if self.force or not self.is_private_key_valid(perms_required=False):
try:
if self.exists() and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
self._generate_keypair()
self.changed = True
except (IOError, OSError) as e:
self.remove()
self.module.fail_json(msg="%s" % to_native(e))
self.fingerprint = self._get_current_key_properties()[2]
self.public_key = self._get_public_key()
elif not self.is_public_key_valid(perms_required=False):
pubkey = self._get_public_key()
try:
with open(self.path + ".pub", "w") as pubkey_f:
pubkey_f.write(pubkey + '\n')
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
except (IOError, OSError):
self.module.fail_json(
msg='The public key is missing or does not match the private key. '
'Unable to regenerate the public key.')
self.changed = True
self.public_key = pubkey
if self.comment:
try:
if self.exists() and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
except (IOError, OSError):
self.module.fail_json(msg='Unable to update the comment for the public key.')
self._update_comment()
if self._permissions_changed() or self._permissions_changed(public_key=True):
self.changed = True
def is_private_key_valid(self, perms_required=True):
if not self.exists():
return False
if self._check_pass_protected_or_broken_key():
if self.regenerate in ('full_idempotence', 'always'):
return False
self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
if not self._private_key_loadable():
if os.path.isdir(self.path):
self.module.fail_json(msg='%s is a directory. Please specify a path to a file.' % self.path)
if self.regenerate in ('full_idempotence', 'always'):
return False
self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
keysize, keytype, self.fingerprint = self._get_current_key_properties()
if self.regenerate == 'never':
return True
if not (self.type == keytype and self.size == keysize):
if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'):
return False
self.module.fail_json(
msg='Key has wrong type and/or size.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.'
)
# Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changed
# when check_mode is not enabled
return not (perms_required and self._permissions_changed())
def is_public_key_valid(self, perms_required=True):
def _get_pubkey_content():
if self.exists(public_key=True):
with open(self.path + ".pub", "r") as pubkey_f:
present_pubkey = pubkey_f.read().strip(' \n')
return present_pubkey
else:
return ''
def _parse_pubkey(pubkey_content):
if pubkey_content:
parts = pubkey_content.split(' ', 2)
if len(parts) < 2:
return ()
return parts[0], parts[1], '' if len(parts) <= 2 else parts[2]
return ()
def _pubkey_valid(pubkey):
if pubkey_parts and _parse_pubkey(pubkey):
return pubkey_parts[:2] == _parse_pubkey(pubkey)[:2]
return False
def _comment_valid():
if pubkey_parts:
return pubkey_parts[2] == self.comment
return False
pubkey_parts = _parse_pubkey(_get_pubkey_content())
pubkey = self._get_public_key()
if _pubkey_valid(pubkey):
self.public_key = pubkey
else:
return False
if self.comment and not _comment_valid():
return False
# Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changes
# when check_mode is not enabled
return not (perms_required and self._permissions_changed(public_key=True))
def _permissions_changed(self, public_key=False):
file_args = self.module.load_file_common_arguments(self.module.params)
if public_key:
file_args['path'] = file_args['path'] + '.pub'
return self.module.set_fs_attributes_if_different(file_args, False)
@property
def result(self):
return {
'changed': self.changed,
'size': self.size,
'type': self.type,
'filename': self.path,
'fingerprint': self.fingerprint if self.fingerprint else '',
'public_key': self.public_key,
'comment': self.comment if self.comment else '',
}
def remove(self):
"""Remove the resource from the filesystem."""
try:
os.remove(self.path)
self.changed = True
except (IOError, OSError) as exc:
if exc.errno != errno.ENOENT:
self.module.fail_json(msg=to_native(exc))
else:
pass
if self.exists(public_key=True):
try:
os.remove(self.path + ".pub")
self.changed = True
except (IOError, OSError) as exc:
if exc.errno != errno.ENOENT:
self.module.fail_json(msg=to_native(exc))
else:
pass
def exists(self, public_key=False):
return os.path.exists(self.path if not public_key else self.path + ".pub")
@abc.abstractmethod
def _generate_keypair(self):
pass
@abc.abstractmethod
def _get_current_key_properties(self):
pass
@abc.abstractmethod
def _get_public_key(self):
pass
@abc.abstractmethod
def _update_comment(self):
pass
@abc.abstractmethod
def _private_key_loadable(self):
pass
@abc.abstractmethod
def _check_pass_protected_or_broken_key(self):
pass
class KeypairBackendOpensshBin(KeypairBackend):
def __init__(self, module):
super(KeypairBackendOpensshBin, self).__init__(module)
self.openssh_bin = module.get_bin_path('ssh-keygen')
def _load_privatekey(self):
return self.module.run_command([self.openssh_bin, '-lf', self.path])
def _get_publickey_from_privatekey(self):
# -P '' is always included as an option to induce the expected standard output for
# _check_pass_protected_or_broken_key, but introduces no side-effects when used to
# output a matching public key
return self.module.run_command([self.openssh_bin, '-P', '', '-yf', self.path])
def _generate_keypair(self):
args = [
self.openssh_bin,
'-q',
'-N', '',
'-b', str(self.size),
'-t', self.type,
'-f', self.path,
'-C', self.comment if self.comment else ''
]
# "y" must be entered in response to the "overwrite" prompt
stdin_data = 'y' if self.exists() else None
self.module.run_command(args, data=stdin_data)
def _get_current_key_properties(self):
rc, stdout, stderr = self._load_privatekey()
properties = stdout.split()
keysize = int(properties[0])
fingerprint = properties[1]
keytype = properties[-1][1:-1].lower()
return keysize, keytype, fingerprint
def _get_public_key(self):
rc, stdout, stderr = self._get_publickey_from_privatekey()
return stdout.strip('\n')
def _update_comment(self):
return self.module.run_command([self.openssh_bin, '-q', '-o', '-c', '-C', self.comment, '-f', self.path])
def _private_key_loadable(self):
rc, stdout, stderr = self._load_privatekey()
return rc == 0
def _check_pass_protected_or_broken_key(self):
rc, stdout, stderr = self._get_publickey_from_privatekey()
return rc == 255 or any_in(stderr, 'is not a public key file', 'incorrect passphrase', 'load failed')
class KeypairBackendCryptography(KeypairBackend):
def __init__(self, module):
super(KeypairBackendCryptography, self).__init__(module)
if module.params['private_key_format'] == 'auto':
ssh = module.get_bin_path('ssh')
if ssh:
proc = module.run_command([ssh, '-Vq'])
ssh_version = parse_openssh_version(proc[2].strip())
else:
# Default to OpenSSH 7.8 compatibility when OpenSSH is not installed
ssh_version = "7.8"
self.private_key_format = 'SSH'
if LooseVersion(ssh_version) < LooseVersion("7.8") and self.type != 'ed25519':
# OpenSSH made SSH formatted private keys available in version 6.5,
# but still defaulted to PKCS1 format with the exception of ed25519 keys
self.private_key_format = 'PKCS1'
if self.private_key_format == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT:
module.fail_json(
msg=missing_required_lib(
'cryptography >= 3.0',
reason="to load/dump private keys in the default OpenSSH format for OpenSSH >= 7.8 " +
"or for ed25519 keys"
)
)
if self.type == 'rsa1':
module.fail_json(msg="RSA1 keys are not supported by the cryptography backend")
self.passphrase = to_bytes(self.passphrase) if self.passphrase else None
def _load_privatekey(self):
return OpensshKeypair.load(path=self.path, passphrase=self.passphrase, no_public_key=True)
def _generate_keypair(self):
keypair = OpensshKeypair.generate(
keytype=self.type,
size=self.size,
passphrase=self.passphrase,
comment=self.comment if self.comment else "",
)
with open(self.path, 'w+b') as f:
f.write(
OpensshKeypair.encode_openssh_privatekey(
keypair.asymmetric_keypair,
self.private_key_format
)
)
# ssh-keygen defaults private key permissions to 0600 octal
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
with open(self.path + '.pub', 'w+b') as f:
f.write(keypair.public_key)
# ssh-keygen defaults public key permissions to 0644 octal
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
def _get_current_key_properties(self):
keypair = self._load_privatekey()
return keypair.size, keypair.key_type, keypair.fingerprint
def _get_public_key(self):
try:
keypair = self._load_privatekey()
except OpenSSHError:
# Simulates the null output of ssh-keygen
return ""
return to_text(keypair.public_key)
def _update_comment(self):
keypair = self._load_privatekey()
try:
keypair.comment = self.comment
with open(self.path + ".pub", "w+b") as pubkey_file:
pubkey_file.write(keypair.public_key + b'\n')
except (InvalidCommentError, IOError, OSError) as e:
# Return values while unused currently are made to simulate the output of run_command()
return 1, "Comment could not be updated", to_native(e)
return 0, "Comment updated successfully", ""
def _private_key_loadable(self):
try:
self._load_privatekey()
except OpenSSHError:
return False
return True
def _check_pass_protected_or_broken_key(self):
try:
OpensshKeypair.load(
path=self.path,
passphrase=self.passphrase,
no_public_key=True,
)
except (InvalidPrivateKeyFileError, InvalidPassphraseError):
return True
# Cryptography >= 3.0 uses a SSH key loader which does not raise an exception when a passphrase is provided
# when loading an unencrypted key
if self.passphrase:
try:
OpensshKeypair.load(
path=self.path,
passphrase=None,
no_public_key=True,
)
except (InvalidPrivateKeyFileError, InvalidPassphraseError):
return False
else:
return True
return False
def any_in(sequence, *elements):
return any([e in sequence for e in elements])
def select_backend(module, backend):
can_use_cryptography = HAS_OPENSSH_SUPPORT
can_use_opensshbin = bool(module.get_bin_path('ssh-keygen'))
if backend == 'auto':
if can_use_opensshbin and not module.params['passphrase']:
backend = 'opensshbin'
elif can_use_cryptography:
backend = 'cryptography'
else:
module.fail_json(msg="Cannot find either the OpenSSH binary in the PATH " +
"or cryptography >= 2.6 installed on this system")
if backend == 'opensshbin':
if not can_use_opensshbin:
module.fail_json(msg="Cannot find the OpenSSH binary in the PATH")
return backend, KeypairBackendOpensshBin(module)
elif backend == 'cryptography':
if not can_use_cryptography:
module.fail_json(msg=missing_required_lib("cryptography >= 2.6"))
return backend, KeypairBackendCryptography(module)
else:
raise ValueError('Unsupported value for backend: {0}'.format(backend))

View File

@ -18,20 +18,21 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
import os
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
from distutils.version import LooseVersion from distutils.version import LooseVersion
from getpass import getuser from getpass import getuser
from socket import gethostname from socket import gethostname
try: try:
import cryptography as c from cryptography import __version__ as CRYPTOGRAPHY_VERSION
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl import backend from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, padding from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa, padding
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey, Ed25519PublicKey
if LooseVersion(c.__version__) >= LooseVersion("3.0"): if LooseVersion(CRYPTOGRAPHY_VERSION) >= LooseVersion("3.0"):
HAS_OPENSSH_PRIVATE_FORMAT = True HAS_OPENSSH_PRIVATE_FORMAT = True
else: else:
HAS_OPENSSH_PRIVATE_FORMAT = False HAS_OPENSSH_PRIVATE_FORMAT = False
@ -78,6 +79,7 @@ try:
except ImportError: except ImportError:
HAS_OPENSSH_PRIVATE_FORMAT = False HAS_OPENSSH_PRIVATE_FORMAT = False
HAS_OPENSSH_SUPPORT = False HAS_OPENSSH_SUPPORT = False
CRYPTOGRAPHY_VERSION = "0.0"
_ALGORITHM_PARAMETERS = {} _ALGORITHM_PARAMETERS = {}
_TEXT_ENCODING = 'UTF-8' _TEXT_ENCODING = 'UTF-8'
@ -127,7 +129,7 @@ class InvalidSignatureError(OpenSSHError):
pass pass
class Asymmetric_Keypair(object): class AsymmetricKeypair(object):
"""Container for newly generated asymmetric key pairs or those loaded from existing files""" """Container for newly generated asymmetric key pairs or those loaded from existing files"""
@classmethod @classmethod
@ -261,7 +263,7 @@ class Asymmetric_Keypair(object):
) )
def __eq__(self, other): def __eq__(self, other):
if not isinstance(other, Asymmetric_Keypair): if not isinstance(other, AsymmetricKeypair):
return NotImplemented return NotImplemented
return (compare_publickeys(self.public_key, other.public_key) and return (compare_publickeys(self.public_key, other.public_key) and
@ -344,7 +346,7 @@ class Asymmetric_Keypair(object):
self.__encryption_algorithm = serialization.NoEncryption() self.__encryption_algorithm = serialization.NoEncryption()
class OpenSSH_Keypair(object): class OpensshKeypair(object):
"""Container for OpenSSH encoded asymmetric key pairs""" """Container for OpenSSH encoded asymmetric key pairs"""
@classmethod @classmethod
@ -360,7 +362,7 @@ class OpenSSH_Keypair(object):
if comment is None: if comment is None:
comment = "%s@%s" % (getuser(), gethostname()) comment = "%s@%s" % (getuser(), gethostname())
asym_keypair = Asymmetric_Keypair.generate(keytype, size, passphrase) asym_keypair = AsymmetricKeypair.generate(keytype, size, passphrase)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH') openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment) openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey) fingerprint = calculate_fingerprint(openssh_publickey)
@ -382,8 +384,12 @@ class OpenSSH_Keypair(object):
:no_public_key: Set 'True' to only load a private key and automatically populate the matching public key :no_public_key: Set 'True' to only load a private key and automatically populate the matching public key
""" """
comment = extract_comment(path + '.pub') if no_public_key:
asym_keypair = Asymmetric_Keypair.load(path, passphrase, 'SSH', 'SSH', no_public_key) comment = ""
else:
comment = extract_comment(path + '.pub')
asym_keypair = AsymmetricKeypair.load(path, passphrase, 'SSH', 'SSH', no_public_key)
openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH') openssh_privatekey = cls.encode_openssh_privatekey(asym_keypair, 'SSH')
openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment) openssh_publickey = cls.encode_openssh_publickey(asym_keypair, comment)
fingerprint = calculate_fingerprint(openssh_publickey) fingerprint = calculate_fingerprint(openssh_publickey)
@ -461,7 +467,7 @@ class OpenSSH_Keypair(object):
self.__comment = comment self.__comment = comment
def __eq__(self, other): def __eq__(self, other):
if not isinstance(other, OpenSSH_Keypair): if not isinstance(other, OpensshKeypair):
return NotImplemented return NotImplemented
return self.asymmetric_keypair == other.asymmetric_keypair and self.comment == other.comment return self.asymmetric_keypair == other.asymmetric_keypair and self.comment == other.comment
@ -529,7 +535,7 @@ class OpenSSH_Keypair(object):
""" """
self.__asym_keypair.update_passphrase(passphrase) self.__asym_keypair.update_passphrase(passphrase)
self.__openssh_privatekey = OpenSSH_Keypair.encode_openssh_privatekey(self.__asym_keypair, 'SSH') self.__openssh_privatekey = OpensshKeypair.encode_openssh_privatekey(self.__asym_keypair, 'SSH')
def load_privatekey(path, passphrase, key_format): def load_privatekey(path, passphrase, key_format):
@ -554,6 +560,9 @@ def load_privatekey(path, passphrase, key_format):
) )
) )
if not os.path.exists(path):
raise InvalidPrivateKeyFileError("No file was found at %s" % path)
try: try:
with open(path, 'rb') as f: with open(path, 'rb') as f:
content = f.read() content = f.read()
@ -606,6 +615,9 @@ def load_publickey(path, key_format):
) )
) )
if not os.path.exists(path):
raise InvalidPublicKeyFileError("No file was found at %s" % path)
try: try:
with open(path, 'rb') as f: with open(path, 'rb') as f:
content = f.read() content = f.read()
@ -658,6 +670,10 @@ def validate_comment(comment):
def extract_comment(path): def extract_comment(path):
if not os.path.exists(path):
raise InvalidPublicKeyFileError("No file was found at %s" % path)
try: try:
with open(path, 'rb') as f: with open(path, 'rb') as f:
fields = f.read().split(b' ', 2) fields = f.read().split(b' ', 2)
@ -665,7 +681,7 @@ def extract_comment(path):
comment = fields[2].decode(_TEXT_ENCODING) comment = fields[2].decode(_TEXT_ENCODING)
else: else:
comment = "" comment = ""
except OSError as e: except (IOError, OSError) as e:
raise InvalidPublicKeyFileError(e) raise InvalidPublicKeyFileError(e)
return comment return comment

View File

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
#
# (c) 2020, Doug Stanley <doug+ansible@technologixllc.com>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import re
def parse_openssh_version(version_string):
"""Parse the version output of ssh -V and return version numbers that can be compared"""
parsed_result = re.match(
r"^.*openssh_(?P<version>[0-9.]+)(p?[0-9]+)[^0-9]*.*$", version_string.lower()
)
if parsed_result is not None:
version = parsed_result.group("version").strip()
else:
version = None
return version

View File

@ -229,7 +229,7 @@ from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native from ansible.module_utils._text import to_native
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
from ansible_collections.community.crypto.plugins.module_utils.crypto.openssh import parse_openssh_version from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import parse_openssh_version
class CertificateError(Exception): class CertificateError(Exception):

View File

@ -17,9 +17,9 @@ description:
ssh-keygen to generate keys. One can generate C(rsa), C(dsa), C(rsa1), C(ed25519) ssh-keygen to generate keys. One can generate C(rsa), C(dsa), C(rsa1), C(ed25519)
or C(ecdsa) private keys." or C(ecdsa) private keys."
requirements: requirements:
- "ssh-keygen" - ssh-keygen (if I(backend=openssh))
- cryptography >= 2.6 (if using I(passphrase) and OpenSSH < 7.8 is installed) - cryptography >= 2.6 (if I(backend=cryptography) and OpenSSH < 7.8 is installed)
- cryptography >= 3.0 (if using I(passphrase) and OpenSSH >= 7.8 is installed) - cryptography >= 3.0 (if I(backend=cryptography) and OpenSSH >= 7.8 is installed)
options: options:
state: state:
description: description:
@ -60,11 +60,12 @@ options:
description: description:
- Passphrase used to decrypt an existing private key or encrypt a newly generated private key. - Passphrase used to decrypt an existing private key or encrypt a newly generated private key.
- Passphrases are not supported for I(type=rsa1). - Passphrases are not supported for I(type=rsa1).
- Can only be used when I(backend=cryptography), or when I(backend=auto) and a required C(cryptography) version is installed.
type: str type: str
version_added: 1.7.0 version_added: 1.7.0
private_key_format: private_key_format:
description: description:
- Used when a value for I(passphrase) is provided to select a format for the private key at the provided I(path). - Used when a I(backend=cryptography) to select a format for the private key at the provided I(path).
- The only valid option currently is C(auto) which will match the key format of the installed OpenSSH version. - The only valid option currently is C(auto) which will match the key format of the installed OpenSSH version.
- For OpenSSH < 7.8 private keys will be in PKCS1 format except ed25519 keys which will be in OpenSSH format. - For OpenSSH < 7.8 private keys will be in PKCS1 format except ed25519 keys which will be in OpenSSH format.
- For OpenSSH >= 7.8 all private key types will be in the OpenSSH format. - For OpenSSH >= 7.8 all private key types will be in the OpenSSH format.
@ -73,6 +74,17 @@ options:
choices: choices:
- auto - auto
version_added: 1.7.0 version_added: 1.7.0
backend:
description:
- Selects between the C(cryptography) library or the OpenSSH binary C(opensshbin).
- C(auto) will default to C(opensshbin) unless the OpenSSH binary is not installed or when using I(passphrase).
type: str
default: auto
choices:
- auto
- cryptography
- opensshbin
version_added: 1.7.0
regenerate: regenerate:
description: description:
- Allows to configure in which situations the module is allowed to regenerate private keys. - Allows to configure in which situations the module is allowed to regenerate private keys.
@ -173,390 +185,17 @@ comment:
sample: test@comment sample: test@comment
''' '''
import errno
import os import os
import stat
from distutils.version import LooseVersion
from ansible.module_utils.basic import AnsibleModule, missing_required_lib from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.crypto.openssh import parse_openssh_version from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.keypair_backend import (
from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography_openssh import ( select_backend
HAS_OPENSSH_SUPPORT,
HAS_OPENSSH_PRIVATE_FORMAT,
InvalidPassphraseError,
InvalidPrivateKeyFileError,
OpenSSH_Keypair,
) )
class KeypairError(Exception):
pass
class Keypair(object):
def __init__(self, module):
self.path = module.params['path']
self.state = module.params['state']
self.force = module.params['force']
self.size = module.params['size']
self.type = module.params['type']
self.comment = module.params['comment']
self.passphrase = module.params['passphrase']
self.changed = False
self.check_mode = module.check_mode
self.privatekey = None
self.fingerprint = {}
self.public_key = {}
self.regenerate = module.params['regenerate']
if self.regenerate == 'always':
self.force = True
# The empty string is intentionally ignored so that dependency checks do not cause unnecessary failure
if self.passphrase:
if not HAS_OPENSSH_SUPPORT:
module.fail_json(
msg=missing_required_lib(
'cryptography >= 2.6',
reason="to encrypt/decrypt private keys with passphrases"
)
)
if module.params['private_key_format'] == 'auto':
ssh = module.get_bin_path('ssh', True)
proc = module.run_command([ssh, '-Vq'])
ssh_version = parse_openssh_version(proc[2].strip())
self.private_key_format = 'SSH'
if LooseVersion(ssh_version) < LooseVersion("7.8") and self.type != 'ed25519':
# OpenSSH made SSH formatted private keys available in version 6.5,
# but still defaulted to PKCS1 format with the exception of ed25519 keys
self.private_key_format = 'PKCS1'
if self.private_key_format == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT:
module.fail_json(
msg=missing_required_lib(
'cryptography >= 3.0',
reason="to load/dump private keys in the default OpenSSH format for OpenSSH >= 7.8 " +
"or for ed25519 keys"
)
)
if self.type == 'rsa1':
module.fail_json(msg="Passphrases are not supported for RSA1 keys.")
self.passphrase = to_bytes(self.passphrase)
else:
self.private_key_format = None
if self.type in ('rsa', 'rsa1'):
self.size = 4096 if self.size is None else self.size
if self.size < 1024:
module.fail_json(msg=('For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. '
'Attempting to use bit lengths under 1024 will cause the module to fail.'))
if self.type == 'dsa':
self.size = 1024 if self.size is None else self.size
if self.size != 1024:
module.fail_json(msg=('DSA keys must be exactly 1024 bits as specified by FIPS 186-2.'))
if self.type == 'ecdsa':
self.size = 256 if self.size is None else self.size
if self.size not in (256, 384, 521):
module.fail_json(msg=('For ECDSA keys, size determines the key length by selecting from '
'one of three elliptic curve sizes: 256, 384 or 521 bits. '
'Attempting to use bit lengths other than these three values for '
'ECDSA keys will cause this module to fail. '))
if self.type == 'ed25519':
self.size = 256
def generate(self, module):
# generate a keypair
if self.force or not self.isPrivateKeyValid(module, perms_required=False):
try:
if os.path.exists(self.path) and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
self.changed = True
if not self.passphrase:
args = [
module.get_bin_path('ssh-keygen', True),
'-q',
'-N', '',
'-b', str(self.size),
'-t', self.type,
'-f', self.path,
]
if self.comment:
args.extend(['-C', self.comment])
else:
args.extend(['-C', ""])
stdin_data = None
if os.path.exists(self.path):
stdin_data = 'y'
module.run_command(args, data=stdin_data)
proc = module.run_command([module.get_bin_path('ssh-keygen', True), '-lf', self.path])
self.fingerprint = proc[1].split()
pubkey = module.run_command([module.get_bin_path('ssh-keygen', True), '-yf', self.path])
self.public_key = pubkey[1].strip('\n')
else:
keypair = OpenSSH_Keypair.generate(
keytype=self.type,
size=self.size,
passphrase=self.passphrase,
comment=self.comment if self.comment else "",
)
with open(self.path, 'w+b') as f:
f.write(
OpenSSH_Keypair.encode_openssh_privatekey(
keypair.asymmetric_keypair,
self.private_key_format
)
)
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
with open(self.path + '.pub', 'w+b') as f:
f.write(keypair.public_key)
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
self.fingerprint = [
str(keypair.size), keypair.fingerprint, keypair.comment, "(%s)" % keypair.key_type.upper()
]
self.public_key = to_text(b' '.join(keypair.public_key.split(b' ', 2)[:2]))
except Exception as e:
self.remove()
module.fail_json(msg="%s" % to_native(e))
elif not self.isPublicKeyValid(module, perms_required=False):
if not self.passphrase:
pubkey = module.run_command([module.get_bin_path('ssh-keygen', True), '-yf', self.path])
pubkey = pubkey[1].strip('\n')
else:
keypair = OpenSSH_Keypair.load(
path=self.path,
passphrase=self.passphrase,
no_public_key=True,
)
pubkey = to_text(keypair.public_key)
try:
self.changed = True
with open(self.path + ".pub", "w") as pubkey_f:
pubkey_f.write(pubkey + '\n')
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
except IOError:
module.fail_json(
msg='The public key is missing or does not match the private key. '
'Unable to regenerate the public key.')
self.public_key = pubkey
if self.comment:
try:
if os.path.exists(self.path) and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
if not self.passphrase:
args = [module.get_bin_path('ssh-keygen', True),
'-q', '-o', '-c', '-C', self.comment, '-f', self.path]
module.run_command(args)
else:
keypair.comment = self.comment
with open(self.path + ".pub", "w+b") as pubkey_f:
pubkey_f.write(keypair.public_key + b'\n')
except IOError:
module.fail_json(
msg='Unable to update the comment for the public key.')
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
self.changed = True
file_args['path'] = file_args['path'] + '.pub'
if module.set_fs_attributes_if_different(file_args, False):
self.changed = True
def _check_pass_protected_or_broken_key(self, module):
key_state = module.run_command([module.get_bin_path('ssh-keygen', True),
'-P', '', '-yf', self.path], check_rc=False)
if not self.passphrase:
if key_state[0] == 255 or 'is not a public key file' in key_state[2]:
return True
if 'incorrect passphrase' in key_state[2] or 'load failed' in key_state[2]:
return True
return False
else:
try:
OpenSSH_Keypair.load(
path=self.path,
passphrase=self.passphrase,
no_public_key=True,
)
except (InvalidPrivateKeyFileError, InvalidPassphraseError) as e:
return True
# Cryptography >= 3.0 uses a SSH key loader which does not raise an exception when a passphrase is provided
# when loading an unencrypted key so 'ssh-keygen' is used for this check
if key_state[0] == 0:
return True
return False
def isPrivateKeyValid(self, module, perms_required=True):
# check if the key is correct
def _check_state():
return os.path.exists(self.path)
if not _check_state():
return False
if self._check_pass_protected_or_broken_key(module):
if self.regenerate in ('full_idempotence', 'always'):
return False
module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
proc = module.run_command([module.get_bin_path('ssh-keygen', True), '-lf', self.path], check_rc=False)
if not proc[0] == 0:
if os.path.isdir(self.path):
module.fail_json(msg='%s is a directory. Please specify a path to a file.' % (self.path))
if self.regenerate in ('full_idempotence', 'always'):
return False
module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
fingerprint = proc[1].split()
keysize = int(fingerprint[0])
keytype = fingerprint[-1][1:-1].lower()
self.fingerprint = fingerprint
if self.regenerate == 'never':
return True
def _check_type():
return self.type == keytype
def _check_size():
return self.size == keysize
if not (_check_type() and _check_size()):
if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'):
return False
module.fail_json(msg='Key has wrong type and/or size.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.')
def _check_perms(module):
file_args = module.load_file_common_arguments(module.params)
return not module.set_fs_attributes_if_different(file_args, False)
return not perms_required or _check_perms(module)
def isPublicKeyValid(self, module, perms_required=True):
def _get_pubkey_content():
if os.path.exists(self.path + ".pub"):
with open(self.path + ".pub", "r") as pubkey_f:
present_pubkey = pubkey_f.read().strip(' \n')
return present_pubkey
else:
return False
def _parse_pubkey(pubkey_content):
if pubkey_content:
parts = pubkey_content.split(' ', 2)
if len(parts) < 2:
return False
return parts[0], parts[1], '' if len(parts) <= 2 else parts[2]
return False
def _pubkey_valid(pubkey):
if pubkey_parts and _parse_pubkey(pubkey):
return pubkey_parts[:2] == _parse_pubkey(pubkey)[:2]
return False
def _comment_valid():
if pubkey_parts:
return pubkey_parts[2] == self.comment
return False
def _check_perms(module):
file_args = module.load_file_common_arguments(module.params)
file_args['path'] = file_args['path'] + '.pub'
return not module.set_fs_attributes_if_different(file_args, False)
pubkey_parts = _parse_pubkey(_get_pubkey_content())
if not self.passphrase:
pubkey = module.run_command([module.get_bin_path('ssh-keygen', True), '-yf', self.path])
pubkey = pubkey[1].strip('\n')
else:
keypair = OpenSSH_Keypair.load(
path=self.path,
passphrase=self.passphrase,
no_public_key=True,
)
pubkey = to_text(keypair.public_key)
if _pubkey_valid(pubkey):
self.public_key = pubkey
else:
return False
if self.comment:
if not _comment_valid():
return False
if perms_required:
if not _check_perms(module):
return False
return True
def dump(self):
# return result as a dict
"""Serialize the object into a dictionary."""
result = {
'changed': self.changed,
'size': self.size,
'type': self.type,
'filename': self.path,
# On removal this has no value
'fingerprint': self.fingerprint[1] if self.fingerprint else '',
'public_key': self.public_key,
'comment': self.comment if self.comment else '',
}
return result
def remove(self):
"""Remove the resource from the filesystem."""
try:
os.remove(self.path)
self.changed = True
except OSError as exc:
if exc.errno != errno.ENOENT:
raise KeypairError(exc)
else:
pass
if os.path.exists(self.path + ".pub"):
try:
os.remove(self.path + ".pub")
self.changed = True
except OSError as exc:
if exc.errno != errno.ENOENT:
raise KeypairError(exc)
else:
pass
def main(): def main():
# Define Ansible Module
module = AnsibleModule( module = AnsibleModule(
argument_spec=dict( argument_spec=dict(
state=dict(type='str', default='present', choices=['present', 'absent']), state=dict(type='str', default='present', choices=['present', 'absent']),
@ -571,13 +210,13 @@ def main():
choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always'] choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always']
), ),
passphrase=dict(type='str', no_log=True), passphrase=dict(type='str', no_log=True),
private_key_format=dict(type='str', default='auto', no_log=False, choices=['auto']) private_key_format=dict(type='str', default='auto', no_log=False, choices=['auto']),
backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'opensshbin'])
), ),
supports_check_mode=True, supports_check_mode=True,
add_file_common_args=True, add_file_common_args=True,
) )
# Check if Path exists
base_dir = os.path.dirname(module.params['path']) or '.' base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir): if not os.path.isdir(base_dir):
module.fail_json( module.fail_json(
@ -585,37 +224,25 @@ def main():
msg='The directory %s does not exist or the file is not a directory' % base_dir msg='The directory %s does not exist or the file is not a directory' % base_dir
) )
keypair = Keypair(module) keypair = select_backend(module, module.params['backend'])[1]
if keypair.state == 'present':
if module.params['state'] == 'present':
if module.check_mode: if module.check_mode:
changed = keypair.force or not keypair.isPrivateKeyValid(module) or not keypair.isPublicKeyValid(module) keypair.changed = any([
result = keypair.dump() keypair.force,
result['changed'] = changed not keypair.is_private_key_valid(),
module.exit_json(**result) not keypair.is_public_key_valid()
])
try: else:
keypair.generate(module) keypair.generate()
except Exception as exc:
module.fail_json(msg=to_native(exc))
else: else:
# When `state=absent` no details from an existing key at the given `path` are returned in the module result
if module.check_mode: if module.check_mode:
keypair.changed = os.path.exists(module.params['path']) keypair.changed = keypair.exists()
if keypair.changed: else:
keypair.fingerprint = {}
result = keypair.dump()
module.exit_json(**result)
try:
keypair.remove() keypair.remove()
except Exception as exc:
module.fail_json(msg=to_native(exc))
result = keypair.dump() module.exit_json(**keypair.result)
module.exit_json(**result)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,3 +1,4 @@
dependencies: dependencies:
- setup_ssh_keygen - setup_ssh_keygen
- setup_openssl - setup_openssl
- setup_bcrypt

View File

@ -0,0 +1,557 @@
---
####################################################################
# WARNING: These are designed specifically for Ansible tests #
# and should not be used as examples of how to write Ansible roles #
####################################################################
# Ensures no conflicts from previous test runs
- name: "({{ backend }}) Cleanup Output Directory"
ansible.builtin.file:
path: "{{ item }}"
state: absent
with_fileglob:
- "{{ output_dir }}/privatekey*"
- "{{ output_dir }}/regenerate*"
- name: "({{ backend }}) Generate privatekey1 - standard (check mode)"
openssh_keypair:
path: '{{ output_dir }}/privatekey1'
size: 2048
backend: "{{ backend }}"
register: privatekey1_result_check
check_mode: true
- name: "({{ backend }}) Generate privatekey1 - standard"
openssh_keypair:
path: '{{ output_dir }}/privatekey1'
size: 2048
backend: "{{ backend }}"
register: privatekey1_result
- name: "({{ backend }}) Generate privatekey1 - standard (check mode idempotent)"
openssh_keypair:
path: '{{ output_dir }}/privatekey1'
size: 2048
backend: "{{ backend }}"
register: privatekey1_idem_result_check
check_mode: true
- name: "({{ backend }}) Generate privatekey1 - standard (idempotent)"
openssh_keypair:
path: '{{ output_dir }}/privatekey1'
size: 2048
backend: "{{ backend }}"
register: privatekey1_idem_result
- name: "({{ backend }}) Generate privatekey2 - default size"
openssh_keypair:
path: '{{ output_dir }}/privatekey2'
backend: "{{ backend }}"
- name: "({{ backend }}) Generate privatekey3 - type dsa"
openssh_keypair:
path: '{{ output_dir }}/privatekey3'
type: dsa
backend: "{{ backend }}"
- name: "({{ backend }}) Generate privatekey4 - standard"
openssh_keypair:
path: '{{ output_dir }}/privatekey4'
size: 2048
backend: "{{ backend }}"
- name: "({{ backend }}) Delete privatekey4 - standard"
openssh_keypair:
state: absent
path: '{{ output_dir }}/privatekey4'
backend: "{{ backend }}"
- name: "({{ backend }}) Generate privatekey5 - standard"
openssh_keypair:
path: '{{ output_dir }}/privatekey5'
size: 2048
backend: "{{ backend }}"
register: publickey_gen
- name: "({{ backend }}) Generate privatekey6"
openssh_keypair:
path: '{{ output_dir }}/privatekey6'
type: rsa
size: 2048
backend: "{{ backend }}"
- name: "({{ backend }}) Regenerate privatekey6 via force"
openssh_keypair:
path: '{{ output_dir }}/privatekey6'
type: rsa
size: 2048
force: yes
backend: "{{ backend }}"
register: output_regenerated_via_force
- name: "({{ backend }}) Create broken key"
copy:
dest: '{{ item }}'
content: ''
mode: '0700'
loop:
- '{{ output_dir }}/privatekeybroken'
- '{{ output_dir }}/privatekeybroken.pub'
- name: "({{ backend }}) Regenerate broken key - should fail"
openssh_keypair:
path: '{{ output_dir }}/privatekeybroken'
type: rsa
size: 2048
backend: "{{ backend }}"
register: output_broken
ignore_errors: yes
- name: "({{ backend }}) Regenerate broken key with force"
openssh_keypair:
path: '{{ output_dir }}/privatekeybroken'
type: rsa
force: yes
size: 2048
backend: "{{ backend }}"
register: output_broken_force
- name: "({{ backend }}) Generate read-only private key"
openssh_keypair:
path: '{{ output_dir }}/privatekeyreadonly'
type: rsa
mode: '0200'
size: 2048
backend: "{{ backend }}"
- name: "({{ backend }}) Regenerate read-only private key via force"
openssh_keypair:
path: '{{ output_dir }}/privatekeyreadonly'
type: rsa
force: yes
size: 2048
backend: "{{ backend }}"
register: output_read_only
- name: "({{ backend }}) Generate privatekey7 - standard with comment"
openssh_keypair:
path: '{{ output_dir }}/privatekey7'
comment: 'test@privatekey7'
size: 2048
backend: "{{ backend }}"
register: privatekey7_result
- name: "({{ backend }}) Modify privatekey7 comment"
openssh_keypair:
path: '{{ output_dir }}/privatekey7'
comment: 'test_modified@privatekey7'
size: 2048
backend: "{{ backend }}"
register: privatekey7_modified_result
- name: "({{ backend }}) Generate password protected key"
command: 'ssh-keygen -f {{ output_dir }}/privatekey8 -N {{ passphrase }}'
- name: "({{ backend }}) Try to modify the password protected key - should fail"
openssh_keypair:
path: '{{ output_dir }}/privatekey8'
size: 2048
backend: "{{ backend }}"
register: privatekey8_result
ignore_errors: yes
- name: "({{ backend }}) Try to modify the password protected key with force=yes"
openssh_keypair:
path: '{{ output_dir }}/privatekey8'
force: yes
size: 2048
backend: "{{ backend }}"
register: privatekey8_result_force
- name: "({{ backend }}) Generate another password protected key"
command: 'ssh-keygen -f {{ output_dir }}/privatekey9 -N {{ passphrase }}'
- name: "({{ backend }}) Try to modify the password protected key with passphrase"
openssh_keypair:
path: '{{ output_dir }}/privatekey9'
size: 1024
passphrase: "{{ passphrase }}"
backend: "{{ backend }}"
register: privatekey9_modified_result
when: backend == 'cryptography'
- name: "({{ backend }}) Generate another unprotected key"
openssh_keypair:
path: '{{ output_dir }}/privatekey10'
size: 2048
backend: "{{ backend }}"
- name: "({{ backend }}) Try to Modify unprotected key with passphrase"
openssh_keypair:
path: '{{ output_dir }}/privatekey10'
size: 2048
passphrase: "{{ passphrase }}"
backend: "{{ backend }}"
ignore_errors: true
register: privatekey10_result
when: backend == 'cryptography'
- name: "({{ backend }}) Try to force modify the password protected key with force=true"
openssh_keypair:
path: '{{ output_dir }}/privatekey10'
size: 2048
passphrase: "{{ passphrase }}"
force: true
backend: "{{ backend }}"
register: privatekey10_result_force
when: backend == 'cryptography'
- name: "({{ backend }}) Ensure that ssh-keygen can read keys generated with passphrase"
command: 'ssh-keygen -yf {{ output_dir }}/privatekey10 -P {{ passphrase }}'
register: privatekey10_result_sshkeygen
when: backend == 'cryptography'
- name: "({{ backend }}) Generate PEM encoded key with passphrase"
command: 'ssh-keygen -f {{ output_dir }}/privatekey11 -N {{ passphrase }} -m PEM'
when: backend == 'cryptography'
- name: "({{ backend }}) Try to verify a PEM encoded key"
openssh_keypair:
path: '{{ output_dir }}/privatekey11'
size: 2048
passphrase: "{{ passphrase }}"
backend: "{{ backend }}"
register: privatekey11_result
when: backend == 'cryptography'
- import_tasks: ../tests/validate.yml
# Test regenerate option
- name: "({{ backend }}) Regenerate - setup simple keys"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
- name: "({{ backend }}) Regenerate - setup password protected keys"
command: 'ssh-keygen -f {{ output_dir }}/regenerate-b-{{ item }} -N {{ passphrase }}'
loop: "{{ regenerate_values }}"
- name: "({{ backend }}) Regenerate - setup broken keys"
copy:
dest: '{{ output_dir }}/regenerate-c-{{ item.0 }}{{ item.1 }}'
content: 'broken key'
mode: '0700'
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
-
- name: "({{ backend }}) Regenerate - setup password protected keys for passphrse test"
command: 'ssh-keygen -f {{ output_dir }}/regenerate-d-{{ item }} -N {{ passphrase }}'
loop: "{{ regenerate_values }}"
- name: "({{ backend }}) Regenerate - modify broken keys (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-c-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - modify broken keys"
openssh_keypair:
path: '{{ output_dir }}/regenerate-c-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - modify password protected keys (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - modify password protected keys with passphrase (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
passphrase: "{{ passphrase }}"
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
when: backend == 'cryptography'
- assert:
that:
- result.results[0] is success
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
when: backend == 'cryptography'
- name: "({{ backend }}) Regenerate - modify password protected keys"
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - modify password protected keys with passphrase"
openssh_keypair:
path: '{{ output_dir }}/regenerate-d-{{ item }}'
type: rsa
size: 1024
passphrase: "{{ passphrase }}"
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
when: backend == 'cryptography'
- assert:
that:
- result.results[0] is success
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
when: backend == 'cryptography'
- name: "({{ backend }}) Regenerate - not modify regular keys (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result.results[0] is not changed
- result.results[1] is not changed
- result.results[2] is not changed
- result.results[3] is not changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - not modify regular keys"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result.results[0] is not changed
- result.results[1] is not changed
- result.results[2] is not changed
- result.results[3] is not changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - adjust key size (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1048
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - adjust key size"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1048
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - redistribute keys"
copy:
src: '{{ output_dir }}/regenerate-a-always{{ item.1 }}'
dest: '{{ output_dir }}/regenerate-a-{{ item.0 }}{{ item.1 }}'
remote_src: true
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
when: "item.0 != 'always'"
- name: "({{ backend }}) Regenerate - adjust key type (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - adjust key type"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: "({{ backend }}) Regenerate - redistribute keys"
copy:
src: '{{ output_dir }}/regenerate-a-always{{ item.1 }}'
dest: '{{ output_dir }}/regenerate-a-{{ item.0 }}{{ item.1 }}'
remote_src: true
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
when: "item.0 != 'always'"
- name: "({{ backend }}) Regenerate - adjust comment (check mode)"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
comment: test comment
regenerate: '{{ item }}'
backend: "{{ backend }}"
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result is changed
- name: "({{ backend }}) Regenerate - adjust comment"
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
comment: test comment
regenerate: '{{ item }}'
backend: "{{ backend }}"
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result is changed
# for all values but 'always', the key should have not been regenerated.
# verify this by comparing fingerprints:
- result.results[0].fingerprint == result.results[1].fingerprint
- result.results[0].fingerprint == result.results[2].fingerprint
- result.results[0].fingerprint == result.results[3].fingerprint
- result.results[0].fingerprint != result.results[4].fingerprint

View File

@ -4,509 +4,20 @@
# and should not be used as examples of how to write Ansible roles # # and should not be used as examples of how to write Ansible roles #
#################################################################### ####################################################################
# Bumps up cryptography and bcrypt versions to be compatible with OpenSSH >= 7.8 - name: Backend auto-detection test
- import_tasks: ./setup_bcrypt.yml
- name: Generate privatekey1 - standard (check mode)
openssh_keypair: openssh_keypair:
path: '{{ output_dir }}/privatekey1' path: '{{ output_dir }}/auto_backend_key'
size: 2048 state: "{{ item }}"
register: privatekey1_result_check loop: ['present', 'absent']
check_mode: true
- name: Generate privatekey1 - standard - set_fact:
openssh_keypair: backends: ['opensshbin']
path: '{{ output_dir }}/privatekey1'
size: 2048
register: privatekey1_result
- name: Generate privatekey1 - standard (check mode idempotent) - set_fact:
openssh_keypair: backends: "{{ backends + ['cryptography'] }}"
path: '{{ output_dir }}/privatekey1'
size: 2048
register: privatekey1_idem_result_check
check_mode: true
- name: Generate privatekey1 - standard (idempotent)
openssh_keypair:
path: '{{ output_dir }}/privatekey1'
size: 2048
register: privatekey1_idem_result
- name: Generate privatekey2 - default size
openssh_keypair:
path: '{{ output_dir }}/privatekey2'
- name: Generate privatekey3 - type dsa
openssh_keypair:
path: '{{ output_dir }}/privatekey3'
type: dsa
- name: Generate privatekey4 - standard
openssh_keypair:
path: '{{ output_dir }}/privatekey4'
size: 2048
- name: Delete privatekey4 - standard
openssh_keypair:
state: absent
path: '{{ output_dir }}/privatekey4'
- name: Generate privatekey5 - standard
openssh_keypair:
path: '{{ output_dir }}/privatekey5'
size: 2048
register: publickey_gen
- name: Generate privatekey6
openssh_keypair:
path: '{{ output_dir }}/privatekey6'
type: rsa
size: 2048
- name: Regenerate privatekey6 via force
openssh_keypair:
path: '{{ output_dir }}/privatekey6'
type: rsa
size: 2048
force: yes
register: output_regenerated_via_force
- name: Create broken key
copy:
dest: '{{ item }}'
content: ''
mode: '0700'
loop:
- '{{ output_dir }}/privatekeybroken'
- '{{ output_dir }}/privatekeybroken.pub'
- name: Regenerate broken key - should fail
openssh_keypair:
path: '{{ output_dir }}/privatekeybroken'
type: rsa
size: 2048
register: output_broken
ignore_errors: yes
- name: Regenerate broken key with force
openssh_keypair:
path: '{{ output_dir }}/privatekeybroken'
type: rsa
force: yes
size: 2048
register: output_broken_force
- name: Generate read-only private key
openssh_keypair:
path: '{{ output_dir }}/privatekeyreadonly'
type: rsa
mode: '0200'
size: 2048
- name: Regenerate read-only private key via force
openssh_keypair:
path: '{{ output_dir }}/privatekeyreadonly'
type: rsa
force: yes
size: 2048
register: output_read_only
- name: Generate privatekey7 - standard with comment
openssh_keypair:
path: '{{ output_dir }}/privatekey7'
comment: 'test@privatekey7'
size: 2048
register: privatekey7_result
- name: Modify privatekey7 comment
openssh_keypair:
path: '{{ output_dir }}/privatekey7'
comment: 'test_modified@privatekey7'
size: 2048
register: privatekey7_modified_result
- name: Generate password protected key
command: 'ssh-keygen -f {{ output_dir }}/privatekey8 -N {{ passphrase }}'
- name: Try to modify the password protected key - should fail
openssh_keypair:
path: '{{ output_dir }}/privatekey8'
size: 2048
register: privatekey8_result
ignore_errors: yes
- name: Try to modify the password protected key with force=yes
openssh_keypair:
path: '{{ output_dir }}/privatekey8'
force: yes
size: 2048
register: privatekey8_result_force
- name: Generate another password protected key
command: 'ssh-keygen -f {{ output_dir }}/privatekey9 -N {{ passphrase }}'
- name: Try to modify the password protected key with passphrase
openssh_keypair:
path: '{{ output_dir }}/privatekey9'
size: 1024
passphrase: "{{ passphrase }}"
register: privatekey9_modified_result
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=') when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Generate another unprotected key - include_tasks: ./impl.yml
openssh_keypair: loop: "{{ backends }}"
path: '{{ output_dir }}/privatekey10' loop_control:
size: 2048 loop_var: backend
- name: Try to Modify unprotected key with passphrase
openssh_keypair:
path: '{{ output_dir }}/privatekey10'
size: 2048
passphrase: "{{ passphrase }}"
ignore_errors: true
register: privatekey10_result
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Try to force modify the password protected key with force=true
openssh_keypair:
path: '{{ output_dir }}/privatekey10'
size: 2048
passphrase: "{{ passphrase }}"
force: true
register: privatekey10_result_force
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Ensure that ssh-keygen can read keys generated with passphrase
command: 'ssh-keygen -yf {{ output_dir }}/privatekey10 -P {{ passphrase }}'
register: privatekey10_result_sshkeygen
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Generate PEM encoded key with passphrase
command: 'ssh-keygen -f {{ output_dir }}/privatekey11 -N {{ passphrase }} -m PEM'
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Try to verify a PEM encoded key
openssh_keypair:
path: '{{ output_dir }}/privatekey11'
size: 2048
passphrase: "{{ passphrase }}"
register: privatekey11_result
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- import_tasks: ../tests/validate.yml
# Test regenerate option
- name: Regenerate - setup simple keys
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
loop: "{{ regenerate_values }}"
- name: Regenerate - setup password protected keys
command: 'ssh-keygen -f {{ output_dir }}/regenerate-b-{{ item }} -N {{ passphrase }}'
loop: "{{ regenerate_values }}"
- name: Regenerate - setup broken keys
copy:
dest: '{{ output_dir }}/regenerate-c-{{ item.0 }}{{ item.1 }}'
content: 'broken key'
mode: '0700'
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
-
- name: Regenerate - setup password protected keys for passphrse test
command: 'ssh-keygen -f {{ output_dir }}/regenerate-d-{{ item }} -N {{ passphrase }}'
loop: "{{ regenerate_values }}"
- name: Regenerate - modify broken keys (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-c-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - modify broken keys
openssh_keypair:
path: '{{ output_dir }}/regenerate-c-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - modify password protected keys (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - modify password protected keys with passphrase (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
passphrase: "{{ passphrase }}"
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- assert:
that:
- result.results[0] is success
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Regenerate - modify password protected keys
openssh_keypair:
path: '{{ output_dir }}/regenerate-b-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[0].msg"
- result.results[1] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[1].msg"
- result.results[2] is failed
- "'Unable to read the key. The key is protected with a passphrase or broken. Will not proceed.' in result.results[2].msg"
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - modify password protected keys with passphrase
openssh_keypair:
path: '{{ output_dir }}/regenerate-d-{{ item }}'
type: rsa
size: 1024
passphrase: "{{ passphrase }}"
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- assert:
that:
- result.results[0] is success
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=')
- name: Regenerate - not modify regular keys (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result.results[0] is not changed
- result.results[1] is not changed
- result.results[2] is not changed
- result.results[3] is not changed
- result.results[4] is changed
- name: Regenerate - not modify regular keys
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1024
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result.results[0] is not changed
- result.results[1] is not changed
- result.results[2] is not changed
- result.results[3] is not changed
- result.results[4] is changed
- name: Regenerate - adjust key size (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1048
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - adjust key size
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: rsa
size: 1048
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - redistribute keys
copy:
src: '{{ output_dir }}/regenerate-a-always{{ item.1 }}'
dest: '{{ output_dir }}/regenerate-a-{{ item.0 }}{{ item.1 }}'
remote_src: true
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
when: "item.0 != 'always'"
- name: Regenerate - adjust key type (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - adjust key type
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result.results[0] is success and result.results[0] is not changed
- result.results[1] is failed
- "'Key has wrong type and/or size. Will not proceed.' in result.results[1].msg"
- result.results[2] is changed
- result.results[3] is changed
- result.results[4] is changed
- name: Regenerate - redistribute keys
copy:
src: '{{ output_dir }}/regenerate-a-always{{ item.1 }}'
dest: '{{ output_dir }}/regenerate-a-{{ item.0 }}{{ item.1 }}'
remote_src: true
with_nested:
- "{{ regenerate_values }}"
- [ '', '.pub' ]
when: "item.0 != 'always'"
- name: Regenerate - adjust comment (check mode)
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
comment: test comment
regenerate: '{{ item }}'
check_mode: yes
loop: "{{ regenerate_values }}"
ignore_errors: yes
register: result
- assert:
that:
- result is changed
- name: Regenerate - adjust comment
openssh_keypair:
path: '{{ output_dir }}/regenerate-a-{{ item }}'
type: dsa
size: 1024
comment: test comment
regenerate: '{{ item }}'
loop: "{{ regenerate_values }}"
register: result
- assert:
that:
- result is changed
# for all values but 'always', the key should have not been regenerated.
# verify this by comparing fingerprints:
- result.results[0].fingerprint == result.results[1].fingerprint
- result.results[0].fingerprint == result.results[2].fingerprint
- result.results[0].fingerprint == result.results[3].fingerprint
- result.results[0].fingerprint != result.results[4].fingerprint

View File

@ -1,9 +1,9 @@
--- ---
- name: Log privatekey1 return values - name: "({{ backend }}) Log privatekey1 return values"
debug: debug:
var: privatekey1_result var: privatekey1_result
- name: Validate general behavior - name: "({{ backend }}) Validate general behavior"
assert: assert:
that: that:
- privatekey1_result_check is changed - privatekey1_result_check is changed
@ -12,7 +12,7 @@
- privatekey1_idem_result_check.public_key.startswith("ssh-rsa") - privatekey1_idem_result_check.public_key.startswith("ssh-rsa")
- privatekey1_idem_result is not changed - privatekey1_idem_result is not changed
- name: Validate privatekey1 return fingerprint - name: "({{ backend }}) Validate privatekey1 return fingerprint"
assert: assert:
that: that:
- privatekey1_result["fingerprint"] is string - privatekey1_result["fingerprint"] is string
@ -20,150 +20,150 @@
# only distro old enough that it still gives md5 with no prefix # only distro old enough that it still gives md5 with no prefix
when: ansible_distribution != 'CentOS' and ansible_distribution_major_version != '6' when: ansible_distribution != 'CentOS' and ansible_distribution_major_version != '6'
- name: Validate privatekey1 return public_key - name: "({{ backend }}) Validate privatekey1 return public_key"
assert: assert:
that: that:
- privatekey1_result["public_key"] is string - privatekey1_result["public_key"] is string
- privatekey1_result["public_key"].startswith("ssh-rsa ") - privatekey1_result["public_key"].startswith("ssh-rsa ")
- name: Validate privatekey1 return size value - name: "({{ backend }}) Validate privatekey1 return size value"
assert: assert:
that: that:
- privatekey1_result["size"]|type_debug == 'int' - privatekey1_result["size"]|type_debug == 'int'
- privatekey1_result["size"] == 2048 - privatekey1_result["size"] == 2048
- name: Validate privatekey1 return key type - name: "({{ backend }}) Validate privatekey1 return key type"
assert: assert:
that: that:
- privatekey1_result["type"] is string - privatekey1_result["type"] is string
- privatekey1_result["type"] == "rsa" - privatekey1_result["type"] == "rsa"
- name: Validate privatekey1 (test - RSA key with size 2048 bits) - name: "({{ backend }}) Validate privatekey1 (test - RSA key with size 2048 bits)"
shell: "ssh-keygen -lf {{ output_dir }}/privatekey1 | grep -o -E '^[0-9]+'" shell: "ssh-keygen -lf {{ output_dir }}/privatekey1 | grep -o -E '^[0-9]+'"
register: privatekey1 register: privatekey1
- name: Validate privatekey1 (assert - RSA key with size 2048 bits) - name: "({{ backend }}) Validate privatekey1 (assert - RSA key with size 2048 bits)"
assert: assert:
that: that:
- privatekey1.stdout == '2048' - privatekey1.stdout == '2048'
- name: Validate privatekey1 idempotence - name: "({{ backend }}) Validate privatekey1 idempotence"
assert: assert:
that: that:
- privatekey1_idem_result is not changed - privatekey1_idem_result is not changed
- name: Validate privatekey2 (test - RSA key with default size 4096 bits) - name: "({{ backend }}) Validate privatekey2 (test - RSA key with default size 4096 bits)"
shell: "ssh-keygen -lf {{ output_dir }}/privatekey2 | grep -o -E '^[0-9]+'" shell: "ssh-keygen -lf {{ output_dir }}/privatekey2 | grep -o -E '^[0-9]+'"
register: privatekey2 register: privatekey2
- name: Validate privatekey2 (assert - RSA key with size 4096 bits) - name: "({{ backend }}) Validate privatekey2 (assert - RSA key with size 4096 bits)"
assert: assert:
that: that:
- privatekey2.stdout == '4096' - privatekey2.stdout == '4096'
- name: Validate privatekey3 (test - DSA key with size 1024 bits) - name: "({{ backend }}) Validate privatekey3 (test - DSA key with size 1024 bits)"
shell: "ssh-keygen -lf {{ output_dir }}/privatekey3 | grep -o -E '^[0-9]+'" shell: "ssh-keygen -lf {{ output_dir }}/privatekey3 | grep -o -E '^[0-9]+'"
register: privatekey3 register: privatekey3
- name: Validate privatekey3 (assert - DSA key with size 4096 bits) - name: "({{ backend }}) Validate privatekey3 (assert - DSA key with size 4096 bits)"
assert: assert:
that: that:
- privatekey3.stdout == '1024' - privatekey3.stdout == '1024'
- name: Validate privatekey4 (test - Ensure key has been removed) - name: "({{ backend }}) Validate privatekey4 (test - Ensure key has been removed)"
stat: stat:
path: '{{ output_dir }}/privatekey4' path: '{{ output_dir }}/privatekey4'
register: privatekey4 register: privatekey4
- name: Validate privatekey4 (assert - Ensure key has been removed) - name: "({{ backend }}) Validate privatekey4 (assert - Ensure key has been removed)"
assert: assert:
that: that:
- privatekey4.stat.exists == False - privatekey4.stat.exists == False
- name: Validate privatekey5 (assert - Public key module output equal to the public key on host) - name: "({{ backend }}) Validate privatekey5 (assert - Public key module output equal to the public key on host)"
assert: assert:
that: that:
- "publickey_gen.public_key == lookup('file', output_dir ~ '/privatekey5.pub').strip('\n')" - "publickey_gen.public_key == lookup('file', output_dir ~ '/privatekey5.pub').strip('\n')"
- name: Verify that privatekey6 will be regenerated via force - name: "({{ backend }}) Verify that privatekey6 will be regenerated via force"
assert: assert:
that: that:
- output_regenerated_via_force is changed - output_regenerated_via_force is changed
- name: Verify that broken key will cause failure - name: "({{ backend }}) Verify that broken key will cause failure"
assert: assert:
that: that:
- output_broken is failed - output_broken is failed
- "'Unable to read the key. The key is protected with a passphrase or broken.' in output_broken.msg" - "'Unable to read the key. The key is protected with a passphrase or broken.' in output_broken.msg"
- name: Verify that broken key will be regenerated if force=yes is specified - name: "({{ backend }}) Verify that broken key will be regenerated if force=yes is specified"
assert: assert:
that: that:
- output_broken_force is changed - output_broken_force is changed
- name: Verify that read-only key will be regenerated - name: "({{ backend }}) Verify that read-only key will be regenerated"
assert: assert:
that: that:
- output_read_only is changed - output_read_only is changed
- name: Validate privatekey7 (assert - Public key remains the same after comment change) - name: "({{ backend }}) Validate privatekey7 (assert - Public key remains the same after comment change)"
assert: assert:
that: that:
- privatekey7_result.public_key == privatekey7_modified_result.public_key - privatekey7_result.public_key == privatekey7_modified_result.public_key
- name: Validate privatekey7 comment on creation - name: "({{ backend }}) Validate privatekey7 comment on creation"
assert: assert:
that: that:
- privatekey7_result.comment == 'test@privatekey7' - privatekey7_result.comment == 'test@privatekey7'
- name: Validate privatekey7 comment update - name: "({{ backend }}) Validate privatekey7 comment update"
assert: assert:
that: that:
- privatekey7_modified_result.comment == 'test_modified@privatekey7' - privatekey7_modified_result.comment == 'test_modified@privatekey7'
- name: Check that password protected key made module fail - name: "({{ backend }}) Check that password protected key made module fail"
assert: assert:
that: that:
- privatekey8_result is failed - privatekey8_result is failed
- "'Unable to read the key. The key is protected with a passphrase or broken.' in privatekey8_result.msg" - "'Unable to read the key. The key is protected with a passphrase or broken.' in privatekey8_result.msg"
- name: Check that password protected key was regenerated with force=yes - name: "({{ backend }}) Check that password protected key was regenerated with force=yes"
assert: assert:
that: that:
- privatekey8_result_force is changed - privatekey8_result_force is changed
- block: - block:
- name: Check that password protected key with passphrase was regenerated - name: "({{ backend }}) Check that password protected key with passphrase was regenerated"
assert: assert:
that: that:
- privatekey9_modified_result is changed - privatekey9_modified_result is changed
- name: Check that modifying unprotected key with passphrase fails - name: "({{ backend }}) Check that modifying unprotected key with passphrase fails"
assert: assert:
that: that:
- privatekey10_result is failed - privatekey10_result is failed
- "'Unable to read the key. The key is protected with a passphrase or broken.' in privatekey8_result.msg" - "'Unable to read the key. The key is protected with a passphrase or broken.' in privatekey8_result.msg"
- name: Check that unprotected key was regenerated with force=yes and passphrase supplied - name: "({{ backend }}) Check that unprotected key was regenerated with force=yes and passphrase supplied"
assert: assert:
that: that:
- privatekey10_result_force is changed - privatekey10_result_force is changed
- name: Check that ssh-keygen output from passphrase protected key matches openssh_keypair - name: "({{ backend }}) Check that ssh-keygen output from passphrase protected key matches openssh_keypair"
assert: assert:
that: that:
- privatekey10_result_force.public_key == privatekey10_result_sshkeygen.stdout - privatekey10_result_force.public_key == privatekey10_result_sshkeygen.stdout
- name: Check that PEM encoded private keys are loaded successfully - name: "({{ backend }}) Check that PEM encoded private keys are loaded successfully"
assert: assert:
that: that:
- privatekey11_result is success - privatekey11_result is success
when: cryptography_version.stdout is version('3.0', '>=') and bcrypt_version.stdout is version('3.1.5', '>=') when: backend == 'cryptography'

View File

@ -0,0 +1,3 @@
dependencies:
- setup_remote_constraints
- setup_pkg_mgr

View File

@ -21,4 +21,4 @@
set_fact: set_fact:
bcrypt_version: bcrypt_version:
stdout: 0.0 stdout: 0.0
when: bcrypt_version is not defined when: bcrypt_version is not defined

View File

@ -13,8 +13,8 @@ from os import remove, rmdir
from socket import gethostname from socket import gethostname
from tempfile import mkdtemp from tempfile import mkdtemp
from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography_openssh import ( from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import (
Asymmetric_Keypair, AsymmetricKeypair,
HAS_OPENSSH_SUPPORT, HAS_OPENSSH_SUPPORT,
InvalidCommentError, InvalidCommentError,
InvalidPrivateKeyFileError, InvalidPrivateKeyFileError,
@ -22,7 +22,7 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptogra
InvalidKeySizeError, InvalidKeySizeError,
InvalidKeyTypeError, InvalidKeyTypeError,
InvalidPassphraseError, InvalidPassphraseError,
OpenSSH_Keypair OpensshKeypair
) )
DEFAULT_KEY_PARAMS = [ DEFAULT_KEY_PARAMS = [
@ -147,9 +147,9 @@ def test_default_key_params(keytype, size, passphrase, comment):
} }
default_comment = "%s@%s" % (getuser(), gethostname()) default_comment = "%s@%s" % (getuser(), gethostname())
pair = OpenSSH_Keypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment) pair = OpensshKeypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment)
try: try:
pair = OpenSSH_Keypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment) pair = OpensshKeypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment)
if pair.size != default_sizes[pair.key_type] or pair.comment != default_comment: if pair.size != default_sizes[pair.key_type] or pair.comment != default_comment:
result = False result = False
except Exception as e: except Exception as e:
@ -165,7 +165,7 @@ def test_valid_user_key_params(keytype, size, passphrase, comment):
result = True result = True
try: try:
pair = OpenSSH_Keypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment) pair = OpensshKeypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment)
if pair.key_type != keytype or pair.size != size or pair.comment != comment: if pair.key_type != keytype or pair.size != size or pair.comment != comment:
result = False result = False
except Exception as e: except Exception as e:
@ -181,7 +181,7 @@ def test_invalid_user_key_params(keytype, size, passphrase, comment):
result = False result = False
try: try:
OpenSSH_Keypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment) OpensshKeypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment)
except (InvalidCommentError, InvalidKeyTypeError, InvalidPassphraseError): except (InvalidCommentError, InvalidKeyTypeError, InvalidPassphraseError):
result = True result = True
except Exception as e: except Exception as e:
@ -197,7 +197,7 @@ def test_invalid_key_sizes(keytype, size, passphrase, comment):
result = False result = False
try: try:
OpenSSH_Keypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment) OpensshKeypair.generate(keytype=keytype, size=size, passphrase=passphrase, comment=comment)
except InvalidKeySizeError: except InvalidKeySizeError:
result = True result = True
except Exception as e: except Exception as e:
@ -210,7 +210,7 @@ def test_invalid_key_sizes(keytype, size, passphrase, comment):
@pytest.mark.skipif(not HAS_OPENSSH_SUPPORT, reason="requires cryptography") @pytest.mark.skipif(not HAS_OPENSSH_SUPPORT, reason="requires cryptography")
def test_valid_comment_update(): def test_valid_comment_update():
pair = OpenSSH_Keypair.generate() pair = OpensshKeypair.generate()
new_comment = "comment" new_comment = "comment"
try: try:
pair.comment = new_comment pair.comment = new_comment
@ -225,7 +225,7 @@ def test_valid_comment_update():
def test_invalid_comment_update(): def test_invalid_comment_update():
result = False result = False
pair = OpenSSH_Keypair.generate() pair = OpensshKeypair.generate()
new_comment = [1, 2, 3] new_comment = [1, 2, 3]
try: try:
pair.comment = new_comment pair.comment = new_comment
@ -245,7 +245,7 @@ def test_valid_passphrase_update():
tmpdir = mkdtemp() tmpdir = mkdtemp()
keyfilename = os.path.join(tmpdir, "id_rsa") keyfilename = os.path.join(tmpdir, "id_rsa")
pair1 = OpenSSH_Keypair.generate() pair1 = OpensshKeypair.generate()
pair1.update_passphrase(passphrase) pair1.update_passphrase(passphrase)
with open(keyfilename, "w+b") as keyfile: with open(keyfilename, "w+b") as keyfile:
@ -254,7 +254,7 @@ def test_valid_passphrase_update():
with open(keyfilename + '.pub', "w+b") as pubkeyfile: with open(keyfilename + '.pub', "w+b") as pubkeyfile:
pubkeyfile.write(pair1.public_key) pubkeyfile.write(pair1.public_key)
pair2 = OpenSSH_Keypair.load(path=keyfilename, passphrase=passphrase) pair2 = OpensshKeypair.load(path=keyfilename, passphrase=passphrase)
if pair1 == pair2: if pair1 == pair2:
result = True result = True
@ -274,7 +274,7 @@ def test_invalid_passphrase_update():
result = False result = False
passphrase = [1, 2, 3] passphrase = [1, 2, 3]
pair = OpenSSH_Keypair.generate() pair = OpensshKeypair.generate()
try: try:
pair.update_passphrase(passphrase) pair.update_passphrase(passphrase)
except InvalidPassphraseError: except InvalidPassphraseError:
@ -291,7 +291,7 @@ def test_invalid_privatekey():
tmpdir = mkdtemp() tmpdir = mkdtemp()
keyfilename = os.path.join(tmpdir, "id_rsa") keyfilename = os.path.join(tmpdir, "id_rsa")
pair = OpenSSH_Keypair.generate() pair = OpensshKeypair.generate()
with open(keyfilename, "w+b") as keyfile: with open(keyfilename, "w+b") as keyfile:
keyfile.write(pair.private_key[1:]) keyfile.write(pair.private_key[1:])
@ -299,7 +299,7 @@ def test_invalid_privatekey():
with open(keyfilename + '.pub', "w+b") as pubkeyfile: with open(keyfilename + '.pub', "w+b") as pubkeyfile:
pubkeyfile.write(pair.public_key) pubkeyfile.write(pair.public_key)
OpenSSH_Keypair.load(path=keyfilename) OpensshKeypair.load(path=keyfilename)
except InvalidPrivateKeyFileError: except InvalidPrivateKeyFileError:
result = True result = True
finally: finally:
@ -321,8 +321,8 @@ def test_mismatched_keypair():
tmpdir = mkdtemp() tmpdir = mkdtemp()
keyfilename = os.path.join(tmpdir, "id_rsa") keyfilename = os.path.join(tmpdir, "id_rsa")
pair1 = OpenSSH_Keypair.generate() pair1 = OpensshKeypair.generate()
pair2 = OpenSSH_Keypair.generate() pair2 = OpensshKeypair.generate()
with open(keyfilename, "w+b") as keyfile: with open(keyfilename, "w+b") as keyfile:
keyfile.write(pair1.private_key) keyfile.write(pair1.private_key)
@ -330,7 +330,7 @@ def test_mismatched_keypair():
with open(keyfilename + '.pub', "w+b") as pubkeyfile: with open(keyfilename + '.pub', "w+b") as pubkeyfile:
pubkeyfile.write(pair2.public_key) pubkeyfile.write(pair2.public_key)
OpenSSH_Keypair.load(path=keyfilename) OpensshKeypair.load(path=keyfilename)
except InvalidPublicKeyFileError: except InvalidPublicKeyFileError:
result = True result = True
finally: finally:
@ -346,24 +346,24 @@ def test_mismatched_keypair():
@pytest.mark.skipif(not HAS_OPENSSH_SUPPORT, reason="requires cryptography") @pytest.mark.skipif(not HAS_OPENSSH_SUPPORT, reason="requires cryptography")
def test_keypair_comparison(): def test_keypair_comparison():
assert OpenSSH_Keypair.generate() != OpenSSH_Keypair.generate() assert OpensshKeypair.generate() != OpensshKeypair.generate()
assert OpenSSH_Keypair.generate() != OpenSSH_Keypair.generate(keytype='dsa') assert OpensshKeypair.generate() != OpensshKeypair.generate(keytype='dsa')
assert OpenSSH_Keypair.generate() != OpenSSH_Keypair.generate(keytype='ed25519') assert OpensshKeypair.generate() != OpensshKeypair.generate(keytype='ed25519')
assert OpenSSH_Keypair.generate(keytype='ed25519') != OpenSSH_Keypair.generate(keytype='ed25519') assert OpensshKeypair.generate(keytype='ed25519') != OpensshKeypair.generate(keytype='ed25519')
try: try:
tmpdir = mkdtemp() tmpdir = mkdtemp()
keys = { keys = {
'rsa': { 'rsa': {
'pair': OpenSSH_Keypair.generate(), 'pair': OpensshKeypair.generate(),
'filename': os.path.join(tmpdir, "id_rsa"), 'filename': os.path.join(tmpdir, "id_rsa"),
}, },
'dsa': { 'dsa': {
'pair': OpenSSH_Keypair.generate(keytype='dsa', passphrase='change_me'.encode('UTF-8')), 'pair': OpensshKeypair.generate(keytype='dsa', passphrase='change_me'.encode('UTF-8')),
'filename': os.path.join(tmpdir, "id_dsa"), 'filename': os.path.join(tmpdir, "id_dsa"),
}, },
'ed25519': { 'ed25519': {
'pair': OpenSSH_Keypair.generate(keytype='ed25519'), 'pair': OpensshKeypair.generate(keytype='ed25519'),
'filename': os.path.join(tmpdir, "id_ed25519"), 'filename': os.path.join(tmpdir, "id_ed25519"),
} }
} }
@ -374,9 +374,9 @@ def test_keypair_comparison():
with open(v['filename'] + '.pub', "w+b") as pubkeyfile: with open(v['filename'] + '.pub', "w+b") as pubkeyfile:
pubkeyfile.write(v['pair'].public_key) pubkeyfile.write(v['pair'].public_key)
assert keys['rsa']['pair'] == OpenSSH_Keypair.load(path=keys['rsa']['filename']) assert keys['rsa']['pair'] == OpensshKeypair.load(path=keys['rsa']['filename'])
loaded_dsa_key = OpenSSH_Keypair.load(path=keys['dsa']['filename'], passphrase='change_me'.encode('UTF-8')) loaded_dsa_key = OpensshKeypair.load(path=keys['dsa']['filename'], passphrase='change_me'.encode('UTF-8'))
assert keys['dsa']['pair'] == loaded_dsa_key assert keys['dsa']['pair'] == loaded_dsa_key
loaded_dsa_key.update_passphrase('change_me_again'.encode('UTF-8')) loaded_dsa_key.update_passphrase('change_me_again'.encode('UTF-8'))
@ -388,7 +388,7 @@ def test_keypair_comparison():
loaded_dsa_key.comment = "comment" loaded_dsa_key.comment = "comment"
assert keys['dsa']['pair'] != loaded_dsa_key assert keys['dsa']['pair'] != loaded_dsa_key
assert keys['ed25519']['pair'] == OpenSSH_Keypair.load(path=keys['ed25519']['filename']) assert keys['ed25519']['pair'] == OpensshKeypair.load(path=keys['ed25519']['filename'])
finally: finally:
for v in keys.values(): for v in keys.values():
if os.path.exists(v['filename']): if os.path.exists(v['filename']):
@ -397,4 +397,4 @@ def test_keypair_comparison():
remove(v['filename'] + '.pub') remove(v['filename'] + '.pub')
if os.path.exists(tmpdir): if os.path.exists(tmpdir):
rmdir(tmpdir) rmdir(tmpdir)
assert OpenSSH_Keypair.generate() != [] assert OpensshKeypair.generate() != []