350 lines
13 KiB
Python
350 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2020 Red Hat
|
|
# GNU General Public License v3.0+
|
|
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
|
|
__metaclass__ = type
|
|
|
|
import copy
|
|
|
|
from unittest import TestCase
|
|
|
|
from ansible.playbook.task import Task
|
|
from ansible.template import Templar
|
|
from jinja2 import Template, TemplateSyntaxError
|
|
|
|
from ansible_collections.ansible.utils.plugins.action.update_fact import ActionModule
|
|
|
|
|
|
try:
|
|
from unittest.mock import MagicMock # pylint:disable=syntax-error
|
|
except ImportError:
|
|
from mock import MagicMock # pyright: ignore[reportMissingModuleSource]
|
|
|
|
|
|
VALID_DATA = {
|
|
"a": {
|
|
"b": {"4.4": [{"1": {5: {"foo": 123}}}], 5.5: "float5.5"},
|
|
"127.0.0.1": "localhost",
|
|
},
|
|
}
|
|
|
|
VALID_TESTS = [
|
|
{
|
|
"path": 'a.b["4.4"][0]["1"].5[\'foo\']',
|
|
"split": ["a", "b", "4.4", 0, "1", 5, "foo"],
|
|
"template_result": "123",
|
|
},
|
|
{
|
|
"path": 'a.b["4.4"][0]["1"].5[\'foo\']',
|
|
"split": ["a", "b", "4.4", 0, "1", 5, "foo"],
|
|
"template_result": "123",
|
|
},
|
|
{
|
|
"path": "a.b[5.5]",
|
|
"split": ["a", "b", 5.5],
|
|
"template_result": "float5.5",
|
|
},
|
|
{
|
|
"path": "a['127.0.0.1']",
|
|
"split": ["a", "127.0.0.1"],
|
|
"template_result": "localhost",
|
|
},
|
|
{
|
|
"path": "a.b['4.4'].0['1'].5['foo']",
|
|
"split": ["a", "b", "4.4", 0, "1", 5, "foo"],
|
|
"template_result": "123",
|
|
},
|
|
]
|
|
|
|
|
|
INVALID_JINJA = [
|
|
{
|
|
"path": "a.'1'",
|
|
"note": "quoted values are required to be in brackets",
|
|
"error": "expected name or number",
|
|
},
|
|
{
|
|
"path": "a.[1]",
|
|
"note": "brackets can't follow dots",
|
|
"error": "expected name or number",
|
|
},
|
|
{
|
|
"path": 'a.b["4.4"][0]["1"]."5"[\'foo\']',
|
|
"note": "quoted values are required to be in brackets",
|
|
"error": "expected name or number",
|
|
},
|
|
]
|
|
|
|
|
|
class TestUpdate_Fact(TestCase):
|
|
def setUp(self):
|
|
task = MagicMock(Task)
|
|
# Ansible > 2.13 looks for check_mode in task
|
|
task.check_mode = False
|
|
play_context = MagicMock()
|
|
# Ansible <= 2.13 looks for check_mode in play_context
|
|
play_context.check_mode = False
|
|
connection = MagicMock()
|
|
fake_loader = {}
|
|
templar = Templar(loader=fake_loader)
|
|
self._plugin = ActionModule(
|
|
task=task,
|
|
connection=connection,
|
|
play_context=play_context,
|
|
loader=fake_loader,
|
|
templar=templar,
|
|
shared_loader_obj=None,
|
|
)
|
|
self._plugin._task.action = "update_fact"
|
|
|
|
def test_argspec_no_updates(self):
|
|
"""Check passing invalid argspec"""
|
|
self._plugin._task.args = {"a": 10}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=None)
|
|
self.assertIn("missing required arguments: updates", str(error.exception))
|
|
|
|
def test_argspec_none(self):
|
|
"""Check passing a dict"""
|
|
self._plugin._task.args = {}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=None)
|
|
self.assertIn("missing required arguments: updates", str(error.exception))
|
|
|
|
def test_valid_jinja(self):
|
|
for test in VALID_TESTS:
|
|
tmplt = Template("{{" + test["path"] + "}}")
|
|
result = tmplt.render(VALID_DATA)
|
|
self.assertEqual(result, test["template_result"])
|
|
|
|
def test_invalid_jinja(self):
|
|
for test in INVALID_JINJA:
|
|
with self.assertRaises(TemplateSyntaxError) as error:
|
|
Template("{{" + test["path"] + "}}")
|
|
self.assertIn(test["error"], str(error.exception))
|
|
|
|
def test_fields(self):
|
|
"""Check the parsing of a path into it's parts"""
|
|
for stest in VALID_TESTS:
|
|
result = self._plugin._field_split(stest["path"])
|
|
self.assertEqual(result, stest["split"])
|
|
|
|
def test_missing_var(self):
|
|
"""Check for a missing fact"""
|
|
self._plugin._task.args = {"updates": [{"path": "a.b.c", "value": 5}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars={"vars": {}})
|
|
self.assertIn("'a' was not found in the current facts.", str(error.exception))
|
|
|
|
def test_run_simple(self):
|
|
"""Confirm a valid argspec passes"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"] = 5
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a.b", "value": 5}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_multiple(self):
|
|
"""Confirm multiple paths passes"""
|
|
task_vars = {"vars": {"a": {"b1": [1, 2, 3], "b2": {"c": "123", "d": False}}}}
|
|
expected = {"a": {"b1": [1, 2, 3, 4], "b2": {"c": 456, "d": True}}}
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {
|
|
"updates": [
|
|
{"path": "a.b1.3", "value": 4},
|
|
{"path": "a.b2.c", "value": 456},
|
|
{"path": "a.b2.d", "value": True},
|
|
],
|
|
}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_replace_in_list(self):
|
|
"""Replace in list"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"][1] = 5
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a.b.1", "value": 5}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_append_to_list(self):
|
|
"""Append to list"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"].append(4)
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a.b.3", "value": 4}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_bracket_single_quote(self):
|
|
"""Bracket notation sigle quote"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"].append(4)
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a['b'][3]", "value": 4}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_bracket_double_quote(self):
|
|
"""Bracket notation double quote"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"].append(4)
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": 'a["b"][3]', "value": 4}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_int_dict_keys(self):
|
|
"""Integer dict keys"""
|
|
task_vars = {"vars": {"a": {0: [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"][0][0] = 0
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a.0.0", "value": 0}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_int_as_string(self):
|
|
"""Integer dict keys as string"""
|
|
task_vars = {"vars": {"a": {"0": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["0"][0] = 0
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": 'a["0"].0', "value": 0}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_invalid_path_quote_after_dot(self):
|
|
"""Invalid path format"""
|
|
self._plugin._task.args = {"updates": [{"path": "a.'b'", "value": 0}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars={"vars": {}})
|
|
self.assertIn("malformed", str(error.exception))
|
|
|
|
def test_run_invalid_path_bracket_after_dot(self):
|
|
"""Invalid path format"""
|
|
self._plugin._task.args = {"updates": [{"path": "a.['b']", "value": 0}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars={"vars": {}})
|
|
self.assertIn("malformed", str(error.exception))
|
|
|
|
def test_run_invalid_key_start_with_dot(self):
|
|
"""Invalid key format"""
|
|
self._plugin._task.args = {"updates": [{"path": ".abc", "value": 0}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars={"vars": {}})
|
|
self.assertIn("malformed", str(error.exception))
|
|
|
|
def test_run_no_update_list(self):
|
|
"""Confirm no change when same"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"] = [1, 2, 3]
|
|
expected.update({"changed": False})
|
|
self._plugin._task.args = {"updates": [{"path": "a.b.0", "value": 1}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_no_update_dict(self):
|
|
"""Confirm no change when same"""
|
|
task_vars = {"vars": {"a": {"b": [1, 2, 3]}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["b"] = [1, 2, 3]
|
|
expected.update({"changed": False})
|
|
self._plugin._task.args = {"updates": [{"path": "a.b", "value": [1, 2, 3]}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_missing_key(self):
|
|
"""Confirm error when key not found"""
|
|
task_vars = {"vars": {"a": {"b": 1}}}
|
|
self._plugin._task.args = {"updates": [{"path": "a.c.d", "value": 1}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=task_vars)
|
|
self.assertIn("the key 'c' was not found", str(error.exception))
|
|
|
|
def test_run_list_not_int(self):
|
|
"""Confirm error when key not found"""
|
|
task_vars = {"vars": {"a": {"b": [1]}}}
|
|
self._plugin._task.args = {"updates": [{"path": "a.b['0']", "value": 2}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=task_vars)
|
|
self.assertIn("index provided was not an integer", str(error.exception))
|
|
|
|
def test_run_list_not_long(self):
|
|
"""List not long enough"""
|
|
task_vars = {"vars": {"a": {"b": [0]}}}
|
|
self._plugin._task.args = {"updates": [{"path": "a.b.2", "value": 2}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=task_vars)
|
|
self.assertIn("not long enough for item #2 to be set", str(error.exception))
|
|
|
|
def test_not_mutable_sequence_or_mapping(self):
|
|
"""Confirm graceful fail when immutable object
|
|
This should never happen in the real world
|
|
"""
|
|
obj = {"a": frozenset([1, 2, 3])}
|
|
path = ["a", 0]
|
|
val = 9
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.set_value(obj, path, val)
|
|
self.assertIn("can only modify mutable objects", str(error.exception))
|
|
|
|
def test_run_not_dotted_success_one(self):
|
|
"""Test with a not dotted key"""
|
|
task_vars = {"vars": {"a": 0}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"] = 1
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a", "value": 1}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_not_dotted_success_three(self):
|
|
"""Test with a not dotted key longer"""
|
|
task_vars = {"vars": {"abc": 0}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["abc"] = 1
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "abc", "value": 1}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_not_dotted_fail_missing(self):
|
|
"""Test with a not dotted key, missing"""
|
|
task_vars = {"vars": {"abc": 0}}
|
|
self._plugin._task.args = {"updates": [{"path": "123", "value": 1}]}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=task_vars)
|
|
self.assertIn("'123' was not found in the current facts", str(error.exception))
|
|
|
|
def test_run_not_dotted_success_same(self):
|
|
"""Test with a not dotted key, no change"""
|
|
task_vars = {"vars": {"a": 0}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected.update({"changed": False})
|
|
self._plugin._task.args = {"updates": [{"path": "a", "value": 0}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_run_looks_like_a_bool(self):
|
|
"""Test with a key that looks like a bool"""
|
|
task_vars = {"vars": {"a": {"True": 0}}}
|
|
expected = copy.deepcopy(task_vars["vars"])
|
|
expected["a"]["True"] = 1
|
|
expected.update({"changed": True})
|
|
self._plugin._task.args = {"updates": [{"path": "a['True']", "value": 1}]}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result, expected)
|