inventory plugins: use f-strings (#9323)

* inventory plugins: use f-strings

* add changelog frag
pull/8437/merge
Alexei Znamensky 2024-12-23 23:02:30 +13:00 committed by GitHub
parent cb2cd00cd1
commit 1d8f0b2942
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 148 additions and 149 deletions

View File

@ -0,0 +1,14 @@
minor_changes:
- cobbler inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- gitlab_runners inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- icinga2 inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- linode inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- lxd inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- nmap inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- online inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- opennebula inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- proxmox inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- scaleway inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- stackpath_compute inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- virtualbox inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).
- xen_orchestra inventory plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9323).

View File

@ -160,7 +160,7 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
raise AnsibleError('Could not import xmlrpc client library')
if self.connection is None:
self.display.vvvv('Connecting to %s\n' % self.cobbler_url)
self.display.vvvv(f'Connecting to {self.cobbler_url}\n')
self.connection = xmlrpc_client.Server(self.cobbler_url, allow_none=True)
self.token = None
if self.get_option('user') is not None:
@ -211,7 +211,7 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
return self._cache[self.cache_key]['systems']
def _add_safe_group_name(self, group, child=None):
group_name = self.inventory.add_group(to_safe_group_name('%s%s' % (self.get_option('group_prefix'), group.lower().replace(" ", ""))))
group_name = self.inventory.add_group(to_safe_group_name(f"{self.get_option('group_prefix')}{group.lower().replace(' ', '')}"))
if child is not None:
self.inventory.add_child(group_name, child)
return group_name
@ -243,16 +243,16 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
for profile in self._get_profiles():
if profile['parent']:
self.display.vvvv('Processing profile %s with parent %s\n' % (profile['name'], profile['parent']))
self.display.vvvv(f"Processing profile {profile['name']} with parent {profile['parent']}\n")
if not self._exclude_profile(profile['parent']):
parent_group_name = self._add_safe_group_name(profile['parent'])
self.display.vvvv('Added profile parent group %s\n' % parent_group_name)
self.display.vvvv(f'Added profile parent group {parent_group_name}\n')
if not self._exclude_profile(profile['name']):
group_name = self._add_safe_group_name(profile['name'])
self.display.vvvv('Added profile group %s\n' % group_name)
self.display.vvvv(f'Added profile group {group_name}\n')
self.inventory.add_child(parent_group_name, group_name)
else:
self.display.vvvv('Processing profile %s without parent\n' % profile['name'])
self.display.vvvv(f"Processing profile {profile['name']} without parent\n")
# Create a hierarchy of profile names
profile_elements = profile['name'].split('-')
i = 0
@ -260,12 +260,12 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
profile_group = '-'.join(profile_elements[0:i + 1])
profile_group_child = '-'.join(profile_elements[0:i + 2])
if self._exclude_profile(profile_group):
self.display.vvvv('Excluding profile %s\n' % profile_group)
self.display.vvvv(f'Excluding profile {profile_group}\n')
break
group_name = self._add_safe_group_name(profile_group)
self.display.vvvv('Added profile group %s\n' % group_name)
self.display.vvvv(f'Added profile group {group_name}\n')
child_group_name = self._add_safe_group_name(profile_group_child)
self.display.vvvv('Added profile child group %s to %s\n' % (child_group_name, group_name))
self.display.vvvv(f'Added profile child group {child_group_name} to {group_name}\n')
self.inventory.add_child(group_name, child_group_name)
i = i + 1
@ -273,7 +273,7 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
self.group = to_safe_group_name(self.get_option('group'))
if self.group is not None and self.group != '':
self.inventory.add_group(self.group)
self.display.vvvv('Added site group %s\n' % self.group)
self.display.vvvv(f'Added site group {self.group}\n')
ip_addresses = {}
ipv6_addresses = {}
@ -286,14 +286,14 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
interfaces = host['interfaces']
if set(host['mgmt_classes']) & set(self.include_mgmt_classes):
self.display.vvvv('Including host %s in mgmt_classes %s\n' % (host['name'], host['mgmt_classes']))
self.display.vvvv(f"Including host {host['name']} in mgmt_classes {host['mgmt_classes']}\n")
else:
if self._exclude_profile(host['profile']):
self.display.vvvv('Excluding host %s in profile %s\n' % (host['name'], host['profile']))
self.display.vvvv(f"Excluding host {host['name']} in profile {host['profile']}\n")
continue
if set(host['mgmt_classes']) & set(self.exclude_mgmt_classes):
self.display.vvvv('Excluding host %s in mgmt_classes %s\n' % (host['name'], host['mgmt_classes']))
self.display.vvvv(f"Excluding host {host['name']} in mgmt_classes {host['mgmt_classes']}\n")
continue
# hostname is often empty for non-static IP hosts
@ -303,21 +303,21 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
this_dns_name = ivalue.get('dns_name', None)
if this_dns_name is not None and this_dns_name != "":
hostname = make_unsafe(this_dns_name)
self.display.vvvv('Set hostname to %s from %s\n' % (hostname, iname))
self.display.vvvv(f'Set hostname to {hostname} from {iname}\n')
if hostname == '':
self.display.vvvv('Cannot determine hostname for host %s, skipping\n' % host['name'])
self.display.vvvv(f"Cannot determine hostname for host {host['name']}, skipping\n")
continue
self.inventory.add_host(hostname)
self.display.vvvv('Added host %s hostname %s\n' % (host['name'], hostname))
self.display.vvvv(f"Added host {host['name']} hostname {hostname}\n")
# Add host to profile group
if host['profile'] != '':
group_name = self._add_safe_group_name(host['profile'], child=hostname)
self.display.vvvv('Added host %s to profile group %s\n' % (hostname, group_name))
self.display.vvvv(f'Added host {hostname} to profile group {group_name}\n')
else:
self.display.warning('Host %s has an empty profile\n' % (hostname))
self.display.warning(f'Host {hostname} has an empty profile\n')
# Add host to groups specified by group_by fields
for group_by in self.group_by:
@ -327,7 +327,7 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
groups = [host[group_by]] if isinstance(host[group_by], str) else host[group_by]
for group in groups:
group_name = self._add_safe_group_name(group, child=hostname)
self.display.vvvv('Added host %s to group_by %s group %s\n' % (hostname, group_by, group_name))
self.display.vvvv(f'Added host {hostname} to group_by {group_by} group {group_name}\n')
# Add to group for this inventory
if self.group is not None:
@ -377,7 +377,7 @@ class InventoryModule(BaseInventoryPlugin, Cacheable):
try:
self.inventory.set_variable(hostname, 'cobbler', make_unsafe(host))
except ValueError as e:
self.display.warning("Could not set host info for %s: %s" % (hostname, to_text(e)))
self.display.warning(f"Could not set host info for {hostname}: {to_text(e)}")
if self.get_option('want_ip_addresses'):
self.inventory.set_variable(self.group, 'cobbler_ipv4_addresses', make_unsafe(ip_addresses))

