ansible.utils/plugins/plugin_utils/index_of.py

199 lines
5.6 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2020 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The index_of plugin common code
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import json
from ansible.module_utils._text import to_native
from ansible.module_utils.six import integer_types, string_types
from jinja2.exceptions import TemplateSyntaxError
# Note: this file can only be used on the control node,
# where ansible is installed.
# Only import it from filter and lookup plugins.
try:
from ansible.errors import AnsibleError
except ImportError:
pass
def _raise_error(msg):
    """Fail with an error that names the 'index_of' plugin

    The supplied message is appended to a fixed prefix identifying
    the plugin before the exception is raised.

    :param msg: The message describing what went wrong
    :type msg: str
    :raises: AnsibleError
    """
    template = "Error when using plugin 'index_of': {msg}"
    raise AnsibleError(template.format(msg=msg))
def _list_to_and_str(lyst):
"""Convert a list to a command delimited string
with the last entry being an and
:param lyst: The list to turn into a str
:type lyst: list
:return: The nicely formatted string
:rtype: str
"""
res = "{most} and {last}".format(most=", ".join(lyst[:-1]), last=lyst[-1])
return res
def _to_well_known_type(obj):
"""Convert an ansible internal type to a well-known type
ie AnsibleUnicode => str
:param obj: the obj to convert
:type obj: unknown
"""
return json.loads(json.dumps(obj))
def _run_test(entry, test, right, tests):
    """Run a single jinja2 test against an entry

    A test name starting with ``!`` or ``not `` has its result inverted,
    and the shorthand ``!=`` is run as an inverted ``==``.

    :param entry: The x for the test
    :type entry: str int or bool
    :param test: The name of the test to run, optionally negated
    :type test: str
    :param right: The y for the test, None when the test takes no argument
    :type right: str int bool or list
    :param tests: The jinja tests from the current environment
    :type tests: ansible.template.JinjaPluginIntercept
    :return: If the test passed
    :rtype: bool
    :raises: AnsibleError when the test cannot be found or fails to run
    """
    # Built up front, while `test` and `right` still hold the caller's values
    msg = (
        "Error encountered when testing value "
        "'{entry}' (type={entry_type}) against "
        "'{right}' (type={right_type}) with '{test}'. "
    ).format(
        entry=entry,
        entry_type=type(_to_well_known_type(entry)).__name__,
        right=right,
        # Report the type of the right-hand value, not of the entry
        right_type=type(_to_well_known_type(right)).__name__,
        test=test,
    )

    if test.startswith("!"):
        invert = True
        test = test.lstrip("!")
        if test == "=":
            test = "=="
    elif test.startswith("not "):
        invert = True
        # Slice the prefix off: str.lstrip("not ") strips a *set* of
        # characters and would mangle names such as "not none" into "e"
        test = test[len("not "):].lstrip()
    else:
        invert = False

    # The "in" test needs a container on the right-hand side
    if not isinstance(right, list) and test == "in":
        right = [right]

    # JinjaPluginIntercept.get() raises an exception instead of returning None
    # in ansible-core 2.15+
    try:
        j2_test = tests.get(test)
    except TemplateSyntaxError:
        j2_test = None

    if not j2_test:
        msg = "{msg} Error was: the test '{test}' was not found.".format(msg=msg, test=test)
        _raise_error(msg)

    try:
        # Tests without a right-hand value are called with the entry only
        if right is None:
            result = j2_test(entry)
        else:
            result = j2_test(entry, right)
    except Exception as exc:
        msg = "{msg} Error was: {error}".format(msg=msg, error=to_native(exc))
        _raise_error(msg)

    if invert:
        result = not result
    return result
def index_of(
    data,
    test,
    value=None,
    key=None,
    wantlist=False,
    fail_on_missing=False,
    tests=None,
):
    """Find the index or indices of the entries in a list that pass a test

    Without a key every entry is tested directly. With a key, every entry
    has to be a dictionary and the value stored under that key is tested.

    :param data: The data passed in (data|index_of(...))
    :type data: unknown
    :param test: The test to use
    :type test: jinja2 test
    :param value: The value to use for the test
    :type value: unknown
    :param key: The key to use when a list of dicts is passed
    :type key: valid key type
    :param wantlist: Always return a list, even if 1 index
    :type wantlist: bool
    :param fail_on_missing: Should we fail if key not found?
    :type fail_on_missing: bool
    :param tests: The jinja tests from the current environment
    :type tests: ansible.template.JinjaPluginIntercept
    :return: The single matching index when exactly one entry matched and
        wantlist is false, otherwise the list of matching indices
    :rtype: int or list
    """
    matches = []

    if key is None:
        # No key given, each entry is itself the value under test
        matches.extend(
            [
                position
                for position, item in enumerate(data)
                if _run_test(item, test, value, tests)
            ],
        )
    elif not isinstance(key, (string_types, integer_types, bool)):
        _raise_error(
            "Unknown key type, key ({key}) was a {type}. ".format(
                key=key,
                type=type(_to_well_known_type(key)).__name__,
            ),
        )
    else:
        # A key lookup only makes sense when every entry is a dictionary
        if any(not isinstance(item, dict) for item in data):
            type_names = [type(_to_well_known_type(item)).__name__ for item in data]
            _raise_error(
                (
                    "When a key name is provided, all list entries are required to "
                    "be dictionaries, got {str_tipes}"
                ).format(str_tipes=_list_to_and_str(type_names)),
            )

        # Missing keys are collected and reported together after the loop
        missing = []
        for position, mapping in enumerate(data):
            if key not in mapping:
                if fail_on_missing:
                    missing.append(
                        "'{key}' was not found in '{dyct}' at [{index}]".format(
                            key=key,
                            dyct=mapping,
                            index=position,
                        ),
                    )
                continue
            if _run_test(mapping.get(key), test, value, tests):
                matches.append(position)

        if missing:
            _raise_error(
                "{errors}. fail_on_missing={fom}".format(
                    errors=_list_to_and_str(missing),
                    fom=str(fail_on_missing),
                ),
            )

    if wantlist or len(matches) != 1:
        return matches
    return matches[0]