199 lines
5.6 KiB
Python
199 lines
5.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2020 Red Hat
|
|
# GNU General Public License v3.0+
|
|
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
|
|
"""
|
|
The index_of plugin common code
|
|
"""
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
|
|
from ansible.module_utils._text import to_native
|
|
from ansible.module_utils.six import integer_types, string_types
|
|
from jinja2.exceptions import TemplateSyntaxError
|
|
|
|
|
|
# Note, this file can only be used on the control node
|
|
# where ansible is installed
|
|
# limit imports to filter and lookup plugins
|
|
try:
|
|
from ansible.errors import AnsibleError
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
def _raise_error(msg):
    """Abort with an AnsibleError that names the 'index_of' plugin

    :param msg: The detail text placed after the plugin prefix
    :type msg: str
    :raises: AnsibleError
    """
    template = "Error when using plugin 'index_of': {msg}"
    raise AnsibleError(template.format(msg=msg))
|
|
|
|
|
|
def _list_to_and_str(lyst):
|
|
"""Convert a list to a command delimited string
|
|
with the last entry being an and
|
|
|
|
:param lyst: The list to turn into a str
|
|
:type lyst: list
|
|
:return: The nicely formatted string
|
|
:rtype: str
|
|
"""
|
|
res = "{most} and {last}".format(most=", ".join(lyst[:-1]), last=lyst[-1])
|
|
return res
|
|
|
|
|
|
def _to_well_known_type(obj):
|
|
"""Convert an ansible internal type to a well-known type
|
|
ie AnsibleUnicode => str
|
|
|
|
:param obj: the obj to convert
|
|
:type obj: unknown
|
|
"""
|
|
return json.loads(json.dumps(obj))
|
|
|
|
|
|
def _run_test(entry, test, right, tests):
|
|
"""Run a test
|
|
|
|
:param test: The test to run
|
|
:type test: a lambda from the qual_map
|
|
:param entry: The x for the lambda
|
|
:type entry: str int or bool
|
|
:param right: The y for the lambda
|
|
:type right: str int bool or list
|
|
:return: If the test passed
|
|
:rtype: book
|
|
"""
|
|
msg = (
|
|
"Error encountered when testing value "
|
|
"'{entry}' (type={entry_type}) against "
|
|
"'{right}' (type={right_type}) with '{test}'. "
|
|
).format(
|
|
entry=entry,
|
|
entry_type=type(_to_well_known_type(entry)).__name__,
|
|
right=right,
|
|
right_type=type(_to_well_known_type(entry)).__name__,
|
|
test=test,
|
|
)
|
|
|
|
if test.startswith("!"):
|
|
invert = True
|
|
test = test.lstrip("!")
|
|
if test == "=":
|
|
test = "=="
|
|
elif test.startswith("not "):
|
|
invert = True
|
|
test = test.lstrip("not ")
|
|
else:
|
|
invert = False
|
|
|
|
if not isinstance(right, list) and test == "in":
|
|
right = [right]
|
|
|
|
# JinjaPluginIntercept.get() raises an exception instead of returning None
|
|
# in ansible-core 2.15+
|
|
try:
|
|
j2_test = tests.get(test)
|
|
except TemplateSyntaxError:
|
|
j2_test = None
|
|
|
|
if not j2_test:
|
|
msg = "{msg} Error was: the test '{test}' was not found.".format(msg=msg, test=test)
|
|
_raise_error(msg)
|
|
|
|
try:
|
|
if right is None:
|
|
result = j2_test(entry)
|
|
else:
|
|
result = j2_test(entry, right)
|
|
except Exception as exc:
|
|
msg = "{msg} Error was: {error}".format(msg=msg, error=to_native(exc))
|
|
_raise_error(msg)
|
|
|
|
if invert:
|
|
result = not result
|
|
return result
|
|
|
|
|
|
def index_of(
    data,
    test,
    value=None,
    key=None,
    wantlist=False,
    fail_on_missing=False,
    tests=None,
):
    """Find the index or indices of entries in list of objects

    Without a key every entry of data is tested directly. With a key every
    entry has to be a dictionary and the value stored under that key is
    tested; entries lacking the key are skipped, or reported together in
    one error when fail_on_missing is set.

    :param data: The data passed in (data|index_of(...))
    :type data: unknown
    :param test: the test to use
    :type test: jinja2 test
    :param value: The value to use for the test
    :type value: unknown
    :param key: The key to use when a list of dicts is passed
    :type key: valid key type
    :param wantlist: always return a list, even if 1 index
    :type wantlist: bool
    :param fail_on_missing: Should we fail if key not found?
    :type fail_on_missing: bool
    :param tests: The jinja tests from the current environment
    :type tests: ansible.template.JinjaPluginIntercept
    :return: The single matching index, or a list of indices when there
        is not exactly one match or wantlist is set
    :raises: AnsibleError
    """
    matches = []

    if key is None:
        # Plain list: test each entry as is
        matches = [
            position
            for position, candidate in enumerate(data)
            if _run_test(candidate, test, value, tests)
        ]
    elif not isinstance(key, (string_types, integer_types, bool)):
        _raise_error(
            "Unknown key type, key ({key}) was a {type}. ".format(
                key=key,
                type=type(_to_well_known_type(key)).__name__,
            ),
        )
    else:
        every_entry_is_dict = all(isinstance(item, dict) for item in data)
        if not every_entry_is_dict:
            found_types = [type(_to_well_known_type(item)).__name__ for item in data]
            _raise_error(
                (
                    "When a key name is provided, all list entries are required to "
                    "be dictionaries, got {str_tipes}"
                ).format(str_tipes=_list_to_and_str(found_types)),
            )

        missing = []
        for position, mapping in enumerate(data):
            if key not in mapping:
                # Only remember the miss when the caller asked to fail on it
                if fail_on_missing:
                    missing.append(
                        ("'{key}' was not found in '{dyct}' at [{index}]").format(
                            key=key,
                            dyct=mapping,
                            index=position,
                        ),
                    )
                continue
            if _run_test(mapping.get(key), test, value, tests):
                matches.append(position)

        if missing:
            _raise_error(
                ("{errors}. fail_on_missing={fom}").format(
                    errors=_list_to_and_str(missing),
                    fom=str(fail_on_missing),
                ),
            )

    if wantlist or len(matches) != 1:
        return matches
    return matches[0]
|