View File

@ -124,7 +124,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
# Create groups based on variable values and add the corresponding hosts to it
self._add_host_to_keyed_groups(self.get_option('keyed_groups'), host_attrs, host, strict=strict)
except Exception as e:
raise AnsibleParserError('Unable to fetch hosts from GitLab API, this was the original exception: %s' % to_native(e))
raise AnsibleParserError(f'Unable to fetch hosts from GitLab API, this was the original exception: {to_native(e)}')
def verify_file(self, path):
"""Return the possibly of a file being consumable by this plugin."""

View File

@ -141,7 +141,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
'User-Agent': "ansible-icinga2-inv",
'Accept': "application/json",
}
api_status_url = self.icinga2_url + "/status"
api_status_url = f"{self.icinga2_url}/status"
request_args = {
'headers': self.headers,
'url_username': self.icinga2_user,
@ -151,7 +151,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
open_url(api_status_url, **request_args)
def _post_request(self, request_url, data=None):
self.display.vvv("Requested URL: %s" % request_url)
self.display.vvv(f"Requested URL: {request_url}")
request_args = {
'headers': self.headers,
'url_username': self.icinga2_user,
@ -160,42 +160,38 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
}
if data is not None:
request_args['data'] = json.dumps(data)
self.display.vvv("Request Args: %s" % request_args)
self.display.vvv(f"Request Args: {request_args}")
try:
response = open_url(request_url, **request_args)
except HTTPError as e:
try:
error_body = json.loads(e.read().decode())
self.display.vvv("Error returned: {0}".format(error_body))
self.display.vvv(f"Error returned: {error_body}")
except Exception:
error_body = {"status": None}
if e.code == 404 and error_body.get('status') == "No objects found.":
raise AnsibleParserError("Host filter returned no data. Please confirm your host_filter value is valid")
raise AnsibleParserError("Unexpected data returned: {0} -- {1}".format(e, error_body))
raise AnsibleParserError(f"Unexpected data returned: {e} -- {error_body}")
response_body = response.read()
json_data = json.loads(response_body.decode('utf-8'))
self.display.vvv("Returned Data: %s" % json.dumps(json_data, indent=4, sort_keys=True))
self.display.vvv(f"Returned Data: {json.dumps(json_data, indent=4, sort_keys=True)}")
if 200 <= response.status <= 299:
return json_data
if response.status == 404 and json_data['status'] == "No objects found.":
raise AnsibleParserError(
"API returned no data -- Response: %s - %s"
% (response.status, json_data['status']))
f"API returned no data -- Response: {response.status} - {json_data['status']}")
if response.status == 401:
raise AnsibleParserError(
"API was unable to complete query -- Response: %s - %s"
% (response.status, json_data['status']))
f"API was unable to complete query -- Response: {response.status} - {json_data['status']}")
if response.status == 500:
raise AnsibleParserError(
"API Response - %s - %s"
% (json_data['status'], json_data['errors']))
f"API Response - {json_data['status']} - {json_data['errors']}")
raise AnsibleParserError(
"Unexpected data returned - %s - %s"
% (json_data['status'], json_data['errors']))
f"Unexpected data returned - {json_data['status']} - {json_data['errors']}")
def _query_hosts(self, hosts=None, attrs=None, joins=None, host_filter=None):
query_hosts_url = "{0}/objects/hosts".format(self.icinga2_url)
query_hosts_url = f"{self.icinga2_url}/objects/hosts"
self.headers['X-HTTP-Method-Override'] = 'GET'
data_dict = dict()
if hosts:
@ -302,7 +298,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
if self.templar.is_template(self.icinga2_password):
self.icinga2_password = self.templar.template(variable=self.icinga2_password, disable_lookups=False)
self.icinga2_url = self.icinga2_url.rstrip('/') + '/v1'
self.icinga2_url = f"{self.icinga2_url.rstrip('/')}/v1"
# Not currently enabled
# self.cache_key = self.get_cache_key(path)

View File

@ -161,7 +161,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
try:
self.instances = self.client.linode.instances()
except LinodeApiError as exception:
raise AnsibleError('Linode client raised: %s' % exception)
raise AnsibleError(f'Linode client raised: {exception}')
def _add_groups(self):
"""Add Linode instance groups to the dynamic inventory."""

