308 lines
11 KiB
Python
308 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2020 Red Hat
|
|
# GNU General Public License v3.0+
|
|
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
"""
|
|
The action plugin file for cli_parse
|
|
"""
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
|
|
__metaclass__ = type
|
|
|
|
import json
|
|
|
|
from importlib import import_module
|
|
|
|
from ansible.errors import AnsibleActionFail
|
|
from ansible.module_utils._text import to_native, to_text
|
|
from ansible.module_utils.connection import Connection
|
|
from ansible.module_utils.connection import ConnectionError as AnsibleConnectionError
|
|
from ansible.plugins.action import ActionBase
|
|
|
|
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
|
|
check_argspec,
|
|
)
|
|
from ansible_collections.ansible.utils.plugins.modules.cli_parse import DOCUMENTATION
|
|
|
|
|
|
ARGSPEC_CONDITIONALS = {
|
|
"argument_spec": {"parser": {"mutually_exclusive": [["command", "template_path"]]}},
|
|
"required_one_of": [["command", "text"]],
|
|
"mutually_exclusive": [["command", "text"]],
|
|
}
|
|
|
|
|
|
class ActionModule(ActionBase):
|
|
"""action module"""
|
|
|
|
_requires_connection = True
|
|
|
|
PARSER_CLS_NAME = "CliParser"
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(ActionModule, self).__init__(*args, **kwargs)
|
|
self._playhost = None
|
|
self._parser_name = None
|
|
self._result = {}
|
|
self._task_vars = None
|
|
|
|
def _debug(self, msg):
|
|
"""Output text using ansible's display
|
|
|
|
:param msg: The message
|
|
:type msg: str
|
|
"""
|
|
msg = "<{phost}> [cli_parse] {msg}".format(phost=self._playhost, msg=msg)
|
|
self._display.vvvv(msg)
|
|
|
|
def _fail_json(self, msg):
|
|
"""Replace the AnsibleModule fai_json here
|
|
|
|
:param msg: The message for the failure
|
|
:type msg: str
|
|
"""
|
|
msg = msg.replace("(basic.py)", self._task.action)
|
|
raise AnsibleActionFail(msg)
|
|
|
|
def _extended_check_argspec(self):
|
|
"""Check additional requirements for the argspec
|
|
that cannot be covered using stnd techniques
|
|
"""
|
|
errors = []
|
|
requested_parser = self._task.args.get("parser").get("name")
|
|
if len(requested_parser.split(".")) != 3:
|
|
msg = "Parser name should be provided as a full name including collection"
|
|
errors.append(msg)
|
|
|
|
if self._task.args.get("text") and requested_parser not in [
|
|
"ansible.utils.json",
|
|
"ansible.utils.xml",
|
|
]:
|
|
if not (
|
|
self._task.args.get("parser").get("command")
|
|
or self._task.args.get("parser").get("template_path")
|
|
):
|
|
msg = "Either parser/command or parser/template_path needs to be provided when parsing text."
|
|
errors.append(msg)
|
|
if errors:
|
|
self._result["failed"] = True
|
|
self._result["msg"] = " ".join(errors)
|
|
|
|
def _load_parser(self, task_vars):
|
|
"""Load a parser from the fs
|
|
|
|
:param task_vars: The vars provided when the task was run
|
|
:type task_vars: dict
|
|
:return: An instance of class CliParser
|
|
:rtype: CliParser
|
|
"""
|
|
requested_parser = self._task.args.get("parser").get("name")
|
|
cref = dict(zip(["corg", "cname", "plugin"], requested_parser.split(".")))
|
|
|
|
parserlib = "ansible_collections.{corg}.{cname}.plugins.sub_plugins.cli_parser.{plugin}_parser".format(
|
|
**cref,
|
|
)
|
|
try:
|
|
parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME)
|
|
parser = parsercls(
|
|
task_args=self._task.args,
|
|
task_vars=task_vars,
|
|
debug=self._debug,
|
|
)
|
|
return parser
|
|
except Exception as exc:
|
|
self._result["failed"] = True
|
|
self._result["msg"] = "Error loading parser: {err}".format(err=to_native(exc))
|
|
return None
|
|
|
|
def _set_parser_command(self):
|
|
"""Set the /parser/command in the task args based on /command if needed"""
|
|
if self._task.args.get("command"):
|
|
if not self._task.args.get("parser").get("command"):
|
|
self._task.args.get("parser")["command"] = self._task.args.get("command")
|
|
|
|
def _set_text(self):
|
|
"""Set the /text in the task_args based on the command run"""
|
|
if self._result.get("stdout"):
|
|
self._task.args["text"] = self._result["stdout"]
|
|
|
|
def _os_from_task_vars(self):
|
|
"""Extract an os str from the task's vars
|
|
|
|
:return: A short OS name
|
|
:rtype: str
|
|
"""
|
|
os_vars = ["ansible_distribution", "ansible_network_os"]
|
|
oper_sys = ""
|
|
for hvar in os_vars:
|
|
if self._task_vars.get(hvar):
|
|
if hvar == "ansible_network_os":
|
|
oper_sys = self._task_vars.get(hvar, "").split(".")[-1]
|
|
self._debug(
|
|
"OS set to {os}, derived from ansible_network_os".format(
|
|
os=oper_sys.lower(),
|
|
),
|
|
)
|
|
else:
|
|
oper_sys = self._task_vars.get(hvar)
|
|
self._debug("OS set to {os}, using {key}".format(os=oper_sys.lower(), key=hvar))
|
|
return oper_sys.lower()
|
|
|
|
def _update_template_path(self, template_extension):
|
|
"""Update the template_path in the task args
|
|
If not provided, generate template name using os and command
|
|
|
|
:param template_extension: The parser specific template extension
|
|
:type template extension: str
|
|
"""
|
|
if not self._task.args.get("parser").get("template_path"):
|
|
if self._task.args.get("parser").get("os"):
|
|
oper_sys = self._task.args.get("parser").get("os")
|
|
else:
|
|
oper_sys = self._os_from_task_vars()
|
|
cmd_as_fname = self._task.args.get("parser").get("command").replace(" ", "_")
|
|
fname = "{os}_{cmd}.{ext}".format(os=oper_sys, cmd=cmd_as_fname, ext=template_extension)
|
|
source = self._find_needle("templates", fname)
|
|
else:
|
|
source = self._task.args.get("parser").get("template_path")
|
|
source = self._find_needle("templates", source)
|
|
self._debug("template_path in task args updated to {source}".format(source=source))
|
|
self._task.args["parser"]["template_path"] = source
|
|
|
|
def _get_template_contents(self):
|
|
"""Retrieve the contents of the parser template
|
|
|
|
:return: The parser's contents
|
|
:rtype: str
|
|
"""
|
|
template_contents = None
|
|
template_path = self._task.args.get("parser").get("template_path")
|
|
if template_path:
|
|
try:
|
|
with open(template_path, "rb") as file_handler:
|
|
try:
|
|
template_contents = to_text(
|
|
file_handler.read(),
|
|
errors="surrogate_or_strict",
|
|
)
|
|
except UnicodeError:
|
|
raise AnsibleActionFail("Template source files must be utf-8 encoded")
|
|
except FileNotFoundError as exc:
|
|
raise AnsibleActionFail(
|
|
"Failed to open template '{tpath}'. Error: {err}".format(
|
|
tpath=template_path,
|
|
err=to_native(exc),
|
|
),
|
|
)
|
|
return template_contents
|
|
|
|
def _prune_result(self):
|
|
"""In the case of an error, remove stdout and stdout_lines
|
|
this allows for easier visibility of the error message.
|
|
In the case of an actual command error, it will be thrown
|
|
in the module
|
|
"""
|
|
self._result.pop("stdout", None)
|
|
self._result.pop("stdout_lines", None)
|
|
|
|
def _run_command(self):
|
|
"""Run a command on the host
|
|
If socket_path exists, assume it's a network device
|
|
else, run a low level command
|
|
"""
|
|
command = self._task.args.get("command")
|
|
if command:
|
|
socket_path = self._connection.socket_path
|
|
if socket_path:
|
|
connection = Connection(socket_path)
|
|
try:
|
|
response = connection.get(command=command)
|
|
self._result["stdout"] = response
|
|
self._result["stdout_lines"] = response.splitlines()
|
|
except AnsibleConnectionError as exc:
|
|
self._result["failed"] = True
|
|
self._result["msg"] = [to_text(exc)]
|
|
else:
|
|
result = self._low_level_execute_command(cmd=command)
|
|
if result["rc"]:
|
|
self._result["failed"] = True
|
|
self._result["msg"] = result["stderr"]
|
|
self._result["stdout"] = result["stdout"]
|
|
self._result["stdout_lines"] = result["stdout_lines"]
|
|
|
|
def run(self, tmp=None, task_vars=None):
|
|
"""The std execution entry pt for an action plugin
|
|
|
|
:param tmp: no longer used
|
|
:type tmp: none
|
|
:param task_vars: The vars provided when the task is run
|
|
:type task_vars: dict
|
|
:return: The results from the parser
|
|
:rtype: dict
|
|
"""
|
|
valid, argspec_result, updated_params = check_argspec(
|
|
DOCUMENTATION,
|
|
"cli_parse module",
|
|
schema_conditionals=ARGSPEC_CONDITIONALS,
|
|
**self._task.args,
|
|
)
|
|
if not valid:
|
|
return argspec_result
|
|
|
|
self._extended_check_argspec()
|
|
if self._result.get("failed"):
|
|
return self._result
|
|
|
|
self._task_vars = task_vars
|
|
self._playhost = task_vars.get("inventory_hostname")
|
|
self._parser_name = self._task.args.get("parser").get("name")
|
|
|
|
self._run_command()
|
|
if self._result.get("failed"):
|
|
return self._result
|
|
|
|
self._set_parser_command()
|
|
self._set_text()
|
|
|
|
parser = self._load_parser(task_vars)
|
|
if self._result.get("failed"):
|
|
self._prune_result()
|
|
return self._result
|
|
|
|
# Not all parsers use a template, in the case a parser provides
|
|
# an extension, provide it the template path
|
|
if getattr(parser, "DEFAULT_TEMPLATE_EXTENSION", False):
|
|
self._update_template_path(parser.DEFAULT_TEMPLATE_EXTENSION)
|
|
|
|
# Not all parsers require the template contents
|
|
# when true, provide the template contents
|
|
if getattr(parser, "PROVIDE_TEMPLATE_CONTENTS", False) is True:
|
|
template_contents = self._get_template_contents()
|
|
else:
|
|
template_contents = None
|
|
|
|
try:
|
|
result = parser.parse(template_contents=template_contents)
|
|
# ensure the response returned to the controller
|
|
# contains only native types, nothing unique to the parser
|
|
result = json.loads(json.dumps(result))
|
|
except Exception as exc:
|
|
raise AnsibleActionFail(
|
|
"Unhandled exception from parser '{parser}'. Error: {err}".format(
|
|
parser=self._parser_name,
|
|
err=to_native(exc),
|
|
),
|
|
)
|
|
|
|
if result.get("errors"):
|
|
self._prune_result()
|
|
self._result.update({"failed": True, "msg": " ".join(result["errors"])})
|
|
else:
|
|
self._result["parsed"] = result["parsed"]
|
|
set_fact = self._task.args.get("set_fact")
|
|
if set_fact:
|
|
self._result["ansible_facts"] = {set_fact: result["parsed"]}
|
|
return self._result
|