[PR #9324/6cd3f79e backport][stable-10] lookup plugins: use f-strings (#9367)

lookup plugins: use f-strings (#9324)

* lookup plugins: use f-strings

* add changelog frag

* manual change for few occurrences

* Update plugins/lookup/dependent.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* adjustment from review

* no f-string for you

* Update plugins/lookup/dependent.py

Co-authored-by: Felix Fontein <felix@fontein.de>

---------

Co-authored-by: Felix Fontein <felix@fontein.de>
(cherry picked from commit 6cd3f79e19)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
pull/9376/head
patchback[bot] 2024-12-25 16:46:55 +01:00 committed by GitHub
parent 74bd7f1471
commit a5c448d6e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 165 additions and 149 deletions

View File

@ -0,0 +1,29 @@
minor_changes:
- bitwarden lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- chef_databag lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- collection_version lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- consul_kv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- credstash lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- cyberarkpassword lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dependent lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dig lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dnstxt lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dsv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- etcd lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- etcd3 lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- filetree lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- github_app_access_token lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- hiera lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- keyring lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- lastpass lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- lmdb_kv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- manifold lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- merge_variables lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- onepassword lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- onepassword_doc lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- passwordstore lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- random_pet lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- redis lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- revbitspss lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- shelvefile lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- tss lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).

View File

@ -207,7 +207,7 @@ class Bitwarden(object):
continue continue
if matches and not field_matches: if matches and not field_matches:
raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value)) raise AnsibleError(f"field {field} does not exist in {search_value}")
return field_matches return field_matches

View File

@ -81,11 +81,11 @@ class LookupModule(LookupBase):
setattr(self, arg, parsed) setattr(self, arg, parsed)
except ValueError: except ValueError:
raise AnsibleError( raise AnsibleError(
"can't parse arg {0}={1} as string".format(arg, arg_raw) f"can't parse arg {arg}={arg_raw} as string"
) )
if args: if args:
raise AnsibleError( raise AnsibleError(
"unrecognized arguments to with_sequence: %r" % list(args.keys()) f"unrecognized arguments to with_sequence: {list(args.keys())!r}"
) )
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):

View File

@ -115,10 +115,10 @@ class LookupModule(LookupBase):
for term in terms: for term in terms:
if not FQCN_RE.match(term): if not FQCN_RE.match(term):
raise AnsibleLookupError('"{term}" is not a FQCN'.format(term=term)) raise AnsibleLookupError(f'"{term}" is not a FQCN')
try: try:
collection_pkg = import_module('ansible_collections.{fqcn}'.format(fqcn=term)) collection_pkg = import_module(f'ansible_collections.{term}')
except ImportError: except ImportError:
# Collection not found # Collection not found
result.append(not_found) result.append(not_found)
@ -127,7 +127,7 @@ class LookupModule(LookupBase):
try: try:
data = load_collection_meta(collection_pkg, no_version=no_version) data = load_collection_meta(collection_pkg, no_version=no_version)
except Exception as exc: except Exception as exc:
raise AnsibleLookupError('Error while loading metadata for {fqcn}: {error}'.format(fqcn=term, error=exc)) raise AnsibleLookupError(f'Error while loading metadata for {term}: {exc}')
result.append(data.get('version', no_version)) result.append(data.get('version', no_version))

View File

@ -171,7 +171,7 @@ class LookupModule(LookupBase):
values.append(to_text(results[1]['Value'])) values.append(to_text(results[1]['Value']))
except Exception as e: except Exception as e:
raise AnsibleError( raise AnsibleError(
"Error locating '%s' in kv store. Error was %s" % (term, e)) f"Error locating '{term}' in kv store. Error was {e}")
return values return values
@ -192,7 +192,7 @@ class LookupModule(LookupBase):
if param and len(param) > 0: if param and len(param) > 0:
name, value = param.split('=') name, value = param.split('=')
if name not in paramvals: if name not in paramvals:
raise AnsibleAssertionError("%s not a valid consul lookup parameter" % name) raise AnsibleAssertionError(f"{name} not a valid consul lookup parameter")
paramvals[name] = value paramvals[name] = value
except (ValueError, AssertionError) as e: except (ValueError, AssertionError) as e:
raise AnsibleError(e) raise AnsibleError(e)

View File

@ -137,8 +137,8 @@ class LookupModule(LookupBase):
try: try:
ret.append(credstash.getSecret(term, version, region, table, context=context, **kwargs_pass)) ret.append(credstash.getSecret(term, version, region, table, context=context, **kwargs_pass))
except credstash.ItemNotFound: except credstash.ItemNotFound:
raise AnsibleError('Key {0} not found'.format(term)) raise AnsibleError(f'Key {term} not found')
except Exception as e: except Exception as e:
raise AnsibleError('Encountered exception while fetching {0}: {1}'.format(term, e)) raise AnsibleError(f'Encountered exception while fetching {term}: {e}')
return ret return ret

View File

@ -105,7 +105,7 @@ class CyberarkPassword:
self.extra_parms = [] self.extra_parms = []
for key, value in kwargs.items(): for key, value in kwargs.items():
self.extra_parms.append('-p') self.extra_parms.append('-p')
self.extra_parms.append("%s=%s" % (key, value)) self.extra_parms.append(f"{key}={value}")
if self.appid is None: if self.appid is None:
raise AnsibleError("CyberArk Error: No Application ID specified") raise AnsibleError("CyberArk Error: No Application ID specified")
@ -130,8 +130,8 @@ class CyberarkPassword:
all_parms = [ all_parms = [
CLIPASSWORDSDK_CMD, CLIPASSWORDSDK_CMD,
'GetPassword', 'GetPassword',
'-p', 'AppDescs.AppID=%s' % self.appid, '-p', f'AppDescs.AppID={self.appid}',
'-p', 'Query=%s' % self.query, '-p', f'Query={self.query}',
'-o', self.output, '-o', self.output,
'-d', self.b_delimiter] '-d', self.b_delimiter]
all_parms.extend(self.extra_parms) all_parms.extend(self.extra_parms)
@ -144,7 +144,7 @@ class CyberarkPassword:
b_credential = to_bytes(tmp_output) b_credential = to_bytes(tmp_output)
if tmp_error: if tmp_error:
raise AnsibleError("ERROR => %s " % (tmp_error)) raise AnsibleError(f"ERROR => {tmp_error} ")
if b_credential and b_credential.endswith(b'\n'): if b_credential and b_credential.endswith(b'\n'):
b_credential = b_credential[:-1] b_credential = b_credential[:-1]
@ -164,7 +164,7 @@ class CyberarkPassword:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise AnsibleError(e.output) raise AnsibleError(e.output)
except OSError as e: except OSError as e:
raise AnsibleError("ERROR - AIM not installed or clipasswordsdk not in standard location. ERROR=(%s) => %s " % (to_text(e.errno), e.strerror)) raise AnsibleError(f"ERROR - AIM not installed or clipasswordsdk not in standard location. ERROR=({to_text(e.errno)}) => {e.strerror} ")
return [result_dict] return [result_dict]
@ -177,11 +177,11 @@ class LookupModule(LookupBase):
""" """
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
display.vvvv("%s" % terms) display.vvvv(f"{terms}")
if isinstance(terms, list): if isinstance(terms, list):
return_values = [] return_values = []
for term in terms: for term in terms:
display.vvvv("Term: %s" % term) display.vvvv(f"Term: {term}")
cyberark_conn = CyberarkPassword(**term) cyberark_conn = CyberarkPassword(**term)
return_values.append(cyberark_conn.get()) return_values.append(cyberark_conn.get())
return return_values return return_values

