From 08ada24a539114c21a13e16614a590f6ee06508f Mon Sep 17 00:00:00 2001 From: Ajpantuso Date: Wed, 18 Aug 2021 03:22:31 -0400 Subject: [PATCH] openssh_keypair - Add diff support and general cleanup (#260) * Initial commit * Matching tests to overwritten permissions behavior with cryptography * Ensuring key validation only occurs when state=present and accomodating CentOS6 restrictions * Making ssh-keygen behavior explicit by version in tests * Ensuring cyrptography not excluded in new conditions * Adding changelog fragment * Fixing sanity checks * Improving readability * Applying review suggestions * addressing restore_on_failure conflict --- .../260-openssh_keypair-diff-support.yml | 8 + .../module_utils/openssh/backends/common.py | 283 ++++++++ .../openssh/backends/keypair_backend.py | 609 +++++++++--------- plugins/module_utils/openssh/utils.py | 26 + plugins/modules/openssh_cert.py | 361 +++++------ plugins/modules/openssh_keypair.py | 27 +- .../targets/openssh_keypair/tests/core.yml | 4 +- .../targets/openssh_keypair/tests/invalid.yml | 7 - .../targets/openssh_keypair/tests/options.yml | 11 +- .../targets/setup_ssh_keygen/tasks/main.yml | 9 + 10 files changed, 795 insertions(+), 550 deletions(-) create mode 100644 changelogs/fragments/260-openssh_keypair-diff-support.yml diff --git a/changelogs/fragments/260-openssh_keypair-diff-support.yml b/changelogs/fragments/260-openssh_keypair-diff-support.yml new file mode 100644 index 00000000..3ba97ade --- /dev/null +++ b/changelogs/fragments/260-openssh_keypair-diff-support.yml @@ -0,0 +1,8 @@ +--- +minor_changes: + - openssh_keypair - added ``diff`` support (https://github.com/ansible-collections/community.crypto/pull/260). +bugfixes: + - openssh_keypair - fixed error handling to restore original keypair if regeneration fails + (https://github.com/ansible-collections/community.crypto/pull/260). + - openssh_keypair - fixed ``cryptography`` backend to preserve original file permissions when regenerating a keypair + requires existing files to be overwritten (https://github.com/ansible-collections/community.crypto/pull/260). diff --git a/plugins/module_utils/openssh/backends/common.py b/plugins/module_utils/openssh/backends/common.py index 7dc952a0..bd28c534 100644 --- a/plugins/module_utils/openssh/backends/common.py +++ b/plugins/module_utils/openssh/backends/common.py @@ -18,7 +18,15 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import abc import os +import stat + +from ansible.module_utils import six + +from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( + parse_openssh_version, +) def restore_on_failure(f): @@ -40,3 +48,278 @@ def restore_on_failure(f): @restore_on_failure def safe_atomic_move(module, path, destination): module.atomic_move(path, destination) + + +def _restore_all_on_failure(f): + def backup_and_restore(self, sources_and_destinations, *args, **kwargs): + backups = [(d, self.module.backup_local(d)) for s, d in sources_and_destinations if os.path.exists(d)] + + try: + f(self, sources_and_destinations, *args, **kwargs) + except Exception: + for destination, backup in backups: + self.module.atomic_move(backup, destination) + raise + else: + for destination, backup in backups: + self.module.add_cleanup_file(backup) + return backup_and_restore + + +@six.add_metaclass(abc.ABCMeta) +class OpensshModule(object): + def __init__(self, module): + self.module = module + + self.changed = False + self.check_mode = self.module.check_mode + + def execute(self): + self._execute() + self.module.exit_json(**self.result) + + @abc.abstractmethod + def _execute(self): + pass + + @property + def result(self): + result = self._result + + result['changed'] = self.changed + + if self.module._diff: + result['diff'] = self.diff + + return result + + @property + @abc.abstractmethod + def _result(self): + pass + + @property + @abc.abstractmethod + def diff(self): + pass + + @staticmethod + def skip_if_check_mode(f): + def wrapper(self, *args, **kwargs): + if not self.check_mode: + f(self, *args, **kwargs) + return wrapper + + @staticmethod + def trigger_change(f): + def wrapper(self, *args, **kwargs): + f(self, *args, **kwargs) + self.changed = True + return wrapper + + def _check_if_base_dir(self, path): + base_dir = os.path.dirname(path) or '.' + if not os.path.isdir(base_dir): + self.module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) + + def _get_ssh_version(self): + ssh_bin = self.module.get_bin_path('ssh') + if not ssh_bin: + return "" + return parse_openssh_version(self.module.run_command([ssh_bin, '-V', '-q'])[2].strip()) + + @_restore_all_on_failure + def _safe_secure_move(self, sources_and_destinations): + """Moves a list of files from 'source' to 'destination' and restores 'destination' from backup upon failure. + If 'destination' does not already exist, then 'source' permissions are preserved to prevent + exposing protected data ('atomic_move' uses the 'destination' base directory mask for + permissions if 'destination' does not already exists). + """ + for source, destination in sources_and_destinations: + if os.path.exists(destination): + self.module.atomic_move(source, destination) + else: + self.module.preserved_copy(source, destination) + + def _update_permissions(self, path): + file_args = self.module.load_file_common_arguments(self.module.params) + file_args['path'] = path + + if not self.module.check_file_absent_if_check_mode(path): + self.changed = self.module.set_fs_attributes_if_different(file_args, self.changed) + else: + self.changed = True + + +class KeygenCommand(object): + def __init__(self, module): + self._bin_path = module.get_bin_path('ssh-keygen', True) + self._run_command = module.run_command + + def generate_certificate(self, certificate_path, identifier, options, pkcs11_provider, principals, + serial_number, signing_key_path, type, time_parameters, use_agent, **kwargs): + args = [self._bin_path, '-s', signing_key_path, '-P', '', '-I', identifier] + + if options: + for option in options: + args.extend(['-O', option]) + if pkcs11_provider: + args.extend(['-D', pkcs11_provider]) + if principals: + args.extend(['-n', ','.join(principals)]) + if serial_number is not None: + args.extend(['-z', str(serial_number)]) + if type == 'host': + args.extend(['-h']) + if use_agent: + args.extend(['-U']) + if time_parameters.validity_string: + args.extend(['-V', time_parameters.validity_string]) + args.append(certificate_path) + + return self._run_command(args, **kwargs) + + def generate_keypair(self, private_key_path, size, type, comment, **kwargs): + args = [ + self._bin_path, + '-q', + '-N', '', + '-b', str(size), + '-t', type, + '-f', private_key_path, + '-C', comment or '' + ] + + # "y" must be entered in response to the "overwrite" prompt + data = 'y' if os.path.exists(private_key_path) else None + + return self._run_command(args, data=data, **kwargs) + + def get_certificate_info(self, certificate_path, **kwargs): + return self._run_command([self._bin_path, '-L', '-f', certificate_path], **kwargs) + + def get_matching_public_key(self, private_key_path, **kwargs): + return self._run_command([self._bin_path, '-P', '', '-y', '-f', private_key_path], **kwargs) + + def get_private_key(self, private_key_path, **kwargs): + return self._run_command([self._bin_path, '-l', '-f', private_key_path], **kwargs) + + def update_comment(self, private_key_path, comment, **kwargs): + if os.path.exists(private_key_path) and not os.access(private_key_path, os.W_OK): + try: + os.chmod(private_key_path, stat.S_IWUSR + stat.S_IRUSR) + except (IOError, OSError) as e: + raise e("The private key at %s is not writeable preventing a comment update" % private_key_path) + + return self._run_command([self._bin_path, '-q', '-o', '-c', '-C', comment, '-f', private_key_path], **kwargs) + + +class PrivateKey(object): + def __init__(self, size, key_type, fingerprint): + self._size = size + self._type = key_type + self._fingerprint = fingerprint + + @property + def size(self): + return self._size + + @property + def type(self): + return self._type + + @property + def fingerprint(self): + return self._fingerprint + + @classmethod + def from_string(cls, string): + properties = string.split() + + return cls( + size=int(properties[0]), + key_type=properties[-1][1:-1].lower(), + fingerprint=properties[1], + ) + + def to_dict(self): + return { + 'size': self._size, + 'type': self._type, + 'fingerprint': self._fingerprint, + } + + +class PublicKey(object): + def __init__(self, type_string, data, comment): + self._type_string = type_string + self._data = data + self._comment = comment + + def __eq__(self, other): + if not isinstance(other, type(self)): + return NotImplemented + + return all([ + self._type_string == other._type_string, + self._data == other._data, + (self._comment == other._comment) if self._comment is not None and other._comment is not None else True + ]) + + def __ne__(self, other): + return not self == other + + def __str__(self): + return "%s %s" % (self._type_string, self._data) + + @property + def comment(self): + return self._comment + + @comment.setter + def comment(self, value): + self._comment = value + + @property + def data(self): + return self._data + + @property + def type_string(self): + return self._type_string + + @classmethod + def from_string(cls, string): + properties = string.strip('\n').split(' ', 2) + + return cls( + type_string=properties[0], + data=properties[1], + comment=properties[2] if len(properties) > 2 else "" + ) + + @classmethod + def load(cls, path): + try: + with open(path, 'r') as f: + properties = f.read().strip(' \n').split(' ', 2) + except (IOError, OSError): + raise + + if len(properties) < 2: + return None + + return cls( + type_string=properties[0], + data=properties[1], + comment='' if len(properties) <= 2 else properties[2], + ) + + def to_dict(self): + return { + 'comment': self._comment, + 'public_key': self._data, + } diff --git a/plugins/module_utils/openssh/backends/keypair_backend.py b/plugins/module_utils/openssh/backends/keypair_backend.py index c46acb12..bf434e68 100644 --- a/plugins/module_utils/openssh/backends/keypair_backend.py +++ b/plugins/module_utils/openssh/backends/keypair_backend.py @@ -20,16 +20,13 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type import abc -import errno import os -import stat from distutils.version import LooseVersion from ansible.module_utils import six from ansible.module_utils.basic import missing_required_lib from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes -from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import parse_openssh_version from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import ( HAS_OPENSSH_SUPPORT, HAS_OPENSSH_PRIVATE_FORMAT, @@ -39,322 +36,334 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptogra OpenSSHError, OpensshKeypair, ) +from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import ( + KeygenCommand, + OpensshModule, + PrivateKey, + PublicKey, +) +from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( + any_in, + file_mode, + secure_write, +) @six.add_metaclass(abc.ABCMeta) -class KeypairBackend(object): +class KeypairBackend(OpensshModule): def __init__(self, module): - self.module = module + super(KeypairBackend, self).__init__(module) - self.path = module.params['path'] - self.force = module.params['force'] - self.size = module.params['size'] - self.type = module.params['type'] - self.comment = module.params['comment'] - self.passphrase = module.params['passphrase'] - self.regenerate = module.params['regenerate'] + self.comment = self.module.params['comment'] + self.private_key_path = self.module.params['path'] + self.public_key_path = self.private_key_path + '.pub' + self.regenerate = self.module.params['regenerate'] if not self.module.params['force'] else 'always' + self.state = self.module.params['state'] + self.type = self.module.params['type'] - self.changed = False - self.fingerprint = '' - self.public_key = {} + self.size = self._get_size(self.module.params['size']) + self._validate_path() - if self.regenerate == 'always': - self.force = True + self.original_private_key = None + self.original_public_key = None + self.private_key = None + self.public_key = None + def _get_size(self, size): if self.type in ('rsa', 'rsa1'): - self.size = 4096 if self.size is None else self.size - if self.size < 1024: - module.fail_json(msg=('For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. ' - 'Attempting to use bit lengths under 1024 will cause the module to fail.')) + result = 4096 if size is None else size + if result < 1024: + return self.module.fail_json( + msg="For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. " + + "Attempting to use bit lengths under 1024 will cause the module to fail." + ) elif self.type == 'dsa': - self.size = 1024 if self.size is None else self.size - if self.size != 1024: - module.fail_json(msg=('DSA keys must be exactly 1024 bits as specified by FIPS 186-2.')) + result = 1024 if size is None else size + if result != 1024: + return self.module.fail_json(msg="DSA keys must be exactly 1024 bits as specified by FIPS 186-2.") elif self.type == 'ecdsa': - self.size = 256 if self.size is None else self.size - if self.size not in (256, 384, 521): - module.fail_json(msg=('For ECDSA keys, size determines the key length by selecting from ' - 'one of three elliptic curve sizes: 256, 384 or 521 bits. ' - 'Attempting to use bit lengths other than these three values for ' - 'ECDSA keys will cause this module to fail. ')) + result = 256 if size is None else size + if result not in (256, 384, 521): + return self.module.fail_json( + msg="For ECDSA keys, size determines the key length by selecting from one of " + + "three elliptic curve sizes: 256, 384 or 521 bits. " + + "Attempting to use bit lengths other than these three values for ECDSA keys will " + + "cause this module to fail." + ) elif self.type == 'ed25519': # User input is ignored for `key size` when `key type` is ed25519 - self.size = 256 + result = 256 else: - module.fail_json(msg="%s is not a valid value for key type" % self.type) + return self.module.fail_json(msg="%s is not a valid value for key type" % self.type) - def generate(self): - if self.force or not self.is_private_key_valid(perms_required=False): - try: - if self.exists() and not os.access(self.path, os.W_OK): - os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR) - self._generate_keypair() - self.changed = True - except (IOError, OSError) as e: - self.remove() - self.module.fail_json(msg="%s" % to_native(e)) + return result - self.fingerprint = self._get_current_key_properties()[2] - self.public_key = self._get_public_key() - elif not self.is_public_key_valid(perms_required=False): - pubkey = self._get_public_key() - try: - with open(self.path + ".pub", "w") as pubkey_f: - pubkey_f.write(pubkey + '\n') - os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH) - except (IOError, OSError): - self.module.fail_json( - msg='The public key is missing or does not match the private key. ' - 'Unable to regenerate the public key.') - self.changed = True - self.public_key = pubkey + def _validate_path(self): + self._check_if_base_dir(self.private_key_path) - if self.comment: - try: - if self.exists() and not os.access(self.path, os.W_OK): - os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR) - except (IOError, OSError): - self.module.fail_json(msg='Unable to update the comment for the public key.') - self._update_comment() + if os.path.isdir(self.private_key_path): + self.module.fail_json(msg='%s is a directory. Please specify a path to a file.' % self.private_key_path) - private_key_perms_changed = self._permissions_changed() - public_key_perms_changed = self._permissions_changed(public_key=True) - if private_key_perms_changed or public_key_perms_changed: - self.changed = True + def _execute(self): + self.original_private_key = self._load_private_key() + self.original_public_key = self._load_public_key() - def is_private_key_valid(self, perms_required=True): - if not self.exists(): - return False + if self.state == 'present': + self._validate_key_load() - if self._check_pass_protected_or_broken_key(): - if self.regenerate in ('full_idempotence', 'always'): - return False - self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `full_idempotence` or `always`, or with `force=yes`.') + if self._should_generate(): + self._generate() + elif not self._public_key_valid(): + self._restore_public_key() - if not self._private_key_loadable(): - if os.path.isdir(self.path): - self.module.fail_json(msg='%s is a directory. Please specify a path to a file.' % self.path) + self.private_key = self._load_private_key() + self.public_key = self._load_public_key() - if self.regenerate in ('full_idempotence', 'always'): - return False - self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `full_idempotence` or `always`, or with `force=yes`.') - - keysize, keytype, self.fingerprint = self._get_current_key_properties() - - if self.regenerate == 'never': - return True - - if not (self.type == keytype and self.size == keysize): - if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'): - return False - self.module.fail_json( - msg='Key has wrong type and/or size.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.' - ) - - # Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changed - # when check_mode is not enabled - return not (perms_required and self._permissions_changed()) - - def is_public_key_valid(self, perms_required=True): - - def _get_pubkey_content(): - if self.exists(public_key=True): - with open(self.path + ".pub", "r") as pubkey_f: - present_pubkey = pubkey_f.read().strip(' \n') - return present_pubkey - else: - return '' - - def _parse_pubkey(pubkey_content): - if pubkey_content: - parts = pubkey_content.split(' ', 2) - if len(parts) < 2: - return () - return parts[0], parts[1], '' if len(parts) <= 2 else parts[2] - return () - - def _pubkey_valid(pubkey): - if pubkey_parts and _parse_pubkey(pubkey): - return pubkey_parts[:2] == _parse_pubkey(pubkey)[:2] - return False - - def _comment_valid(): - if pubkey_parts: - return pubkey_parts[2] == self.comment - return False - - pubkey_parts = _parse_pubkey(_get_pubkey_content()) - - pubkey = self._get_public_key() - if _pubkey_valid(pubkey): - self.public_key = pubkey + for path in (self.private_key_path, self.public_key_path): + self._update_permissions(path) else: - return False + if self._should_remove(): + self._remove() - if self.comment and not _comment_valid(): - return False - - # Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changes - # when check_mode is not enabled - return not (perms_required and self._permissions_changed(public_key=True)) - - def _permissions_changed(self, public_key=False): - file_args = self.module.load_file_common_arguments(self.module.params) - if public_key: - file_args['path'] = file_args['path'] + '.pub' - if self.module.check_file_absent_if_check_mode(file_args['path']): - return True - return self.module.set_fs_attributes_if_different(file_args, False) - - @property - def result(self): - return { - 'changed': self.changed, - 'size': self.size, - 'type': self.type, - 'filename': self.path, - 'fingerprint': self.fingerprint if self.fingerprint else '', - 'public_key': self.public_key, - 'comment': self.comment if self.comment else '', - } - - def remove(self): - """Remove the resource from the filesystem.""" - - try: - os.remove(self.path) - self.changed = True - except (IOError, OSError) as exc: - if exc.errno != errno.ENOENT: - self.module.fail_json(msg=to_native(exc)) - else: + def _load_private_key(self): + result = None + if self._private_key_exists(): + try: + result = self._get_private_key() + except Exception: pass - if self.exists(public_key=True): + return result + + def _private_key_exists(self): + return os.path.exists(self.private_key_path) + + @abc.abstractmethod + def _get_private_key(self): + pass + + def _load_public_key(self): + result = None + if self._public_key_exists(): try: - os.remove(self.path + ".pub") - self.changed = True - except (IOError, OSError) as exc: - if exc.errno != errno.ENOENT: - self.module.fail_json(msg=to_native(exc)) - else: - pass + result = PublicKey.load(self.public_key_path) + except (IOError, OSError): + pass + return result - def exists(self, public_key=False): - return os.path.exists(self.path if not public_key else self.path + ".pub") + def _public_key_exists(self): + return os.path.exists(self.public_key_path) + + def _validate_key_load(self): + if (self._private_key_exists() + and self.regenerate in ('never', 'fail', 'partial_idempotence') + and (self.original_private_key is None or not self._private_key_readable())): + self.module.fail_json( + msg="Unable to read the key. The key is protected with a passphrase or broken. " + + "Will not proceed. To force regeneration, call the module with `generate` " + + "set to `full_idempotence` or `always`, or with `force=yes`." + ) @abc.abstractmethod - def _generate_keypair(self): + def _private_key_readable(self): pass + def _should_generate(self): + if self.regenerate == 'never': + return self.original_private_key is None + elif self.regenerate == 'fail': + if not self._private_key_valid(): + self.module.fail_json( + msg="Key has wrong type and/or size. Will not proceed. " + + "To force regeneration, call the module with `generate` set to " + + "`partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`." + ) + return self.original_private_key is None + elif self.regenerate in ('partial_idempotence', 'full_idempotence'): + return not self._private_key_valid() + else: + return True + + def _private_key_valid(self): + if self.original_private_key is None: + return False + + return all([ + self.size == self.original_private_key.size, + self.type == self.original_private_key.type, + ]) + + @OpensshModule.trigger_change + @OpensshModule.skip_if_check_mode + def _generate(self): + temp_private_key, temp_public_key = self._generate_temp_keypair() + + try: + self._safe_secure_move([(temp_private_key, self.private_key_path), (temp_public_key, self.public_key_path)]) + except OSError as e: + self.module.fail_json(msg=to_native(e)) + + def _generate_temp_keypair(self): + temp_private_key = os.path.join(self.module.tmpdir, os.path.basename(self.private_key_path)) + temp_public_key = temp_private_key + '.pub' + + try: + self._generate_keypair(temp_private_key) + except (IOError, OSError) as e: + self.module.fail_json(msg=to_native(e)) + + for f in (temp_private_key, temp_public_key): + self.module.add_cleanup_file(f) + + return temp_private_key, temp_public_key + @abc.abstractmethod - def _get_current_key_properties(self): + def _generate_keypair(self, private_key_path): pass + def _public_key_valid(self): + if self.original_public_key is None: + return False + + valid_public_key = self._get_public_key() + valid_public_key.comment = self.comment + + return self.original_public_key == valid_public_key + @abc.abstractmethod def _get_public_key(self): pass + @OpensshModule.trigger_change + @OpensshModule.skip_if_check_mode + def _restore_public_key(self): + try: + temp_public_key = self._create_temp_public_key(str(self._get_public_key()) + '\n') + self._safe_secure_move([ + (temp_public_key, self.public_key_path) + ]) + except (IOError, OSError): + self.module.fail_json( + msg="The public key is missing or does not match the private key. " + + "Unable to regenerate the public key." + ) + + if self.comment: + self._update_comment() + + def _create_temp_public_key(self, content): + temp_public_key = os.path.join(self.module.tmpdir, os.path.basename(self.public_key_path)) + + default_permissions = 0o644 + existing_permissions = file_mode(self.public_key_path) + + try: + secure_write(temp_public_key, existing_permissions or default_permissions, to_bytes(content)) + except (IOError, OSError) as e: + self.module.fail_json(msg=to_native(e)) + self.module.add_cleanup_file(temp_public_key) + + return temp_public_key + @abc.abstractmethod def _update_comment(self): pass - @abc.abstractmethod - def _private_key_loadable(self): - pass + def _should_remove(self): + return self._private_key_exists() or self._public_key_exists() - @abc.abstractmethod - def _check_pass_protected_or_broken_key(self): - pass + @OpensshModule.trigger_change + @OpensshModule.skip_if_check_mode + def _remove(self): + try: + if self._private_key_exists(): + os.remove(self.private_key_path) + if self._public_key_exists(): + os.remove(self.public_key_path) + except (IOError, OSError) as e: + self.module.fail_json(msg=to_native(e)) + + @property + def _result(self): + private_key = self.private_key or self.original_private_key + public_key = self.public_key or self.original_public_key + + return { + 'size': self.size, + 'type': self.type, + 'filename': self.private_key_path, + 'fingerprint': private_key.fingerprint if private_key else '', + 'public_key': str(public_key) if public_key else '', + 'comment': public_key.comment if public_key else '', + } + + @property + def diff(self): + before = self.original_private_key.to_dict() if self.original_private_key else {} + before.update(self.original_public_key.to_dict() if self.original_public_key else {}) + + after = self.private_key.to_dict() if self.private_key else {} + after.update(self.public_key.to_dict() if self.public_key else {}) + + return { + 'before': before, + 'after': after, + } class KeypairBackendOpensshBin(KeypairBackend): - def __init__(self, module): super(KeypairBackendOpensshBin, self).__init__(module) - self.openssh_bin = module.get_bin_path('ssh-keygen') + self.ssh_keygen = KeygenCommand(self.module) - def _load_privatekey(self): - return self.module.run_command([self.openssh_bin, '-lf', self.path]) + def _generate_keypair(self, private_key_path): + self.ssh_keygen.generate_keypair(private_key_path, self.size, self.type, self.comment) - def _get_publickey_from_privatekey(self): - # -P '' is always included as an option to induce the expected standard output for - # _check_pass_protected_or_broken_key, but introduces no side-effects when used to - # output a matching public key - return self.module.run_command([self.openssh_bin, '-P', '', '-yf', self.path]) - - def _generate_keypair(self): - args = [ - self.openssh_bin, - '-q', - '-N', '', - '-b', str(self.size), - '-t', self.type, - '-f', self.path, - '-C', self.comment if self.comment else '' - ] - - # "y" must be entered in response to the "overwrite" prompt - stdin_data = 'y' if self.exists() else None - - self.module.run_command(args, data=stdin_data) - - def _get_current_key_properties(self): - rc, stdout, stderr = self._load_privatekey() - properties = stdout.split() - keysize = int(properties[0]) - fingerprint = properties[1] - keytype = properties[-1][1:-1].lower() - - return keysize, keytype, fingerprint + def _get_private_key(self): + private_key_content = self.ssh_keygen.get_private_key(self.private_key_path)[1] + return PrivateKey.from_string(private_key_content) def _get_public_key(self): - rc, stdout, stderr = self._get_publickey_from_privatekey() - return stdout.strip('\n') + public_key_content = self.ssh_keygen.get_matching_public_key(self.private_key_path)[1] + return PublicKey.from_string(public_key_content) + + def _private_key_readable(self): + rc, stdout, stderr = self.ssh_keygen.get_matching_public_key(self.private_key_path) + return not (rc == 255 or any_in(stderr, 'is not a public key file', 'incorrect passphrase', 'load failed')) def _update_comment(self): - return self.module.run_command([self.openssh_bin, '-q', '-o', '-c', '-C', self.comment, '-f', self.path]) - - def _private_key_loadable(self): - rc, stdout, stderr = self._load_privatekey() - return rc == 0 - - def _check_pass_protected_or_broken_key(self): - rc, stdout, stderr = self._get_publickey_from_privatekey() - return rc == 255 or any_in(stderr, 'is not a public key file', 'incorrect passphrase', 'load failed') + try: + self.ssh_keygen.update_comment(self.private_key_path, self.comment) + except (IOError, OSError) as e: + self.module.fail_json(msg=to_native(e)) class KeypairBackendCryptography(KeypairBackend): - def __init__(self, module): super(KeypairBackendCryptography, self).__init__(module) - if module.params['private_key_format'] == 'auto': - ssh = module.get_bin_path('ssh') - if ssh: - proc = module.run_command([ssh, '-Vq']) - ssh_version = parse_openssh_version(proc[2].strip()) - else: - # Default to OpenSSH 7.8 compatibility when OpenSSH is not installed - ssh_version = "7.8" + if self.type == 'rsa1': + self.module.fail_json(msg="RSA1 keys are not supported by the cryptography backend") - self.private_key_format = 'SSH' + self.passphrase = to_bytes(module.params['passphrase']) if module.params['passphrase'] else None + self.private_key_format = self._get_key_format(module.params['private_key_format']) + + def _get_key_format(self, key_format): + result = 'SSH' + + if key_format == 'auto': + # Default to OpenSSH 7.8 compatibility when OpenSSH is not installed + ssh_version = self._get_ssh_version() or "7.8" if LooseVersion(ssh_version) < LooseVersion("7.8") and self.type != 'ed25519': # OpenSSH made SSH formatted private keys available in version 6.5, # but still defaulted to PKCS1 format with the exception of ed25519 keys - self.private_key_format = 'PKCS1' + result = 'PKCS1' - if self.private_key_format == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT: - module.fail_json( + if result == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT: + self.module.fail_json( msg=missing_required_lib( 'cryptography >= 3.0', reason="to load/dump private keys in the default OpenSSH format for OpenSSH >= 7.8 " + @@ -362,96 +371,72 @@ class KeypairBackendCryptography(KeypairBackend): ) ) - if self.type == 'rsa1': - module.fail_json(msg="RSA1 keys are not supported by the cryptography backend") + return result - self.passphrase = to_bytes(self.passphrase) if self.passphrase else None - - def _load_privatekey(self): - return OpensshKeypair.load(path=self.path, passphrase=self.passphrase, no_public_key=True) - - def _generate_keypair(self): + def _generate_keypair(self, private_key_path): keypair = OpensshKeypair.generate( keytype=self.type, size=self.size, passphrase=self.passphrase, - comment=self.comment if self.comment else "", + comment=self.comment or '', ) - with open(self.path, 'w+b') as f: - f.write( - OpensshKeypair.encode_openssh_privatekey( - keypair.asymmetric_keypair, - self.private_key_format - ) - ) - # ssh-keygen defaults private key permissions to 0600 octal - os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR) - with open(self.path + '.pub', 'w+b') as f: - f.write(keypair.public_key) - # ssh-keygen defaults public key permissions to 0644 octal - os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH) - def _get_current_key_properties(self): - keypair = self._load_privatekey() + encoded_private_key = OpensshKeypair.encode_openssh_privatekey( + keypair.asymmetric_keypair, self.private_key_format + ) + secure_write(private_key_path, 0o600, encoded_private_key) - return keypair.size, keypair.key_type, keypair.fingerprint + public_key_path = private_key_path + '.pub' + secure_write(public_key_path, 0o644, keypair.public_key) + + def _get_private_key(self): + keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True) + + return PrivateKey( + size=keypair.size, + key_type=keypair.key_type, + fingerprint=keypair.fingerprint, + ) def _get_public_key(self): try: - keypair = self._load_privatekey() + keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True) except OpenSSHError: # Simulates the null output of ssh-keygen return "" - return to_text(keypair.public_key) + return PublicKey.from_string(to_text(keypair.public_key)) - def _update_comment(self): - keypair = self._load_privatekey() + def _private_key_readable(self): try: - keypair.comment = self.comment - with open(self.path + ".pub", "w+b") as pubkey_file: - pubkey_file.write(keypair.public_key + b'\n') - except (InvalidCommentError, IOError, OSError) as e: - # Return values while unused currently are made to simulate the output of run_command() - return 1, "Comment could not be updated", to_native(e) - return 0, "Comment updated successfully", "" - - def _private_key_loadable(self): - try: - self._load_privatekey() - except OpenSSHError: - return False - return True - - def _check_pass_protected_or_broken_key(self): - try: - OpensshKeypair.load( - path=self.path, - passphrase=self.passphrase, - no_public_key=True, - ) + OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True) except (InvalidPrivateKeyFileError, InvalidPassphraseError): - return True + return False # Cryptography >= 3.0 uses a SSH key loader which does not raise an exception when a passphrase is provided # when loading an unencrypted key if self.passphrase: try: - OpensshKeypair.load( - path=self.path, - passphrase=None, - no_public_key=True, - ) + OpensshKeypair.load(path=self.private_key_path, passphrase=None, no_public_key=True) except (InvalidPrivateKeyFileError, InvalidPassphraseError): - return False - else: return True + else: + return False - return False + return True + def _update_comment(self): + keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True) + try: + keypair.comment = self.comment + except InvalidCommentError as e: + self.module.fail_json(msg=to_native(e)) -def any_in(sequence, *elements): - return any(e in sequence for e in elements) + try: + temp_public_key = self._create_temp_public_key(keypair.public_key + b'\n') + self._safe_secure_move([(temp_public_key, self.public_key_path)]) + except (IOError, OSError) as e: + self.module.fail_json(msg=to_native(e)) def select_backend(module, backend): diff --git a/plugins/module_utils/openssh/utils.py b/plugins/module_utils/openssh/utils.py index 03aab56d..22aa7cf7 100644 --- a/plugins/module_utils/openssh/utils.py +++ b/plugins/module_utils/openssh/utils.py @@ -19,7 +19,9 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import os import re +from contextlib import contextmanager from struct import Struct from ansible.module_utils.six import PY3 @@ -54,6 +56,16 @@ _UINT64 = Struct(b'!Q') _UINT64_MAX = 0xFFFFFFFFFFFFFFFF +def any_in(sequence, *elements): + return any(e in sequence for e in elements) + + +def file_mode(path): + if not os.path.exists(path): + return 0o000 + return os.stat(path).st_mode & 0o777 + + def parse_openssh_version(version_string): """Parse the version output of ssh -V and return version numbers that can be compared""" @@ -68,6 +80,20 @@ def parse_openssh_version(version_string): return version +@contextmanager +def secure_open(path, mode): + fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode) + try: + yield fd + finally: + os.close(fd) + + +def secure_write(path, mode, content): + with secure_open(path, mode) as fd: + os.write(fd, content) + + # See https://datatracker.ietf.org/doc/html/rfc4251#section-5 for SSH data types class OpensshParser(object): """Parser for OpenSSH encoded objects""" diff --git a/plugins/modules/openssh_cert.py b/plugins/modules/openssh_cert.py index 32f28e36..5f8820a6 100644 --- a/plugins/modules/openssh_cert.py +++ b/plugins/modules/openssh_cert.py @@ -44,8 +44,8 @@ options: required: true regenerate: description: - - When C(never) the task will fail if a certificate already exists at I(path) and is unreadable. - Otherwise, a new certificate will only be generated if there is no existing certificate. + - When C(never) the task will fail if a certificate already exists at I(path) and is unreadable + otherwise a new certificate will only be generated if there is no existing certificate. - When C(fail) the task will fail if a certificate already exists at I(path) and does not match the module's options. - When C(partial_idempotence) an existing certificate will be regenerated based on @@ -239,12 +239,15 @@ info: import os from distutils.version import LooseVersion -from sys import version_info from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.common.text.converters import to_native, to_text -from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import safe_atomic_move +from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import ( + KeygenCommand, + OpensshModule, + PrivateKey, +) from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import ( OpensshCertificate, @@ -252,153 +255,120 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.certifica parse_option_list, ) -from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( - parse_openssh_version, -) -PY27 = version_info[0:2] >= (2, 7) - - -class Certificate(object): +class Certificate(OpensshModule): def __init__(self, module): - self.check_mode = module.check_mode - self.module = module - self.ssh_keygen = module.get_bin_path('ssh-keygen', True) + super(Certificate, self).__init__(module) + self.ssh_keygen = KeygenCommand(self.module) - self.force = module.params['force'] - self.identifier = module.params['identifier'] or "" - self.options = module.params['options'] or [] - self.path = module.params['path'] - self.pkcs11_provider = module.params['pkcs11_provider'] - self.principals = module.params['principals'] or [] - self.public_key = module.params['public_key'] - self.regenerate = module.params['regenerate'] if not self.force else 'always' - self.serial_number = module.params['serial_number'] - self.signing_key = module.params['signing_key'] - self.state = module.params['state'] - self.type = module.params['type'] - self.use_agent = module.params['use_agent'] - self.valid_at = module.params['valid_at'] + self.identifier = self.module.params['identifier'] or "" + self.options = self.module.params['options'] or [] + self.path = self.module.params['path'] + self.pkcs11_provider = self.module.params['pkcs11_provider'] + self.principals = self.module.params['principals'] or [] + self.public_key = self.module.params['public_key'] + self.regenerate = self.module.params['regenerate'] if not self.module.params['force'] else 'always' + self.serial_number = self.module.params['serial_number'] + self.signing_key = self.module.params['signing_key'] + self.state = self.module.params['state'] + self.type = self.module.params['type'] + self.use_agent = self.module.params['use_agent'] + self.valid_at = self.module.params['valid_at'] + + self._check_if_base_dir(self.path) + + if self.state == 'present': + self._validate_parameters() - self.changed = False self.data = None self.original_data = None + if self._exists(): + self._load_certificate() + self.time_parameters = None - if self.state == 'present': - try: - self.time_parameters = OpensshCertificateTimeParameters( - valid_from=module.params['valid_from'], - valid_to=module.params['valid_to'], - ) - except ValueError as e: - self.module.fail_json(msg=to_native(e)) + self._set_time_parameters() - if self.exists(): - try: - self.original_data = OpensshCertificate.load(self.path) - except (TypeError, ValueError) as e: - if self.regenerate in ('never', 'fail'): - self.module.fail_json(msg="Unable to read existing certificate: %s" % to_native(e)) - self.module.warn("Unable to read existing certificate: %s" % to_native(e)) + def _validate_parameters(self): + for path in (self.public_key, self.signing_key): + self._check_if_base_dir(path) - self._validate_parameters() + if self.options and self.type == "host": + self.module.fail_json(msg="Options can only be used with user certificates.") - def exists(self): - return os.path.exists(self.path) + if self.use_agent: + self._use_agent_available() - def generate(self): - if self._should_generate(): - if not self.check_mode: - temp_cert = self._generate_temp_certificate() - - try: - safe_atomic_move(self.module, temp_cert, self.path) - except OSError as e: - self.module.fail_json(msg="Unable to write certificate to %s: %s" % (self.path, to_native(e))) - - try: - self.data = OpensshCertificate.load(self.path) - except (TypeError, ValueError) as e: - self.module.fail_json(msg="Unable to read new certificate: %s" % to_native(e)) - - self.changed = True - - if self.exists(): - self._update_permissions() - - def remove(self): - if self.exists(): - if not self.check_mode: - try: - os.remove(self.path) - except OSError as e: - self.module.fail_json(msg="Unable to remove existing certificate: %s" % to_native(e)) - self.changed = True - - @property - def result(self): - result = {'changed': self.changed} - - if self.module._diff: - result['diff'] = { - 'before': get_cert_dict(self.original_data), - 'after': get_cert_dict(self.data) - } - - if self.state == 'present': - result.update({ - 'type': self.type, - 'filename': self.path, - 'info': format_cert_info(self._get_cert_info()), - }) - - return result - - def _check_if_base_dir(self, path): - base_dir = os.path.dirname(path) or '.' - if not os.path.isdir(base_dir): + def _use_agent_available(self): + ssh_version = self._get_ssh_version() + if not ssh_version: + self.module.fail_json(msg="Failed to determine ssh version") + elif LooseVersion(ssh_version) < LooseVersion("7.6"): self.module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir + msg="Signing with CA key in ssh agent requires ssh 7.6 or newer." + + " Your version is: %s" % ssh_version ) - def _command_arguments(self, key_copy_path): - result = [ - self.ssh_keygen, - '-s', self.signing_key, - '-P', '', - '-I', self.identifier, - ] + def _exists(self): + return os.path.exists(self.path) - if self.options: - for option in self.options: - result.extend(['-O', option]) - if self.pkcs11_provider: - result.extend(['-D', self.pkcs11_provider]) - if self.principals: - result.extend(['-n', ','.join(self.principals)]) - if self.serial_number is not None: - result.extend(['-z', str(self.serial_number)]) - if self.type == 'host': - result.extend(['-h']) - if self.use_agent: - result.extend(['-U']) - if self.time_parameters.validity_string: - result.extend(['-V', self.time_parameters.validity_string]) - result.append(key_copy_path) - - return result - - def _compare_options(self): + def _load_certificate(self): try: - critical_options, extensions = parse_option_list(self.options) - except ValueError as e: - return self.module.fail_json(msg=to_native(e)) + self.original_data = OpensshCertificate.load(self.path) + except (TypeError, ValueError) as e: + if self.regenerate in ('never', 'fail'): + self.module.fail_json(msg="Unable to read existing certificate: %s" % to_native(e)) + self.module.warn("Unable to read existing certificate: %s" % to_native(e)) + def _set_time_parameters(self): + try: + self.time_parameters = OpensshCertificateTimeParameters( + valid_from=self.module.params['valid_from'], + valid_to=self.module.params['valid_to'], + ) + except ValueError as e: + self.module.fail_json(msg=to_native(e)) + + def _execute(self): + if self.state == 'present': + if self._should_generate(): + self._generate() + self._update_permissions(self.path) + else: + if self._exists(): + self._remove() + + def _should_generate(self): + if self.regenerate == 'never': + return self.original_data is None + elif self.regenerate == 'fail': + if self.original_data and not self._is_fully_valid(): + self.module.fail_json( + msg="Certificate does not match the provided options.", + cert=get_cert_dict(self.original_data) + ) + return self.original_data is None + elif self.regenerate == 'partial_idempotence': + return self.original_data is None or not self._is_partially_valid() + elif self.regenerate == 'full_idempotence': + return self.original_data is None or not self._is_fully_valid() + else: + return True + + def _is_fully_valid(self): + return self._is_partially_valid() and all([ + self._compare_options(), + self.original_data.key_id == self.identifier, + self.original_data.public_key == self._get_key_fingerprint(self.public_key), + self.original_data.signing_key == self._get_key_fingerprint(self.signing_key), + ]) + + def _is_partially_valid(self): return all([ - set(self.original_data.critical_options) == set(critical_options), - set(self.original_data.extensions) == set(extensions) + set(self.original_data.principals) == set(self.principals), + self.original_data.serial == self.serial_number if self.serial_number is not None else True, + self.original_data.type == self.type, + self._compare_time_parameters(), ]) def _compare_time_parameters(self): @@ -415,6 +385,35 @@ class Certificate(object): original_time_parameters.within_range(self.valid_at) ]) + def _compare_options(self): + try: + critical_options, extensions = parse_option_list(self.options) + except ValueError as e: + return self.module.fail_json(msg=to_native(e)) + + return all([ + set(self.original_data.critical_options) == set(critical_options), + set(self.original_data.extensions) == set(extensions) + ]) + + def _get_key_fingerprint(self, path): + private_key_content = self.ssh_keygen.get_private_key(path, check_rc=True)[1] + return PrivateKey.from_string(private_key_content).fingerprint + + @OpensshModule.trigger_change + @OpensshModule.skip_if_check_mode + def _generate(self): + try: + temp_certificate = self._generate_temp_certificate() + self._safe_secure_move([(temp_certificate, self.path)]) + except OSError as e: + self.module.fail_json(msg="Unable to write certificate to %s: %s" % (self.path, to_native(e))) + + try: + self.data = OpensshCertificate.load(self.path) + except (TypeError, ValueError) as e: + self.module.fail_json(msg="Unable to read new certificate: %s" % to_native(e)) + def _generate_temp_certificate(self): key_copy = os.path.join(self.module.tmpdir, os.path.basename(self.public_key)) @@ -424,77 +423,44 @@ class Certificate(object): self.module.fail_json(msg="Unable to stage temporary key: %s" % to_native(e)) self.module.add_cleanup_file(key_copy) - self.module.run_command(self._command_arguments(key_copy), environ_update=dict(TZ="UTC"), check_rc=True) + self.ssh_keygen.generate_certificate( + key_copy, self.identifier, self.options, self.pkcs11_provider, self.principals, self.serial_number, + self.signing_key, self.type, self.time_parameters, self.use_agent, + environ_update=dict(TZ="UTC"), check_rc=True + ) temp_cert = os.path.splitext(key_copy)[0] + '-cert.pub' self.module.add_cleanup_file(temp_cert) return temp_cert - def _get_cert_info(self): - return self.module.run_command([self.ssh_keygen, '-Lf', self.path])[1] + @OpensshModule.trigger_change + @OpensshModule.skip_if_check_mode + def _remove(self): + try: + os.remove(self.path) + except OSError as e: + self.module.fail_json(msg="Unable to remove existing certificate: %s" % to_native(e)) - def _get_key_fingerprint(self, path): - stdout = self.module.run_command([self.ssh_keygen, '-lf', path], check_rc=True)[1] - return stdout.split()[1] + @property + def _result(self): + if self.state != 'present': + return {} - def _is_valid(self): - partial_result = all([ - set(self.original_data.principals) == set(self.principals), - self.original_data.serial == self.serial_number if self.serial_number is not None else True, - self.original_data.type == self.type, - self._compare_time_parameters(), - ]) + certificate_info = self.ssh_keygen.get_certificate_info(self.path)[1] - if self.regenerate == 'partial_idempotence': - return partial_result + return { + 'type': self.type, + 'filename': self.path, + 'info': format_cert_info(certificate_info), + } - return partial_result and all([ - self._compare_options(), - self.original_data.key_id == self.identifier, - self.original_data.public_key == self._get_key_fingerprint(self.public_key), - self.original_data.signing_key == self._get_key_fingerprint(self.signing_key), - ]) - - def _should_generate(self): - if self.regenerate == 'never': - return self.original_data is None - elif self.regenerate == 'fail': - if self.original_data and not self._is_valid(): - self.module.fail_json( - msg="Certificate does not match the provided options.", - cert=get_cert_dict(self.original_data) - ) - return self.original_data is None - elif self.regenerate in ('partial_idempotence', 'full_idempotence'): - return self.original_data is None or not self._is_valid() - else: - return True - - def _update_permissions(self): - file_args = self.module.load_file_common_arguments(self.module.params) - self.changed = self.module.set_fs_attributes_if_different(file_args, self.changed) - - def _validate_parameters(self): - self._check_if_base_dir(self.path) - - if self.state == 'present': - for path in (self.public_key, self.signing_key): - self._check_if_base_dir(path) - - if self.options and self.type == "host": - self.module.fail_json(msg="Options can only be used with user certificates.") - - if self.use_agent: - ssh_version_string = self.module.run_command([self.module.get_bin_path('ssh', True), '-Vq'])[2].strip() - ssh_version = parse_openssh_version(ssh_version_string) - if ssh_version is None: - self.module.fail_json(msg="Failed to parse ssh version from: %s" % ssh_version_string) - elif LooseVersion(ssh_version) < LooseVersion("7.6"): - self.module.fail_json( - msg="Signing with CA key in ssh agent requires ssh 7.6 or newer." + - " Your version is: %s" % ssh_version_string - ) + @property + def diff(self): + return { + 'before': get_cert_dict(self.original_data), + 'after': get_cert_dict(self.data) + } def format_cert_info(cert_info): @@ -551,14 +517,7 @@ def main(): required_if=[('state', 'present', ['type', 'signing_key', 'public_key', 'valid_from', 'valid_to'])], ) - certificate = Certificate(module) - - if certificate.state == 'present': - certificate.generate() - else: - certificate.remove() - - module.exit_json(**certificate.result) + Certificate(module).execute() if __name__ == '__main__': diff --git a/plugins/modules/openssh_keypair.py b/plugins/modules/openssh_keypair.py index 5090014b..62a975b2 100644 --- a/plugins/modules/openssh_keypair.py +++ b/plugins/modules/openssh_keypair.py @@ -186,8 +186,6 @@ comment: sample: test@comment ''' -import os - from ansible.module_utils.basic import AnsibleModule from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.keypair_backend import ( @@ -218,32 +216,9 @@ def main(): add_file_common_args=True, ) - base_dir = os.path.dirname(module.params['path']) or '.' - if not os.path.isdir(base_dir): - module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir - ) - keypair = select_backend(module, module.params['backend'])[1] - if module.params['state'] == 'present': - if module.check_mode: - keypair.changed = any([ - keypair.force, - not keypair.is_private_key_valid(), - not keypair.is_public_key_valid() - ]) - else: - keypair.generate() - else: - # When `state=absent` no details from an existing key at the given `path` are returned in the module result - if module.check_mode: - keypair.changed = keypair.exists() - else: - keypair.remove() - - module.exit_json(**keypair.result) + keypair.execute() if __name__ == '__main__': diff --git a/tests/integration/targets/openssh_keypair/tests/core.yml b/tests/integration/targets/openssh_keypair/tests/core.yml index 4bedea85..353412d4 100644 --- a/tests/integration/targets/openssh_keypair/tests/core.yml +++ b/tests/integration/targets/openssh_keypair/tests/core.yml @@ -52,8 +52,8 @@ that: - core_output['fingerprint'] is string - core_output['fingerprint'].startswith('SHA256:') - # only distro old enough that it still gives md5 with no prefix - when: not (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') + # SHA256 was made the default hashing algorithm for fingerprints in OpenSSH 6.8 + when: not (backend == 'opensshbin' and openssh_version is version('6.8', '<')) - name: "({{ backend }}) Assert key returns public_key" assert: diff --git a/tests/integration/targets/openssh_keypair/tests/invalid.yml b/tests/integration/targets/openssh_keypair/tests/invalid.yml index a2ed9744..43ff7b04 100644 --- a/tests/integration/targets/openssh_keypair/tests/invalid.yml +++ b/tests/integration/targets/openssh_keypair/tests/invalid.yml @@ -87,13 +87,6 @@ assert: that: - write_only_private_key_after.stat.mode == '0200' - when: backend == 'opensshbin' - -- name: "({{ backend }}) Assert key permissions are not preserved with 'cryptography'" - assert: - that: - - write_only_private_key_after.stat.mode == '0600' - when: backend == 'cryptography' - name: "({{ backend }}) Remove key - write-only" openssh_keypair: diff --git a/tests/integration/targets/openssh_keypair/tests/options.yml b/tests/integration/targets/openssh_keypair/tests/options.yml index f6d83651..6cab8e28 100644 --- a/tests/integration/targets/openssh_keypair/tests/options.yml +++ b/tests/integration/targets/openssh_keypair/tests/options.yml @@ -55,7 +55,8 @@ openssh_keypair: path: "{{ output_dir }}/default_size_ed25519" state: absent - when: not (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') + # Support for ed25519 keys was added in OpenSSH 6.5 + when: not (backend == 'opensshbin' and openssh_version is version('6.5', '<')) - name: "({{ backend }}) Generate key - force" openssh_keypair: @@ -96,12 +97,18 @@ backend: "{{ backend }}" register: modified_comment_output -- name: "({{ backend }}) Assert only comment changed - comment" +- name: "({{ backend }}) Assert comment preserved public key - comment" assert: that: - comment_output.public_key == modified_comment_output.public_key - comment_output.comment == 'test@comment' + +- name: "({{ backend }}) Assert comment changed - comment" + assert: + that: - modified_comment_output.comment == 'test_modified@comment' + # Support for updating comments for key types other than rsa1 was added in OpenSSH 7.2 + when: not (backend == 'opensshbin' and openssh_version is version('7.2', '<')) - name: "({{ backend }}) Remove key - comment" openssh_keypair: diff --git a/tests/integration/targets/setup_ssh_keygen/tasks/main.yml b/tests/integration/targets/setup_ssh_keygen/tasks/main.yml index 1c8a7143..47c0dd61 100644 --- a/tests/integration/targets/setup_ssh_keygen/tasks/main.yml +++ b/tests/integration/targets/setup_ssh_keygen/tasks/main.yml @@ -12,3 +12,12 @@ package: name: '{{ openssh_client_package_name }}' when: not ansible_os_family == "Darwin" and not ansible_os_family == "FreeBSD" + +- name: Get ssh version + shell: ssh -Vq 2>&1|sed 's/^.*OpenSSH_\([0-9]\{1,\}\.[0-9]\{1,\}\).*$/\1/' + register: + rc_openssh_version_output + +- name: Set ssh version facts + set_fact: + openssh_version: "{{ rc_openssh_version_output.stdout.strip() }}"