action plugins: use f-strings (#9318)

* action plugins: use f-strings

* add changelog frag

* adjustment from review
pull/8437/merge
Alexei Znamensky 2024-12-23 23:21:25 +13:00 committed by GitHub
parent 1d8f0b2942
commit 79bef1a14c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 62 additions and 61 deletions

View File

@ -0,0 +1,3 @@
minor_changes:
- iptables_state action plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9318).
- shutdown action plugin - use f-strings instead of interpolations or ``format`` (https://github.com/ansible-collections/community.general/pull/9318).

View File

@ -22,25 +22,33 @@ class ActionModule(ActionBase):
_VALID_ARGS = frozenset(('path', 'state', 'table', 'noflush', 'counters', 'modprobe', 'ip_version', 'wait')) _VALID_ARGS = frozenset(('path', 'state', 'table', 'noflush', 'counters', 'modprobe', 'ip_version', 'wait'))
DEFAULT_SUDOABLE = True DEFAULT_SUDOABLE = True
MSG_ERROR__ASYNC_AND_POLL_NOT_ZERO = ( @staticmethod
def msg_error__async_and_poll_not_zero(task_poll, task_async, max_timeout):
return (
"This module doesn't support async>0 and poll>0 when its 'state' param " "This module doesn't support async>0 and poll>0 when its 'state' param "
"is set to 'restored'. To enable its rollback feature (that needs the " "is set to 'restored'. To enable its rollback feature (that needs the "
"module to run asynchronously on the remote), please set task attribute " "module to run asynchronously on the remote), please set task attribute "
"'poll' (=%s) to 0, and 'async' (=%s) to a value >2 and not greater than " f"'poll' (={task_poll}) to 0, and 'async' (={task_async}) to a value >2 and not greater than "
"'ansible_timeout' (=%s) (recommended).") f"'ansible_timeout' (={max_timeout}) (recommended).")
MSG_WARNING__NO_ASYNC_IS_NO_ROLLBACK = (
@staticmethod
def msg_warning__no_async_is_no_rollback(task_poll, task_async, max_timeout):
return (
"Attempts to restore iptables state without rollback in case of mistake " "Attempts to restore iptables state without rollback in case of mistake "
"may lead the ansible controller to loose access to the hosts and never " "may lead the ansible controller to loose access to the hosts and never "
"regain it before fixing firewall rules through a serial console, or any " "regain it before fixing firewall rules through a serial console, or any "
"other way except SSH. Please set task attribute 'poll' (=%s) to 0, and " f"other way except SSH. Please set task attribute 'poll' (={task_poll}) to 0, and "
"'async' (=%s) to a value >2 and not greater than 'ansible_timeout' (=%s) " f"'async' (={task_async}) to a value >2 and not greater than 'ansible_timeout' (={max_timeout}) "
"(recommended).") "(recommended).")
MSG_WARNING__ASYNC_GREATER_THAN_TIMEOUT = (
@staticmethod
def msg_warning__async_greater_than_timeout(task_poll, task_async, max_timeout):
return (
"You attempt to restore iptables state with rollback in case of mistake, " "You attempt to restore iptables state with rollback in case of mistake, "
"but with settings that will lead this rollback to happen AFTER that the " "but with settings that will lead this rollback to happen AFTER that the "
"controller will reach its own timeout. Please set task attribute 'poll' " "controller will reach its own timeout. Please set task attribute 'poll' "
"(=%s) to 0, and 'async' (=%s) to a value >2 and not greater than " f"(={task_poll}) to 0, and 'async' (={task_async}) to a value >2 and not greater than "
"'ansible_timeout' (=%s) (recommended).") f"'ansible_timeout' (={max_timeout}) (recommended).")
def _async_result(self, async_status_args, task_vars, timeout): def _async_result(self, async_status_args, task_vars, timeout):
''' '''
@ -95,18 +103,18 @@ class ActionModule(ActionBase):
if module_args.get('state', None) == 'restored': if module_args.get('state', None) == 'restored':
if not wrap_async: if not wrap_async:
if not check_mode: if not check_mode:
display.warning(self.MSG_WARNING__NO_ASYNC_IS_NO_ROLLBACK % ( display.warning(self.msg_error__async_and_poll_not_zero(
task_poll, task_poll,
task_async, task_async,
max_timeout)) max_timeout))
elif task_poll: elif task_poll:
raise AnsibleActionFail(self.MSG_ERROR__ASYNC_AND_POLL_NOT_ZERO % ( raise AnsibleActionFail(self.msg_warning__no_async_is_no_rollback(
task_poll, task_poll,
task_async, task_async,
max_timeout)) max_timeout))
else: else:
if task_async > max_timeout and not check_mode: if task_async > max_timeout and not check_mode:
display.warning(self.MSG_WARNING__ASYNC_GREATER_THAN_TIMEOUT % ( display.warning(self.msg_warning__async_greater_than_timeout(
task_poll, task_poll,
task_async, task_async,
max_timeout)) max_timeout))
@ -119,10 +127,10 @@ class ActionModule(ActionBase):
# remote and local sides (if not the same, make the loop # remote and local sides (if not the same, make the loop
# longer on the controller); and set a backup file path. # longer on the controller); and set a backup file path.
module_args['_timeout'] = task_async module_args['_timeout'] = task_async
module_args['_back'] = '%s/iptables.state' % async_dir module_args['_back'] = f'{async_dir}/iptables.state'
async_status_args = dict(mode='status') async_status_args = dict(mode='status')
confirm_cmd = 'rm -f %s' % module_args['_back'] confirm_cmd = f"rm -f {module_args['_back']}"
starter_cmd = 'touch %s.starter' % module_args['_back'] starter_cmd = f"touch {module_args['_back']}.starter"
remaining_time = max(task_async, max_timeout) remaining_time = max(task_async, max_timeout)
# do work! # do work!

View File

@ -18,6 +18,10 @@ from ansible.utils.display import Display
display = Display() display = Display()
def fmt(mapping, key):
return to_native(mapping[key]).strip()
class TimedOutException(Exception): class TimedOutException(Exception):
pass pass
@ -84,31 +88,26 @@ class ActionModule(ActionBase):
def get_distribution(self, task_vars): def get_distribution(self, task_vars):
# FIXME: only execute the module if we don't already have the facts we need # FIXME: only execute the module if we don't already have the facts we need
distribution = {} distribution = {}
display.debug('{action}: running setup module to get distribution'.format(action=self._task.action)) display.debug(f'{self._task.action}: running setup module to get distribution')
module_output = self._execute_module( module_output = self._execute_module(
task_vars=task_vars, task_vars=task_vars,
module_name='ansible.legacy.setup', module_name='ansible.legacy.setup',
module_args={'gather_subset': 'min'}) module_args={'gather_subset': 'min'})
try: try:
if module_output.get('failed', False): if module_output.get('failed', False):
raise AnsibleError('Failed to determine system distribution. {0}, {1}'.format( raise AnsibleError(f"Failed to determine system distribution. {fmt(module_output, 'module_stdout')}, {fmt(module_output, 'module_stderr')}")
to_native(module_output['module_stdout']).strip(),
to_native(module_output['module_stderr']).strip()))
distribution['name'] = module_output['ansible_facts']['ansible_distribution'].lower() distribution['name'] = module_output['ansible_facts']['ansible_distribution'].lower()
distribution['version'] = to_text( distribution['version'] = to_text(
module_output['ansible_facts']['ansible_distribution_version'].split('.')[0]) module_output['ansible_facts']['ansible_distribution_version'].split('.')[0])
distribution['family'] = to_text(module_output['ansible_facts']['ansible_os_family'].lower()) distribution['family'] = to_text(module_output['ansible_facts']['ansible_os_family'].lower())
display.debug("{action}: distribution: {dist}".format(action=self._task.action, dist=distribution)) display.debug(f"{self._task.action}: distribution: {distribution}")
return distribution return distribution
except KeyError as ke: except KeyError as ke:
raise AnsibleError('Failed to get distribution information. Missing "{0}" in output.'.format(ke.args[0])) raise AnsibleError(f'Failed to get distribution information. Missing "{ke.args[0]}" in output.')
def get_shutdown_command(self, task_vars, distribution): def get_shutdown_command(self, task_vars, distribution):
def find_command(command, find_search_paths): def find_command(command, find_search_paths):
display.debug('{action}: running find module looking in {paths} to get path for "{command}"'.format( display.debug(f'{self._task.action}: running find module looking in {find_search_paths} to get path for "{command}"')
action=self._task.action,
command=command,
paths=find_search_paths))
find_result = self._execute_module( find_result = self._execute_module(
task_vars=task_vars, task_vars=task_vars,
# prevent collection search by calling with ansible.legacy (still allows library/ override of find) # prevent collection search by calling with ansible.legacy (still allows library/ override of find)
@ -130,42 +129,37 @@ class ActionModule(ActionBase):
if is_string(search_paths): if is_string(search_paths):
search_paths = [search_paths] search_paths = [search_paths]
# Error if we didn't get a list
err_msg = "'search_paths' must be a string or flat list of strings, got {0}"
try: try:
incorrect_type = any(not is_string(x) for x in search_paths) incorrect_type = any(not is_string(x) for x in search_paths)
if not isinstance(search_paths, list) or incorrect_type: if not isinstance(search_paths, list) or incorrect_type:
raise TypeError raise TypeError
except TypeError: except TypeError:
raise AnsibleError(err_msg.format(search_paths)) # Error if we didn't get a list
err_msg = f"'search_paths' must be a string or flat list of strings, got {search_paths}"
raise AnsibleError(err_msg)
full_path = find_command(shutdown_bin, search_paths) # find the path to the shutdown command full_path = find_command(shutdown_bin, search_paths) # find the path to the shutdown command
if not full_path: # if we could not find the shutdown command if not full_path: # if we could not find the shutdown command
display.vvv('Unable to find command "{0}" in search paths: {1}, will attempt a shutdown using systemd '
'directly.'.format(shutdown_bin, search_paths)) # tell the user we will try with systemd # tell the user we will try with systemd
display.vvv(f'Unable to find command "{shutdown_bin}" in search paths: {search_paths}, will attempt a shutdown using systemd directly.')
systemctl_search_paths = ['/bin', '/usr/bin'] systemctl_search_paths = ['/bin', '/usr/bin']
full_path = find_command('systemctl', systemctl_search_paths) # find the path to the systemctl command full_path = find_command('systemctl', systemctl_search_paths) # find the path to the systemctl command
if not full_path: # if we couldn't find systemctl if not full_path: # if we couldn't find systemctl
raise AnsibleError( raise AnsibleError(
'Could not find command "{0}" in search paths: {1} or systemctl command in search paths: {2}, unable to shutdown.'. f'Could not find command "{shutdown_bin}" in search paths: {search_paths} or systemctl'
format(shutdown_bin, search_paths, systemctl_search_paths)) # we give up here f' command in search paths: {systemctl_search_paths}, unable to shutdown.') # we give up here
else: else:
return "{0} poweroff".format(full_path[0]) # done, since we cannot use args with systemd shutdown return f"{full_path[0]} poweroff" # done, since we cannot use args with systemd shutdown
# systemd case taken care of, here we add args to the command # systemd case taken care of, here we add args to the command
args = self._get_value_from_facts('SHUTDOWN_COMMAND_ARGS', distribution, 'DEFAULT_SHUTDOWN_COMMAND_ARGS') args = self._get_value_from_facts('SHUTDOWN_COMMAND_ARGS', distribution, 'DEFAULT_SHUTDOWN_COMMAND_ARGS')
# Convert seconds to minutes. If less that 60, set it to 0. # Convert seconds to minutes. If less that 60, set it to 0.
delay_sec = self.delay delay_sec = self.delay
shutdown_message = self._task.args.get('msg', self.DEFAULT_SHUTDOWN_MESSAGE) shutdown_message = self._task.args.get('msg', self.DEFAULT_SHUTDOWN_MESSAGE)
return '{0} {1}'. \
format( af = args.format(delay_sec=delay_sec, delay_min=delay_sec // 60, message=shutdown_message)
full_path[0], return f'{full_path[0]} {af}'
args.format(
delay_sec=delay_sec,
delay_min=delay_sec // 60,
message=shutdown_message
)
)
def perform_shutdown(self, task_vars, distribution): def perform_shutdown(self, task_vars, distribution):
result = {} result = {}
@ -174,9 +168,8 @@ class ActionModule(ActionBase):
self.cleanup(force=True) self.cleanup(force=True)
try: try:
display.vvv("{action}: shutting down server...".format(action=self._task.action)) display.vvv(f"{self._task.action}: shutting down server...")
display.debug("{action}: shutting down server with command '{command}'". display.debug(f"{self._task.action}: shutting down server with command '{shutdown_command_exec}'")
format(action=self._task.action, command=shutdown_command_exec))
if self._play_context.check_mode: if self._play_context.check_mode:
shutdown_result['rc'] = 0 shutdown_result['rc'] = 0
else: else:
@ -184,16 +177,13 @@ class ActionModule(ActionBase):
except AnsibleConnectionFailure as e: except AnsibleConnectionFailure as e:
# If the connection is closed too quickly due to the system being shutdown, carry on # If the connection is closed too quickly due to the system being shutdown, carry on
display.debug( display.debug(
'{action}: AnsibleConnectionFailure caught and handled: {error}'.format(action=self._task.action, f'{self._task.action}: AnsibleConnectionFailure caught and handled: {to_text(e)}')
error=to_text(e)))
shutdown_result['rc'] = 0 shutdown_result['rc'] = 0
if shutdown_result['rc'] != 0: if shutdown_result['rc'] != 0:
result['failed'] = True result['failed'] = True
result['shutdown'] = False result['shutdown'] = False
result['msg'] = "Shutdown command failed. Error was {stdout}, {stderr}".format( result['msg'] = f"Shutdown command failed. Error was {fmt(shutdown_result, 'stdout')}, {fmt(shutdown_result, 'stderr')}"
stdout=to_native(shutdown_result['stdout'].strip()),
stderr=to_native(shutdown_result['stderr'].strip()))
return result return result
result['failed'] = False result['failed'] = False
@ -206,7 +196,7 @@ class ActionModule(ActionBase):
# If running with local connection, fail so we don't shutdown ourself # If running with local connection, fail so we don't shutdown ourself
if self._connection.transport == 'local' and (not self._play_context.check_mode): if self._connection.transport == 'local' and (not self._play_context.check_mode):
msg = 'Running {0} with local connection would shutdown the control node.'.format(self._task.action) msg = f'Running {self._task.action} with local connection would shutdown the control node.'
return {'changed': False, 'elapsed': 0, 'shutdown': False, 'failed': True, 'msg': msg} return {'changed': False, 'elapsed': 0, 'shutdown': False, 'failed': True, 'msg': msg}
if task_vars is None: if task_vars is None: