From 3c21079afad07098b9d0d863c5d395574211642b Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Wed, 28 Oct 2020 21:52:54 +0100 Subject: [PATCH] Refactor openssl_privatekey module, move add openssl_privatekey_pipe module (#119) * Move disk-independent parts of openssl_privatekey to module_utils and doc_fragments. * Improve documentation. * Add openssl_privatekey_pipe module. * Fallback in case no fingerprints are returned. * Prevent no_log=True for content to stop module from working correctly. * Forgot version_added. * Update copyright. All the interesting code is no longer in this file anyway. * Remove file arguments. * Add framework for action modules. * Convert openssl_privatekey_pipe to action plugin. * Linting. * Bump version. * Add return_current_key option. * Add no_log to examples. * Remove preparation for potential later extensibility (easy to re-add when needed). * Fix deprecation version in docs. * Use new ArgumentSpec object for AnsibleActionModule as well. --- plugins/action/openssl_privatekey_pipe.py | 107 +++ plugins/doc_fragments/module_privatekey.py | 159 ++++ plugins/module_utils/action_module.py | 734 ++++++++++++++++ .../crypto/module_backends/common.py | 8 +- .../crypto/module_backends/privatekey.py | 590 +++++++++++++ plugins/module_utils/crypto/support.py | 16 +- plugins/modules/openssl_privatekey.py | 793 ++---------------- plugins/modules/openssl_privatekey_info.py | 1 + plugins/modules/openssl_privatekey_pipe.py | 112 +++ .../targets/openssl_privatekey_pipe/aliases | 2 + .../openssl_privatekey_pipe/meta/main.yml | 2 + .../openssl_privatekey_pipe/tasks/impl.yml | 103 +++ .../openssl_privatekey_pipe/tasks/main.yml | 35 + tests/sanity/ignore-2.10.txt | 8 + tests/sanity/ignore-2.11.txt | 8 + tests/sanity/ignore-2.9.txt | 7 + 16 files changed, 1945 insertions(+), 740 deletions(-) create mode 100644 plugins/action/openssl_privatekey_pipe.py create mode 100644 plugins/doc_fragments/module_privatekey.py create mode 100644 plugins/module_utils/action_module.py create mode 100644 plugins/module_utils/crypto/module_backends/privatekey.py create mode 100644 plugins/modules/openssl_privatekey_pipe.py create mode 100644 tests/integration/targets/openssl_privatekey_pipe/aliases create mode 100644 tests/integration/targets/openssl_privatekey_pipe/meta/main.yml create mode 100644 tests/integration/targets/openssl_privatekey_pipe/tasks/impl.yml create mode 100644 tests/integration/targets/openssl_privatekey_pipe/tasks/main.yml diff --git a/plugins/action/openssl_privatekey_pipe.py b/plugins/action/openssl_privatekey_pipe.py new file mode 100644 index 00000000..002adc95 --- /dev/null +++ b/plugins/action/openssl_privatekey_pipe.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import base64 + +from ansible.module_utils._text import to_native, to_bytes + +from ansible_collections.community.crypto.plugins.module_utils.action_module import ActionModuleBase + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey import ( + select_backend, + get_privatekey_argument_spec, +) + + +class PrivateKeyModule(object): + def __init__(self, module, module_backend): + self.module = module + self.module_backend = module_backend + self.check_mode = module.check_mode + self.changed = False + self.return_current_key = module.params['return_current_key'] + + if module.params['content'] is not None: + if module.params['content_base64']: + try: + data = base64.b64decode(module.params['content']) + except Exception as e: + module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e)) + else: + data = to_bytes(module.params['content']) + module_backend.set_existing(data) + + def generate(self, module): + """Generate a keypair.""" + + if self.module_backend.needs_regeneration(): + # Regenerate + if not self.check_mode: + self.module_backend.generate_private_key() + privatekey_data = self.module_backend.get_private_key_data() + self.privatekey_bytes = privatekey_data + self.changed = True + elif self.module_backend.needs_conversion(): + # Convert + if not self.check_mode: + self.module_backend.convert_private_key() + privatekey_data = self.module_backend.get_private_key_data() + self.privatekey_bytes = privatekey_data + self.changed = True + + def dump(self): + """Serialize the object into a dictionary.""" + result = self.module_backend.dump(include_key=self.changed or self.return_current_key) + result['changed'] = self.changed + return result + + +class ActionModule(ActionModuleBase): + @staticmethod + def setup_module(): + argument_spec = get_privatekey_argument_spec() + argument_spec.argument_spec.update(dict( + content=dict(type='str', no_log=True), + content_base64=dict(type='bool', default=False), + return_current_key=dict(type='bool', default=False), + )) + return argument_spec, dict( + supports_check_mode=True, + ) + + @staticmethod + def run_module(module): + backend, module_backend = select_backend( + module=module, + backend=module.params['select_crypto_backend'], + ) + + try: + private_key = PrivateKeyModule(module, module_backend) + private_key.generate(module) + result = private_key.dump() + if private_key.return_current_key: + # In case the module's input (`content`) is returned as `privatekey`: + # Since `content` is no_log=True, `privatekey`'s value will get replaced by + # VALUE_SPECIFIED_IN_NO_LOG_PARAMETER. To avoid this, we remove the value of + # `content` from module.no_log_values. Since we explicitly set + # `module.no_log = True`, this should be safe. + module.no_log = True + try: + module.no_log_values.remove(module.params['content']) + except KeyError: + pass + module.params['content'] = 'ANSIBLE_NO_LOG_VALUE' + module.exit_json(**result) + except OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) diff --git a/plugins/doc_fragments/module_privatekey.py b/plugins/doc_fragments/module_privatekey.py new file mode 100644 index 00000000..4679653f --- /dev/null +++ b/plugins/doc_fragments/module_privatekey.py @@ -0,0 +1,159 @@ +# -*- coding: utf-8 -*- + +# Copyright: (c) 2016, Yanis Guenane +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +class ModuleDocFragment(object): + + # Standard files documentation fragment + DOCUMENTATION = r''' +description: + - One can generate L(RSA,https://en.wikipedia.org/wiki/RSA_%28cryptosystem%29), + L(DSA,https://en.wikipedia.org/wiki/Digital_Signature_Algorithm), + L(ECC,https://en.wikipedia.org/wiki/Elliptic-curve_cryptography) or + L(EdDSA,https://en.wikipedia.org/wiki/EdDSA) private keys. + - Keys are generated in PEM format. + - "Please note that the module regenerates private keys if they don't match + the module's options. In particular, if you provide another passphrase + (or specify none), change the keysize, etc., the private key will be + regenerated. If you are concerned that this could **overwrite your private key**, + consider using the I(backup) option." + - "The module can use the cryptography Python library, or the pyOpenSSL Python + library. By default, it tries to detect which one is available. This can be + overridden with the I(select_crypto_backend) option. Please note that the + PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in community.crypto 2.0.0." +requirements: + - Either cryptography >= 1.2.3 (older versions might work as well) + - Or pyOpenSSL +options: + size: + description: + - Size (in bits) of the TLS/SSL key to generate. + type: int + default: 4096 + type: + description: + - The algorithm used to generate the TLS/SSL private key. + - Note that C(ECC), C(X25519), C(X448), C(Ed25519) and C(Ed448) require the C(cryptography) backend. + C(X25519) needs cryptography 2.5 or newer, while C(X448), C(Ed25519) and C(Ed448) require + cryptography 2.6 or newer. For C(ECC), the minimal cryptography version required depends on the + I(curve) option. + type: str + default: RSA + choices: [ DSA, ECC, Ed25519, Ed448, RSA, X25519, X448 ] + curve: + description: + - Note that not all curves are supported by all versions of C(cryptography). + - For maximal interoperability, C(secp384r1) or C(secp256r1) should be used. + - We use the curve names as defined in the + L(IANA registry for TLS,https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-8). + type: str + choices: + - secp384r1 + - secp521r1 + - secp224r1 + - secp192r1 + - secp256r1 + - secp256k1 + - brainpoolP256r1 + - brainpoolP384r1 + - brainpoolP512r1 + - sect571k1 + - sect409k1 + - sect283k1 + - sect233k1 + - sect163k1 + - sect571r1 + - sect409r1 + - sect283r1 + - sect233r1 + - sect163r2 + passphrase: + description: + - The passphrase for the private key. + type: str + cipher: + description: + - The cipher to encrypt the private key. (Valid values can be found by + running `openssl list -cipher-algorithms` or `openssl list-cipher-algorithms`, + depending on your OpenSSL version.) + - When using the C(cryptography) backend, use C(auto). + type: str + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0. + From that point on, only the C(cryptography) backend will be available. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] + format: + description: + - Determines which format the private key is written in. By default, PKCS1 (traditional OpenSSL format) + is used for all keys which support it. Please note that not every key can be exported in any format. + - The value C(auto) selects a fromat based on the key format. The value C(auto_ignore) does the same, + but for existing private key files, it will not force a regenerate when its format is not the automatically + selected one for generation. + - Note that if the format for an existing private key mismatches, the key is *regenerated* by default. + To change this behavior, use the I(format_mismatch) option. + - The I(format) option is only supported by the C(cryptography) backend. The C(pyopenssl) backend will + fail if a value different from C(auto_ignore) is used. + type: str + default: auto_ignore + choices: [ pkcs1, pkcs8, raw, auto, auto_ignore ] + format_mismatch: + description: + - Determines behavior of the module if the format of a private key does not match the expected format, but all + other parameters are as expected. + - If set to C(regenerate) (default), generates a new private key. + - If set to C(convert), the key will be converted to the new format instead. + - Only supported by the C(cryptography) backend. + type: str + default: regenerate + choices: [ regenerate, convert ] + regenerate: + description: + - Allows to configure in which situations the module is allowed to regenerate private keys. + The module will always generate a new key if the destination file does not exist. + - By default, the key will be regenerated when it doesn't match the module's options, + except when the key cannot be read or the passphrase does not match. Please note that + this B(changed) for Ansible 2.10. For Ansible 2.9, the behavior was as if C(full_idempotence) + is specified. + - If set to C(never), the module will fail if the key cannot be read or the passphrase + isn't matching, and will never regenerate an existing key. + - If set to C(fail), the module will fail if the key does not correspond to the module's + options. + - If set to C(partial_idempotence), the key will be regenerated if it does not conform to + the module's options. The key is B(not) regenerated if it cannot be read (broken file), + the key is protected by an unknown passphrase, or when they key is not protected by a + passphrase, but a passphrase is specified. + - If set to C(full_idempotence), the key will be regenerated if it does not conform to the + module's options. This is also the case if the key cannot be read (broken file), the key + is protected by an unknown passphrase, or when they key is not protected by a passphrase, + but a passphrase is specified. Make sure you have a B(backup) when using this option! + - If set to C(always), the module will always regenerate the key. This is equivalent to + setting I(force) to C(yes). + - Note that if I(format_mismatch) is set to C(convert) and everything matches except the + format, the key will always be converted, except if I(regenerate) is set to C(always). + type: str + choices: + - never + - fail + - partial_idempotence + - full_idempotence + - always + default: full_idempotence +seealso: +- module: community.crypto.x509_certificate +- module: community.crypto.openssl_csr +- module: community.crypto.openssl_dhparam +- module: community.crypto.openssl_pkcs12 +- module: community.crypto.openssl_publickey +''' diff --git a/plugins/module_utils/action_module.py b/plugins/module_utils/action_module.py new file mode 100644 index 00000000..05461727 --- /dev/null +++ b/plugins/module_utils/action_module.py @@ -0,0 +1,734 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2012-2013 Michael DeHaan +# Copyright (c) 2016 Toshio Kuratomi +# Copyright (c) 2019 Ansible Project +# Copyright (c) 2020 Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +# Parts taken from ansible.module_utils.basic and ansible.module_utils.common.warnings. + +# NOTE: THIS MUST NOT BE USED BY A MODULE! THIS IS ONLY FOR ACTION PLUGINS! + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import copy +import traceback + +from ansible import constants as C +from ansible.errors import AnsibleError +from ansible.module_utils import six +from ansible.module_utils.basic import AnsibleFallbackNotFound, SEQUENCETYPE, remove_values +from ansible.module_utils.common._collections_compat import ( + Mapping +) +from ansible.module_utils.common.parameters import ( + handle_aliases, + list_deprecations, + list_no_log_values, + PASS_VARS, + PASS_BOOLS, +) +from ansible.module_utils.common.validation import ( + check_mutually_exclusive, + check_required_arguments, + check_required_by, + check_required_if, + check_required_one_of, + check_required_together, + count_terms, + check_type_bool, + check_type_bits, + check_type_bytes, + check_type_float, + check_type_int, + check_type_jsonarg, + check_type_list, + check_type_dict, + check_type_path, + check_type_raw, + check_type_str, + safe_eval, +) +from ansible.module_utils.common.text.formatters import ( + lenient_lowercase, +) +from ansible.module_utils.parsing.convert_bool import BOOLEANS_FALSE, BOOLEANS_TRUE +from ansible.module_utils.six import ( + binary_type, + string_types, + text_type, +) +from ansible.module_utils._text import to_native, to_text +from ansible.plugins.action import ActionBase + + +class _ModuleExitException(Exception): + def __init__(self, result): + super(_ModuleExitException, self).__init__() + self.result = result + + +class AnsibleActionModule(object): + def __init__(self, action_plugin, argument_spec, bypass_checks=False, + mutually_exclusive=None, required_together=None, + required_one_of=None, supports_check_mode=False, + required_if=None, required_by=None): + # Internal data + self.__action_plugin = action_plugin + self.__warnings = [] + self.__deprecations = [] + + # AnsibleModule data + self._name = self.__action_plugin._task.action + self.argument_spec = argument_spec + self.supports_check_mode = supports_check_mode + self.check_mode = self.__action_plugin._play_context.check_mode + self.bypass_checks = bypass_checks + self.no_log = self.__action_plugin._play_context.no_log + + self.mutually_exclusive = mutually_exclusive + self.required_together = required_together + self.required_one_of = required_one_of + self.required_if = required_if + self.required_by = required_by + self._diff = self.__action_plugin._play_context.diff + self._verbosity = self.__action_plugin._display.verbosity + self._string_conversion_action = C.STRING_CONVERSION_ACTION + + self.aliases = {} + self._legal_inputs = [] + + self.params = copy.deepcopy(action_plugin._task.args) + self._set_fallbacks() + + # append to legal_inputs and then possibly check against them + try: + self.aliases = self._handle_aliases() + except (ValueError, TypeError) as e: + # Use exceptions here because it isn't safe to call fail_json until no_log is processed + raise _ModuleExitException(dict(failed=True, msg="Module alias error: %s" % to_native(e))) + + # Save parameter values that should never be logged + self.no_log_values = set() + self._handle_no_log_values() + + self._check_arguments() + + # check exclusive early + if not bypass_checks: + self._check_mutually_exclusive(mutually_exclusive) + + self._set_defaults(pre=True) + + self._CHECK_ARGUMENT_TYPES_DISPATCHER = { + 'str': self._check_type_str, + 'list': self._check_type_list, + 'dict': self._check_type_dict, + 'bool': self._check_type_bool, + 'int': self._check_type_int, + 'float': self._check_type_float, + 'path': self._check_type_path, + 'raw': self._check_type_raw, + 'jsonarg': self._check_type_jsonarg, + 'json': self._check_type_jsonarg, + 'bytes': self._check_type_bytes, + 'bits': self._check_type_bits, + } + if not bypass_checks: + self._check_required_arguments() + self._check_argument_types() + self._check_argument_values() + self._check_required_together(required_together) + self._check_required_one_of(required_one_of) + self._check_required_if(required_if) + self._check_required_by(required_by) + + self._set_defaults(pre=False) + + # deal with options sub-spec + self._handle_options() + + def _handle_aliases(self, spec=None, param=None, option_prefix=''): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + # this uses exceptions as it happens before we can safely call fail_json + alias_warnings = [] + alias_results, self._legal_inputs = handle_aliases(spec, param, alias_warnings=alias_warnings) + for option, alias in alias_warnings: + self.warn('Both option %s and its alias %s are set.' % (option_prefix + option, option_prefix + alias)) + + deprecated_aliases = [] + for i in spec.keys(): + if 'deprecated_aliases' in spec[i].keys(): + for alias in spec[i]['deprecated_aliases']: + deprecated_aliases.append(alias) + + for deprecation in deprecated_aliases: + if deprecation['name'] in param.keys(): + self.deprecate("Alias '%s' is deprecated. See the module docs for more information" % deprecation['name'], + version=deprecation.get('version'), date=deprecation.get('date'), + collection_name=deprecation.get('collection_name')) + return alias_results + + def _handle_no_log_values(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + try: + self.no_log_values.update(list_no_log_values(spec, param)) + except TypeError as te: + self.fail_json(msg="Failure when processing no_log parameters. Module invocation will be hidden. " + "%s" % to_native(te), invocation={'module_args': 'HIDDEN DUE TO FAILURE'}) + + for message in list_deprecations(spec, param): + self.deprecate(message['msg'], version=message.get('version'), date=message.get('date'), + collection_name=message.get('collection_name')) + + def _check_arguments(self, spec=None, param=None, legal_inputs=None): + self._syslog_facility = 'LOG_USER' + unsupported_parameters = set() + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + if legal_inputs is None: + legal_inputs = self._legal_inputs + + for k in list(param.keys()): + + if k not in legal_inputs: + unsupported_parameters.add(k) + + for k in PASS_VARS: + # handle setting internal properties from internal ansible vars + param_key = '_ansible_%s' % k + if param_key in param: + if k in PASS_BOOLS: + setattr(self, PASS_VARS[k][0], self.boolean(param[param_key])) + else: + setattr(self, PASS_VARS[k][0], param[param_key]) + + # clean up internal top level params: + if param_key in self.params: + del self.params[param_key] + else: + # use defaults if not already set + if not hasattr(self, PASS_VARS[k][0]): + setattr(self, PASS_VARS[k][0], PASS_VARS[k][1]) + + if unsupported_parameters: + msg = "Unsupported parameters for (%s) module: %s" % (self._name, ', '.join(sorted(list(unsupported_parameters)))) + if self._options_context: + msg += " found in %s." % " -> ".join(self._options_context) + supported_parameters = list() + for key in sorted(spec.keys()): + if 'aliases' in spec[key] and spec[key]['aliases']: + supported_parameters.append("%s (%s)" % (key, ', '.join(sorted(spec[key]['aliases'])))) + else: + supported_parameters.append(key) + msg += " Supported parameters include: %s" % (', '.join(supported_parameters)) + self.fail_json(msg=msg) + if self.check_mode and not self.supports_check_mode: + self.exit_json(skipped=True, msg="action module (%s) does not support check mode" % self._name) + + def _count_terms(self, check, param=None): + if param is None: + param = self.params + return count_terms(check, param) + + def _check_mutually_exclusive(self, spec, param=None): + if param is None: + param = self.params + + try: + check_mutually_exclusive(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_one_of(self, spec, param=None): + if spec is None: + return + + if param is None: + param = self.params + + try: + check_required_one_of(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_together(self, spec, param=None): + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_together(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_by(self, spec, param=None): + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_by(spec, param) + except TypeError as e: + self.fail_json(msg=to_native(e)) + + def _check_required_arguments(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + try: + check_required_arguments(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_required_if(self, spec, param=None): + ''' ensure that parameters which conditionally required are present ''' + if spec is None: + return + if param is None: + param = self.params + + try: + check_required_if(spec, param) + except TypeError as e: + msg = to_native(e) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def _check_argument_values(self, spec=None, param=None): + ''' ensure all arguments have the requested values, and there are no stray arguments ''' + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + for (k, v) in spec.items(): + choices = v.get('choices', None) + if choices is None: + continue + if isinstance(choices, SEQUENCETYPE) and not isinstance(choices, (binary_type, text_type)): + if k in param: + # Allow one or more when type='list' param with choices + if isinstance(param[k], list): + diff_list = ", ".join([item for item in param[k] if item not in choices]) + if diff_list: + choices_str = ", ".join([to_native(c) for c in choices]) + msg = "value of %s must be one or more of: %s. Got no match for: %s" % (k, choices_str, diff_list) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + elif param[k] not in choices: + # PyYaml converts certain strings to bools. If we can unambiguously convert back, do so before checking + # the value. If we can't figure this out, module author is responsible. + lowered_choices = None + if param[k] == 'False': + lowered_choices = lenient_lowercase(choices) + overlap = BOOLEANS_FALSE.intersection(choices) + if len(overlap) == 1: + # Extract from a set + (param[k],) = overlap + + if param[k] == 'True': + if lowered_choices is None: + lowered_choices = lenient_lowercase(choices) + overlap = BOOLEANS_TRUE.intersection(choices) + if len(overlap) == 1: + (param[k],) = overlap + + if param[k] not in choices: + choices_str = ", ".join([to_native(c) for c in choices]) + msg = "value of %s must be one of: %s, got: %s" % (k, choices_str, param[k]) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + else: + msg = "internal error: choices for argument %s are not iterable: %s" % (k, choices) + if self._options_context: + msg += " found in %s" % " -> ".join(self._options_context) + self.fail_json(msg=msg) + + def safe_eval(self, value, locals=None, include_exceptions=False): + return safe_eval(value, locals, include_exceptions) + + def _check_type_str(self, value, param=None, prefix=''): + opts = { + 'error': False, + 'warn': False, + 'ignore': True + } + + # Ignore, warn, or error when converting to a string. + allow_conversion = opts.get(self._string_conversion_action, True) + try: + return check_type_str(value, allow_conversion) + except TypeError: + common_msg = 'quote the entire value to ensure it does not change.' + from_msg = '{0!r}'.format(value) + to_msg = '{0!r}'.format(to_text(value)) + + if param is not None: + if prefix: + param = '{0}{1}'.format(prefix, param) + + from_msg = '{0}: {1!r}'.format(param, value) + to_msg = '{0}: {1!r}'.format(param, to_text(value)) + + if self._string_conversion_action == 'error': + msg = common_msg.capitalize() + raise TypeError(to_native(msg)) + elif self._string_conversion_action == 'warn': + msg = ('The value "{0}" (type {1.__class__.__name__}) was converted to "{2}" (type string). ' + 'If this does not look like what you expect, {3}').format(from_msg, value, to_msg, common_msg) + self.warn(to_native(msg)) + return to_native(value, errors='surrogate_or_strict') + + def _check_type_list(self, value): + return check_type_list(value) + + def _check_type_dict(self, value): + return check_type_dict(value) + + def _check_type_bool(self, value): + return check_type_bool(value) + + def _check_type_int(self, value): + return check_type_int(value) + + def _check_type_float(self, value): + return check_type_float(value) + + def _check_type_path(self, value): + return check_type_path(value) + + def _check_type_jsonarg(self, value): + return check_type_jsonarg(value) + + def _check_type_raw(self, value): + return check_type_raw(value) + + def _check_type_bytes(self, value): + return check_type_bytes(value) + + def _check_type_bits(self, value): + return check_type_bits(value) + + def _handle_options(self, argument_spec=None, params=None, prefix=''): + ''' deal with options to create sub spec ''' + if argument_spec is None: + argument_spec = self.argument_spec + if params is None: + params = self.params + + for (k, v) in argument_spec.items(): + wanted = v.get('type', None) + if wanted == 'dict' or (wanted == 'list' and v.get('elements', '') == 'dict'): + spec = v.get('options', None) + if v.get('apply_defaults', False): + if spec is not None: + if params.get(k) is None: + params[k] = {} + else: + continue + elif spec is None or k not in params or params[k] is None: + continue + + self._options_context.append(k) + + if isinstance(params[k], dict): + elements = [params[k]] + else: + elements = params[k] + + for idx, param in enumerate(elements): + if not isinstance(param, dict): + self.fail_json(msg="value of %s must be of type dict or list of dict" % k) + + new_prefix = prefix + k + if wanted == 'list': + new_prefix += '[%d]' % idx + new_prefix += '.' + + self._set_fallbacks(spec, param) + options_aliases = self._handle_aliases(spec, param, option_prefix=new_prefix) + + options_legal_inputs = list(spec.keys()) + list(options_aliases.keys()) + + self._check_arguments(spec, param, options_legal_inputs) + + # check exclusive early + if not self.bypass_checks: + self._check_mutually_exclusive(v.get('mutually_exclusive', None), param) + + self._set_defaults(pre=True, spec=spec, param=param) + + if not self.bypass_checks: + self._check_required_arguments(spec, param) + self._check_argument_types(spec, param, new_prefix) + self._check_argument_values(spec, param) + + self._check_required_together(v.get('required_together', None), param) + self._check_required_one_of(v.get('required_one_of', None), param) + self._check_required_if(v.get('required_if', None), param) + self._check_required_by(v.get('required_by', None), param) + + self._set_defaults(pre=False, spec=spec, param=param) + + # handle multi level options (sub argspec) + self._handle_options(spec, param, new_prefix) + self._options_context.pop() + + def _get_wanted_type(self, wanted, k): + if not callable(wanted): + if wanted is None: + # Mostly we want to default to str. + # For values set to None explicitly, return None instead as + # that allows a user to unset a parameter + wanted = 'str' + try: + type_checker = self._CHECK_ARGUMENT_TYPES_DISPATCHER[wanted] + except KeyError: + self.fail_json(msg="implementation error: unknown type %s requested for %s" % (wanted, k)) + else: + # set the type_checker to the callable, and reset wanted to the callable's name (or type if it doesn't have one, ala MagicMock) + type_checker = wanted + wanted = getattr(wanted, '__name__', to_native(type(wanted))) + + return type_checker, wanted + + def _handle_elements(self, wanted, param, values): + type_checker, wanted_name = self._get_wanted_type(wanted, param) + validated_params = [] + # Get param name for strings so we can later display this value in a useful error message if needed + # Only pass 'kwargs' to our checkers and ignore custom callable checkers + kwargs = {} + if wanted_name == 'str' and isinstance(wanted, string_types): + if isinstance(param, string_types): + kwargs['param'] = param + elif isinstance(param, dict): + kwargs['param'] = list(param.keys())[0] + for value in values: + try: + validated_params.append(type_checker(value, **kwargs)) + except (TypeError, ValueError) as e: + msg = "Elements value for option %s" % param + if self._options_context: + msg += " found in '%s'" % " -> ".join(self._options_context) + msg += " is of type %s and we were unable to convert to %s: %s" % (type(value), wanted_name, to_native(e)) + self.fail_json(msg=msg) + return validated_params + + def _check_argument_types(self, spec=None, param=None, prefix=''): + ''' ensure all arguments have the requested type ''' + + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + for (k, v) in spec.items(): + wanted = v.get('type', None) + if k not in param: + continue + + value = param[k] + if value is None: + continue + + type_checker, wanted_name = self._get_wanted_type(wanted, k) + # Get param name for strings so we can later display this value in a useful error message if needed + # Only pass 'kwargs' to our checkers and ignore custom callable checkers + kwargs = {} + if wanted_name == 'str' and isinstance(type_checker, string_types): + kwargs['param'] = list(param.keys())[0] + + # Get the name of the parent key if this is a nested option + if prefix: + kwargs['prefix'] = prefix + + try: + param[k] = type_checker(value, **kwargs) + wanted_elements = v.get('elements', None) + if wanted_elements: + if wanted != 'list' or not isinstance(param[k], list): + msg = "Invalid type %s for option '%s'" % (wanted_name, param) + if self._options_context: + msg += " found in '%s'." % " -> ".join(self._options_context) + msg += ", elements value check is supported only with 'list' type" + self.fail_json(msg=msg) + param[k] = self._handle_elements(wanted_elements, k, param[k]) + + except (TypeError, ValueError) as e: + msg = "argument %s is of type %s" % (k, type(value)) + if self._options_context: + msg += " found in '%s'." % " -> ".join(self._options_context) + msg += " and we were unable to convert to %s: %s" % (wanted_name, to_native(e)) + self.fail_json(msg=msg) + + def _set_defaults(self, pre=True, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + for (k, v) in spec.items(): + default = v.get('default', None) + if pre is True: + # this prevents setting defaults on required items + if default is not None and k not in param: + param[k] = default + else: + # make sure things without a default still get set None + if k not in param: + param[k] = default + + def _set_fallbacks(self, spec=None, param=None): + if spec is None: + spec = self.argument_spec + if param is None: + param = self.params + + for (k, v) in spec.items(): + fallback = v.get('fallback', (None,)) + fallback_strategy = fallback[0] + fallback_args = [] + fallback_kwargs = {} + if k not in param and fallback_strategy is not None: + for item in fallback[1:]: + if isinstance(item, dict): + fallback_kwargs = item + else: + fallback_args = item + try: + param[k] = fallback_strategy(*fallback_args, **fallback_kwargs) + except AnsibleFallbackNotFound: + continue + + def warn(self, warning): + # Copied from ansible.module_utils.common.warnings: + if isinstance(warning, string_types): + self.__warnings.append(warning) + else: + raise TypeError("warn requires a string not a %s" % type(warning)) + + def deprecate(self, msg, version=None, date=None, collection_name=None): + if version is not None and date is not None: + raise AssertionError("implementation error -- version and date must not both be set") + + # Copied from ansible.module_utils.common.warnings: + if isinstance(msg, string_types): + # For compatibility, we accept that neither version nor date is set, + # and treat that the same as if version would haven been set + if date is not None: + self.__deprecations.append({'msg': msg, 'date': date, 'collection_name': collection_name}) + else: + self.__deprecations.append({'msg': msg, 'version': version, 'collection_name': collection_name}) + else: + raise TypeError("deprecate requires a string not a %s" % type(msg)) + + def _return_formatted(self, kwargs): + if 'invocation' not in kwargs: + kwargs['invocation'] = {'module_args': self.params} + + if 'warnings' in kwargs: + if isinstance(kwargs['warnings'], list): + for w in kwargs['warnings']: + self.warn(w) + else: + self.warn(kwargs['warnings']) + + if self.__warnings: + kwargs['warnings'] = self.__warnings + + if 'deprecations' in kwargs: + if isinstance(kwargs['deprecations'], list): + for d in kwargs['deprecations']: + if isinstance(d, SEQUENCETYPE) and len(d) == 2: + self.deprecate(d[0], version=d[1]) + elif isinstance(d, Mapping): + self.deprecate(d['msg'], version=d.get('version'), date=d.get('date'), + collection_name=d.get('collection_name')) + else: + self.deprecate(d) # pylint: disable=ansible-deprecated-no-version + else: + self.deprecate(kwargs['deprecations']) # pylint: disable=ansible-deprecated-no-version + + if self.__deprecations: + kwargs['deprecations'] = self.__deprecations + + kwargs = remove_values(kwargs, self.no_log_values) + raise _ModuleExitException(kwargs) + + def exit_json(self, **kwargs): + result = dict(kwargs) + if 'failed' not in result: + result['failed'] = False + self._return_formatted(result) + + def fail_json(self, msg, **kwargs): + result = dict(kwargs) + result['failed'] = True + result['msg'] = msg + self._return_formatted(result) + + +@six.add_metaclass(abc.ABCMeta) +class ActionModuleBase(ActionBase): + @abc.abstractmethod + def setup_module(self): + """Return pair (ArgumentSpec, kwargs).""" + pass + + @abc.abstractmethod + def run_module(self, module): + """Run module code""" + module.fail_json(msg='Not implemented.') + + def run(self, tmp=None, task_vars=None): + if task_vars is None: + task_vars = dict() + + result = super(ActionModuleBase, self).run(tmp, task_vars) + del tmp # tmp no longer has any effect + + try: + argument_spec, kwargs = self.setup_module() + module = argument_spec.create_ansible_module_helper(AnsibleActionModule, (self, ), **kwargs) + self.run_module(module) + raise AnsibleError('Internal error: action module did not call module.exit_json()') + except _ModuleExitException as mee: + result.update(mee.result) + return result + except Exception as dummy: + result['failed'] = True + result['msg'] = 'MODULE FAILURE' + result['exception'] = traceback.format_exc() + return result diff --git a/plugins/module_utils/crypto/module_backends/common.py b/plugins/module_utils/crypto/module_backends/common.py index 7f22caff..39222227 100644 --- a/plugins/module_utils/crypto/module_backends/common.py +++ b/plugins/module_utils/crypto/module_backends/common.py @@ -19,8 +19,9 @@ class ArgumentSpec: self.required_if = required_if or [] self.required_by = required_by or {} - def create_ansible_module(self, **kwargs): - return AnsibleModule( + def create_ansible_module_helper(self, clazz, args, **kwargs): + return clazz( + *args, argument_spec=self.argument_spec, mutually_exclusive=self.mutually_exclusive, required_together=self.required_together, @@ -28,3 +29,6 @@ class ArgumentSpec: required_if=self.required_if, required_by=self.required_by, **kwargs) + + def create_ansible_module(self, **kwargs): + return self.create_ansible_module_helper(AnsibleModule, (), **kwargs) diff --git a/plugins/module_utils/crypto/module_backends/privatekey.py b/plugins/module_utils/crypto/module_backends/privatekey.py new file mode 100644 index 00000000..85d2697e --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/privatekey.py @@ -0,0 +1,590 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2016, Yanis Guenane +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import base64 +import traceback + +from distutils.version import LooseVersion + +from ansible.module_utils import six +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils._text import to_bytes + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + CRYPTOGRAPHY_HAS_X25519, + CRYPTOGRAPHY_HAS_X25519_FULL, + CRYPTOGRAPHY_HAS_X448, + CRYPTOGRAPHY_HAS_ED25519, + CRYPTOGRAPHY_HAS_ED448, + OpenSSLObjectError, + OpenSSLBadPassphraseError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + load_privatekey, + get_fingerprint_of_privatekey, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.identify import ( + identify_private_key_format, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.common import ArgumentSpec + + +MINIMAL_PYOPENSSL_VERSION = '0.6' +MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + import cryptography.exceptions + import cryptography.hazmat.backends + import cryptography.hazmat.primitives.serialization + import cryptography.hazmat.primitives.asymmetric.rsa + import cryptography.hazmat.primitives.asymmetric.dsa + import cryptography.hazmat.primitives.asymmetric.ec + import cryptography.hazmat.primitives.asymmetric.utils + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + + +class PrivateKeyError(OpenSSLObjectError): + pass + + +# From the object called `module`, only the following properties are used: +# +# - module.params[] +# - module.warn(msg: str) +# - module.fail_json(msg: str, **kwargs) + + +@six.add_metaclass(abc.ABCMeta) +class PrivateKeyBackend: + def __init__(self, module, backend): + self.module = module + self.type = module.params['type'] + self.size = module.params['size'] + self.curve = module.params['curve'] + self.passphrase = module.params['passphrase'] + self.cipher = module.params['cipher'] + self.format = module.params['format'] + self.format_mismatch = module.params.get('format_mismatch', 'regenerate') + self.regenerate = module.params.get('regenerate', 'full_idempotence') + self.backend = backend + + self.private_key = None + + self.existing_private_key = None + self.existing_private_key_bytes = None + + @abc.abstractmethod + def generate_private_key(self): + """(Re-)Generate private key.""" + pass + + def convert_private_key(self): + """Convert existing private key (self.existing_private_key) to new private key (self.private_key). + + This is effectively a copy without active conversion. The conversion is done + during load and store; get_private_key_data() uses the destination format to + serialize the key. + """ + self._ensure_existing_private_key_loaded() + self.private_key = self.existing_private_key + + @abc.abstractmethod + def get_private_key_data(self): + """Return bytes for self.private_key.""" + pass + + def set_existing(self, privatekey_bytes): + """Set existing private key bytes. None indicates that the key does not exist.""" + self.existing_private_key_bytes = privatekey_bytes + + def has_existing(self): + """Query whether an existing private key is/has been there.""" + return self.existing_private_key_bytes is not None + + @abc.abstractmethod + def _check_passphrase(self): + """Check whether provided passphrase matches, assuming self.existing_private_key_bytes has been populated.""" + pass + + @abc.abstractmethod + def _ensure_existing_private_key_loaded(self): + """Make sure that self.existing_private_key is populated from self.existing_private_key_bytes.""" + pass + + @abc.abstractmethod + def _check_size_and_type(self): + """Check whether provided size and type matches, assuming self.existing_private_key has been populated.""" + pass + + @abc.abstractmethod + def _check_format(self): + """Check whether the key file format, assuming self.existing_private_key and self.existing_private_key_bytes has been populated.""" + pass + + def needs_regeneration(self): + """Check whether a regeneration is necessary.""" + if self.regenerate == 'always': + return True + if not self.has_existing(): + # key does not exist + return True + if not self._check_passphrase(): + if self.regenerate == 'full_idempotence': + return True + self.module.fail_json(msg='Unable to read the key. The key is protected with a another passphrase / no passphrase or broken.' + ' Will not proceed. To force regeneration, call the module with `generate`' + ' set to `full_idempotence` or `always`, or with `force=yes`.') + self._ensure_existing_private_key_loaded() + if self.regenerate != 'never': + if not self._check_size_and_type(): + if self.regenerate in ('partial_idempotence', 'full_idempotence'): + return True + self.module.fail_json(msg='Key has wrong type and/or size.' + ' Will not proceed. To force regeneration, call the module with `generate`' + ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.') + # During generation step, regenerate if format does not match and format_mismatch == 'regenerate' + if self.format_mismatch == 'regenerate' and self.regenerate != 'never': + if not self._check_format(): + if self.regenerate in ('partial_idempotence', 'full_idempotence'): + return True + self.module.fail_json(msg='Key has wrong format.' + ' Will not proceed. To force regeneration, call the module with `generate`' + ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.' + ' To convert the key, set `format_mismatch` to `convert`.') + return False + + def needs_conversion(self): + """Check whether a conversion is necessary. Must only be called if needs_regeneration() returned False.""" + # During conversion step, convert if format does not match and format_mismatch == 'convert' + self._ensure_existing_private_key_loaded() + return self.has_existing() and self.format_mismatch == 'convert' and not self._check_format() + + def _get_fingerprint(self): + if self.private_key: + return get_fingerprint_of_privatekey(self.private_key, backend=self.backend) + try: + self._ensure_existing_private_key_loaded() + except Exception as dummy: + # Ignore errors + pass + if self.existing_private_key: + return get_fingerprint_of_privatekey(self.existing_private_key, backend=self.backend) + + def dump(self, include_key): + """Serialize the object into a dictionary.""" + + if not self.private_key: + try: + self._ensure_existing_private_key_loaded() + except Exception as dummy: + # Ignore errors + pass + result = { + 'type': self.type, + 'size': self.size, + 'fingerprint': self._get_fingerprint(), + } + if self.type == 'ECC': + result['curve'] = self.curve + if include_key: + # Get hold of private key bytes + pk_bytes = self.existing_private_key_bytes + if self.private_key is not None: + pk_bytes = self.get_private_key_data() + # Store result + if pk_bytes: + if identify_private_key_format(pk_bytes) == 'raw': + result['privatekey'] = base64.b64encode(pk_bytes) + else: + result['privatekey'] = pk_bytes.decode('utf-8') + else: + result['privatekey'] = None + + return result + + +# Implementation with using pyOpenSSL +class PrivateKeyPyOpenSSLBackend(PrivateKeyBackend): + + def __init__(self, module): + super(PrivateKeyPyOpenSSLBackend, self).__init__(module=module, backend='pyopenssl') + + if self.type == 'RSA': + self.openssl_type = crypto.TYPE_RSA + elif self.type == 'DSA': + self.openssl_type = crypto.TYPE_DSA + else: + self.module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.") + + if self.format != 'auto_ignore': + self.module.fail_json(msg="PyOpenSSL backend only supports auto_ignore format.") + + def generate_private_key(self): + """(Re-)Generate private key.""" + self.private_key = crypto.PKey() + try: + self.private_key.generate_key(self.openssl_type, self.size) + except (TypeError, ValueError) as exc: + raise PrivateKeyError(exc) + + def _ensure_existing_private_key_loaded(self): + if self.existing_private_key is None and self.has_existing(): + try: + self.existing_private_key = load_privatekey( + None, self.passphrase, content=self.existing_private_key_bytes, backend=self.backend) + except OpenSSLBadPassphraseError as exc: + raise PrivateKeyError(exc) + + def get_private_key_data(self): + """Return bytes for self.private_key""" + if self.cipher and self.passphrase: + return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.private_key, + self.cipher, to_bytes(self.passphrase)) + else: + return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.private_key) + + def _check_passphrase(self): + try: + load_privatekey(None, self.passphrase, content=self.existing_private_key_bytes, backend=self.backend) + return True + except Exception as dummy: + return False + + def _check_size_and_type(self): + return self.size == self.existing_private_key.bits() and self.openssl_type == self.existing_private_key.type() + + def _check_format(self): + # Not supported by this backend + return True + + +# Implementation with using cryptography +class PrivateKeyCryptographyBackend(PrivateKeyBackend): + + def _get_ec_class(self, ectype): + ecclass = cryptography.hazmat.primitives.asymmetric.ec.__dict__.get(ectype) + if ecclass is None: + self.module.fail_json(msg='Your cryptography version does not support {0}'.format(ectype)) + return ecclass + + def _add_curve(self, name, ectype, deprecated=False): + def create(size): + ecclass = self._get_ec_class(ectype) + return ecclass() + + def verify(privatekey): + ecclass = self._get_ec_class(ectype) + return isinstance(privatekey.private_numbers().public_numbers.curve, ecclass) + + self.curves[name] = { + 'create': create, + 'verify': verify, + 'deprecated': deprecated, + } + + def __init__(self, module): + super(PrivateKeyCryptographyBackend, self).__init__(module=module, backend='cryptography') + + self.curves = dict() + self._add_curve('secp384r1', 'SECP384R1') + self._add_curve('secp521r1', 'SECP521R1') + self._add_curve('secp224r1', 'SECP224R1') + self._add_curve('secp192r1', 'SECP192R1') + self._add_curve('secp256r1', 'SECP256R1') + self._add_curve('secp256k1', 'SECP256K1') + self._add_curve('brainpoolP256r1', 'BrainpoolP256R1', deprecated=True) + self._add_curve('brainpoolP384r1', 'BrainpoolP384R1', deprecated=True) + self._add_curve('brainpoolP512r1', 'BrainpoolP512R1', deprecated=True) + self._add_curve('sect571k1', 'SECT571K1', deprecated=True) + self._add_curve('sect409k1', 'SECT409K1', deprecated=True) + self._add_curve('sect283k1', 'SECT283K1', deprecated=True) + self._add_curve('sect233k1', 'SECT233K1', deprecated=True) + self._add_curve('sect163k1', 'SECT163K1', deprecated=True) + self._add_curve('sect571r1', 'SECT571R1', deprecated=True) + self._add_curve('sect409r1', 'SECT409R1', deprecated=True) + self._add_curve('sect283r1', 'SECT283R1', deprecated=True) + self._add_curve('sect233r1', 'SECT233R1', deprecated=True) + self._add_curve('sect163r2', 'SECT163R2', deprecated=True) + + self.cryptography_backend = cryptography.hazmat.backends.default_backend() + + if not CRYPTOGRAPHY_HAS_X25519 and self.type == 'X25519': + self.module.fail_json(msg='Your cryptography version does not support X25519') + if not CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519': + self.module.fail_json(msg='Your cryptography version does not support X25519 serialization') + if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': + self.module.fail_json(msg='Your cryptography version does not support X448') + if not CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': + self.module.fail_json(msg='Your cryptography version does not support Ed25519') + if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': + self.module.fail_json(msg='Your cryptography version does not support Ed448') + + def _get_wanted_format(self): + if self.format not in ('auto', 'auto_ignore'): + return self.format + if self.type in ('X25519', 'X448', 'Ed25519', 'Ed448'): + return 'pkcs8' + else: + return 'pkcs1' + + def generate_private_key(self): + """(Re-)Generate private key.""" + try: + if self.type == 'RSA': + self.private_key = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key( + public_exponent=65537, # OpenSSL always uses this + key_size=self.size, + backend=self.cryptography_backend + ) + if self.type == 'DSA': + self.private_key = cryptography.hazmat.primitives.asymmetric.dsa.generate_private_key( + key_size=self.size, + backend=self.cryptography_backend + ) + if CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519': + self.private_key = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate() + if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': + self.private_key = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate() + if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': + self.private_key = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate() + if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': + self.private_key = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate() + if self.type == 'ECC' and self.curve in self.curves: + if self.curves[self.curve]['deprecated']: + self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve)) + self.private_key = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key( + curve=self.curves[self.curve]['create'](self.size), + backend=self.cryptography_backend + ) + except cryptography.exceptions.UnsupportedAlgorithm as dummy: + self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type)) + + def get_private_key_data(self): + """Return bytes for self.private_key""" + # Select export format and encoding + try: + export_format = self._get_wanted_format() + export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM + if export_format == 'pkcs1': + # "TraditionalOpenSSL" format is PKCS1 + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL + elif export_format == 'pkcs8': + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 + elif export_format == 'raw': + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.Raw + export_encoding = cryptography.hazmat.primitives.serialization.Encoding.Raw + except AttributeError: + self.module.fail_json(msg='Cryptography backend does not support the selected output format "{0}"'.format(self.format)) + + # Select key encryption + encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption() + if self.cipher and self.passphrase: + if self.cipher == 'auto': + encryption_algorithm = cryptography.hazmat.primitives.serialization.BestAvailableEncryption(to_bytes(self.passphrase)) + else: + self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.') + + # Serialize key + try: + return self.private_key.private_bytes( + encoding=export_encoding, + format=export_format, + encryption_algorithm=encryption_algorithm + ) + except ValueError as dummy: + self.module.fail_json( + msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format) + ) + except Exception as dummy: + self.module.fail_json( + msg='Error while serializing the private key in the required format "{0}"'.format(self.format), + exception=traceback.format_exc() + ) + + def _load_privatekey(self): + data = self.existing_private_key_bytes + try: + # Interpret bytes depending on format. + format = identify_private_key_format(data) + if format == 'raw': + if len(data) == 56 and CRYPTOGRAPHY_HAS_X448: + return cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.from_private_bytes(data) + if len(data) == 57 and CRYPTOGRAPHY_HAS_ED448: + return cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.from_private_bytes(data) + if len(data) == 32: + if CRYPTOGRAPHY_HAS_X25519 and (self.type == 'X25519' or not CRYPTOGRAPHY_HAS_ED25519): + return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) + if CRYPTOGRAPHY_HAS_ED25519 and (self.type == 'Ed25519' or not CRYPTOGRAPHY_HAS_X25519): + return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) + if CRYPTOGRAPHY_HAS_X25519 and CRYPTOGRAPHY_HAS_ED25519: + try: + return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) + except Exception: + return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) + raise PrivateKeyError('Cannot load raw key') + else: + return cryptography.hazmat.primitives.serialization.load_pem_private_key( + data, + None if self.passphrase is None else to_bytes(self.passphrase), + backend=self.cryptography_backend + ) + except Exception as e: + raise PrivateKeyError(e) + + def _ensure_existing_private_key_loaded(self): + if self.existing_private_key is None and self.has_existing(): + self.existing_private_key = self._load_privatekey() + + def _check_passphrase(self): + try: + format = identify_private_key_format(self.existing_private_key_bytes) + if format == 'raw': + # Raw keys cannot be encrypted. To avoid incompatibilities, we try to + # actually load the key (and return False when this fails). + self._load_privatekey() + # Loading the key succeeded. Only return True when no passphrase was + # provided. + return self.passphrase is None + else: + return cryptography.hazmat.primitives.serialization.load_pem_private_key( + self.existing_private_key_bytes, + None if self.passphrase is None else to_bytes(self.passphrase), + backend=self.cryptography_backend + ) + except Exception as dummy: + return False + + def _check_size_and_type(self): + if isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): + return self.type == 'RSA' and self.size == self.existing_private_key.key_size + if isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey): + return self.type == 'DSA' and self.size == self.existing_private_key.key_size + if CRYPTOGRAPHY_HAS_X25519 and isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey): + return self.type == 'X25519' + if CRYPTOGRAPHY_HAS_X448 and isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey): + return self.type == 'X448' + if CRYPTOGRAPHY_HAS_ED25519 and isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey): + return self.type == 'Ed25519' + if CRYPTOGRAPHY_HAS_ED448 and isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey): + return self.type == 'Ed448' + if isinstance(self.existing_private_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): + if self.type != 'ECC': + return False + if self.curve not in self.curves: + return False + return self.curves[self.curve]['verify'](self.existing_private_key) + + return False + + def _check_format(self): + if self.format == 'auto_ignore': + return True + try: + format = identify_private_key_format(self.existing_private_key_bytes) + return format == self._get_wanted_format() + except Exception as dummy: + return False + + +def select_backend(module, backend): + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # Decision + if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto': + # First try pyOpenSSL, then cryptography + if can_use_pyopenssl: + backend = 'pyopenssl' + elif can_use_cryptography: + backend = 'cryptography' + else: + # First try cryptography, then pyOpenSSL + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), + exception=PYOPENSSL_IMP_ERR) + module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', + version='2.0.0', collection_name='community.crypto') + return backend, PrivateKeyPyOpenSSLBackend(module) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + return backend, PrivateKeyCryptographyBackend(module) + else: + raise Exception('Unsupported value for backend: {0}'.format(backend)) + + +def get_privatekey_argument_spec(): + return ArgumentSpec( + argument_spec=dict( + size=dict(type='int', default=4096), + type=dict(type='str', default='RSA', choices=[ + 'DSA', 'ECC', 'Ed25519', 'Ed448', 'RSA', 'X25519', 'X448' + ]), + curve=dict(type='str', choices=[ + 'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256r1', + 'secp256k1', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', + 'sect571k1', 'sect409k1', 'sect283k1', 'sect233k1', 'sect163k1', + 'sect571r1', 'sect409r1', 'sect283r1', 'sect233r1', 'sect163r2', + ]), + passphrase=dict(type='str', no_log=True), + cipher=dict(type='str'), + format=dict(type='str', default='auto_ignore', choices=['pkcs1', 'pkcs8', 'raw', 'auto', 'auto_ignore']), + format_mismatch=dict(type='str', default='regenerate', choices=['regenerate', 'convert']), + select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'), + regenerate=dict( + type='str', + default='full_idempotence', + choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always'] + ), + ), + required_together=[ + ['cipher', 'passphrase'] + ], + required_if=[ + ['type', 'ECC', ['curve']], + ], + ) diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index a84dd4f4..20b1dde1 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -83,11 +83,9 @@ def get_fingerprint_of_bytes(source): return fingerprint -def get_fingerprint(path, passphrase=None, content=None, backend='pyopenssl'): +def get_fingerprint_of_privatekey(privatekey, backend='pyopenssl'): """Generate the fingerprint of the public key. """ - privatekey = load_privatekey(path, passphrase=passphrase, content=content, check_passphrase=False, backend=backend) - if backend == 'pyopenssl': try: publickey = crypto.dump_publickey(crypto.FILETYPE_ASN1, privatekey) @@ -112,6 +110,14 @@ def get_fingerprint(path, passphrase=None, content=None, backend='pyopenssl'): return get_fingerprint_of_bytes(publickey) +def get_fingerprint(path, passphrase=None, content=None, backend='pyopenssl'): + """Generate the fingerprint of the public key. """ + + privatekey = load_privatekey(path, passphrase=passphrase, content=content, check_passphrase=False, backend=backend) + + return get_fingerprint_of_privatekey(privatekey, backend=backend) + + def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, backend='pyopenssl'): """Load the specified OpenSSL private key. @@ -343,6 +349,10 @@ class OpenSSLObject(object): def remove(self, module): """Remove the resource from the filesystem.""" + if self.check_mode: + if os.path.exists(self.path): + self.changed = True + return try: os.remove(self.path) diff --git a/plugins/modules/openssl_privatekey.py b/plugins/modules/openssl_privatekey.py index 25b9eaaa..017f8cba 100644 --- a/plugins/modules/openssl_privatekey.py +++ b/plugins/modules/openssl_privatekey.py @@ -14,23 +14,6 @@ module: openssl_privatekey short_description: Generate OpenSSL private keys description: - This module allows one to (re)generate OpenSSL private keys. - - One can generate L(RSA,https://en.wikipedia.org/wiki/RSA_%28cryptosystem%29), - L(DSA,https://en.wikipedia.org/wiki/Digital_Signature_Algorithm), - L(ECC,https://en.wikipedia.org/wiki/Elliptic-curve_cryptography) or - L(EdDSA,https://en.wikipedia.org/wiki/EdDSA) private keys. - - Keys are generated in PEM format. - - "Please note that the module regenerates private keys if they don't match - the module's options. In particular, if you provide another passphrase - (or specify none), change the keysize, etc., the private key will be - regenerated. If you are concerned that this could **overwrite your private key**, - consider using the I(backup) option." - - "The module can use the cryptography Python library, or the pyOpenSSL Python - library. By default, it tries to detect which one is available. This can be - overridden with the I(select_crypto_backend) option. Please note that the - PyOpenSSL backend was deprecated in Ansible 2.9 and will be removed in Ansible 2.13." -requirements: - - Either cryptography >= 1.2.3 (older versions might work as well) - - Or pyOpenSSL author: - Yanis Guenane (@Spredzy) - Felix Fontein (@felixfontein) @@ -41,48 +24,6 @@ options: type: str default: present choices: [ absent, present ] - size: - description: - - Size (in bits) of the TLS/SSL key to generate. - type: int - default: 4096 - type: - description: - - The algorithm used to generate the TLS/SSL private key. - - Note that C(ECC), C(X25519), C(X448), C(Ed25519) and C(Ed448) require the C(cryptography) backend. - C(X25519) needs cryptography 2.5 or newer, while C(X448), C(Ed25519) and C(Ed448) require - cryptography 2.6 or newer. For C(ECC), the minimal cryptography version required depends on the - I(curve) option. - type: str - default: RSA - choices: [ DSA, ECC, Ed25519, Ed448, RSA, X25519, X448 ] - curve: - description: - - Note that not all curves are supported by all versions of C(cryptography). - - For maximal interoperability, C(secp384r1) or C(secp256r1) should be used. - - We use the curve names as defined in the - L(IANA registry for TLS,https://www.iana.org/assignments/tls-parameters/tls-parameters.xhtml#tls-parameters-8). - type: str - choices: - - secp384r1 - - secp521r1 - - secp224r1 - - secp192r1 - - secp256r1 - - secp256k1 - - brainpoolP256r1 - - brainpoolP384r1 - - brainpoolP512r1 - - sect571k1 - - sect409k1 - - sect283k1 - - sect233k1 - - sect163k1 - - sect571r1 - - sect409r1 - - sect283r1 - - sect233r1 - - sect163r2 force: description: - Should the key be regenerated even if it already exists. @@ -90,56 +31,13 @@ options: default: no path: description: - - Name of the file in which the generated TLS/SSL private key will be written. It will have 0600 mode. + - Name of the file in which the generated TLS/SSL private key will be written. It will have C(0600) mode + if I(mode) is not explicitly set. type: path required: true - passphrase: - description: - - The passphrase for the private key. - type: str - cipher: - description: - - The cipher to encrypt the private key. (Valid values can be found by - running `openssl list -cipher-algorithms` or `openssl list-cipher-algorithms`, - depending on your OpenSSL version.) - - When using the C(cryptography) backend, use C(auto). - type: str - select_crypto_backend: - description: - - Determines which crypto backend to use. - - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). - - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. - - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. - - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0. - From that point on, only the C(cryptography) backend will be available. - type: str - default: auto - choices: [ auto, cryptography, pyopenssl ] format: - description: - - Determines which format the private key is written in. By default, PKCS1 (traditional OpenSSL format) - is used for all keys which support it. Please note that not every key can be exported in any format. - - The value C(auto) selects a fromat based on the key format. The value C(auto_ignore) does the same, - but for existing private key files, it will not force a regenerate when its format is not the automatically - selected one for generation. - - Note that if the format for an existing private key mismatches, the key is *regenerated* by default. - To change this behavior, use the I(format_mismatch) option. - - The I(format) option is only supported by the C(cryptography) backend. The C(pyopenssl) backend will - fail if a value different from C(auto_ignore) is used. - type: str - default: auto_ignore - choices: [ pkcs1, pkcs8, raw, auto, auto_ignore ] version_added: '1.0.0' format_mismatch: - description: - - Determines behavior of the module if the format of a private key does not match the expected format, but all - other parameters are as expected. - - If set to C(regenerate) (default), generates a new private key. - - If set to C(convert), the key will be converted to the new format instead. - - Only supported by the C(cryptography) backend. - type: str - default: regenerate - choices: [ regenerate, convert ] version_added: '1.0.0' backup: description: @@ -158,46 +56,13 @@ options: default: no version_added: '1.0.0' regenerate: - description: - - Allows to configure in which situations the module is allowed to regenerate private keys. - The module will always generate a new key if the destination file does not exist. - - By default, the key will be regenerated when it doesn't match the module's options, - except when the key cannot be read or the passphrase does not match. Please note that - this B(changed) for Ansible 2.10. For Ansible 2.9, the behavior was as if C(full_idempotence) - is specified. - - If set to C(never), the module will fail if the key cannot be read or the passphrase - isn't matching, and will never regenerate an existing key. - - If set to C(fail), the module will fail if the key does not correspond to the module's - options. - - If set to C(partial_idempotence), the key will be regenerated if it does not conform to - the module's options. The key is B(not) regenerated if it cannot be read (broken file), - the key is protected by an unknown passphrase, or when they key is not protected by a - passphrase, but a passphrase is specified. - - If set to C(full_idempotence), the key will be regenerated if it does not conform to the - module's options. This is also the case if the key cannot be read (broken file), the key - is protected by an unknown passphrase, or when they key is not protected by a passphrase, - but a passphrase is specified. Make sure you have a B(backup) when using this option! - - If set to C(always), the module will always regenerate the key. This is equivalent to - setting I(force) to C(yes). - - Note that if I(format_mismatch) is set to C(convert) and everything matches except the - format, the key will always be converted, except if I(regenerate) is set to C(always). - type: str - choices: - - never - - fail - - partial_idempotence - - full_idempotence - - always - default: full_idempotence version_added: '1.0.0' extends_documentation_fragment: -- files +- ansible.builtin.files +- community.crypto.module_privatekey seealso: -- module: community.crypto.x509_certificate -- module: community.crypto.openssl_csr -- module: community.crypto.openssl_dhparam -- module: community.crypto.openssl_pkcs12 -- module: community.crypto.openssl_publickey +- module: community.crypto.openssl_privatekey_pipe +- module: community.crypto.openssl_privatekey_info ''' EXAMPLES = r''' @@ -275,15 +140,10 @@ privatekey: version_added: '1.0.0' ''' -import abc -import base64 import os -import traceback -from distutils.version import LooseVersion - -from ansible.module_utils.basic import AnsibleModule, missing_required_lib -from ansible.module_utils._text import to_native, to_bytes +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native from ansible_collections.community.crypto.plugins.module_utils.io import ( load_file_if_exists, @@ -291,83 +151,32 @@ from ansible_collections.community.crypto.plugins.module_utils.io import ( ) from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( - CRYPTOGRAPHY_HAS_X25519, - CRYPTOGRAPHY_HAS_X25519_FULL, - CRYPTOGRAPHY_HAS_X448, - CRYPTOGRAPHY_HAS_ED25519, - CRYPTOGRAPHY_HAS_ED448, OpenSSLObjectError, - OpenSSLBadPassphraseError, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( OpenSSLObject, - load_privatekey, - get_fingerprint, - get_fingerprint_of_bytes, ) -from ansible_collections.community.crypto.plugins.module_utils.crypto.identify import ( - identify_private_key_format, +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey import ( + select_backend, + get_privatekey_argument_spec, ) -MINIMAL_PYOPENSSL_VERSION = '0.6' -MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' -PYOPENSSL_IMP_ERR = None -try: - import OpenSSL - from OpenSSL import crypto - PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) -except ImportError: - PYOPENSSL_IMP_ERR = traceback.format_exc() - PYOPENSSL_FOUND = False -else: - PYOPENSSL_FOUND = True +class PrivateKeyModule(OpenSSLObject): -CRYPTOGRAPHY_IMP_ERR = None -try: - import cryptography - import cryptography.exceptions - import cryptography.hazmat.backends - import cryptography.hazmat.primitives.serialization - import cryptography.hazmat.primitives.asymmetric.rsa - import cryptography.hazmat.primitives.asymmetric.dsa - import cryptography.hazmat.primitives.asymmetric.ec - import cryptography.hazmat.primitives.asymmetric.utils - CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) -except ImportError: - CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() - CRYPTOGRAPHY_FOUND = False -else: - CRYPTOGRAPHY_FOUND = True - - -class PrivateKeyError(OpenSSLObjectError): - pass - - -class PrivateKeyBase(OpenSSLObject): - - def __init__(self, module): - super(PrivateKeyBase, self).__init__( + def __init__(self, module, module_backend): + super(PrivateKeyModule, self).__init__( module.params['path'], module.params['state'], module.params['force'], - module.check_mode + module.check_mode, ) - self.size = module.params['size'] - self.passphrase = module.params['passphrase'] - self.cipher = module.params['cipher'] - self.privatekey = None - self.fingerprint = {} - self.format = module.params['format'] - self.format_mismatch = module.params['format_mismatch'] - self.privatekey_bytes = None + self.module_backend = module_backend self.return_content = module.params['return_content'] - self.regenerate = module.params['regenerate'] - if self.regenerate == 'always': - self.force = True + if self.force: + module_backend.regenerate = 'always' self.backup = module.params['backup'] self.backup_file = None @@ -375,510 +184,68 @@ class PrivateKeyBase(OpenSSLObject): if module.params['mode'] is None: module.params['mode'] = '0600' - @abc.abstractmethod - def _generate_private_key(self): - """(Re-)Generate private key.""" - pass - - @abc.abstractmethod - def _ensure_private_key_loaded(self): - """Make sure that the private key has been loaded.""" - pass - - @abc.abstractmethod - def _get_private_key_data(self): - """Return bytes for self.privatekey""" - pass - - @abc.abstractmethod - def _get_fingerprint(self): - pass + module_backend.set_existing(load_file_if_exists(self.path, module)) def generate(self, module): """Generate a keypair.""" - if not self.check(module, perms_required=False, ignore_conversion=True) or self.force: + if self.module_backend.needs_regeneration(): # Regenerate - if self.backup: - self.backup_file = module.backup_local(self.path) - self._generate_private_key() - privatekey_data = self._get_private_key_data() - if self.return_content: - self.privatekey_bytes = privatekey_data - write_file(module, privatekey_data, 0o600) + if not self.check_mode: + if self.backup: + self.backup_file = module.backup_local(self.path) + self.module_backend.generate_private_key() + privatekey_data = self.module_backend.get_private_key_data() + if self.return_content: + self.privatekey_bytes = privatekey_data + write_file(module, privatekey_data, 0o600) self.changed = True - elif not self.check(module, perms_required=False, ignore_conversion=False): + elif self.module_backend.needs_conversion(): # Convert - if self.backup: - self.backup_file = module.backup_local(self.path) - self._ensure_private_key_loaded() - privatekey_data = self._get_private_key_data() - if self.return_content: - self.privatekey_bytes = privatekey_data - write_file(module, privatekey_data, 0o600) + if not self.check_mode: + if self.backup: + self.backup_file = module.backup_local(self.path) + self.module_backend.convert_private_key() + privatekey_data = self.module_backend.get_private_key_data() + if self.return_content: + self.privatekey_bytes = privatekey_data + write_file(module, privatekey_data, 0o600) self.changed = True - self.fingerprint = self._get_fingerprint() file_args = module.load_file_common_arguments(module.params) - if module.set_fs_attributes_if_different(file_args, False): - self.changed = True + self.changed = module.set_fs_attributes_if_different(file_args, self.changed) def remove(self, module): - if self.backup: + self.module_backend.set_existing(None) + if self.backup and not self.check_mode: self.backup_file = module.backup_local(self.path) - super(PrivateKeyBase, self).remove(module) - - @abc.abstractmethod - def _check_passphrase(self): - pass - - @abc.abstractmethod - def _check_size_and_type(self): - pass - - @abc.abstractmethod - def _check_format(self): - pass - - def check(self, module, perms_required=True, ignore_conversion=True): - """Ensure the resource is in its desired state.""" - - state_and_perms = super(PrivateKeyBase, self).check(module, perms_required=False) - - if not state_and_perms: - # key does not exist - return False - - if not self._check_passphrase(): - if self.regenerate in ('full_idempotence', 'always'): - return False - module.fail_json(msg='Unable to read the key. The key is protected with a another passphrase / no passphrase or broken.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `full_idempotence` or `always`, or with `force=yes`.') - - if self.regenerate != 'never': - if not self._check_size_and_type(): - if self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'): - return False - module.fail_json(msg='Key has wrong type and/or size.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.') - - if not self._check_format(): - # During conversion step, convert if format does not match and format_mismatch == 'convert' - if not ignore_conversion and self.format_mismatch == 'convert': - return False - # During generation step, regenerate if format does not match and format_mismatch == 'regenerate' - if ignore_conversion and self.format_mismatch == 'regenerate' and self.regenerate != 'never': - if not ignore_conversion or self.regenerate in ('partial_idempotence', 'full_idempotence', 'always'): - return False - module.fail_json(msg='Key has wrong format.' - ' Will not proceed. To force regeneration, call the module with `generate`' - ' set to `partial_idempotence`, `full_idempotence` or `always`, or with `force=yes`.' - ' To convert the key, set `format_mismatch` to `convert`.') - - # check whether permissions are correct (in case that needs to be checked) - return not perms_required or super(PrivateKeyBase, self).check(module, perms_required=perms_required) + super(PrivateKeyModule, self).remove(module) def dump(self): """Serialize the object into a dictionary.""" - result = { - 'size': self.size, - 'filename': self.path, - 'changed': self.changed, - 'fingerprint': self.fingerprint, - } + result = self.module_backend.dump(include_key=self.return_content) + result['filename'] = self.path + result['changed'] = self.changed if self.backup_file: result['backup_file'] = self.backup_file - if self.return_content: - if self.privatekey_bytes is None: - self.privatekey_bytes = load_file_if_exists(self.path, ignore_errors=True) - if self.privatekey_bytes: - if identify_private_key_format(self.privatekey_bytes) == 'raw': - result['privatekey'] = base64.b64encode(self.privatekey_bytes) - else: - result['privatekey'] = self.privatekey_bytes.decode('utf-8') - else: - result['privatekey'] = None return result -# Implementation with using pyOpenSSL -class PrivateKeyPyOpenSSL(PrivateKeyBase): - - def __init__(self, module): - super(PrivateKeyPyOpenSSL, self).__init__(module) - - if module.params['type'] == 'RSA': - self.type = crypto.TYPE_RSA - elif module.params['type'] == 'DSA': - self.type = crypto.TYPE_DSA - else: - module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.") - - if self.format != 'auto_ignore': - module.fail_json(msg="PyOpenSSL backend only supports auto_ignore format.") - - def _generate_private_key(self): - """(Re-)Generate private key.""" - self.privatekey = crypto.PKey() - try: - self.privatekey.generate_key(self.type, self.size) - except (TypeError, ValueError) as exc: - raise PrivateKeyError(exc) - - def _ensure_private_key_loaded(self): - """Make sure that the private key has been loaded.""" - if self.privatekey is None: - try: - self.privatekey = privatekey = load_privatekey(self.path, self.passphrase) - except OpenSSLBadPassphraseError as exc: - raise PrivateKeyError(exc) - - def _get_private_key_data(self): - """Return bytes for self.privatekey""" - if self.cipher and self.passphrase: - return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey, - self.cipher, to_bytes(self.passphrase)) - else: - return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey) - - def _get_fingerprint(self): - return get_fingerprint(self.path, self.passphrase) - - def _check_passphrase(self): - try: - load_privatekey(self.path, self.passphrase) - return True - except Exception as dummy: - return False - - def _check_size_and_type(self): - def _check_size(privatekey): - return self.size == privatekey.bits() - - def _check_type(privatekey): - return self.type == privatekey.type() - - self._ensure_private_key_loaded() - return _check_size(self.privatekey) and _check_type(self.privatekey) - - def _check_format(self): - # Not supported by this backend - return True - - def dump(self): - """Serialize the object into a dictionary.""" - - result = super(PrivateKeyPyOpenSSL, self).dump() - - if self.type == crypto.TYPE_RSA: - result['type'] = 'RSA' - else: - result['type'] = 'DSA' - - return result - - -# Implementation with using cryptography -class PrivateKeyCryptography(PrivateKeyBase): - - def _get_ec_class(self, ectype): - ecclass = cryptography.hazmat.primitives.asymmetric.ec.__dict__.get(ectype) - if ecclass is None: - self.module.fail_json(msg='Your cryptography version does not support {0}'.format(ectype)) - return ecclass - - def _add_curve(self, name, ectype, deprecated=False): - def create(size): - ecclass = self._get_ec_class(ectype) - return ecclass() - - def verify(privatekey): - ecclass = self._get_ec_class(ectype) - return isinstance(privatekey.private_numbers().public_numbers.curve, ecclass) - - self.curves[name] = { - 'create': create, - 'verify': verify, - 'deprecated': deprecated, - } - - def __init__(self, module): - super(PrivateKeyCryptography, self).__init__(module) - - self.curves = dict() - self._add_curve('secp384r1', 'SECP384R1') - self._add_curve('secp521r1', 'SECP521R1') - self._add_curve('secp224r1', 'SECP224R1') - self._add_curve('secp192r1', 'SECP192R1') - self._add_curve('secp256r1', 'SECP256R1') - self._add_curve('secp256k1', 'SECP256K1') - self._add_curve('brainpoolP256r1', 'BrainpoolP256R1', deprecated=True) - self._add_curve('brainpoolP384r1', 'BrainpoolP384R1', deprecated=True) - self._add_curve('brainpoolP512r1', 'BrainpoolP512R1', deprecated=True) - self._add_curve('sect571k1', 'SECT571K1', deprecated=True) - self._add_curve('sect409k1', 'SECT409K1', deprecated=True) - self._add_curve('sect283k1', 'SECT283K1', deprecated=True) - self._add_curve('sect233k1', 'SECT233K1', deprecated=True) - self._add_curve('sect163k1', 'SECT163K1', deprecated=True) - self._add_curve('sect571r1', 'SECT571R1', deprecated=True) - self._add_curve('sect409r1', 'SECT409R1', deprecated=True) - self._add_curve('sect283r1', 'SECT283R1', deprecated=True) - self._add_curve('sect233r1', 'SECT233R1', deprecated=True) - self._add_curve('sect163r2', 'SECT163R2', deprecated=True) - - self.module = module - self.cryptography_backend = cryptography.hazmat.backends.default_backend() - - self.type = module.params['type'] - self.curve = module.params['curve'] - if not CRYPTOGRAPHY_HAS_X25519 and self.type == 'X25519': - self.module.fail_json(msg='Your cryptography version does not support X25519') - if not CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519': - self.module.fail_json(msg='Your cryptography version does not support X25519 serialization') - if not CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': - self.module.fail_json(msg='Your cryptography version does not support X448') - if not CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': - self.module.fail_json(msg='Your cryptography version does not support Ed25519') - if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': - self.module.fail_json(msg='Your cryptography version does not support Ed448') - - def _get_wanted_format(self): - if self.format not in ('auto', 'auto_ignore'): - return self.format - if self.type in ('X25519', 'X448', 'Ed25519', 'Ed448'): - return 'pkcs8' - else: - return 'pkcs1' - - def _generate_private_key(self): - """(Re-)Generate private key.""" - try: - if self.type == 'RSA': - self.privatekey = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key( - public_exponent=65537, # OpenSSL always uses this - key_size=self.size, - backend=self.cryptography_backend - ) - if self.type == 'DSA': - self.privatekey = cryptography.hazmat.primitives.asymmetric.dsa.generate_private_key( - key_size=self.size, - backend=self.cryptography_backend - ) - if CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519': - self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate() - if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': - self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate() - if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': - self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate() - if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': - self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate() - if self.type == 'ECC' and self.curve in self.curves: - if self.curves[self.curve]['deprecated']: - self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve)) - self.privatekey = cryptography.hazmat.primitives.asymmetric.ec.generate_private_key( - curve=self.curves[self.curve]['create'](self.size), - backend=self.cryptography_backend - ) - except cryptography.exceptions.UnsupportedAlgorithm as dummy: - self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type)) - - def _ensure_private_key_loaded(self): - """Make sure that the private key has been loaded.""" - if self.privatekey is None: - self.privatekey = self._load_privatekey() - - def _get_private_key_data(self): - """Return bytes for self.privatekey""" - # Select export format and encoding - try: - export_format = self._get_wanted_format() - export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM - if export_format == 'pkcs1': - # "TraditionalOpenSSL" format is PKCS1 - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL - elif export_format == 'pkcs8': - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 - elif export_format == 'raw': - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.Raw - export_encoding = cryptography.hazmat.primitives.serialization.Encoding.Raw - except AttributeError: - self.module.fail_json(msg='Cryptography backend does not support the selected output format "{0}"'.format(self.format)) - - # Select key encryption - encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption() - if self.cipher and self.passphrase: - if self.cipher == 'auto': - encryption_algorithm = cryptography.hazmat.primitives.serialization.BestAvailableEncryption(to_bytes(self.passphrase)) - else: - self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.') - - # Serialize key - try: - return self.privatekey.private_bytes( - encoding=export_encoding, - format=export_format, - encryption_algorithm=encryption_algorithm - ) - except ValueError as dummy: - self.module.fail_json( - msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format) - ) - except Exception as dummy: - self.module.fail_json( - msg='Error while serializing the private key in the required format "{0}"'.format(self.format), - exception=traceback.format_exc() - ) - - def _load_privatekey(self): - try: - # Read bytes - with open(self.path, 'rb') as f: - data = f.read() - # Interpret bytes depending on format. - format = identify_private_key_format(data) - if format == 'raw': - if len(data) == 56 and CRYPTOGRAPHY_HAS_X448: - return cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.from_private_bytes(data) - if len(data) == 57 and CRYPTOGRAPHY_HAS_ED448: - return cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.from_private_bytes(data) - if len(data) == 32: - if CRYPTOGRAPHY_HAS_X25519 and (self.type == 'X25519' or not CRYPTOGRAPHY_HAS_ED25519): - return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) - if CRYPTOGRAPHY_HAS_ED25519 and (self.type == 'Ed25519' or not CRYPTOGRAPHY_HAS_X25519): - return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) - if CRYPTOGRAPHY_HAS_X25519 and CRYPTOGRAPHY_HAS_ED25519: - try: - return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) - except Exception: - return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) - raise PrivateKeyError('Cannot load raw key') - else: - return cryptography.hazmat.primitives.serialization.load_pem_private_key( - data, - None if self.passphrase is None else to_bytes(self.passphrase), - backend=self.cryptography_backend - ) - except Exception as e: - raise PrivateKeyError(e) - - def _get_fingerprint(self): - # Get bytes of public key - private_key = self._load_privatekey() - public_key = private_key.public_key() - public_key_bytes = public_key.public_bytes( - cryptography.hazmat.primitives.serialization.Encoding.DER, - cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo - ) - # Get fingerprints of public_key_bytes - return get_fingerprint_of_bytes(public_key_bytes) - - def _check_passphrase(self): - try: - with open(self.path, 'rb') as f: - data = f.read() - format = identify_private_key_format(data) - if format == 'raw': - # Raw keys cannot be encrypted. To avoid incompatibilities, we try to - # actually load the key (and return False when this fails). - self._load_privatekey() - # Loading the key succeeded. Only return True when no passphrase was - # provided. - return self.passphrase is None - else: - return cryptography.hazmat.primitives.serialization.load_pem_private_key( - data, - None if self.passphrase is None else to_bytes(self.passphrase), - backend=self.cryptography_backend - ) - except Exception as dummy: - return False - - def _check_size_and_type(self): - self._ensure_private_key_loaded() - - if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): - return self.type == 'RSA' and self.size == self.privatekey.key_size - if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey): - return self.type == 'DSA' and self.size == self.privatekey.key_size - if CRYPTOGRAPHY_HAS_X25519 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey): - return self.type == 'X25519' - if CRYPTOGRAPHY_HAS_X448 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey): - return self.type == 'X448' - if CRYPTOGRAPHY_HAS_ED25519 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey): - return self.type == 'Ed25519' - if CRYPTOGRAPHY_HAS_ED448 and isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey): - return self.type == 'Ed448' - if isinstance(self.privatekey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): - if self.type != 'ECC': - return False - if self.curve not in self.curves: - return False - return self.curves[self.curve]['verify'](self.privatekey) - - return False - - def _check_format(self): - if self.format == 'auto_ignore': - return True - try: - with open(self.path, 'rb') as f: - content = f.read() - format = identify_private_key_format(content) - return format == self._get_wanted_format() - except Exception as dummy: - return False - - def dump(self): - """Serialize the object into a dictionary.""" - result = super(PrivateKeyCryptography, self).dump() - result['type'] = self.type - if self.type == 'ECC': - result['curve'] = self.curve - return result - - def main(): - module = AnsibleModule( - argument_spec=dict( - state=dict(type='str', default='present', choices=['present', 'absent']), - size=dict(type='int', default=4096), - type=dict(type='str', default='RSA', choices=[ - 'DSA', 'ECC', 'Ed25519', 'Ed448', 'RSA', 'X25519', 'X448' - ]), - curve=dict(type='str', choices=[ - 'secp384r1', 'secp521r1', 'secp224r1', 'secp192r1', 'secp256r1', - 'secp256k1', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', - 'sect571k1', 'sect409k1', 'sect283k1', 'sect233k1', 'sect163k1', - 'sect571r1', 'sect409r1', 'sect283r1', 'sect233r1', 'sect163r2', - ]), - force=dict(type='bool', default=False), - path=dict(type='path', required=True), - passphrase=dict(type='str', no_log=True), - cipher=dict(type='str'), - backup=dict(type='bool', default=False), - format=dict(type='str', default='auto_ignore', choices=['pkcs1', 'pkcs8', 'raw', 'auto', 'auto_ignore']), - format_mismatch=dict(type='str', default='regenerate', choices=['regenerate', 'convert']), - select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'), - return_content=dict(type='bool', default=False), - regenerate=dict( - type='str', - default='full_idempotence', - choices=['never', 'fail', 'partial_idempotence', 'full_idempotence', 'always'] - ), - ), + argument_spec = get_privatekey_argument_spec() + argument_spec.argument_spec.update(dict( + state=dict(type='str', default='present', choices=['present', 'absent']), + force=dict(type='bool', default=False), + path=dict(type='path', required=True), + backup=dict(type='bool', default=False), + return_content=dict(type='bool', default=False), + )) + module = argument_spec.create_ansible_module( supports_check_mode=True, add_file_common_args=True, - required_together=[ - ['cipher', 'passphrase'] - ], - required_if=[ - ['type', 'ECC', ['curve']], - ], ) base_dir = os.path.dirname(module.params['path']) or '.' @@ -888,61 +255,17 @@ def main(): msg='The directory %s does not exist or the file is not a directory' % base_dir ) - backend = module.params['select_crypto_backend'] - if backend == 'auto': - # Detection what is possible - can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) - can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + backend, module_backend = select_backend( + module=module, + backend=module.params['select_crypto_backend'], + ) - # Decision - if module.params['cipher'] and module.params['passphrase'] and module.params['cipher'] != 'auto': - # First try pyOpenSSL, then cryptography - if can_use_pyopenssl: - backend = 'pyopenssl' - elif can_use_cryptography: - backend = 'cryptography' - else: - # First try cryptography, then pyOpenSSL - if can_use_cryptography: - backend = 'cryptography' - elif can_use_pyopenssl: - backend = 'pyopenssl' - - # Success? - if backend == 'auto': - module.fail_json(msg=("Can't detect any of the required Python libraries " - "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( - MINIMAL_CRYPTOGRAPHY_VERSION, - MINIMAL_PYOPENSSL_VERSION)) try: - if backend == 'pyopenssl': - if not PYOPENSSL_FOUND: - module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), - exception=PYOPENSSL_IMP_ERR) - module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', - version='2.0.0', collection_name='community.crypto') - private_key = PrivateKeyPyOpenSSL(module) - elif backend == 'cryptography': - if not CRYPTOGRAPHY_FOUND: - module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), - exception=CRYPTOGRAPHY_IMP_ERR) - private_key = PrivateKeyCryptography(module) + private_key = PrivateKeyModule(module, module_backend) if private_key.state == 'present': - if module.check_mode: - result = private_key.dump() - result['changed'] = private_key.force \ - or not private_key.check(module, ignore_conversion=True) \ - or not private_key.check(module, ignore_conversion=False) - module.exit_json(**result) - private_key.generate(module) else: - if module.check_mode: - result = private_key.dump() - result['changed'] = os.path.exists(module.params['path']) - module.exit_json(**result) - private_key.remove(module) result = private_key.dump() diff --git a/plugins/modules/openssl_privatekey_info.py b/plugins/modules/openssl_privatekey_info.py index c4401c81..5fe288ab 100644 --- a/plugins/modules/openssl_privatekey_info.py +++ b/plugins/modules/openssl_privatekey_info.py @@ -67,6 +67,7 @@ options: seealso: - module: community.crypto.openssl_privatekey +- module: community.crypto.openssl_privatekey_pipe ''' EXAMPLES = r''' diff --git a/plugins/modules/openssl_privatekey_pipe.py b/plugins/modules/openssl_privatekey_pipe.py new file mode 100644 index 00000000..87a89b7a --- /dev/null +++ b/plugins/modules/openssl_privatekey_pipe.py @@ -0,0 +1,112 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2020, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: openssl_privatekey_pipe +short_description: Generate OpenSSL private keys without disk access +version_added: 1.3.0 +description: + - This module allows one to (re)generate OpenSSL private keys without disk access. + - This allows to read and write keys to vaults without having to write intermediate versions to disk. + - Make sure to not write the result of this module into logs or to the console, as it contains private key data! Use the I(no_log) task option to be sure. + - Note that this module is implemented as an L(action plugin,https://docs.ansible.com/ansible/latest/plugins/action.html) + and will always be executed on the controller. +author: + - Yanis Guenane (@Spredzy) + - Felix Fontein (@felixfontein) +options: + content: + description: + - The current private key data. + - Needed for idempotency. If not provided, the module will always return a change, and all idempotence-related + options are ignored. + type: str + content_base64: + description: + - Set to C(true) if the content is base64 encoded. + type: bool + default: false + return_current_key: + description: + - Set to C(true) to return the current private key when the module did not generate a new one. + - Note that in case of check mode, when this option is not set to C(true), the module always returns the + current key (if it was provided) and Ansible will replace it by C(VALUE_SPECIFIED_IN_NO_LOG_PARAMETER). + type: bool + default: false +extends_documentation_fragment: +- community.crypto.module_privatekey +seealso: +- module: community.crypto.openssl_privatekey +- module: community.crypto.openssl_privatekey_info +''' + +EXAMPLES = r''' +- name: Generate an OpenSSL private key with the default values (4096 bits, RSA) + community.crypto.openssl_privatekey_pipe: + path: /etc/ssl/private/ansible.com.pem + register: output + no_log: true # make sure that private key data is not accidentally revealed in logs! +- name: Show generated key + debug: + msg: "{{ output.privatekey }}" + # DO NOT OUTPUT KEY MATERIAL TO CONSOLE OR LOGS IN PRODUCTION! + +- name: Update sops-encrypted key with the community.sops collection + community.crypto.openssl_privatekey_pipe: + content: "{{ lookup('community.sops.sops', 'private_key.pem.sops') }}" + size: 2048 + register: output + no_log: true # make sure that private key data is not accidentally revealed in logs! +- name: Update encrypted key when openssl_privatekey_pipe reported a change + community.sops.encrypt_sops: + path: private_key.pem.sops + content_text: output.privatekey + when: output is changed +''' + +RETURN = r''' +size: + description: Size (in bits) of the TLS/SSL private key. + returned: changed or success + type: int + sample: 4096 +type: + description: Algorithm used to generate the TLS/SSL private key. + returned: changed or success + type: str + sample: RSA +curve: + description: Elliptic curve used to generate the TLS/SSL private key. + returned: changed or success, and I(type) is C(ECC) + type: str + sample: secp256r1 +fingerprint: + description: + - The fingerprint of the public key. Fingerprint will be generated for each C(hashlib.algorithms) available. + - The PyOpenSSL backend requires PyOpenSSL >= 16.0 for meaningful output. + returned: changed or success + type: dict + sample: + md5: "84:75:71:72:8d:04:b5:6c:4d:37:6d:66:83:f5:4c:29" + sha1: "51:cc:7c:68:5d:eb:41:43:88:7e:1a:ae:c7:f8:24:72:ee:71:f6:10" + sha224: "b1:19:a6:6c:14:ac:33:1d:ed:18:50:d3:06:5c:b2:32:91:f1:f1:52:8c:cb:d5:75:e9:f5:9b:46" + sha256: "41:ab:c7:cb:d5:5f:30:60:46:99:ac:d4:00:70:cf:a1:76:4f:24:5d:10:24:57:5d:51:6e:09:97:df:2f:de:c7" + sha384: "85:39:50:4e:de:d9:19:33:40:70:ae:10:ab:59:24:19:51:c3:a2:e4:0b:1c:b1:6e:dd:b3:0c:d9:9e:6a:46:af:da:18:f8:ef:ae:2e:c0:9a:75:2c:9b:b3:0f:3a:5f:3d" + sha512: "fd:ed:5e:39:48:5f:9f:fe:7f:25:06:3f:79:08:cd:ee:a5:e7:b3:3d:13:82:87:1f:84:e1:f5:c7:28:77:53:94:86:56:38:69:f0:d9:35:22:01:1e:a6:60:...:0f:9b" +privatekey: + description: + - The generated private key's content. + - Please note that if the result is not changed, the current private key will only be returned + if the I(return_current_key) option is set to C(true). + - Will be Base64-encoded if the key is in raw format. + returned: changed, or I(return_current_key) is C(true) + type: str +''' diff --git a/tests/integration/targets/openssl_privatekey_pipe/aliases b/tests/integration/targets/openssl_privatekey_pipe/aliases new file mode 100644 index 00000000..6eae8bd8 --- /dev/null +++ b/tests/integration/targets/openssl_privatekey_pipe/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +destructive diff --git a/tests/integration/targets/openssl_privatekey_pipe/meta/main.yml b/tests/integration/targets/openssl_privatekey_pipe/meta/main.yml new file mode 100644 index 00000000..800aff64 --- /dev/null +++ b/tests/integration/targets/openssl_privatekey_pipe/meta/main.yml @@ -0,0 +1,2 @@ +dependencies: + - setup_openssl diff --git a/tests/integration/targets/openssl_privatekey_pipe/tasks/impl.yml b/tests/integration/targets/openssl_privatekey_pipe/tasks/impl.yml new file mode 100644 index 00000000..8b1db479 --- /dev/null +++ b/tests/integration/targets/openssl_privatekey_pipe/tasks/impl.yml @@ -0,0 +1,103 @@ +--- +- name: ({{select_crypto_backend}}) Create key + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: ({{select_crypto_backend}}) Get key info + openssl_privatekey_info: + content: "{{ result.privatekey }}" + register: result_info + +- assert: + that: + - result is changed + - result.privatekey.startswith('----') + - result_info.type == 'RSA' + - result_info.public_data.size == 4096 + - result_info.public_data.exponent >= 5 + +- assert: + that: + - result_info.public_key_fingerprints.sha256 | length > 10 + - result.fingerprint.sha256 == result_info.public_key_fingerprints.sha256 + when: result.fingerprint is not none + +- name: ({{select_crypto_backend}}) Update key (check mode) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ result.privatekey }}" + size: 2048 + register: update_check + check_mode: true + +- name: ({{select_crypto_backend}}) Update key (check mode, with return_current_key=true) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ result.privatekey }}" + size: 2048 + return_current_key: true + register: update_check_return + check_mode: true + +- name: ({{select_crypto_backend}}) Update key + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ result.privatekey }}" + size: 2048 + register: update + +- name: ({{select_crypto_backend}}) Update key (idempotent, check mode) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ update.privatekey }}" + size: 2048 + register: update_idempotent_check + check_mode: true + +- name: ({{select_crypto_backend}}) Update key (idempotent) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ update.privatekey }}" + size: 2048 + register: update_idempotent + +- name: ({{select_crypto_backend}}) Update key (idempotent, check mode, with return_current_key=true) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ update.privatekey }}" + size: 2048 + return_current_key: true + register: update_idempotent_return_check + check_mode: true + +- name: ({{select_crypto_backend}}) Update key (idempotent, with return_current_key=true) + openssl_privatekey_pipe: + select_crypto_backend: '{{ select_crypto_backend }}' + content: "{{ update.privatekey }}" + size: 2048 + return_current_key: true + register: update_idempotent_return + +- name: ({{select_crypto_backend}}) Get key info + openssl_privatekey_info: + content: "{{ update.privatekey }}" + register: update_info + +- assert: + that: + - update_check is changed + - update_check.privatekey == 'VALUE_SPECIFIED_IN_NO_LOG_PARAMETER' + - update_check_return is changed + - update_check_return.privatekey == result.privatekey + - update is changed + - update.privatekey != result.privatekey + - update_info.public_data.size == 2048 + - update_idempotent_check is not changed + - update_idempotent_check.privatekey is undefined + - update_idempotent is not changed + - update_idempotent.privatekey is undefined + - update_idempotent_return_check is not changed + - update_idempotent_return_check.privatekey == update.privatekey + - update_idempotent_return is not changed + - update_idempotent_return.privatekey == update.privatekey diff --git a/tests/integration/targets/openssl_privatekey_pipe/tasks/main.yml b/tests/integration/targets/openssl_privatekey_pipe/tasks/main.yml new file mode 100644 index 00000000..316212b3 --- /dev/null +++ b/tests/integration/targets/openssl_privatekey_pipe/tasks/main.yml @@ -0,0 +1,35 @@ +--- +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- name: Run module with backend autodetection + openssl_privatekey_pipe: + +- block: + - name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl + + # FIXME: minimal pyOpenSSL version?! + when: pyopenssl_version.stdout is version('0.6', '>=') + +- name: Remove output directory + file: + path: "{{ output_dir }}" + state: absent + +- name: Re-create output directory + file: + path: "{{ output_dir }}" + state: directory + +- block: + - name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + + when: cryptography_version.stdout is version('0.5', '>=') diff --git a/tests/sanity/ignore-2.10.txt b/tests/sanity/ignore-2.10.txt index 43335011..a517bd54 100644 --- a/tests/sanity/ignore-2.10.txt +++ b/tests/sanity/ignore-2.10.txt @@ -1,3 +1,11 @@ +plugins/module_utils/action_module.py import-2.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-2.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.5!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.8!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.9!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py pylint:ansible-bad-module-import # This is not supposed to be included by modules, but by action plugins plugins/module_utils/compat/ipaddress.py future-import-boilerplate plugins/module_utils/compat/ipaddress.py metaclass-boilerplate plugins/module_utils/compat/ipaddress.py no-assert diff --git a/tests/sanity/ignore-2.11.txt b/tests/sanity/ignore-2.11.txt index 43335011..a517bd54 100644 --- a/tests/sanity/ignore-2.11.txt +++ b/tests/sanity/ignore-2.11.txt @@ -1,3 +1,11 @@ +plugins/module_utils/action_module.py import-2.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-2.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.5!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.8!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.9!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py pylint:ansible-bad-module-import # This is not supposed to be included by modules, but by action plugins plugins/module_utils/compat/ipaddress.py future-import-boilerplate plugins/module_utils/compat/ipaddress.py metaclass-boilerplate plugins/module_utils/compat/ipaddress.py no-assert diff --git a/tests/sanity/ignore-2.9.txt b/tests/sanity/ignore-2.9.txt index f12ed5f7..353f11c0 100644 --- a/tests/sanity/ignore-2.9.txt +++ b/tests/sanity/ignore-2.9.txt @@ -1,3 +1,10 @@ +plugins/module_utils/action_module.py import-2.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-2.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.5!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.6!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.7!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py import-3.8!skip # This is not supposed to be included by modules, but by action plugins +plugins/module_utils/action_module.py pylint:ansible-bad-module-import # This is not supposed to be included by modules, but by action plugins plugins/module_utils/compat/ipaddress.py future-import-boilerplate plugins/module_utils/compat/ipaddress.py metaclass-boilerplate plugins/module_utils/compat/ipaddress.py no-assert