acme: improve error handling in backend's parse_key() (#208)

* Improve error handling in backend's parse_key().

* Adjust unit tests.
pull/210/head
Felix Fontein 2021-03-22 07:30:06 +01:00 committed by GitHub
parent e85554827f
commit f5fd5fdf5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 29 additions and 31 deletions

View File

@ -159,7 +159,7 @@ class ACMELegacyAccount(object):
try: try:
return None, self.client.parse_key(key_file=key_file, key_content=key_content) return None, self.client.parse_key(key_file=key_file, key_content=key_content)
except KeyParsingError as e: except KeyParsingError as e:
return e.msg, None return e.msg, {}
def sign_request(self, protected, payload, key_data, encode_payload=True): def sign_request(self, protected, payload, key_data, encode_payload=True):
return self.client.sign_request(protected, payload, key_data, encode_payload=encode_payload) return self.client.sign_request(protected, payload, key_data, encode_payload=encode_payload)

View File

@ -157,10 +157,7 @@ class ACMEClient(object):
''' '''
if key_file is None and key_content is None: if key_file is None and key_content is None:
raise AssertionError('One of key_file and key_content must be specified!') raise AssertionError('One of key_file and key_content must be specified!')
error, key_data = self.backend.parse_key(key_file, key_content, passphrase=passphrase) return self.backend.parse_key(key_file, key_content, passphrase=passphrase)
if error:
raise KeyParsingError(error)
return key_data
def sign_request(self, protected, payload, key_data, encode_payload=True): def sign_request(self, protected, payload, key_data, encode_payload=True):
''' '''

View File

@ -24,7 +24,10 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.certificates
ChainMatcher, ChainMatcher,
) )
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import BackendException from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException,
KeyParsingError,
)
from ansible_collections.community.crypto.plugins.module_utils.acme.io import read_file from ansible_collections.community.crypto.plugins.module_utils.acme.io import read_file
@ -181,8 +184,8 @@ class CryptographyBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None): def parse_key(self, key_file=None, key_content=None, passphrase=None):
''' '''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
(error, key_data). Raises KeyParsingError in case of errors.
''' '''
# If key_content isn't given, read key_file # If key_content isn't given, read key_file
if key_content is None: if key_content is None:
@ -196,10 +199,10 @@ class CryptographyBackend(CryptoBackend):
password=to_bytes(passphrase) if passphrase is not None else None, password=to_bytes(passphrase) if passphrase is not None else None,
backend=_cryptography_backend) backend=_cryptography_backend)
except Exception as e: except Exception as e:
return 'error while loading key: {0}'.format(e), None raise KeyParsingError('error while loading key: {0}'.format(e))
if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey): if isinstance(key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
pk = key.public_key().public_numbers() pk = key.public_key().public_numbers()
return None, { return {
'key_obj': key, 'key_obj': key,
'type': 'rsa', 'type': 'rsa',
'alg': 'RS256', 'alg': 'RS256',
@ -233,9 +236,9 @@ class CryptographyBackend(CryptoBackend):
point_size = 66 point_size = 66
curve = 'P-521' curve = 'P-521'
else: else:
return 'unknown elliptic curve: {0}'.format(pk.curve.name), {} raise KeyParsingError('unknown elliptic curve: {0}'.format(pk.curve.name))
num_bytes = (bits + 7) // 8 num_bytes = (bits + 7) // 8
return None, { return {
'key_obj': key, 'key_obj': key,
'type': 'ec', 'type': 'ec',
'alg': alg, 'alg': alg,
@ -249,7 +252,7 @@ class CryptographyBackend(CryptoBackend):
'point_size': point_size, 'point_size': point_size,
} }
else: else:
return 'unknown key type "{0}"'.format(type(key)), {} raise KeyParsingError('unknown key type "{0}"'.format(type(key)))
def sign(self, payload64, protected64, key_data): def sign(self, payload64, protected64, key_data):
sign_payload = "{0}.{1}".format(protected64, payload64).encode('utf8') sign_payload = "{0}.{1}".format(protected64, payload64).encode('utf8')

View File

@ -24,6 +24,7 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.backends imp
from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( from ansible_collections.community.crypto.plugins.module_utils.acme.errors import (
BackendException, BackendException,
KeyParsingError,
) )
from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64
@ -43,11 +44,11 @@ class OpenSSLCLIBackend(CryptoBackend):
def parse_key(self, key_file=None, key_content=None, passphrase=None): def parse_key(self, key_file=None, key_content=None, passphrase=None):
''' '''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
(error, key_data). Raises KeyParsingError in case of errors.
''' '''
if passphrase is not None: if passphrase is not None:
return 'openssl backend does not support key passphrases', {} raise KeyParsingError('openssl backend does not support key passphrases')
# If key_file isn't given, but key_content, write that to a temporary file # If key_file isn't given, but key_content, write that to a temporary file
if key_file is None: if key_file is None:
fd, tmpsrc = tempfile.mkstemp() fd, tmpsrc = tempfile.mkstemp()
@ -61,7 +62,7 @@ class OpenSSLCLIBackend(CryptoBackend):
f.close() f.close()
except Exception as dummy: except Exception as dummy:
pass pass
raise BackendException("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc()) raise KeyParsingError("failed to create temporary content file: %s" % to_native(err), exception=traceback.format_exc())
f.close() f.close()
# Parse key # Parse key
account_key_type = None account_key_type = None
@ -78,7 +79,7 @@ class OpenSSLCLIBackend(CryptoBackend):
# FIXME: add some kind of auto-detection # FIXME: add some kind of auto-detection
account_key_type = "rsa" account_key_type = "rsa"
if account_key_type not in ("rsa", "ec"): if account_key_type not in ("rsa", "ec"):
return 'unknown key type "%s"' % account_key_type, {} raise KeyParsingError('unknown key type "%s"' % account_key_type)
openssl_keydump_cmd = [self.openssl_binary, account_key_type, "-in", key_file, "-noout", "-text"] openssl_keydump_cmd = [self.openssl_binary, account_key_type, "-in", key_file, "-noout", "-text"]
dummy, out, dummy = self.module.run_command( dummy, out, dummy = self.module.run_command(
@ -92,7 +93,7 @@ class OpenSSLCLIBackend(CryptoBackend):
if len(pub_exp) % 2: if len(pub_exp) % 2:
pub_exp = "0{0}".format(pub_exp) pub_exp = "0{0}".format(pub_exp)
return None, { return {
'key_file': key_file, 'key_file': key_file,
'type': 'rsa', 'type': 'rsa',
'alg': 'RS256', 'alg': 'RS256',
@ -108,7 +109,7 @@ class OpenSSLCLIBackend(CryptoBackend):
r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?", r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?",
to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL)
if pub_data is None: if pub_data is None:
return 'cannot parse elliptic curve key', {} raise KeyParsingError('cannot parse elliptic curve key')
pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8")) pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8"))
asn1_oid_curve = pub_data.group(2).lower() asn1_oid_curve = pub_data.group(2).lower()
nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None
@ -133,11 +134,11 @@ class OpenSSLCLIBackend(CryptoBackend):
point_size = 66 point_size = 66
curve = 'P-521' curve = 'P-521'
else: else:
return 'unknown elliptic curve: %s / %s' % (asn1_oid_curve, nist_curve), {} raise KeyParsingError('unknown elliptic curve: %s / %s' % (asn1_oid_curve, nist_curve))
num_bytes = (bits + 7) // 8 num_bytes = (bits + 7) // 8
if len(pub_hex) != 2 * num_bytes: if len(pub_hex) != 2 * num_bytes:
return 'bad elliptic curve point (%s / %s)' % (asn1_oid_curve, nist_curve), {} raise KeyParsingError('bad elliptic curve point (%s / %s)' % (asn1_oid_curve, nist_curve))
return None, { return {
'key_file': key_file, 'key_file': key_file,
'type': 'ec', 'type': 'ec',
'alg': alg, 'alg': alg,

View File

@ -21,8 +21,8 @@ class CryptoBackend(object):
@abc.abstractmethod @abc.abstractmethod
def parse_key(self, key_file=None, key_content=None, passphrase=None): def parse_key(self, key_file=None, key_content=None, passphrase=None):
''' '''
Parses an RSA or Elliptic Curve key file in PEM format and returns a pair Parses an RSA or Elliptic Curve key file in PEM format and returns key_data.
(error, key_data). Raises KeyParsingError in case of errors.
''' '''
@abc.abstractmethod @abc.abstractmethod

View File

@ -30,12 +30,10 @@ def test_eckeyparse_cryptography(pem, result, dummy, tmpdir):
fn.write(pem) fn.write(pem)
module = MagicMock() module = MagicMock()
backend = CryptographyBackend(module) backend = CryptographyBackend(module)
error, key = backend.parse_key(key_file=str(fn)) key = backend.parse_key(key_file=str(fn))
assert error is None
key.pop('key_obj') key.pop('key_obj')
assert key == result assert key == result
error, key = backend.parse_key(key_content=pem) key = backend.parse_key(key_content=pem)
assert error is None
key.pop('key_obj') key.pop('key_obj')
assert key == result assert key == result

View File

@ -37,8 +37,7 @@ def test_eckeyparse_openssl(pem, result, openssl_output, tmpdir):
module = MagicMock() module = MagicMock()
module.run_command = MagicMock(return_value=(0, openssl_output, 0)) module.run_command = MagicMock(return_value=(0, openssl_output, 0))
backend = OpenSSLCLIBackend(module, openssl_binary='openssl') backend = OpenSSLCLIBackend(module, openssl_binary='openssl')
error, key = backend.parse_key(key_file=str(fn)) key = backend.parse_key(key_file=str(fn))
assert error is None
key.pop('key_file') key.pop('key_file')
assert key == result assert key == result