View File

@ -173,8 +173,7 @@ class LookupModule(LookupBase):
values = self.__evaluate(expression, templar, variables=vars) values = self.__evaluate(expression, templar, variables=vars)
except Exception as e: except Exception as e:
raise AnsibleLookupError( raise AnsibleLookupError(
'Caught "{error}" while evaluating {key!r} with item == {item!r}'.format( f'Caught "{e}" while evaluating {key!r} with item == {current!r}')
error=e, key=key, item=current))
if isinstance(values, Mapping): if isinstance(values, Mapping):
for idx, val in sorted(values.items()): for idx, val in sorted(values.items()):
@ -186,8 +185,7 @@ class LookupModule(LookupBase):
self.__process(result, terms, index + 1, current, templar, variables) self.__process(result, terms, index + 1, current, templar, variables)
else: else:
raise AnsibleLookupError( raise AnsibleLookupError(
'Did not obtain dictionary or list while evaluating {key!r} with item == {item!r}, but {type}'.format( f'Did not obtain dictionary or list while evaluating {key!r} with item == {current!r}, but {type(values)}')
key=key, item=current, type=type(values)))
def run(self, terms, variables=None, **kwargs): def run(self, terms, variables=None, **kwargs):
"""Generate list.""" """Generate list."""
@ -201,16 +199,14 @@ class LookupModule(LookupBase):
for index, term in enumerate(terms): for index, term in enumerate(terms):
if not isinstance(term, Mapping): if not isinstance(term, Mapping):
raise AnsibleLookupError( raise AnsibleLookupError(
'Parameter {index} must be a dictionary, got {type}'.format( f'Parameter {index} must be a dictionary, got {type(term)}')
index=index, type=type(term)))
if len(term) != 1: if len(term) != 1:
raise AnsibleLookupError( raise AnsibleLookupError(
'Parameter {index} must be a one-element dictionary, got {count} elements'.format( f'Parameter {index} must be a one-element dictionary, got {len(term)} elements')
index=index, count=len(term)))
k, v = list(term.items())[0] k, v = list(term.items())[0]
if k in vars_so_far: if k in vars_so_far:
raise AnsibleLookupError( raise AnsibleLookupError(
'The variable {key!r} appears more than once'.format(key=k)) f'The variable {k!r} appears more than once')
vars_so_far.add(k) vars_so_far.add(k)
if isinstance(v, string_types): if isinstance(v, string_types):
data.append((k, v, None)) data.append((k, v, None))
@ -218,7 +214,6 @@ class LookupModule(LookupBase):
data.append((k, None, v)) data.append((k, None, v))
else: else:
raise AnsibleLookupError( raise AnsibleLookupError(
'Parameter {key!r} (index {index}) must have a value of type string, dictionary or list, got type {type}'.format( f'Parameter {k!r} (index {index}) must have a value of type string, dictionary or list, got type {type(v)}')
index=index, key=k, type=type(v)))
self.__process(result, data, 0, {}, templar, variables) self.__process(result, data, 0, {}, templar, variables)
return result return result

View File

@ -345,7 +345,7 @@ class LookupModule(LookupBase):
try: try:
rdclass = dns.rdataclass.from_text(self.get_option('class')) rdclass = dns.rdataclass.from_text(self.get_option('class'))
except Exception as e: except Exception as e:
raise AnsibleError("dns lookup illegal CLASS: %s" % to_native(e)) raise AnsibleError(f"dns lookup illegal CLASS: {to_native(e)}")
myres.retry_servfail = self.get_option('retry_servfail') myres.retry_servfail = self.get_option('retry_servfail')
for t in terms: for t in terms:
@ -363,7 +363,7 @@ class LookupModule(LookupBase):
nsaddr = dns.resolver.query(ns)[0].address nsaddr = dns.resolver.query(ns)[0].address
nameservers.append(nsaddr) nameservers.append(nsaddr)
except Exception as e: except Exception as e:
raise AnsibleError("dns lookup NS: %s" % to_native(e)) raise AnsibleError(f"dns lookup NS: {to_native(e)}")
continue continue
if '=' in t: if '=' in t:
try: try:
@ -379,7 +379,7 @@ class LookupModule(LookupBase):
try: try:
rdclass = dns.rdataclass.from_text(arg) rdclass = dns.rdataclass.from_text(arg)
except Exception as e: except Exception as e:
raise AnsibleError("dns lookup illegal CLASS: %s" % to_native(e)) raise AnsibleError(f"dns lookup illegal CLASS: {to_native(e)}")
elif opt == 'retry_servfail': elif opt == 'retry_servfail':
myres.retry_servfail = boolean(arg) myres.retry_servfail = boolean(arg)
elif opt == 'fail_on_error': elif opt == 'fail_on_error':
@ -400,7 +400,7 @@ class LookupModule(LookupBase):
else: else:
domains.append(t) domains.append(t)
# print "--- domain = {0} qtype={1} rdclass={2}".format(domain, qtype, rdclass) # print "--- domain = {domain} qtype={qtype} rdclass={rdclass}"
if port: if port:
myres.port = port myres.port = port
@ -416,7 +416,7 @@ class LookupModule(LookupBase):
except dns.exception.SyntaxError: except dns.exception.SyntaxError:
pass pass
except Exception as e: except Exception as e:
raise AnsibleError("dns.reversename unhandled exception %s" % to_native(e)) raise AnsibleError(f"dns.reversename unhandled exception {to_native(e)}")
domains = reversed_domains domains = reversed_domains
if len(domains) > 1: if len(domains) > 1:
@ -445,25 +445,25 @@ class LookupModule(LookupBase):
ret.append(rd) ret.append(rd)
except Exception as err: except Exception as err:
if fail_on_error: if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err)) raise AnsibleError(f"Lookup failed: {str(err)}")
ret.append(str(err)) ret.append(str(err))
except dns.resolver.NXDOMAIN as err: except dns.resolver.NXDOMAIN as err:
if fail_on_error: if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err)) raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty: if not real_empty:
ret.append('NXDOMAIN') ret.append('NXDOMAIN')
except dns.resolver.NoAnswer as err: except dns.resolver.NoAnswer as err:
if fail_on_error: if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err)) raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty: if not real_empty:
ret.append("") ret.append("")
except dns.resolver.Timeout as err: except dns.resolver.Timeout as err:
if fail_on_error: if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err)) raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty: if not real_empty:
ret.append("") ret.append("")
except dns.exception.DNSException as err: except dns.exception.DNSException as err:
raise AnsibleError("dns.resolver unhandled exception %s" % to_native(err)) raise AnsibleError(f"dns.resolver unhandled exception {to_native(err)}")
return ret return ret

