From ea6b1d7eff7512d8c92978d0f9810352fa9e2b2f Mon Sep 17 00:00:00 2001 From: Austin Lucas Lake <53884490+austinlucaslake@users.noreply.github.com> Date: Sun, 5 May 2024 01:32:25 -0700 Subject: [PATCH] added stricter matching passed on user input --- plugins/modules/gpg_keypair.py | 218 +++++++++++++++++++++++---------- 1 file changed, 154 insertions(+), 64 deletions(-) diff --git a/plugins/modules/gpg_keypair.py b/plugins/modules/gpg_keypair.py index aa6e0461..fc682285 100644 --- a/plugins/modules/gpg_keypair.py +++ b/plugins/modules/gpg_keypair.py @@ -44,7 +44,7 @@ options: - For non-ECC keys, this specifies the number of bits in the key to create. - For RSA keys, the minimum is V(1024), the maximum is V(4096), and the default is V(3072). - For DSA keys, the minimum is V(768), the maximum is V(3072), and the default is V(2048). - - As per gpg's behavior, values below the allowed ranges will be set to the respective defaults, and those above the allowed ranges will saturate at the maximum. + - As per gpg's behavior, values below the allowed ranges will be set to the respective defaults, and values above will saturate at the maximum. - For ECC keys, this parameter will be ignored. type: int key_curve: @@ -60,7 +60,6 @@ options: - Specifies usage(s) for key. - V(cert) is given to all primary keys regardess, however can be used to only give V(vert) usage to a key. - If not usage is specified, the valid usages for the given key type with be assigned. - - If O(state) is V(absent), this parameter is ignored. type: list elements: str choices: ['encrypt', 'sign', 'auth', 'cert'] @@ -73,19 +72,21 @@ options: - For ECDH keys, the default curve is V(brainpoolP512r1). - ECDH keys also support the V(cv25519) curve. - For ELG keys, the minimum length is V(1024) bits, the maximum length is V(4096) bits, and the default length is V(3072) bits. - type: list elements: dict options: subkey_type: - type: str + type: str + choices: ['RSA', 'DSA', 'ECDSA', 'EDDSA', 'ECDH', 'ELG'] subkey_length: type: int subkey_curve: type: str + choices: ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1', 'cv25519'] subkey_usage: type: list elements: str + choices: ['encrypt', 'sign', 'auth'] name: description: - Specifies a name for the key. @@ -101,25 +102,22 @@ options: passphrase: description: - Passphrase used to decrypt an existing private key or encrypt a newly generated private key. - - If O(state=absent), this parameter is ignored. type: str - fingerprints: - description: - - Specifies keys to match against. - - Provided fingerprints will take priority over user-id "O(name) (O(comment)) ". - - If O(state=absent), keys with the provided fingerprints will be deleted if found. - type: list - elements: str keyserver: description: - Specifies keyserver to upload key to. - - If O(state=absent), this parameter will be ignored. type: str transient_key: description: - Allows key generation to use a faster, but less secure random number generator. + - If O(state=absent), this parameter is ignored. type: bool default: False + fingerprints: + description: + - Specifies keys to match against. + type: list + elements: str ''' EXAMPLES = ''' @@ -128,7 +126,7 @@ EXAMPLES = ''' - name: Generate the default GPG keypair with a passphrase community.crypto.gpg_keypair: - passphrase: {{ passphrase }} + passphrase: "{{ passphrase }}" - name: Generate a RSA GPG keypair with the default RSA size (2048 bits) community.crypto.gpg_keypair: @@ -139,12 +137,12 @@ EXAMPLES = ''' key_type: RSA key_length: 4096 -- name: Generate an ECC GPG keypair +- name: Generate an ECC GPG keypair community.crypto.gpg_keypair: key_type: EDDSA key_curve: ed25519 -- name: Generate a GPG keypair and with a subkey: +- name: Generate a GPG keypair and with a subkey community.crypto.gpg_keypair: subkeys: - { subkey_type: ECDH, subkey_curve: cv25519 } @@ -155,16 +153,9 @@ EXAMPLES = ''' comment: comment email: name@email.com -- name: Delete GPG keypair(s) matching a specified user-id: +- name: Delete a GPG keypair matching a specified fingerprint community.crypto.gpg_keypair: state: absent - name: name - comment: comment - email: name@email.com - -- name: Delete a GPG keypair matching a specified fingerprint: - community.crypto.gpg_keypair: - state: abscent fingerprints: - ABC123... ''' @@ -175,7 +166,7 @@ changed: type: bool sample: True fingerprints: - description: Fingerprint(s) of newly created or matched key(s). + description: Fingerprint(s) of created or deleted key(s). type: list elements: str sample: [ ABC123... ] @@ -191,7 +182,7 @@ from ansible_collections.community.crypto.plugins.module_utils.gnupg.cli import from ansible_collections.community.crypto.plugins.plugin_utils.gnupg import GPGError -def validate_key(key_type, key_length, key_curve, key_usage, key_name = 'primary key'): +def validate_key(key_type, key_length, key_curve, key_usage, key_name='primary key'): if key_type == 'EDDSA': if key_curve and key_curve != 'ed25519': raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name)) @@ -199,6 +190,7 @@ def validate_key(key_type, key_length, key_curve, key_usage, key_name = 'primary raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + pass elif key_type == 'ECDH': if key_name = 'primary key': raise GPGError('Invalid type for {}.'.format(key_name)) @@ -206,6 +198,7 @@ def validate_key(key_type, key_length, key_curve, key_usage, key_name = 'primary raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) elif not key_curve: raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) + pass elif key_type == 'ECDSA': if key_curve and key_curve not in ['nistp256', 'nistp384', 'nistp521', 'brainpoolP256r1', 'brainpoolP384r1', 'brainpoolP512r1', 'secp256k1']: raise GPGError('Invalid curve for {} {}.'.format(key_type, key_name)) @@ -213,26 +206,59 @@ def validate_key(key_type, key_length, key_curve, key_usage, key_name = 'primary raise GPGError('No curve provided for {} {}.'.format(key_type, key_name)) elif key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + pass elif key_type == 'RSA': if key_usage and key_usage not in list(itertools.combinatios(['ecrypt', 'sign', 'auth'])): raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + pass elif key_type == 'DSA': if key_usage and key_usage not in list(itertools.combinations(['sign', 'auth'])): raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + pass elif key_type == 'ELG': if key_name == 'primary key': raise GPGError('Invalid type for {}.'.format(key_name)) elif key_usage != ['encrypt']: raise GPGError('Invalid usage for {} {}.'.format(key_type, key_name)) + pass def validate_params(params): validate_key(params['key_type'], params['key_length'], params['key_curve'], params['key_usage']) for index, subkey in enumerate(params['subkeys']): - validate_key(subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], ('subkey #{}').format(index+1)) + validate_key(subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage'], ('subkey #{}').format(index + 1)) -def list_matching_keys(name, comment, email, fingerprints): +def key_type_from_algo(algo): + if algo == 1: + return 'RSA' + elif algo == 16: + return 'ELG' + elif algo == 17: + return 'DSA' + elif algo == 18: + return 'ECDH' + elif algo == 19: + return 'ECDSA' + elif algo == 22: + return 'EDDSA' + + +def expand_usages(usages): + usages = list(usages) + for i in range(len(usages)): + if usages[i] == 'c': + usages[i] = 'cert' + elif usages[i] == 's': + usages[i] = 'sign' + elif usages[i] == 'a': + usages[i] = 'auth' + elif usages[i] == 'e': + usages[i] = 'encrypt' + return usages + + +def list_matching_keys(params): user_id = '' if params['name']: user_id += '{} '.format(params["name"]) @@ -243,15 +269,74 @@ def list_matching_keys(name, comment, email, fingerprints): if user_id: user_id = '"{}"'.format(user_id.strip()) - if user_id or fingerprints: - _, stdout, _ = gpg_runner.run_command(['gpg', '--batch', '--list-secret-keys', '{}'.format(*fingerprints if fingerprints else user_id)]) + search_criteria = ' '.join(params['fingerprints']) + user_id + params[] + if params['fingerprints']: + search_criteria = ' '.join(params['']) + _, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(*params['fingerprints'])]) + lines = stdout.split('\n') + fingerprints = list(set([line.strip() for line in lines in line.strip().isalnum()])) + matching_keys = [] + for fingerprint in fingerprints: + _, stdout, _ = gpg_runner.run_command(['gpg', '--list-secret-keys', '{}'.format(fingerprint)]) lines = stdout.split('\n') - matching_keys = [line.strip() for line in lines if line.strip().isalnum()] - for key in matching_keys: - # TODO: match based on key_type, key_usage, key_curve, and subkeys - pass - return matching_keys - return [] + primary_key = lines[0] + subkey_count = 0 + is_match = True + uid_present = False + for line in lines: + if line[:3] == 'sec': + primary_key = re.search(r'sec:u:([0-9]*):([0-9]*):[0-9A-Z]*:*([a-z])*:*+:*([0-9a-zA-Z])', line) + key_type = key_type_from_algo(int(primary_key.group(2))) + if params['key_type'] and params['key_type'] != key_type: + is_match = False + break + if key_type in ['RSA', 'DSA', 'ELG']: + key_length = int(primary_key.group(1)) + if params['key_length'] and params['key_length'] != key_length: + is_match = False + break + else: + key_curve = primary_key.group(4) + if params['key_curve'] and params['key_curve'] != key_curve: + is_match = False + break + key_usage = expand_usages(primary_key.group(3)) + if params['key_usage'] and params['key_usage'] in itertools.permutations(key_usage): + is_match = False + break + elif line[:3] == 'uid': + uid = re.search(r'uid:u:*[0-9]*::[0-9a-zA-Z]*::(.*):*0:').group(1) + if user_id == uid: + uid_present = True + elif line[:3] == 'ssb': + subkey_count += 1 + if subkey_count > len(params['subkeys']): + is_match = False + break + subkey = re.search(r'ssb:u:([0-9]*):([0-9]*):[0-9A-Z]*:*([a-z])*:*+:*([0-9a-zA-Z])', line) + subkey_type = key_type_from_algo(int(subkey.group(2))) + if params['subkeys'][subkey_count]['type'] and params['subkeys'][subkey_count]['type'] != subkey_type: + + is_match = False + break + if subkey_type in ['RSA', 'DSA', 'ELG']: + subkey_length = int(subkey.group(1)) + if params['subkeys'][subkey_count]['length'] and params['subkeys'][subkey_count]['length'] != subkey_length: + is_match = False + break + else: + subkey_curve = subkey.group(4) + if params['subkeys'][subkey_count]['curve'] and params['subkeys'][subkey_count]['curve'] != subkey_curve: + is_match = False + break + subkey_usage = expand_usages(subkey.group(3)) + if params['subkeys'][subkey_count]['usage'] and params['subkeys'][subkey_count]['usage'] in itertools.permutations(subkey_usage): + is_match = False + break + if good_match and uid_present: + matching_keys.append(fingerprint) + return matching_keys def delete_keypair(gpg_runner, matching_keys, check_mode): @@ -274,13 +359,18 @@ def add_subkey(gpg_runner, fingerprint, subkey_index, subkey_type, subkey_length algo += str(subkey_length) elif subkey_curve: algo = subkey_curve + + if algo: + gpg_runner.run_command([ + '--batch', + '--quick-add-key', + fingerprint, + algo if algo else 'default', + *usage, + expire_date if expire_date else 0 + ]) else: - algo = None - gpg_runner.run_command([ - '--batch', '--quick-add-key', fingerprint, algo if algo else 'default', *usage, expire_date if expire_date else 0 - ]) - else: - raise GPGError('No algorithm applied for subkey #{}'.format(subkey_index+1)) + raise GPGError('No algorithm applied for subkey #{}'.format(subkey_index + 1)) def generate_keypair(gpg_runner, params, matching_keys, check_mode): @@ -301,17 +391,17 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode): %commit EOF '''.format( - 'Key-Type: {}'.format(params['key_type'] if params['key_type'] else 'default'), - 'Key-Length: {}'.format(params['key_length']) if params['key_length'] else '', - 'Key-Curve: {}'.format(params['key_curve']) if params['key_curve'] else '', - 'Expire-Date: {}'.format(params['expire_date']) if params['expire_date'] else '', - 'Name-Real: {}'.format(params['name']) if params['name'] else '', - 'Name-Comment: {}'.format(params['comment']) if params['comment'] else '', - 'Name-Email: {}'.format(params['email']) if params['email'] else '', - 'Passphrase: {}'.format(params['passphrase']) if params['passphrase'] else '%no-protection', - 'Keyserver: {}'.format(params['keyserver']) if params['keyserver'] else '', - '%transient-key' if params['transient_key'] else '' - ) + 'Key-Type: {}'.format(params['key_type'] if params['key_type'] else 'default'), + 'Key-Length: {}'.format(params['key_length']) if params['key_length'] else '', + 'Key-Curve: {}'.format(params['key_curve']) if params['key_curve'] else '', + 'Expire-Date: {}'.format(params['expire_date']) if params['expire_date'] else '', + 'Name-Real: {}'.format(params['name']) if params['name'] else '', + 'Name-Comment: {}'.format(params['comment']) if params['comment'] else '', + 'Name-Email: {}'.format(params['email']) if params['email'] else '', + 'Passphrase: {}'.format(params['passphrase']) if params['passphrase'] else '%no-protection', + 'Keyserver: {}'.format(params['keyserver']) if params['keyserver'] else '', + '%transient-key' if params['transient_key'] else '' + ) _, stdout, _ = gpg_runner.run_command([ '--dry-run' if check_mode else '', @@ -322,15 +412,15 @@ def generate_keypair(gpg_runner, params, matching_keys, check_mode): parameters ]) - fingerprint = re.search(r"([a-zA-Z0-9]*)\.rev", stdout) - + fingerprint = re.search(r'([a-zA-Z0-9]*)\.rev', stdout) + for index, subkey in enumerate(params['subkeys']): add_subkey(gpg_runner, fingerprint, index, subkey['subkey_type'], subkey['subkey_length'], subkey['subkey_curve'], subkey['subkey_usage']) return dict(changed=True, fingerprints=[fingerprint]) -def run_module(params, check_mode = False): +def run_module(params, check_mode=False): validate_params(params) gpg_runner = PluginGPGRunner() matching_keys = list_matching_keys( @@ -363,15 +453,15 @@ def main(): subkey_length=dict(type='int'), subkey_curve=dict(type='str', choices=key_curves), subkey_usage=dict(type='list', elements='str', choices=key_usages[:-1]) - )), - name=dict(type='str', default=None), - comment=dict(type='str', default=None), - email=dict(type='str', default=None), - passphrase=dict(type='str', default=None, no_log=True), - fingerprints=dict(type='list', elements='str', default=None, no_log=True), - keyserver=dict(type='str', default=None), - transient_key=dict(type='bool', default=False) + expire_date=dict(type='str') + name=dict(type='str'), + comment=dict(type='str'), + email=dict(type='str'), + passphrase=dict(type='str', no_log=True), + keyserver=dict(type='str'), + transient_key=dict(type='bool') + fingerprints=dict(type='list', elements='str', no_log=True), ), supports_check_mode=True, required_if=[