diff --git a/changelogs/fragments/255-openssh_cert-adding-diff-support.yml b/changelogs/fragments/255-openssh_cert-adding-diff-support.yml new file mode 100644 index 00000000..164f9ddd --- /dev/null +++ b/changelogs/fragments/255-openssh_cert-adding-diff-support.yml @@ -0,0 +1,6 @@ +--- +minor_changes: + - openssh_cert - adding ``diff`` support (https://github.com/ansible-collections/community.crypto/pull/255). +bugfixes: + - openssh_cert - fixed certificate generation to restore original certificate if an error is encountered + (https://github.com/ansible-collections/community.crypto/pull/255). diff --git a/plugins/module_utils/openssh/certificate.py b/plugins/module_utils/openssh/certificate.py index 297eedc7..3e651487 100644 --- a/plugins/module_utils/openssh/certificate.py +++ b/plugins/module_utils/openssh/certificate.py @@ -35,9 +35,11 @@ import abc import binascii import os from base64 import b64encode +from datetime import datetime from hashlib import sha256 from ansible.module_utils import six +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( OpensshParser, _OpensshWriter, @@ -69,6 +71,108 @@ _ECDSA_CURVE_IDENTIFIERS_LOOKUP = { b'nistp521': 'ecdsa-nistp521', } +_ALWAYS = datetime(1970, 1, 1) +_FOREVER = datetime.max + +if six.PY3: + long = int + + +class OpensshCertificateTimeParameters(object): + def __init__(self, valid_from, valid_to): + self._valid_from = self.to_datetime(valid_from) + self._valid_to = self.to_datetime(valid_to) + + if self._valid_from > self._valid_to: + raise ValueError("Valid from: %s must not be greater than Valid to: %s" % (valid_from, valid_to)) + + def __eq__(self, other): + if not isinstance(other, type(self)): + return NotImplemented + else: + return self._valid_from == other._valid_from and self._valid_to == other._valid_to + + @property + def validity_string(self): + if not (self._valid_from == _ALWAYS and self._valid_to == _FOREVER): + return "%s:%s" % ( + self.valid_from(date_format='openssh'), self.valid_to(date_format='openssh') + ) + return "" + + def valid_from(self, date_format): + return self.format_datetime(self._valid_from, date_format) + + def valid_to(self, date_format): + return self.format_datetime(self._valid_to, date_format) + + def within_range(self, valid_at): + if valid_at is not None: + valid_at_datetime = self.to_datetime(valid_at) + return self._valid_from <= valid_at_datetime <= self._valid_to + return True + + @staticmethod + def format_datetime(dt, date_format): + if date_format in ('human_readable', 'openssh'): + if dt == _ALWAYS: + result = 'always' + elif dt == _FOREVER: + result = 'forever' + else: + result = dt.isoformat() if date_format == 'human_readable' else dt.strftime("%Y%m%d%H%M%S") + elif date_format == 'timestamp': + td = dt - _ALWAYS + result = int((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6) + else: + raise ValueError("%s is not a valid format" % date_format) + return result + + @staticmethod + def to_datetime(time_string_or_timestamp): + try: + if isinstance(time_string_or_timestamp, str): + result = OpensshCertificateTimeParameters._time_string_to_datetime(time_string_or_timestamp.strip()) + elif isinstance(time_string_or_timestamp, (long, int)): + result = OpensshCertificateTimeParameters._timestamp_to_datetime(time_string_or_timestamp) + else: + raise ValueError("Value must be of type (str, int, long) not %s" % type(time_string_or_timestamp)) + except ValueError: + raise + return result + + @staticmethod + def _timestamp_to_datetime(timestamp): + if timestamp == 0x0: + result = _ALWAYS + elif timestamp == 0xFFFFFFFFFFFFFFFF: + result = _FOREVER + else: + try: + result = datetime.utcfromtimestamp(timestamp) + except OverflowError as e: + raise ValueError + return result + + @staticmethod + def _time_string_to_datetime(time_string): + result = None + if time_string == 'always': + result = _ALWAYS + elif time_string == 'forever': + result = _FOREVER + elif is_relative_time_string(time_string): + result = convert_relative_to_datetime(time_string) + else: + for time_format in ("%Y-%m-%d", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): + try: + result = datetime.strptime(time_string, time_format) + except ValueError: + pass + if result is None: + raise ValueError + return result + @six.add_metaclass(abc.ABCMeta) class OpensshCertificateInfo: @@ -83,7 +187,8 @@ class OpensshCertificateInfo: valid_before=None, critical_options=None, extensions=None, - reserved=None): + reserved=None, + signing_key=None): self.nonce = nonce self.serial = serial self._cert_type = cert_type @@ -94,6 +199,7 @@ class OpensshCertificateInfo: self.critical_options = critical_options self.extensions = extensions self.reserved = reserved + self.signing_key = signing_key self.type_string = None @@ -115,6 +221,9 @@ class OpensshCertificateInfo: else: raise ValueError("%s is not a valid certificate type" % cert_type) + def signing_key_fingerprint(self): + return fingerprint(self.signing_key) + @abc.abstractmethod def public_key_fingerprint(self): pass @@ -241,10 +350,9 @@ class OpensshED25519CertificateInfo(OpensshCertificateInfo): # See https://cvsweb.openbsd.org/src/usr.bin/ssh/PROTOCOL.certkeys?annotate=HEAD class OpensshCertificate(object): """Encapsulates a formatted OpenSSH certificate including signature and signing key""" - def __init__(self, cert_info, signing_key, signature): + def __init__(self, cert_info, signature): - self.cert_info = cert_info - self.signing_key = signing_key + self._cert_info = cert_info self.signature = signature @classmethod @@ -278,7 +386,6 @@ class OpensshCertificate(object): try: cert_info = cls._parse_cert_info(pub_key_type, parser) - signing_key = parser.string() signature = parser.string() except (TypeError, ValueError) as e: raise ValueError("Invalid certificate data: %s" % e) @@ -290,12 +397,60 @@ class OpensshCertificate(object): return cls( cert_info=cert_info, - signing_key=signing_key, signature=signature, ) - def signing_key_fingerprint(self): - return fingerprint(self.signing_key) + @property + def type_string(self): + return self._cert_info.type_string + + @property + def nonce(self): + return self._cert_info.nonce + + @property + def public_key(self): + return self._cert_info.public_key_fingerprint() + + @property + def serial(self): + return self._cert_info.serial + + @property + def type(self): + return self._cert_info.cert_type + + @property + def key_id(self): + return self._cert_info.key_id + + @property + def principals(self): + return self._cert_info.principals + + @property + def valid_after(self): + return self._cert_info.valid_after + + @property + def valid_before(self): + return self._cert_info.valid_before + + @property + def critical_options(self): + return self._cert_info.critical_options + + @property + def extensions(self): + return self._cert_info.extensions + + @property + def reserved(self): + return self._cert_info.reserved + + @property + def signing_key(self): + return self._cert_info.signing_key_fingerprint() @staticmethod def _parse_cert_info(pub_key_type, parser): @@ -311,9 +466,31 @@ class OpensshCertificate(object): cert_info.critical_options = parser.option_list() cert_info.extensions = parser.option_list() cert_info.reserved = parser.string() + cert_info.signing_key = parser.string() return cert_info + def to_dict(self): + time_parameters = OpensshCertificateTimeParameters( + valid_from=self.valid_after, + valid_to=self.valid_before + ) + return { + 'type_string': self.type_string, + 'nonce': self.nonce, + 'serial': self.serial, + 'cert_type': self.type, + 'identifier': self.key_id, + 'principals': self.principals, + 'valid_after': time_parameters.valid_from(date_format='human_readable'), + 'valid_before': time_parameters.valid_to(date_format='human_readable'), + 'critical_options': self.critical_options, + 'extensions': [e[0] for e in self.extensions], + 'reserved': self.reserved, + 'public_key': self.public_key, + 'signing_key': self.signing_key, + } + def fingerprint(public_key): """Generates a SHA256 hash and formats output to resemble ``ssh-keygen``""" @@ -335,3 +512,7 @@ def get_cert_info_object(key_type): raise ValueError("%s is not a valid key type" % key_type) return cert_info + + +def is_relative_time_string(time_string): + return time_string.startswith("+") or time_string.startswith("-") diff --git a/plugins/modules/openssh_cert.py b/plugins/modules/openssh_cert.py index d2699e90..a518b5df 100644 --- a/plugins/modules/openssh_cert.py +++ b/plugins/modules/openssh_cert.py @@ -20,7 +20,8 @@ requirements: options: state: description: - - Whether the host or user certificate should exist or not, taking action if the state is different from what is stated. + - Whether the host or user certificate should exist or not, taking action if the state is different + from what is stated. type: str default: "present" choices: [ 'present', 'absent' ] @@ -215,423 +216,270 @@ info: ''' -import errno import os -import re -import tempfile - -from datetime import datetime -from datetime import MINYEAR, MAXYEAR from distutils.version import LooseVersion -from shutil import copy2, rmtree +from sys import version_info from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.common.text.converters import to_native +from ansible.module_utils.common.text.converters import to_native, to_text -from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime -from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import parse_openssh_version +from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import ( + OpensshCertificate, + OpensshCertificateTimeParameters, +) +from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import ( + parse_openssh_version, +) -class CertificateError(Exception): - pass +PY27 = version_info[0:2] >= (2, 7) class Certificate(object): - def __init__(self, module): - self.state = module.params['state'] - self.force = module.params['force'] - self.type = module.params['type'] - self.signing_key = module.params['signing_key'] - self.use_agent = module.params['use_agent'] - self.pkcs11_provider = module.params['pkcs11_provider'] - self.public_key = module.params['public_key'] - self.path = module.params['path'] - self.identifier = module.params['identifier'] - self.serial_number = module.params['serial_number'] - self.valid_from = module.params['valid_from'] - self.valid_to = module.params['valid_to'] - self.valid_at = module.params['valid_at'] - self.principals = module.params['principals'] - self.options = module.params['options'] - self.changed = False self.check_mode = module.check_mode - self.cert_info = {} - - if self.state == 'present': - - if self.options and self.type == "host": - module.fail_json(msg="Options can only be used with user certificates.") - - if self.valid_at: - self.valid_at = self.valid_at.lstrip() - - self.valid_from = self.valid_from.lstrip() - self.valid_to = self.valid_to.lstrip() - + self.module = module self.ssh_keygen = module.get_bin_path('ssh-keygen', True) - def generate(self, module): + self.force = module.params['force'] + self.identifier = module.params['identifier'] or "" + self.options = module.params['options'] + self.path = module.params['path'] + self.pkcs11_provider = module.params['pkcs11_provider'] + self.principals = module.params['principals'] or [] + self.public_key = module.params['public_key'] + self.serial_number = module.params['serial_number'] + self.signing_key = module.params['signing_key'] + self.state = module.params['state'] + self.type = module.params['type'] + self.use_agent = module.params['use_agent'] + self.valid_at = module.params['valid_at'] - if not self.is_valid(module, perms_required=False) or self.force: - args = [ - self.ssh_keygen, - '-s', self.signing_key - ] - - if self.pkcs11_provider: - args.extend(['-D', self.pkcs11_provider]) - - if self.use_agent: - args.extend(['-U']) - - validity = "" - - if not (self.valid_from == "always" and self.valid_to == "forever"): - - if not self.valid_from == "always": - timeobj = self.convert_to_datetime(module, self.valid_from) - validity += ( - str(timeobj.year).zfill(4) + - str(timeobj.month).zfill(2) + - str(timeobj.day).zfill(2) + - str(timeobj.hour).zfill(2) + - str(timeobj.minute).zfill(2) + - str(timeobj.second).zfill(2) - ) - else: - validity += "19700101010101" - - validity += ":" - - if self.valid_to == "forever": - # on ssh-keygen versions that have the year 2038 bug this will cause the datetime to be 2038-01-19T04:14:07 - timeobj = datetime(MAXYEAR, 12, 31) - else: - timeobj = self.convert_to_datetime(module, self.valid_to) - - validity += ( - str(timeobj.year).zfill(4) + - str(timeobj.month).zfill(2) + - str(timeobj.day).zfill(2) + - str(timeobj.hour).zfill(2) + - str(timeobj.minute).zfill(2) + - str(timeobj.second).zfill(2) - ) - - args.extend(["-V", validity]) - - if self.type == 'host': - args.extend(['-h']) - - if self.identifier: - args.extend(['-I', self.identifier]) - else: - args.extend(['-I', ""]) - - if self.serial_number is not None: - args.extend(['-z', str(self.serial_number)]) - - if self.principals: - args.extend(['-n', ','.join(self.principals)]) - - if self.options: - for option in self.options: - args.extend(['-O']) - args.extend([option]) - - args.extend(['-P', '']) - - try: - temp_directory = tempfile.mkdtemp() - copy2(self.public_key, temp_directory) - args.extend([temp_directory + "/" + os.path.basename(self.public_key)]) - module.run_command(args, environ_update=dict(TZ="UTC"), check_rc=True) - copy2(temp_directory + "/" + os.path.splitext(os.path.basename(self.public_key))[0] + "-cert.pub", self.path) - rmtree(temp_directory, ignore_errors=True) - proc = module.run_command([self.ssh_keygen, '-L', '-f', self.path]) - self.cert_info = proc[1].split() - self.changed = True - except Exception as e: - try: - self.remove() - rmtree(temp_directory, ignore_errors=True) - except OSError as exc: - if exc.errno != errno.ENOENT: - raise CertificateError(exc) - else: - pass - module.fail_json(msg="%s" % to_native(e)) - - file_args = module.load_file_common_arguments(module.params) - if module.check_file_absent_if_check_mode(file_args['path']): - self.changed = True - elif module.set_fs_attributes_if_different(file_args, False): - self.changed = True - - def convert_to_datetime(self, module, timestring): - - if self.is_relative(timestring): - result = convert_relative_to_datetime(timestring) - if result is None: - module.fail_json( - msg="'%s' is not a valid time format." % timestring) - else: - return result - else: - formats = ["%Y-%m-%d", - "%Y-%m-%d %H:%M:%S", - "%Y-%m-%dT%H:%M:%S", - ] - for fmt in formats: - try: - return datetime.strptime(timestring, fmt) - except ValueError: - pass - module.fail_json(msg="'%s' is not a valid time format" % timestring) - - def is_relative(self, timestr): - if timestr.startswith("+") or timestr.startswith("-"): - return True - return False - - def is_same_datetime(self, datetime_one, datetime_two): - - # This function is for backwards compatibility only because .total_seconds() is new in python2.7 - def timedelta_total_seconds(time_delta): - return (time_delta.microseconds + 0.0 + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6 - # try to use .total_ seconds() from python2.7 - try: - return (datetime_one - datetime_two).total_seconds() == 0.0 - except AttributeError: - return timedelta_total_seconds(datetime_one - datetime_two) == 0.0 - - def is_valid(self, module, perms_required=True): - - def _check_state(): - return os.path.exists(self.path) - - if _check_state(): - proc = module.run_command([self.ssh_keygen, '-L', '-f', self.path], environ_update=dict(TZ="UTC"), check_rc=False) - if proc[0] != 0: - return False - self.cert_info = proc[1].split() - principals = re.findall("(?<=Principals:)(.*)(?=Critical)", proc[1], re.S)[0].split() - principals = list(map(str.strip, principals)) - if principals == ["(none)"]: - principals = None - cert_type = re.findall("( user | host )", proc[1])[0].strip() - serial_number = re.search(r"Serial: (\d+)", proc[1]).group(1) - validity = re.findall("(from (\\d{4}-\\d{2}-\\d{2}T\\d{2}(:\\d{2}){2}) to (\\d{4}-\\d{2}-\\d{2}T\\d{2}(:\\d{2}){2}))", proc[1]) - if validity: - if validity[0][1]: - cert_valid_from = self.convert_to_datetime(module, validity[0][1]) - if self.is_same_datetime(cert_valid_from, self.convert_to_datetime(module, "1970-01-01 01:01:01")): - cert_valid_from = datetime(MINYEAR, 1, 1) - else: - cert_valid_from = datetime(MINYEAR, 1, 1) - - if validity[0][3]: - cert_valid_to = self.convert_to_datetime(module, validity[0][3]) - if self.is_same_datetime(cert_valid_to, self.convert_to_datetime(module, "2038-01-19 03:14:07")): - cert_valid_to = datetime(MAXYEAR, 12, 31) - else: - cert_valid_to = datetime(MAXYEAR, 12, 31) - else: - cert_valid_from = datetime(MINYEAR, 1, 1) - cert_valid_to = datetime(MAXYEAR, 12, 31) - else: - return False - - def _check_perms(module): - file_args = module.load_file_common_arguments(module.params) - return not module.set_fs_attributes_if_different(file_args, False) - - def _check_serial_number(): - if self.serial_number is None: - return True - return self.serial_number == int(serial_number) - - def _check_type(): - return self.type == cert_type - - def _check_principals(): - if not principals or not self.principals: - return self.principals == principals - return set(self.principals) == set(principals) - - def _check_validity(module): - if self.valid_from == "always": - earliest_time = datetime(MINYEAR, 1, 1) - elif self.is_relative(self.valid_from): - earliest_time = None - else: - earliest_time = self.convert_to_datetime(module, self.valid_from) - - if self.valid_to == "forever": - last_time = datetime(MAXYEAR, 12, 31) - elif self.is_relative(self.valid_to): - last_time = None - else: - last_time = self.convert_to_datetime(module, self.valid_to) - - if earliest_time: - if not self.is_same_datetime(earliest_time, cert_valid_from): - return False - if last_time: - if not self.is_same_datetime(last_time, cert_valid_to): - return False - - if self.valid_at: - if cert_valid_from <= self.convert_to_datetime(module, self.valid_at) <= cert_valid_to: - return True - - if earliest_time and last_time: - return True - - return False - - if perms_required and not _check_perms(module): - return False - - return _check_type() and _check_principals() and _check_validity(module) and _check_serial_number() - - def dump(self): - - """Serialize the object into a dictionary.""" - - def filter_keywords(arr, keywords): - concated = [] - string = "" - for word in arr: - if word in keywords: - concated.append(string) - string = word - else: - string += " " + word - concated.append(string) - # drop the certificate path - concated.pop(0) - return concated - - def format_cert_info(): - return filter_keywords(self.cert_info, [ - "Type:", - "Public", - "Signing", - "Key", - "Serial:", - "Valid:", - "Principals:", - "Critical", - "Extensions:"]) + self.changed = False + self.data = None + self.original_data = None + self.time_parameters = None if self.state == 'present': - result = { - 'changed': self.changed, + try: + self.time_parameters = OpensshCertificateTimeParameters( + valid_from=module.params['valid_from'], + valid_to=module.params['valid_to'], + ) + except ValueError as e: + self.module.fail_json(msg=to_native(e)) + + if self.exists(): + try: + self.original_data = OpensshCertificate.load(self.path) + except (TypeError, ValueError) as e: + self.module.warn("Unable to read existing certificate: %s" % to_native(e)) + + self._validate_parameters() + + def exists(self): + return os.path.exists(self.path) + + def generate(self): + if not self._is_valid() or self.force: + if not self.check_mode: + key_copy = os.path.join(self.module.tmpdir, os.path.basename(self.public_key)) + + try: + self.module.preserved_copy(self.public_key, key_copy) + except OSError as e: + self.module.fail_json(msg="Unable to stage temporary key: %s" % to_native(e)) + self.module.add_cleanup_file(key_copy) + + self.module.run_command(self._command_arguments(key_copy), environ_update=dict(TZ="UTC"), check_rc=True) + + temp_cert = os.path.splitext(key_copy)[0] + '-cert.pub' + self.module.add_cleanup_file(temp_cert) + backup_cert = self.module.backup_local(self.path) if self.exists() else None + + try: + self.module.atomic_move(temp_cert, self.path) + except OSError as e: + if backup_cert is not None: + self.module.atomic_move(backup_cert, self.path) + self.module.fail_json(msg="Unable to write certificate to %s: %s" % (self.path, to_native(e))) + else: + if backup_cert is not None: + self.module.add_cleanup_file(backup_cert) + + try: + self.data = OpensshCertificate.load(self.path) + except (TypeError, ValueError) as e: + self.module.fail_json(msg="Unable to read new certificate: %s" % to_native(e)) + + self.changed = True + + if self.exists(): + self._update_permissions() + + def remove(self): + if self.exists(): + if not self.check_mode: + try: + os.remove(self.path) + except OSError as e: + self.module.fail_json(msg="Unable to remove existing certificate: %s" % to_native(e)) + self.changed = True + + @property + def result(self): + result = {'changed': self.changed} + + if self.module._diff: + result['diff'] = self._generate_diff() + + if self.state == 'present': + result.update({ 'type': self.type, 'filename': self.path, - 'info': format_cert_info(), - } - else: - result = { - 'changed': self.changed, - } + 'info': format_cert_info(self._get_cert_info()), + }) return result - def remove(self): - """Remove the resource from the filesystem.""" + def _check_if_base_dir(self, path): + base_dir = os.path.dirname(path) or '.' + if not os.path.isdir(base_dir): + self.module.fail_json( + name=base_dir, + msg='The directory %s does not exist or the file is not a directory' % base_dir + ) - try: - os.remove(self.path) - self.changed = True - except OSError as exc: - if exc.errno != errno.ENOENT: - raise CertificateError(exc) - else: - pass + def _command_arguments(self, key_copy_path): + result = [ + self.ssh_keygen, + '-s', self.signing_key, + '-P', '', + '-I', self.identifier, + ] + + if self.options: + for option in self.options: + result.extend(['-O', option]) + if self.pkcs11_provider: + result.extend(['-D', self.pkcs11_provider]) + if self.principals: + result.extend(['-n', ','.join(self.principals)]) + if self.serial_number is not None: + result.extend(['-z', str(self.serial_number)]) + if self.type == 'host': + result.extend(['-h']) + if self.use_agent: + result.extend(['-U']) + if self.time_parameters.validity_string: + result.extend(['-V', self.time_parameters.validity_string]) + result.append(key_copy_path) + + return result + + def _generate_diff(self): + before = self.original_data.to_dict() if self.original_data else {} + before.pop('nonce', None) + after = self.data.to_dict() if self.data else {} + after.pop('nonce', None) + + return {'before': before, 'after': after} + + def _get_cert_info(self): + return self.module.run_command([self.ssh_keygen, '-Lf', self.path])[1] + + def _is_valid(self): + if self.original_data: + try: + original_time_parameters = OpensshCertificateTimeParameters( + valid_from=self.original_data.valid_after, + valid_to=self.original_data.valid_before + ) + except ValueError as e: + return self.module.fail_json(msg=to_native(e)) + return all([ + self.original_data.type == self.type, + set(to_text(p) for p in self.original_data.principals) == set(self.principals), + self.original_data.serial == self.serial_number if self.serial_number is not None else True, + original_time_parameters == self.time_parameters, + original_time_parameters.within_range(self.valid_at) + ]) + return False + + def _update_permissions(self): + file_args = self.module.load_file_common_arguments(self.module.params) + self.changed = self.module.set_fs_attributes_if_different(file_args, self.changed) + + def _validate_parameters(self): + self._check_if_base_dir(self.path) + + if self.state == 'present': + for path in (self.public_key, self.signing_key): + self._check_if_base_dir(path) + + if self.options and self.type == "host": + self.module.fail_json(msg="Options can only be used with user certificates.") + + if self.use_agent: + ssh_version_string = self.module.run_command([self.module.get_bin_path('ssh', True), '-Vq'])[2].strip() + ssh_version = parse_openssh_version(ssh_version_string) + if ssh_version is None: + self.module.fail_json(msg="Failed to parse ssh version from: %s" % ssh_version_string) + elif LooseVersion(ssh_version) < LooseVersion("7.6"): + self.module.fail_json( + msg="Signing with CA key in ssh agent requires ssh 7.6 or newer." + + " Your version is: %s" % ssh_version_string + ) + + +def format_cert_info(cert_info): + result = [] + string = "" + + for word in cert_info.split(): + if word in ("Type:", "Public", "Signing", "Key", "Serial:", "Valid:", "Principals:", "Critical", "Extensions:"): + result.append(string) + string = word + else: + string += " " + word + result.append(string) + # Drop the certificate path + result.pop(0) + return result def main(): - module = AnsibleModule( argument_spec=dict( - state=dict(type='str', default='present', choices=['absent', 'present']), force=dict(type='bool', default=False), - type=dict(type='str', choices=['host', 'user']), - signing_key=dict(type='path'), - use_agent=dict(type='bool', default=False), - pkcs11_provider=dict(type='str'), - public_key=dict(type='path'), - path=dict(type='path', required=True), identifier=dict(type='str'), + options=dict(type='list', elements='str'), + path=dict(type='path', required=True), + pkcs11_provider=dict(type='str'), + principals=dict(type='list', elements='str'), + public_key=dict(type='path'), + signing_key=dict(type='path'), serial_number=dict(type='int'), + state=dict(type='str', default='present', choices=['absent', 'present']), + type=dict(type='str', choices=['host', 'user']), + use_agent=dict(type='bool', default=False), + valid_at=dict(type='str'), valid_from=dict(type='str'), valid_to=dict(type='str'), - valid_at=dict(type='str'), - principals=dict(type='list', elements='str'), - options=dict(type='list', elements='str'), ), supports_check_mode=True, add_file_common_args=True, required_if=[('state', 'present', ['type', 'signing_key', 'public_key', 'valid_from', 'valid_to'])], ) - if module.params['use_agent']: - ssh = module.get_bin_path('ssh', True) - proc = module.run_command([ssh, '-Vq']) - ssh_version_string = proc[2].strip() - ssh_version = parse_openssh_version(ssh_version_string) - if ssh_version is None: - module.fail_json(msg="Failed to parse ssh version") - elif LooseVersion(ssh_version) < LooseVersion("7.6"): - module.fail_json( - msg=( - "Signing with CA key in ssh agent requires ssh 7.6 or newer." - " Your version is: %s" - ) % ssh_version_string - ) - - def isBaseDir(path): - base_dir = os.path.dirname(path) or '.' - if not os.path.isdir(base_dir): - module.fail_json( - name=base_dir, - msg='The directory %s does not exist or the file is not a directory' % base_dir - ) - if module.params['state'] == "present": - isBaseDir(module.params['signing_key']) - isBaseDir(module.params['public_key']) - - isBaseDir(module.params['path']) - certificate = Certificate(module) if certificate.state == 'present': - - if module.check_mode: - certificate.changed = module.params['force'] or not certificate.is_valid(module) - else: - try: - certificate.generate(module) - except Exception as exc: - module.fail_json(msg=to_native(exc)) - + certificate.generate() else: + certificate.remove() - if module.check_mode: - certificate.changed = os.path.exists(module.params['path']) - if certificate.changed: - certificate.cert_info = {} - else: - try: - certificate.remove() - except Exception as exc: - module.fail_json(msg=to_native(exc)) - - result = certificate.dump() - module.exit_json(**result) + module.exit_json(**certificate.result) if __name__ == '__main__': diff --git a/tests/integration/targets/openssh_cert/tasks/main.yml b/tests/integration/targets/openssh_cert/tasks/main.yml index 15782613..c1897d6c 100644 --- a/tests/integration/targets/openssh_cert/tasks/main.yml +++ b/tests/integration/targets/openssh_cert/tasks/main.yml @@ -3,465 +3,34 @@ # and should not be used as examples of how to write Ansible roles # #################################################################### -- name: openssh_cert integration tests - when: not (ansible_facts['distribution'] == "CentOS" and ansible_facts['distribution_major_version'] == "6") - block: - - name: Generate keypair - openssh_keypair: - path: '{{ output_dir }}/id_key' - type: rsa - size: 2048 - - name: Generate always valid cert (check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - check_mode: yes - - name: Generate always valid cert - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - - name: Generate always valid cert (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - - name: Generate always valid cert (idempotent, check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - check_mode: yes - - name: Generate restricted validity cert with valid_at (check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: +0s - valid_to: +32w - valid_at: +2w - check_mode: yes - - name: Generate restricted validity cert with valid_at - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: +0s - valid_to: +32w - valid_at: +2w - - name: Generate restricted validity cert with valid_at (idempotent) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: +0s - valid_to: +32w - valid_at: +2w - - name: Generate restricted validity cert with valid_at (idempotent, check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: +0s - valid_to: +32w - valid_at: +2w - check_mode: yes - - name: Generate always valid cert only for example.com and examplehost (check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - principals: - - example.com - - examplehost - check_mode: yes - - name: Generate always valid cert only for example.com and examplehost - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - principals: - - example.com - - examplehost - - name: Generate always valid cert only for example.com and examplehost (idempotent) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - principals: - - example.com - - examplehost - - name: Generate always valid cert only for example.com and examplehost (idempotent, check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - principals: - - example.com - - examplehost - check_mode: yes - - name: Generate always valid cert only for example.com and examplehost (idempotent, switch) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: always - valid_to: forever - principals: - - examplehost - - example.com - - name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: "2001-01-21" - valid_to: "2019-01-21" - check_mode: yes - - name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: "2001-01-21" - valid_to: "2019-01-21" - - name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (idempotent) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: "2001-01-21" - valid_to: "2019-01-21" - - name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (idempotent, check mode) - openssh_cert: - type: host - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - valid_from: "2001-01-21" - valid_to: "2019-01-21" - check_mode: yes - - name: Generate an OpenSSH user Certificate with clear and force-command option (check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - options: - - "clear" - - "force-command=/tmp/bla/foo" - valid_from: "2001-01-21" - valid_to: "2019-01-21" - check_mode: yes - - name: Generate an OpenSSH user Certificate with clear and force-command option - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - options: - - "clear" - - "force-command=/tmp/bla/foo" - valid_from: "2001-01-21" - valid_to: "2019-01-21" - - name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - options: - - "clear" - - "force-command=/tmp/bla/foo" - valid_from: "2001-01-21" - valid_to: "2019-01-21" - - name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent, check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - options: - - "clear" - - "force-command=/tmp/bla/foo" - valid_from: "2001-01-21" - valid_to: "2019-01-21" - check_mode: yes - - name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent, switch) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert' - options: - - "force-command=/tmp/bla/foo" - - "clear" - valid_from: "2001-01-21" - valid_to: "2019-01-21" - - name: Generate cert without serial - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_no_serial' - valid_from: always - valid_to: forever - register: rc_no_serial_number - - name: check default serial - assert: - that: - - "'Serial: 0' in rc_no_serial_number.info" - msg: OpenSSH user certificate contains the default serial number. - - name: Generate cert without serial (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_no_serial' - valid_from: always - valid_to: forever - register: rc_no_serial_number_idempotent - - name: check idempotent - assert: - that: - - rc_no_serial_number_idempotent is not changed - msg: OpenSSH certificate generation without serial number is idempotent. - - name: Generate cert with serial 42 - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_42' - valid_from: always - valid_to: forever - serial_number: 42 - register: rc_serial_number - - name: check serial 42 - assert: - that: - - "'Serial: 42' in rc_serial_number.info" - msg: OpenSSH user certificate contains the serial number from the params. - - name: Generate cert with serial 42 (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_42' - valid_from: always - valid_to: forever - serial_number: 42 - register: rc_serial_number_idempotent - - name: check idempotent - assert: - that: - - rc_serial_number_idempotent is not changed - msg: OpenSSH certificate generation with serial number is idempotent. - - name: Generate cert with changed serial number - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_42' - valid_from: always - valid_to: forever - serial_number: 1337 - register: rc_serial_number_changed - - name: check changed - assert: - that: - - rc_serial_number_changed is changed - msg: OpenSSH certificate regenerated upon serial number change. - - name: Generate cert with removed serial number - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_42' - valid_from: always - valid_to: forever - serial_number: 0 - register: rc_serial_number_removed - - name: check changed - assert: - that: - - rc_serial_number_removed is changed - msg: OpenSSH certificate regenerated upon serial number removal. - - name: Generate a new cert with serial number - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_ignore' - valid_from: always - valid_to: forever - serial_number: 42 - - name: Generate cert again, omitting the parameter serial_number (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_serial_ignore' - valid_from: always - valid_to: forever - register: rc_serial_number_ignored - - name: check idempotent - assert: - that: - - rc_serial_number_ignored is not changed - msg: OpenSSH certificate generation with omitted serial number is idempotent. - - name: Remove certificate (check mode) - openssh_cert: - state: absent - path: '{{ output_dir }}/id_cert' - #type: user - #signing_key: '{{ output_dir }}/id_key' - #public_key: '{{ output_dir }}/id_key.pub' - #valid_from: "2001-01-21" - #valid_to: "2019-01-21" - check_mode: yes - - name: Remove certificate - openssh_cert: - state: absent - path: '{{ output_dir }}/id_cert' - #type: user - #signing_key: '{{ output_dir }}/id_key' - #public_key: '{{ output_dir }}/id_key.pub' - #valid_from: "2001-01-21" - #valid_to: "2019-01-21" - - name: Remove certificate (idempotent) - openssh_cert: - state: absent - path: '{{ output_dir }}/id_cert' - #type: user - #signing_key: '{{ output_dir }}/id_key' - #public_key: '{{ output_dir }}/id_key.pub' - #valid_from: "2001-01-21" - #valid_to: "2019-01-21" - - name: Remove certificate (idempotent, check mode) - openssh_cert: - state: absent - path: '{{ output_dir }}/id_cert' - #type: user - #signing_key: '{{ output_dir }}/id_key' - #public_key: '{{ output_dir }}/id_key.pub' - #valid_from: "2001-01-21" - #valid_to: "2019-01-21" - check_mode: yes - - name: Remove keypair - openssh_keypair: - path: '{{ output_dir }}/id_key' - state: absent +- name: Declare global variables + set_fact: + signing_key: '{{ output_dir }}/id_key' + public_key: '{{ output_dir }}/id_key.pub' + certificate_path: '{{ output_dir }}/id_cert' -- name: openssh_cert integration tests that require ssh-agent +- name: Generate keypair + openssh_keypair: + path: "{{ signing_key }}" + type: rsa + size: 2048 + +- block: + - name: Import idempotency tests + import_tasks: ../tests/idempotency.yml + + - name: Import key_idempotency tests + import_tasks: ../tests/key_idempotency.yml + + - name: Import remove tests + import_tasks: ../tests/remove.yml + when: not (ansible_facts['distribution'] == "CentOS" and ansible_facts['distribution_major_version'] == "6") + +- name: Import ssh-agent tests + import_tasks: ../tests/ssh-agent.yml when: openssh_version is version("7.6",">=") - environment: - SSH_AUTH_SOCK: "{{ openssh_agent_sock }}" - block: - - name: Generate keypair for agent tests - openssh_keypair: - path: '{{ output_dir }}/id_key' - type: rsa - size: 2048 - - name: Generate always valid cert using agent without key in agent (should fail) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_with_agent' - use_agent: yes - valid_from: always - valid_to: forever - register: rc_no_key_in_agent - ignore_errors: yes - - name: Make sure cert creation with agent fails if key not in agent - assert: - that: - - rc_no_key_in_agent is failed - - "'agent contains no identities' in rc_no_key_in_agent.msg or 'not found in agent' in rc_no_key_in_agent.msg" - - name: Add key to agent - command: 'ssh-add {{ output_dir }}/id_key' - - name: Generate always valid cert with agent (check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_with_agent' - use_agent: yes - valid_from: always - valid_to: forever - check_mode: yes - - name: Generate always valid cert with agent - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_with_agent' - use_agent: yes - valid_from: always - valid_to: forever - - name: Generate always valid cert with agent (idempotent) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_with_agent' - use_agent: yes - valid_from: always - valid_to: forever - register: rc_cert_with_agent_idempotent - - name: Check agent idempotency - assert: - that: - - rc_cert_with_agent_idempotent is not changed - msg: OpenSSH certificate generation without serial number is idempotent. - - name: Generate always valid cert with agent (idempotent, check mode) - openssh_cert: - type: user - signing_key: '{{ output_dir }}/id_key' - public_key: '{{ output_dir }}/id_key.pub' - path: '{{ output_dir }}/id_cert_with_agent' - use_agent: yes - valid_from: always - valid_to: forever - check_mode: yes - - name: Remove keypair for agent tests - openssh_keypair: - path: '{{ output_dir }}/id_key' - state: absent - - name: Remove certificate - openssh_cert: - state: absent - path: '{{ output_dir }}/id_cert_with_agent' + +- name: Remove keypair + openssh_keypair: + path: "{{ signing_key }}" + state: absent diff --git a/tests/integration/targets/openssh_cert/tests/idempotency.yml b/tests/integration/targets/openssh_cert/tests/idempotency.yml new file mode 100644 index 00000000..3844a94c --- /dev/null +++ b/tests/integration/targets/openssh_cert/tests/idempotency.yml @@ -0,0 +1,272 @@ +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- set_fact: + test_cases: + - test_name: Generate cert - force option (check_mode) + force: true + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: true + - test_name: Generate cert - force option + force: true + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: true + - test_name: Generate cert - force option (idempotent) + force: true + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: true + - test_name: Generate cert - force option (idemopotent, check mode) + force: true + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: true + - test_name: Generate always valid cert (check mode) + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: true + - test_name: Generate always valid cert + type: user + valid_from: always + valid_to: forever + changed: true + - test_name: Generate always valid cert (idempotent) + type: user + valid_from: always + valid_to: forever + changed: false + - test_name: Generate always valid cert (idempotent, check mode) + type: user + valid_from: always + valid_to: forever + check_mode: true + changed: false + - test_name: Generate restricted validity cert with valid_at (check mode) + type: host + valid_from: +0s + valid_to: +32w + valid_at: +2w + check_mode: true + changed: true + - test_name: Generate restricted validity cert with valid_at + type: host + valid_from: +0s + valid_to: +32w + valid_at: +2w + changed: true + # Relative date time is based on current time so re-generation will occur in this case + - test_name: Generate restricted validity cert with valid_at (idempotent) + type: host + valid_from: +0s + valid_to: +32w + valid_at: +2w + changed: true + # Relative date time is based on current time so re-generation will occur in this case + - test_name: Generate restricted validity cert with valid_at (idempotent, check mode) + type: host + valid_from: +0s + valid_to: +32w + valid_at: +2w + check_mode: true + changed: true + - test_name: Generate always valid cert only for example.com and examplehost (check mode) + type: host + valid_from: always + valid_to: forever + principals: &principals + - example.com + - examplehost + check_mode: true + changed: true + - test_name: Generate always valid cert only for example.com and examplehost + type: host + valid_from: always + valid_to: forever + principals: *principals + changed: true + - test_name: Generate always valid cert only for example.com and examplehost (idempotent) + type: host + valid_from: always + valid_to: forever + principals: *principals + changed: false + - test_name: Generate always valid cert only for example.com and examplehost (idempotent, check mode) + type: host + valid_from: always + valid_to: forever + principals: *principals + check_mode: true + changed: false + - test_name: Generate always valid cert only for example.com and examplehost (idempotent, switch) + type: host + valid_from: always + valid_to: forever + principals: + - examplehost + - example.com + changed: false + - test_name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (check mode) + type: host + valid_from: "2001-01-21" + valid_to: "2019-01-21" + check_mode: true + changed: true + - test_name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 + type: host + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: true + - test_name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (idempotent) + type: host + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: false + - test_name: Generate OpenSSH host Certificate that is valid from 21.1.2001 to 21.1.2019 (idempotent, check mode) + type: host + valid_from: "2001-01-21" + valid_to: "2019-01-21" + check_mode: true + changed: false + - test_name: Generate an OpenSSH user Certificate with clear and force-command option (check mode) + type: user + options: &options + - "clear" + - "force-command=/tmp/bla/foo" + valid_from: "2001-01-21" + valid_to: "2019-01-21" + check_mode: true + changed: true + - test_name: Generate an OpenSSH user Certificate with clear and force-command option + type: user + options: *options + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: true + - test_name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent) + type: user + options: *options + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: false + - test_name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent, check mode) + type: user + options: *options + valid_from: "2001-01-21" + valid_to: "2019-01-21" + check_mode: true + changed: false + - test_name: Generate an OpenSSH user Certificate with clear and force-command option (idempotent, switch) + type: user + options: + - "force-command=/tmp/bla/foo" + - "clear" + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: false + # Options are currently not checked for idempotency purposes + - test_name: Generate an OpenSSH user Certificate with no options (idempotent) + type: user + valid_from: "2001-01-21" + valid_to: "2019-01-21" + changed: false + - test_name: Generate cert without serial + type: user + valid_from: always + valid_to: forever + changed: true + - test_name: Generate cert without serial (idempotent) + type: user + valid_from: always + valid_to: forever + changed: false + - test_name: Generate cert with serial 42 + type: user + valid_from: always + valid_to: forever + serial_number: 42 + changed: true + - test_name: Generate cert with serial 42 (idempotent) + type: user + valid_from: always + valid_to: forever + serial_number: 42 + changed: false + - test_name: Generate cert with changed serial number + type: user + valid_from: always + valid_to: forever + serial_number: 1337 + changed: true + - test_name: Generate cert with removed serial number + type: user + valid_from: always + valid_to: forever + serial_number: 0 + changed: true + - test_name: Generate a new cert with serial number + type: user + valid_from: always + valid_to: forever + serial_number: 42 + changed: true + - test_name: Generate cert again, omitting the parameter serial_number (idempotent) + type: user + valid_from: always + valid_to: forever + changed: false + # Identifiers are not included in idempotency checks so a new cert will not be generated + - test_name: Generate cert with identifier + type: user + identifier: foo + valid_from: always + valid_to: forever + changed: false + +- name: Execute idempotency tests + openssh_cert: + force: "{{ test_case.force | default(omit) }}" + identifier: "{{ test_case.identifier | default(omit) }}" + options: "{{ test_case.options | default(omit) }}" + path: "{{ certificate_path }}" + public_key: "{{ public_key }}" + principals: "{{ test_case.principals | default(omit) }}" + serial_number: "{{ test_case.serial_number | default(omit) }}" + signing_key: "{{ signing_key }}" + state: "{{ test_case.state | default(omit) }}" + type: "{{ test_case.type | default(omit) }}" + valid_at: "{{ test_case.valid_at | default(omit) }}" + valid_from: "{{ test_case.valid_from | default(omit) }}" + valid_to: "{{ test_case.valid_to | default(omit) }}" + check_mode: "{{ test_case.check_mode | default(omit) }}" + register: idempotency_test_output + loop: "{{ test_cases }}" + loop_control: + loop_var: test_case + +- name: Assert task statuses + assert: + that: + - result.changed == test_cases[index].changed + loop: "{{ idempotency_test_output.results }}" + loop_control: + index_var: index + loop_var: result + +- name: Remove certificate + openssh_cert: + path: "{{ certificate_path }}" + state: absent \ No newline at end of file diff --git a/tests/integration/targets/openssh_cert/tests/key_idempotency.yml b/tests/integration/targets/openssh_cert/tests/key_idempotency.yml new file mode 100644 index 00000000..ecaeae7e --- /dev/null +++ b/tests/integration/targets/openssh_cert/tests/key_idempotency.yml @@ -0,0 +1,58 @@ +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- set_fact: + new_signing_key: "{{ output_dir }}/new_key" + new_public_key: "{{ output_dir }}/new_key.pub" + +- name: Generate new test key + openssh_keypair: + path: "{{ new_signing_key }}" + +- name: Generate cert with original keys + openssh_cert: + type: user + path: "{{ certificate_path }}" + public_key: "{{ public_key }}" + signing_key: "{{ signing_key }}" + valid_from: always + valid_to: forever + +- name: Generate cert with new signing key + openssh_cert: + type: user + path: "{{ certificate_path }}" + public_key: "{{ public_key }}" + signing_key: "{{ new_signing_key }}" + valid_from: always + valid_to: forever + register: new_signing_key_output + +- name: Generate cert with new public key + openssh_cert: + type: user + path: "{{ certificate_path }}" + public_key: "{{ new_public_key }}" + signing_key: "{{ signing_key }}" + valid_from: always + valid_to: forever + register: new_public_key_output + +# Signing key and public key are not considered during idempotency checks +- name: Assert changes to public key or signing key results in no change + assert: + that: + - new_signing_key_output is not changed + - new_public_key_output is not changed + +- name: Remove certificate + openssh_cert: + path: "{{ certificate_path }}" + state: absent + +- name: Remove new keypair + openssh_keypair: + path: "{{ new_signing_key }}" + state: absent diff --git a/tests/integration/targets/openssh_cert/tests/remove.yml b/tests/integration/targets/openssh_cert/tests/remove.yml new file mode 100644 index 00000000..64b09ed0 --- /dev/null +++ b/tests/integration/targets/openssh_cert/tests/remove.yml @@ -0,0 +1,61 @@ +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- set_fact: + test_cases: + - test_name: Generate certificate + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: "{{ certificate_path }}" + valid_from: always + valid_to: forever + changed: true + - test_name: Remove certificate (check mode) + state: absent + path: "{{ certificate_path }}" + check_mode: true + changed: true + - test_name: Remove certificate + state: absent + path: "{{ certificate_path }}" + changed: true + - test_name: Remove certificate (idempotent) + state: absent + path: "{{ certificate_path }}" + changed: false + - test_name: Remove certificate (idempotent, check mode) + state: absent + path: "{{ certificate_path }}" + check_mode: true + changed: false + +- name: Execute remove tests + openssh_cert: + options: "{{ test_case.options | default(omit) }}" + path: "{{ test_case.path | default(omit) }}" + public_key: "{{ test_case.public_key | default(omit) }}" + principals: "{{ test_case.principals | default(omit) }}" + serial_number: "{{ test_case.serial_number | default(omit) }}" + signing_key: "{{ test_case.signing_key | default(omit) }}" + state: "{{ test_case.state | default(omit) }}" + type: "{{ test_case.type | default(omit) }}" + valid_at: "{{ test_case.valid_at | default(omit) }}" + valid_from: "{{ test_case.valid_from | default(omit) }}" + valid_to: "{{ test_case.valid_to | default(omit) }}" + check_mode: "{{ test_case.check_mode | default(omit) }}" + register: remove_test_output + loop: "{{ test_cases }}" + loop_control: + loop_var: test_case + +- name: Assert task statuses + assert: + that: + - result.changed == test_cases[index].changed + loop: "{{ remove_test_output.results }}" + loop_control: + index_var: index + loop_var: result diff --git a/tests/integration/targets/openssh_cert/tests/ssh-agent.yml b/tests/integration/targets/openssh_cert/tests/ssh-agent.yml new file mode 100644 index 00000000..8dbbbc63 --- /dev/null +++ b/tests/integration/targets/openssh_cert/tests/ssh-agent.yml @@ -0,0 +1,83 @@ +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- name: SSH-agent test block + environment: + SSH_AUTH_SOCK: "{{ openssh_agent_sock }}" + block: + - name: Generate always valid cert using agent without key in agent (should fail) + openssh_cert: + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: '{{ output_dir }}/id_cert_with_agent' + use_agent: true + valid_from: always + valid_to: forever + register: rc_no_key_in_agent + ignore_errors: true + + - name: Make sure cert creation with agent fails if key not in agent + assert: + that: + - rc_no_key_in_agent is failed + - "'agent contains no identities' in rc_no_key_in_agent.msg or 'not found in agent' in rc_no_key_in_agent.msg" + + - name: Add key to agent + command: 'ssh-add {{ signing_key }}' + + - name: Generate always valid cert with agent (check mode) + openssh_cert: + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: '{{ output_dir }}/id_cert_with_agent' + use_agent: true + valid_from: always + valid_to: forever + check_mode: true + + - name: Generate always valid cert with agent + openssh_cert: + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: '{{ output_dir }}/id_cert_with_agent' + use_agent: true + valid_from: always + valid_to: forever + + - name: Generate always valid cert with agent (idempotent) + openssh_cert: + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: '{{ output_dir }}/id_cert_with_agent' + use_agent: true + valid_from: always + valid_to: forever + register: rc_cert_with_agent_idempotent + + - name: Check agent idempotency + assert: + that: + - rc_cert_with_agent_idempotent is not changed + msg: OpenSSH certificate generation without serial number is idempotent. + + - name: Generate always valid cert with agent (idempotent, check mode) + openssh_cert: + type: user + signing_key: "{{ signing_key }}" + public_key: "{{ public_key }}" + path: '{{ output_dir }}/id_cert_with_agent' + use_agent: true + valid_from: always + valid_to: forever + check_mode: true + + - name: Remove certificate + openssh_cert: + state: absent + path: '{{ output_dir }}/id_cert_with_agent' diff --git a/tests/unit/plugins/module_utils/openssh/test_certificate.py b/tests/unit/plugins/module_utils/openssh/test_certificate.py index 6a215d53..c9e2f434 100644 --- a/tests/unit/plugins/module_utils/openssh/test_certificate.py +++ b/tests/unit/plugins/module_utils/openssh/test_certificate.py @@ -5,8 +5,11 @@ from __future__ import absolute_import, division, print_function __metaclass__ = type +import pytest + from ansible_collections.community.crypto.plugins.module_utils.openssh.certificate import ( - OpensshCertificate + OpensshCertificate, + OpensshCertificateTimeParameters ) # Type: ssh-rsa-cert-v01@openssh.com user certificate @@ -126,17 +129,65 @@ VALID_EXTENSIONS = [ ] INVALID_EXTENSIONS = [(b'test', b'')] +VALID_TIME_PARAMETERS = [ + (0, "always", "always", 0, + 0xFFFFFFFFFFFFFFFF, "forever", "forever", 253402300800, + ""), + ("always", "always", "always", 0, + "forever", "forever", "forever", 253402300800, + ""), + (315532800, "1980-01-01T00:00:00", "19800101000000", 315532800, + 631152000, "1990-01-01T00:00:00", "19900101000000", 631152000, + "19800101000000:19900101000000"), + ("1980-01-01", "1980-01-01T00:00:00", "19800101000000", 315532800, + "1990-01-01", "1990-01-01T00:00:00", "19900101000000", 631152000, + "19800101000000:19900101000000"), + ("1980-01-01 00:00:00", "1980-01-01T00:00:00", "19800101000000", 315532800, + "1990-01-01 00:00:00", "1990-01-01T00:00:00", "19900101000000", 631152000, + "19800101000000:19900101000000"), + ("1980-01-01T00:00:00", "1980-01-01T00:00:00", "19800101000000", 315532800, + "1990-01-01T00:00:00", "1990-01-01T00:00:00", "19900101000000", 631152000, + "19800101000000:19900101000000"), + ("always", "always", "always", 0, + "1990-01-01T00:00:00", "1990-01-01T00:00:00", "19900101000000", 631152000, + "always:19900101000000"), + ("1980-01-01", "1980-01-01T00:00:00", "19800101000000", 315532800, + "forever", "forever", "forever", 253402300800, + "19800101000000:forever"), +] + +INVALID_TIME_PARAMETERS = [ + (-1, 0xFFFFFFFFFFFFFFFFFF), + ("never", "ever"), + ("01-01-1980", "01-01-1990"), + (1, 0), +] + +VALID_VALIDITY_TEST = [ + ("always", "forever", "2000-01-01"), + ("1999-12-31", "2000-01-02", "2000-01-01"), + ("1999-12-31 23:59:00", "2000-01-01 00:01:00", "2000-01-01 00:00:00"), + ("1999-12-31 23:59:59", "2000-01-01 00:00:01", "2000-01-01 00:00:00"), +] + +INVALID_VALIDITY_TEST = [ + ("always", "forever", "1969-12-31"), + ("always", "2000-01-01", "2000-01-02"), + ("2000-01-01", "forever", "1999-12-31"), + ("2000-01-01 00:00:00", "2000-01-01 00:00:01", "2000-01-01 00:00:02"), +] + def test_rsa_certificate(tmpdir): cert_file = tmpdir / 'id_rsa-cert.pub' cert_file.write(RSA_CERT_SIGNED_BY_DSA, mode='wb') cert = OpensshCertificate.load(str(cert_file)) - assert cert.cert_info.key_id == b'test' - assert cert.cert_info.serial == 0 - assert cert.cert_info.type_string == b'ssh-rsa-cert-v01@openssh.com' - assert cert.cert_info.public_key_fingerprint() == RSA_FINGERPRINT - assert cert.signing_key_fingerprint() == DSA_FINGERPRINT + assert cert.key_id == b'test' + assert cert.serial == 0 + assert cert.type_string == b'ssh-rsa-cert-v01@openssh.com' + assert cert.public_key == RSA_FINGERPRINT + assert cert.signing_key == DSA_FINGERPRINT def test_dsa_certificate(tmpdir): @@ -145,11 +196,11 @@ def test_dsa_certificate(tmpdir): cert = OpensshCertificate.load(str(cert_file)) - assert cert.cert_info.type_string == b'ssh-dss-cert-v01@openssh.com' - assert cert.cert_info.public_key_fingerprint() == DSA_FINGERPRINT - assert cert.signing_key_fingerprint() == ECDSA_FINGERPRINT - assert cert.cert_info.critical_options == [] - assert cert.cert_info.extensions == [] + assert cert.type_string == b'ssh-dss-cert-v01@openssh.com' + assert cert.public_key == DSA_FINGERPRINT + assert cert.signing_key == ECDSA_FINGERPRINT + assert cert.critical_options == [] + assert cert.extensions == [] def test_ecdsa_certificate(tmpdir): @@ -157,11 +208,11 @@ def test_ecdsa_certificate(tmpdir): cert_file.write(ECDSA_CERT_SIGNED_BY_ED25519_VALID_OPTS) cert = OpensshCertificate.load(str(cert_file)) - assert cert.cert_info.type_string == b'ecdsa-sha2-nistp256-cert-v01@openssh.com' - assert cert.cert_info.public_key_fingerprint() == ECDSA_FINGERPRINT - assert cert.signing_key_fingerprint() == ED25519_FINGERPRINT - assert cert.cert_info.critical_options == VALID_OPTS - assert cert.cert_info.extensions == VALID_EXTENSIONS + assert cert.type_string == b'ecdsa-sha2-nistp256-cert-v01@openssh.com' + assert cert.public_key == ECDSA_FINGERPRINT + assert cert.signing_key == ED25519_FINGERPRINT + assert cert.critical_options == VALID_OPTS + assert cert.extensions == VALID_EXTENSIONS def test_ed25519_certificate(tmpdir): @@ -169,11 +220,11 @@ def test_ed25519_certificate(tmpdir): cert_file.write(ED25519_CERT_SIGNED_BY_RSA_INVALID_OPTS) cert = OpensshCertificate.load(str(cert_file)) - assert cert.cert_info.type_string == b'ssh-ed25519-cert-v01@openssh.com' - assert cert.cert_info.public_key_fingerprint() == ED25519_FINGERPRINT - assert cert.signing_key_fingerprint() == RSA_FINGERPRINT - assert cert.cert_info.critical_options == INVALID_OPTS - assert cert.cert_info.extensions == INVALID_EXTENSIONS + assert cert.type_string == b'ssh-ed25519-cert-v01@openssh.com' + assert cert.public_key == ED25519_FINGERPRINT + assert cert.signing_key == RSA_FINGERPRINT + assert cert.critical_options == INVALID_OPTS + assert cert.extensions == INVALID_EXTENSIONS def test_invalid_data(tmpdir): @@ -186,3 +237,41 @@ def test_invalid_data(tmpdir): except ValueError: result = True assert result + + +@pytest.mark.parametrize( + "valid_from,valid_from_hr,valid_from_openssh,valid_from_timestamp," + + "valid_to,valid_to_hr,valid_to_openssh,valid_to_timestamp," + + "validity_string", + VALID_TIME_PARAMETERS +) +def test_valid_time_parameters(valid_from, valid_from_hr, valid_from_openssh, valid_from_timestamp, + valid_to, valid_to_hr, valid_to_openssh, valid_to_timestamp, + validity_string): + time_parameters = OpensshCertificateTimeParameters( + valid_from=valid_from, + valid_to=valid_to + ) + assert time_parameters.valid_from(date_format="human_readable") == valid_from_hr + assert time_parameters.valid_from(date_format="openssh") == valid_from_openssh + assert time_parameters.valid_from(date_format="timestamp") == valid_from_timestamp + assert time_parameters.valid_to(date_format="human_readable") == valid_to_hr + assert time_parameters.valid_to(date_format="openssh") == valid_to_openssh + assert time_parameters.valid_to(date_format="timestamp") == valid_to_timestamp + assert time_parameters.validity_string == validity_string + + +@pytest.mark.parametrize("valid_from,valid_to", INVALID_TIME_PARAMETERS) +def test_invalid_time_parameters(valid_from, valid_to): + with pytest.raises(ValueError): + OpensshCertificateTimeParameters(valid_from, valid_to) + + +@pytest.mark.parametrize("valid_from,valid_to,valid_at", VALID_VALIDITY_TEST) +def test_valid_validity_test(valid_from, valid_to, valid_at): + assert OpensshCertificateTimeParameters(valid_from, valid_to).within_range(valid_at) + + +@pytest.mark.parametrize("valid_from,valid_to,valid_at", INVALID_VALIDITY_TEST) +def test_invalid_validity_test(valid_from, valid_to, valid_at): + assert not OpensshCertificateTimeParameters(valid_from, valid_to).within_range(valid_at)