View File

@ -108,7 +108,7 @@ class LookupModule(LookupBase):
continue continue
string = '' string = ''
except DNSException as e: except DNSException as e:
raise AnsibleError("dns.resolver unhandled exception %s" % to_native(e)) raise AnsibleError(f"dns.resolver unhandled exception {to_native(e)}")
ret.append(''.join(string)) ret.append(''.join(string))

View File

@ -135,17 +135,17 @@ class LookupModule(LookupBase):
result = [] result = []
for term in terms: for term in terms:
display.debug("dsv_lookup term: %s" % term) display.debug(f"dsv_lookup term: {term}")
try: try:
path = term.lstrip("[/:]") path = term.lstrip("[/:]")
if path == "": if path == "":
raise AnsibleOptionsError("Invalid secret path: %s" % term) raise AnsibleOptionsError(f"Invalid secret path: {term}")
display.vvv(u"DevOps Secrets Vault GET /secrets/%s" % path) display.vvv(f"DevOps Secrets Vault GET /secrets/{path}")
result.append(vault.get_secret_json(path)) result.append(vault.get_secret_json(path))
except SecretsVaultError as error: except SecretsVaultError as error:
raise AnsibleError( raise AnsibleError(
"DevOps Secrets Vault lookup failure: %s" % error.message f"DevOps Secrets Vault lookup failure: {error.message}"
) )
return result return result

View File

@ -104,7 +104,7 @@ class Etcd:
def __init__(self, url, version, validate_certs): def __init__(self, url, version, validate_certs):
self.url = url self.url = url
self.version = version self.version = version
self.baseurl = '%s/%s/keys' % (self.url, self.version) self.baseurl = f'{self.url}/{self.version}/keys'
self.validate_certs = validate_certs self.validate_certs = validate_certs
def _parse_node(self, node): def _parse_node(self, node):
@ -125,7 +125,7 @@ class Etcd:
return path return path
def get(self, key): def get(self, key):
url = "%s/%s?recursive=true" % (self.baseurl, key) url = f"{self.baseurl}/{key}?recursive=true"
data = None data = None
value = {} value = {}
try: try:

View File

@ -168,7 +168,7 @@ def etcd3_client(client_params):
etcd = etcd3.client(**client_params) etcd = etcd3.client(**client_params)
etcd.status() etcd.status()
except Exception as exp: except Exception as exp:
raise AnsibleLookupError('Cannot connect to etcd cluster: %s' % (to_native(exp))) raise AnsibleLookupError(f'Cannot connect to etcd cluster: {to_native(exp)}')
return etcd return etcd
@ -204,7 +204,7 @@ class LookupModule(LookupBase):
cnx_log = dict(client_params) cnx_log = dict(client_params)
if 'password' in cnx_log: if 'password' in cnx_log:
cnx_log['password'] = '<redacted>' cnx_log['password'] = '<redacted>'
display.verbose("etcd3 connection parameters: %s" % cnx_log) display.verbose(f"etcd3 connection parameters: {cnx_log}")
# connect to etcd3 server # connect to etcd3 server
etcd = etcd3_client(client_params) etcd = etcd3_client(client_params)
@ -218,12 +218,12 @@ class LookupModule(LookupBase):
if val and meta: if val and meta:
ret.append({'key': to_native(meta.key), 'value': to_native(val)}) ret.append({'key': to_native(meta.key), 'value': to_native(val)})
except Exception as exp: except Exception as exp:
display.warning('Caught except during etcd3.get_prefix: %s' % (to_native(exp))) display.warning(f'Caught except during etcd3.get_prefix: {to_native(exp)}')
else: else:
try: try:
val, meta = etcd.get(term) val, meta = etcd.get(term)
if val and meta: if val and meta:
ret.append({'key': to_native(meta.key), 'value': to_native(val)}) ret.append({'key': to_native(meta.key), 'value': to_native(val)})
except Exception as exp: except Exception as exp:
display.warning('Caught except during etcd3.get: %s' % (to_native(exp))) display.warning(f'Caught except during etcd3.get: {to_native(exp)}')
return ret return ret

View File

@ -158,7 +158,7 @@ def file_props(root, path):
try: try:
st = os.lstat(abspath) st = os.lstat(abspath)
except OSError as e: except OSError as e:
display.warning('filetree: Error using stat() on path %s (%s)' % (abspath, e)) display.warning(f'filetree: Error using stat() on path {abspath} ({e})')
return None return None
ret = dict(root=root, path=path) ret = dict(root=root, path=path)
@ -172,7 +172,7 @@ def file_props(root, path):
ret['state'] = 'file' ret['state'] = 'file'
ret['src'] = abspath ret['src'] = abspath
else: else:
display.warning('filetree: Error file type of %s is not supported' % abspath) display.warning(f'filetree: Error file type of {abspath} is not supported')
return None return None
ret['uid'] = st.st_uid ret['uid'] = st.st_uid
@ -185,7 +185,7 @@ def file_props(root, path):
ret['group'] = to_text(grp.getgrgid(st.st_gid).gr_name) ret['group'] = to_text(grp.getgrgid(st.st_gid).gr_name)
except KeyError: except KeyError:
ret['group'] = st.st_gid ret['group'] = st.st_gid
ret['mode'] = '0%03o' % (stat.S_IMODE(st.st_mode)) ret['mode'] = f'0{stat.S_IMODE(st.st_mode):03o}'
ret['size'] = st.st_size ret['size'] = st.st_size
ret['mtime'] = st.st_mtime ret['mtime'] = st.st_mtime
ret['ctime'] = st.st_ctime ret['ctime'] = st.st_ctime
@ -212,7 +212,7 @@ class LookupModule(LookupBase):
term_file = os.path.basename(term) term_file = os.path.basename(term)
dwimmed_path = self._loader.path_dwim_relative(basedir, 'files', os.path.dirname(term)) dwimmed_path = self._loader.path_dwim_relative(basedir, 'files', os.path.dirname(term))
path = os.path.join(dwimmed_path, term_file) path = os.path.join(dwimmed_path, term_file)
display.debug("Walking '{0}'".format(path)) display.debug(f"Walking '{path}'")
for root, dirs, files in os.walk(path, topdown=True): for root, dirs, files in os.walk(path, topdown=True):
for entry in dirs + files: for entry in dirs + files:
relpath = os.path.relpath(os.path.join(root, entry), path) relpath = os.path.relpath(os.path.join(root, entry), path)
@ -221,7 +221,7 @@ class LookupModule(LookupBase):
if relpath not in [entry['path'] for entry in ret]: if relpath not in [entry['path'] for entry in ret]:
props = file_props(path, relpath) props = file_props(path, relpath)
if props is not None: if props is not None:
display.debug(" found '{0}'".format(os.path.join(path, relpath))) display.debug(f" found '{os.path.join(path, relpath)}'")
ret.append(props) ret.append(props)
return ret return ret

