lookup plugins: use f-strings (#9324)

* lookup plugins: use f-strings

* add changelog frag

* manual change for few occurrences

* Update plugins/lookup/dependent.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* adjustment from review

* no f-string for you

* Update plugins/lookup/dependent.py

Co-authored-by: Felix Fontein <felix@fontein.de>

---------

Co-authored-by: Felix Fontein <felix@fontein.de>
pull/9363/head
Alexei Znamensky 2024-12-25 21:48:06 +13:00 committed by GitHub
parent 2005125af4
commit 6cd3f79e19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 165 additions and 149 deletions

View File

@ -0,0 +1,29 @@
minor_changes:
- bitwarden lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- chef_databag lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- collection_version lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- consul_kv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- credstash lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- cyberarkpassword lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dependent lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dig lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dnstxt lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- dsv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- etcd lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- etcd3 lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- filetree lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- github_app_access_token lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- hiera lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- keyring lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- lastpass lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- lmdb_kv lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- manifold lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- merge_variables lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- onepassword lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- onepassword_doc lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- passwordstore lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- random_pet lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- redis lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- revbitspss lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- shelvefile lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).
- tss lookup plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9324).

View File

@ -207,7 +207,7 @@ class Bitwarden(object):
continue
if matches and not field_matches:
raise AnsibleError("field {field} does not exist in {search_value}".format(field=field, search_value=search_value))
raise AnsibleError(f"field {field} does not exist in {search_value}")
return field_matches

View File

@ -81,11 +81,11 @@ class LookupModule(LookupBase):
setattr(self, arg, parsed)
except ValueError:
raise AnsibleError(
"can't parse arg {0}={1} as string".format(arg, arg_raw)
f"can't parse arg {arg}={arg_raw} as string"
)
if args:
raise AnsibleError(
"unrecognized arguments to with_sequence: %r" % list(args.keys())
f"unrecognized arguments to with_sequence: {list(args.keys())!r}"
)
def run(self, terms, variables=None, **kwargs):

View File

@ -115,10 +115,10 @@ class LookupModule(LookupBase):
for term in terms:
if not FQCN_RE.match(term):
raise AnsibleLookupError('"{term}" is not a FQCN'.format(term=term))
raise AnsibleLookupError(f'"{term}" is not a FQCN')
try:
collection_pkg = import_module('ansible_collections.{fqcn}'.format(fqcn=term))
collection_pkg = import_module(f'ansible_collections.{term}')
except ImportError:
# Collection not found
result.append(not_found)
@ -127,7 +127,7 @@ class LookupModule(LookupBase):
try:
data = load_collection_meta(collection_pkg, no_version=no_version)
except Exception as exc:
raise AnsibleLookupError('Error while loading metadata for {fqcn}: {error}'.format(fqcn=term, error=exc))
raise AnsibleLookupError(f'Error while loading metadata for {term}: {exc}')
result.append(data.get('version', no_version))

View File

@ -171,7 +171,7 @@ class LookupModule(LookupBase):
values.append(to_text(results[1]['Value']))
except Exception as e:
raise AnsibleError(
"Error locating '%s' in kv store. Error was %s" % (term, e))
f"Error locating '{term}' in kv store. Error was {e}")
return values
@ -192,7 +192,7 @@ class LookupModule(LookupBase):
if param and len(param) > 0:
name, value = param.split('=')
if name not in paramvals:
raise AnsibleAssertionError("%s not a valid consul lookup parameter" % name)
raise AnsibleAssertionError(f"{name} not a valid consul lookup parameter")
paramvals[name] = value
except (ValueError, AssertionError) as e:
raise AnsibleError(e)

View File

@ -137,8 +137,8 @@ class LookupModule(LookupBase):
try:
ret.append(credstash.getSecret(term, version, region, table, context=context, **kwargs_pass))
except credstash.ItemNotFound:
raise AnsibleError('Key {0} not found'.format(term))
raise AnsibleError(f'Key {term} not found')
except Exception as e:
raise AnsibleError('Encountered exception while fetching {0}: {1}'.format(term, e))
raise AnsibleError(f'Encountered exception while fetching {term}: {e}')
return ret

View File