View File

@ -211,7 +211,7 @@ class InventoryModule(BaseInventoryPlugin):
with open(path, 'r') as json_file:
return json.load(json_file)
except (IOError, json.decoder.JSONDecodeError) as err:
raise AnsibleParserError('Could not load the test data from {0}: {1}'.format(to_native(path), to_native(err)))
raise AnsibleParserError(f'Could not load the test data from {to_native(path)}: {to_native(err)}')
def save_json_data(self, path, file_name=None):
"""save data as json
@ -241,7 +241,7 @@ class InventoryModule(BaseInventoryPlugin):
with open(os.path.abspath(os.path.join(cwd, *path)), 'w') as json_file:
json.dump(self.data, json_file)
except IOError as err:
raise AnsibleParserError('Could not save data: {0}'.format(to_native(err)))
raise AnsibleParserError(f'Could not save data: {to_native(err)}')
def verify_file(self, path):
"""Check the config
@ -281,7 +281,7 @@ class InventoryModule(BaseInventoryPlugin):
if not isinstance(url, str):
return False
if not url.startswith(('unix:', 'https:')):
raise AnsibleError('URL is malformed: {0}'.format(to_native(url)))
raise AnsibleError(f'URL is malformed: {to_native(url)}')
return True
def _connect_to_socket(self):
@ -306,7 +306,7 @@ class InventoryModule(BaseInventoryPlugin):
return socket_connection
except LXDClientException as err:
error_storage[url] = err
raise AnsibleError('No connection to the socket: {0}'.format(to_native(error_storage)))
raise AnsibleError(f'No connection to the socket: {to_native(error_storage)}')
def _get_networks(self):
"""Get Networknames
@ -355,7 +355,7 @@ class InventoryModule(BaseInventoryPlugin):
# }
url = '/1.0/instances'
if self.project:
url = url + '?{0}'.format(urlencode(dict(project=self.project)))
url = f"{url}?{urlencode(dict(project=self.project))}"
instances = self.socket.do('GET', url)
@ -383,10 +383,10 @@ class InventoryModule(BaseInventoryPlugin):
config = {}
if isinstance(branch, (tuple, list)):
config[name] = {branch[1]: self.socket.do(
'GET', '/1.0/{0}/{1}/{2}?{3}'.format(to_native(branch[0]), to_native(name), to_native(branch[1]), urlencode(dict(project=self.project))))}
'GET', f'/1.0/{to_native(branch[0])}/{to_native(name)}/{to_native(branch[1])}?{urlencode(dict(project=self.project))}')}
else:
config[name] = {branch: self.socket.do(
'GET', '/1.0/{0}/{1}?{2}'.format(to_native(branch), to_native(name), urlencode(dict(project=self.project))))}
'GET', f'/1.0/{to_native(branch)}/{to_native(name)}?{urlencode(dict(project=self.project))}')}
return config
def get_instance_data(self, names):
@ -449,7 +449,7 @@ class InventoryModule(BaseInventoryPlugin):
None
Returns:
dict(network_configuration): network config"""
instance_network_interfaces = self._get_data_entry('instances/{0}/state/metadata/network'.format(instance_name))
instance_network_interfaces = self._get_data_entry(f'instances/{instance_name}/state/metadata/network')
network_configuration = None
if instance_network_interfaces:
network_configuration = {}
@ -462,7 +462,7 @@ class InventoryModule(BaseInventoryPlugin):
address_set['family'] = address.get('family')
address_set['address'] = address.get('address')
address_set['netmask'] = address.get('netmask')
address_set['combined'] = address.get('address') + '/' + address.get('netmask')
address_set['combined'] = f"{address.get('address')}/{address.get('netmask')}"
network_configuration[interface_name].append(address_set)
return network_configuration
@ -479,7 +479,7 @@ class InventoryModule(BaseInventoryPlugin):
None
Returns:
str(prefered_interface): None or interface name"""
instance_network_interfaces = self._get_data_entry('inventory/{0}/network_interfaces'.format(instance_name))
instance_network_interfaces = self._get_data_entry(f'inventory/{instance_name}/network_interfaces')
prefered_interface = None # init
if instance_network_interfaces: # instance have network interfaces
# generator if interfaces which start with the desired pattern
@ -516,7 +516,7 @@ class InventoryModule(BaseInventoryPlugin):
# "network":"lxdbr0",
# "type":"nic"},
vlan_ids = {}
devices = self._get_data_entry('instances/{0}/instances/metadata/expanded_devices'.format(to_native(instance_name)))
devices = self._get_data_entry(f'instances/{to_native(instance_name)}/instances/metadata/expanded_devices')
for device in devices:
if 'network' in devices[device]:
if devices[device]['network'] in network_vlans:
@ -579,7 +579,7 @@ class InventoryModule(BaseInventoryPlugin):
else:
path[instance_name][key] = value
except KeyError as err:
raise AnsibleParserError("Unable to store Information: {0}".format(to_native(err)))
raise AnsibleParserError(f"Unable to store Information: {to_native(err)}")
def extract_information_from_instance_configs(self):
"""Process configuration information
@ -600,24 +600,24 @@ class InventoryModule(BaseInventoryPlugin):
for instance_name in self.data['instances']:
self._set_data_entry(instance_name, 'os', self._get_data_entry(
'instances/{0}/instances/metadata/config/image.os'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/config/image.os'))
self._set_data_entry(instance_name, 'release', self._get_data_entry(
'instances/{0}/instances/metadata/config/image.release'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/config/image.release'))
self._set_data_entry(instance_name, 'version', self._get_data_entry(
'instances/{0}/instances/metadata/config/image.version'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/config/image.version'))
self._set_data_entry(instance_name, 'profile', self._get_data_entry(
'instances/{0}/instances/metadata/profiles'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/profiles'))
self._set_data_entry(instance_name, 'location', self._get_data_entry(
'instances/{0}/instances/metadata/location'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/location'))
self._set_data_entry(instance_name, 'state', self._get_data_entry(
'instances/{0}/instances/metadata/config/volatile.last_state.power'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/config/volatile.last_state.power'))
self._set_data_entry(instance_name, 'type', self._get_data_entry(
'instances/{0}/instances/metadata/type'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/type'))
self._set_data_entry(instance_name, 'network_interfaces', self.extract_network_information_from_instance_config(instance_name))
self._set_data_entry(instance_name, 'preferred_interface', self.get_prefered_instance_network_interface(instance_name))
self._set_data_entry(instance_name, 'vlan_ids', self.get_instance_vlans(instance_name))
self._set_data_entry(instance_name, 'project', self._get_data_entry(
'instances/{0}/instances/metadata/project'.format(instance_name)))
f'instances/{instance_name}/instances/metadata/project'))
def build_inventory_network(self, instance_name):
"""Add the network interfaces of the instance to the inventory
@ -651,18 +651,18 @@ class InventoryModule(BaseInventoryPlugin):
None
Returns:
dict(interface_name: ip)"""
prefered_interface = self._get_data_entry('inventory/{0}/preferred_interface'.format(instance_name)) # name or None
prefered_interface = self._get_data_entry(f'inventory/{instance_name}/preferred_interface') # name or None
prefered_instance_network_family = self.prefered_instance_network_family
ip_address = ''
if prefered_interface:
interface = self._get_data_entry('inventory/{0}/network_interfaces/{1}'.format(instance_name, prefered_interface))
interface = self._get_data_entry(f'inventory/{instance_name}/network_interfaces/{prefered_interface}')
for config in interface:
if config['family'] == prefered_instance_network_family:
ip_address = config['address']
break
else:
interfaces = self._get_data_entry('inventory/{0}/network_interfaces'.format(instance_name))
interfaces = self._get_data_entry(f'inventory/{instance_name}/network_interfaces')
for interface in interfaces.values():
for config in interface:
if config['family'] == prefered_instance_network_family:
@ -670,7 +670,7 @@ class InventoryModule(BaseInventoryPlugin):
break
return ip_address
if self._get_data_entry('inventory/{0}/network_interfaces'.format(instance_name)): # instance have network interfaces
if self._get_data_entry(f'inventory/{instance_name}/network_interfaces'): # instance have network interfaces
self.inventory.set_variable(instance_name, 'ansible_connection', 'ssh')
self.inventory.set_variable(instance_name, 'ansible_host', make_unsafe(interface_selection(instance_name)))
else:
@ -691,7 +691,7 @@ class InventoryModule(BaseInventoryPlugin):
Returns:
None"""
for instance_name in self.data['inventory']:
instance_state = str(self._get_data_entry('inventory/{0}/state'.format(instance_name)) or "STOPPED").lower()
instance_state = str(self._get_data_entry(f'inventory/{instance_name}/state') or "STOPPED").lower()
# Only consider instances that match the "state" filter, if self.state is not None
if self.filter:
@ -703,34 +703,34 @@ class InventoryModule(BaseInventoryPlugin):
# add network information
self.build_inventory_network(instance_name)
# add os
v = self._get_data_entry('inventory/{0}/os'.format(instance_name))
v = self._get_data_entry(f'inventory/{instance_name}/os')
if v:
self.inventory.set_variable(instance_name, 'ansible_lxd_os', make_unsafe(v.lower()))
# add release
v = self._get_data_entry('inventory/{0}/release'.format(instance_name))
v = self._get_data_entry(f'inventory/{instance_name}/release')
if v:
self.inventory.set_variable(
instance_name, 'ansible_lxd_release', make_unsafe(v.lower()))
# add profile
self.inventory.set_variable(
instance_name, 'ansible_lxd_profile', make_unsafe(self._get_data_entry('inventory/{0}/profile'.format(instance_name))))
instance_name, 'ansible_lxd_profile', make_unsafe(self._get_data_entry(f'inventory/{instance_name}/profile')))
# add state
self.inventory.set_variable(
instance_name, 'ansible_lxd_state', make_unsafe(instance_state))
# add type
self.inventory.set_variable(
instance_name, 'ansible_lxd_type', make_unsafe(self._get_data_entry('inventory/{0}/type'.format(instance_name))))
instance_name, 'ansible_lxd_type', make_unsafe(self._get_data_entry(f'inventory/{instance_name}/type')))
# add location information
if self._get_data_entry('inventory/{0}/location'.format(instance_name)) != "none": # wrong type by lxd 'none' != 'None'
if self._get_data_entry(f'inventory/{instance_name}/location') != "none": # wrong type by lxd 'none' != 'None'
self.inventory.set_variable(
instance_name, 'ansible_lxd_location', make_unsafe(self._get_data_entry('inventory/{0}/location'.format(instance_name))))
instance_name, 'ansible_lxd_location', make_unsafe(self._get_data_entry(f'inventory/{instance_name}/location')))
# add VLAN_ID information
if self._get_data_entry('inventory/{0}/vlan_ids'.format(instance_name)):
if self._get_data_entry(f'inventory/{instance_name}/vlan_ids'):
self.inventory.set_variable(
instance_name, 'ansible_lxd_vlan_ids', make_unsafe(self._get_data_entry('inventory/{0}/vlan_ids'.format(instance_name))))
instance_name, 'ansible_lxd_vlan_ids', make_unsafe(self._get_data_entry(f'inventory/{instance_name}/vlan_ids')))
# add project
self.inventory.set_variable(
instance_name, 'ansible_lxd_project', make_unsafe(self._get_data_entry('inventory/{0}/project'.format(instance_name))))
instance_name, 'ansible_lxd_project', make_unsafe(self._get_data_entry(f'inventory/{instance_name}/project')))
def build_inventory_groups_location(self, group_name):
"""create group by attribute: location
@ -792,7 +792,7 @@ class InventoryModule(BaseInventoryPlugin):
network = ipaddress.ip_network(to_text(self.groupby[group_name].get('attribute')))
except ValueError as err:
raise AnsibleParserError(
'Error while parsing network range {0}: {1}'.format(self.groupby[group_name].get('attribute'), to_native(err)))
f"Error while parsing network range {self.groupby[group_name].get('attribute')}: {to_native(err)}")
for instance_name in self.inventory.hosts:
if self.data['inventory'][instance_name].get('network_interfaces') is not None:
@ -997,12 +997,12 @@ class InventoryModule(BaseInventoryPlugin):
elif self.groupby[group_name].get('type') == 'project':
self.build_inventory_groups_project(group_name)
else:
raise AnsibleParserError('Unknown group type: {0}'.format(to_native(group_name)))
raise AnsibleParserError(f'Unknown group type: {to_native(group_name)}')
if self.groupby:
for group_name in self.groupby:
if not group_name.isalnum():
raise AnsibleParserError('Invalid character(s) in groupname: {0}'.format(to_native(group_name)))
raise AnsibleParserError(f'Invalid character(s) in groupname: {to_native(group_name)}')
group_type(make_unsafe(group_name))
def build_inventory(self):
@ -1039,7 +1039,7 @@ class InventoryModule(BaseInventoryPlugin):
None"""
iter_keys = list(self.data['instances'].keys())
for instance_name in iter_keys:
if self._get_data_entry('instances/{0}/instances/metadata/type'.format(instance_name)) != self.type_filter:
if self._get_data_entry(f'instances/{instance_name}/instances/metadata/type') != self.type_filter:
del self.data['instances'][instance_name]
def _populate(self):
@ -1120,6 +1120,6 @@ class InventoryModule(BaseInventoryPlugin):
self.url = self.get_option('url')
except Exception as err:
raise AnsibleParserError(
'All correct options required: {0}'.format(to_native(err)))
f'All correct options required: {to_native(err)}')
# Call our internal helper to populate the dynamic inventory
self._populate()

View File

@ -178,7 +178,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
try:
self._nmap = get_bin_path('nmap')
except ValueError as e:
raise AnsibleParserError('nmap inventory plugin requires the nmap cli tool to work: {0}'.format(to_native(e)))
raise AnsibleParserError(f'nmap inventory plugin requires the nmap cli tool to work: {to_native(e)}')
super(InventoryModule, self).parse(inventory, loader, path, cache=cache)
@ -248,7 +248,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise AnsibleParserError('Failed to run nmap, rc=%s: %s' % (p.returncode, to_native(stderr)))
raise AnsibleParserError(f'Failed to run nmap, rc={p.returncode}: {to_native(stderr)}')
# parse results
host = None
@ -259,7 +259,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
try:
t_stdout = to_text(stdout, errors='surrogate_or_strict')
except UnicodeError as e:
raise AnsibleParserError('Invalid (non unicode) input returned: %s' % to_native(e))
raise AnsibleParserError(f'Invalid (non unicode) input returned: {to_native(e)}')
for line in t_stdout.splitlines():
hits = self.find_host.match(line)
@ -300,7 +300,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
results[-1]['ports'] = ports
except Exception as e:
raise AnsibleParserError("failed to parse %s: %s " % (to_native(path), to_native(e)))
raise AnsibleParserError(f"failed to parse {to_native(path)}: {to_native(e)} ")
if cache_needs_update:
self._cache[cache_key] = results

View File

@ -138,7 +138,7 @@ class InventoryModule(BaseInventoryPlugin):
try:
response = open_url(url, headers=self.headers)
except Exception as e:
self.display.warning("An error happened while fetching: %s" % url)
self.display.warning(f"An error happened while fetching: {url}")
return None
try:
@ -245,8 +245,8 @@ class InventoryModule(BaseInventoryPlugin):
}
self.headers = {
'Authorization': "Bearer %s" % token,
'User-Agent': "ansible %s Python %s" % (ansible_version, python_version.split(' ', 1)[0]),
'Authorization': f"Bearer {token}",
'User-Agent': f"ansible {ansible_version} Python {python_version.split(' ', 1)[0]}",
'Content-type': 'application/json'
}

View File

@ -128,9 +128,9 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
authstring = fp.read().rstrip()
username, password = authstring.split(":")
except (OSError, IOError):
raise AnsibleError("Could not find or read ONE_AUTH file at '{e}'".format(e=authfile))
raise AnsibleError(f"Could not find or read ONE_AUTH file at '{authfile}'")
except Exception:
raise AnsibleError("Error occurs when reading ONE_AUTH file at '{e}'".format(e=authfile))
raise AnsibleError(f"Error occurs when reading ONE_AUTH file at '{authfile}'")
auth_params = namedtuple('auth', ('url', 'username', 'password'))
@ -166,13 +166,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
if not (auth.username and auth.password):
raise AnsibleError('API Credentials missing. Check OpenNebula inventory file.')
else:
one_client = pyone.OneServer(auth.url, session=auth.username + ':' + auth.password)
one_client = pyone.OneServer(auth.url, session=f"{auth.username}:{auth.password}")
# get hosts (VMs)
try:
vm_pool = one_client.vmpool.infoextended(-2, -1, -1, 3)
except Exception as e:
raise AnsibleError("Something happened during XML-RPC call: {e}".format(e=to_native(e)))
raise AnsibleError(f"Something happened during XML-RPC call: {to_native(e)}")
return vm_pool

View File

@ -284,12 +284,12 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if self.proxmox_password:
credentials = urlencode({'username': self.proxmox_user, 'password': self.proxmox_password})
a = self._get_session()
ret = a.post('%s/api2/json/access/ticket' % self.proxmox_url, data=credentials)
ret = a.post(f'{self.proxmox_url}/api2/json/access/ticket', data=credentials)
json = ret.json()
self.headers = {
# only required for POST/PUT/DELETE methods, which we are not using currently
# 'CSRFPreventionToken': json['data']['CSRFPreventionToken'],
'Cookie': 'PVEAuthCookie={0}'.format(json['data']['ticket'])
'Cookie': f"PVEAuthCookie={json['data']['ticket']}"
}
else:
# Clean and format token components
@ -341,23 +341,23 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return make_unsafe(self._cache[self.cache_key][url])
def _get_nodes(self):
return self._get_json("%s/api2/json/nodes" % self.proxmox_url)
return self._get_json(f"{self.proxmox_url}/api2/json/nodes")
def _get_pools(self):
return self._get_json("%s/api2/json/pools" % self.proxmox_url)
return self._get_json(f"{self.proxmox_url}/api2/json/pools")
def _get_lxc_per_node(self, node):
return self._get_json("%s/api2/json/nodes/%s/lxc" % (self.proxmox_url, node))
return self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/lxc")
def _get_qemu_per_node(self, node):
return self._get_json("%s/api2/json/nodes/%s/qemu" % (self.proxmox_url, node))
return self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/qemu")
def _get_members_per_pool(self, pool):
ret = self._get_json("%s/api2/json/pools/%s" % (self.proxmox_url, pool))
ret = self._get_json(f"{self.proxmox_url}/api2/json/pools/{pool}")
return ret['members']
def _get_node_ip(self, node):
ret = self._get_json("%s/api2/json/nodes/%s/network" % (self.proxmox_url, node))
ret = self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/network")
for iface in ret:
try:
@ -371,7 +371,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if status_key not in properties or not properties[status_key] == 'running':
return
ret = self._get_json("%s/api2/json/nodes/%s/lxc/%s/interfaces" % (self.proxmox_url, node, vmid), ignore_errors=[501])
ret = self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/lxc/{vmid}/interfaces", ignore_errors=[501])
if not ret:
return
@ -398,9 +398,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
try:
ifaces = self._get_json(
"%s/api2/json/nodes/%s/%s/%s/agent/network-get-interfaces" % (
self.proxmox_url, node, vmtype, vmid
)
f"{self.proxmox_url}/api2/json/nodes/{node}/{vmtype}/{vmid}/agent/network-get-interfaces"
)['result']
if "error" in ifaces:
@ -418,7 +416,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
result.append({
'name': iface['name'],
'mac-address': iface['hardware-address'] if 'hardware-address' in iface else '',
'ip-addresses': ["%s/%s" % (ip['ip-address'], ip['prefix']) for ip in iface['ip-addresses']] if 'ip-addresses' in iface else []
'ip-addresses': [f"{ip['ip-address']}/{ip['prefix']}" for ip in iface['ip-addresses']] if 'ip-addresses' in iface else []
})
except requests.HTTPError:
pass
@ -426,7 +424,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return result
def _get_vm_config(self, properties, node, vmid, vmtype, name):
ret = self._get_json("%s/api2/json/nodes/%s/%s/%s/config" % (self.proxmox_url, node, vmtype, vmid))
ret = self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/{vmtype}/{vmid}/config")
properties[self._fact('node')] = node
properties[self._fact('vmid')] = vmid
@ -442,13 +440,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
try:
# fixup disk images as they have no key
if config == 'rootfs' or config.startswith(('virtio', 'sata', 'ide', 'scsi')):
value = ('disk_image=' + value)
value = f"disk_image={value}"
# Additional field containing parsed tags as list
if config == 'tags':
stripped_value = value.strip()
if stripped_value:
parsed_key = key + "_parsed"
parsed_key = f"{key}_parsed"
properties[parsed_key] = [tag.strip() for tag in stripped_value.replace(',', ';').split(";")]
# The first field in the agent string tells you whether the agent is enabled
@ -464,7 +462,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if agent_enabled:
agent_iface_value = self._get_agent_network_interfaces(node, vmid, vmtype)
if agent_iface_value:
agent_iface_key = self.to_safe('%s%s' % (key, "_interfaces"))
agent_iface_key = self.to_safe(f'{key}_interfaces')
properties[agent_iface_key] = agent_iface_value
if config == 'lxc':
@ -489,13 +487,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
return None
def _get_vm_status(self, properties, node, vmid, vmtype, name):
ret = self._get_json("%s/api2/json/nodes/%s/%s/%s/status/current" % (self.proxmox_url, node, vmtype, vmid))
ret = self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/{vmtype}/{vmid}/status/current")
properties[self._fact('status')] = ret['status']
if vmtype == 'qemu':
properties[self._fact('qmpstatus')] = ret['qmpstatus']
def _get_vm_snapshots(self, properties, node, vmid, vmtype, name):
ret = self._get_json("%s/api2/json/nodes/%s/%s/%s/snapshot" % (self.proxmox_url, node, vmtype, vmid))
ret = self._get_json(f"{self.proxmox_url}/api2/json/nodes/{node}/{vmtype}/{vmid}/snapshot")
snapshots = [snapshot['name'] for snapshot in ret if snapshot['name'] != 'current']
properties[self._fact('snapshots')] = snapshots
@ -509,11 +507,11 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def _fact(self, name):
'''Generate a fact's full name from the common prefix and a name.'''
return self.to_safe('%s%s' % (self.facts_prefix, name.lower()))
return self.to_safe(f'{self.facts_prefix}{name.lower()}')
def _group(self, name):
'''Generate a group's full name from the common prefix and a name.'''
return self.to_safe('%s%s' % (self.group_prefix, name.lower()))
return self.to_safe(f'{self.group_prefix}{name.lower()}')
def _can_add_host(self, name, properties):
'''Ensure that a host satisfies all defined hosts filters. If strict mode is
@ -525,7 +523,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if not self._compose(host_filter, properties):
return False
except Exception as e: # pylint: disable=broad-except
message = "Could not evaluate host filter %s for host %s - %s" % (host_filter, name, to_native(e))
message = f"Could not evaluate host filter {host_filter} for host {name} - {to_native(e)}"
if self.strict:
raise AnsibleError(message)
display.warning(message)
@ -566,8 +564,8 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
# add the host to the inventory
self._add_host(name, properties)
node_type_group = self._group('%s_%s' % (node, ittype))
self.inventory.add_child(self._group('all_' + ittype), name)
node_type_group = self._group(f'{node}_{ittype}')
self.inventory.add_child(self._group(f"all_{ittype}"), name)
self.inventory.add_child(node_type_group, name)
item_status = item['status']
@ -575,7 +573,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if want_facts and ittype == 'qemu' and self.get_option('qemu_extended_statuses'):
# get more details about the status of the qemu VM
item_status = properties.get(self._fact('qmpstatus'), item_status)
self.inventory.add_child(self._group('all_%s' % (item_status, )), name)
self.inventory.add_child(self._group(f'all_{item_status}'), name)
return name
@ -586,7 +584,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
poolid = pool.get('poolid')
if not poolid:
continue
pool_group = self._group('pool_' + poolid)
pool_group = self._group(f"pool_{poolid}")
self.inventory.add_group(pool_group)
for member in self._get_members_per_pool(poolid):
@ -603,7 +601,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
default_groups.extend(['prelaunch', 'paused'])
for group in default_groups:
self.inventory.add_group(self._group('all_%s' % (group)))
self.inventory.add_group(self._group(f'all_{group}'))
nodes_group = self._group('nodes')
if not self.exclude_nodes:
self.inventory.add_group(nodes_group)
@ -636,7 +634,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
# add LXC/Qemu groups for the node
for ittype in ('lxc', 'qemu'):
node_type_group = self._group('%s_%s' % (node['node'], ittype))
node_type_group = self._group(f"{node['node']}_{ittype}")
self.inventory.add_group(node_type_group)
# get LXC containers and Qemu VMs for this node
@ -665,7 +663,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
v = self.get_option(o)
if self.templar.is_template(v):
v = self.templar.template(v, disable_lookups=False)
setattr(self, 'proxmox_%s' % o, v)
setattr(self, f'proxmox_{o}', v)
# some more cleanup and validation
self.proxmox_url = self.proxmox_url.rstrip('/')

View File

@ -140,7 +140,7 @@ def _fetch_information(token, url):
headers={'X-Auth-Token': token,
'Content-type': 'application/json'})
except Exception as e:
raise AnsibleError("Error while fetching %s: %s" % (url, to_native(e)))
raise AnsibleError(f"Error while fetching {url}: {to_native(e)}")
try:
raw_json = json.loads(to_text(response.read()))
except ValueError:
@ -161,7 +161,7 @@ def _fetch_information(token, url):
def _build_server_url(api_endpoint):
return "/".join([api_endpoint, "servers"])
return f"{api_endpoint}/servers"
def extract_public_ipv4(server_info):

View File

@ -139,7 +139,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
"Content-Type": "application/json",
}
resp = open_url(
self.api_host + '/identity/v1/oauth2/token',
f"{self.api_host}/identity/v1/oauth2/token",
headers=headers,
data=payload,
method="POST"
@ -155,16 +155,16 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self._authenticate()
for stack_slug in self.stack_slugs:
try:
workloads = self._stackpath_query_get_list(self.api_host + '/workload/v1/stacks/' + stack_slug + '/workloads')
workloads = self._stackpath_query_get_list(f"{self.api_host}/workload/v1/stacks/{stack_slug}/workloads")
except Exception:
raise AnsibleError("Failed to get workloads from the StackPath API: %s" % traceback.format_exc())
raise AnsibleError(f"Failed to get workloads from the StackPath API: {traceback.format_exc()}")
for workload in workloads:
try:
workload_instances = self._stackpath_query_get_list(
self.api_host + '/workload/v1/stacks/' + stack_slug + '/workloads/' + workload["id"] + '/instances'
f"{self.api_host}/workload/v1/stacks/{stack_slug}/workloads/{workload['id']}/instances"
)
except Exception:
raise AnsibleError("Failed to get workload instances from the StackPath API: %s" % traceback.format_exc())
raise AnsibleError(f"Failed to get workload instances from the StackPath API: {traceback.format_exc()}")
for instance in workload_instances:
if instance["phase"] == "RUNNING":
instance["stackSlug"] = stack_slug
@ -184,7 +184,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def _populate(self, instances):
for instance in instances:
for group_key in self.group_keys:
group = group_key + "_" + instance[group_key]
group = f"{group_key}_{instance[group_key]}"
group = group.lower().replace(" ", "_").replace("-", "_")
self.inventory.add_group(group)
self.inventory.add_host(instance[self.hostname_key],
@ -194,14 +194,14 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self._authenticate()
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + self.auth_token,
"Authorization": f"Bearer {self.auth_token}",
}
next_page = True
result = []
cursor = '-1'
while next_page:
resp = open_url(
url + '?page_request.first=10&page_request.after=%s' % cursor,
f"{url}?page_request.first=10&page_request.after={cursor}",
headers=headers,
method="GET"
)
@ -251,10 +251,10 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
self.stack_slugs = self.get_option('stack_slugs')
if not self.stack_slugs:
try:
stacks = self._stackpath_query_get_list(self.api_host + '/stack/v1/stacks')
stacks = self._stackpath_query_get_list(f"{self.api_host}/stack/v1/stacks")
self._get_stack_slugs(stacks)
except Exception:
raise AnsibleError("Failed to get stack IDs from the Stackpath API: %s" % traceback.format_exc())
raise AnsibleError(f"Failed to get stack IDs from the Stackpath API: {traceback.format_exc()}")
cache_key = self.get_cache_key(path)
# false when refresh_cache or --flush-cache is used
@ -283,4 +283,4 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
if cache_needs_update or (not cache and self.get_option('cache')):
self._cache[cache_key] = results
except Exception:
raise AnsibleError("Failed to populate data: %s" % traceback.format_exc())
raise AnsibleError(f"Failed to populate data: {traceback.format_exc()}")

View File

@ -203,7 +203,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
else:
# found vars, accumulate in hostvars for clean inventory set
pref_k = make_unsafe('vbox_' + k.strip().replace(' ', '_'))
pref_k = make_unsafe(f"vbox_{k.strip().replace(' ', '_')}")
leading_spaces = len(k) - len(k.lstrip(' '))
if 0 < leading_spaces <= 2:
if prevkey not in hostvars[current_host] or not isinstance(hostvars[current_host][prevkey], dict):

View File

@ -138,7 +138,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
sslopt = None if validate_certs else {'cert_reqs': ssl.CERT_NONE}
self.conn = create_connection(
'{0}://{1}/api/'.format(proto, xoa_api_host), sslopt=sslopt)
f'{proto}://{xoa_api_host}/api/', sslopt=sslopt)
CALL_TIMEOUT = 100
"""Number of 1/10ths of a second to wait before method call times out."""
@ -162,8 +162,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
sleep(0.1)
waited += 1
raise AnsibleError(
'Method call {method} timed out after {timeout} seconds.'.format(method=method, timeout=self.CALL_TIMEOUT / 10))
raise AnsibleError(f'Method call {method} timed out after {self.CALL_TIMEOUT / 10} seconds.')
def login(self, user, password):
result = self.call('session.signIn', {
@ -171,15 +170,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
})
if 'error' in result:
raise AnsibleError(
'Could not connect: {0}'.format(result['error']))
raise AnsibleError(f"Could not connect: {result['error']}")
def get_object(self, name):
answer = self.call('xo.getAllObjects', {'filter': {'type': name}})
if 'error' in answer:
raise AnsibleError(
'Could not request: {0}'.format(answer['error']))
raise AnsibleError(f"Could not request: {answer['error']}")
return answer['result']
@ -252,8 +249,7 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def _add_hosts(self, hosts, pools):
for host in hosts.values():
entry_name = host['uuid']
group_name = 'xo_host_{0}'.format(
clean_group_name(host['name_label']))
group_name = f"xo_host_{clean_group_name(host['name_label'])}"
pool_name = self._pool_group_name_for_uuid(pools, host['$poolId'])
self.inventory.add_group(group_name)
@ -276,15 +272,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
entry_name, 'product_brand', host['productBrand'])
for pool in pools.values():
group_name = 'xo_pool_{0}'.format(
clean_group_name(pool['name_label']))
group_name = f"xo_pool_{clean_group_name(pool['name_label'])}"
self.inventory.add_group(group_name)
def _add_pools(self, pools):
for pool in pools.values():
group_name = 'xo_pool_{0}'.format(
clean_group_name(pool['name_label']))
group_name = f"xo_pool_{clean_group_name(pool['name_label'])}"
self.inventory.add_group(group_name)
@ -292,16 +286,13 @@ class InventoryModule(BaseInventoryPlugin, Constructable, Cacheable):
def _pool_group_name_for_uuid(self, pools, pool_uuid):
for pool in pools:
if pool == pool_uuid:
return 'xo_pool_{0}'.format(
clean_group_name(pools[pool_uuid]['name_label']))
return f"xo_pool_{clean_group_name(pools[pool_uuid]['name_label'])}"
# TODO: Refactor
def _host_group_name_for_uuid(self, hosts, host_uuid):
for host in hosts:
if host == host_uuid:
return 'xo_host_{0}'.format(
clean_group_name(hosts[host_uuid]['name_label']
))
return f"xo_host_{clean_group_name(hosts[host_uuid]['name_label'])}"
def _populate(self, objects):
# Prepare general groups