openssh_keypair - Add diff support and general cleanup (#260)

* Initial commit

* Matching tests to overwritten permissions behavior with cryptography

* Ensuring key validation only occurs when state=present and accomodating CentOS6 restrictions

* Making ssh-keygen behavior explicit by version in tests

* Ensuring cyrptography not excluded in new conditions

* Adding changelog fragment

* Fixing sanity checks

* Improving readability

* Applying review suggestions

* addressing restore_on_failure conflict
pull/265/head
Ajpantuso 2021-08-18 03:22:31 -04:00 committed by GitHub
parent b59846b9fa
commit 08ada24a53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 795 additions and 550 deletions

View File

@ -0,0 +1,8 @@
---
minor_changes:
- openssh_keypair - added ``diff`` support (https://github.com/ansible-collections/community.crypto/pull/260).
bugfixes:
- openssh_keypair - fixed error handling to restore original keypair if regeneration fails
(https://github.com/ansible-collections/community.crypto/pull/260).
- openssh_keypair - fixed ``cryptography`` backend to preserve original file permissions when regenerating a keypair
requires existing files to be overwritten (https://github.com/ansible-collections/community.crypto/pull/260).

View File

@ -18,7 +18,15 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import abc
import os
import stat
from ansible.module_utils import six
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
parse_openssh_version,
)
def restore_on_failure(f):
@ -40,3 +48,278 @@ def restore_on_failure(f):
@restore_on_failure
def safe_atomic_move(module, path, destination):
module.atomic_move(path, destination)
def _restore_all_on_failure(f):
def backup_and_restore(self, sources_and_destinations, *args, **kwargs):
backups = [(d, self.module.backup_local(d)) for s, d in sources_and_destinations if os.path.exists(d)]
try:
f(self, sources_and_destinations, *args, **kwargs)
except Exception:
for destination, backup in backups:
self.module.atomic_move(backup, destination)
raise
else:
for destination, backup in backups:
self.module.add_cleanup_file(backup)
return backup_and_restore
@six.add_metaclass(abc.ABCMeta)
class OpensshModule(object):
def __init__(self, module):
self.module = module
self.changed = False
self.check_mode = self.module.check_mode
def execute(self):
self._execute()
self.module.exit_json(**self.result)
@abc.abstractmethod
def _execute(self):
pass
@property
def result(self):
result = self._result
result['changed'] = self.changed
if self.module._diff:
result['diff'] = self.diff
return result
@property
@abc.abstractmethod
def _result(self):
pass
@property
@abc.abstractmethod
def diff(self):
pass
@staticmethod
def skip_if_check_mode(f):
def wrapper(self, *args, **kwargs):
if not self.check_mode:
f(self, *args, **kwargs)
return wrapper
@staticmethod
def trigger_change(f):
def wrapper(self, *args, **kwargs):
f(self, *args, **kwargs)
self.changed = True
return wrapper
def _check_if_base_dir(self, path):
base_dir = os.path.dirname(path) or '.'
if not os.path.isdir(base_dir):
self.module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
def _get_ssh_version(self):
ssh_bin = self.module.get_bin_path('ssh')
if not ssh_bin:
return ""
return parse_openssh_version(self.module.run_command([ssh_bin, '-V', '-q'])[2].strip())
@_restore_all_on_failure
def _safe_secure_move(self, sources_and_destinations):
"""Moves a list of files from 'source' to 'destination' and restores 'destination' from backup upon failure.
If 'destination' does not already exist, then 'source' permissions are preserved to prevent
exposing protected data ('atomic_move' uses the 'destination' base directory mask for
permissions if 'destination' does not already exists).
"""
for source, destination in sources_and_destinations:
if os.path.exists(destination):
self.module.atomic_move(source, destination)
else:
self.module.preserved_copy(source, destination)
def _update_permissions(self, path):
file_args = self.module.load_file_common_arguments(self.module.params)
file_args['path'] = path
if not self.module.check_file_absent_if_check_mode(path):
self.changed = self.module.set_fs_attributes_if_different(file_args, self.changed)
else:
self.changed = True
class KeygenCommand(object):
def __init__(self, module):
self._bin_path = module.get_bin_path('ssh-keygen', True)
self._run_command = module.run_command
def generate_certificate(self, certificate_path, identifier, options, pkcs11_provider, principals,
serial_number, signing_key_path, type, time_parameters, use_agent, **kwargs):
args = [self._bin_path, '-s', signing_key_path, '-P', '', '-I', identifier]
if options:
for option in options:
args.extend(['-O', option])
if pkcs11_provider:
args.extend(['-D', pkcs11_provider])
if principals:
args.extend(['-n', ','.join(principals)])
if serial_number is not None:
args.extend(['-z', str(serial_number)])
if type == 'host':
args.extend(['-h'])
if use_agent:
args.extend(['-U'])
if time_parameters.validity_string:
args.extend(['-V', time_parameters.validity_string])
args.append(certificate_path)
return self._run_command(args, **kwargs)
def generate_keypair(self, private_key_path, size, type, comment, **kwargs):
args = [
self._bin_path,
'-q',
'-N', '',
'-b', str(size),
'-t', type,
'-f', private_key_path,
'-C', comment or ''
]
# "y" must be entered in response to the "overwrite" prompt
data = 'y' if os.path.exists(private_key_path) else None
return self._run_command(args, data=data, **kwargs)
def get_certificate_info(self, certificate_path, **kwargs):
return self._run_command([self._bin_path, '-L', '-f', certificate_path], **kwargs)
def get_matching_public_key(self, private_key_path, **kwargs):
return self._run_command([self._bin_path, '-P', '', '-y', '-f', private_key_path], **kwargs)
def get_private_key(self, private_key_path, **kwargs):
return self._run_command([self._bin_path, '-l', '-f', private_key_path], **kwargs)
def update_comment(self, private_key_path, comment, **kwargs):
if os.path.exists(private_key_path) and not os.access(private_key_path, os.W_OK):
try:
os.chmod(private_key_path, stat.S_IWUSR + stat.S_IRUSR)
except (IOError, OSError) as e:
raise e("The private key at %s is not writeable preventing a comment update" % private_key_path)
return self._run_command([self._bin_path, '-q', '-o', '-c', '-C', comment, '-f', private_key_path], **kwargs)
class PrivateKey(object):
def __init__(self, size, key_type, fingerprint):
self._size = size
self._type = key_type
self._fingerprint = fingerprint
@property
def size(self):
return self._size
@property
def type(self):
return self._type
@property
def fingerprint(self):
return self._fingerprint
@classmethod
def from_string(cls, string):
properties = string.split()
return cls(
size=int(properties[0]),
key_type=properties[-1][1:-1].lower(),
fingerprint=properties[1],
)
def to_dict(self):
return {
'size': self._size,
'type': self._type,
'fingerprint': self._fingerprint,
}
class PublicKey(object):
def __init__(self, type_string, data, comment):
self._type_string = type_string
self._data = data
self._comment = comment
def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented
return all([
self._type_string == other._type_string,
self._data == other._data,
(self._comment == other._comment) if self._comment is not None and other._comment is not None else True
])
def __ne__(self, other):
return not self == other
def __str__(self):
return "%s %s" % (self._type_string, self._data)
@property
def comment(self):
return self._comment
@comment.setter
def comment(self, value):
self._comment = value
@property
def data(self):
return self._data
@property
def type_string(self):
return self._type_string
@classmethod
def from_string(cls, string):
properties = string.strip('\n').split(' ', 2)
return cls(
type_string=properties[0],
data=properties[1],
comment=properties[2] if len(properties) > 2 else ""
)
@classmethod
def load(cls, path):
try:
with open(path, 'r') as f:
properties = f.read().strip(' \n').split(' ', 2)
except (IOError, OSError):
raise
if len(properties) < 2:
return None
return cls(
type_string=properties[0],
data=properties[1],
comment='' if len(properties) <= 2 else properties[2],
)
def to_dict(self):
return {
'comment': self._comment,
'public_key': self._data,
}

View File

@ -20,16 +20,13 @@ from __future__ import absolute_import, division, print_function
__metaclass__ = type
import abc
import errno
import os
import stat
from distutils.version import LooseVersion
from ansible.module_utils import six
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import parse_openssh_version
from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptography import (
HAS_OPENSSH_SUPPORT,
HAS_OPENSSH_PRIVATE_FORMAT,
@ -39,322 +36,334 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.cryptogra
OpenSSHError,
OpensshKeypair,
)
from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import (
KeygenCommand,
OpensshModule,
PrivateKey,
PublicKey,
)
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
any_in,
file_mode,
secure_write,
)
@six.add_metaclass(abc.ABCMeta)
class KeypairBackend(object):
class KeypairBackend(OpensshModule):
def __init__(self, module):
self.module = module
super(KeypairBackend, self).__init__(module)
self.path = module.params['path']
self.force = module.params['force']
self.size = module.params['size']
self.type = module.params['type']
self.comment = module.params['comment']
self.passphrase = module.params['passphrase']
self.regenerate = module.params['regenerate']
self.comment = self.module.params['comment']
self.private_key_path = self.module.params['path']
self.public_key_path = self.private_key_path + '.pub'
self.regenerate = self.module.params['regenerate'] if not self.module.params['force'] else 'always'
self.state = self.module.params['state']
self.type = self.module.params['type']
self.changed = False
self.fingerprint = ''
self.public_key = {}
self.size = self._get_size(self.module.params['size'])
self._validate_path()
if self.regenerate == 'always':
self.force = True
self.original_private_key = None
self.original_public_key = None
self.private_key = None
self.public_key = None
def _get_size(self, size):
if self.type in ('rsa', 'rsa1'):
self.size = 4096 if self.size is None else self.size
if self.size < 1024:
module.fail_json(msg=('For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. '
'Attempting to use bit lengths under 1024 will cause the module to fail.'))
result = 4096 if size is None else size
if result < 1024:
return self.module.fail_json(
msg="For RSA keys, the minimum size is 1024 bits and the default is 4096 bits. " +
"Attempting to use bit lengths under 1024 will cause the module to fail."
)
elif self.type == 'dsa':
self.size = 1024 if self.size is None else self.size
if self.size != 1024:
module.fail_json(msg=('DSA keys must be exactly 1024 bits as specified by FIPS 186-2.'))
result = 1024 if size is None else size
if result != 1024:
return self.module.fail_json(msg="DSA keys must be exactly 1024 bits as specified by FIPS 186-2.")
elif self.type == 'ecdsa':
self.size = 256 if self.size is None else self.size
if self.size not in (256, 384, 521):
module.fail_json(msg=('For ECDSA keys, size determines the key length by selecting from '
'one of three elliptic curve sizes: 256, 384 or 521 bits. '
'Attempting to use bit lengths other than these three values for '
'ECDSA keys will cause this module to fail. '))
result = 256 if size is None else size
if result not in (256, 384, 521):
return self.module.fail_json(
msg="For ECDSA keys, size determines the key length by selecting from one of " +
"three elliptic curve sizes: 256, 384 or 521 bits. " +
"Attempting to use bit lengths other than these three values for ECDSA keys will " +
"cause this module to fail."
)
elif self.type == 'ed25519':
# User input is ignored for `key size` when `key type` is ed25519
self.size = 256
result = 256
else:
module.fail_json(msg="%s is not a valid value for key type" % self.type)
return self.module.fail_json(msg="%s is not a valid value for key type" % self.type)
def generate(self):
if self.force or not self.is_private_key_valid(perms_required=False):
try:
if self.exists() and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
self._generate_keypair()
self.changed = True
except (IOError, OSError) as e:
self.remove()
self.module.fail_json(msg="%s" % to_native(e))
return result
self.fingerprint = self._get_current_key_properties()[2]
self.public_key = self._get_public_key()
elif not self.is_public_key_valid(perms_required=False):
pubkey = self._get_public_key()
def _validate_path(self):
self._check_if_base_dir(self.private_key_path)
if os.path.isdir(self.private_key_path):
self.module.fail_json(msg='%s is a directory. Please specify a path to a file.' % self.private_key_path)
def _execute(self):
self.original_private_key = self._load_private_key()
self.original_public_key = self._load_public_key()
if self.state == 'present':
self._validate_key_load()
if self._should_generate():
self._generate()
elif not self._public_key_valid():
self._restore_public_key()
self.private_key = self._load_private_key()
self.public_key = self._load_public_key()
for path in (self.private_key_path, self.public_key_path):
self._update_permissions(path)
else:
if self._should_remove():
self._remove()
def _load_private_key(self):
result = None
if self._private_key_exists():
try:
with open(self.path + ".pub", "w") as pubkey_f:
pubkey_f.write(pubkey + '\n')
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
result = self._get_private_key()
except Exception:
pass
return result
def _private_key_exists(self):
return os.path.exists(self.private_key_path)
@abc.abstractmethod
def _get_private_key(self):
pass
def _load_public_key(self):
result = None
if self._public_key_exists():
try:
result = PublicKey.load(self.public_key_path)
except (IOError, OSError):
pass
return result
def _public_key_exists(self):
return os.path.exists(self.public_key_path)
def _validate_key_load(self):
if (self._private_key_exists()
and self.regenerate in ('never', 'fail', 'partial_idempotence')
and (self.original_private_key is None or not self._private_key_readable())):
self.module.fail_json(
msg='The public key is missing or does not match the private key. '
'Unable to regenerate the public key.')
self.changed = True
self.public_key = pubkey
if self.comment:
try:
if self.exists() and not os.access(self.path, os.W_OK):
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
except (IOError, OSError):
self.module.fail_json(msg='Unable to update the comment for the public key.')
self._update_comment()
private_key_perms_changed = self._permissions_changed()
public_key_perms_changed = self._permissions_changed(public_key=True)
if private_key_perms_changed or public_key_perms_changed:
self.changed = True
def is_private_key_valid(self, perms_required=True):
if not self.exists():
return False
if self._check_pass_protected_or_broken_key():
if self.regenerate in ('full_idempotence', 'always'):
return False
self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
if not self._private_key_loadable():
if os.path.isdir(self.path):
self.module.fail_json(msg='%s is a directory. Please specify a path to a file.' % self.path)
if self.regenerate in ('full_idempotence', 'always'):
return False
self.module.fail_json(msg='Unable to read the key. The key is protected with a passphrase or broken.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `full_idempotence` or `always`, or with `force=yes`.')
keysize, keytype, self.fingerprint = self._get_current_key_properties()
if self.regenerate == 'never':
return True
if not (self.type == keytype and self.size == keysize):
if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'):
return False
self.module.fail_json(
msg='Key has wrong type and/or size.'
' Will not proceed. To force regeneration, call the module with `generate`'
' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.'
msg="Unable to read the key. The key is protected with a passphrase or broken. " +
"Will not proceed. To force regeneration, call the module with `generate` " +
"set to `full_idempotence` or `always`, or with `force=yes`."
)
# Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changed
# when check_mode is not enabled
return not (perms_required and self._permissions_changed())
@abc.abstractmethod
def _private_key_readable(self):
pass
def is_public_key_valid(self, perms_required=True):
def _get_pubkey_content():
if self.exists(public_key=True):
with open(self.path + ".pub", "r") as pubkey_f:
present_pubkey = pubkey_f.read().strip(' \n')
return present_pubkey
def _should_generate(self):
if self.regenerate == 'never':
return self.original_private_key is None
elif self.regenerate == 'fail':
if not self._private_key_valid():
self.module.fail_json(
msg="Key has wrong type and/or size. Will not proceed. " +
"To force regeneration, call the module with `generate` set to " +
"`partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`."
)
return self.original_private_key is None
elif self.regenerate in ('partial_idempotence', 'full_idempotence'):
return not self._private_key_valid()
else:
return ''
def _parse_pubkey(pubkey_content):
if pubkey_content:
parts = pubkey_content.split(' ', 2)
if len(parts) < 2:
return ()
return parts[0], parts[1], '' if len(parts) <= 2 else parts[2]
return ()
def _pubkey_valid(pubkey):
if pubkey_parts and _parse_pubkey(pubkey):
return pubkey_parts[:2] == _parse_pubkey(pubkey)[:2]
return False
def _comment_valid():
if pubkey_parts:
return pubkey_parts[2] == self.comment
return False
pubkey_parts = _parse_pubkey(_get_pubkey_content())
pubkey = self._get_public_key()
if _pubkey_valid(pubkey):
self.public_key = pubkey
else:
return False
if self.comment and not _comment_valid():
return False
# Perms required short-circuits evaluation to prevent the side-effects of running _permissions_changes
# when check_mode is not enabled
return not (perms_required and self._permissions_changed(public_key=True))
def _permissions_changed(self, public_key=False):
file_args = self.module.load_file_common_arguments(self.module.params)
if public_key:
file_args['path'] = file_args['path'] + '.pub'
if self.module.check_file_absent_if_check_mode(file_args['path']):
return True
return self.module.set_fs_attributes_if_different(file_args, False)
@property
def result(self):
return {
'changed': self.changed,
'size': self.size,
'type': self.type,
'filename': self.path,
'fingerprint': self.fingerprint if self.fingerprint else '',
'public_key': self.public_key,
'comment': self.comment if self.comment else '',
}
def _private_key_valid(self):
if self.original_private_key is None:
return False
def remove(self):
"""Remove the resource from the filesystem."""
return all([
self.size == self.original_private_key.size,
self.type == self.original_private_key.type,
])
@OpensshModule.trigger_change
@OpensshModule.skip_if_check_mode
def _generate(self):
temp_private_key, temp_public_key = self._generate_temp_keypair()
try:
os.remove(self.path)
self.changed = True
except (IOError, OSError) as exc:
if exc.errno != errno.ENOENT:
self.module.fail_json(msg=to_native(exc))
else:
pass
self._safe_secure_move([(temp_private_key, self.private_key_path), (temp_public_key, self.public_key_path)])
except OSError as e:
self.module.fail_json(msg=to_native(e))
def _generate_temp_keypair(self):
temp_private_key = os.path.join(self.module.tmpdir, os.path.basename(self.private_key_path))
temp_public_key = temp_private_key + '.pub'
if self.exists(public_key=True):
try:
os.remove(self.path + ".pub")
self.changed = True
except (IOError, OSError) as exc:
if exc.errno != errno.ENOENT:
self.module.fail_json(msg=to_native(exc))
else:
pass
self._generate_keypair(temp_private_key)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
def exists(self, public_key=False):
return os.path.exists(self.path if not public_key else self.path + ".pub")
for f in (temp_private_key, temp_public_key):
self.module.add_cleanup_file(f)
return temp_private_key, temp_public_key
@abc.abstractmethod
def _generate_keypair(self):
def _generate_keypair(self, private_key_path):
pass
@abc.abstractmethod
def _get_current_key_properties(self):
pass
def _public_key_valid(self):
if self.original_public_key is None:
return False
valid_public_key = self._get_public_key()
valid_public_key.comment = self.comment
return self.original_public_key == valid_public_key
@abc.abstractmethod
def _get_public_key(self):
pass
@OpensshModule.trigger_change
@OpensshModule.skip_if_check_mode
def _restore_public_key(self):
try:
temp_public_key = self._create_temp_public_key(str(self._get_public_key()) + '\n')
self._safe_secure_move([
(temp_public_key, self.public_key_path)
])
except (IOError, OSError):
self.module.fail_json(
msg="The public key is missing or does not match the private key. " +
"Unable to regenerate the public key."
)
if self.comment:
self._update_comment()
def _create_temp_public_key(self, content):
temp_public_key = os.path.join(self.module.tmpdir, os.path.basename(self.public_key_path))
default_permissions = 0o644
existing_permissions = file_mode(self.public_key_path)
try:
secure_write(temp_public_key, existing_permissions or default_permissions, to_bytes(content))
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
self.module.add_cleanup_file(temp_public_key)
return temp_public_key
@abc.abstractmethod
def _update_comment(self):
pass
@abc.abstractmethod
def _private_key_loadable(self):
pass
def _should_remove(self):
return self._private_key_exists() or self._public_key_exists()
@abc.abstractmethod
def _check_pass_protected_or_broken_key(self):
pass
@OpensshModule.trigger_change
@OpensshModule.skip_if_check_mode
def _remove(self):
try:
if self._private_key_exists():
os.remove(self.private_key_path)
if self._public_key_exists():
os.remove(self.public_key_path)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
@property
def _result(self):
private_key = self.private_key or self.original_private_key
public_key = self.public_key or self.original_public_key
return {
'size': self.size,
'type': self.type,
'filename': self.private_key_path,
'fingerprint': private_key.fingerprint if private_key else '',
'public_key': str(public_key) if public_key else '',
'comment': public_key.comment if public_key else '',
}
@property
def diff(self):
before = self.original_private_key.to_dict() if self.original_private_key else {}
before.update(self.original_public_key.to_dict() if self.original_public_key else {})
after = self.private_key.to_dict() if self.private_key else {}
after.update(self.public_key.to_dict() if self.public_key else {})
return {
'before': before,
'after': after,
}
class KeypairBackendOpensshBin(KeypairBackend):
def __init__(self, module):
super(KeypairBackendOpensshBin, self).__init__(module)
self.openssh_bin = module.get_bin_path('ssh-keygen')
self.ssh_keygen = KeygenCommand(self.module)
def _load_privatekey(self):
return self.module.run_command([self.openssh_bin, '-lf', self.path])
def _generate_keypair(self, private_key_path):
self.ssh_keygen.generate_keypair(private_key_path, self.size, self.type, self.comment)
def _get_publickey_from_privatekey(self):
# -P '' is always included as an option to induce the expected standard output for
# _check_pass_protected_or_broken_key, but introduces no side-effects when used to
# output a matching public key
return self.module.run_command([self.openssh_bin, '-P', '', '-yf', self.path])
def _generate_keypair(self):
args = [
self.openssh_bin,
'-q',
'-N', '',
'-b', str(self.size),
'-t', self.type,
'-f', self.path,
'-C', self.comment if self.comment else ''
]
# "y" must be entered in response to the "overwrite" prompt
stdin_data = 'y' if self.exists() else None
self.module.run_command(args, data=stdin_data)
def _get_current_key_properties(self):
rc, stdout, stderr = self._load_privatekey()
properties = stdout.split()
keysize = int(properties[0])
fingerprint = properties[1]
keytype = properties[-1][1:-1].lower()
return keysize, keytype, fingerprint
def _get_private_key(self):
private_key_content = self.ssh_keygen.get_private_key(self.private_key_path)[1]
return PrivateKey.from_string(private_key_content)
def _get_public_key(self):
rc, stdout, stderr = self._get_publickey_from_privatekey()
return stdout.strip('\n')
public_key_content = self.ssh_keygen.get_matching_public_key(self.private_key_path)[1]
return PublicKey.from_string(public_key_content)
def _private_key_readable(self):
rc, stdout, stderr = self.ssh_keygen.get_matching_public_key(self.private_key_path)
return not (rc == 255 or any_in(stderr, 'is not a public key file', 'incorrect passphrase', 'load failed'))
def _update_comment(self):
return self.module.run_command([self.openssh_bin, '-q', '-o', '-c', '-C', self.comment, '-f', self.path])
def _private_key_loadable(self):
rc, stdout, stderr = self._load_privatekey()
return rc == 0
def _check_pass_protected_or_broken_key(self):
rc, stdout, stderr = self._get_publickey_from_privatekey()
return rc == 255 or any_in(stderr, 'is not a public key file', 'incorrect passphrase', 'load failed')
try:
self.ssh_keygen.update_comment(self.private_key_path, self.comment)
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
class KeypairBackendCryptography(KeypairBackend):
def __init__(self, module):
super(KeypairBackendCryptography, self).__init__(module)
if module.params['private_key_format'] == 'auto':
ssh = module.get_bin_path('ssh')
if ssh:
proc = module.run_command([ssh, '-Vq'])
ssh_version = parse_openssh_version(proc[2].strip())
else:
# Default to OpenSSH 7.8 compatibility when OpenSSH is not installed
ssh_version = "7.8"
if self.type == 'rsa1':
self.module.fail_json(msg="RSA1 keys are not supported by the cryptography backend")
self.private_key_format = 'SSH'
self.passphrase = to_bytes(module.params['passphrase']) if module.params['passphrase'] else None
self.private_key_format = self._get_key_format(module.params['private_key_format'])
def _get_key_format(self, key_format):
result = 'SSH'
if key_format == 'auto':
# Default to OpenSSH 7.8 compatibility when OpenSSH is not installed
ssh_version = self._get_ssh_version() or "7.8"
if LooseVersion(ssh_version) < LooseVersion("7.8") and self.type != 'ed25519':
# OpenSSH made SSH formatted private keys available in version 6.5,
# but still defaulted to PKCS1 format with the exception of ed25519 keys
self.private_key_format = 'PKCS1'
result = 'PKCS1'
if self.private_key_format == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT:
module.fail_json(
if result == 'SSH' and not HAS_OPENSSH_PRIVATE_FORMAT:
self.module.fail_json(
msg=missing_required_lib(
'cryptography >= 3.0',
reason="to load/dump private keys in the default OpenSSH format for OpenSSH >= 7.8 " +
@ -362,96 +371,72 @@ class KeypairBackendCryptography(KeypairBackend):
)
)
if self.type == 'rsa1':
module.fail_json(msg="RSA1 keys are not supported by the cryptography backend")
return result
self.passphrase = to_bytes(self.passphrase) if self.passphrase else None
def _load_privatekey(self):
return OpensshKeypair.load(path=self.path, passphrase=self.passphrase, no_public_key=True)
def _generate_keypair(self):
def _generate_keypair(self, private_key_path):
keypair = OpensshKeypair.generate(
keytype=self.type,
size=self.size,
passphrase=self.passphrase,
comment=self.comment if self.comment else "",
comment=self.comment or '',
)
with open(self.path, 'w+b') as f:
f.write(
OpensshKeypair.encode_openssh_privatekey(
keypair.asymmetric_keypair,
self.private_key_format
)
)
# ssh-keygen defaults private key permissions to 0600 octal
os.chmod(self.path, stat.S_IWUSR + stat.S_IRUSR)
with open(self.path + '.pub', 'w+b') as f:
f.write(keypair.public_key)
# ssh-keygen defaults public key permissions to 0644 octal
os.chmod(self.path + ".pub", stat.S_IWUSR + stat.S_IRUSR + stat.S_IRGRP + stat.S_IROTH)
def _get_current_key_properties(self):
keypair = self._load_privatekey()
encoded_private_key = OpensshKeypair.encode_openssh_privatekey(
keypair.asymmetric_keypair, self.private_key_format
)
secure_write(private_key_path, 0o600, encoded_private_key)
return keypair.size, keypair.key_type, keypair.fingerprint
public_key_path = private_key_path + '.pub'
secure_write(public_key_path, 0o644, keypair.public_key)
def _get_private_key(self):
keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True)
return PrivateKey(
size=keypair.size,
key_type=keypair.key_type,
fingerprint=keypair.fingerprint,
)
def _get_public_key(self):
try:
keypair = self._load_privatekey()
keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True)
except OpenSSHError:
# Simulates the null output of ssh-keygen
return ""
return to_text(keypair.public_key)
return PublicKey.from_string(to_text(keypair.public_key))
def _update_comment(self):
keypair = self._load_privatekey()
def _private_key_readable(self):
try:
keypair.comment = self.comment
with open(self.path + ".pub", "w+b") as pubkey_file:
pubkey_file.write(keypair.public_key + b'\n')
except (InvalidCommentError, IOError, OSError) as e:
# Return values while unused currently are made to simulate the output of run_command()
return 1, "Comment could not be updated", to_native(e)
return 0, "Comment updated successfully", ""
def _private_key_loadable(self):
try:
self._load_privatekey()
except OpenSSHError:
return False
return True
def _check_pass_protected_or_broken_key(self):
try:
OpensshKeypair.load(
path=self.path,
passphrase=self.passphrase,
no_public_key=True,
)
OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True)
except (InvalidPrivateKeyFileError, InvalidPassphraseError):
return True
return False
# Cryptography >= 3.0 uses a SSH key loader which does not raise an exception when a passphrase is provided
# when loading an unencrypted key
if self.passphrase:
try:
OpensshKeypair.load(
path=self.path,
passphrase=None,
no_public_key=True,
)
OpensshKeypair.load(path=self.private_key_path, passphrase=None, no_public_key=True)
except (InvalidPrivateKeyFileError, InvalidPassphraseError):
return False
return True
else:
return False
return True
return False
def _update_comment(self):
keypair = OpensshKeypair.load(path=self.private_key_path, passphrase=self.passphrase, no_public_key=True)
try:
keypair.comment = self.comment
except InvalidCommentError as e:
self.module.fail_json(msg=to_native(e))
def any_in(sequence, *elements):
return any(e in sequence for e in elements)
try:
temp_public_key = self._create_temp_public_key(keypair.public_key + b'\n')
self._safe_secure_move([(temp_public_key, self.public_key_path)])
except (IOError, OSError) as e:
self.module.fail_json(msg=to_native(e))
def select_backend(module, backend):