View File

@ -97,7 +97,7 @@ def read_key(path, private_key=None):
with open(path, 'rb') as pem_file: with open(path, 'rb') as pem_file:
return jwk_from_pem(pem_file.read()) return jwk_from_pem(pem_file.read())
except Exception as e: except Exception as e:
raise AnsibleError("Error while parsing key file: {0}".format(e)) raise AnsibleError(f"Error while parsing key file: {e}")
def encode_jwt(app_id, jwk, exp=600): def encode_jwt(app_id, jwk, exp=600):
@ -110,7 +110,7 @@ def encode_jwt(app_id, jwk, exp=600):
try: try:
return jwt_instance.encode(payload, jwk, alg='RS256') return jwt_instance.encode(payload, jwk, alg='RS256')
except Exception as e: except Exception as e:
raise AnsibleError("Error while encoding jwt: {0}".format(e)) raise AnsibleError(f"Error while encoding jwt: {e}")
def post_request(generated_jwt, installation_id): def post_request(generated_jwt, installation_id):
@ -124,19 +124,19 @@ def post_request(generated_jwt, installation_id):
except HTTPError as e: except HTTPError as e:
try: try:
error_body = json.loads(e.read().decode()) error_body = json.loads(e.read().decode())
display.vvv("Error returned: {0}".format(error_body)) display.vvv(f"Error returned: {error_body}")
except Exception: except Exception:
error_body = {} error_body = {}
if e.code == 404: if e.code == 404:
raise AnsibleError("Github return error. Please confirm your installationd_id value is valid") raise AnsibleError("Github return error. Please confirm your installationd_id value is valid")
elif e.code == 401: elif e.code == 401:
raise AnsibleError("Github return error. Please confirm your private key is valid") raise AnsibleError("Github return error. Please confirm your private key is valid")
raise AnsibleError("Unexpected data returned: {0} -- {1}".format(e, error_body)) raise AnsibleError(f"Unexpected data returned: {e} -- {error_body}")
response_body = response.read() response_body = response.read()
try: try:
json_data = json.loads(response_body.decode('utf-8')) json_data = json.loads(response_body.decode('utf-8'))
except json.decoder.JSONDecodeError as e: except json.decoder.JSONDecodeError as e:
raise AnsibleError("Error while dencoding JSON respone from github: {0}".format(e)) raise AnsibleError(f"Error while dencoding JSON respone from github: {e}")
return json_data.get('token') return json_data.get('token')

View File

@ -79,8 +79,7 @@ class Hiera(object):
pargs.extend(hiera_key) pargs.extend(hiera_key)
rc, output, err = run_cmd("{0} -c {1} {2}".format( rc, output, err = run_cmd(f"{self.hiera_bin} -c {self.hiera_cfg} {hiera_key[0]}")
self.hiera_bin, self.hiera_cfg, hiera_key[0]))
return to_text(output.strip()) return to_text(output.strip())

View File

@ -61,13 +61,13 @@ class LookupModule(LookupBase):
self.set_options(var_options=variables, direct=kwargs) self.set_options(var_options=variables, direct=kwargs)
display.vvvv(u"keyring: %s" % keyring.get_keyring()) display.vvvv(f"keyring: {keyring.get_keyring()}")
ret = [] ret = []
for term in terms: for term in terms:
(servicename, username) = (term.split()[0], term.split()[1]) (servicename, username) = (term.split()[0], term.split()[1])
display.vvvv(u"username: %s, servicename: %s " % (username, servicename)) display.vvvv(f"username: {username}, servicename: {servicename} ")
password = keyring.get_password(servicename, username) password = keyring.get_password(servicename, username)
if password is None: if password is None:
raise AnsibleError(u"servicename: %s for user %s not found" % (servicename, username)) raise AnsibleError(f"servicename: {servicename} for user {username} not found")
ret.append(password.rstrip()) ret.append(password.rstrip())
return ret return ret

View File

@ -83,9 +83,9 @@ class LPass(object):
def get_field(self, key, field): def get_field(self, key, field):
if field in ['username', 'password', 'url', 'notes', 'id', 'name']: if field in ['username', 'password', 'url', 'notes', 'id', 'name']:
out, err = self._run(self._build_args("show", ["--{0}".format(field), key])) out, err = self._run(self._build_args("show", [f"--{field}", key]))
else: else:
out, err = self._run(self._build_args("show", ["--field={0}".format(field), key])) out, err = self._run(self._build_args("show", [f"--field={field}", key]))
return out.strip() return out.strip()

View File

@ -96,7 +96,7 @@ class LookupModule(LookupBase):
try: try:
env = lmdb.open(str(db), readonly=True) env = lmdb.open(str(db), readonly=True)
except Exception as e: except Exception as e:
raise AnsibleError("LMDB can't open database %s: %s" % (db, to_native(e))) raise AnsibleError(f"LMDB can't open database {db}: {to_native(e)}")
ret = [] ret = []
if len(terms) == 0: if len(terms) == 0:

View File

