community.crypto/plugins/modules/openssl_dhparam.py

434 lines
14 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (c) 2017, Thom Wiggers <ansible@thomwiggers.nl>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r'''
---
module: openssl_dhparam
short_description: Generate OpenSSL Diffie-Hellman Parameters
description:
- This module allows one to (re)generate OpenSSL DH-params.
- This module uses file common arguments to specify generated file permissions.
- "Please note that the module regenerates existing DH params if they do not
match the module's options. If you are concerned that this could overwrite
your existing DH params, consider using the O(backup) option."
- The module can use the cryptography Python library, or the C(openssl) executable.
By default, it tries to detect which one is available. This can be overridden
with the O(select_crypto_backend) option.
requirements:
- Either cryptography >= 2.0
- Or OpenSSL binary C(openssl)
author:
- Thom Wiggers (@thomwiggers)
extends_documentation_fragment:
- ansible.builtin.files
- community.crypto.attributes
- community.crypto.attributes.files
attributes:
check_mode:
support: full
diff_mode:
support: none
safe_file_operations:
support: full
options:
state:
description:
- Whether the parameters should exist or not,
taking action if the state is different from what is stated.
type: str
default: present
choices: [ absent, present ]
size:
description:
- Size (in bits) of the generated DH-params.
type: int
default: 4096
force:
description:
- Should the parameters be regenerated even it it already exists.
type: bool
default: false
path:
description:
- Name of the file in which the generated parameters will be saved.
type: path
required: true
backup:
description:
- Create a backup file including a timestamp so you can get the original
DH params back if you overwrote them with new ones by accident.
type: bool
default: false
select_crypto_backend:
description:
- Determines which crypto backend to use.
- The default choice is V(auto), which tries to use C(cryptography) if available, and falls back to C(openssl).
- If set to V(openssl), will try to use the OpenSSL C(openssl) executable.
- If set to V(cryptography), will try to use the L(cryptography,https://cryptography.io/) library.
type: str
default: auto
choices: [ auto, cryptography, openssl ]
version_added: "1.0.0"
return_content:
description:
- If set to V(true), will return the (current or generated) DH parameter's content as RV(dhparams).
type: bool
default: false
version_added: "1.0.0"
seealso:
- module: community.crypto.x509_certificate
- module: community.crypto.openssl_csr
- module: community.crypto.openssl_pkcs12
- module: community.crypto.openssl_privatekey
- module: community.crypto.openssl_publickey
'''
EXAMPLES = r'''
- name: Generate Diffie-Hellman parameters with the default size (4096 bits)
community.crypto.openssl_dhparam:
path: /etc/ssl/dhparams.pem
- name: Generate DH Parameters with a different size (2048 bits)
community.crypto.openssl_dhparam:
path: /etc/ssl/dhparams.pem
size: 2048
- name: Force regenerate an DH parameters if they already exist
community.crypto.openssl_dhparam:
path: /etc/ssl/dhparams.pem
force: true
'''
RETURN = r'''
size:
description: Size (in bits) of the Diffie-Hellman parameters.
returned: changed or success
type: int
sample: 4096
filename:
description: Path to the generated Diffie-Hellman parameters.
returned: changed or success
type: str
sample: /etc/ssl/dhparams.pem
backup_file:
description: Name of backup file created.
returned: changed and if O(backup) is V(true)
type: str
sample: /path/to/dhparams.pem.2019-03-09@11:22~
dhparams:
description: The (current or generated) DH params' content.
returned: if O(state) is V(present) and O(return_content) is V(true)
type: str
version_added: "1.0.0"
'''
import abc
import os
import re
import tempfile
import traceback
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion
from ansible_collections.community.crypto.plugins.module_utils.io import (
load_file_if_exists,
write_file,
)
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
count_bits,
)
MINIMAL_CRYPTOGRAPHY_VERSION = '2.0'
CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
import cryptography.exceptions
import cryptography.hazmat.backends
import cryptography.hazmat.primitives.asymmetric.dh
import cryptography.hazmat.primitives.serialization
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True
class DHParameterError(Exception):
pass
class DHParameterBase(object):
def __init__(self, module):
self.state = module.params['state']
self.path = module.params['path']
self.size = module.params['size']
self.force = module.params['force']
self.changed = False
self.return_content = module.params['return_content']
self.backup = module.params['backup']
self.backup_file = None
@abc.abstractmethod
def _do_generate(self, module):
"""Actually generate the DH params."""
pass
def generate(self, module):
"""Generate DH params."""
changed = False
# only generate when necessary
if self.force or not self._check_params_valid(module):
self._do_generate(module)
changed = True
# fix permissions (checking force not necessary as done above)
if not self._check_fs_attributes(module):
# Fix done implicitly by
# AnsibleModule.set_fs_attributes_if_different
changed = True
self.changed = changed
def remove(self, module):
if self.backup:
self.backup_file = module.backup_local(self.path)
try:
os.remove(self.path)
self.changed = True
except OSError as exc:
module.fail_json(msg=to_native(exc))
def check(self, module):
"""Ensure the resource is in its desired state."""
if self.force:
return False
return self._check_params_valid(module) and self._check_fs_attributes(module)
@abc.abstractmethod
def _check_params_valid(self, module):
"""Check if the params are in the correct state"""
pass
def _check_fs_attributes(self, module):
"""Checks (and changes if not in check mode!) fs attributes"""
file_args = module.load_file_common_arguments(module.params)
if module.check_file_absent_if_check_mode(file_args['path']):
return False
return not module.set_fs_attributes_if_different(file_args, False)
def dump(self):
"""Serialize the object into a dictionary."""
result = {
'size': self.size,
'filename': self.path,
'changed': self.changed,
}
if self.backup_file:
result['backup_file'] = self.backup_file
if self.return_content:
content = load_file_if_exists(self.path, ignore_errors=True)
result['dhparams'] = content.decode('utf-8') if content else None
return result
class DHParameterAbsent(DHParameterBase):
def __init__(self, module):
super(DHParameterAbsent, self).__init__(module)
def _do_generate(self, module):
"""Actually generate the DH params."""
pass
def _check_params_valid(self, module):
"""Check if the params are in the correct state"""
pass
class DHParameterOpenSSL(DHParameterBase):
def __init__(self, module):
super(DHParameterOpenSSL, self).__init__(module)
self.openssl_bin = module.get_bin_path('openssl', True)
def _do_generate(self, module):
"""Actually generate the DH params."""
# create a tempfile
fd, tmpsrc = tempfile.mkstemp()
os.close(fd)
module.add_cleanup_file(tmpsrc) # Ansible will delete the file on exit
# openssl dhparam -out <path> <bits>
command = [self.openssl_bin, 'dhparam', '-out', tmpsrc, str(self.size)]
rc, dummy, err = module.run_command(command, check_rc=False)
if rc != 0:
raise DHParameterError(to_native(err))
if self.backup:
self.backup_file = module.backup_local(self.path)
try:
module.atomic_move(os.path.abspath(tmpsrc), os.path.abspath(self.path))
except Exception as e:
module.fail_json(msg="Failed to write to file %s: %s" % (self.path, str(e)))
def _check_params_valid(self, module):
"""Check if the params are in the correct state"""
command = [self.openssl_bin, 'dhparam', '-check', '-text', '-noout', '-in', self.path]
rc, out, err = module.run_command(command, check_rc=False)
result = to_native(out)
if rc != 0:
# If the call failed the file probably does not exist or is
# unreadable
return False
# output contains "(xxxx bit)"
match = re.search(r"Parameters:\s+\((\d+) bit\).*", result)
if not match:
return False # No "xxxx bit" in output
bits = int(match.group(1))
# if output contains "WARNING" we've got a problem
if "WARNING" in result or "WARNING" in to_native(err):
return False
return bits == self.size
class DHParameterCryptography(DHParameterBase):
def __init__(self, module):
super(DHParameterCryptography, self).__init__(module)
self.crypto_backend = cryptography.hazmat.backends.default_backend()
def _do_generate(self, module):
"""Actually generate the DH params."""
# Generate parameters
params = cryptography.hazmat.primitives.asymmetric.dh.generate_parameters(
generator=2,
key_size=self.size,
backend=self.crypto_backend,
)
# Serialize parameters
result = params.parameter_bytes(
encoding=cryptography.hazmat.primitives.serialization.Encoding.PEM,
format=cryptography.hazmat.primitives.serialization.ParameterFormat.PKCS3,
)
# Write result
if self.backup:
self.backup_file = module.backup_local(self.path)
write_file(module, result)
def _check_params_valid(self, module):
"""Check if the params are in the correct state"""
# Load parameters
try:
with open(self.path, 'rb') as f:
data = f.read()
params = cryptography.hazmat.primitives.serialization.load_pem_parameters(data, backend=self.crypto_backend)
except Exception as dummy:
return False
# Check parameters
bits = count_bits(params.parameter_numbers().p)
return bits == self.size
def main():
"""Main function"""
module = AnsibleModule(
argument_spec=dict(
state=dict(type='str', default='present', choices=['absent', 'present']),
size=dict(type='int', default=4096),
force=dict(type='bool', default=False),
path=dict(type='path', required=True),
backup=dict(type='bool', default=False),
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'openssl']),
return_content=dict(type='bool', default=False),
),
supports_check_mode=True,
add_file_common_args=True,
)
base_dir = os.path.dirname(module.params['path']) or '.'
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg="The directory '%s' does not exist or the file is not a directory" % base_dir
)
if module.params['state'] == 'present':
backend = module.params['select_crypto_backend']
if backend == 'auto':
# Detection what is possible
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
can_use_openssl = module.get_bin_path('openssl', False) is not None
# First try cryptography, then OpenSSL
if can_use_cryptography:
backend = 'cryptography'
elif can_use_openssl:
backend = 'openssl'
# Success?
if backend == 'auto':
module.fail_json(msg=("Cannot detect either the required Python library cryptography (>= {0}) "
"or the OpenSSL binary openssl").format(MINIMAL_CRYPTOGRAPHY_VERSION))
if backend == 'openssl':
dhparam = DHParameterOpenSSL(module)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
dhparam = DHParameterCryptography(module)
else:
raise AssertionError('Internal error: unknown backend')
if module.check_mode:
result = dhparam.dump()
result['changed'] = module.params['force'] or not dhparam.check(module)
module.exit_json(**result)
try:
dhparam.generate(module)
except DHParameterError as exc:
module.fail_json(msg=to_native(exc))
else:
dhparam = DHParameterAbsent(module)
if module.check_mode:
result = dhparam.dump()
result['changed'] = os.path.exists(module.params['path'])
module.exit_json(**result)
if os.path.exists(module.params['path']):
try:
dhparam.remove(module)
except Exception as exc:
module.fail_json(msg=to_native(exc))
result = dhparam.dump()
module.exit_json(**result)
if __name__ == '__main__':
main()