View File

@ -19,7 +19,9 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import os
import re
from contextlib import contextmanager
from struct import Struct
from ansible.module_utils.six import PY3
@ -54,6 +56,16 @@ _UINT64 = Struct(b'!Q')
_UINT64_MAX = 0xFFFFFFFFFFFFFFFF
def any_in(sequence, *elements):
return any(e in sequence for e in elements)
def file_mode(path):
if not os.path.exists(path):
return 0o000
return os.stat(path).st_mode & 0o777
def parse_openssh_version(version_string):
"""Parse the version output of ssh -V and return version numbers that can be compared"""
@ -68,6 +80,20 @@ def parse_openssh_version(version_string):
return version
@contextmanager
def secure_open(path, mode):
fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
try:
yield fd
finally:
os.close(fd)
def secure_write(path, mode, content):
with secure_open(path, mode) as fd:
os.write(fd, content)
# See https://datatracker.ietf.org/doc/html/rfc4251#section-5 for SSH data types
class OpensshParser(object):
"""Parser for OpenSSH encoded objects"""

View File

@ -44,8 +44,8 @@ options:
required: true
regenerate:
description:
- When C(never) the task will fail if a certificate already exists at I(path) and is unreadable.
Otherwise, a new certificate will only be generated if there is no existing certificate.
- When C(never) the task will fail if a certificate already exists at I(path) and is unreadable
otherwise a new certificate will only be generated if there is no existing certificate.
- When C(fail) the task will fail if a certificate already exists at I(path) and does not
match the module's options.
- When C(partial_idempotence) an existing certificate will be regenerated based on
@ -239,12 +239,15 @@ info:
import os
from distutils.version import LooseVersion
from sys import version_info
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native, to_text
from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import safe_atomic_move
from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.common import (
KeygenCommand,
OpensshModule,
PrivateKey,
)
from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import (
OpensshCertificate,
@ -252,49 +255,64 @@ from ansible_collections.community.crypto.plugins.module_utils.openssh.certifica
parse_option_list,
)
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
parse_openssh_version,
)
PY27 = version_info[0:2] >= (2, 7)
class Certificate(object):
class Certificate(OpensshModule):
def __init__(self, module):
self.check_mode = module.check_mode
self.module = module
self.ssh_keygen = module.get_bin_path('ssh-keygen', True)
super(Certificate, self).__init__(module)
self.ssh_keygen = KeygenCommand(self.module)
self.force = module.params['force']
self.identifier = module.params['identifier'] or ""
self.options = module.params['options'] or []
self.path = module.params['path']
self.pkcs11_provider = module.params['pkcs11_provider']
self.principals = module.params['principals'] or []
self.public_key = module.params['public_key']
self.regenerate = module.params['regenerate'] if not self.force else 'always'
self.serial_number = module.params['serial_number']
self.signing_key = module.params['signing_key']
self.state = module.params['state']
self.type = module.params['type']
self.use_agent = module.params['use_agent']
self.valid_at = module.params['valid_at']
self.identifier = self.module.params['identifier'] or ""
self.options = self.module.params['options'] or []
self.path = self.module.params['path']
self.pkcs11_provider = self.module.params['pkcs11_provider']
self.principals = self.module.params['principals'] or []
self.public_key = self.module.params['public_key']
self.regenerate = self.module.params['regenerate'] if not self.module.params['force'] else 'always'
self.serial_number = self.module.params['serial_number']
self.signing_key = self.module.params['signing_key']
self.state = self.module.params['state']
self.type = self.module.params['type']
self.use_agent = self.module.params['use_agent']
self.valid_at = self.module.params['valid_at']
self.changed = False
self.data = None
self.original_data = None
self.time_parameters = None
self._check_if_base_dir(self.path)
if self.state == 'present':
try:
self.time_parameters = OpensshCertificateTimeParameters(
valid_from=module.params['valid_from'],
valid_to=module.params['valid_to'],
)
except ValueError as e:
self.module.fail_json(msg=to_native(e))
self._validate_parameters()
if self.exists():
self.data = None
self.original_data = None
if self._exists():
self._load_certificate()
self.time_parameters = None
if self.state == 'present':
self._set_time_parameters()
def _validate_parameters(self):
for path in (self.public_key, self.signing_key):
self._check_if_base_dir(path)
if self.options and self.type == "host":
self.module.fail_json(msg="Options can only be used with user certificates.")
if self.use_agent:
self._use_agent_available()
def _use_agent_available(self):
ssh_version = self._get_ssh_version()
if not ssh_version:
self.module.fail_json(msg="Failed to determine ssh version")
elif LooseVersion(ssh_version) < LooseVersion("7.6"):
self.module.fail_json(
msg="Signing with CA key in ssh agent requires ssh 7.6 or newer." +
" Your version is: %s" % ssh_version
)
def _exists(self):
return os.path.exists(self.path)
def _load_certificate(self):
try:
self.original_data = OpensshCertificate.load(self.path)
except (TypeError, ValueError) as e:
@ -302,103 +320,55 @@ class Certificate(object):
self.module.fail_json(msg="Unable to read existing certificate: %s" % to_native(e))
self.module.warn("Unable to read existing certificate: %s" % to_native(e))
self._validate_parameters()
def exists(self):
return os.path.exists(self.path)
def generate(self):
if self._should_generate():
if not self.check_mode:
temp_cert = self._generate_temp_certificate()
def _set_time_parameters(self):
try:
safe_atomic_move(self.module, temp_cert, self.path)
except OSError as e:
self.module.fail_json(msg="Unable to write certificate to %s: %s" % (self.path, to_native(e)))
try:
self.data = OpensshCertificate.load(self.path)
except (TypeError, ValueError) as e:
self.module.fail_json(msg="Unable to read new certificate: %s" % to_native(e))
self.changed = True
if self.exists():
self._update_permissions()
def remove(self):
if self.exists():
if not self.check_mode:
try:
os.remove(self.path)
except OSError as e:
self.module.fail_json(msg="Unable to remove existing certificate: %s" % to_native(e))
self.changed = True
@property
def result(self):
result = {'changed': self.changed}
if self.module._diff:
result['diff'] = {
'before': get_cert_dict(self.original_data),
'after': get_cert_dict(self.data)
}
if self.state == 'present':
result.update({
'type': self.type,
'filename': self.path,
'info': format_cert_info(self._get_cert_info()),
})
return result
def _check_if_base_dir(self, path):
base_dir = os.path.dirname(path) or '.'
if not os.path.isdir(base_dir):
self.module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
self.time_parameters = OpensshCertificateTimeParameters(
valid_from=self.module.params['valid_from'],
valid_to=self.module.params['valid_to'],
)
def _command_arguments(self, key_copy_path):
result = [
self.ssh_keygen,
'-s', self.signing_key,
'-P', '',
'-I', self.identifier,
]
if self.options:
for option in self.options:
result.extend(['-O', option])
if self.pkcs11_provider:
result.extend(['-D', self.pkcs11_provider])
if self.principals:
result.extend(['-n', ','.join(self.principals)])
if self.serial_number is not None:
result.extend(['-z', str(self.serial_number)])
if self.type == 'host':
result.extend(['-h'])
if self.use_agent:
result.extend(['-U'])
if self.time_parameters.validity_string:
result.extend(['-V', self.time_parameters.validity_string])
result.append(key_copy_path)
return result
def _compare_options(self):
try:
critical_options, extensions = parse_option_list(self.options)
except ValueError as e:
return self.module.fail_json(msg=to_native(e))
self.module.fail_json(msg=to_native(e))
def _execute(self):
if self.state == 'present':
if self._should_generate():
self._generate()
self._update_permissions(self.path)
else:
if self._exists():
self._remove()
def _should_generate(self):
if self.regenerate == 'never':
return self.original_data is None
elif self.regenerate == 'fail':
if self.original_data and not self._is_fully_valid():
self.module.fail_json(
msg="Certificate does not match the provided options.",
cert=get_cert_dict(self.original_data)
)
return self.original_data is None
elif self.regenerate == 'partial_idempotence':
return self.original_data is None or not self._is_partially_valid()
elif self.regenerate == 'full_idempotence':
return self.original_data is None or not self._is_fully_valid()
else:
return True
def _is_fully_valid(self):
return self._is_partially_valid() and all([
self._compare_options(),
self.original_data.key_id == self.identifier,
self.original_data.public_key == self._get_key_fingerprint(self.public_key),
self.original_data.signing_key == self._get_key_fingerprint(self.signing_key),
])
def _is_partially_valid(self):
return all([
set(self.original_data.critical_options) == set(critical_options),
set(self.original_data.extensions) == set(extensions)
set(self.original_data.principals) == set(self.principals),
self.original_data.serial == self.serial_number if self.serial_number is not None else True,
self.original_data.type == self.type,
self._compare_time_parameters(),
])
def _compare_time_parameters(self):
@ -415,6 +385,35 @@ class Certificate(object):
original_time_parameters.within_range(self.valid_at)
])
def _compare_options(self):
try:
critical_options, extensions = parse_option_list(self.options)
except ValueError as e:
return self.module.fail_json(msg=to_native(e))
return all([
set(self.original_data.critical_options) == set(critical_options),
set(self.original_data.extensions) == set(extensions)
])
def _get_key_fingerprint(self, path):
private_key_content = self.ssh_keygen.get_private_key(path, check_rc=True)[1]
return PrivateKey.from_string(private_key_content).fingerprint
@OpensshModule.trigger_change
@OpensshModule.skip_if_check_mode
def _generate(self):
try:
temp_certificate = self._generate_temp_certificate()
self._safe_secure_move([(temp_certificate, self.path)])
except OSError as e:
self.module.fail_json(msg="Unable to write certificate to %s: %s" % (self.path, to_native(e)))
try:
self.data = OpensshCertificate.load(self.path)
except (TypeError, ValueError) as e:
self.module.fail_json(msg="Unable to read new certificate: %s" % to_native(e))
def _generate_temp_certificate(self):
key_copy = os.path.join(self.module.tmpdir, os.path.basename(self.public_key))
@ -424,77 +423,44 @@ class Certificate(object):
self.module.fail_json(msg="Unable to stage temporary key: %s" % to_native(e))
self.module.add_cleanup_file(key_copy)
self.module.run_command(self._command_arguments(key_copy), environ_update=dict(TZ="UTC"), check_rc=True)
self.ssh_keygen.generate_certificate(
key_copy, self.identifier, self.options, self.pkcs11_provider, self.principals, self.serial_number,
self.signing_key, self.type, self.time_parameters, self.use_agent,
environ_update=dict(TZ="UTC"), check_rc=True
)
temp_cert = os.path.splitext(key_copy)[0] + '-cert.pub'
self.module.add_cleanup_file(temp_cert)
return temp_cert
def _get_cert_info(self):
return self.module.run_command([self.ssh_keygen, '-Lf', self.path])[1]
@OpensshModule.trigger_change
@OpensshModule.skip_if_check_mode
def _remove(self):
try:
os.remove(self.path)
except OSError as e:
self.module.fail_json(msg="Unable to remove existing certificate: %s" % to_native(e))
def _get_key_fingerprint(self, path):
stdout = self.module.run_command([self.ssh_keygen, '-lf', path], check_rc=True)[1]
return stdout.split()[1]
@property
def _result(self):
if self.state != 'present':
return {}
def _is_valid(self):
partial_result = all([
set(self.original_data.principals) == set(self.principals),
self.original_data.serial == self.serial_number if self.serial_number is not None else True,
self.original_data.type == self.type,
self._compare_time_parameters(),
])
certificate_info = self.ssh_keygen.get_certificate_info(self.path)[1]
if self.regenerate == 'partial_idempotence':
return partial_result
return {
'type': self.type,
'filename': self.path,
'info': format_cert_info(certificate_info),
}
return partial_result and all([
self._compare_options(),
self.original_data.key_id == self.identifier,
self.original_data.public_key == self._get_key_fingerprint(self.public_key),
self.original_data.signing_key == self._get_key_fingerprint(self.signing_key),
])
def _should_generate(self):
if self.regenerate == 'never':
return self.original_data is None
elif self.regenerate == 'fail':
if self.original_data and not self._is_valid():
self.module.fail_json(
msg="Certificate does not match the provided options.",
cert=get_cert_dict(self.original_data)
)
return self.original_data is None
elif self.regenerate in ('partial_idempotence', 'full_idempotence'):
return self.original_data is None or not self._is_valid()
else:
return True
def _update_permissions(self):
file_args = self.module.load_file_common_arguments(self.module.params)
self.changed = self.module.set_fs_attributes_if_different(file_args, self.changed)
def _validate_parameters(self):
self._check_if_base_dir(self.path)
if self.state == 'present':
for path in (self.public_key, self.signing_key):
self._check_if_base_dir(path)
if self.options and self.type == "host":
self.module.fail_json(msg="Options can only be used with user certificates.")
if self.use_agent:
ssh_version_string = self.module.run_command([self.module.get_bin_path('ssh', True), '-Vq'])[2].strip()
ssh_version = parse_openssh_version(ssh_version_string)
if ssh_version is None:
self.module.fail_json(msg="Failed to parse ssh version from: %s" % ssh_version_string)
elif LooseVersion(ssh_version) < LooseVersion("7.6"):
self.module.fail_json(
msg="Signing with CA key in ssh agent requires ssh 7.6 or newer." +
" Your version is: %s" % ssh_version_string
)
@property
def diff(self):
return {
'before': get_cert_dict(self.original_data),
'after': get_cert_dict(self.data)
}
def format_cert_info(cert_info):
@ -551,14 +517,7 @@ def main():
required_if=[('state', 'present', ['type', 'signing_key', 'public_key', 'valid_from', 'valid_to'])],
)
certificate = Certificate(module)
if certificate.state == 'present':
certificate.generate()
else:
certificate.remove()
module.exit_json(**certificate.result)
Certificate(module).execute()
if __name__ == '__main__':