@ -78,12 +78,14 @@ class ApiError(Exception):
class ManifoldApiClient(object): class ManifoldApiClient(object):
base_url = 'https://api.{api}.manifold.co/v1/{endpoint}'
http_agent = 'python-manifold-ansible-1.0.0' http_agent = 'python-manifold-ansible-1.0.0'
def __init__(self, token): def __init__(self, token):
self._token = token self._token = token
def _make_url(self, api, endpoint):
return f'https://api.{api}.manifold.co/v1/{endpoint}'
def request(self, api, endpoint, *args, **kwargs): def request(self, api, endpoint, *args, **kwargs):
""" """
Send a request to API backend and pre-process a response. Send a request to API backend and pre-process a response.
@ -98,11 +100,11 @@ class ManifoldApiClient(object):
""" """
default_headers = { default_headers = {
'Authorization': "Bearer {0}".format(self._token), 'Authorization': f"Bearer {self._token}",
'Accept': "*/*" # Otherwise server doesn't set content-type header 'Accept': "*/*" # Otherwise server doesn't set content-type header
} }
url = self.base_url.format(api=api, endpoint=endpoint) url = self._make_url(api, endpoint)
headers = default_headers headers = default_headers
arg_headers = kwargs.pop('headers', None) arg_headers = kwargs.pop('headers', None)
@ -110,23 +112,22 @@ class ManifoldApiClient(object):
headers.update(arg_headers) headers.update(arg_headers)
try: try:
display.vvvv('manifold lookup connecting to {0}'.format(url)) display.vvvv(f'manifold lookup connecting to {url}')
response = open_url(url, headers=headers, http_agent=self.http_agent, *args, **kwargs) response = open_url(url, headers=headers, http_agent=self.http_agent, *args, **kwargs)
data = response.read() data = response.read()
if response.headers.get('content-type') == 'application/json': if response.headers.get('content-type') == 'application/json':
data = json.loads(data) data = json.loads(data)
return data return data
except ValueError: except ValueError:
raise ApiError('JSON response can\'t be parsed while requesting {url}:\n{json}'.format(json=data, url=url)) raise ApiError(f'JSON response can\'t be parsed while requesting {url}:\n{data}')
except HTTPError as e: except HTTPError as e:
raise ApiError('Server returned: {err} while requesting {url}:\n{response}'.format( raise ApiError(f'Server returned: {str(e)} while requesting {url}:\n{e.read()}')
err=str(e), url=url, response=e.read()))
except URLError as e: except URLError as e:
raise ApiError('Failed lookup url for {url} : {err}'.format(url=url, err=str(e))) raise ApiError(f'Failed lookup url for {url} : {str(e)}')
except SSLValidationError as e: except SSLValidationError as e:
raise ApiError('Error validating the server\'s certificate for {url}: {err}'.format(url=url, err=str(e))) raise ApiError(f'Error validating the server\'s certificate for {url}: {str(e)}')
except ConnectionError as e: except ConnectionError as e:
raise ApiError('Error connecting to {url}: {err}'.format(url=url, err=str(e))) raise ApiError(f'Error connecting to {url}: {str(e)}')
def get_resources(self, team_id=None, project_id=None, label=None): def get_resources(self, team_id=None, project_id=None, label=None):
""" """
@ -152,7 +153,7 @@ class ManifoldApiClient(object):
query_params['label'] = label query_params['label'] = label
if query_params: if query_params:
endpoint += '?' + urlencode(query_params) endpoint += f"?{urlencode(query_params)}"
return self.request(api, endpoint) return self.request(api, endpoint)
@ -188,7 +189,7 @@ class ManifoldApiClient(object):
query_params['label'] = label query_params['label'] = label
if query_params: if query_params:
endpoint += '?' + urlencode(query_params) endpoint += f"?{urlencode(query_params)}"
return self.request(api, endpoint) return self.request(api, endpoint)
@ -200,7 +201,7 @@ class ManifoldApiClient(object):
:return: :return:
""" """
api = 'marketplace' api = 'marketplace'
endpoint = 'credentials?' + urlencode({'resource_id': resource_id}) endpoint = f"credentials?{urlencode({'resource_id': resource_id})}"
return self.request(api, endpoint) return self.request(api, endpoint)
@ -229,7 +230,7 @@ class LookupModule(LookupBase):
if team: if team:
team_data = client.get_teams(team) team_data = client.get_teams(team)
if len(team_data) == 0: if len(team_data) == 0:
raise AnsibleError("Team '{0}' does not exist".format(team)) raise AnsibleError(f"Team '{team}' does not exist")
team_id = team_data[0]['id'] team_id = team_data[0]['id']
else: else:
team_id = None team_id = None
@ -237,7 +238,7 @@ class LookupModule(LookupBase):
if project: if project:
project_data = client.get_projects(project) project_data = client.get_projects(project)
if len(project_data) == 0: if len(project_data) == 0:
raise AnsibleError("Project '{0}' does not exist".format(project)) raise AnsibleError(f"Project '{project}' does not exist")
project_id = project_data[0]['id'] project_id = project_data[0]['id']
else: else:
project_id = None project_id = None
@ -252,7 +253,7 @@ class LookupModule(LookupBase):
if labels and len(resources_data) < len(labels): if labels and len(resources_data) < len(labels):
fetched_labels = [r['body']['label'] for r in resources_data] fetched_labels = [r['body']['label'] for r in resources_data]
not_found_labels = [label for label in labels if label not in fetched_labels] not_found_labels = [label for label in labels if label not in fetched_labels]
raise AnsibleError("Resource(s) {0} do not exist".format(', '.join(not_found_labels))) raise AnsibleError(f"Resource(s) {', '.join(not_found_labels)} do not exist")
credentials = {} credentials = {}
cred_map = {} cred_map = {}
@ -262,17 +263,14 @@ class LookupModule(LookupBase):
for cred_key, cred_val in six.iteritems(resource_credentials[0]['body']['values']): for cred_key, cred_val in six.iteritems(resource_credentials[0]['body']['values']):
label = resource['body']['label'] label = resource['body']['label']
if cred_key in credentials: if cred_key in credentials:
display.warning("'{cred_key}' with label '{old_label}' was replaced by resource data " display.warning(f"'{cred_key}' with label '{cred_map[cred_key]}' was replaced by resource data with label '{label}'")
"with label '{new_label}'".format(cred_key=cred_key,
old_label=cred_map[cred_key],
new_label=label))
credentials[cred_key] = cred_val credentials[cred_key] = cred_val
cred_map[cred_key] = label cred_map[cred_key] = label
ret = [credentials] ret = [credentials]
return ret return ret
except ApiError as e: except ApiError as e:
raise AnsibleError('API Error: {0}'.format(str(e))) raise AnsibleError(f'API Error: {str(e)}')
except AnsibleError as e: except AnsibleError as e:
raise e raise e
except Exception: except Exception:

View File

@ -149,7 +149,7 @@ class LookupModule(LookupBase):
ret = [] ret = []
for term in terms: for term in terms:
if not isinstance(term, str): if not isinstance(term, str):
raise AnsibleError("Non-string type '{0}' passed, only 'str' types are allowed!".format(type(term))) raise AnsibleError(f"Non-string type '{type(term)}' passed, only 'str' types are allowed!")
if not self._groups: # consider only own variables if not self._groups: # consider only own variables
ret.append(self._merge_vars(term, initial_value, variables)) ret.append(self._merge_vars(term, initial_value, variables))
@ -186,9 +186,9 @@ class LookupModule(LookupBase):
return False return False
def _merge_vars(self, search_pattern, initial_value, variables): def _merge_vars(self, search_pattern, initial_value, variables):
display.vvv("Merge variables with {0}: {1}".format(self._pattern_type, search_pattern)) display.vvv(f"Merge variables with {self._pattern_type}: {search_pattern}")
var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)]) var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)])
display.vvv("The following variables will be merged: {0}".format(var_merge_names)) display.vvv(f"The following variables will be merged: {var_merge_names}")
prev_var_type = None prev_var_type = None
result = None result = None
@ -226,8 +226,7 @@ class LookupModule(LookupBase):
dest[key] += value dest[key] += value
else: else:
if (key in dest) and dest[key] != value: if (key in dest) and dest[key] != value:
msg = "The key '{0}' with value '{1}' will be overwritten with value '{2}' from '{3}.{0}'".format( msg = f"The key '{key}' with value '{dest[key]}' will be overwritten with value '{value}' from '{'.'.join(path)}.{key}'"
key, dest[key], value, ".".join(path))
if self._override == "error": if self._override == "error":
raise AnsibleError(msg) raise AnsibleError(msg)

View File

@ -140,11 +140,11 @@ class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
if missing: if missing:
prefix = "Unable to sign in to 1Password. Missing required parameter" prefix = "Unable to sign in to 1Password. Missing required parameter"
plural = "" plural = ""
suffix = ": {params}.".format(params=", ".join(missing)) suffix = f": {', '.join(missing)}."
if len(missing) > 1: if len(missing) > 1:
plural = "s" plural = "s"
msg = "{prefix}{plural}{suffix}".format(prefix=prefix, plural=plural, suffix=suffix) msg = f"{prefix}{plural}{suffix}"
raise AnsibleLookupError(msg) raise AnsibleLookupError(msg)
@abc.abstractmethod @abc.abstractmethod
@ -210,12 +210,12 @@ class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
try: try:
bin_path = get_bin_path(cls.bin) bin_path = get_bin_path(cls.bin)
except ValueError: except ValueError:
raise AnsibleLookupError("Unable to locate '%s' command line tool" % cls.bin) raise AnsibleLookupError(f"Unable to locate '{cls.bin}' command line tool")
try: try:
b_out = subprocess.check_output([bin_path, "--version"], stderr=subprocess.PIPE) b_out = subprocess.check_output([bin_path, "--version"], stderr=subprocess.PIPE)
except subprocess.CalledProcessError as cpe: except subprocess.CalledProcessError as cpe:
raise AnsibleLookupError("Unable to get the op version: %s" % cpe) raise AnsibleLookupError(f"Unable to get the op version: {cpe}")
return to_text(b_out).strip() return to_text(b_out).strip()
@ -300,7 +300,7 @@ class OnePassCLIv1(OnePassCLIBase):
if self.account_id: if self.account_id:
args.extend(["--account", self.account_id]) args.extend(["--account", self.account_id])
elif self.subdomain: elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain) account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account]) args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True) rc, out, err = self._run(args, ignore_errors=True)
@ -326,7 +326,7 @@ class OnePassCLIv1(OnePassCLIBase):
args = [ args = [
"signin", "signin",
"{0}.{1}".format(self.subdomain, self.domain), f"{self.subdomain}.{self.domain}",
to_bytes(self.username), to_bytes(self.username),
to_bytes(self.secret_key), to_bytes(self.secret_key),
"--raw", "--raw",
@ -341,7 +341,7 @@ class OnePassCLIv1(OnePassCLIBase):
args.extend(["--account", self.account_id]) args.extend(["--account", self.account_id])
if vault is not None: if vault is not None:
args += ["--vault={0}".format(vault)] args += [f"--vault={vault}"]
if token is not None: if token is not None:
args += [to_bytes("--session=") + token] args += [to_bytes("--session=") + token]
@ -512,7 +512,7 @@ class OnePassCLIv2(OnePassCLIBase):
args = ["account", "list"] args = ["account", "list"]
if self.subdomain: if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain) account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account]) args.extend(["--account", account])
rc, out, err = self._run(args) rc, out, err = self._run(args)
@ -525,7 +525,7 @@ class OnePassCLIv2(OnePassCLIBase):
if self.account_id: if self.account_id:
args.extend(["--account", self.account_id]) args.extend(["--account", self.account_id])
elif self.subdomain: elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain) account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account]) args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True) rc, out, err = self._run(args, ignore_errors=True)
@ -545,7 +545,7 @@ class OnePassCLIv2(OnePassCLIBase):
args = [ args = [
"account", "add", "--raw", "account", "add", "--raw",
"--address", "{0}.{1}".format(self.subdomain, self.domain), "--address", f"{self.subdomain}.{self.domain}",
"--email", to_bytes(self.username), "--email", to_bytes(self.username),
"--signin", "--signin",
] ]
@ -560,7 +560,7 @@ class OnePassCLIv2(OnePassCLIBase):
args.extend(["--account", self.account_id]) args.extend(["--account", self.account_id])
if vault is not None: if vault is not None:
args += ["--vault={0}".format(vault)] args += [f"--vault={vault}"]
if self.connect_host and self.connect_token: if self.connect_host and self.connect_token:
if vault is None: if vault is None:
@ -627,7 +627,7 @@ class OnePass(object):
except TypeError as e: except TypeError as e:
raise AnsibleLookupError(e) raise AnsibleLookupError(e)
raise AnsibleLookupError("op version %s is unsupported" % version) raise AnsibleLookupError(f"op version {version} is unsupported")
def set_token(self): def set_token(self):
if self._config.config_file_path and os.path.isfile(self._config.config_file_path): if self._config.config_file_path and os.path.isfile(self._config.config_file_path):

View File

@ -55,7 +55,7 @@ class OnePassCLIv2Doc(OnePassCLIv2):
def get_raw(self, item_id, vault=None, token=None): def get_raw(self, item_id, vault=None, token=None):
args = ["document", "get", item_id] args = ["document", "get", item_id]
if vault is not None: if vault is not None:
args = [*args, "--vault={0}".format(vault)] args = [*args, f"--vault={vault}"]
if self.service_account_token: if self.service_account_token:
if vault is None: if vault is None:

View File

@ -315,7 +315,7 @@ class LookupModule(LookupBase):
) )
self.realpass = 'pass: the standard unix password manager' in passoutput self.realpass = 'pass: the standard unix password manager' in passoutput
except (subprocess.CalledProcessError) as e: except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output)) raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return self.realpass return self.realpass
@ -332,7 +332,7 @@ class LookupModule(LookupBase):
for param in params[1:]: for param in params[1:]:
name, value = param.split('=', 1) name, value = param.split('=', 1)
if name not in self.paramvals: if name not in self.paramvals:
raise AnsibleAssertionError('%s not in paramvals' % name) raise AnsibleAssertionError(f'{name} not in paramvals')
self.paramvals[name] = value self.paramvals[name] = value
except (ValueError, AssertionError) as e: except (ValueError, AssertionError) as e:
raise AnsibleError(e) raise AnsibleError(e)
@ -344,12 +344,12 @@ class LookupModule(LookupBase):
except (ValueError, AssertionError) as e: except (ValueError, AssertionError) as e:
raise AnsibleError(e) raise AnsibleError(e)
if self.paramvals['missing'] not in ['error', 'warn', 'create', 'empty']: if self.paramvals['missing'] not in ['error', 'warn', 'create', 'empty']:
raise AnsibleError("{0} is not a valid option for missing".format(self.paramvals['missing'])) raise AnsibleError(f"{self.paramvals['missing']} is not a valid option for missing")
if not isinstance(self.paramvals['length'], int): if not isinstance(self.paramvals['length'], int):
if self.paramvals['length'].isdigit(): if self.paramvals['length'].isdigit():
self.paramvals['length'] = int(self.paramvals['length']) self.paramvals['length'] = int(self.paramvals['length'])
else: else:
raise AnsibleError("{0} is not a correct value for length".format(self.paramvals['length'])) raise AnsibleError(f"{self.paramvals['length']} is not a correct value for length")
if self.paramvals['create']: if self.paramvals['create']:
self.paramvals['missing'] = 'create' self.paramvals['missing'] = 'create'
@ -364,7 +364,7 @@ class LookupModule(LookupBase):
# Set PASSWORD_STORE_DIR # Set PASSWORD_STORE_DIR
self.env['PASSWORD_STORE_DIR'] = self.paramvals['directory'] self.env['PASSWORD_STORE_DIR'] = self.paramvals['directory']
elif self.is_real_pass(): elif self.is_real_pass():
raise AnsibleError('Passwordstore directory \'{0}\' does not exist'.format(self.paramvals['directory'])) raise AnsibleError(f"Passwordstore directory '{self.paramvals['directory']}' does not exist")
# Set PASSWORD_STORE_UMASK if umask is set # Set PASSWORD_STORE_UMASK if umask is set
if self.paramvals.get('umask') is not None: if self.paramvals.get('umask') is not None:
@ -394,19 +394,19 @@ class LookupModule(LookupBase):
name, value = line.split(':', 1) name, value = line.split(':', 1)
self.passdict[name.strip()] = value.strip() self.passdict[name.strip()] = value.strip()
if (self.backend == 'gopass' or if (self.backend == 'gopass' or
os.path.isfile(os.path.join(self.paramvals['directory'], self.passname + ".gpg")) os.path.isfile(os.path.join(self.paramvals['directory'], f"{self.passname}.gpg"))
or not self.is_real_pass()): or not self.is_real_pass()):
# When using real pass, only accept password as found if there is a .gpg file for it (might be a tree node otherwise) # When using real pass, only accept password as found if there is a .gpg file for it (might be a tree node otherwise)
return True return True
except (subprocess.CalledProcessError) as e: except (subprocess.CalledProcessError) as e:
# 'not in password store' is the expected error if a password wasn't found # 'not in password store' is the expected error if a password wasn't found
if 'not in the password store' not in e.output: if 'not in the password store' not in e.output:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output)) raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
if self.paramvals['missing'] == 'error': if self.paramvals['missing'] == 'error':
raise AnsibleError('passwordstore: passname {0} not found and missing=error is set'.format(self.passname)) raise AnsibleError(f'passwordstore: passname {self.passname} not found and missing=error is set')
elif self.paramvals['missing'] == 'warn': elif self.paramvals['missing'] == 'warn':
display.warning('passwordstore: passname {0} not found'.format(self.passname)) display.warning(f'passwordstore: passname {self.passname} not found')
return False return False
@ -433,11 +433,11 @@ class LookupModule(LookupBase):
msg_lines = [] msg_lines = []
subkey_exists = False subkey_exists = False
subkey_line = "{0}: {1}".format(subkey, newpass) subkey_line = f"{subkey}: {newpass}"
oldpass = None oldpass = None
for line in self.passoutput: for line in self.passoutput:
if line.startswith("{0}: ".format(subkey)): if line.startswith(f"{subkey}: "):
oldpass = self.passdict[subkey] oldpass = self.passdict[subkey]
line = subkey_line line = subkey_line
subkey_exists = True subkey_exists = True
@ -449,9 +449,7 @@ class LookupModule(LookupBase):
if self.paramvals["timestamp"] and self.paramvals["backup"] and oldpass and oldpass != newpass: if self.paramvals["timestamp"] and self.paramvals["backup"] and oldpass and oldpass != newpass:
msg_lines.append( msg_lines.append(
"lookup_pass: old subkey '{0}' password was {1} (Updated on {2})\n".format( f"lookup_pass: old subkey '{subkey}' password was {oldpass} (Updated on {datetime})\n"
subkey, oldpass, datetime
)
) )
msg = os.linesep.join(msg_lines) msg = os.linesep.join(msg_lines)
@ -464,12 +462,12 @@ class LookupModule(LookupBase):
if self.paramvals['preserve'] and self.passoutput[1:]: if self.paramvals['preserve'] and self.passoutput[1:]:
msg += '\n'.join(self.passoutput[1:]) + '\n' msg += '\n'.join(self.passoutput[1:]) + '\n'
if self.paramvals['timestamp'] and self.paramvals['backup']: if self.paramvals['timestamp'] and self.paramvals['backup']:
msg += "lookup_pass: old password was {0} (Updated on {1})\n".format(self.password, datetime) msg += f"lookup_pass: old password was {self.password} (Updated on {datetime})\n"
try: try:
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env) check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e: except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output)) raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return newpass return newpass
def generate_password(self): def generate_password(self):
@ -480,17 +478,17 @@ class LookupModule(LookupBase):
subkey = self.paramvals["subkey"] subkey = self.paramvals["subkey"]
if subkey != "password": if subkey != "password":
msg = "\n\n{0}: {1}".format(subkey, newpass) msg = f"\n\n{subkey}: {newpass}"
else: else:
msg = newpass msg = newpass
if self.paramvals['timestamp']: if self.paramvals['timestamp']:
msg += '\n' + "lookup_pass: First generated by ansible on {0}\n".format(datetime) msg += f"\nlookup_pass: First generated by ansible on {datetime}\n"
try: try:
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env) check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e: except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output)) raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return newpass return newpass
@ -505,16 +503,12 @@ class LookupModule(LookupBase):
else: else:
if self.paramvals["missing_subkey"] == "error": if self.paramvals["missing_subkey"] == "error":
raise AnsibleError( raise AnsibleError(
"passwordstore: subkey {0} for passname {1} not found and missing_subkey=error is set".format( f"passwordstore: subkey {self.paramvals['subkey']} for passname {self.passname} not found and missing_subkey=error is set"
self.paramvals["subkey"], self.passname
)
) )
if self.paramvals["missing_subkey"] == "warn": if self.paramvals["missing_subkey"] == "warn":
display.warning( display.warning(
"passwordstore: subkey {0} for passname {1} not found".format( f"passwordstore: subkey {self.paramvals['subkey']} for passname {self.passname} not found"
self.paramvals["subkey"], self.passname
)
) )
return None return None
@ -524,7 +518,7 @@ class LookupModule(LookupBase):
if self.get_option('lock') == type: if self.get_option('lock') == type:
tmpdir = os.environ.get('TMPDIR', '/tmp') tmpdir = os.environ.get('TMPDIR', '/tmp')
user = os.environ.get('USER') user = os.environ.get('USER')
lockfile = os.path.join(tmpdir, '.{0}.passwordstore.lock'.format(user)) lockfile = os.path.join(tmpdir, f'.{user}.passwordstore.lock')
with FileLock().lock_file(lockfile, tmpdir, self.lock_timeout): with FileLock().lock_file(lockfile, tmpdir, self.lock_timeout):
self.locked = type self.locked = type
yield yield
@ -538,7 +532,7 @@ class LookupModule(LookupBase):
self.locked = None self.locked = None
timeout = self.get_option('locktimeout') timeout = self.get_option('locktimeout')
if not re.match('^[0-9]+[smh]$', timeout): if not re.match('^[0-9]+[smh]$', timeout):
raise AnsibleError("{0} is not a correct value for locktimeout".format(timeout)) raise AnsibleError(f"{timeout} is not a correct value for locktimeout")
unit_to_seconds = {"s": 1, "m": 60, "h": 3600} unit_to_seconds = {"s": 1, "m": 60, "h": 3600}
self.lock_timeout = int(timeout[:-1]) * unit_to_seconds[timeout[-1]] self.lock_timeout = int(timeout[:-1]) * unit_to_seconds[timeout[-1]]

