From 7298c1f49ad89457b3127bd427b2cdeeac33c938 Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Tue, 18 May 2021 17:47:10 +0200 Subject: [PATCH] Add openssl_publickey_info module (#231) * Add openssl_publickey_info module. Share code between openssl_privatekey_info and the new module, and improve documentation of it. * Move public key loading to support module. * Require pyOpenSSL 16.0.0 for public key loading. * Linting. * Apply suggestions from code review Co-authored-by: Ajpantuso Co-authored-by: Ajpantuso --- .../crypto/module_backends/privatekey_info.py | 121 ++----- .../crypto/module_backends/publickey_info.py | 322 ++++++++++++++++++ plugins/module_utils/crypto/support.py | 22 ++ plugins/modules/openssl_privatekey_info.py | 56 +++ plugins/modules/openssl_publickey_info.py | 219 ++++++++++++ .../targets/openssl_publickey_info/aliases | 2 + .../openssl_publickey_info/meta/main.yml | 3 + .../openssl_publickey_info/tasks/impl.yml | 110 ++++++ .../openssl_publickey_info/tasks/main.yml | 79 +++++ 9 files changed, 845 insertions(+), 89 deletions(-) create mode 100644 plugins/module_utils/crypto/module_backends/publickey_info.py create mode 100644 plugins/modules/openssl_publickey_info.py create mode 100644 tests/integration/targets/openssl_publickey_info/aliases create mode 100644 tests/integration/targets/openssl_publickey_info/meta/main.yml create mode 100644 tests/integration/targets/openssl_publickey_info/tasks/impl.yml create mode 100644 tests/integration/targets/openssl_publickey_info/tasks/main.yml diff --git a/plugins/module_utils/crypto/module_backends/privatekey_info.py b/plugins/module_utils/crypto/module_backends/privatekey_info.py index 457aa74f..95c683e2 100644 --- a/plugins/module_utils/crypto/module_backends/privatekey_info.py +++ b/plugins/module_utils/crypto/module_backends/privatekey_info.py @@ -19,8 +19,6 @@ from ansible.module_utils.basic import missing_required_lib from ansible.module_utils._text import to_native, to_bytes from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( - CRYPTOGRAPHY_HAS_X25519, - CRYPTOGRAPHY_HAS_X448, CRYPTOGRAPHY_HAS_ED25519, CRYPTOGRAPHY_HAS_ED448, OpenSSLObjectError, @@ -36,6 +34,12 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.math impor quick_is_not_prime, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( + _get_cryptography_public_key_info, + _bigint_to_int, + _get_pyopenssl_public_key_info, +) + MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' MINIMAL_PYOPENSSL_VERSION = '0.15' @@ -65,42 +69,20 @@ else: SIGNATURE_TEST_DATA = b'1234' -def _get_cryptography_key_info(key): - key_public_data = dict() +def _get_cryptography_private_key_info(key): + key_type, key_public_data = _get_cryptography_public_key_info(key.public_key()) key_private_data = dict() if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): - key_type = 'RSA' - key_public_data['size'] = key.key_size - key_public_data['modulus'] = key.public_key().public_numbers().n - key_public_data['exponent'] = key.public_key().public_numbers().e - key_private_data['p'] = key.private_numbers().p - key_private_data['q'] = key.private_numbers().q - key_private_data['exponent'] = key.private_numbers().d + private_numbers = key.private_numbers() + key_private_data['p'] = private_numbers.p + key_private_data['q'] = private_numbers.q + key_private_data['exponent'] = private_numbers.d elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey): - key_type = 'DSA' - key_public_data['size'] = key.key_size - key_public_data['p'] = key.parameters().parameter_numbers().p - key_public_data['q'] = key.parameters().parameter_numbers().q - key_public_data['g'] = key.parameters().parameter_numbers().g - key_public_data['y'] = key.public_key().public_numbers().y - key_private_data['x'] = key.private_numbers().x - elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey): - key_type = 'X25519' - elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey): - key_type = 'X448' - elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey): - key_type = 'Ed25519' - elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey): - key_type = 'Ed448' + private_numbers = key.private_numbers() + key_private_data['x'] = private_numbers.x elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey): - key_type = 'ECC' - key_public_data['curve'] = key.public_key().curve.name - key_public_data['x'] = key.public_key().public_numbers().x - key_public_data['y'] = key.public_key().public_numbers().y - key_public_data['exponent_size'] = key.public_key().curve.key_size - key_private_data['multiplier'] = key.private_numbers().private_value - else: - key_type = 'unknown ({0})'.format(type(key)) + private_numbers = key.private_numbers() + key_private_data['multiplier'] = private_numbers.private_value return key_type, key_public_data, key_private_data @@ -276,7 +258,7 @@ class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval): ) def _get_key_info(self): - return _get_cryptography_key_info(self.key) + return _get_cryptography_private_key_info(self.key) def _is_key_consistent(self, key_public_data, key_private_data): return _is_cryptography_key_consistent(self.key, key_public_data, key_private_data) @@ -309,25 +291,11 @@ class PrivateKeyInfoRetrievalPyOpenSSL(PrivateKeyInfoRetrieval): self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') - def bigint_to_int(self, bn): - '''Convert OpenSSL BIGINT to Python integer''' - if bn == OpenSSL._util.ffi.NULL: - return None - hexstr = OpenSSL._util.lib.BN_bn2hex(bn) - try: - return int(OpenSSL._util.ffi.string(hexstr), 16) - finally: - OpenSSL._util.lib.OPENSSL_free(hexstr) - def _get_key_info(self): - key_public_data = dict() + key_type, key_public_data, try_fallback = _get_pyopenssl_public_key_info(self.key) key_private_data = dict() openssl_key_type = self.key.type() - try_fallback = True if crypto.TYPE_RSA == openssl_key_type: - key_type = 'RSA' - key_public_data['size'] = self.key.bits() - try: # Use OpenSSL directly to extract key data key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(self.key._pkey) @@ -345,31 +313,22 @@ class PrivateKeyInfoRetrievalPyOpenSSL(PrivateKeyInfoRetrieval): e = OpenSSL._util.ffi.new("BIGNUM **") d = OpenSSL._util.ffi.new("BIGNUM **") OpenSSL._util.lib.RSA_get0_key(key, n, e, d) - key_public_data['modulus'] = self.bigint_to_int(n[0]) - key_public_data['exponent'] = self.bigint_to_int(e[0]) - key_private_data['exponent'] = self.bigint_to_int(d[0]) + key_private_data['exponent'] = _bigint_to_int(d[0]) # Get factors p = OpenSSL._util.ffi.new("BIGNUM **") q = OpenSSL._util.ffi.new("BIGNUM **") OpenSSL._util.lib.RSA_get0_factors(key, p, q) - key_private_data['p'] = self.bigint_to_int(p[0]) - key_private_data['q'] = self.bigint_to_int(q[0]) + key_private_data['p'] = _bigint_to_int(p[0]) + key_private_data['q'] = _bigint_to_int(q[0]) else: - # Get modulus and exponents - key_public_data['modulus'] = self.bigint_to_int(key.n) - key_public_data['exponent'] = self.bigint_to_int(key.e) - key_private_data['exponent'] = self.bigint_to_int(key.d) + # Get private exponent + key_private_data['exponent'] = _bigint_to_int(key.d) # Get factors - key_private_data['p'] = self.bigint_to_int(key.p) - key_private_data['q'] = self.bigint_to_int(key.q) - try_fallback = False + key_private_data['p'] = _bigint_to_int(key.p) + key_private_data['q'] = _bigint_to_int(key.q) except AttributeError: - # Use fallback if available - pass + try_fallback = True elif crypto.TYPE_DSA == openssl_key_type: - key_type = 'DSA' - key_public_data['size'] = self.key.bits() - try: # Use OpenSSL directly to extract key data key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(self.key._pkey) @@ -382,38 +341,22 @@ class PrivateKeyInfoRetrievalPyOpenSSL(PrivateKeyInfoRetrieval): # 1.1 and later, and directly access the values for 1.0.x and # earlier. if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: - # Get public parameters (primes and group element) - p = OpenSSL._util.ffi.new("BIGNUM **") - q = OpenSSL._util.ffi.new("BIGNUM **") - g = OpenSSL._util.ffi.new("BIGNUM **") - OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g) - key_public_data['p'] = self.bigint_to_int(p[0]) - key_public_data['q'] = self.bigint_to_int(q[0]) - key_public_data['g'] = self.bigint_to_int(g[0]) - # Get public and private key exponents + # Get private key exponents y = OpenSSL._util.ffi.new("BIGNUM **") x = OpenSSL._util.ffi.new("BIGNUM **") OpenSSL._util.lib.DSA_get0_key(key, y, x) - key_public_data['y'] = self.bigint_to_int(y[0]) - key_private_data['x'] = self.bigint_to_int(x[0]) + key_private_data['x'] = _bigint_to_int(x[0]) else: - # Get public parameters (primes and group element) - key_public_data['p'] = self.bigint_to_int(key.p) - key_public_data['q'] = self.bigint_to_int(key.q) - key_public_data['g'] = self.bigint_to_int(key.g) - # Get public and private key exponents - key_public_data['y'] = self.bigint_to_int(key.pub_key) - key_private_data['x'] = self.bigint_to_int(key.priv_key) - try_fallback = False + # Get private key exponents + key_private_data['x'] = _bigint_to_int(key.priv_key) except AttributeError: - # Use fallback if available - pass + try_fallback = True else: # Return 'unknown' key_type = 'unknown ({0})'.format(self.key.type()) # If needed and if possible, fall back to cryptography if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND: - return _get_cryptography_key_info(self.key.to_cryptography_key()) + return _get_cryptography_private_key_info(self.key.to_cryptography_key()) return key_type, key_public_data, key_private_data def _is_key_consistent(self, key_public_data, key_private_data): diff --git a/plugins/module_utils/crypto/module_backends/publickey_info.py b/plugins/module_utils/crypto/module_backends/publickey_info.py new file mode 100644 index 00000000..8b85dc8c --- /dev/null +++ b/plugins/module_utils/crypto/module_backends/publickey_info.py @@ -0,0 +1,322 @@ +# -*- coding: utf-8 -*- +# +# Copyright: (c) 2020-2021, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +import abc +import traceback + +from distutils.version import LooseVersion + +from ansible.module_utils import six +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils._text import to_native + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + CRYPTOGRAPHY_HAS_X25519, + CRYPTOGRAPHY_HAS_X448, + CRYPTOGRAPHY_HAS_ED25519, + CRYPTOGRAPHY_HAS_ED448, + OpenSSLObjectError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + get_fingerprint_of_bytes, + load_publickey, +) + + +MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3' +MINIMAL_PYOPENSSL_VERSION = '16.0.0' # when working with public key objects, the minimal required version is 0.15 + +PYOPENSSL_IMP_ERR = None +try: + import OpenSSL + from OpenSSL import crypto + PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__) +except ImportError: + PYOPENSSL_IMP_ERR = traceback.format_exc() + PYOPENSSL_FOUND = False +else: + PYOPENSSL_FOUND = True + +CRYPTOGRAPHY_IMP_ERR = None +try: + import cryptography + from cryptography.hazmat.primitives import serialization + CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__) +except ImportError: + CRYPTOGRAPHY_IMP_ERR = traceback.format_exc() + CRYPTOGRAPHY_FOUND = False +else: + CRYPTOGRAPHY_FOUND = True + + +def _get_cryptography_public_key_info(key): + key_public_data = dict() + if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey): + key_type = 'RSA' + public_numbers = key.public_numbers() + key_public_data['size'] = key.key_size + key_public_data['modulus'] = public_numbers.n + key_public_data['exponent'] = public_numbers.e + elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey): + key_type = 'DSA' + parameter_numbers = key.parameters().parameter_numbers() + public_numbers = key.public_numbers() + key_public_data['size'] = key.key_size + key_public_data['p'] = parameter_numbers.p + key_public_data['q'] = parameter_numbers.q + key_public_data['g'] = parameter_numbers.g + key_public_data['y'] = public_numbers.y + elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey): + key_type = 'X25519' + elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey): + key_type = 'X448' + elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey): + key_type = 'Ed25519' + elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey): + key_type = 'Ed448' + elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey): + key_type = 'ECC' + public_numbers = key.public_numbers() + key_public_data['curve'] = key.curve.name + key_public_data['x'] = public_numbers.x + key_public_data['y'] = public_numbers.y + key_public_data['exponent_size'] = key.curve.key_size + else: + key_type = 'unknown ({0})'.format(type(key)) + return key_type, key_public_data + + +def _bigint_to_int(bn): + '''Convert OpenSSL BIGINT to Python integer''' + if bn == OpenSSL._util.ffi.NULL: + return None + hexstr = OpenSSL._util.lib.BN_bn2hex(bn) + try: + return int(OpenSSL._util.ffi.string(hexstr), 16) + finally: + OpenSSL._util.lib.OPENSSL_free(hexstr) + + +def _get_pyopenssl_public_key_info(key): + key_public_data = dict() + try_fallback = True + openssl_key_type = key.type() + if crypto.TYPE_RSA == openssl_key_type: + key_type = 'RSA' + key_public_data['size'] = key.bits() + + try: + # Use OpenSSL directly to extract key data + key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(key._pkey) + key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.RSA_free) + # OpenSSL 1.1 and newer have functions to extract the parameters + # from the EVP PKEY data structures. Older versions didn't have + # these getters, and it was common use to simply access the values + # directly. Since there's no guarantee that these data structures + # will still be accessible in the future, we use the getters for + # 1.1 and later, and directly access the values for 1.0.x and + # earlier. + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # Get modulus and exponents + n = OpenSSL._util.ffi.new("BIGNUM **") + e = OpenSSL._util.ffi.new("BIGNUM **") + d = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.RSA_get0_key(key, n, e, d) + key_public_data['modulus'] = _bigint_to_int(n[0]) + key_public_data['exponent'] = _bigint_to_int(e[0]) + else: + # Get modulus and exponents + key_public_data['modulus'] = _bigint_to_int(key.n) + key_public_data['exponent'] = _bigint_to_int(key.e) + try_fallback = False + except AttributeError: + # Use fallback if available + pass + elif crypto.TYPE_DSA == openssl_key_type: + key_type = 'DSA' + key_public_data['size'] = key.bits() + + try: + # Use OpenSSL directly to extract key data + key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(key._pkey) + key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.DSA_free) + # OpenSSL 1.1 and newer have functions to extract the parameters + # from the EVP PKEY data structures. Older versions didn't have + # these getters, and it was common use to simply access the values + # directly. Since there's no guarantee that these data structures + # will still be accessible in the future, we use the getters for + # 1.1 and later, and directly access the values for 1.0.x and + # earlier. + if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000: + # Get public parameters (primes and group element) + p = OpenSSL._util.ffi.new("BIGNUM **") + q = OpenSSL._util.ffi.new("BIGNUM **") + g = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g) + key_public_data['p'] = _bigint_to_int(p[0]) + key_public_data['q'] = _bigint_to_int(q[0]) + key_public_data['g'] = _bigint_to_int(g[0]) + # Get public key exponents + y = OpenSSL._util.ffi.new("BIGNUM **") + x = OpenSSL._util.ffi.new("BIGNUM **") + OpenSSL._util.lib.DSA_get0_key(key, y, x) + key_public_data['y'] = _bigint_to_int(y[0]) + else: + # Get public parameters (primes and group element) + key_public_data['p'] = _bigint_to_int(key.p) + key_public_data['q'] = _bigint_to_int(key.q) + key_public_data['g'] = _bigint_to_int(key.g) + # Get public key exponents + key_public_data['y'] = _bigint_to_int(key.pub_key) + try_fallback = False + except AttributeError: + # Use fallback if available + pass + else: + # Return 'unknown' + key_type = 'unknown ({0})'.format(key.type()) + return key_type, key_public_data, try_fallback + + +class PublicKeyParseError(OpenSSLObjectError): + def __init__(self, msg, result): + super(PublicKeyParseError, self).__init__(msg) + self.error_message = msg + self.result = result + + +@six.add_metaclass(abc.ABCMeta) +class PublicKeyInfoRetrieval(object): + def __init__(self, module, backend, content=None, key=None): + # content must be a bytes string + self.module = module + self.backend = backend + self.content = content + self.key = key + + @abc.abstractmethod + def _get_public_key(self, binary): + pass + + @abc.abstractmethod + def _get_key_info(self): + pass + + def get_info(self): + result = dict( + can_parse_key=False, + key_is_consistent=None, + ) + if self.key is None: + try: + self.key = load_publickey(content=self.content, backend=self.backend) + except OpenSSLObjectError as e: + raise PublicKeyParseError(to_native(e)) + + pk = self._get_public_key(binary=True) + result['fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict() + + key_type, key_public_data = self._get_key_info() + result['type'] = key_type + result['public_data'] = key_public_data + return result + + +class PublicKeyInfoRetrievalCryptography(PublicKeyInfoRetrieval): + """Validate the supplied public key, using the cryptography backend""" + def __init__(self, module, content=None, key=None): + super(PublicKeyInfoRetrievalCryptography, self).__init__(module, 'cryptography', content=content, key=key) + + def _get_public_key(self, binary): + return self.key.public_bytes( + serialization.Encoding.DER if binary else serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo + ) + + def _get_key_info(self): + return _get_cryptography_public_key_info(self.key) + + +class PublicKeyInfoRetrievalPyOpenSSL(PublicKeyInfoRetrieval): + """validate the supplied public key.""" + + def __init__(self, module, content=None, key=None): + super(PublicKeyInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content=content, key=key) + + def _get_public_key(self, binary): + try: + return crypto.dump_publickey( + crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM, + self.key + ) + except AttributeError: + try: + # pyOpenSSL < 16.0: + bio = crypto._new_mem_buf() + if binary: + rc = crypto._lib.i2d_PUBKEY_bio(bio, self.key._pkey) + else: + rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.key._pkey) + if rc != 1: + crypto._raise_current_error() + return crypto._bio_to_string(bio) + except AttributeError: + self.module.warn('Your pyOpenSSL version does not support dumping public keys. ' + 'Please upgrade to version 16.0 or newer, or use the cryptography backend.') + + def _get_key_info(self): + key_type, key_public_data, try_fallback = _get_pyopenssl_public_key_info(self.key) + # If needed and if possible, fall back to cryptography + if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND: + return _get_cryptography_public_key_info(self.key.to_cryptography_key()) + return key_type, key_public_data + + +def get_publickey_info(module, backend, content=None, key=None): + if backend == 'cryptography': + info = PublicKeyInfoRetrievalCryptography(module, content=content, key=key) + elif backend == 'pyopenssl': + info = PublicKeyInfoRetrievalPyOpenSSL(module, content=content, key=key) + return info.get_info() + + +def select_backend(module, backend, content=None, key=None): + if backend == 'auto': + # Detection what is possible + can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION) + can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION) + + # First try cryptography, then pyOpenSSL + if can_use_cryptography: + backend = 'cryptography' + elif can_use_pyopenssl: + backend = 'pyopenssl' + + # Success? + if backend == 'auto': + module.fail_json(msg=("Can't detect any of the required Python libraries " + "cryptography (>= {0}) or PyOpenSSL (>= {1})").format( + MINIMAL_CRYPTOGRAPHY_VERSION, + MINIMAL_PYOPENSSL_VERSION)) + + if backend == 'pyopenssl': + if not PYOPENSSL_FOUND: + module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)), + exception=PYOPENSSL_IMP_ERR) + module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated', + version='2.0.0', collection_name='community.crypto') + return backend, PublicKeyInfoRetrievalPyOpenSSL(module, content=content, key=key) + elif backend == 'cryptography': + if not CRYPTOGRAPHY_FOUND: + module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)), + exception=CRYPTOGRAPHY_IMP_ERR) + return backend, PublicKeyInfoRetrievalCryptography(module, content=content, key=key) + else: + raise ValueError('Unsupported value for backend: {0}'.format(backend)) diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 7850c6bb..355786da 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -183,6 +183,28 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, return result +def load_publickey(path=None, content=None, backend=None): + if content is None: + if path is None: + raise OpenSSLObjectError('Must provide either path or content') + try: + with open(path, 'rb') as b_priv_key_fh: + content = b_priv_key_fh.read() + except (IOError, OSError) as exc: + raise OpenSSLObjectError(exc) + + if backend == 'cryptography': + try: + return serialization.load_pem_public_key(content, backend=cryptography_backend()) + except Exception as e: + raise OpenSSLObjectError('Error while deserializing key: {0}'.format(e)) + else: + try: + return crypto.load_publickey(crypto.FILETYPE_PEM, content) + except crypto.Error as e: + raise OpenSSLObjectError('Error while deserializing key: {0}'.format(e)) + + def load_certificate(path, content=None, backend='pyopenssl'): """Load the specified certificate.""" diff --git a/plugins/modules/openssl_privatekey_info.py b/plugins/modules/openssl_privatekey_info.py index 371ca9fd..41570aaa 100644 --- a/plugins/modules/openssl_privatekey_info.py +++ b/plugins/modules/openssl_privatekey_info.py @@ -130,6 +130,62 @@ public_data: - Public key data. Depends on key type. returned: success type: dict + contains: + size: + description: + - Bit size of modulus (RSA) or prime number (DSA). + type: int + returned: When C(type=RSA) or C(type=DSA) + modulus: + description: + - The RSA key's modulus. + type: int + returned: When C(type=RSA) + exponent: + description: + - The RSA key's public exponent. + type: int + returned: When C(type=RSA) + p: + description: + - The C(p) value for DSA. + - This is the prime modulus upon which arithmetic takes place. + type: int + returned: When C(type=DSA) + q: + description: + - The C(q) value for DSA. + - This is a prime that divides C(p - 1), and at the same time the order of the subgroup of the + multiplicative group of the prime field used. + type: int + returned: When C(type=DSA) + g: + description: + - The C(g) value for DSA. + - This is the element spanning the subgroup of the multiplicative group of the prime field used. + type: int + returned: When C(type=DSA) + curve: + description: + - The curve's name for ECC. + type: str + returned: When C(type=ECC) + exponent_size: + description: + - The maximum number of bits of a private key. This is basically the bit size of the subgroup used. + type: int + returned: When C(type=ECC) + x: + description: + - The C(x) coordinate for the public point on the elliptic curve. + type: int + returned: When C(type=ECC) + y: + description: + - For C(type=ECC), this is the C(y) coordinate for the public point on the elliptic curve. + - For C(type=DSA), this is the publicly known group element whose discrete logarithm w.r.t. C(g) is the private key. + type: int + returned: When C(type=DSA) or C(type=ECC) private_data: description: - Private key data. Depends on key type. diff --git a/plugins/modules/openssl_publickey_info.py b/plugins/modules/openssl_publickey_info.py new file mode 100644 index 00000000..0e293b57 --- /dev/null +++ b/plugins/modules/openssl_publickey_info.py @@ -0,0 +1,219 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2021, Felix Fontein +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function +__metaclass__ = type + + +DOCUMENTATION = r''' +--- +module: openssl_publickey_info +short_description: Provide information for OpenSSL public keys +description: + - This module allows one to query information on OpenSSL public keys. + - It uses the pyOpenSSL or cryptography python library to interact with OpenSSL. If both the + cryptography and PyOpenSSL libraries are available (and meet the minimum version requirements) + cryptography will be preferred as a backend over PyOpenSSL (unless the backend is forced with + C(select_crypto_backend)). Please note that the PyOpenSSL backend was deprecated in Ansible 2.9 + and will be removed in community.crypto 2.0.0. +version_added: 1.7.0 +requirements: + - PyOpenSSL >= 0.15 or cryptography >= 1.2.3 +author: + - Felix Fontein (@felixfontein) +options: + path: + description: + - Remote absolute path where the public key file is loaded from. + type: path + content: + description: + - Content of the public key file. + - Either I(path) or I(content) must be specified, but not both. + type: str + + select_crypto_backend: + description: + - Determines which crypto backend to use. + - The default choice is C(auto), which tries to use C(cryptography) if available, and falls back to C(pyopenssl). + - If set to C(pyopenssl), will try to use the L(pyOpenSSL,https://pypi.org/project/pyOpenSSL/) library. + - If set to C(cryptography), will try to use the L(cryptography,https://cryptography.io/) library. + - Please note that the C(pyopenssl) backend has been deprecated in Ansible 2.9, and will be removed in community.crypto 2.0.0. + From that point on, only the C(cryptography) backend will be available. + type: str + default: auto + choices: [ auto, cryptography, pyopenssl ] + +notes: +- Supports C(check_mode). + +seealso: +- module: community.crypto.openssl_publickey +- module: community.crypto.openssl_privatekey_info +''' + +EXAMPLES = r''' +- name: Generate an OpenSSL private key with the default values (4096 bits, RSA) + community.crypto.openssl_privatekey: + path: /etc/ssl/private/ansible.com.pem + +- name: Create public key from private key + community.crypto.openssl_privatekey: + privatekey_path: /etc/ssl/private/ansible.com.pem + path: /etc/ssl/ansible.com.pub + +- name: Get information on public key + community.crypto.openssl_publickey_info: + path: /etc/ssl/ansible.com.pub + register: result + +- name: Dump information + ansible.builtin.debug: + var: result +''' + +RETURN = r''' +fingerprints: + description: + - Fingerprints of public key. + - For every hash algorithm available, the fingerprint is computed. + returned: success + type: dict + sample: "{'sha256': 'd4:b3:aa:6d:c8:04:ce:4e:ba:f6:29:4d:92:a3:94:b0:c2:ff:bd:bf:33:63:11:43:34:0f:51:b0:95:09:2f:63', + 'sha512': 'f7:07:4a:f0:b0:f0:e6:8b:95:5f:f9:e6:61:0a:32:68:f1..." +type: + description: + - The key's type. + - One of C(RSA), C(DSA), C(ECC), C(Ed25519), C(X25519), C(Ed448), or C(X448). + - Will start with C(unknown) if the key type cannot be determined. + returned: success + type: str + sample: RSA +public_data: + description: + - Public key data. Depends on key type. + returned: success + type: dict + contains: + size: + description: + - Bit size of modulus (RSA) or prime number (DSA). + type: int + returned: When C(type=RSA) or C(type=DSA) + modulus: + description: + - The RSA key's modulus. + type: int + returned: When C(type=RSA) + exponent: + description: + - The RSA key's public exponent. + type: int + returned: When C(type=RSA) + p: + description: + - The C(p) value for DSA. + - This is the prime modulus upon which arithmetic takes place. + type: int + returned: When C(type=DSA) + q: + description: + - The C(q) value for DSA. + - This is a prime that divides C(p - 1), and at the same time the order of the subgroup of the + multiplicative group of the prime field used. + type: int + returned: When C(type=DSA) + g: + description: + - The C(g) value for DSA. + - This is the element spanning the subgroup of the multiplicative group of the prime field used. + type: int + returned: When C(type=DSA) + curve: + description: + - The curve's name for ECC. + type: str + returned: When C(type=ECC) + exponent_size: + description: + - The maximum number of bits of a private key. This is basically the bit size of the subgroup used. + type: int + returned: When C(type=ECC) + x: + description: + - The C(x) coordinate for the public point on the elliptic curve. + type: int + returned: When C(type=ECC) + y: + description: + - For C(type=ECC), this is the C(y) coordinate for the public point on the elliptic curve. + - For C(type=DSA), this is the publicly known group element whose discrete logarithm w.r.t. C(g) is the private key. + type: int + returned: When C(type=DSA) or C(type=ECC) +''' + + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils._text import to_native + +from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import ( + OpenSSLObjectError, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.publickey_info import ( + PublicKeyParseError, + select_backend, +) + + +def main(): + module = AnsibleModule( + argument_spec=dict( + path=dict(type='path'), + content=dict(type='str', no_log=True), + select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']), + ), + required_one_of=( + ['path', 'content'], + ), + mutually_exclusive=( + ['path', 'content'], + ), + supports_check_mode=True, + ) + + result = dict( + can_load_key=False, + can_parse_key=False, + key_is_consistent=None, + ) + + if module.params['content'] is not None: + data = module.params['content'].encode('utf-8') + else: + try: + with open(module.params['path'], 'rb') as f: + data = f.read() + except (IOError, OSError) as e: + module.fail_json(msg='Error while reading public key file from disk: {0}'.format(e), **result) + + backend, module_backend = select_backend( + module, + module.params['select_crypto_backend'], + data) + + try: + result.update(module_backend.get_info()) + module.exit_json(**result) + except PublicKeyParseError as exc: + result.update(exc.result) + module.fail_json(msg=exc.error_message, **result) + except OpenSSLObjectError as exc: + module.fail_json(msg=to_native(exc)) + + +if __name__ == "__main__": + main() diff --git a/tests/integration/targets/openssl_publickey_info/aliases b/tests/integration/targets/openssl_publickey_info/aliases new file mode 100644 index 00000000..6eae8bd8 --- /dev/null +++ b/tests/integration/targets/openssl_publickey_info/aliases @@ -0,0 +1,2 @@ +shippable/posix/group1 +destructive diff --git a/tests/integration/targets/openssl_publickey_info/meta/main.yml b/tests/integration/targets/openssl_publickey_info/meta/main.yml new file mode 100644 index 00000000..d1a318db --- /dev/null +++ b/tests/integration/targets/openssl_publickey_info/meta/main.yml @@ -0,0 +1,3 @@ +dependencies: + - setup_openssl + - setup_pyopenssl diff --git a/tests/integration/targets/openssl_publickey_info/tasks/impl.yml b/tests/integration/targets/openssl_publickey_info/tasks/impl.yml new file mode 100644 index 00000000..07b3bd2e --- /dev/null +++ b/tests/integration/targets/openssl_publickey_info/tasks/impl.yml @@ -0,0 +1,110 @@ +--- +- debug: + msg: "Executing tests with backend {{ select_crypto_backend }}" + +- name: ({{select_crypto_backend}}) Get key 1 info + openssl_publickey_info: + path: '{{ output_dir }}/publickey_1.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that RSA key info is ok + assert: + that: + - "'fingerprints' in result" + - "'type' in result" + - "result.type == 'RSA'" + - "'public_data' in result" + - "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size" + - "result.public_data.exponent > 5" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key1': result}) }}" + +- name: ({{select_crypto_backend}}) Get key 1 info directly + openssl_publickey_info: + content: '{{ lookup("file", output_dir ~ "/publickey_1.pem") }}' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result_direct + +- name: ({{select_crypto_backend}}) Compare output of direct and loaded info + assert: + that: + - result == result_direct + +- name: ({{select_crypto_backend}}) Get key 2 info + openssl_publickey_info: + path: '{{ output_dir }}/publickey_2.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that RSA key info is ok + assert: + that: + - "'fingerprints' in result" + - "'type' in result" + - "result.type == 'RSA'" + - "'public_data' in result" + - "result.public_data.size == default_rsa_key_size" + - "2 ** (result.public_data.size - 1) < result.public_data.modulus < 2 ** result.public_data.size" + - "result.public_data.exponent > 5" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key2': result}) }}" + +- name: ({{select_crypto_backend}}) Get key 3 info + openssl_publickey_info: + path: '{{ output_dir }}/publickey_3.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- block: + - name: Check that ECC key info is ok + assert: + that: + - "'fingerprints' in result" + - "'type' in result" + - "result.type == 'ECC'" + - "'public_data' in result" + - "result.public_data.curve is string" + - "result.public_data.x != 0" + - "result.public_data.y != 0" + - "result.public_data.exponent_size == (521 if (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') else 256)" + + - name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key3': result}) }}" + when: select_crypto_backend != 'pyopenssl' or (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>')) + +- name: Check that ECC key info is ok + assert: + that: + - "'fingerprints' in result" + - "'type' in result" + - "result.type.startswith('unknown ')" + - "'public_data' in result" + when: select_crypto_backend == 'pyopenssl' and not (pyopenssl_version.stdout is version('16.1.0', '>=') and cryptography_version.stdout is version('0.0', '>')) + +- name: ({{select_crypto_backend}}) Get key 4 info + openssl_publickey_info: + path: '{{ output_dir }}/publickey_4.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: result + +- name: Check that DSA key info is ok + assert: + that: + - "'fingerprints' in result" + - "'type' in result" + - "result.type == 'DSA'" + - "'public_data' in result" + - "result.public_data.p > 2" + - "result.public_data.q > 2" + - "result.public_data.g >= 2" + - "result.public_data.y > 2" + +- name: Update result list + set_fact: + info_results: "{{ info_results | combine({'key4': result}) }}" diff --git a/tests/integration/targets/openssl_publickey_info/tasks/main.yml b/tests/integration/targets/openssl_publickey_info/tasks/main.yml new file mode 100644 index 00000000..08586899 --- /dev/null +++ b/tests/integration/targets/openssl_publickey_info/tasks/main.yml @@ -0,0 +1,79 @@ +--- +#################################################################### +# WARNING: These are designed specifically for Ansible tests # +# and should not be used as examples of how to write Ansible roles # +#################################################################### + +- name: Generate privatekey 1 + openssl_privatekey: + path: '{{ output_dir }}/privatekey_1.pem' + +- name: Generate privatekey 2 (less bits) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_2.pem' + type: RSA + size: '{{ default_rsa_key_size }}' + +- name: Generate privatekey 3 (ECC) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_3.pem' + type: ECC + curve: "{{ (ansible_distribution == 'CentOS' and ansible_distribution_major_version == '6') | ternary('secp521r1', 'secp256k1') }}" + # ^ cryptography on CentOS6 doesn't support secp256k1, so we use secp521r1 instead + select_crypto_backend: cryptography + +- name: Generate privatekey 4 (DSA) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_4.pem' + type: DSA + size: 1024 + +- name: Generate public keys + openssl_publickey: + privatekey_path: '{{ output_dir }}/privatekey_{{ item }}.pem' + path: '{{ output_dir }}/publickey_{{ item }}.pem' + loop: + - 1 + - 2 + - 3 + - 4 + +- name: Prepare result list + set_fact: + info_results: {} + +- name: Running tests with pyOpenSSL backend + include_tasks: impl.yml + vars: + select_crypto_backend: pyopenssl + when: pyopenssl_version.stdout is version('16.0.0', '>=') + +- name: Prepare result list + set_fact: + pyopenssl_info_results: "{{ info_results }}" + info_results: {} + +- name: Running tests with cryptography backend + include_tasks: impl.yml + vars: + select_crypto_backend: cryptography + when: cryptography_version.stdout is version('1.2.3', '>=') + +- name: Prepare result list + set_fact: + cryptography_info_results: "{{ info_results }}" + +- block: + - name: Dump pyOpenSSL results + debug: + var: pyopenssl_info_results + - name: Dump cryptography results + debug: + var: cryptography_info_results + - name: Compare results + assert: + that: + - ' (pyopenssl_info_results[item] | dict2items | rejectattr("key", "equalto", "deprecations") | list | items2dict) + == (cryptography_info_results[item] | dict2items | rejectattr("key", "equalto", "deprecations") | list | items2dict)' + loop: "{{ pyopenssl_info_results.keys() | intersect(cryptography_info_results.keys()) | list }}" + when: pyopenssl_version.stdout is version('16.0.0', '>=') and cryptography_version.stdout is version('1.2.3', '>=')