View File

@ -186,8 +186,6 @@ comment:
sample: test@comment
'''
import os
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.module_utils.openssh.backends.keypair_backend import (
@ -218,32 +216,9 @@ def main():
add_file_common_args=True,
)
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
keypair = select_backend(module, module.params['backend'])[1]
if module.params['state'] == 'present':
if module.check_mode:
keypair.changed = any([
keypair.force,
not keypair.is_private_key_valid(),
not keypair.is_public_key_valid()
])
else:
keypair.generate()
else:
# When `state=absent` no details from an existing key at the given `path` are returned in the module result
if module.check_mode:
keypair.changed = keypair.exists()
else:
keypair.remove()
module.exit_json(**keypair.result)
keypair.execute()
if __name__ == '__main__':

View File

@ -52,8 +52,8 @@
that:
- core_output['fingerprint'] is string
- core_output['fingerprint'].startswith('SHA256:')
# only distro old enough that it still gives md5 with no prefix
when: not (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6')
# SHA256 was made the default hashing algorithm for fingerprints in OpenSSH 6.8
when: not (backend == 'opensshbin' and openssh_version is version('6.8', '<'))
- name: "({{ backend }}) Assert key returns public_key"
assert:

View File

@ -87,13 +87,6 @@
assert:
that:
- write_only_private_key_after.stat.mode == '0200'
when: backend == 'opensshbin'
- name: "({{ backend }}) Assert key permissions are not preserved with 'cryptography'"
assert:
that:
- write_only_private_key_after.stat.mode == '0600'
when: backend == 'cryptography'
- name: "({{ backend }}) Remove key - write-only"
openssh_keypair:

View File

@ -55,7 +55,8 @@
openssh_keypair:
path: "{{ output_dir }}/default_size_ed25519"
state: absent
when: not (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6')
# Support for ed25519 keys was added in OpenSSH 6.5
when: not (backend == 'opensshbin' and openssh_version is version('6.5', '<'))
- name: "({{ backend }}) Generate key - force"
openssh_keypair:
@ -96,12 +97,18 @@
backend: "{{ backend }}"
register: modified_comment_output
- name: "({{ backend }}) Assert only comment changed - comment"
- name: "({{ backend }}) Assert comment preserved public key - comment"
assert:
that:
- comment_output.public_key == modified_comment_output.public_key
- comment_output.comment == 'test@comment'
- name: "({{ backend }}) Assert comment changed - comment"
assert:
that:
- modified_comment_output.comment == 'test_modified@comment'
# Support for updating comments for key types other than rsa1 was added in OpenSSH 7.2
when: not (backend == 'opensshbin' and openssh_version is version('7.2', '<'))
- name: "({{ backend }}) Remove key - comment"
openssh_keypair:

View File

@ -12,3 +12,12 @@
package:
name: '{{ openssh_client_package_name }}'
when: not ansible_os_family == "Darwin" and not ansible_os_family == "FreeBSD"
- name: Get ssh version
shell: ssh -Vq 2>&1|sed 's/^.*OpenSSH_\([0-9]\{1,\}\.[0-9]\{1,\}\).*$/\1/'
register:
rc_openssh_version_output
- name: Set ssh version facts
set_fact:
openssh_version: "{{ rc_openssh_version_output.stdout.strip() }}"