From 128991c3dc9bf3956a9490722a79301cc83a8df3 Mon Sep 17 00:00:00 2001 From: Patrick Pichler Date: Sat, 6 Jun 2020 10:23:29 +0200 Subject: [PATCH] Add openssl_signature module (#63) --- plugins/modules/openssl_signature.py | 346 +++++++++++++++++++++++++++ 1 file changed, 346 insertions(+) create mode 100644 plugins/modules/openssl_signature.py diff --git a/plugins/modules/openssl_signature.py b/plugins/modules/openssl_signature.py new file mode 100644 index 00000000..4e92f686 --- /dev/null +++ b/plugins/modules/openssl_signature.py @@ -0,0 +1,346 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2016, Yanis Guenane +# Copyright: (c) 2019, Patrick Pichler +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +ANSIBLE_METADATA = {'metadata_version': '1.1'} + +DOCUMENTATION = r''' +--- +module: openssl_signature +short_description: Sign and verify data with openssl +description: This module allows one to sign and verify data via certificate and private key +requirements: + - Either cryptography >= 1.2.3 (older versions might work as well) + - Or pyOpenSSL +author: + - Patrick Pichler (@aveexy) +options: + action: + description: Action to be executed + type: str + required: true + choices: [ sign, verify ] + + private_key: + description: Private key required for sign action + type: path + + certificate: + description: Certificate required for verify action + type: path + + passphrase: + description: Passphrase for private_key + type: str + + path: + description: file to sign/verify + type: path + required: true + + signature: + description: base64 encoded signature required for verify action + type: str + + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] +''' + +EXAMPLES = r''' +- name: Sign example file + openssl_signature: + action: sign + private_key: private.key + path: /tmp/example_file + register: sig + +- name: Verify signature of example file + openssl_signature: + action: verify + certificate: cert.pem + path: /tmp/example_file + signature: sig.signature +''' + +RETURN = r''' +signature: + description: base64 encoded signature + returned: changed or success + type: str +''' + +import os +import traceback +from distutils.version import LooseVersion +import base64 + +MINIMAL_PYOPENSSL_VERSION = '0.6' +MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + import cryptography.hazmat.primitives.asymmetric.rsa + import cryptography.hazmat.primitives.asymmetric.dsa + import cryptography.hazmat.primitives.asymmetric.ec + import cryptography.hazmat.primitives.asymmetric.padding + import cryptography.hazmat.primitives.hashes + + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + +from ansible.module_utils import crypto as crypto_utils +from ansible.module_utils._text import to_native, to_bytes +from ansible.module_utils.basic import AnsibleModule, missing_required_lib + + +class SignatureBase(crypto_utils.OpenSSLObject): + + def __init__(self, module, backend): + super(SignatureBase, self).__init__( + module.params['path'], + 'present', + False, + module.check_mode + ) + + self.backend = backend + + self.action = module.params['action'] + self.signature = module.params['signature'] + self.passphrase = module.params['passphrase'] + self.private_key = module.params['private_key'] + self.certificate = module.params['certificate'] + + def generate(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + def dump(self): + # Empty method because crypto_utils.OpenSSLObject wants this + pass + + +# Implementation with using pyOpenSSL +class SignaturePyOpenSSL(SignatureBase): + + def __init__(self, module, backend): + super(SignaturePyOpenSSL, self).__init__(module, backend) + + def run(self): + try: + result = dict() + + with open(self.path, "rb") as f: + _in = f.read() + + if self.action == "verify": + _signature = base64.b64decode(self.signature) + certificate = crypto_utils.load_certificate(self.certificate, backend=self.backend) + + try: + OpenSSL.crypto.verify(certificate, _signature, _in, 'sha256') + except Exception: + self.module.fail_json( + msg="Verification failed" + ) + + elif self.action == "sign": + private_key = crypto_utils.load_privatekey( + self.private_key, + None if self.passphrase is None else to_bytes(self.passphrase), + backend=self.backend + ) + + out = OpenSSL.crypto.sign(private_key, _in, "sha256") + result['signature'] = base64.b64encode(out) + + return result + except Exception as e: + raise crypto_utils.OpenSSLObjectError(e) + + +# Implementation with using cryptography +class SignatureCryptography(SignatureBase): + + def __init__(self, module, backend): + super(SignatureCryptography, self).__init__(module, backend) + + def run(self): + _padding = cryptography.hazmat.primitives.asymmetric.padding.PKCS1v15() + _hash = cryptography.hazmat.primitives.hashes.SHA256() + + result = dict() + + try: + with open(self.path, "rb") as f: + _in = f.read() + + if self.action == "verify": + _signature = base64.b64decode(self.signature) + public_key = crypto_utils.load_certificate(self.certificate, backend=self.backend).public_key() + + try: + if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey): + public_key.verify(_signature, _in, _padding, _hash) + + elif isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey): + public_key.verify(_signature, _in, cryptography.hazmat.primitives.asymmetric.ec.ECDSA(_hash)) + + elif (isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey) or + isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey)): + public_key.verify(_signature, _in) + + else: + self.module.fail_json( + msg="Unsupported algorithm" + ) + + except Exception: + self.module.fail_json( + msg="Verification failed" + ) + + elif self.action == "sign": + private_key = crypto_utils.load_privatekey( + self.private_key, + None if self.passphrase is None else to_bytes(self.passphrase), + backend=self.backend + ) + + if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): + out = private_key.sign(_in, _padding, _hash) + + elif isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): + out = private_key.sign(_in, cryptography.hazmat.primitives.asymmetric.ec.ECDSA(_hash)) + + elif (isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey) or + isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey)): + out = private_key.sign(_in) + + else: + self.module.fail_json( + msg="Unsupported algorithm" + ) + + result['signature'] = base64.b64encode(out) + + return result + + except Exception as e: + raise crypto_utils.OpenSSLObjectError(e) + + +def main(): + module = AnsibleModule( + argument_spec=dict( + action=dict(type='str', choices=[ + 'sign', 'verify' + ]), + private_key=dict(type='path'), + certificate=dict(type='path'), + passphrase=dict(type='str', no_log=True), + path=dict(type='path', required=True), + signature=dict(type='path'), + select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'), + ), + supports_check_mode=False, + ) + + if module.params['private_key'] is not None and module.params['certificate'] is not None: + module.fail_json( + msg="private_key and certificate are mutually exclusive" + ) + + if module.params['private_key'] is None and module.params['action'] == "sign": + module.fail_json( + msg="Private key missing" + ) + + if module.params['certificate'] is None and module.params['action'] == "verify": + module.fail_json( + msg="Certificate missing" + ) + + if module.params['action'] == "verify" and module.params['signature'] is None: + module.fail_json( + msg="Can't verify without a signature" + ) + + if not os.path.isfile(module.params['path']): + module.fail_json( + name=module.params['path'], + msg='The file {0} does not exist'.format(module.params['path']) + ) + + backend = module.params['select_crypto_backend'] + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # Decision + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + try: + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), + exception=PYOPENSSL_IMP_ERR) + module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', + version='2.13') + _sign = SignaturePyOpenSSL(module, backend) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + _sign = SignatureCryptography(module, backend) + + result = _sign.run() + + module.exit_json(**result) + except crypto_utils.OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == '__main__': + main()