openssl_privatekey_info: move main code to module_utils to allow easier implementation of diff mode (#205)
* Move openssl_privatekey_info code to module_utils. * Add changelog fragment. * Apply suggestions from code review Co-authored-by: Ajpantuso <ajpantuso@gmail.com> Co-authored-by: Ajpantuso <ajpantuso@gmail.com>pull/235/head
parent
c0edfb46bb
commit
a93f07c651
|
@ -0,0 +1,2 @@
|
||||||
|
minor_changes:
|
||||||
|
- "openssl_privatekey_info - refactor module to allow code re-use for diff mode (https://github.com/ansible-collections/community.crypto/pull/205)."
|
|
@ -0,0 +1,490 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright: (c) 2016-2017, Yanis Guenane <yanis+ansible@guenane.org>
|
||||||
|
# Copyright: (c) 2017, Markus Teufelberger <mteufelberger+ansible@mgit.at>
|
||||||
|
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
|
||||||
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||||
|
|
||||||
|
from __future__ import absolute_import, division, print_function
|
||||||
|
__metaclass__ = type
|
||||||
|
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
from distutils.version import LooseVersion
|
||||||
|
|
||||||
|
from ansible.module_utils import six
|
||||||
|
from ansible.module_utils.basic import missing_required_lib
|
||||||
|
from ansible.module_utils._text import to_native, to_bytes
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||||
|
CRYPTOGRAPHY_HAS_X25519,
|
||||||
|
CRYPTOGRAPHY_HAS_X448,
|
||||||
|
CRYPTOGRAPHY_HAS_ED25519,
|
||||||
|
CRYPTOGRAPHY_HAS_ED448,
|
||||||
|
OpenSSLObjectError,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
||||||
|
load_privatekey,
|
||||||
|
get_fingerprint_of_bytes,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
|
||||||
|
binary_exp_mod,
|
||||||
|
quick_is_not_prime,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'
|
||||||
|
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
||||||
|
|
||||||
|
PYOPENSSL_IMP_ERR = None
|
||||||
|
try:
|
||||||
|
import OpenSSL
|
||||||
|
from OpenSSL import crypto
|
||||||
|
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
||||||
|
except ImportError:
|
||||||
|
PYOPENSSL_IMP_ERR = traceback.format_exc()
|
||||||
|
PYOPENSSL_FOUND = False
|
||||||
|
else:
|
||||||
|
PYOPENSSL_FOUND = True
|
||||||
|
|
||||||
|
CRYPTOGRAPHY_IMP_ERR = None
|
||||||
|
try:
|
||||||
|
import cryptography
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
||||||
|
except ImportError:
|
||||||
|
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
||||||
|
CRYPTOGRAPHY_FOUND = False
|
||||||
|
else:
|
||||||
|
CRYPTOGRAPHY_FOUND = True
|
||||||
|
|
||||||
|
SIGNATURE_TEST_DATA = b'1234'
|
||||||
|
|
||||||
|
|
||||||
|
def _get_cryptography_key_info(key):
|
||||||
|
key_public_data = dict()
|
||||||
|
key_private_data = dict()
|
||||||
|
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
|
||||||
|
key_type = 'RSA'
|
||||||
|
key_public_data['size'] = key.key_size
|
||||||
|
key_public_data['modulus'] = key.public_key().public_numbers().n
|
||||||
|
key_public_data['exponent'] = key.public_key().public_numbers().e
|
||||||
|
key_private_data['p'] = key.private_numbers().p
|
||||||
|
key_private_data['q'] = key.private_numbers().q
|
||||||
|
key_private_data['exponent'] = key.private_numbers().d
|
||||||
|
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
|
||||||
|
key_type = 'DSA'
|
||||||
|
key_public_data['size'] = key.key_size
|
||||||
|
key_public_data['p'] = key.parameters().parameter_numbers().p
|
||||||
|
key_public_data['q'] = key.parameters().parameter_numbers().q
|
||||||
|
key_public_data['g'] = key.parameters().parameter_numbers().g
|
||||||
|
key_public_data['y'] = key.public_key().public_numbers().y
|
||||||
|
key_private_data['x'] = key.private_numbers().x
|
||||||
|
elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey):
|
||||||
|
key_type = 'X25519'
|
||||||
|
elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey):
|
||||||
|
key_type = 'X448'
|
||||||
|
elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
|
||||||
|
key_type = 'Ed25519'
|
||||||
|
elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
|
||||||
|
key_type = 'Ed448'
|
||||||
|
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
|
||||||
|
key_type = 'ECC'
|
||||||
|
key_public_data['curve'] = key.public_key().curve.name
|
||||||
|
key_public_data['x'] = key.public_key().public_numbers().x
|
||||||
|
key_public_data['y'] = key.public_key().public_numbers().y
|
||||||
|
key_public_data['exponent_size'] = key.public_key().curve.key_size
|
||||||
|
key_private_data['multiplier'] = key.private_numbers().private_value
|
||||||
|
else:
|
||||||
|
key_type = 'unknown ({0})'.format(type(key))
|
||||||
|
return key_type, key_public_data, key_private_data
|
||||||
|
|
||||||
|
|
||||||
|
def _check_dsa_consistency(key_public_data, key_private_data):
|
||||||
|
# Get parameters
|
||||||
|
p = key_public_data.get('p')
|
||||||
|
q = key_public_data.get('q')
|
||||||
|
g = key_public_data.get('g')
|
||||||
|
y = key_public_data.get('y')
|
||||||
|
x = key_private_data.get('x')
|
||||||
|
for v in (p, q, g, y, x):
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
# Make sure that g is not 0, 1 or -1 in Z/pZ
|
||||||
|
if g < 2 or g >= p - 1:
|
||||||
|
return False
|
||||||
|
# Make sure that x is in range
|
||||||
|
if x < 1 or x >= q:
|
||||||
|
return False
|
||||||
|
# Check whether q divides p-1
|
||||||
|
if (p - 1) % q != 0:
|
||||||
|
return False
|
||||||
|
# Check that g**q mod p == 1
|
||||||
|
if binary_exp_mod(g, q, p) != 1:
|
||||||
|
return False
|
||||||
|
# Check whether g**x mod p == y
|
||||||
|
if binary_exp_mod(g, x, p) != y:
|
||||||
|
return False
|
||||||
|
# Check (quickly) whether p or q are not primes
|
||||||
|
if quick_is_not_prime(q) or quick_is_not_prime(p):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _is_cryptography_key_consistent(key, key_public_data, key_private_data):
|
||||||
|
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
|
||||||
|
return bool(key._backend._lib.RSA_check_key(key._rsa_cdata))
|
||||||
|
if isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
|
||||||
|
result = _check_dsa_consistency(key_public_data, key_private_data)
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
try:
|
||||||
|
signature = key.sign(SIGNATURE_TEST_DATA, cryptography.hazmat.primitives.hashes.SHA256())
|
||||||
|
except AttributeError:
|
||||||
|
# sign() was added in cryptography 1.5, but we support older versions
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
key.public_key().verify(
|
||||||
|
signature,
|
||||||
|
SIGNATURE_TEST_DATA,
|
||||||
|
cryptography.hazmat.primitives.hashes.SHA256()
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except cryptography.exceptions.InvalidSignature:
|
||||||
|
return False
|
||||||
|
if isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
|
||||||
|
try:
|
||||||
|
signature = key.sign(
|
||||||
|
SIGNATURE_TEST_DATA,
|
||||||
|
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
|
||||||
|
)
|
||||||
|
except AttributeError:
|
||||||
|
# sign() was added in cryptography 1.5, but we support older versions
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
key.public_key().verify(
|
||||||
|
signature,
|
||||||
|
SIGNATURE_TEST_DATA,
|
||||||
|
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except cryptography.exceptions.InvalidSignature:
|
||||||
|
return False
|
||||||
|
has_simple_sign_function = False
|
||||||
|
if CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
|
||||||
|
has_simple_sign_function = True
|
||||||
|
if CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
|
||||||
|
has_simple_sign_function = True
|
||||||
|
if has_simple_sign_function:
|
||||||
|
signature = key.sign(SIGNATURE_TEST_DATA)
|
||||||
|
try:
|
||||||
|
key.public_key().verify(signature, SIGNATURE_TEST_DATA)
|
||||||
|
return True
|
||||||
|
except cryptography.exceptions.InvalidSignature:
|
||||||
|
return False
|
||||||
|
# For X25519 and X448, there's no test yet.
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class PrivateKeyConsistencyError(OpenSSLObjectError):
|
||||||
|
def __init__(self, msg, result):
|
||||||
|
super(PrivateKeyConsistencyError, self).__init__(msg)
|
||||||
|
self.error_message = msg
|
||||||
|
self.result = result
|
||||||
|
|
||||||
|
|
||||||
|
class PrivateKeyParseError(OpenSSLObjectError):
|
||||||
|
def __init__(self, msg, result):
|
||||||
|
super(PrivateKeyParseError, self).__init__(msg)
|
||||||
|
self.error_message = msg
|
||||||
|
self.result = result
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class PrivateKeyInfoRetrieval(object):
|
||||||
|
def __init__(self, module, backend, content, passphrase=None, return_private_key_data=False):
|
||||||
|
# content must be a bytes string
|
||||||
|
self.module = module
|
||||||
|
self.backend = backend
|
||||||
|
self.content = content
|
||||||
|
self.passphrase = passphrase
|
||||||
|
self.return_private_key_data = return_private_key_data
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _get_key_info(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _is_key_consistent(self, key_public_data, key_private_data):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_info(self):
|
||||||
|
result = dict(
|
||||||
|
can_parse_key=False,
|
||||||
|
key_is_consistent=None,
|
||||||
|
)
|
||||||
|
priv_key_detail = self.content
|
||||||
|
try:
|
||||||
|
self.key = load_privatekey(
|
||||||
|
path=None,
|
||||||
|
content=priv_key_detail,
|
||||||
|
passphrase=to_bytes(self.passphrase) if self.passphrase is not None else self.passphrase,
|
||||||
|
backend=self.backend
|
||||||
|
)
|
||||||
|
result['can_parse_key'] = True
|
||||||
|
except OpenSSLObjectError as exc:
|
||||||
|
raise PrivateKeyParseError(to_native(exc), result)
|
||||||
|
|
||||||
|
result['public_key'] = self._get_public_key(binary=False)
|
||||||
|
pk = self._get_public_key(binary=True)
|
||||||
|
result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
|
||||||
|
|
||||||
|
key_type, key_public_data, key_private_data = self._get_key_info()
|
||||||
|
result['type'] = key_type
|
||||||
|
result['public_data'] = key_public_data
|
||||||
|
if self.return_private_key_data:
|
||||||
|
result['private_data'] = key_private_data
|
||||||
|
|
||||||
|
result['key_is_consistent'] = self._is_key_consistent(key_public_data, key_private_data)
|
||||||
|
if result['key_is_consistent'] is False:
|
||||||
|
# Only fail when it is False, to avoid to fail on None (which means "we don't know")
|
||||||
|
msg = (
|
||||||
|
"Private key is not consistent! (See "
|
||||||
|
"https://blog.hboeck.de/archives/888-How-I-tricked-Symantec-with-a-Fake-Private-Key.html)"
|
||||||
|
)
|
||||||
|
raise PrivateKeyConsistencyError(msg, result)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
class PrivateKeyInfoRetrievalCryptography(PrivateKeyInfoRetrieval):
|
||||||
|
"""Validate the supplied private key, using the cryptography backend"""
|
||||||
|
def __init__(self, module, content, **kwargs):
|
||||||
|
super(PrivateKeyInfoRetrievalCryptography, self).__init__(module, 'cryptography', content, **kwargs)
|
||||||
|
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
return self.key.public_key().public_bytes(
|
||||||
|
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
|
||||||
|
serialization.PublicFormat.SubjectPublicKeyInfo
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_key_info(self):
|
||||||
|
return _get_cryptography_key_info(self.key)
|
||||||
|
|
||||||
|
def _is_key_consistent(self, key_public_data, key_private_data):
|
||||||
|
return _is_cryptography_key_consistent(self.key, key_public_data, key_private_data)
|
||||||
|
|
||||||
|
|
||||||
|
class PrivateKeyInfoRetrievalPyOpenSSL(PrivateKeyInfoRetrieval):
|
||||||
|
"""validate the supplied private key."""
|
||||||
|
|
||||||
|
def __init__(self, module, content, **kwargs):
|
||||||
|
super(PrivateKeyInfoRetrievalPyOpenSSL, self).__init__(module, 'pyopenssl', content, **kwargs)
|
||||||
|
|
||||||
|
def _get_public_key(self, binary):
|
||||||
|
try:
|
||||||
|
return crypto.dump_publickey(
|
||||||
|
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
|
||||||
|
self.key
|
||||||
|
)
|
||||||
|
except AttributeError:
|
||||||
|
try:
|
||||||
|
# pyOpenSSL < 16.0:
|
||||||
|
bio = crypto._new_mem_buf()
|
||||||
|
if binary:
|
||||||
|
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.key._pkey)
|
||||||
|
else:
|
||||||
|
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.key._pkey)
|
||||||
|
if rc != 1:
|
||||||
|
crypto._raise_current_error()
|
||||||
|
return crypto._bio_to_string(bio)
|
||||||
|
except AttributeError:
|
||||||
|
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
|
||||||
|
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
|
||||||
|
|
||||||
|
def bigint_to_int(self, bn):
|
||||||
|
'''Convert OpenSSL BIGINT to Python integer'''
|
||||||
|
if bn == OpenSSL._util.ffi.NULL:
|
||||||
|
return None
|
||||||
|
hexstr = OpenSSL._util.lib.BN_bn2hex(bn)
|
||||||
|
try:
|
||||||
|
return int(OpenSSL._util.ffi.string(hexstr), 16)
|
||||||
|
finally:
|
||||||
|
OpenSSL._util.lib.OPENSSL_free(hexstr)
|
||||||
|
|
||||||
|
def _get_key_info(self):
|
||||||
|
key_public_data = dict()
|
||||||
|
key_private_data = dict()
|
||||||
|
openssl_key_type = self.key.type()
|
||||||
|
try_fallback = True
|
||||||
|
if crypto.TYPE_RSA == openssl_key_type:
|
||||||
|
key_type = 'RSA'
|
||||||
|
key_public_data['size'] = self.key.bits()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Use OpenSSL directly to extract key data
|
||||||
|
key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(self.key._pkey)
|
||||||
|
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.RSA_free)
|
||||||
|
# OpenSSL 1.1 and newer have functions to extract the parameters
|
||||||
|
# from the EVP PKEY data structures. Older versions didn't have
|
||||||
|
# these getters, and it was common use to simply access the values
|
||||||
|
# directly. Since there's no guarantee that these data structures
|
||||||
|
# will still be accessible in the future, we use the getters for
|
||||||
|
# 1.1 and later, and directly access the values for 1.0.x and
|
||||||
|
# earlier.
|
||||||
|
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||||
|
# Get modulus and exponents
|
||||||
|
n = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
e = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
d = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
OpenSSL._util.lib.RSA_get0_key(key, n, e, d)
|
||||||
|
key_public_data['modulus'] = self.bigint_to_int(n[0])
|
||||||
|
key_public_data['exponent'] = self.bigint_to_int(e[0])
|
||||||
|
key_private_data['exponent'] = self.bigint_to_int(d[0])
|
||||||
|
# Get factors
|
||||||
|
p = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
q = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
OpenSSL._util.lib.RSA_get0_factors(key, p, q)
|
||||||
|
key_private_data['p'] = self.bigint_to_int(p[0])
|
||||||
|
key_private_data['q'] = self.bigint_to_int(q[0])
|
||||||
|
else:
|
||||||
|
# Get modulus and exponents
|
||||||
|
key_public_data['modulus'] = self.bigint_to_int(key.n)
|
||||||
|
key_public_data['exponent'] = self.bigint_to_int(key.e)
|
||||||
|
key_private_data['exponent'] = self.bigint_to_int(key.d)
|
||||||
|
# Get factors
|
||||||
|
key_private_data['p'] = self.bigint_to_int(key.p)
|
||||||
|
key_private_data['q'] = self.bigint_to_int(key.q)
|
||||||
|
try_fallback = False
|
||||||
|
except AttributeError:
|
||||||
|
# Use fallback if available
|
||||||
|
pass
|
||||||
|
elif crypto.TYPE_DSA == openssl_key_type:
|
||||||
|
key_type = 'DSA'
|
||||||
|
key_public_data['size'] = self.key.bits()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Use OpenSSL directly to extract key data
|
||||||
|
key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(self.key._pkey)
|
||||||
|
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.DSA_free)
|
||||||
|
# OpenSSL 1.1 and newer have functions to extract the parameters
|
||||||
|
# from the EVP PKEY data structures. Older versions didn't have
|
||||||
|
# these getters, and it was common use to simply access the values
|
||||||
|
# directly. Since there's no guarantee that these data structures
|
||||||
|
# will still be accessible in the future, we use the getters for
|
||||||
|
# 1.1 and later, and directly access the values for 1.0.x and
|
||||||
|
# earlier.
|
||||||
|
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
||||||
|
# Get public parameters (primes and group element)
|
||||||
|
p = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
q = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
g = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g)
|
||||||
|
key_public_data['p'] = self.bigint_to_int(p[0])
|
||||||
|
key_public_data['q'] = self.bigint_to_int(q[0])
|
||||||
|
key_public_data['g'] = self.bigint_to_int(g[0])
|
||||||
|
# Get public and private key exponents
|
||||||
|
y = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
x = OpenSSL._util.ffi.new("BIGNUM **")
|
||||||
|
OpenSSL._util.lib.DSA_get0_key(key, y, x)
|
||||||
|
key_public_data['y'] = self.bigint_to_int(y[0])
|
||||||
|
key_private_data['x'] = self.bigint_to_int(x[0])
|
||||||
|
else:
|
||||||
|
# Get public parameters (primes and group element)
|
||||||
|
key_public_data['p'] = self.bigint_to_int(key.p)
|
||||||
|
key_public_data['q'] = self.bigint_to_int(key.q)
|
||||||
|
key_public_data['g'] = self.bigint_to_int(key.g)
|
||||||
|
# Get public and private key exponents
|
||||||
|
key_public_data['y'] = self.bigint_to_int(key.pub_key)
|
||||||
|
key_private_data['x'] = self.bigint_to_int(key.priv_key)
|
||||||
|
try_fallback = False
|
||||||
|
except AttributeError:
|
||||||
|
# Use fallback if available
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# Return 'unknown'
|
||||||
|
key_type = 'unknown ({0})'.format(self.key.type())
|
||||||
|
# If needed and if possible, fall back to cryptography
|
||||||
|
if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
|
||||||
|
return _get_cryptography_key_info(self.key.to_cryptography_key())
|
||||||
|
return key_type, key_public_data, key_private_data
|
||||||
|
|
||||||
|
def _is_key_consistent(self, key_public_data, key_private_data):
|
||||||
|
openssl_key_type = self.key.type()
|
||||||
|
if crypto.TYPE_RSA == openssl_key_type:
|
||||||
|
try:
|
||||||
|
return self.key.check()
|
||||||
|
except crypto.Error:
|
||||||
|
# OpenSSL error means that key is not consistent
|
||||||
|
return False
|
||||||
|
if crypto.TYPE_DSA == openssl_key_type:
|
||||||
|
result = _check_dsa_consistency(key_public_data, key_private_data)
|
||||||
|
if result is not None:
|
||||||
|
return result
|
||||||
|
signature = crypto.sign(self.key, SIGNATURE_TEST_DATA, 'sha256')
|
||||||
|
# Verify wants a cert (where it can get the public key from)
|
||||||
|
cert = crypto.X509()
|
||||||
|
cert.set_pubkey(self.key)
|
||||||
|
try:
|
||||||
|
crypto.verify(cert, signature, SIGNATURE_TEST_DATA, 'sha256')
|
||||||
|
return True
|
||||||
|
except crypto.Error:
|
||||||
|
return False
|
||||||
|
# If needed and if possible, fall back to cryptography
|
||||||
|
if PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
|
||||||
|
return _is_cryptography_key_consistent(self.key.to_cryptography_key(), key_public_data, key_private_data)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_privatekey_info(module, backend, content, passphrase=None, return_private_key_data=False):
|
||||||
|
if backend == 'cryptography':
|
||||||
|
info = PrivateKeyInfoRetrievalCryptography(
|
||||||
|
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
|
||||||
|
elif backend == 'pyopenssl':
|
||||||
|
info = PrivateKeyInfoRetrievalPyOpenSSL(
|
||||||
|
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
|
||||||
|
return info.get_info()
|
||||||
|
|
||||||
|
|
||||||
|
def select_backend(module, backend, content, passphrase=None, return_private_key_data=False):
|
||||||
|
if backend == 'auto':
|
||||||
|
# Detection what is possible
|
||||||
|
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
||||||
|
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
||||||
|
|
||||||
|
# First try cryptography, then pyOpenSSL
|
||||||
|
if can_use_cryptography:
|
||||||
|
backend = 'cryptography'
|
||||||
|
elif can_use_pyopenssl:
|
||||||
|
backend = 'pyopenssl'
|
||||||
|
|
||||||
|
# Success?
|
||||||
|
if backend == 'auto':
|
||||||
|
module.fail_json(msg=("Can't detect any of the required Python libraries "
|
||||||
|
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
||||||
|
MINIMAL_CRYPTOGRAPHY_VERSION,
|
||||||
|
MINIMAL_PYOPENSSL_VERSION))
|
||||||
|
|
||||||
|
if backend == 'pyopenssl':
|
||||||
|
if not PYOPENSSL_FOUND:
|
||||||
|
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
|
||||||
|
exception=PYOPENSSL_IMP_ERR)
|
||||||
|
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
|
||||||
|
version='2.0.0', collection_name='community.crypto')
|
||||||
|
return backend, PrivateKeyInfoRetrievalPyOpenSSL(
|
||||||
|
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
|
||||||
|
elif backend == 'cryptography':
|
||||||
|
if not CRYPTOGRAPHY_FOUND:
|
||||||
|
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
|
||||||
|
exception=CRYPTOGRAPHY_IMP_ERR)
|
||||||
|
return backend, PrivateKeyInfoRetrievalCryptography(
|
||||||
|
module, content, passphrase=passphrase, return_private_key_data=return_private_key_data)
|
||||||
|
else:
|
||||||
|
raise ValueError('Unsupported value for backend: {0}'.format(backend))
|
|
@ -138,450 +138,19 @@ private_data:
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
import abc
|
from ansible.module_utils.basic import AnsibleModule
|
||||||
import os
|
from ansible.module_utils._text import to_native
|
||||||
import traceback
|
|
||||||
|
|
||||||
from distutils.version import LooseVersion
|
|
||||||
|
|
||||||
from ansible.module_utils.basic import AnsibleModule, missing_required_lib
|
|
||||||
from ansible.module_utils._text import to_native, to_bytes
|
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
|
||||||
CRYPTOGRAPHY_HAS_X25519,
|
|
||||||
CRYPTOGRAPHY_HAS_X448,
|
|
||||||
CRYPTOGRAPHY_HAS_ED25519,
|
|
||||||
CRYPTOGRAPHY_HAS_ED448,
|
|
||||||
OpenSSLObjectError,
|
OpenSSLObjectError,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
|
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.privatekey_info import (
|
||||||
OpenSSLObject,
|
PrivateKeyConsistencyError,
|
||||||
load_privatekey,
|
PrivateKeyParseError,
|
||||||
get_fingerprint_of_bytes,
|
select_backend,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ansible_collections.community.crypto.plugins.module_utils.crypto.math import (
|
|
||||||
binary_exp_mod,
|
|
||||||
quick_is_not_prime,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
MINIMAL_CRYPTOGRAPHY_VERSION = '1.2.3'
|
|
||||||
MINIMAL_PYOPENSSL_VERSION = '0.15'
|
|
||||||
|
|
||||||
PYOPENSSL_IMP_ERR = None
|
|
||||||
try:
|
|
||||||
import OpenSSL
|
|
||||||
from OpenSSL import crypto
|
|
||||||
PYOPENSSL_VERSION = LooseVersion(OpenSSL.__version__)
|
|
||||||
except ImportError:
|
|
||||||
PYOPENSSL_IMP_ERR = traceback.format_exc()
|
|
||||||
PYOPENSSL_FOUND = False
|
|
||||||
else:
|
|
||||||
PYOPENSSL_FOUND = True
|
|
||||||
|
|
||||||
CRYPTOGRAPHY_IMP_ERR = None
|
|
||||||
try:
|
|
||||||
import cryptography
|
|
||||||
from cryptography.hazmat.primitives import serialization
|
|
||||||
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
|
|
||||||
except ImportError:
|
|
||||||
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
|
|
||||||
CRYPTOGRAPHY_FOUND = False
|
|
||||||
else:
|
|
||||||
CRYPTOGRAPHY_FOUND = True
|
|
||||||
|
|
||||||
SIGNATURE_TEST_DATA = b'1234'
|
|
||||||
|
|
||||||
|
|
||||||
def _get_cryptography_key_info(key):
|
|
||||||
key_public_data = dict()
|
|
||||||
key_private_data = dict()
|
|
||||||
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
|
|
||||||
key_type = 'RSA'
|
|
||||||
key_public_data['size'] = key.key_size
|
|
||||||
key_public_data['modulus'] = key.public_key().public_numbers().n
|
|
||||||
key_public_data['exponent'] = key.public_key().public_numbers().e
|
|
||||||
key_private_data['p'] = key.private_numbers().p
|
|
||||||
key_private_data['q'] = key.private_numbers().q
|
|
||||||
key_private_data['exponent'] = key.private_numbers().d
|
|
||||||
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
|
|
||||||
key_type = 'DSA'
|
|
||||||
key_public_data['size'] = key.key_size
|
|
||||||
key_public_data['p'] = key.parameters().parameter_numbers().p
|
|
||||||
key_public_data['q'] = key.parameters().parameter_numbers().q
|
|
||||||
key_public_data['g'] = key.parameters().parameter_numbers().g
|
|
||||||
key_public_data['y'] = key.public_key().public_numbers().y
|
|
||||||
key_private_data['x'] = key.private_numbers().x
|
|
||||||
elif CRYPTOGRAPHY_HAS_X25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey):
|
|
||||||
key_type = 'X25519'
|
|
||||||
elif CRYPTOGRAPHY_HAS_X448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey):
|
|
||||||
key_type = 'X448'
|
|
||||||
elif CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
|
|
||||||
key_type = 'Ed25519'
|
|
||||||
elif CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
|
|
||||||
key_type = 'Ed448'
|
|
||||||
elif isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
|
|
||||||
key_type = 'ECC'
|
|
||||||
key_public_data['curve'] = key.public_key().curve.name
|
|
||||||
key_public_data['x'] = key.public_key().public_numbers().x
|
|
||||||
key_public_data['y'] = key.public_key().public_numbers().y
|
|
||||||
key_public_data['exponent_size'] = key.public_key().curve.key_size
|
|
||||||
key_private_data['multiplier'] = key.private_numbers().private_value
|
|
||||||
else:
|
|
||||||
key_type = 'unknown ({0})'.format(type(key))
|
|
||||||
return key_type, key_public_data, key_private_data
|
|
||||||
|
|
||||||
|
|
||||||
def _check_dsa_consistency(key_public_data, key_private_data):
|
|
||||||
# Get parameters
|
|
||||||
p = key_public_data.get('p')
|
|
||||||
q = key_public_data.get('q')
|
|
||||||
g = key_public_data.get('g')
|
|
||||||
y = key_public_data.get('y')
|
|
||||||
x = key_private_data.get('x')
|
|
||||||
for v in (p, q, g, y, x):
|
|
||||||
if v is None:
|
|
||||||
return None
|
|
||||||
# Make sure that g is not 0, 1 or -1 in Z/pZ
|
|
||||||
if g < 2 or g >= p - 1:
|
|
||||||
return False
|
|
||||||
# Make sure that x is in range
|
|
||||||
if x < 1 or x >= q:
|
|
||||||
return False
|
|
||||||
# Check whether q divides p-1
|
|
||||||
if (p - 1) % q != 0:
|
|
||||||
return False
|
|
||||||
# Check that g**q mod p == 1
|
|
||||||
if binary_exp_mod(g, q, p) != 1:
|
|
||||||
return False
|
|
||||||
# Check whether g**x mod p == y
|
|
||||||
if binary_exp_mod(g, x, p) != y:
|
|
||||||
return False
|
|
||||||
# Check (quickly) whether p or q are not primes
|
|
||||||
if quick_is_not_prime(q) or quick_is_not_prime(p):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def _is_cryptography_key_consistent(key, key_public_data, key_private_data):
|
|
||||||
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
|
|
||||||
return bool(key._backend._lib.RSA_check_key(key._rsa_cdata))
|
|
||||||
if isinstance(key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
|
|
||||||
result = _check_dsa_consistency(key_public_data, key_private_data)
|
|
||||||
if result is not None:
|
|
||||||
return result
|
|
||||||
try:
|
|
||||||
signature = key.sign(SIGNATURE_TEST_DATA, cryptography.hazmat.primitives.hashes.SHA256())
|
|
||||||
except AttributeError:
|
|
||||||
# sign() was added in cryptography 1.5, but we support older versions
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
key.public_key().verify(
|
|
||||||
signature,
|
|
||||||
SIGNATURE_TEST_DATA,
|
|
||||||
cryptography.hazmat.primitives.hashes.SHA256()
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
except cryptography.exceptions.InvalidSignature:
|
|
||||||
return False
|
|
||||||
if isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
|
|
||||||
try:
|
|
||||||
signature = key.sign(
|
|
||||||
SIGNATURE_TEST_DATA,
|
|
||||||
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
|
|
||||||
)
|
|
||||||
except AttributeError:
|
|
||||||
# sign() was added in cryptography 1.5, but we support older versions
|
|
||||||
return None
|
|
||||||
try:
|
|
||||||
key.public_key().verify(
|
|
||||||
signature,
|
|
||||||
SIGNATURE_TEST_DATA,
|
|
||||||
cryptography.hazmat.primitives.asymmetric.ec.ECDSA(cryptography.hazmat.primitives.hashes.SHA256())
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
except cryptography.exceptions.InvalidSignature:
|
|
||||||
return False
|
|
||||||
has_simple_sign_function = False
|
|
||||||
if CRYPTOGRAPHY_HAS_ED25519 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
|
|
||||||
has_simple_sign_function = True
|
|
||||||
if CRYPTOGRAPHY_HAS_ED448 and isinstance(key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
|
|
||||||
has_simple_sign_function = True
|
|
||||||
if has_simple_sign_function:
|
|
||||||
signature = key.sign(SIGNATURE_TEST_DATA)
|
|
||||||
try:
|
|
||||||
key.public_key().verify(signature, SIGNATURE_TEST_DATA)
|
|
||||||
return True
|
|
||||||
except cryptography.exceptions.InvalidSignature:
|
|
||||||
return False
|
|
||||||
# For X25519 and X448, there's no test yet.
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
class PrivateKeyInfo(OpenSSLObject):
|
|
||||||
def __init__(self, module, backend):
|
|
||||||
super(PrivateKeyInfo, self).__init__(
|
|
||||||
module.params['path'] or '',
|
|
||||||
'present',
|
|
||||||
False,
|
|
||||||
module.check_mode,
|
|
||||||
)
|
|
||||||
self.backend = backend
|
|
||||||
self.module = module
|
|
||||||
self.content = module.params['content']
|
|
||||||
|
|
||||||
self.passphrase = module.params['passphrase']
|
|
||||||
self.return_private_key_data = module.params['return_private_key_data']
|
|
||||||
|
|
||||||
def generate(self):
|
|
||||||
# Empty method because OpenSSLObject wants this
|
|
||||||
pass
|
|
||||||
|
|
||||||
def dump(self):
|
|
||||||
# Empty method because OpenSSLObject wants this
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_key_info(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _is_key_consistent(self, key_public_data, key_private_data):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_info(self):
|
|
||||||
result = dict(
|
|
||||||
can_load_key=False,
|
|
||||||
can_parse_key=False,
|
|
||||||
key_is_consistent=None,
|
|
||||||
)
|
|
||||||
if self.content is not None:
|
|
||||||
priv_key_detail = self.content.encode('utf-8')
|
|
||||||
result['can_load_key'] = True
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
with open(self.path, 'rb') as b_priv_key_fh:
|
|
||||||
priv_key_detail = b_priv_key_fh.read()
|
|
||||||
result['can_load_key'] = True
|
|
||||||
except (IOError, OSError) as exc:
|
|
||||||
self.module.fail_json(msg=to_native(exc), **result)
|
|
||||||
try:
|
|
||||||
self.key = load_privatekey(
|
|
||||||
path=None,
|
|
||||||
content=priv_key_detail,
|
|
||||||
passphrase=to_bytes(self.passphrase) if self.passphrase is not None else self.passphrase,
|
|
||||||
backend=self.backend
|
|
||||||
)
|
|
||||||
result['can_parse_key'] = True
|
|
||||||
except OpenSSLObjectError as exc:
|
|
||||||
self.module.fail_json(msg=to_native(exc), **result)
|
|
||||||
|
|
||||||
result['public_key'] = self._get_public_key(binary=False)
|
|
||||||
pk = self._get_public_key(binary=True)
|
|
||||||
result['public_key_fingerprints'] = get_fingerprint_of_bytes(pk) if pk is not None else dict()
|
|
||||||
|
|
||||||
key_type, key_public_data, key_private_data = self._get_key_info()
|
|
||||||
result['type'] = key_type
|
|
||||||
result['public_data'] = key_public_data
|
|
||||||
if self.return_private_key_data:
|
|
||||||
result['private_data'] = key_private_data
|
|
||||||
|
|
||||||
result['key_is_consistent'] = self._is_key_consistent(key_public_data, key_private_data)
|
|
||||||
if result['key_is_consistent'] is False:
|
|
||||||
# Only fail when it is False, to avoid to fail on None (which means "we don't know")
|
|
||||||
result['key_is_consistent'] = False
|
|
||||||
self.module.fail_json(
|
|
||||||
msg="Private key is not consistent! (See "
|
|
||||||
"https://blog.hboeck.de/archives/888-How-I-tricked-Symantec-with-a-Fake-Private-Key.html)",
|
|
||||||
**result
|
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class PrivateKeyInfoCryptography(PrivateKeyInfo):
|
|
||||||
"""Validate the supplied private key, using the cryptography backend"""
|
|
||||||
def __init__(self, module):
|
|
||||||
super(PrivateKeyInfoCryptography, self).__init__(module, 'cryptography')
|
|
||||||
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
return self.key.public_key().public_bytes(
|
|
||||||
serialization.Encoding.DER if binary else serialization.Encoding.PEM,
|
|
||||||
serialization.PublicFormat.SubjectPublicKeyInfo
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_key_info(self):
|
|
||||||
return _get_cryptography_key_info(self.key)
|
|
||||||
|
|
||||||
def _is_key_consistent(self, key_public_data, key_private_data):
|
|
||||||
return _is_cryptography_key_consistent(self.key, key_public_data, key_private_data)
|
|
||||||
|
|
||||||
|
|
||||||
class PrivateKeyInfoPyOpenSSL(PrivateKeyInfo):
|
|
||||||
"""validate the supplied private key."""
|
|
||||||
|
|
||||||
def __init__(self, module):
|
|
||||||
super(PrivateKeyInfoPyOpenSSL, self).__init__(module, 'pyopenssl')
|
|
||||||
|
|
||||||
def _get_public_key(self, binary):
|
|
||||||
try:
|
|
||||||
return crypto.dump_publickey(
|
|
||||||
crypto.FILETYPE_ASN1 if binary else crypto.FILETYPE_PEM,
|
|
||||||
self.key
|
|
||||||
)
|
|
||||||
except AttributeError:
|
|
||||||
try:
|
|
||||||
# pyOpenSSL < 16.0:
|
|
||||||
bio = crypto._new_mem_buf()
|
|
||||||
if binary:
|
|
||||||
rc = crypto._lib.i2d_PUBKEY_bio(bio, self.key._pkey)
|
|
||||||
else:
|
|
||||||
rc = crypto._lib.PEM_write_bio_PUBKEY(bio, self.key._pkey)
|
|
||||||
if rc != 1:
|
|
||||||
crypto._raise_current_error()
|
|
||||||
return crypto._bio_to_string(bio)
|
|
||||||
except AttributeError:
|
|
||||||
self.module.warn('Your pyOpenSSL version does not support dumping public keys. '
|
|
||||||
'Please upgrade to version 16.0 or newer, or use the cryptography backend.')
|
|
||||||
|
|
||||||
def bigint_to_int(self, bn):
|
|
||||||
'''Convert OpenSSL BIGINT to Python integer'''
|
|
||||||
if bn == OpenSSL._util.ffi.NULL:
|
|
||||||
return None
|
|
||||||
hexstr = OpenSSL._util.lib.BN_bn2hex(bn)
|
|
||||||
try:
|
|
||||||
return int(OpenSSL._util.ffi.string(hexstr), 16)
|
|
||||||
finally:
|
|
||||||
OpenSSL._util.lib.OPENSSL_free(hexstr)
|
|
||||||
|
|
||||||
def _get_key_info(self):
|
|
||||||
key_public_data = dict()
|
|
||||||
key_private_data = dict()
|
|
||||||
openssl_key_type = self.key.type()
|
|
||||||
try_fallback = True
|
|
||||||
if crypto.TYPE_RSA == openssl_key_type:
|
|
||||||
key_type = 'RSA'
|
|
||||||
key_public_data['size'] = self.key.bits()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Use OpenSSL directly to extract key data
|
|
||||||
key = OpenSSL._util.lib.EVP_PKEY_get1_RSA(self.key._pkey)
|
|
||||||
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.RSA_free)
|
|
||||||
# OpenSSL 1.1 and newer have functions to extract the parameters
|
|
||||||
# from the EVP PKEY data structures. Older versions didn't have
|
|
||||||
# these getters, and it was common use to simply access the values
|
|
||||||
# directly. Since there's no guarantee that these data structures
|
|
||||||
# will still be accessible in the future, we use the getters for
|
|
||||||
# 1.1 and later, and directly access the values for 1.0.x and
|
|
||||||
# earlier.
|
|
||||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
|
||||||
# Get modulus and exponents
|
|
||||||
n = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
e = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
d = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
OpenSSL._util.lib.RSA_get0_key(key, n, e, d)
|
|
||||||
key_public_data['modulus'] = self.bigint_to_int(n[0])
|
|
||||||
key_public_data['exponent'] = self.bigint_to_int(e[0])
|
|
||||||
key_private_data['exponent'] = self.bigint_to_int(d[0])
|
|
||||||
# Get factors
|
|
||||||
p = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
q = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
OpenSSL._util.lib.RSA_get0_factors(key, p, q)
|
|
||||||
key_private_data['p'] = self.bigint_to_int(p[0])
|
|
||||||
key_private_data['q'] = self.bigint_to_int(q[0])
|
|
||||||
else:
|
|
||||||
# Get modulus and exponents
|
|
||||||
key_public_data['modulus'] = self.bigint_to_int(key.n)
|
|
||||||
key_public_data['exponent'] = self.bigint_to_int(key.e)
|
|
||||||
key_private_data['exponent'] = self.bigint_to_int(key.d)
|
|
||||||
# Get factors
|
|
||||||
key_private_data['p'] = self.bigint_to_int(key.p)
|
|
||||||
key_private_data['q'] = self.bigint_to_int(key.q)
|
|
||||||
try_fallback = False
|
|
||||||
except AttributeError:
|
|
||||||
# Use fallback if available
|
|
||||||
pass
|
|
||||||
elif crypto.TYPE_DSA == openssl_key_type:
|
|
||||||
key_type = 'DSA'
|
|
||||||
key_public_data['size'] = self.key.bits()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Use OpenSSL directly to extract key data
|
|
||||||
key = OpenSSL._util.lib.EVP_PKEY_get1_DSA(self.key._pkey)
|
|
||||||
key = OpenSSL._util.ffi.gc(key, OpenSSL._util.lib.DSA_free)
|
|
||||||
# OpenSSL 1.1 and newer have functions to extract the parameters
|
|
||||||
# from the EVP PKEY data structures. Older versions didn't have
|
|
||||||
# these getters, and it was common use to simply access the values
|
|
||||||
# directly. Since there's no guarantee that these data structures
|
|
||||||
# will still be accessible in the future, we use the getters for
|
|
||||||
# 1.1 and later, and directly access the values for 1.0.x and
|
|
||||||
# earlier.
|
|
||||||
if OpenSSL.SSL.OPENSSL_VERSION_NUMBER >= 0x10100000:
|
|
||||||
# Get public parameters (primes and group element)
|
|
||||||
p = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
q = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
g = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
OpenSSL._util.lib.DSA_get0_pqg(key, p, q, g)
|
|
||||||
key_public_data['p'] = self.bigint_to_int(p[0])
|
|
||||||
key_public_data['q'] = self.bigint_to_int(q[0])
|
|
||||||
key_public_data['g'] = self.bigint_to_int(g[0])
|
|
||||||
# Get public and private key exponents
|
|
||||||
y = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
x = OpenSSL._util.ffi.new("BIGNUM **")
|
|
||||||
OpenSSL._util.lib.DSA_get0_key(key, y, x)
|
|
||||||
key_public_data['y'] = self.bigint_to_int(y[0])
|
|
||||||
key_private_data['x'] = self.bigint_to_int(x[0])
|
|
||||||
else:
|
|
||||||
# Get public parameters (primes and group element)
|
|
||||||
key_public_data['p'] = self.bigint_to_int(key.p)
|
|
||||||
key_public_data['q'] = self.bigint_to_int(key.q)
|
|
||||||
key_public_data['g'] = self.bigint_to_int(key.g)
|
|
||||||
# Get public and private key exponents
|
|
||||||
key_public_data['y'] = self.bigint_to_int(key.pub_key)
|
|
||||||
key_private_data['x'] = self.bigint_to_int(key.priv_key)
|
|
||||||
try_fallback = False
|
|
||||||
except AttributeError:
|
|
||||||
# Use fallback if available
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
# Return 'unknown'
|
|
||||||
key_type = 'unknown ({0})'.format(self.key.type())
|
|
||||||
# If needed and if possible, fall back to cryptography
|
|
||||||
if try_fallback and PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
|
|
||||||
return _get_cryptography_key_info(self.key.to_cryptography_key())
|
|
||||||
return key_type, key_public_data, key_private_data
|
|
||||||
|
|
||||||
def _is_key_consistent(self, key_public_data, key_private_data):
|
|
||||||
openssl_key_type = self.key.type()
|
|
||||||
if crypto.TYPE_RSA == openssl_key_type:
|
|
||||||
try:
|
|
||||||
return self.key.check()
|
|
||||||
except crypto.Error:
|
|
||||||
# OpenSSL error means that key is not consistent
|
|
||||||
return False
|
|
||||||
if crypto.TYPE_DSA == openssl_key_type:
|
|
||||||
result = _check_dsa_consistency(key_public_data, key_private_data)
|
|
||||||
if result is not None:
|
|
||||||
return result
|
|
||||||
signature = crypto.sign(self.key, SIGNATURE_TEST_DATA, 'sha256')
|
|
||||||
# Verify wants a cert (where it can get the public key from)
|
|
||||||
cert = crypto.X509()
|
|
||||||
cert.set_pubkey(self.key)
|
|
||||||
try:
|
|
||||||
crypto.verify(cert, signature, SIGNATURE_TEST_DATA, 'sha256')
|
|
||||||
return True
|
|
||||||
except crypto.Error:
|
|
||||||
return False
|
|
||||||
# If needed and if possible, fall back to cryptography
|
|
||||||
if PYOPENSSL_VERSION >= LooseVersion('16.1.0') and CRYPTOGRAPHY_FOUND:
|
|
||||||
return _is_cryptography_key_consistent(self.key.to_cryptography_key(), key_public_data, key_private_data)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
module = AnsibleModule(
|
module = AnsibleModule(
|
||||||
|
@ -601,49 +170,39 @@ def main():
|
||||||
supports_check_mode=True,
|
supports_check_mode=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
result = dict(
|
||||||
|
can_load_key=False,
|
||||||
|
can_parse_key=False,
|
||||||
|
key_is_consistent=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
if module.params['content'] is not None:
|
||||||
|
data = module.params['content'].encode('utf-8')
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
with open(module.params['path'], 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
except (IOError, OSError) as e:
|
||||||
|
module.fail_json(msg='Error while reading private key file from disk: {0}'.format(e), **result)
|
||||||
|
|
||||||
|
result['can_load_key'] = True
|
||||||
|
|
||||||
|
backend, module_backend = select_backend(
|
||||||
|
module,
|
||||||
|
module.params['select_crypto_backend'],
|
||||||
|
data,
|
||||||
|
passphrase=module.params['passphrase'],
|
||||||
|
return_private_key_data=module.params['return_private_key_data'])
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if module.params['path'] is not None:
|
result.update(module_backend.get_info())
|
||||||
base_dir = os.path.dirname(module.params['path']) or '.'
|
|
||||||
if not os.path.isdir(base_dir):
|
|
||||||
module.fail_json(
|
|
||||||
name=base_dir,
|
|
||||||
msg='The directory %s does not exist or the file is not a directory' % base_dir
|
|
||||||
)
|
|
||||||
|
|
||||||
backend = module.params['select_crypto_backend']
|
|
||||||
if backend == 'auto':
|
|
||||||
# Detect what backend we can use
|
|
||||||
can_use_cryptography = CRYPTOGRAPHY_FOUND and CRYPTOGRAPHY_VERSION >= LooseVersion(MINIMAL_CRYPTOGRAPHY_VERSION)
|
|
||||||
can_use_pyopenssl = PYOPENSSL_FOUND and PYOPENSSL_VERSION >= LooseVersion(MINIMAL_PYOPENSSL_VERSION)
|
|
||||||
|
|
||||||
# If cryptography is available we'll use it
|
|
||||||
if can_use_cryptography:
|
|
||||||
backend = 'cryptography'
|
|
||||||
elif can_use_pyopenssl:
|
|
||||||
backend = 'pyopenssl'
|
|
||||||
|
|
||||||
# Fail if no backend has been found
|
|
||||||
if backend == 'auto':
|
|
||||||
module.fail_json(msg=("Can't detect any of the required Python libraries "
|
|
||||||
"cryptography (>= {0}) or PyOpenSSL (>= {1})").format(
|
|
||||||
MINIMAL_CRYPTOGRAPHY_VERSION,
|
|
||||||
MINIMAL_PYOPENSSL_VERSION))
|
|
||||||
|
|
||||||
if backend == 'pyopenssl':
|
|
||||||
if not PYOPENSSL_FOUND:
|
|
||||||
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
|
|
||||||
exception=PYOPENSSL_IMP_ERR)
|
|
||||||
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
|
|
||||||
version='2.0.0', collection_name='community.crypto')
|
|
||||||
privatekey = PrivateKeyInfoPyOpenSSL(module)
|
|
||||||
elif backend == 'cryptography':
|
|
||||||
if not CRYPTOGRAPHY_FOUND:
|
|
||||||
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
|
|
||||||
exception=CRYPTOGRAPHY_IMP_ERR)
|
|
||||||
privatekey = PrivateKeyInfoCryptography(module)
|
|
||||||
|
|
||||||
result = privatekey.get_info()
|
|
||||||
module.exit_json(**result)
|
module.exit_json(**result)
|
||||||
|
except PrivateKeyParseError as exc:
|
||||||
|
result.update(exc.result)
|
||||||
|
module.fail_json(msg=exc.error_message, **result)
|
||||||
|
except PrivateKeyConsistencyError as exc:
|
||||||
|
result.update(exc.result)
|
||||||
|
module.fail_json(msg=exc.error_message, **result)
|
||||||
except OpenSSLObjectError as exc:
|
except OpenSSLObjectError as exc:
|
||||||
module.fail_json(msg=to_native(exc))
|
module.fail_json(msg=to_native(exc))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue