removed keyserver/transient_key parameters and dependencies on PluginGPGRunner and GPGError

pull/743/head
Austin Lucas Lake 2024-05-05 14:07:18 -07:00
parent d5d9c5dcaf
commit 81166c799f
No known key found for this signature in database
GPG Key ID: 6A37FA54CFCFA4DB
1 changed files with 82 additions and 79 deletions

View File

@ -87,32 +87,32 @@ options:
type: list
elements: str
choices: ['encrypt', 'sign', 'auth']
expire_date:
description:
- Sets the expire date for the key.
- If O(expire_date=0), the key will never expire
- If O(expire_date=<n>), the key will expire in V(n) days.
- If O(expire_date=<n>w), the key will expire in V(n) weeks.
- If O(expire_date=<n>m), the key will expire in V(n) months.
- If O(expire_date=<n>y), the key will expire in V(n) years.
- If left unspecified, any created GPG keys will never expire.
type: str
name:
description:
- Specifies a name for the key.
- Specifies a name for the key's user id.
type: str
comment:
description:
- Specifies a comment for the key.
- Specifies a comment for the key's user id.
type: str
email:
description:
- Specifies an email for the key.
- Specifies an email for the key's user id.
type: str
passphrase:
description:
- Passphrase used to decrypt an existing private key or encrypt a newly generated private key.
type: str
keyserver:
description:
- Specifies keyserver to upload key to.
type: str
transient_key:
description:
- Allows key generation to use a faster, but less secure random number generator.
- If O(state=absent), this parameter is ignored.
type: bool
default: False
fingerprints:
description:
- Specifies keys to match against.
@ -166,7 +166,7 @@ changed:
type: bool
sample: True
fingerprints:
description: Fingerprint(s) of created or deleted key(s).
description: Fingerprint(s) of matching, created, or deleted primary key(s).
type: list
elements: str
sample: [ ABC123... ]
@ -176,57 +176,59 @@ import itertools
import re
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.community.crypto.plugins.plugin_utils.gnupg.py import PluginGPGRunner
from ansible_collections.community.crypto.plugins.module_utils.gnupg.cli import GPGError
def validate_key(key_type, key_length, key_curve, key_usage, key_name='primary key'):
if key_type == 'EDDSA':
if key_curve and key_curve != 'ed25519':
raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name))
elif not key_curve:
raise GPGError('No curve provided for {} {}.'.format(key_type, key_name))
module.fail_json('No curve provided for {} {}.'.format(key_type, key_name))
elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])):
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
elif key_type == 'ECDH':
if key_name == 'primary key':
raise GPGError('Invalid type for {}.'.format(key_name))
module.fail_json('Invalid type for {}.'.format(key_name))
elif key_curve and key_curve not in ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1', 'cv25519']:
raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name))
elif not key_curve:
raise GPGError('No curve provided for {} {}.'.format(key_type, key_name))
module.fail_json('No curve provided for {} {}.'.format(key_type, key_name))
elif key_usage and key_usage != ['encrypt']:
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
elif key_type == 'ECDSA':
if key_curve and key_curve not in ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1']:
raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name))
elif not key_curve:
raise GPGError('No curve provided for {} {}.'.format(key_type, key_name))
module.fail_json('No curve provided for {} {}.'.format(key_type, key_name))
elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])):
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
elif key_type == 'RSA':
if key_usage and key_usage not in list(itertools.combinatios(['ecrypt', 'sign', 'auth', 'cert'])):
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
elif key_type == 'DSA':
if key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])):
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
elif key_type == 'ELG':
if key_name == 'primary key':
raise GPGError('Invalid type for {}.'.format(key_name))
module.fail_json('Invalid type for {}.'.format(key_name))
elif key_usage != ['encrypt']:
raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name))
module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name))
pass
def validate_params(params):
validate_key(params['key_type'], params['key_length'], params['key_curve'], params['key_usage'])
def validate_params(module, params):
if not (params['expire_date'].isnumeric() or params['expire_date'][:-1].isnumeric()):
module.fail_json('Invalid format for expire date')
expire-date[-1] not in ['y']
validate_key(module, params['key_type'], params['key_length'], params['key_curve'], params['key_usage'])
for index, subkey in enumerate(params['subkeys']):
validate_key(subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], ('subkey #{}').format(index + 1))
validate_key(module, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], 'subkey #{}'.format(index + 1))
def key_type_from_algo(algo):
@ -258,26 +260,26 @@ def expand_usages(usages):
return usages
def list_matching_keys(gpg_runner, params):
def list_matching_keys(module, params):
user_id = ''
if params['name']:
user_id += '{} '.format(params["name"])
user_id += '{} '.format(params['name'])
if params['comment']:
user_id += '({}) '.format(params["comment"])
user_id += '({}) '.format(params['comment'])
if params['email']:
user_id += '<{}>'.format(params["email"])
user_id += '<{}>'.format(params['email'])
if user_id:
user_id = '"{}"'.format(user_id.strip())
search_criteria = ' '.join(params['fingerprints']) + user_id
if params['fingerprints']:
search_criteria = ' '.join(params[''])
_, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(search_criteria)])
_, stdout, _ = module.run_command(['gpg', '--list-secret-keys', '{}'.format(search_criteria)])
lines = stdout.split('\n')
fingerprints = list(set([line.strip() for line in lines if line.strip().isalnum()]))
matching_keys = []
for fingerprint in fingerprints:
_, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(fingerprint)])
_, stdout, _ = module.run_command(['gpg', '--list-secret-keys', '{}'.format(fingerprint)])
lines = stdout.split('\n')
primary_key = lines[0]
subkey_count = 0
@ -338,20 +340,23 @@ def list_matching_keys(gpg_runner, params):
return matching_keys
def delete_keypair(gpg_runner, matching_keys, check_mode):
def delete_keypair(module, matching_keys, check_mode):
if matching_keys:
gpg_runner.run_command([
'--dry-run' if check_mode else '',
'--batch',
'--yes',
'--delete-secret-and-public-key',
*matching_keys
], check_rc=True)
module.run_command(
[
'--dry-run' if check_mode else '',
'--batch',
'--yes',
'--delete-secret-and-public-key',
*matching_keys
],
executable='gpg'
)
return dict(changed=True, fingerprints=matching_keys)
return dict(changed=False, fingerprints=[])
def add_subkey(gpg_runner, fingerprint, subkey_index, subkey_type, subkey_length, subkey_curve, subkey_usage, expire_date):
def add_subkey(module, fingerprint, subkey_index, subkey_type, subkey_length, subkey_curve, subkey_usage, expire_date):
if subkey_type in ['RSA', 'DSA', 'ELG']:
algo = '{}'.format(subkey_type.lower())
if subkey_length:
@ -360,19 +365,22 @@ def add_subkey(gpg_runner, fingerprint, subkey_index, subkey_type, subkey_length
algo = subkey_curve
if algo:
gpg_runner.run_command([
'--batch',
'--quick-add-key',
fingerprint,
algo if algo else 'default',
*subkey_usage,
expire_date if expire_date else 0
])
module.run_command(
[
'--batch',
'--quick-add-key',
fingerprint,
algo if algo else 'default',
*subkey_usage,
expire_date if expire_date else 0
],
executable='gpg'
)
else:
raise GPGError('No algorithm applied for subkey #{}'.format(subkey_index + 1))
module.fail_json('No algorithm applied for subkey #{}'.format(subkey_index + 1))
def generate_keypair(gpg_runner, params, matching_keys, check_mode):
def generate_keypair(module, params, matching_keys, check_mode):
if matching_keys:
return dict(changed=False, fingerprints=matching_keys)
@ -385,8 +393,6 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode):
{}
{}
{}
{}
{}
%commit
EOF
'''.format(
@ -398,35 +404,36 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode):
'Name-Comment: {}'.format(params['comment']) if params['comment'] else '',
'Name-Email: {}'.format(params['email']) if params['email'] else '',
'Passphrase: {}'.format(params['passphrase']) if params['passphrase'] else '%no-protection',
'Keyserver: {}'.format(params['keyserver']) if params['keyserver'] else '',
'%transient-key' if params['transient_key'] else ''
)
_, stdout, _ = gpg_runner.run_command([
'--dry-run' if check_mode else '',
'--batch',
'--log-file',
'/dev/stdout',
'--gen-key',
parameters
])
_, stdout, _ = module.run_command(
[
'--dry-run' if check_mode else '',
'--batch',
'--log-file',
'/dev/stdout',
'--gen-key',
parameters
],
executable='gpg'
)
fingerprint = re.search(r'([a-zA-Z0-9]*)\.rev', stdout)
for index, subkey in enumerate(params['subkeys']):
add_subkey(gpg_runner, fingerprint, index, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], params['expire_date'])
add_subkey(module, fingerprint, index, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], params['expire_date'])
return dict(changed=True, fingerprints=[fingerprint])
def run_module(params, check_mode=False):
validate_params(params)
gpg_runner = PluginGPGRunner()
matching_keys = list_matching_keys(gpg_runner, params)
module = PluginGPGRunner()
matching_keys = list_matching_keys(module, params)
if params['state'] == 'present':
result = generate_keypair(gpg_runner, params, matching_keys, check_mode)
result = generate_keypair(module, params, matching_keys, check_mode)
else:
result = delete_keypair(gpg_runner, matching_keys, check_mode)
result = delete_keypair(module, matching_keys, check_mode)
return result
@ -453,8 +460,6 @@ def main():
comment=dict(type='str'),
email=dict(type='str'),
passphrase=dict(type='str', no_log=True),
keyserver=dict(type='str'),
transient_key=dict(type='bool'),
fingerprints=dict(type='list', elements='str', no_log=True),
),
supports_check_mode=True,
@ -465,10 +470,8 @@ def main():
)
try:
results = run_module(module.params, module.check_mode)
results = run_module(module, module.params, module.check_mode)
module.exit_json(**results)
except GPGError as e:
module.fail_json(e)
except Exception as e:
module.fail_json(e)