View File

@ -95,6 +95,6 @@ class LookupModule(LookupBase):
values = petname.Generate(words=words, separator=separator, letters=length) values = petname.Generate(words=words, separator=separator, letters=length)
if prefix: if prefix:
values = "%s%s%s" % (prefix, separator, values) values = f"{prefix}{separator}{values}"
return [values] return [values]

View File

@ -116,5 +116,5 @@ class LookupModule(LookupBase):
ret.append(to_text(res)) ret.append(to_text(res))
except Exception as e: except Exception as e:
# connection failed or key not found # connection failed or key not found
raise AnsibleError('Encountered exception while fetching {0}: {1}'.format(term, e)) raise AnsibleError(f'Encountered exception while fetching {term}: {e}')
return ret return ret

View File

@ -100,8 +100,8 @@ class LookupModule(LookupBase):
result = [] result = []
for term in terms: for term in terms:
try: try:
display.vvv("Secret Server lookup of Secret with ID %s" % term) display.vvv(f"Secret Server lookup of Secret with ID {term}")
result.append({term: secret_server.get_pam_secret(term)}) result.append({term: secret_server.get_pam_secret(term)})
except Exception as error: except Exception as error:
raise AnsibleError("Secret Server lookup failure: %s" % error.message) raise AnsibleError(f"Secret Server lookup failure: {error.message}")
return result return result

View File

@ -71,7 +71,7 @@ class LookupModule(LookupBase):
for param in params: for param in params:
name, value = param.split('=') name, value = param.split('=')
if name not in paramvals: if name not in paramvals:
raise AnsibleAssertionError('%s not in paramvals' % name) raise AnsibleAssertionError(f'{name} not in paramvals')
paramvals[name] = value paramvals[name] = value
except (ValueError, AssertionError) as e: except (ValueError, AssertionError) as e:
@ -86,11 +86,11 @@ class LookupModule(LookupBase):
if shelvefile: if shelvefile:
res = self.read_shelve(shelvefile, key) res = self.read_shelve(shelvefile, key)
if res is None: if res is None:
raise AnsibleError("Key %s not found in shelve file %s" % (key, shelvefile)) raise AnsibleError(f"Key {key} not found in shelve file {shelvefile}")
# Convert the value read to string # Convert the value read to string
ret.append(to_text(res)) ret.append(to_text(res))
break break
else: else:
raise AnsibleError("Could not locate shelve file in lookup: %s" % paramvals['file']) raise AnsibleError(f"Could not locate shelve file in lookup: {paramvals['file']}")
return ret return ret

View File

@ -306,14 +306,14 @@ class TSSClient(object):
return TSSClientV0(**server_parameters) return TSSClientV0(**server_parameters)
def get_secret(self, term, secret_path, fetch_file_attachments, file_download_path): def get_secret(self, term, secret_path, fetch_file_attachments, file_download_path):
display.debug("tss_lookup term: %s" % term) display.debug(f"tss_lookup term: {term}")
secret_id = self._term_to_secret_id(term) secret_id = self._term_to_secret_id(term)
if secret_id == 0 and secret_path: if secret_id == 0 and secret_path:
fetch_secret_by_path = True fetch_secret_by_path = True
display.vvv(u"Secret Server lookup of Secret with path %s" % secret_path) display.vvv(f"Secret Server lookup of Secret with path {secret_path}")
else: else:
fetch_secret_by_path = False fetch_secret_by_path = False
display.vvv(u"Secret Server lookup of Secret with ID %d" % secret_id) display.vvv(f"Secret Server lookup of Secret with ID {secret_id}")
if fetch_file_attachments: if fetch_file_attachments:
if fetch_secret_by_path: if fetch_secret_by_path:
@ -325,12 +325,12 @@ class TSSClient(object):
if i['isFile']: if i['isFile']:
try: try:
file_content = i['itemValue'].content file_content = i['itemValue'].content
with open(os.path.join(file_download_path, str(obj['id']) + "_" + i['slug']), "wb") as f: with open(os.path.join(file_download_path, f"{str(obj['id'])}_{i['slug']}"), "wb") as f:
f.write(file_content) f.write(file_content)
except ValueError: except ValueError:
raise AnsibleOptionsError("Failed to download {0}".format(str(i['slug']))) raise AnsibleOptionsError(f"Failed to download {str(i['slug'])}")
except AttributeError: except AttributeError:
display.warning("Could not read file content for {0}".format(str(i['slug']))) display.warning(f"Could not read file content for {str(i['slug'])}")
finally: finally:
i['itemValue'] = "*** Not Valid For Display ***" i['itemValue'] = "*** Not Valid For Display ***"
else: else:
@ -343,9 +343,9 @@ class TSSClient(object):
return self._client.get_secret_json(secret_id) return self._client.get_secret_json(secret_id)
def get_secret_ids_by_folderid(self, term): def get_secret_ids_by_folderid(self, term):
display.debug("tss_lookup term: %s" % term) display.debug(f"tss_lookup term: {term}")
folder_id = self._term_to_folder_id(term) folder_id = self._term_to_folder_id(term)
display.vvv(u"Secret Server lookup of Secret id's with Folder ID %d" % folder_id) display.vvv(f"Secret Server lookup of Secret id's with Folder ID {folder_id}")
return self._client.get_secret_ids_by_folderid(folder_id) return self._client.get_secret_ids_by_folderid(folder_id)
@ -447,4 +447,4 @@ class LookupModule(LookupBase):
for term in terms for term in terms
] ]
except SecretServerError as error: except SecretServerError as error:
raise AnsibleError("Secret Server lookup failure: %s" % error.message) raise AnsibleError(f"Secret Server lookup failure: {error.message}")

View File

@ -1,4 +1,5 @@
.azure-pipelines/scripts/publish-codecov.py replace-urlopen .azure-pipelines/scripts/publish-codecov.py replace-urlopen
plugins/lookup/dependent.py validate-modules:unidiomatic-typecheck
plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice
plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt' plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt'
plugins/modules/iptables_state.py validate-modules:undocumented-parameter # params _back and _timeout used by action plugin plugins/modules/iptables_state.py validate-modules:undocumented-parameter # params _back and _timeout used by action plugin

View File

@ -1,3 +1,4 @@
plugins/lookup/dependent.py validate-modules:unidiomatic-typecheck
plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice
plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt' plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt'
plugins/modules/homectl.py import-3.12 # Uses deprecated stdlib library 'crypt' plugins/modules/homectl.py import-3.12 # Uses deprecated stdlib library 'crypt'