@ -105,7 +105,7 @@ class CyberarkPassword:
self.extra_parms = []
for key, value in kwargs.items():
self.extra_parms.append('-p')
self.extra_parms.append("%s=%s" % (key, value))
self.extra_parms.append(f"{key}={value}")
if self.appid is None:
raise AnsibleError("CyberArk Error: No Application ID specified")
@ -130,8 +130,8 @@ class CyberarkPassword:
all_parms = [
CLIPASSWORDSDK_CMD,
'GetPassword',
'-p', 'AppDescs.AppID=%s' % self.appid,
'-p', 'Query=%s' % self.query,
'-p', f'AppDescs.AppID={self.appid}',
'-p', f'Query={self.query}',
'-o', self.output,
'-d', self.b_delimiter]
all_parms.extend(self.extra_parms)
@ -144,7 +144,7 @@ class CyberarkPassword:
b_credential = to_bytes(tmp_output)
if tmp_error:
raise AnsibleError("ERROR => %s " % (tmp_error))
raise AnsibleError(f"ERROR => {tmp_error} ")
if b_credential and b_credential.endswith(b'\n'):
b_credential = b_credential[:-1]
@ -164,7 +164,7 @@ class CyberarkPassword:
except subprocess.CalledProcessError as e:
raise AnsibleError(e.output)
except OSError as e:
raise AnsibleError("ERROR - AIM not installed or clipasswordsdk not in standard location. ERROR=(%s) => %s " % (to_text(e.errno), e.strerror))
raise AnsibleError(f"ERROR - AIM not installed or clipasswordsdk not in standard location. ERROR=({to_text(e.errno)}) => {e.strerror} ")
return [result_dict]
@ -177,11 +177,11 @@ class LookupModule(LookupBase):
"""
def run(self, terms, variables=None, **kwargs):
display.vvvv("%s" % terms)
display.vvvv(f"{terms}")
if isinstance(terms, list):
return_values = []
for term in terms:
display.vvvv("Term: %s" % term)
display.vvvv(f"Term: {term}")
cyberark_conn = CyberarkPassword(**term)
return_values.append(cyberark_conn.get())
return return_values

View File

@ -173,8 +173,7 @@ class LookupModule(LookupBase):
values = self.__evaluate(expression, templar, variables=vars)
except Exception as e:
raise AnsibleLookupError(
'Caught "{error}" while evaluating {key!r} with item == {item!r}'.format(
error=e, key=key, item=current))
f'Caught "{e}" while evaluating {key!r} with item == {current!r}')
if isinstance(values, Mapping):
for idx, val in sorted(values.items()):
@ -186,8 +185,7 @@ class LookupModule(LookupBase):
self.__process(result, terms, index + 1, current, templar, variables)
else:
raise AnsibleLookupError(
'Did not obtain dictionary or list while evaluating {key!r} with item == {item!r}, but {type}'.format(
key=key, item=current, type=type(values)))
f'Did not obtain dictionary or list while evaluating {key!r} with item == {current!r}, but {type(values)}')
def run(self, terms, variables=None, **kwargs):
"""Generate list."""
@ -201,16 +199,14 @@ class LookupModule(LookupBase):
for index, term in enumerate(terms):
if not isinstance(term, Mapping):
raise AnsibleLookupError(
'Parameter {index} must be a dictionary, got {type}'.format(
index=index, type=type(term)))
f'Parameter {index} must be a dictionary, got {type(term)}')
if len(term) != 1:
raise AnsibleLookupError(
'Parameter {index} must be a one-element dictionary, got {count} elements'.format(
index=index, count=len(term)))
f'Parameter {index} must be a one-element dictionary, got {len(term)} elements')
k, v = list(term.items())[0]
if k in vars_so_far:
raise AnsibleLookupError(
'The variable {key!r} appears more than once'.format(key=k))
f'The variable {k!r} appears more than once')
vars_so_far.add(k)
if isinstance(v, string_types):
data.append((k, v, None))
@ -218,7 +214,6 @@ class LookupModule(LookupBase):
data.append((k, None, v))
else:
raise AnsibleLookupError(
'Parameter {key!r} (index {index}) must have a value of type string, dictionary or list, got type {type}'.format(
index=index, key=k, type=type(v)))
f'Parameter {k!r} (index {index}) must have a value of type string, dictionary or list, got type {type(v)}')
self.__process(result, data, 0, {}, templar, variables)
return result

View File

@ -345,7 +345,7 @@ class LookupModule(LookupBase):
try:
rdclass = dns.rdataclass.from_text(self.get_option('class'))
except Exception as e:
raise AnsibleError("dns lookup illegal CLASS: %s" % to_native(e))
raise AnsibleError(f"dns lookup illegal CLASS: {to_native(e)}")
myres.retry_servfail = self.get_option('retry_servfail')
for t in terms:
@ -363,7 +363,7 @@ class LookupModule(LookupBase):
nsaddr = dns.resolver.query(ns)[0].address
nameservers.append(nsaddr)
except Exception as e:
raise AnsibleError("dns lookup NS: %s" % to_native(e))
raise AnsibleError(f"dns lookup NS: {to_native(e)}")
continue
if '=' in t:
try:
@ -379,7 +379,7 @@ class LookupModule(LookupBase):
try:
rdclass = dns.rdataclass.from_text(arg)
except Exception as e:
raise AnsibleError("dns lookup illegal CLASS: %s" % to_native(e))
raise AnsibleError(f"dns lookup illegal CLASS: {to_native(e)}")
elif opt == 'retry_servfail':
myres.retry_servfail = boolean(arg)
elif opt == 'fail_on_error':
@ -400,7 +400,7 @@ class LookupModule(LookupBase):
else:
domains.append(t)
# print "--- domain = {0} qtype={1} rdclass={2}".format(domain, qtype, rdclass)
# print "--- domain = {domain} qtype={qtype} rdclass={rdclass}"
if port:
myres.port = port
@ -416,7 +416,7 @@ class LookupModule(LookupBase):
except dns.exception.SyntaxError:
pass
except Exception as e:
raise AnsibleError("dns.reversename unhandled exception %s" % to_native(e))
raise AnsibleError(f"dns.reversename unhandled exception {to_native(e)}")
domains = reversed_domains
if len(domains) > 1:
@ -445,25 +445,25 @@ class LookupModule(LookupBase):
ret.append(rd)
except Exception as err:
if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err))
raise AnsibleError(f"Lookup failed: {str(err)}")
ret.append(str(err))
except dns.resolver.NXDOMAIN as err:
if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err))
raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty:
ret.append('NXDOMAIN')
except dns.resolver.NoAnswer as err:
if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err))
raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty:
ret.append("")
except dns.resolver.Timeout as err:
if fail_on_error:
raise AnsibleError("Lookup failed: %s" % str(err))
raise AnsibleError(f"Lookup failed: {str(err)}")
if not real_empty:
ret.append("")
except dns.exception.DNSException as err:
raise AnsibleError("dns.resolver unhandled exception %s" % to_native(err))
raise AnsibleError(f"dns.resolver unhandled exception {to_native(err)}")
return ret

View File

@ -108,7 +108,7 @@ class LookupModule(LookupBase):
continue
string = ''
except DNSException as e:
raise AnsibleError("dns.resolver unhandled exception %s" % to_native(e))
raise AnsibleError(f"dns.resolver unhandled exception {to_native(e)}")
ret.append(''.join(string))

View File

@ -135,17 +135,17 @@ class LookupModule(LookupBase):
result = []
for term in terms:
display.debug("dsv_lookup term: %s" % term)
display.debug(f"dsv_lookup term: {term}")
try:
path = term.lstrip("[/:]")
if path == "":
raise AnsibleOptionsError("Invalid secret path: %s" % term)
raise AnsibleOptionsError(f"Invalid secret path: {term}")
display.vvv(u"DevOps Secrets Vault GET /secrets/%s" % path)
display.vvv(f"DevOps Secrets Vault GET /secrets/{path}")
result.append(vault.get_secret_json(path))
except SecretsVaultError as error:
raise AnsibleError(
"DevOps Secrets Vault lookup failure: %s" % error.message
f"DevOps Secrets Vault lookup failure: {error.message}"
)
return result

View File

@ -104,7 +104,7 @@ class Etcd:
def __init__(self, url, version, validate_certs):
self.url = url
self.version = version
self.baseurl = '%s/%s/keys' % (self.url, self.version)
self.baseurl = f'{self.url}/{self.version}/keys'
self.validate_certs = validate_certs
def _parse_node(self, node):
@ -125,7 +125,7 @@ class Etcd:
return path
def get(self, key):
url = "%s/%s?recursive=true" % (self.baseurl, key)
url = f"{self.baseurl}/{key}?recursive=true"
data = None
value = {}
try:

View File

@ -168,7 +168,7 @@ def etcd3_client(client_params):
etcd = etcd3.client(**client_params)
etcd.status()
except Exception as exp:
raise AnsibleLookupError('Cannot connect to etcd cluster: %s' % (to_native(exp)))
raise AnsibleLookupError(f'Cannot connect to etcd cluster: {to_native(exp)}')
return etcd
@ -204,7 +204,7 @@ class LookupModule(LookupBase):
cnx_log = dict(client_params)
if 'password' in cnx_log:
cnx_log['password'] = '<redacted>'
display.verbose("etcd3 connection parameters: %s" % cnx_log)
display.verbose(f"etcd3 connection parameters: {cnx_log}")
# connect to etcd3 server
etcd = etcd3_client(client_params)
@ -218,12 +218,12 @@ class LookupModule(LookupBase):
if val and meta:
ret.append({'key': to_native(meta.key), 'value': to_native(val)})
except Exception as exp:
display.warning('Caught except during etcd3.get_prefix: %s' % (to_native(exp)))
display.warning(f'Caught except during etcd3.get_prefix: {to_native(exp)}')
else:
try:
val, meta = etcd.get(term)
if val and meta:
ret.append({'key': to_native(meta.key), 'value': to_native(val)})
except Exception as exp:
display.warning('Caught except during etcd3.get: %s' % (to_native(exp)))
display.warning(f'Caught except during etcd3.get: {to_native(exp)}')
return ret

View File

@ -158,7 +158,7 @@ def file_props(root, path):
try:
st = os.lstat(abspath)
except OSError as e:
display.warning('filetree: Error using stat() on path %s (%s)' % (abspath, e))
display.warning(f'filetree: Error using stat() on path {abspath} ({e})')
return None
ret = dict(root=root, path=path)
@ -172,7 +172,7 @@ def file_props(root, path):
ret['state'] = 'file'
ret['src'] = abspath
else:
display.warning('filetree: Error file type of %s is not supported' % abspath)
display.warning(f'filetree: Error file type of {abspath} is not supported')
return None
ret['uid'] = st.st_uid
@ -185,7 +185,7 @@ def file_props(root, path):
ret['group'] = to_text(grp.getgrgid(st.st_gid).gr_name)
except KeyError:
ret['group'] = st.st_gid
ret['mode'] = '0%03o' % (stat.S_IMODE(st.st_mode))
ret['mode'] = f'0{stat.S_IMODE(st.st_mode):03o}'
ret['size'] = st.st_size
ret['mtime'] = st.st_mtime
ret['ctime'] = st.st_ctime
@ -212,7 +212,7 @@ class LookupModule(LookupBase):
term_file = os.path.basename(term)
dwimmed_path = self._loader.path_dwim_relative(basedir, 'files', os.path.dirname(term))
path = os.path.join(dwimmed_path, term_file)
display.debug("Walking '{0}'".format(path))
display.debug(f"Walking '{path}'")
for root, dirs, files in os.walk(path, topdown=True):
for entry in dirs + files:
relpath = os.path.relpath(os.path.join(root, entry), path)
@ -221,7 +221,7 @@ class LookupModule(LookupBase):
if relpath not in [entry['path'] for entry in ret]:
props = file_props(path, relpath)
if props is not None:
display.debug(" found '{0}'".format(os.path.join(path, relpath)))
display.debug(f" found '{os.path.join(path, relpath)}'")
ret.append(props)
return ret

View File

@ -97,7 +97,7 @@ def read_key(path, private_key=None):
with open(path, 'rb') as pem_file:
return jwk_from_pem(pem_file.read())
except Exception as e:
raise AnsibleError("Error while parsing key file: {0}".format(e))
raise AnsibleError(f"Error while parsing key file: {e}")
def encode_jwt(app_id, jwk, exp=600):
@ -110,7 +110,7 @@ def encode_jwt(app_id, jwk, exp=600):
try:
return jwt_instance.encode(payload, jwk, alg='RS256')
except Exception as e:
raise AnsibleError("Error while encoding jwt: {0}".format(e))
raise AnsibleError(f"Error while encoding jwt: {e}")
def post_request(generated_jwt, installation_id):
@ -124,19 +124,19 @@ def post_request(generated_jwt, installation_id):
except HTTPError as e:
try:
error_body = json.loads(e.read().decode())
display.vvv("Error returned: {0}".format(error_body))
display.vvv(f"Error returned: {error_body}")
except Exception:
error_body = {}
if e.code == 404:
raise AnsibleError("Github return error. Please confirm your installationd_id value is valid")
elif e.code == 401:
raise AnsibleError("Github return error. Please confirm your private key is valid")
raise AnsibleError("Unexpected data returned: {0} -- {1}".format(e, error_body))
raise AnsibleError(f"Unexpected data returned: {e} -- {error_body}")
response_body = response.read()
try:
json_data = json.loads(response_body.decode('utf-8'))
except json.decoder.JSONDecodeError as e:
raise AnsibleError("Error while dencoding JSON respone from github: {0}".format(e))
raise AnsibleError(f"Error while dencoding JSON respone from github: {e}")
return json_data.get('token')

View File

@ -79,8 +79,7 @@ class Hiera(object):
pargs.extend(hiera_key)
rc, output, err = run_cmd("{0} -c {1} {2}".format(
self.hiera_bin, self.hiera_cfg, hiera_key[0]))
rc, output, err = run_cmd(f"{self.hiera_bin} -c {self.hiera_cfg} {hiera_key[0]}")
return to_text(output.strip())

View File

@ -61,13 +61,13 @@ class LookupModule(LookupBase):
self.set_options(var_options=variables, direct=kwargs)
display.vvvv(u"keyring: %s" % keyring.get_keyring())
display.vvvv(f"keyring: {keyring.get_keyring()}")
ret = []
for term in terms:
(servicename, username) = (term.split()[0], term.split()[1])
display.vvvv(u"username: %s, servicename: %s " % (username, servicename))
display.vvvv(f"username: {username}, servicename: {servicename} ")
password = keyring.get_password(servicename, username)
if password is None:
raise AnsibleError(u"servicename: %s for user %s not found" % (servicename, username))
raise AnsibleError(f"servicename: {servicename} for user {username} not found")
ret.append(password.rstrip())
return ret

View File

@ -83,9 +83,9 @@ class LPass(object):
def get_field(self, key, field):
if field in ['username', 'password', 'url', 'notes', 'id', 'name']:
out, err = self._run(self._build_args("show", ["--{0}".format(field), key]))
out, err = self._run(self._build_args("show", [f"--{field}", key]))
else:
out, err = self._run(self._build_args("show", ["--field={0}".format(field), key]))
out, err = self._run(self._build_args("show", [f"--field={field}", key]))
return out.strip()

View File

@ -96,7 +96,7 @@ class LookupModule(LookupBase):
try:
env = lmdb.open(str(db), readonly=True)
except Exception as e:
raise AnsibleError("LMDB can't open database %s: %s" % (db, to_native(e)))
raise AnsibleError(f"LMDB can't open database {db}: {to_native(e)}")
ret = []
if len(terms) == 0:

View File

@ -78,12 +78,14 @@ class ApiError(Exception):
class ManifoldApiClient(object):
base_url = 'https://api.{api}.manifold.co/v1/{endpoint}'
http_agent = 'python-manifold-ansible-1.0.0'
def __init__(self, token):
self._token = token
def _make_url(self, api, endpoint):
return f'https://api.{api}.manifold.co/v1/{endpoint}'
def request(self, api, endpoint, *args, **kwargs):
"""
Send a request to API backend and pre-process a response.
@ -98,11 +100,11 @@ class ManifoldApiClient(object):
"""
default_headers = {
'Authorization': "Bearer {0}".format(self._token),
'Authorization': f"Bearer {self._token}",
'Accept': "*/*" # Otherwise server doesn't set content-type header
}
url = self.base_url.format(api=api, endpoint=endpoint)
url = self._make_url(api, endpoint)
headers = default_headers
arg_headers = kwargs.pop('headers', None)
@ -110,23 +112,22 @@ class ManifoldApiClient(object):
headers.update(arg_headers)
try:
display.vvvv('manifold lookup connecting to {0}'.format(url))
display.vvvv(f'manifold lookup connecting to {url}')
response = open_url(url, headers=headers, http_agent=self.http_agent, *args, **kwargs)
data = response.read()
if response.headers.get('content-type') == 'application/json':
data = json.loads(data)
return data
except ValueError:
raise ApiError('JSON response can\'t be parsed while requesting {url}:\n{json}'.format(json=data, url=url))
raise ApiError(f'JSON response can\'t be parsed while requesting {url}:\n{data}')
except HTTPError as e:
raise ApiError('Server returned: {err} while requesting {url}:\n{response}'.format(
err=str(e), url=url, response=e.read()))
raise ApiError(f'Server returned: {str(e)} while requesting {url}:\n{e.read()}')
except URLError as e:
raise ApiError('Failed lookup url for {url} : {err}'.format(url=url, err=str(e)))
raise ApiError(f'Failed lookup url for {url} : {str(e)}')
except SSLValidationError as e:
raise ApiError('Error validating the server\'s certificate for {url}: {err}'.format(url=url, err=str(e)))
raise ApiError(f'Error validating the server\'s certificate for {url}: {str(e)}')
except ConnectionError as e:
raise ApiError('Error connecting to {url}: {err}'.format(url=url, err=str(e)))
raise ApiError(f'Error connecting to {url}: {str(e)}')
def get_resources(self, team_id=None, project_id=None, label=None):
"""
@ -152,7 +153,7 @@ class ManifoldApiClient(object):
query_params['label'] = label
if query_params:
endpoint += '?' + urlencode(query_params)
endpoint += f"?{urlencode(query_params)}"
return self.request(api, endpoint)
@ -188,7 +189,7 @@ class ManifoldApiClient(object):
query_params['label'] = label
if query_params:
endpoint += '?' + urlencode(query_params)
endpoint += f"?{urlencode(query_params)}"
return self.request(api, endpoint)
@ -200,7 +201,7 @@ class ManifoldApiClient(object):
:return:
"""
api = 'marketplace'
endpoint = 'credentials?' + urlencode({'resource_id': resource_id})
endpoint = f"credentials?{urlencode({'resource_id': resource_id})}"
return self.request(api, endpoint)
@ -229,7 +230,7 @@ class LookupModule(LookupBase):
if team:
team_data = client.get_teams(team)
if len(team_data) == 0:
raise AnsibleError("Team '{0}' does not exist".format(team))
raise AnsibleError(f"Team '{team}' does not exist")
team_id = team_data[0]['id']
else:
team_id = None
@ -237,7 +238,7 @@ class LookupModule(LookupBase):
if project:
project_data = client.get_projects(project)
if len(project_data) == 0:
raise AnsibleError("Project '{0}' does not exist".format(project))
raise AnsibleError(f"Project '{project}' does not exist")
project_id = project_data[0]['id']
else:
project_id = None
@ -252,7 +253,7 @@ class LookupModule(LookupBase):
if labels and len(resources_data) < len(labels):
fetched_labels = [r['body']['label'] for r in resources_data]
not_found_labels = [label for label in labels if label not in fetched_labels]
raise AnsibleError("Resource(s) {0} do not exist".format(', '.join(not_found_labels)))
raise AnsibleError(f"Resource(s) {', '.join(not_found_labels)} do not exist")
credentials = {}
cred_map = {}
@ -262,17 +263,14 @@ class LookupModule(LookupBase):
for cred_key, cred_val in six.iteritems(resource_credentials[0]['body']['values']):
label = resource['body']['label']
if cred_key in credentials:
display.warning("'{cred_key}' with label '{old_label}' was replaced by resource data "
"with label '{new_label}'".format(cred_key=cred_key,
old_label=cred_map[cred_key],
new_label=label))
display.warning(f"'{cred_key}' with label '{cred_map[cred_key]}' was replaced by resource data with label '{label}'")
credentials[cred_key] = cred_val
cred_map[cred_key] = label
ret = [credentials]
return ret
except ApiError as e:
raise AnsibleError('API Error: {0}'.format(str(e)))
raise AnsibleError(f'API Error: {str(e)}')
except AnsibleError as e:
raise e
except Exception:

View File

@ -149,7 +149,7 @@ class LookupModule(LookupBase):
ret = []
for term in terms:
if not isinstance(term, str):
raise AnsibleError("Non-string type '{0}' passed, only 'str' types are allowed!".format(type(term)))
raise AnsibleError(f"Non-string type '{type(term)}' passed, only 'str' types are allowed!")
if not self._groups: # consider only own variables
ret.append(self._merge_vars(term, initial_value, variables))
@ -186,9 +186,9 @@ class LookupModule(LookupBase):
return False
def _merge_vars(self, search_pattern, initial_value, variables):
display.vvv("Merge variables with {0}: {1}".format(self._pattern_type, search_pattern))
display.vvv(f"Merge variables with {self._pattern_type}: {search_pattern}")
var_merge_names = sorted([key for key in variables.keys() if self._var_matches(key, search_pattern)])
display.vvv("The following variables will be merged: {0}".format(var_merge_names))
display.vvv(f"The following variables will be merged: {var_merge_names}")
prev_var_type = None
result = None
@ -226,8 +226,7 @@ class LookupModule(LookupBase):
dest[key] += value
else:
if (key in dest) and dest[key] != value:
msg = "The key '{0}' with value '{1}' will be overwritten with value '{2}' from '{3}.{0}'".format(
key, dest[key], value, ".".join(path))
msg = f"The key '{key}' with value '{dest[key]}' will be overwritten with value '{value}' from '{'.'.join(path)}.{key}'"
if self._override == "error":
raise AnsibleError(msg)

View File

@ -140,11 +140,11 @@ class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
if missing:
prefix = "Unable to sign in to 1Password. Missing required parameter"
plural = ""
suffix = ": {params}.".format(params=", ".join(missing))
suffix = f": {', '.join(missing)}."
if len(missing) > 1:
plural = "s"
msg = "{prefix}{plural}{suffix}".format(prefix=prefix, plural=plural, suffix=suffix)
msg = f"{prefix}{plural}{suffix}"
raise AnsibleLookupError(msg)
@abc.abstractmethod
@ -210,12 +210,12 @@ class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
try:
bin_path = get_bin_path(cls.bin)
except ValueError:
raise AnsibleLookupError("Unable to locate '%s' command line tool" % cls.bin)
raise AnsibleLookupError(f"Unable to locate '{cls.bin}' command line tool")
try:
b_out = subprocess.check_output([bin_path, "--version"], stderr=subprocess.PIPE)
except subprocess.CalledProcessError as cpe:
raise AnsibleLookupError("Unable to get the op version: %s" % cpe)
raise AnsibleLookupError(f"Unable to get the op version: {cpe}")
return to_text(b_out).strip()
@ -300,7 +300,7 @@ class OnePassCLIv1(OnePassCLIBase):
if self.account_id:
args.extend(["--account", self.account_id])
elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True)
@ -326,7 +326,7 @@ class OnePassCLIv1(OnePassCLIBase):
args = [
"signin",
"{0}.{1}".format(self.subdomain, self.domain),
f"{self.subdomain}.{self.domain}",
to_bytes(self.username),
to_bytes(self.secret_key),
"--raw",
@ -341,7 +341,7 @@ class OnePassCLIv1(OnePassCLIBase):
args.extend(["--account", self.account_id])
if vault is not None:
args += ["--vault={0}".format(vault)]
args += [f"--vault={vault}"]
if token is not None:
args += [to_bytes("--session=") + token]
@ -512,7 +512,7 @@ class OnePassCLIv2(OnePassCLIBase):
args = ["account", "list"]
if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account])
rc, out, err = self._run(args)
@ -525,7 +525,7 @@ class OnePassCLIv2(OnePassCLIBase):
if self.account_id:
args.extend(["--account", self.account_id])
elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
account = f"{self.subdomain}.{self.domain}"
args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True)
@ -545,7 +545,7 @@ class OnePassCLIv2(OnePassCLIBase):
args = [
"account", "add", "--raw",
"--address", "{0}.{1}".format(self.subdomain, self.domain),
"--address", f"{self.subdomain}.{self.domain}",
"--email", to_bytes(self.username),
"--signin",
]
@ -560,7 +560,7 @@ class OnePassCLIv2(OnePassCLIBase):
args.extend(["--account", self.account_id])
if vault is not None:
args += ["--vault={0}".format(vault)]
args += [f"--vault={vault}"]
if self.connect_host and self.connect_token:
if vault is None:
@ -627,7 +627,7 @@ class OnePass(object):
except TypeError as e:
raise AnsibleLookupError(e)
raise AnsibleLookupError("op version %s is unsupported" % version)
raise AnsibleLookupError(f"op version {version} is unsupported")
def set_token(self):
if self._config.config_file_path and os.path.isfile(self._config.config_file_path):

View File

@ -55,7 +55,7 @@ class OnePassCLIv2Doc(OnePassCLIv2):
def get_raw(self, item_id, vault=None, token=None):
args = ["document", "get", item_id]
if vault is not None:
args = [*args, "--vault={0}".format(vault)]
args = [*args, f"--vault={vault}"]
if self.service_account_token:
if vault is None:

View File

@ -315,7 +315,7 @@ class LookupModule(LookupBase):
)
self.realpass = 'pass: the standard unix password manager' in passoutput
except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output))
raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return self.realpass
@ -332,7 +332,7 @@ class LookupModule(LookupBase):
for param in params[1:]:
name, value = param.split('=', 1)
if name not in self.paramvals:
raise AnsibleAssertionError('%s not in paramvals' % name)
raise AnsibleAssertionError(f'{name} not in paramvals')
self.paramvals[name] = value
except (ValueError, AssertionError) as e:
raise AnsibleError(e)
@ -344,12 +344,12 @@ class LookupModule(LookupBase):
except (ValueError, AssertionError) as e:
raise AnsibleError(e)
if self.paramvals['missing'] not in ['error', 'warn', 'create', 'empty']:
raise AnsibleError("{0} is not a valid option for missing".format(self.paramvals['missing']))
raise AnsibleError(f"{self.paramvals['missing']} is not a valid option for missing")
if not isinstance(self.paramvals['length'], int):
if self.paramvals['length'].isdigit():
self.paramvals['length'] = int(self.paramvals['length'])
else:
raise AnsibleError("{0} is not a correct value for length".format(self.paramvals['length']))
raise AnsibleError(f"{self.paramvals['length']} is not a correct value for length")
if self.paramvals['create']:
self.paramvals['missing'] = 'create'
@ -364,7 +364,7 @@ class LookupModule(LookupBase):
# Set PASSWORD_STORE_DIR
self.env['PASSWORD_STORE_DIR'] = self.paramvals['directory']
elif self.is_real_pass():
raise AnsibleError('Passwordstore directory \'{0}\' does not exist'.format(self.paramvals['directory']))
raise AnsibleError(f"Passwordstore directory '{self.paramvals['directory']}' does not exist")
# Set PASSWORD_STORE_UMASK if umask is set
if self.paramvals.get('umask') is not None:
@ -394,19 +394,19 @@ class LookupModule(LookupBase):
name, value = line.split(':', 1)
self.passdict[name.strip()] = value.strip()
if (self.backend == 'gopass' or
os.path.isfile(os.path.join(self.paramvals['directory'], self.passname + ".gpg"))
os.path.isfile(os.path.join(self.paramvals['directory'], f"{self.passname}.gpg"))
or not self.is_real_pass()):
# When using real pass, only accept password as found if there is a .gpg file for it (might be a tree node otherwise)
return True
except (subprocess.CalledProcessError) as e:
# 'not in password store' is the expected error if a password wasn't found
if 'not in the password store' not in e.output:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output))
raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
if self.paramvals['missing'] == 'error':
raise AnsibleError('passwordstore: passname {0} not found and missing=error is set'.format(self.passname))
raise AnsibleError(f'passwordstore: passname {self.passname} not found and missing=error is set')
elif self.paramvals['missing'] == 'warn':
display.warning('passwordstore: passname {0} not found'.format(self.passname))
display.warning(f'passwordstore: passname {self.passname} not found')
return False
@ -433,11 +433,11 @@ class LookupModule(LookupBase):
msg_lines = []
subkey_exists = False
subkey_line = "{0}: {1}".format(subkey, newpass)
subkey_line = f"{subkey}: {newpass}"
oldpass = None
for line in self.passoutput:
if line.startswith("{0}: ".format(subkey)):
if line.startswith(f"{subkey}: "):
oldpass = self.passdict[subkey]
line = subkey_line
subkey_exists = True
@ -449,9 +449,7 @@ class LookupModule(LookupBase):
if self.paramvals["timestamp"] and self.paramvals["backup"] and oldpass and oldpass != newpass:
msg_lines.append(
"lookup_pass: old subkey '{0}' password was {1} (Updated on {2})\n".format(
subkey, oldpass, datetime
)
f"lookup_pass: old subkey '{subkey}' password was {oldpass} (Updated on {datetime})\n"
)
msg = os.linesep.join(msg_lines)
@ -464,12 +462,12 @@ class LookupModule(LookupBase):
if self.paramvals['preserve'] and self.passoutput[1:]:
msg += '\n'.join(self.passoutput[1:]) + '\n'
if self.paramvals['timestamp'] and self.paramvals['backup']:
msg += "lookup_pass: old password was {0} (Updated on {1})\n".format(self.password, datetime)
msg += f"lookup_pass: old password was {self.password} (Updated on {datetime})\n"
try:
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output))
raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return newpass
def generate_password(self):
@ -480,17 +478,17 @@ class LookupModule(LookupBase):
subkey = self.paramvals["subkey"]
if subkey != "password":
msg = "\n\n{0}: {1}".format(subkey, newpass)
msg = f"\n\n{subkey}: {newpass}"
else:
msg = newpass
if self.paramvals['timestamp']:
msg += '\n' + "lookup_pass: First generated by ansible on {0}\n".format(datetime)
msg += f"\nlookup_pass: First generated by ansible on {datetime}\n"
try:
check_output2([self.pass_cmd, 'insert', '-f', '-m', self.passname], input=msg, env=self.env)
except (subprocess.CalledProcessError) as e:
raise AnsibleError('exit code {0} while running {1}. Error output: {2}'.format(e.returncode, e.cmd, e.output))
raise AnsibleError(f'exit code {e.returncode} while running {e.cmd}. Error output: {e.output}')
return newpass
@ -505,16 +503,12 @@ class LookupModule(LookupBase):
else:
if self.paramvals["missing_subkey"] == "error":
raise AnsibleError(
"passwordstore: subkey {0} for passname {1} not found and missing_subkey=error is set".format(
self.paramvals["subkey"], self.passname
)
f"passwordstore: subkey {self.paramvals['subkey']} for passname {self.passname} not found and missing_subkey=error is set"
)
if self.paramvals["missing_subkey"] == "warn":
display.warning(
"passwordstore: subkey {0} for passname {1} not found".format(
self.paramvals["subkey"], self.passname
)
f"passwordstore: subkey {self.paramvals['subkey']} for passname {self.passname} not found"
)
return None
@ -524,7 +518,7 @@ class LookupModule(LookupBase):
if self.get_option('lock') == type:
tmpdir = os.environ.get('TMPDIR', '/tmp')
user = os.environ.get('USER')
lockfile = os.path.join(tmpdir, '.{0}.passwordstore.lock'.format(user))
lockfile = os.path.join(tmpdir, f'.{user}.passwordstore.lock')
with FileLock().lock_file(lockfile, tmpdir, self.lock_timeout):
self.locked = type
yield
@ -538,7 +532,7 @@ class LookupModule(LookupBase):
self.locked = None
timeout = self.get_option('locktimeout')
if not re.match('^[0-9]+[smh]$', timeout):
raise AnsibleError("{0} is not a correct value for locktimeout".format(timeout))
raise AnsibleError(f"{timeout} is not a correct value for locktimeout")
unit_to_seconds = {"s": 1, "m": 60, "h": 3600}
self.lock_timeout = int(timeout[:-1]) * unit_to_seconds[timeout[-1]]

View File

@ -95,6 +95,6 @@ class LookupModule(LookupBase):
values = petname.Generate(words=words, separator=separator, letters=length)
if prefix:
values = "%s%s%s" % (prefix, separator, values)
values = f"{prefix}{separator}{values}"
return [values]

View File

@ -116,5 +116,5 @@ class LookupModule(LookupBase):
ret.append(to_text(res))
except Exception as e:
# connection failed or key not found
raise AnsibleError('Encountered exception while fetching {0}: {1}'.format(term, e))
raise AnsibleError(f'Encountered exception while fetching {term}: {e}')
return ret

View File

@ -100,8 +100,8 @@ class LookupModule(LookupBase):
result = []
for term in terms:
try:
display.vvv("Secret Server lookup of Secret with ID %s" % term)
display.vvv(f"Secret Server lookup of Secret with ID {term}")
result.append({term: secret_server.get_pam_secret(term)})
except Exception as error:
raise AnsibleError("Secret Server lookup failure: %s" % error.message)
raise AnsibleError(f"Secret Server lookup failure: {error.message}")
return result

View File

@ -71,7 +71,7 @@ class LookupModule(LookupBase):
for param in params:
name, value = param.split('=')
if name not in paramvals:
raise AnsibleAssertionError('%s not in paramvals' % name)
raise AnsibleAssertionError(f'{name} not in paramvals')
paramvals[name] = value
except (ValueError, AssertionError) as e:
@ -86,11 +86,11 @@ class LookupModule(LookupBase):
if shelvefile:
res = self.read_shelve(shelvefile, key)
if res is None:
raise AnsibleError("Key %s not found in shelve file %s" % (key, shelvefile))
raise AnsibleError(f"Key {key} not found in shelve file {shelvefile}")
# Convert the value read to string
ret.append(to_text(res))
break
else:
raise AnsibleError("Could not locate shelve file in lookup: %s" % paramvals['file'])
raise AnsibleError(f"Could not locate shelve file in lookup: {paramvals['file']}")
return ret

View File

@ -306,14 +306,14 @@ class TSSClient(object):
return TSSClientV0(**server_parameters)
def get_secret(self, term, secret_path, fetch_file_attachments, file_download_path):
display.debug("tss_lookup term: %s" % term)
display.debug(f"tss_lookup term: {term}")
secret_id = self._term_to_secret_id(term)
if secret_id == 0 and secret_path:
fetch_secret_by_path = True
display.vvv(u"Secret Server lookup of Secret with path %s" % secret_path)
display.vvv(f"Secret Server lookup of Secret with path {secret_path}")
else:
fetch_secret_by_path = False
display.vvv(u"Secret Server lookup of Secret with ID %d" % secret_id)
display.vvv(f"Secret Server lookup of Secret with ID {secret_id}")
if fetch_file_attachments:
if fetch_secret_by_path:
@ -325,12 +325,12 @@ class TSSClient(object):
if i['isFile']:
try:
file_content = i['itemValue'].content
with open(os.path.join(file_download_path, str(obj['id']) + "_" + i['slug']), "wb") as f:
with open(os.path.join(file_download_path, f"{str(obj['id'])}_{i['slug']}"), "wb") as f:
f.write(file_content)
except ValueError:
raise AnsibleOptionsError("Failed to download {0}".format(str(i['slug'])))
raise AnsibleOptionsError(f"Failed to download {str(i['slug'])}")
except AttributeError:
display.warning("Could not read file content for {0}".format(str(i['slug'])))
display.warning(f"Could not read file content for {str(i['slug'])}")
finally:
i['itemValue'] = "*** Not Valid For Display ***"
else:
@ -343,9 +343,9 @@ class TSSClient(object):
return self._client.get_secret_json(secret_id)
def get_secret_ids_by_folderid(self, term):
display.debug("tss_lookup term: %s" % term)
display.debug(f"tss_lookup term: {term}")
folder_id = self._term_to_folder_id(term)
display.vvv(u"Secret Server lookup of Secret id's with Folder ID %d" % folder_id)
display.vvv(f"Secret Server lookup of Secret id's with Folder ID {folder_id}")
return self._client.get_secret_ids_by_folderid(folder_id)
@ -447,4 +447,4 @@ class LookupModule(LookupBase):
for term in terms
]
except SecretServerError as error:
raise AnsibleError("Secret Server lookup failure: %s" % error.message)
raise AnsibleError(f"Secret Server lookup failure: {error.message}")

View File

@ -1,4 +1,5 @@
.azure-pipelines/scripts/publish-codecov.py replace-urlopen
plugins/lookup/dependent.py validate-modules:unidiomatic-typecheck
plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice
plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt'
plugins/modules/iptables_state.py validate-modules:undocumented-parameter # params _back and _timeout used by action plugin

View File

@ -1,3 +1,4 @@
plugins/lookup/dependent.py validate-modules:unidiomatic-typecheck
plugins/modules/consul_session.py validate-modules:parameter-state-invalid-choice
plugins/modules/homectl.py import-3.11 # Uses deprecated stdlib library 'crypt'
plugins/modules/homectl.py import-3.12 # Uses deprecated stdlib library 'crypt'