diff --git a/plugins/modules/gpg_keypair.py b/plugins/modules/gpg_keypair.py index f56c234f..94bc04a7 100644 --- a/plugins/modules/gpg_keypair.py +++ b/plugins/modules/gpg_keypair.py @@ -87,32 +87,32 @@ options: type: list elements: str choices: ['encrypt', 'sign', 'auth'] + expire_date: + description: + - Sets the expire date for the key. + - If O(expire_date=0), the key will never expire + - If O(expire_date=), the key will expire in V(n) days. + - If O(expire_date=w), the key will expire in V(n) weeks. + - If O(expire_date=m), the key will expire in V(n) months. + - If O(expire_date=y), the key will expire in V(n) years. + - If left unspecified, any created GPG keys will never expire. + type: str name: description: - - Specifies a name for the key. + - Specifies a name for the key's user id. type: str comment: description: - - Specifies a comment for the key. + - Specifies a comment for the key's user id. type: str email: description: - - Specifies an email for the key. + - Specifies an email for the key's user id. type: str passphrase: description: - Passphrase used to decrypt an existing private key or encrypt a newly generated private key. type: str - keyserver: - description: - - Specifies keyserver to upload key to. - type: str - transient_key: - description: - - Allows key generation to use a faster, but less secure random number generator. - - If O(state=absent), this parameter is ignored. - type: bool - default: False fingerprints: description: - Specifies keys to match against. @@ -166,7 +166,7 @@ changed: type: bool sample: True fingerprints: - description: Fingerprint(s) of created or deleted key(s). + description: Fingerprint(s) of matching, created, or deleted primary key(s). type: list elements: str sample: [ ABC123... ] @@ -176,57 +176,59 @@ import itertools import re from ansible.module_utils.basic import AnsibleModule -from ansible_collections.community.crypto.plugins.plugin_utils.gnupg.py import PluginGPGRunner from ansible_collections.community.crypto.plugins.module_utils.gnupg.cli import GPGError def validate_key(key_type, key_length, key_curve, key_usage, key_name='primary key'): if key_type == 'EDDSA': if key_curve and key_curve != 'ed25519': - raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name)) elif not key_curve: - raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) + module.fail_json('No curve provided for {} {}.'.format(key_type, key_name)) elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass elif key_type == 'ECDH': if key_name == 'primary key': - raise GPGError('Invalid type for {}.'.format(key_name)) + module.fail_json('Invalid type for {}.'.format(key_name)) elif key_curve and key_curve not in ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1', 'cv25519']: - raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name)) elif not key_curve: - raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) + module.fail_json('No curve provided for {} {}.'.format(key_type, key_name)) elif key_usage and key_usage != ['encrypt']: - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass elif key_type == 'ECDSA': if key_curve and key_curve not in ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1']: - raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid curve for {} {}.'.format(key_type, key_name)) elif not key_curve: - raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) + module.fail_json('No curve provided for {} {}.'.format(key_type, key_name)) elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass elif key_type == 'RSA': if key_usage and key_usage not in list(itertools.combinatios(['ecrypt', 'sign', 'auth', 'cert'])): - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass elif key_type == 'DSA': if key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass elif key_type == 'ELG': if key_name == 'primary key': - raise GPGError('Invalid type for {}.'.format(key_name)) + module.fail_json('Invalid type for {}.'.format(key_name)) elif key_usage != ['encrypt']: - raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + module.fail_json('Invalid usage for {} {}.'.format(key_type, key_name)) pass -def validate_params(params): - validate_key(params['key_type'], params['key_length'], params['key_curve'], params['key_usage']) +def validate_params(module, params): + if not (params['expire_date'].isnumeric() or params['expire_date'][:-1].isnumeric()): + module.fail_json('Invalid format for expire date') + expire-date[-1] not in ['y'] + validate_key(module, params['key_type'], params['key_length'], params['key_curve'], params['key_usage']) for index, subkey in enumerate(params['subkeys']): - validate_key(subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], ('subkey #{}').format(index + 1)) + validate_key(module, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], 'subkey #{}'.format(index + 1)) def key_type_from_algo(algo): @@ -258,26 +260,26 @@ def expand_usages(usages): return usages -def list_matching_keys(gpg_runner, params): +def list_matching_keys(module, params): user_id = '' if params['name']: - user_id += '{} '.format(params["name"]) + user_id += '{} '.format(params['name']) if params['comment']: - user_id += '({}) '.format(params["comment"]) + user_id += '({}) '.format(params['comment']) if params['email']: - user_id += '<{}>'.format(params["email"]) + user_id += '<{}>'.format(params['email']) if user_id: user_id = '"{}"'.format(user_id.strip()) search_criteria = ' '.join(params['fingerprints']) + user_id if params['fingerprints']: search_criteria = ' '.join(params['']) - _, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(search_criteria)]) + _, stdout, _ = module.run_command(['gpg', '--list-secret-keys', '{}'.format(search_criteria)]) lines = stdout.split('\n') fingerprints = list(set([line.strip() for line in lines if line.strip().isalnum()])) matching_keys = [] for fingerprint in fingerprints: - _, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(fingerprint)]) + _, stdout, _ = module.run_command(['gpg', '--list-secret-keys', '{}'.format(fingerprint)]) lines = stdout.split('\n') primary_key = lines[0] subkey_count = 0 @@ -338,20 +340,23 @@ def list_matching_keys(gpg_runner, params): return matching_keys -def delete_keypair(gpg_runner, matching_keys, check_mode): +def delete_keypair(module, matching_keys, check_mode): if matching_keys: - gpg_runner.run_command([ - '--dry-run' if check_mode else '', - '--batch', - '--yes', - '--delete-secret-and-public-key', - *matching_keys - ], check_rc=True) + module.run_command( + [ + '--dry-run' if check_mode else '', + '--batch', + '--yes', + '--delete-secret-and-public-key', + *matching_keys + ], + executable='gpg' + ) return dict(changed=True, fingerprints=matching_keys) return dict(changed=False, fingerprints=[]) -def add_subkey(gpg_runner, fingerprint, subkey_index, subkey_type, subkey_length, subkey_curve, subkey_usage, expire_date): +def add_subkey(module, fingerprint, subkey_index, subkey_type, subkey_length, subkey_curve, subkey_usage, expire_date): if subkey_type in ['RSA', 'DSA', 'ELG']: algo = '{}'.format(subkey_type.lower()) if subkey_length: @@ -360,19 +365,22 @@ def add_subkey(gpg_runner, fingerprint, subkey_index, subkey_type, subkey_length algo = subkey_curve if algo: - gpg_runner.run_command([ - '--batch', - '--quick-add-key', - fingerprint, - algo if algo else 'default', - *subkey_usage, - expire_date if expire_date else 0 - ]) + module.run_command( + [ + '--batch', + '--quick-add-key', + fingerprint, + algo if algo else 'default', + *subkey_usage, + expire_date if expire_date else 0 + ], + executable='gpg' + ) else: - raise GPGError('No algorithm applied for subkey #{}'.format(subkey_index + 1)) + module.fail_json('No algorithm applied for subkey #{}'.format(subkey_index + 1)) -def generate_keypair(gpg_runner, params, matching_keys, check_mode): +def generate_keypair(module, params, matching_keys, check_mode): if matching_keys: return dict(changed=False, fingerprints=matching_keys) @@ -385,8 +393,6 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode): {} {} {} - {} - {} %commit EOF '''.format( @@ -398,35 +404,36 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode): 'Name-Comment: {}'.format(params['comment']) if params['comment'] else '', 'Name-Email: {}'.format(params['email']) if params['email'] else '', 'Passphrase: {}'.format(params['passphrase']) if params['passphrase'] else '%no-protection', - 'Keyserver: {}'.format(params['keyserver']) if params['keyserver'] else '', - '%transient-key' if params['transient_key'] else '' ) - _, stdout, _ = gpg_runner.run_command([ - '--dry-run' if check_mode else '', - '--batch', - '--log-file', - '/dev/stdout', - '--gen-key', - parameters - ]) + _, stdout, _ = module.run_command( + [ + '--dry-run' if check_mode else '', + '--batch', + '--log-file', + '/dev/stdout', + '--gen-key', + parameters + ], + executable='gpg' + ) fingerprint = re.search(r'([a-zA-Z0-9]*)\.rev', stdout) for index, subkey in enumerate(params['subkeys']): - add_subkey(gpg_runner, fingerprint, index, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], params['expire_date']) + add_subkey(module, fingerprint, index, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], params['expire_date']) return dict(changed=True, fingerprints=[fingerprint]) def run_module(params, check_mode=False): validate_params(params) - gpg_runner = PluginGPGRunner() - matching_keys = list_matching_keys(gpg_runner, params) + module = PluginGPGRunner() + matching_keys = list_matching_keys(module, params) if params['state'] == 'present': - result = generate_keypair(gpg_runner, params, matching_keys, check_mode) + result = generate_keypair(module, params, matching_keys, check_mode) else: - result = delete_keypair(gpg_runner, matching_keys, check_mode) + result = delete_keypair(module, matching_keys, check_mode) return result @@ -453,8 +460,6 @@ def main(): comment=dict(type='str'), email=dict(type='str'), passphrase=dict(type='str', no_log=True), - keyserver=dict(type='str'), - transient_key=dict(type='bool'), fingerprints=dict(type='list', elements='str', no_log=True), ), supports_check_mode=True, @@ -465,10 +470,8 @@ def main(): ) try: - results = run_module(module.params, module.check_mode) + results = run_module(module, module.params, module.check_mode) module.exit_json(**results) - except GPGError as e: - module.fail_json(e) except Exception as e: module.fail_json(e)