538 lines
21 KiB
Python
538 lines
21 KiB
Python
# (c) 2020 Ansible Project
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
|
|
__metaclass__ = type
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
from unittest import TestCase
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from ansible.module_utils.connection import ConnectionError as AnsibleConnectionError
|
|
from ansible.playbook.task import Task
|
|
from ansible.template import Templar
|
|
|
|
from ansible_collections.ansible.utils.plugins.action.cli_parse import (
|
|
ARGSPEC_CONDITIONALS,
|
|
ActionModule,
|
|
)
|
|
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
|
|
check_argspec,
|
|
)
|
|
from ansible_collections.ansible.utils.plugins.modules.cli_parse import DOCUMENTATION
|
|
from ansible_collections.ansible.utils.plugins.plugin_utils.base.cli_parser import CliParserBase
|
|
from ansible_collections.ansible.utils.tests.unit.mock.loader import DictDataLoader
|
|
|
|
|
|
class TestCli_Parse(TestCase):
|
|
def setUp(self):
|
|
task = MagicMock(Task)
|
|
play_context = MagicMock()
|
|
play_context.check_mode = False
|
|
connection = MagicMock()
|
|
fake_loader = DictDataLoader({})
|
|
templar = Templar(loader=fake_loader)
|
|
self._plugin = ActionModule(
|
|
task=task,
|
|
connection=connection,
|
|
play_context=play_context,
|
|
loader=fake_loader,
|
|
templar=templar,
|
|
shared_loader_obj=None,
|
|
)
|
|
self._plugin._task.action = "cli_parse"
|
|
|
|
@staticmethod
|
|
def _load_fixture(filename):
|
|
"""Load a fixture from the filesystem
|
|
|
|
:param filename: The name of the file to load
|
|
:type filename: str
|
|
:return: The file contents
|
|
:rtype: str
|
|
"""
|
|
fixture_name = os.path.join(os.path.dirname(__file__), "fixtures", filename)
|
|
with open(fixture_name) as fhand:
|
|
return fhand.read()
|
|
|
|
def test_fn_debug(self):
|
|
"""Confirm debug doesn't fail and return None"""
|
|
msg = "some message"
|
|
result = self._plugin._debug(msg)
|
|
self.assertEqual(result, None)
|
|
|
|
def test_fn_ail_json(self):
|
|
"""Confirm fail json replaces basic.py in msg"""
|
|
msg = "text (basic.py)"
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin._fail_json(msg)
|
|
self.assertEqual("text cli_parse", str(error.exception))
|
|
|
|
def test_fn_check_argspec_pass(self):
|
|
"""Confirm a valid argspec passes"""
|
|
kwargs = {
|
|
"text": "text",
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"command": "show version",
|
|
},
|
|
}
|
|
valid, result, updated_params = check_argspec(
|
|
DOCUMENTATION,
|
|
"cli_parse module",
|
|
schema_conditionals={},
|
|
**kwargs,
|
|
)
|
|
self.assertEqual(valid, True)
|
|
|
|
def test_fn_check_argspec_fail_no_test_or_command(self):
|
|
"""Confirm failed argpsec w/o text or command"""
|
|
kwargs = {
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"command": "show version",
|
|
},
|
|
}
|
|
valid, result, updated_params = check_argspec(
|
|
DOCUMENTATION,
|
|
"cli_parse module",
|
|
schema_conditionals=ARGSPEC_CONDITIONALS,
|
|
**kwargs,
|
|
)
|
|
|
|
self.assertIn("one of the following is required: command, text", result["errors"])
|
|
|
|
def test_fn_check_argspec_fail_no_parser_name(self):
|
|
"""Confirm failed argspec no parser name"""
|
|
kwargs = {"text": "anything", "parser": {"command": "show version"}}
|
|
valid, result, updated_params = check_argspec(
|
|
DOCUMENTATION,
|
|
"cli_parse module",
|
|
schema_conditionals=ARGSPEC_CONDITIONALS,
|
|
**kwargs,
|
|
)
|
|
self.assertIn(
|
|
"missing required arguments: name found in parser",
|
|
result["errors"],
|
|
)
|
|
|
|
def test_fn_extended_check_argspec_parser_name_not_coll(self):
|
|
"""Confirm failed argpsec parser not collection format"""
|
|
self._plugin._task.args = {
|
|
"text": "anything",
|
|
"parser": {
|
|
"command": "show version",
|
|
"name": "not_collection_format",
|
|
},
|
|
}
|
|
self._plugin._extended_check_argspec()
|
|
self.assertTrue(self._plugin._result["failed"])
|
|
self.assertIn("including collection", self._plugin._result["msg"])
|
|
|
|
def test_fn_extended_check_argspec_missing_tpath_or_command(self):
|
|
"""Confirm failed argpsec missing template_path
|
|
or command when text provided
|
|
"""
|
|
self._plugin._task.args = {
|
|
"text": "anything",
|
|
"parser": {"name": "a.b.c"},
|
|
}
|
|
self._plugin._extended_check_argspec()
|
|
self.assertTrue(self._plugin._result["failed"])
|
|
self.assertIn("provided when parsing text", self._plugin._result["msg"])
|
|
|
|
def test_fn_load_parser_pass(self):
|
|
"""Confirm each each of the parsers loads from the filesystem"""
|
|
parser_names = ["json", "textfsm", "ttp", "xml"]
|
|
for parser_name in parser_names:
|
|
self._plugin._task.args = {
|
|
"text": "anything",
|
|
"parser": {"name": "ansible.utils." + parser_name},
|
|
}
|
|
parser = self._plugin._load_parser(task_vars=None)
|
|
self.assertEqual(type(parser).__name__, "CliParser")
|
|
self.assertTrue(hasattr(parser, "parse"))
|
|
self.assertTrue(callable(parser.parse))
|
|
|
|
def test_fn_load_parser_fail(self):
|
|
"""Confirm missing parser fails gracefully"""
|
|
self._plugin._task.args = {
|
|
"text": "anything",
|
|
"parser": {"name": "a.b.c"},
|
|
}
|
|
parser = self._plugin._load_parser(task_vars=None)
|
|
self.assertIsNone(parser)
|
|
self.assertTrue(self._plugin._result["failed"])
|
|
self.assertIn("No module named", self._plugin._result["msg"])
|
|
|
|
def test_fn_set_parser_command_missing(self):
|
|
"""Confirm parser/command is set if missing
|
|
and command provided
|
|
"""
|
|
self._plugin._task.args = {
|
|
"command": "anything",
|
|
"parser": {"name": "a.b.c"},
|
|
}
|
|
self._plugin._set_parser_command()
|
|
self.assertEqual(self._plugin._task.args["parser"]["command"], "anything")
|
|
|
|
def test_fn_set_parser_command_present(self):
|
|
"""Confirm parser/command is not changed if provided"""
|
|
self._plugin._task.args = {
|
|
"command": "anything",
|
|
"parser": {"command": "something", "name": "a.b.c"},
|
|
}
|
|
self._plugin._set_parser_command()
|
|
self.assertEqual(self._plugin._task.args["parser"]["command"], "something")
|
|
|
|
def test_fn_set_parser_command_absent(self):
|
|
"""Confirm parser/command is not added"""
|
|
self._plugin._task.args = {"parser": {}}
|
|
self._plugin._set_parser_command()
|
|
self.assertNotIn("command", self._plugin._task.args["parser"])
|
|
|
|
def test_fn_set_text_present(self):
|
|
"""Check task args text is set to stdout"""
|
|
expected = "output"
|
|
self._plugin._result["stdout"] = expected
|
|
self._plugin._task.args = {}
|
|
self._plugin._set_text()
|
|
self.assertEqual(self._plugin._task.args["text"], expected)
|
|
|
|
def test_fn_set_text_absent(self):
|
|
"""Check task args text is set to stdout"""
|
|
self._plugin._result["stdout"] = None
|
|
self._plugin._task.args = {}
|
|
self._plugin._set_text()
|
|
self.assertNotIn("text", self._plugin._task.args)
|
|
|
|
def test_fn_os_from_task_vars(self):
|
|
"""Confirm os is set based on task vars"""
|
|
checks = [
|
|
("ansible_network_os", "cisco.nxos.nxos", "nxos"),
|
|
("ansible_network_os", "NXOS", "nxos"),
|
|
("ansible_distribution", "Fedora", "fedora"),
|
|
(None, None, ""),
|
|
]
|
|
for check in checks:
|
|
self._plugin._task_vars = {check[0]: check[1]}
|
|
result = self._plugin._os_from_task_vars()
|
|
self.assertEqual(result, check[2])
|
|
|
|
def test_fn_update_template_path_not_exist(self):
|
|
"""Check the creation of the template_path if
|
|
it doesn't exist in the user provided data
|
|
"""
|
|
self._plugin._task.args = {"parser": {"command": "a command", "name": "a.b.c"}}
|
|
self._plugin._task_vars = {"ansible_network_os": "cisco.nxos.nxos"}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin._update_template_path("yaml")
|
|
self.assertIn(
|
|
"Could not find or access 'nxos_a_command.yaml'",
|
|
str(error.exception),
|
|
)
|
|
|
|
def test_fn_update_template_path_not_exist_os(self):
|
|
"""Check the creation of the template_path if
|
|
it doesn't exist in the user provided data
|
|
name based on os provided in task
|
|
"""
|
|
self._plugin._task.args = {
|
|
"parser": {"command": "a command", "name": "a.b.c", "os": "myos"},
|
|
}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin._update_template_path("yaml")
|
|
self.assertIn(
|
|
"Could not find or access 'myos_a_command.yaml'",
|
|
str(error.exception),
|
|
)
|
|
|
|
def test_fn_update_template_path_mock_find_needle(self):
|
|
"""Check the creation of the template_path
|
|
mock the find needle fn so the template doesn't
|
|
need to be in the default template folder
|
|
"""
|
|
template_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"fixtures",
|
|
"nxos_show_version.yaml",
|
|
)
|
|
self._plugin._find_needle = MagicMock()
|
|
self._plugin._find_needle.return_value = template_path
|
|
self._plugin._task.args = {"parser": {"command": "show version", "os": "nxos"}}
|
|
self._plugin._update_template_path("yaml")
|
|
self.assertEqual(self._plugin._task.args["parser"]["template_path"], template_path)
|
|
|
|
def test_fn_get_template_contents_pass(self):
|
|
"""Check the retrieval of the template contents"""
|
|
temp = tempfile.NamedTemporaryFile()
|
|
contents = "abcdef"
|
|
with open(temp.name, "w") as fileh:
|
|
fileh.write(contents)
|
|
|
|
self._plugin._task.args = {"parser": {"template_path": temp.name}}
|
|
result = self._plugin._get_template_contents()
|
|
self.assertEqual(result, contents)
|
|
|
|
def test_fn_get_template_contents_missing(self):
|
|
"""Check the retrieval of the template contents"""
|
|
self._plugin._task.args = {"parser": {"template_path": "non-exist"}}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin._get_template_contents()
|
|
self.assertIn("Failed to open template 'non-exist'", str(error.exception))
|
|
|
|
def test_fn_get_template_contents_not_specified(self):
|
|
"""Check the none when template_path not specified"""
|
|
self._plugin._task.args = {"parser": {}}
|
|
result = self._plugin._get_template_contents()
|
|
self.assertIsNone(result)
|
|
|
|
def test_fn_prune_result_pass(self):
|
|
"""Test the removal of stdout and stdout_lines from the _result"""
|
|
self._plugin._result["stdout"] = "abc"
|
|
self._plugin._result["stdout_lines"] = "abc"
|
|
self._plugin._prune_result()
|
|
self.assertNotIn("stdout", self._plugin._result)
|
|
self.assertNotIn("stdout_lines", self._plugin._result)
|
|
|
|
def test_fn_prune_result_not_exist(self):
|
|
"""Test the removal of stdout and stdout_lines from the _result"""
|
|
self._plugin._prune_result()
|
|
self.assertNotIn("stdout", self._plugin._result)
|
|
self.assertNotIn("stdout_lines", self._plugin._result)
|
|
|
|
def test_fn_run_command_lx_rc0(self):
|
|
"""Check run command for non network"""
|
|
response = "abc"
|
|
self._plugin._connection.socket_path = None
|
|
self._plugin._low_level_execute_command = MagicMock()
|
|
self._plugin._low_level_execute_command.return_value = {
|
|
"rc": 0,
|
|
"stdout": response,
|
|
"stdout_lines": response,
|
|
}
|
|
self._plugin._task.args = {"command": "ls"}
|
|
self._plugin._run_command()
|
|
self.assertEqual(self._plugin._result["stdout"], response)
|
|
self.assertEqual(self._plugin._result["stdout_lines"], response)
|
|
|
|
def test_fn_run_command_lx_rc1(self):
|
|
"""Check run command for non network"""
|
|
response = "abc"
|
|
self._plugin._connection.socket_path = None
|
|
self._plugin._low_level_execute_command = MagicMock()
|
|
self._plugin._low_level_execute_command.return_value = {
|
|
"rc": 1,
|
|
"stdout": None,
|
|
"stdout_lines": None,
|
|
"stderr": response,
|
|
}
|
|
self._plugin._task.args = {"command": "ls"}
|
|
self._plugin._run_command()
|
|
self.assertTrue(self._plugin._result["failed"])
|
|
self.assertEqual(self._plugin._result["msg"], response)
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_command_network(self, mock_rpc):
|
|
"""Check run command for network"""
|
|
expected = "abc"
|
|
mock_rpc.return_value = expected
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
self._plugin._task.args = {"command": "command"}
|
|
self._plugin._run_command()
|
|
self.assertEqual(self._plugin._result["stdout"], expected)
|
|
self.assertEqual(self._plugin._result["stdout_lines"], [expected])
|
|
|
|
def test_fn_run_command_not_specified(self):
|
|
"""Check run command for network"""
|
|
self._plugin._task.args = {"command": None}
|
|
result = self._plugin._run_command()
|
|
self.assertIsNone(result)
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_pass_w_fact(self, mock_rpc):
|
|
"""Check full module run with valid params"""
|
|
mock_out = self._load_fixture("nxos_show_version.txt")
|
|
mock_rpc.return_value = mock_out
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
template_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"fixtures",
|
|
"nxos_show_version.textfsm",
|
|
)
|
|
self._plugin._task.args = {
|
|
"command": "show version",
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"template_path": template_path,
|
|
},
|
|
"set_fact": "new_fact",
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result["stdout"], mock_out)
|
|
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
|
self.assertEqual(result["parsed"][0]["version"], "9.2(2)")
|
|
self.assertEqual(result["ansible_facts"]["new_fact"][0]["version"], "9.2(2)")
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_pass_wo_fact(self, mock_rpc):
|
|
"""Check full module run with valid params"""
|
|
mock_out = self._load_fixture("nxos_show_version.txt")
|
|
mock_rpc.return_value = mock_out
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
template_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"fixtures",
|
|
"nxos_show_version.textfsm",
|
|
)
|
|
self._plugin._task.args = {
|
|
"command": "show version",
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"template_path": template_path,
|
|
},
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result["stdout"], mock_out)
|
|
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
|
self.assertEqual(result["parsed"][0]["version"], "9.2(2)")
|
|
self.assertNotIn("ansible_facts", result)
|
|
|
|
def test_fn_run_fail_argspec(self):
|
|
"""Check full module run with invalid params"""
|
|
self._plugin._task.args = {
|
|
"text": "anything",
|
|
"parser": {
|
|
"command": "show version",
|
|
"name": "not_collection_format",
|
|
},
|
|
}
|
|
self._plugin.run(task_vars=None)
|
|
self.assertTrue(self._plugin._result["failed"])
|
|
self.assertIn("including collection", self._plugin._result["msg"])
|
|
|
|
def test_fn_run_fail_command(self):
|
|
"""Confirm clean fail with rc 1"""
|
|
self._plugin._connection.socket_path = None
|
|
self._plugin._low_level_execute_command = MagicMock()
|
|
self._plugin._low_level_execute_command.return_value = {
|
|
"rc": 1,
|
|
"stdout": None,
|
|
"stdout_lines": None,
|
|
"stderr": None,
|
|
}
|
|
self._plugin._task.args = {
|
|
"command": "ls",
|
|
"parser": {"name": "a.b.c"},
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
expected = {
|
|
"failed": True,
|
|
"msg": None,
|
|
"stdout": None,
|
|
"stdout_lines": None,
|
|
}
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_fn_run_fail_missing_parser(self):
|
|
"""Confirm clean fail with missing parser"""
|
|
self._plugin._task.args = {"text": None, "parser": {"name": "a.b.c"}}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result["failed"], True)
|
|
self.assertIn("Error loading parser", result["msg"])
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_pass_missing_parser_constants(self, mock_rpc):
|
|
"""Check full module run using parser w/o
|
|
DEFAULT_TEMPLATE_EXTENSION or PROVIDE_TEMPLATE_CONTENTS
|
|
defined in the parser
|
|
"""
|
|
mock_out = self._load_fixture("nxos_show_version.txt")
|
|
|
|
class CliParser(CliParserBase):
|
|
def parse(self, *_args, **kwargs):
|
|
return {"parsed": mock_out}
|
|
|
|
self._plugin._load_parser = MagicMock()
|
|
self._plugin._load_parser.return_value = CliParser(None, None, None)
|
|
|
|
mock_out = self._load_fixture("nxos_show_version.txt")
|
|
mock_rpc.return_value = mock_out
|
|
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
template_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"fixtures",
|
|
"nxos_empty_parser.textfsm",
|
|
)
|
|
self._plugin._task.args = {
|
|
"command": "show version",
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"template_path": template_path,
|
|
},
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result["stdout"], mock_out)
|
|
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
|
self.assertEqual(result["parsed"], mock_out)
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_pass_missing_parser_in_parser(self, mock_rpc):
|
|
"""Check full module run using parser w/o
|
|
a parser function defined in the parser
|
|
defined in the parser
|
|
"""
|
|
mock_out = self._load_fixture("nxos_show_version.txt")
|
|
|
|
class CliParser(CliParserBase):
|
|
pass
|
|
|
|
self._plugin._load_parser = MagicMock()
|
|
self._plugin._load_parser.return_value = CliParser(None, None, None)
|
|
|
|
mock_out = self._load_fixture("nxos_show_version.textfsm")
|
|
mock_rpc.return_value = mock_out
|
|
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
template_path = os.path.join(
|
|
os.path.dirname(__file__),
|
|
"fixtures",
|
|
"nxos_empty_parser.textfsm",
|
|
)
|
|
self._plugin._task.args = {
|
|
"command": "show version",
|
|
"parser": {
|
|
"name": "ansible.utils.textfsm",
|
|
"template_path": template_path,
|
|
},
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
with self.assertRaises(Exception) as error:
|
|
self._plugin.run(task_vars=task_vars)
|
|
self.assertIn("Unhandled", str(error.exception))
|
|
|
|
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
|
def test_fn_run_net_device_error(self, mock_rpc):
|
|
"""Check full module run mock error from network device"""
|
|
msg = "I was mocked"
|
|
mock_rpc.side_effect = AnsibleConnectionError(msg)
|
|
self._plugin._connection.socket_path = tempfile.NamedTemporaryFile().name
|
|
self._plugin._task.args = {
|
|
"command": "show version",
|
|
"parser": {"name": "ansible.utils.textfsm"},
|
|
}
|
|
task_vars = {"inventory_hostname": "mockdevice"}
|
|
result = self._plugin.run(task_vars=task_vars)
|
|
self.assertEqual(result["failed"], True)
|
|
self.assertEqual([msg], result["msg"])
|