Add cli_parse module and plugins (#28)
Add cli_parse module and plugins Reviewed-by: https://github.com/apps/ansible-zuulpull/30/head
parent
3490b95957
commit
a22cbd97b2
|
@ -0,0 +1,3 @@
|
|||
---
|
||||
minor_changes:
|
||||
- Add cli_parse module and plugins (https://github.com/ansible-collections/ansible.utils/pull/28)
|
|
@ -0,0 +1,350 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 Red Hat
|
||||
# GNU General Public License v3.0+
|
||||
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
"""
|
||||
The action plugin file for cli_parse
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
from importlib import import_module
|
||||
|
||||
from ansible.errors import AnsibleActionFail
|
||||
from ansible.module_utils._text import to_native, to_text
|
||||
from ansible.module_utils.connection import (
|
||||
Connection,
|
||||
ConnectionError as AnsibleConnectionError,
|
||||
)
|
||||
from ansible.plugins.action import ActionBase
|
||||
from ansible_collections.ansible.utils.plugins.modules.cli_parse import (
|
||||
DOCUMENTATION,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
|
||||
check_argspec,
|
||||
)
|
||||
|
||||
# python 2.7 compat for FileNotFoundError
|
||||
try:
|
||||
FileNotFoundError
|
||||
except NameError:
|
||||
FileNotFoundError = IOError
|
||||
|
||||
|
||||
ARGSPEC_CONDITIONALS = {
|
||||
"argument_spec": {
|
||||
"parser": {"mutually_exclusive": [["command", "template_path"]]}
|
||||
},
|
||||
"required_one_of": [["command", "text"]],
|
||||
"mutually_exclusive": [["command", "text"]],
|
||||
}
|
||||
|
||||
|
||||
class ActionModule(ActionBase):
|
||||
""" action module
|
||||
"""
|
||||
|
||||
PARSER_CLS_NAME = "CliParser"
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ActionModule, self).__init__(*args, **kwargs)
|
||||
self._playhost = None
|
||||
self._parser_name = None
|
||||
self._result = {}
|
||||
self._task_vars = None
|
||||
|
||||
def _debug(self, msg):
|
||||
""" Output text using ansible's display
|
||||
|
||||
:param msg: The message
|
||||
:type msg: str
|
||||
"""
|
||||
msg = "<{phost}> [cli_parse] {msg}".format(
|
||||
phost=self._playhost, msg=msg
|
||||
)
|
||||
self._display.vvvv(msg)
|
||||
|
||||
def _fail_json(self, msg):
|
||||
""" Replace the AnsibleModule fai_json here
|
||||
|
||||
:param msg: The message for the failure
|
||||
:type msg: str
|
||||
"""
|
||||
msg = msg.replace("(basic.py)", self._task.action)
|
||||
raise AnsibleActionFail(msg)
|
||||
|
||||
def _extended_check_argspec(self):
|
||||
""" Check additional requirements for the argspec
|
||||
that cannot be covered using stnd techniques
|
||||
"""
|
||||
errors = []
|
||||
requested_parser = self._task.args.get("parser").get("name")
|
||||
if len(requested_parser.split(".")) != 3:
|
||||
msg = "Parser name should be provided as a full name including collection"
|
||||
errors.append(msg)
|
||||
|
||||
if self._task.args.get("text") and requested_parser not in [
|
||||
"ansible.utils.json",
|
||||
"ansible.utils.xml",
|
||||
]:
|
||||
if not (
|
||||
self._task.args.get("parser").get("command")
|
||||
or self._task.args.get("parser").get("template_path")
|
||||
):
|
||||
msg = "Either parser/command or parser/template_path needs to be provided when parsing text."
|
||||
errors.append(msg)
|
||||
if errors:
|
||||
self._result["failed"] = True
|
||||
self._result["msg"] = " ".join(errors)
|
||||
|
||||
def _load_parser(self, task_vars):
|
||||
""" Load a parser from the fs
|
||||
|
||||
:param task_vars: The vars provided when the task was run
|
||||
:type task_vars: dict
|
||||
:return: An instance of class CliParser
|
||||
:rtype: CliParser
|
||||
"""
|
||||
requested_parser = self._task.args.get("parser").get("name")
|
||||
cref = dict(
|
||||
zip(["corg", "cname", "plugin"], requested_parser.split("."))
|
||||
)
|
||||
if cref["cname"] == "netcommon" and cref["plugin"] in [
|
||||
"json",
|
||||
"textfsm",
|
||||
"ttp",
|
||||
"xml",
|
||||
]:
|
||||
cref["cname"] = "utils"
|
||||
msg = (
|
||||
"Use 'ansible.utils.{plugin}' for parser name instead of '{requested_parser}'."
|
||||
" This feature will be removed from 'ansible.netcommon' collection in a release"
|
||||
" after 2022-11-01".format(
|
||||
plugin=cref["plugin"], requested_parser=requested_parser
|
||||
)
|
||||
)
|
||||
self._display.warning(msg)
|
||||
|
||||
parserlib = "ansible_collections.{corg}.{cname}.plugins.cli_parsers.{plugin}_parser".format(
|
||||
**cref
|
||||
)
|
||||
try:
|
||||
parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME)
|
||||
parser = parsercls(
|
||||
task_args=self._task.args,
|
||||
task_vars=task_vars,
|
||||
debug=self._debug,
|
||||
)
|
||||
return parser
|
||||
except Exception as exc:
|
||||
self._result["failed"] = True
|
||||
self._result["msg"] = "Error loading parser: {err}".format(
|
||||
err=to_native(exc)
|
||||
)
|
||||
return None
|
||||
|
||||
def _set_parser_command(self):
|
||||
""" Set the /parser/command in the task args based on /command if needed
|
||||
"""
|
||||
if self._task.args.get("command"):
|
||||
if not self._task.args.get("parser").get("command"):
|
||||
self._task.args.get("parser")["command"] = self._task.args.get(
|
||||
"command"
|
||||
)
|
||||
|
||||
def _set_text(self):
|
||||
""" Set the /text in the task_args based on the command run
|
||||
"""
|
||||
if self._result.get("stdout"):
|
||||
self._task.args["text"] = self._result["stdout"]
|
||||
|
||||
def _os_from_task_vars(self):
|
||||
""" Extract an os str from the task's vars
|
||||
|
||||
:return: A short OS name
|
||||
:rtype: str
|
||||
"""
|
||||
os_vars = ["ansible_distribution", "ansible_network_os"]
|
||||
oper_sys = ""
|
||||
for hvar in os_vars:
|
||||
if self._task_vars.get(hvar):
|
||||
if hvar == "ansible_network_os":
|
||||
oper_sys = self._task_vars.get(hvar, "").split(".")[-1]
|
||||
self._debug(
|
||||
"OS set to {os}, derived from ansible_network_os".format(
|
||||
os=oper_sys.lower()
|
||||
)
|
||||
)
|
||||
else:
|
||||
oper_sys = self._task_vars.get(hvar)
|
||||
self._debug(
|
||||
"OS set to {os}, using {key}".format(
|
||||
os=oper_sys.lower(), key=hvar
|
||||
)
|
||||
)
|
||||
return oper_sys.lower()
|
||||
|
||||
def _update_template_path(self, template_extension):
|
||||
""" Update the template_path in the task args
|
||||
If not provided, generate template name using os and command
|
||||
|
||||
:param template_extension: The parser specific template extension
|
||||
:type template extension: str
|
||||
"""
|
||||
if not self._task.args.get("parser").get("template_path"):
|
||||
if self._task.args.get("parser").get("os"):
|
||||
oper_sys = self._task.args.get("parser").get("os")
|
||||
else:
|
||||
oper_sys = self._os_from_task_vars()
|
||||
cmd_as_fname = (
|
||||
self._task.args.get("parser").get("command").replace(" ", "_")
|
||||
)
|
||||
fname = "{os}_{cmd}.{ext}".format(
|
||||
os=oper_sys, cmd=cmd_as_fname, ext=template_extension
|
||||
)
|
||||
source = self._find_needle("templates", fname)
|
||||
self._debug(
|
||||
"template_path in task args updated to {source}".format(
|
||||
source=source
|
||||
)
|
||||
)
|
||||
self._task.args["parser"]["template_path"] = source
|
||||
|
||||
def _get_template_contents(self):
|
||||
""" Retrieve the contents of the parser template
|
||||
|
||||
:return: The parser's contents
|
||||
:rtype: str
|
||||
"""
|
||||
template_contents = None
|
||||
template_path = self._task.args.get("parser").get("template_path")
|
||||
if template_path:
|
||||
try:
|
||||
with open(template_path, "rb") as file_handler:
|
||||
try:
|
||||
template_contents = to_text(
|
||||
file_handler.read(), errors="surrogate_or_strict"
|
||||
)
|
||||
except UnicodeError:
|
||||
raise AnsibleActionFail(
|
||||
"Template source files must be utf-8 encoded"
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
raise AnsibleActionFail(
|
||||
"Failed to open template '{tpath}'. Error: {err}".format(
|
||||
tpath=template_path, err=to_native(exc)
|
||||
)
|
||||
)
|
||||
return template_contents
|
||||
|
||||
def _prune_result(self):
|
||||
""" In the case of an error, remove stdout and stdout_lines
|
||||
this allows for easier visibility of the error message.
|
||||
In the case of an actual command error, it will be thrown
|
||||
in the module
|
||||
"""
|
||||
self._result.pop("stdout", None)
|
||||
self._result.pop("stdout_lines", None)
|
||||
|
||||
def _run_command(self):
|
||||
""" Run a command on the host
|
||||
If socket_path exists, assume it's a network device
|
||||
else, run a low level command
|
||||
"""
|
||||
command = self._task.args.get("command")
|
||||
if command:
|
||||
socket_path = self._connection.socket_path
|
||||
if socket_path:
|
||||
connection = Connection(socket_path)
|
||||
try:
|
||||
response = connection.get(command=command)
|
||||
self._result["stdout"] = response
|
||||
self._result["stdout_lines"] = response.splitlines()
|
||||
except AnsibleConnectionError as exc:
|
||||
self._result["failed"] = True
|
||||
self._result["msg"] = [to_text(exc)]
|
||||
else:
|
||||
result = self._low_level_execute_command(cmd=command)
|
||||
if result["rc"]:
|
||||
self._result["failed"] = True
|
||||
self._result["msg"] = result["stderr"]
|
||||
self._result["stdout"] = result["stdout"]
|
||||
self._result["stdout_lines"] = result["stdout_lines"]
|
||||
|
||||
def run(self, tmp=None, task_vars=None):
|
||||
""" The std execution entry pt for an action plugin
|
||||
|
||||
:param tmp: no longer used
|
||||
:type tmp: none
|
||||
:param task_vars: The vars provided when the task is run
|
||||
:type task_vars: dict
|
||||
:return: The results from the parser
|
||||
:rtype: dict
|
||||
"""
|
||||
valid, argspec_result, updated_params = check_argspec(
|
||||
DOCUMENTATION,
|
||||
"cli_parse module",
|
||||
schema_conditionals=ARGSPEC_CONDITIONALS,
|
||||
**self._task.args
|
||||
)
|
||||
if not valid:
|
||||
return argspec_result
|
||||
|
||||
self._extended_check_argspec()
|
||||
if self._result.get("failed"):
|
||||
return self._result
|
||||
|
||||
self._task_vars = task_vars
|
||||
self._playhost = task_vars.get("inventory_hostname")
|
||||
self._parser_name = self._task.args.get("parser").get("name")
|
||||
|
||||
self._run_command()
|
||||
if self._result.get("failed"):
|
||||
return self._result
|
||||
|
||||
self._set_parser_command()
|
||||
self._set_text()
|
||||
|
||||
parser = self._load_parser(task_vars)
|
||||
if self._result.get("failed"):
|
||||
self._prune_result()
|
||||
return self._result
|
||||
|
||||
# Not all parsers use a template, in the case a parser provides
|
||||
# an extension, provide it the template path
|
||||
if getattr(parser, "DEFAULT_TEMPLATE_EXTENSION", False):
|
||||
self._update_template_path(parser.DEFAULT_TEMPLATE_EXTENSION)
|
||||
|
||||
# Not all parsers require the template contents
|
||||
# when true, provide the template contents
|
||||
if getattr(parser, "PROVIDE_TEMPLATE_CONTENTS", False) is True:
|
||||
template_contents = self._get_template_contents()
|
||||
else:
|
||||
template_contents = None
|
||||
|
||||
try:
|
||||
result = parser.parse(template_contents=template_contents)
|
||||
# ensure the response returned to the controller
|
||||
# contains only native types, nothing unique to the parser
|
||||
result = json.loads(json.dumps(result))
|
||||
except Exception as exc:
|
||||
raise AnsibleActionFail(
|
||||
"Unhandled exception from parser '{parser}'. Error: {err}".format(
|
||||
parser=self._parser_name, err=to_native(exc)
|
||||
)
|
||||
)
|
||||
|
||||
if result.get("errors"):
|
||||
self._prune_result()
|
||||
self._result.update(
|
||||
{"failed": True, "msg": " ".join(result["errors"])}
|
||||
)
|
||||
else:
|
||||
self._result["parsed"] = result["parsed"]
|
||||
set_fact = self._task.args.get("set_fact")
|
||||
if set_fact:
|
||||
self._result["ansible_facts"] = {set_fact: result["parsed"]}
|
||||
return self._result
|
|
@ -0,0 +1,17 @@
|
|||
"""
|
||||
The base class for cli_parsers
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
class CliParserBase:
|
||||
""" The base class for cli parsers
|
||||
Provides a _debug function to normalize parser debug output
|
||||
"""
|
||||
|
||||
def __init__(self, task_args, task_vars, debug):
|
||||
self._debug = debug
|
||||
self._task_args = task_args
|
||||
self._task_vars = task_vars
|
|
@ -0,0 +1,48 @@
|
|||
"""
|
||||
json parser
|
||||
|
||||
This is the json parser for use with the cli_parse module and action plugin
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
|
||||
from ansible.module_utils._text import to_native
|
||||
from ansible.module_utils.six import string_types
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers._base import (
|
||||
CliParserBase,
|
||||
)
|
||||
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
""" The json parser class
|
||||
Convert a string containing valid json into an object
|
||||
"""
|
||||
|
||||
DEFAULT_TEMPLATE_EXTENSION = None
|
||||
PROVIDE_TEMPLATE_CONTENTS = False
|
||||
|
||||
def parse(self, *_args, **_kwargs):
|
||||
""" Std entry point for a cli_parse parse execution
|
||||
|
||||
:return: Errors or parsed text as structured data
|
||||
:rtype: dict
|
||||
|
||||
:example:
|
||||
|
||||
The parse function of a parser should return a dict:
|
||||
{"errors": [a list of errors]}
|
||||
or
|
||||
{"parsed": obj}
|
||||
"""
|
||||
text = self._task_args.get("text")
|
||||
try:
|
||||
if not isinstance(text, string_types):
|
||||
text = json.dumps(text)
|
||||
parsed = json.loads(text)
|
||||
except Exception as exc:
|
||||
return {"errors": [to_native(exc)]}
|
||||
|
||||
return {"parsed": parsed}
|
|
@ -0,0 +1,85 @@
|
|||
"""
|
||||
textfsm parser
|
||||
|
||||
This is the textfsm parser for use with the cli_parse module and action plugin
|
||||
https://github.com/google/textfsm
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
|
||||
from ansible.module_utils._text import to_native
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers._base import (
|
||||
CliParserBase,
|
||||
)
|
||||
|
||||
try:
|
||||
import textfsm
|
||||
|
||||
HAS_TEXTFSM = True
|
||||
except ImportError:
|
||||
HAS_TEXTFSM = False
|
||||
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
""" The textfsm parser class
|
||||
Convert raw text to structured data using textfsm
|
||||
"""
|
||||
|
||||
DEFAULT_TEMPLATE_EXTENSION = "textfsm"
|
||||
PROVIDE_TEMPLATE_CONTENTS = False
|
||||
|
||||
@staticmethod
|
||||
def _check_reqs():
|
||||
""" Check the prerequisites for the textfsm parser
|
||||
|
||||
:return dict: A dict with errors or a template_path
|
||||
"""
|
||||
errors = []
|
||||
|
||||
if not HAS_TEXTFSM:
|
||||
errors.append(missing_required_lib("textfsm"))
|
||||
|
||||
return {"errors": errors}
|
||||
|
||||
def parse(self, *_args, **_kwargs):
|
||||
""" Std entry point for a cli_parse parse execution
|
||||
|
||||
:return: Errors or parsed text as structured data
|
||||
:rtype: dict
|
||||
|
||||
:example:
|
||||
|
||||
The parse function of a parser should return a dict:
|
||||
{"errors": [a list of errors]}
|
||||
or
|
||||
{"parsed": obj}
|
||||
"""
|
||||
cli_output = self._task_args.get("text")
|
||||
res = self._check_reqs()
|
||||
if res.get("errors"):
|
||||
return {"errors": res.get("errors")}
|
||||
|
||||
template_path = self._task_args.get("parser").get("template_path")
|
||||
if template_path and not os.path.isfile(template_path):
|
||||
return {
|
||||
"error": "error while reading template_path file {file}".format(
|
||||
file=template_path
|
||||
)
|
||||
}
|
||||
try:
|
||||
template = open(self._task_args.get("parser").get("template_path"))
|
||||
except IOError as exc:
|
||||
return {"error": to_native(exc)}
|
||||
|
||||
re_table = textfsm.TextFSM(template)
|
||||
fsm_results = re_table.ParseText(cli_output)
|
||||
|
||||
results = list()
|
||||
for item in fsm_results:
|
||||
results.append(dict(zip(re_table.header, item)))
|
||||
|
||||
return {"parsed": results}
|
|
@ -0,0 +1,104 @@
|
|||
"""
|
||||
ttp parser
|
||||
|
||||
This is the ttp parser for use with the cli_parse module and action plugin
|
||||
https://github.com/dmulyalin/ttp
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
|
||||
from ansible.module_utils._text import to_native
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers._base import (
|
||||
CliParserBase,
|
||||
)
|
||||
|
||||
try:
|
||||
from ttp import ttp
|
||||
|
||||
HAS_TTP = True
|
||||
except ImportError:
|
||||
HAS_TTP = False
|
||||
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
""" The ttp parser class
|
||||
Convert raw text to structured data using ttp
|
||||
"""
|
||||
|
||||
DEFAULT_TEMPLATE_EXTENSION = "ttp"
|
||||
PROVIDE_TEMPLATE_CONTENTS = False
|
||||
|
||||
@staticmethod
|
||||
def _check_reqs():
|
||||
""" Check the prerequisites for the ttp parser
|
||||
|
||||
:return dict: A dict with errors or a template_path
|
||||
"""
|
||||
errors = []
|
||||
|
||||
if not HAS_TTP:
|
||||
errors.append(missing_required_lib("ttp"))
|
||||
|
||||
return {"errors": errors}
|
||||
|
||||
def parse(self, *_args, **_kwargs):
|
||||
""" Std entry point for a cli_parse parse execution
|
||||
|
||||
:return: Errors or parsed text as structured data
|
||||
:rtype: dict
|
||||
|
||||
:example:
|
||||
|
||||
The parse function of a parser should return a dict:
|
||||
{"errors": [a list of errors]}
|
||||
or
|
||||
{"parsed": obj}
|
||||
"""
|
||||
cli_output = to_native(
|
||||
self._task_args.get("text"), errors="surrogate_then_replace"
|
||||
)
|
||||
res = self._check_reqs()
|
||||
if res.get("errors"):
|
||||
return {"errors": res.get("errors")}
|
||||
|
||||
template_path = to_native(
|
||||
self._task_args.get("parser").get("template_path"),
|
||||
errors="surrogate_then_replace",
|
||||
)
|
||||
if template_path and not os.path.isfile(template_path):
|
||||
return {
|
||||
"error": "error while reading template_path file {file}".format(
|
||||
file=template_path
|
||||
)
|
||||
}
|
||||
|
||||
try:
|
||||
parser_param = self._task_args.get("parser")
|
||||
vars = (
|
||||
parser_param.get("vars", {}).get("ttp_vars", {})
|
||||
if parser_param.get("vars")
|
||||
else {}
|
||||
)
|
||||
kwargs = (
|
||||
parser_param.get("vars", {}).get("ttp_init", {})
|
||||
if parser_param.get("vars")
|
||||
else {}
|
||||
)
|
||||
parser = ttp(
|
||||
data=cli_output, template=template_path, vars=vars, **kwargs
|
||||
)
|
||||
parser.parse(one=True)
|
||||
ttp_results = (
|
||||
parser_param.get("vars", {}).get("ttp_results", {})
|
||||
if parser_param.get("vars")
|
||||
else {}
|
||||
)
|
||||
results = parser.result(**ttp_results)
|
||||
except Exception as exc:
|
||||
msg = "Template Text Parser returned an error while parsing. Error: {err}"
|
||||
return {"errors": [msg.format(err=to_native(exc))]}
|
||||
return {"parsed": results}
|
|
@ -0,0 +1,80 @@
|
|||
"""
|
||||
xml parser
|
||||
|
||||
This is the xml parser for use with the cli_parse module and action plugin
|
||||
https://github.com/martinblech/xmltodict
|
||||
"""
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from ansible.module_utils._text import to_native
|
||||
from ansible.module_utils.basic import missing_required_lib
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers._base import (
|
||||
CliParserBase,
|
||||
)
|
||||
|
||||
|
||||
try:
|
||||
import xmltodict
|
||||
|
||||
HAS_XMLTODICT = True
|
||||
except ImportError:
|
||||
HAS_XMLTODICT = False
|
||||
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
""" The xml parser class
|
||||
Convert an xml string to structured data using xmltodict
|
||||
"""
|
||||
|
||||
DEFAULT_TEMPLATE_EXTENSION = None
|
||||
PROVIDE_TEMPLATE_CONTENTS = False
|
||||
|
||||
@staticmethod
|
||||
def _check_reqs():
|
||||
""" Check the prerequisites for the xml parser
|
||||
"""
|
||||
errors = []
|
||||
if not HAS_XMLTODICT:
|
||||
errors.append(missing_required_lib("xmltodict"))
|
||||
|
||||
return errors
|
||||
|
||||
def parse(self, *_args, **_kwargs):
|
||||
""" Std entry point for a cli_parse parse execution
|
||||
|
||||
:return: Errors or parsed text as structured data
|
||||
:rtype: dict
|
||||
|
||||
:example:
|
||||
|
||||
The parse function of a parser should return a dict:
|
||||
{"errors": [a list of errors]}
|
||||
or
|
||||
{"parsed": obj}
|
||||
"""
|
||||
errors = self._check_reqs()
|
||||
if errors:
|
||||
return {"errors": errors}
|
||||
|
||||
cli_output = self._task_args.get("text")
|
||||
|
||||
network_os = self._task_args.get("parser").get(
|
||||
"os"
|
||||
) or self._task_vars.get("ansible_network_os")
|
||||
# the nxos | xml includes a odd garbage line at the end, so remove it
|
||||
if not network_os:
|
||||
self._debug("network_os value is not set")
|
||||
|
||||
if network_os and "nxos" in network_os:
|
||||
splitted = cli_output.splitlines()
|
||||
if splitted[-1] == "]]>]]>":
|
||||
cli_output = "\n".join(splitted[:-1])
|
||||
|
||||
try:
|
||||
parsed = xmltodict.parse(cli_output)
|
||||
return {"parsed": parsed}
|
||||
except Exception as exc:
|
||||
msg = "XML parser returned an error while parsing. Error: {err}"
|
||||
return {"errors": [msg.format(err=to_native(exc))]}
|
|
@ -0,0 +1,280 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2020 Red Hat
|
||||
# GNU General Public License v3.0+
|
||||
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
|
||||
DOCUMENTATION = """
|
||||
module: cli_parse
|
||||
author: Bradley Thornton (@cidrblock)
|
||||
short_description: Parse cli output or text using a variety of parsers
|
||||
description:
|
||||
- Parse cli output or text using a variety of parsers
|
||||
version_added: 1.0.0
|
||||
options:
|
||||
command:
|
||||
type: str
|
||||
description:
|
||||
- The command to run on the host
|
||||
text:
|
||||
type: str
|
||||
description:
|
||||
- Text to be parsed
|
||||
parser:
|
||||
type: dict
|
||||
description:
|
||||
- Parser specific parameters
|
||||
required: True
|
||||
suboptions:
|
||||
name:
|
||||
type: str
|
||||
description:
|
||||
- The name of the parser to use
|
||||
required: True
|
||||
command:
|
||||
type: str
|
||||
description:
|
||||
- The command used to locate the parser's template
|
||||
os:
|
||||
type: str
|
||||
description:
|
||||
- Provide an operating system value to the parser
|
||||
- For `ntc_templates` parser, this should be in the supported
|
||||
`<vendor>_<os>` format.
|
||||
template_path:
|
||||
type: str
|
||||
description:
|
||||
- Path of the parser template on the Ansible controller
|
||||
- This can be a relative or an absolute path
|
||||
vars:
|
||||
type: dict
|
||||
description:
|
||||
- Additional parser specific parameters
|
||||
- See the cli_parse user guide for examples of parser specific variables
|
||||
- U(https://docs.ansible.com/ansible/latest/network/user_guide/cli_parsing.html)
|
||||
set_fact:
|
||||
description:
|
||||
- Set the resulting parsed data as a fact
|
||||
type: str
|
||||
|
||||
|
||||
notes:
|
||||
- The default search path for a parser template is templates/{{ short_os }}_{{ command }}.{{ extension }}
|
||||
- => short_os derived from ansible_network_os or ansible_distribution and set to lower case
|
||||
- => command is the command passed to the module with spaces replaced with _
|
||||
- => extension is specific to the parser used (native=yaml, textfsm=textfsm, ttp=ttp)
|
||||
- The default Ansible search path for the templates directory is used for parser templates as well
|
||||
- Some parsers may have additional configuration options available. See the parsers/vars key and the parser's documentation
|
||||
- Some parsers require third-party python libraries be installed on the Ansible control node and a specific python version
|
||||
- e.g. Pyats requires pyats and genie and requires Python 3
|
||||
- e.g. ntc_templates requires ntc_templates
|
||||
- e.g. textfsm requires textfsm
|
||||
- e.g. ttp requires ttp
|
||||
- e.g. xml requires xml_to_dict
|
||||
- Support of 3rd party python libraries is limited to the use of their public APIs as documented
|
||||
- "Additional information and examples can be found in the parsing user guide:"
|
||||
- https://docs.ansible.com/ansible/latest/network/user_guide/cli_parsing.html
|
||||
"""
|
||||
|
||||
|
||||
EXAMPLES = r"""
|
||||
|
||||
# Using the native parser
|
||||
|
||||
# -------------
|
||||
# templates/nxos_show_interface.yaml
|
||||
# - example: Ethernet1/1 is up
|
||||
# getval: '(?P<name>\S+) is (?P<oper_state>\S+)'
|
||||
# result:
|
||||
# "{{ name }}":
|
||||
# name: "{{ name }}"
|
||||
# state:
|
||||
# operating: "{{ oper_state }}"
|
||||
# shared: True
|
||||
#
|
||||
# - example: admin state is up, Dedicated Interface
|
||||
# getval: 'admin state is (?P<admin_state>\S+)'
|
||||
# result:
|
||||
# "{{ name }}":
|
||||
# name: "{{ name }}"
|
||||
# state:
|
||||
# admin: "{{ admin_state }}"
|
||||
#
|
||||
# - example: " Hardware: Ethernet, address: 0000.5E00.5301 (bia 0000.5E00.5301)"
|
||||
# getval: '\s+Hardware: (?P<hardware>.*), address: (?P<mac>\S+)'
|
||||
# result:
|
||||
# "{{ name }}":
|
||||
# hardware: "{{ hardware }}"
|
||||
# mac_address: "{{ mac }}"
|
||||
|
||||
- name: Run command and parse with native
|
||||
ansible.utils.cli_parse:
|
||||
command: "show interface"
|
||||
parser:
|
||||
name: ansible.netcommon.native
|
||||
set_fact: interfaces_fact
|
||||
|
||||
|
||||
- name: Pass text and template_path
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.netcommon.native
|
||||
template_path: "{{ role_path }}/templates/nxos_show_interface.yaml"
|
||||
|
||||
|
||||
# Using the ntc_templates parser
|
||||
|
||||
# -------------
|
||||
# The ntc_templates use 'vendor_platform' for the file name
|
||||
# it will be derived from ansible_network_os if not provided
|
||||
# e.g. cisco.ios.ios => cisco_ios
|
||||
|
||||
- name: Run command and parse with ntc_templates
|
||||
ansible.utils.cli_parse:
|
||||
command: "show interface"
|
||||
parser:
|
||||
name: ansible.netcommon.ntc_templates
|
||||
register: parser_output
|
||||
|
||||
- name: Pass text and command
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.netcommon.ntc_templates
|
||||
command: show interface
|
||||
register: parser_output
|
||||
|
||||
|
||||
# Using the pyats parser
|
||||
|
||||
# -------------
|
||||
# The pyats parser uses 'os' to locate the appropriate parser
|
||||
# it will be derived from ansible_network_os if not provided
|
||||
# in the case of pyats: cisco.ios.ios => iosxe
|
||||
|
||||
- name: Run command and parse with pyats
|
||||
ansible.utils.cli_parse:
|
||||
command: "show interface"
|
||||
parser:
|
||||
name: ansible.netcommon.pyats
|
||||
register: parser_output
|
||||
|
||||
- name: Pass text and command
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.netcommon.pyats
|
||||
command: show interface
|
||||
register: parser_output
|
||||
|
||||
- name: Provide an OS to pyats to use an ios parser
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.netcommon.pyats
|
||||
command: show interface
|
||||
os: ios
|
||||
register: parser_output
|
||||
|
||||
|
||||
# Using the textfsm parser
|
||||
|
||||
# -------------
|
||||
# templates/nxos_show_version.textfsm
|
||||
#
|
||||
# Value UPTIME ((\d+\s\w+.s.,?\s?){4})
|
||||
# Value LAST_REBOOT_REASON (.+)
|
||||
# Value OS (\d+.\d+(.+)?)
|
||||
# Value BOOT_IMAGE (.*)
|
||||
# Value PLATFORM (\w+)
|
||||
#
|
||||
# Start
|
||||
# ^\s+(NXOS: version|system:\s+version)\s+${OS}\s*$$
|
||||
# ^\s+(NXOS|kickstart)\s+image\s+file\s+is:\s+${BOOT_IMAGE}\s*$$
|
||||
# ^\s+cisco\s+${PLATFORM}\s+[cC]hassis
|
||||
# ^\s+cisco\s+Nexus\d+\s+${PLATFORM}
|
||||
# # Cisco N5K platform
|
||||
# ^\s+cisco\s+Nexus\s+${PLATFORM}\s+[cC]hassis
|
||||
# ^\s+cisco\s+.+-${PLATFORM}\s*
|
||||
# ^Kernel\s+uptime\s+is\s+${UPTIME}
|
||||
# ^\s+Reason:\s${LAST_REBOOT_REASON} -> Record
|
||||
|
||||
- name: Run command and parse with textfsm
|
||||
ansible.utils.cli_parse:
|
||||
command: "show version"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
register: parser_output
|
||||
|
||||
- name: Pass text and command
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
command: show version
|
||||
register: parser_output
|
||||
|
||||
# Using the ttp parser
|
||||
|
||||
# -------------
|
||||
# templates/nxos_show_interface.ttp
|
||||
#
|
||||
# {{ interface }} is {{ state }}
|
||||
# admin state is {{ admin_state }}{{ ignore(".*") }}
|
||||
|
||||
- name: Run command and parse with ttp
|
||||
ansible.utils.cli_parse:
|
||||
command: "show interface"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
set_fact: new_fact_key
|
||||
|
||||
- name: Pass text and template_path
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
template_path: "{{ role_path }}/templates/nxos_show_interface.ttp"
|
||||
register: parser_output
|
||||
|
||||
# Using the XML parser
|
||||
|
||||
# -------------
|
||||
- name: Run command and parse with xml
|
||||
ansible.utils.cli_parse:
|
||||
command: "show interface | xml"
|
||||
parser:
|
||||
name: ansible.utils.xml
|
||||
register: parser_output
|
||||
|
||||
- name: Pass text and parse with xml
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ previous_command['stdout'] }}"
|
||||
parser:
|
||||
name: ansible.utils.xml
|
||||
register: parser_output
|
||||
"""
|
||||
|
||||
RETURN = r"""
|
||||
parsed:
|
||||
description: The structured data resulting from the parsing of the text
|
||||
returned: always
|
||||
type: dict
|
||||
sample:
|
||||
stdout:
|
||||
description: The output from the command run
|
||||
returned: when provided a command
|
||||
type: str
|
||||
sample:
|
||||
stdout_lines:
|
||||
description: The output of the command run split into lines
|
||||
returned: when provided a command
|
||||
type: list
|
||||
sample:
|
||||
"""
|
|
@ -1,3 +0,0 @@
|
|||
# The follow are 3rd party libs for validate
|
||||
jsonschema
|
||||
# /valiate
|
|
@ -4,6 +4,14 @@ flake8
|
|||
mock ; python_version < '3.5'
|
||||
pytest-xdist
|
||||
yamllint
|
||||
# The follow are 3rd party libs for valiate
|
||||
|
||||
# The follow are 3rd party libs for validate
|
||||
jsonschema
|
||||
# /valiate
|
||||
|
||||
# The follow are 3rd party libs for cli_parse
|
||||
textfsm
|
||||
ttp
|
||||
xmltodict
|
||||
# /cli_parse
|
||||
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
mgmt0 is up
|
||||
admin state is up,
|
||||
Hardware: Ethernet, address: 0000.000a.0000 (bia 0000.000a.0000)
|
||||
Internet Address is 192.168.0.38/24
|
||||
MTU 1500 bytes, BW 1000000 Kbit, DLY 10 usec
|
||||
reliability 189/255, txload 1/255, rxload 1/255
|
||||
Encapsulation ARPA, medium is broadcast
|
||||
full-duplex, 1000 Mb/s
|
||||
Auto-Negotiation is turned on
|
||||
Auto-mdix is turned off
|
||||
EtherType is 0x0000
|
||||
1 minute input rate 944 bits/sec, 0 packets/sec
|
||||
1 minute output rate 400 bits/sec, 0 packets/sec
|
||||
Rx
|
||||
7006741 input packets 2790464 unicast packets 1157599 multicast packets
|
||||
3058678 broadcast packets 840241052 bytes
|
||||
Tx
|
||||
2942644 output packets 2790426 unicast packets 152126 multicast packets
|
||||
92 broadcast packets 320387552 bytes
|
||||
|
||||
Ethernet1/1 is up
|
||||
admin state is up, Dedicated Interface
|
||||
Belongs to Po10
|
||||
Hardware: 100/1000/10000 Ethernet, address: 0000.000a.0008 (bia 0000.000a.0008)
|
||||
MTU 1500 bytes, BW 1000000 Kbit, DLY 10 usec
|
||||
reliability 255/255, txload 1/255, rxload 1/255
|
||||
Encapsulation ARPA, medium is broadcast
|
||||
Port mode is access
|
||||
full-duplex, 1000 Mb/s
|
||||
Beacon is turned off
|
||||
Auto-Negotiation is turned on FEC mode is Auto
|
||||
Input flow-control is off, output flow-control is off
|
||||
Auto-mdix is turned off
|
||||
Switchport monitor is off
|
||||
EtherType is 0x8100
|
||||
EEE (efficient-ethernet) : n/a
|
||||
Last link flapped 9week(s) 0day(s)
|
||||
Last clearing of "show interface" counters never
|
||||
4 interface resets
|
||||
30 seconds input rate 0 bits/sec, 0 packets/sec
|
||||
30 seconds output rate 0 bits/sec, 0 packets/sec
|
||||
Load-Interval #2: 5 minute (300 seconds)
|
||||
input rate 0 bps, 0 pps; output rate 0 bps, 0 pps
|
||||
RX
|
||||
0 unicast packets 0 multicast packets 0 broadcast packets
|
||||
0 input packets 0 bytes
|
||||
0 jumbo packets 0 storm suppression packets
|
||||
0 runts 0 giants 0 CRC 0 no buffer
|
||||
0 input error 0 short frame 0 overrun 0 underrun 0 ignored
|
||||
0 watchdog 0 bad etype drop 0 bad proto drop 0 if down drop
|
||||
0 input with dribble 0 input discard
|
||||
0 Rx pause
|
||||
TX
|
||||
0 unicast packets 0 multicast packets 0 broadcast packets
|
||||
0 output packets 0 bytes
|
||||
0 jumbo packets
|
||||
0 output error 0 collision 0 deferred 0 late collision
|
||||
0 lost carrier 0 no carrier 0 babble 0 output discard
|
||||
0 Tx pause
|
||||
|
||||
Ethernet1/8 is down (Link not connected)
|
||||
admin state is up, Dedicated Interface
|
||||
Hardware: 100/1000/10000 Ethernet, address: 0000.000a.000f (bia 0000.000a.000f)
|
||||
MTU 1500 bytes, BW 10000000 Kbit, DLY 10 usec
|
||||
reliability 255/255, txload 1/255, rxload 1/255
|
||||
Encapsulation ARPA, medium is broadcast
|
||||
Port mode is access
|
||||
auto-duplex, auto-speed
|
||||
Beacon is turned off
|
||||
Auto-Negotiation is turned on FEC mode is Auto
|
||||
Input flow-control is off, output flow-control is off
|
||||
Auto-mdix is turned off
|
||||
Switchport monitor is off
|
||||
EtherType is 0x8100
|
||||
EEE (efficient-ethernet) : n/a
|
||||
Last link flapped never
|
||||
Last clearing of "show interface" counters never
|
||||
0 interface resets
|
||||
30 seconds input rate 0 bits/sec, 0 packets/sec
|
||||
30 seconds output rate 0 bits/sec, 0 packets/sec
|
||||
Load-Interval #2: 5 minute (300 seconds)
|
||||
input rate 0 bps, 0 pps; output rate 0 bps, 0 pps
|
||||
RX
|
||||
0 unicast packets 0 multicast packets 0 broadcast packets
|
||||
0 input packets 0 bytes
|
||||
0 jumbo packets 0 storm suppression packets
|
||||
0 runts 0 giants 0 CRC 0 no buffer
|
||||
0 input error 0 short frame 0 overrun 0 underrun 0 ignored
|
||||
0 watchdog 0 bad etype drop 0 bad proto drop 0 if down drop
|
||||
0 input with dribble 0 input discard
|
||||
0 Rx pause
|
||||
TX
|
||||
0 unicast packets 0 multicast packets 0 broadcast packets
|
||||
0 output packets 0 bytes
|
||||
0 jumbo packets
|
||||
0 output error 0 collision 0 deferred 0 late collision
|
||||
0 lost carrier 0 no carrier 0 babble 0 output discard
|
||||
0 Tx pause
|
|
@ -0,0 +1,53 @@
|
|||
<?xml version="1.0" encoding="ISO-8859-1"?>
|
||||
<nf:rpc-reply xmlns:nf="urn:ietf:params:xml:ns:netconf:base:1.0" xmlns="http://www.cisco.com/nxos:1.0:if_manager">
|
||||
<nf:data>
|
||||
<show>
|
||||
<interface>
|
||||
<__XML__OPT_Cmd_show_interface___readonly__>
|
||||
<__readonly__>
|
||||
<TABLE_interface>
|
||||
<ROW_interface>
|
||||
<interface>mgmt0</interface>
|
||||
<state>up</state>
|
||||
<admin_state>up</admin_state>
|
||||
<eth_hw_desc>Ethernet</eth_hw_desc>
|
||||
<eth_hw_addr>5e00.000b.0000</eth_hw_addr>
|
||||
<eth_bia_addr>5e00.000b.0000</eth_bia_addr>
|
||||
<eth_ip_addr>10.8.38.74</eth_ip_addr>
|
||||
<eth_ip_mask>24</eth_ip_mask>
|
||||
<eth_ip_prefix>10.8.38.0</eth_ip_prefix>
|
||||
<eth_mtu>1500</eth_mtu>
|
||||
<eth_bw>1000000</eth_bw>
|
||||
<eth_dly>10</eth_dly>
|
||||
<eth_reliability>188</eth_reliability>
|
||||
<eth_txload>1</eth_txload>
|
||||
<eth_rxload>1</eth_rxload>
|
||||
<medium>broadcast</medium>
|
||||
<eth_mode>routed</eth_mode>
|
||||
<eth_duplex>full</eth_duplex>
|
||||
<eth_speed>1000 Mb/s</eth_speed>
|
||||
<eth_autoneg>on</eth_autoneg>
|
||||
<eth_mdix>off</eth_mdix>
|
||||
<eth_ethertype>0x0000</eth_ethertype>
|
||||
<vdc_lvl_in_avg_bytes>816</vdc_lvl_in_avg_bytes>
|
||||
<vdc_lvl_in_avg_pkts>0</vdc_lvl_in_avg_pkts>
|
||||
<vdc_lvl_out_avg_bytes>272</vdc_lvl_out_avg_bytes>
|
||||
<vdc_lvl_out_avg_pkts>0</vdc_lvl_out_avg_pkts>
|
||||
<vdc_lvl_in_pkts>7716327</vdc_lvl_in_pkts>
|
||||
<vdc_lvl_in_ucast>2848860</vdc_lvl_in_ucast>
|
||||
<vdc_lvl_in_mcast>1743445</vdc_lvl_in_mcast>
|
||||
<vdc_lvl_in_bcast>3124022</vdc_lvl_in_bcast>
|
||||
<vdc_lvl_in_bytes>966991855</vdc_lvl_in_bytes>
|
||||
<vdc_lvl_out_pkts>3004554</vdc_lvl_out_pkts>
|
||||
<vdc_lvl_out_ucast>2849173</vdc_lvl_out_ucast>
|
||||
<vdc_lvl_out_mcast>155378</vdc_lvl_out_mcast>
|
||||
<vdc_lvl_out_bcast>3</vdc_lvl_out_bcast>
|
||||
<vdc_lvl_out_bytes>325131723</vdc_lvl_out_bytes>
|
||||
</ROW_interface>
|
||||
</TABLE_interface>
|
||||
</__readonly__>
|
||||
</__XML__OPT_Cmd_show_interface___readonly__>
|
||||
</interface>
|
||||
</show>
|
||||
</nf:data>
|
||||
</nf:rpc-reply>
|
|
@ -0,0 +1,37 @@
|
|||
Cisco Nexus Operating System (NX-OS) Software
|
||||
TAC support: http://www.cisco.com/tac
|
||||
Documents: http://www.cisco.com/en/US/products/ps9372/tsd_products_support_series_home.html
|
||||
Copyright (c) 2002-2016, Cisco Systems, Inc. All rights reserved.
|
||||
The copyrights to certain works contained herein are owned by
|
||||
other third parties and are used and distributed under license.
|
||||
Some parts of this software are covered under the GNU Public
|
||||
License. A copy of the license is available at
|
||||
http://www.gnu.org/licenses/gpl.html.
|
||||
|
||||
NX-OSv is a demo version of the Nexus Operating System
|
||||
|
||||
Software
|
||||
loader: version N/A
|
||||
kickstart: version 7.3(0)D1(1)
|
||||
system: version 7.3(0)D1(1)
|
||||
kickstart image file is: bootflash:///titanium-d1-kickstart.7.3.0.D1.1.bin
|
||||
kickstart compile time: 1/11/2016 16:00:00 [02/11/2016 10:30:12]
|
||||
system image file is: bootflash:///titanium-d1.7.3.0.D1.1.bin
|
||||
system compile time: 1/11/2016 16:00:00 [02/11/2016 13:08:11]
|
||||
|
||||
|
||||
Hardware
|
||||
cisco NX-OSv Chassis ("NX-OSv Supervisor Module")
|
||||
QEMU Virtual CPU version 2.5 with 3064740 kB of memory.
|
||||
Processor Board ID TM000B0000B
|
||||
|
||||
Device name: an-nxos-02
|
||||
bootflash: 3184776 kB
|
||||
|
||||
Kernel uptime is 110 day(s), 12 hour(s), 32 minute(s), 10 second(s)
|
||||
|
||||
|
||||
plugin
|
||||
Core Plugin, Ethernet Plugin
|
||||
|
||||
Active Package(s)
|
|
@ -0,0 +1,18 @@
|
|||
[
|
||||
[
|
||||
[
|
||||
{
|
||||
"admin_state": "up,",
|
||||
"interface": "mgmt0",
|
||||
"state": "up",
|
||||
"var": "extra_var"
|
||||
},
|
||||
{
|
||||
"admin_state": "up,",
|
||||
"interface": "Ethernet1/1",
|
||||
"state": "up",
|
||||
"var": "extra_var"
|
||||
}
|
||||
]
|
||||
]
|
||||
]
|
|
@ -0,0 +1,18 @@
|
|||
[
|
||||
[
|
||||
[
|
||||
{
|
||||
"admin_state": "up,",
|
||||
"interface": "mgmt0",
|
||||
"state": "up",
|
||||
"var": "extra_var"
|
||||
},
|
||||
{
|
||||
"admin_state": "up,",
|
||||
"interface": "Ethernet1/1",
|
||||
"state": "up",
|
||||
"var": "extra_var"
|
||||
}
|
||||
]
|
||||
]
|
||||
]
|
|
@ -0,0 +1,56 @@
|
|||
{
|
||||
"nf:rpc-reply": {
|
||||
"@xmlns": "http://www.cisco.com/nxos:1.0:if_manager",
|
||||
"@xmlns:nf": "urn:ietf:params:xml:ns:netconf:base:1.0",
|
||||
"nf:data": {
|
||||
"show": {
|
||||
"interface": {
|
||||
"__XML__OPT_Cmd_show_interface___readonly__": {
|
||||
"__readonly__": {
|
||||
"TABLE_interface": {
|
||||
"ROW_interface": {
|
||||
"admin_state": "up",
|
||||
"eth_autoneg": "on",
|
||||
"eth_bia_addr": "5e00.000b.0000",
|
||||
"eth_bw": "1000000",
|
||||
"eth_dly": "10",
|
||||
"eth_duplex": "full",
|
||||
"eth_ethertype": "0x0000",
|
||||
"eth_hw_addr": "5e00.000b.0000",
|
||||
"eth_hw_desc": "Ethernet",
|
||||
"eth_ip_addr": "10.8.38.74",
|
||||
"eth_ip_mask": "24",
|
||||
"eth_ip_prefix": "10.8.38.0",
|
||||
"eth_mdix": "off",
|
||||
"eth_mode": "routed",
|
||||
"eth_mtu": "1500",
|
||||
"eth_reliability": "188",
|
||||
"eth_rxload": "1",
|
||||
"eth_speed": "1000 Mb/s",
|
||||
"eth_txload": "1",
|
||||
"interface": "mgmt0",
|
||||
"medium": "broadcast",
|
||||
"state": "up",
|
||||
"vdc_lvl_in_avg_bytes": "816",
|
||||
"vdc_lvl_in_avg_pkts": "0",
|
||||
"vdc_lvl_in_bcast": "3124022",
|
||||
"vdc_lvl_in_bytes": "966991855",
|
||||
"vdc_lvl_in_mcast": "1743445",
|
||||
"vdc_lvl_in_pkts": "7716327",
|
||||
"vdc_lvl_in_ucast": "2848860",
|
||||
"vdc_lvl_out_avg_bytes": "272",
|
||||
"vdc_lvl_out_avg_pkts": "0",
|
||||
"vdc_lvl_out_bcast": "3",
|
||||
"vdc_lvl_out_bytes": "325131723",
|
||||
"vdc_lvl_out_mcast": "155378",
|
||||
"vdc_lvl_out_pkts": "3004554",
|
||||
"vdc_lvl_out_ucast": "2849173"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"BOOT_IMAGE": "bootflash:///titanium-d1-kickstart.7.3.0.D1.1.bin",
|
||||
"LAST_REBOOT_REASON": "",
|
||||
"OS": "7.3(0)D1(1)",
|
||||
"PLATFORM": "OSv",
|
||||
"UPTIME": "110 day(s), 12 hour(s), 32 minute(s), 10 second(s)"
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
---
|
||||
- name: "{{ parser }} validate argspec"
|
||||
ansible.utils.cli_parse:
|
||||
text: ""
|
||||
parser:
|
||||
name: ansible.utils.json
|
||||
template_path: ""
|
||||
command: ls
|
||||
register: argfail
|
||||
ignore_errors: true
|
||||
|
||||
- name: "{{ parser }} Check argspec fail"
|
||||
assert:
|
||||
that: "argfail['errors'] == 'parameters are mutually exclusive: command|template_path found in parser'"
|
||||
|
||||
- name: "{{ parser }} validate argspec"
|
||||
ansible.utils.cli_parse:
|
||||
text: ""
|
||||
command: ls
|
||||
parser:
|
||||
name: ansible.utils.json
|
||||
command: ""
|
||||
register: argfail
|
||||
ignore_errors: true
|
||||
|
||||
- name: "{{ parser }} Check argspec fail"
|
||||
assert:
|
||||
that: "argfail['errors'] == 'parameters are mutually exclusive: command|text'"
|
||||
|
||||
- name: "{{ parser }} validate argspec"
|
||||
ansible.utils.cli_parse:
|
||||
parser:
|
||||
name: ansible.netcommon.json
|
||||
command: ""
|
||||
register: argfail
|
||||
ignore_errors: true
|
||||
|
||||
- name: "{{ parser }} Check argspec fail"
|
||||
assert:
|
||||
that: "argfail['errors'] == 'one of the following is required: command, text'"
|
||||
|
||||
|
||||
- name: "{{ parser }} validate argspec"
|
||||
ansible.utils.cli_parse:
|
||||
text: ""
|
||||
parser:
|
||||
name: not_fqdn
|
||||
command: ""
|
||||
register: argfail
|
||||
ignore_errors: true
|
||||
|
||||
- name: "{{ parser }} Check arspec fail"
|
||||
assert:
|
||||
that: "argfail['msg'] == 'Parser name should be provided as a full name including collection'"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with textfsm"
|
||||
ansible.utils.cli_parse:
|
||||
command: "ifconfig"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
set_fact: myfact
|
||||
register: ifconfig_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ ifconfig_out['stdout'] is defined }}"
|
||||
- "{{ ifconfig_out['stdout_lines'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'][0]['Interface'] is defined }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with ttp"
|
||||
ansible.utils.cli_parse:
|
||||
command: "df -h"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
set_fact: myfact
|
||||
register: df_h_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ df_h_out['stdout'] is defined }}"
|
||||
- "{{ df_h_out['stdout_lines'] is defined }}"
|
||||
- "{{ df_h_out['parsed'] is defined }}"
|
||||
- "{{ df_h_out['parsed'][0][0][0]['Filesystem'] is defined }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with textfsm"
|
||||
ansible.utils.cli_parse:
|
||||
command: "ifconfig"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
set_fact: myfact
|
||||
register: ifconfig_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ ifconfig_out['stdout'] is defined }}"
|
||||
- "{{ ifconfig_out['stdout_lines'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'][0]['Interface'] is defined }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with ttp"
|
||||
ansible.utils.cli_parse:
|
||||
command: "df -h"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
set_fact: myfact
|
||||
register: df_h_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ df_h_out['stdout'] is defined }}"
|
||||
- "{{ df_h_out['stdout_lines'] is defined }}"
|
||||
- "{{ df_h_out['parsed'] is defined }}"
|
||||
- "{{ df_h_out['parsed'][0][0][0]['Filesystem'] is defined }}"
|
|
@ -0,0 +1,78 @@
|
|||
---
|
||||
- name: Set a short name
|
||||
set_fact:
|
||||
os: "{{ ansible_distribution|d }}"
|
||||
|
||||
- include_tasks: argspec.yaml
|
||||
vars:
|
||||
parser: "({{ inventory_hostname }}/argspec)"
|
||||
|
||||
- include_tasks: "nxos_json.yaml"
|
||||
vars:
|
||||
parser: "(nxos/json)"
|
||||
tags:
|
||||
- json
|
||||
|
||||
- include_tasks: "nxos_textfsm.yaml"
|
||||
vars:
|
||||
parser: "(nxos/textfsm)"
|
||||
tags:
|
||||
- textfsm
|
||||
|
||||
- include_tasks: "nxos_ttp.yaml"
|
||||
vars:
|
||||
parser: "(nxos/ttp)"
|
||||
tags:
|
||||
- ttp
|
||||
|
||||
- include_tasks: "nxos_xml.yaml"
|
||||
vars:
|
||||
parser: "(nxos/xml)"
|
||||
tags:
|
||||
- xml
|
||||
|
||||
- name: debug os
|
||||
debug:
|
||||
msg: "{{ os }}"
|
||||
|
||||
- include_tasks: "centos_textfsm.yaml"
|
||||
vars:
|
||||
parser: "(centos/textfsm)"
|
||||
when: os == 'centos'
|
||||
tags:
|
||||
- textfsm
|
||||
|
||||
- include_tasks: "centos_ttp.yaml"
|
||||
vars:
|
||||
parser: "(centos/ttp)"
|
||||
when: os == 'centos'
|
||||
tags:
|
||||
- ttp
|
||||
|
||||
- include_tasks: "fedora_textfsm.yaml"
|
||||
vars:
|
||||
parser: "(fedora/textfsm)"
|
||||
when: os == 'fedora'
|
||||
tags:
|
||||
- textfsm
|
||||
|
||||
- include_tasks: "fedora_ttp.yaml"
|
||||
vars:
|
||||
parser: "(fedora/ttp)"
|
||||
when: os == 'fedora'
|
||||
tags:
|
||||
- ttp
|
||||
|
||||
- include_tasks: "ubuntu_textfsm.yaml"
|
||||
vars:
|
||||
parser: "(ubuntu/textfsm)"
|
||||
when: os == 'ubuntu'
|
||||
tags:
|
||||
- textfsm
|
||||
|
||||
- include_tasks: "ubuntu_ttp.yaml"
|
||||
vars:
|
||||
parser: "(ubuntu/ttp)"
|
||||
when: os == 'ubuntu'
|
||||
tags:
|
||||
- ttp
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- set_fact:
|
||||
nxos_json_text_parsed: "{{ lookup('file', '{{ role_path }}/output/nxos_show_interface_json_text.txt') }}"
|
||||
|
||||
- name: "{{ parser }} Run command and parse with json"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/output/nxos_show_interface_json_text.txt') }}"
|
||||
parser:
|
||||
name: ansible.utils.json
|
||||
register: nxos_json_text
|
||||
|
||||
- name: "{{ parser }} Confirm response"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ nxos_json_text['parsed'] is defined }}"
|
||||
- "{{ nxos_json_text['parsed'][0][0][0]['admin_state'] is defined }}"
|
||||
- "{{ nxos_json_text['parsed'] == nxos_json_text_parsed }}"
|
|
@ -0,0 +1,19 @@
|
|||
---
|
||||
- set_fact:
|
||||
nxos_textfsm_text_parsed: "{{ lookup('file', '{{ role_path }}/output/nxos_show_version_textfsm_parsed.json') | from_json }}"
|
||||
|
||||
- name: "{{ parser }} Pass text and command"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/files/nxos_show_version.txt') }}"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
template_path: "{{ role_path }}/templates/nxos_show_version.textfsm"
|
||||
register: nxos_textfsm_text
|
||||
|
||||
- name: "{{ parser }} Confirm response"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ nxos_textfsm_text['parsed'] == nxos_textfsm_text['parsed'] }}"
|
||||
- "{{ nxos_textfsm_text['parsed'][0]['BOOT_IMAGE'] is defined }}"
|
||||
- "{{ nxos_textfsm_text['parsed'][0] == nxos_textfsm_text_parsed }}"
|
|
@ -0,0 +1,54 @@
|
|||
---
|
||||
- set_fact:
|
||||
nxos_ttp_text_parsed: "{{ lookup('file', '{{ role_path }}/output/nxos_show_interface_ttp_parsed.json') | from_json }}"
|
||||
|
||||
- name: "{{ parser }} Pass text and template_path"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/files/nxos_show_interface.txt') }}"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
template_path: "{{ role_path }}/templates/nxos_show_interface.ttp"
|
||||
set_fact: POpqMQoJWTiDpEW
|
||||
register: nxos_ttp_text
|
||||
|
||||
- name: "{{ parser }} Confirm response"
|
||||
assert:
|
||||
that:
|
||||
- "{{ POpqMQoJWTiDpEW is defined }}"
|
||||
- "{{ nxos_ttp_text['parsed'][0][0] | selectattr('interface', 'search', 'mgmt0') | list | length }}"
|
||||
- "{{ nxos_ttp_text['parsed'] == nxos_ttp_text_parsed }}"
|
||||
|
||||
|
||||
- name: "{{ parser }} Pass text and custom variable"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/files/nxos_show_interface.txt') }}"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
template_path: "{{ role_path }}/templates/nxos_show_interface.ttp"
|
||||
vars:
|
||||
ttp_vars:
|
||||
extra_var: some_text
|
||||
register: nxos_ttp_vars
|
||||
|
||||
- name: "{{ parser }} Confirm modified results"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ nxos_ttp_vars['parsed'][0][0][0]['var'] == 'some_text' }}"
|
||||
|
||||
- name: "{{ parser }} Pass text and ttp_results modified"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/files/nxos_show_interface.txt') }}"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
template_path: "{{ role_path }}/templates/nxos_show_interface.ttp"
|
||||
vars:
|
||||
ttp_results:
|
||||
format: yaml
|
||||
register: nxos_ttp_results
|
||||
|
||||
- name: "{{ parser }} Confirm modified results"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ (nxos_ttp_results['parsed'][0]|from_yaml)[0] | selectattr('interface', 'search', 'mgmt0') | list | length }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- set_fact:
|
||||
nxos_xml_text_parsed: "{{ lookup('file', '{{ role_path }}/output/nxos_show_interface_xml_parsed.json') | from_json }}"
|
||||
|
||||
- name: "{{ parser }} Pass text and parse with xml"
|
||||
ansible.utils.cli_parse:
|
||||
text: "{{ lookup('file', '{{ role_path }}/files/nxos_show_interface.xml') }}"
|
||||
parser:
|
||||
name: ansible.utils.xml
|
||||
os: nxos
|
||||
register: nxos_xml_text
|
||||
|
||||
- name: "{{ parser }} Confirm response"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ nxos_xml_text['parsed'] == nxos_xml_text_parsed }}"
|
||||
- "{{ nxos_xml_text['parsed']['nf:rpc-reply'] is defined }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with textfsm"
|
||||
ansible.utils.cli_parse:
|
||||
command: "ifconfig"
|
||||
parser:
|
||||
name: ansible.utils.textfsm
|
||||
set_fact: myfact
|
||||
register: ifconfig_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ ifconfig_out['stdout'] is defined }}"
|
||||
- "{{ ifconfig_out['stdout_lines'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'] is defined }}"
|
||||
- "{{ ifconfig_out['parsed'][0]['Interface'] is defined }}"
|
|
@ -0,0 +1,18 @@
|
|||
---
|
||||
- name: "{{ parser }} Run command and parse with ttp"
|
||||
ansible.utils.cli_parse:
|
||||
command: "df -h"
|
||||
parser:
|
||||
name: ansible.utils.ttp
|
||||
set_fact: myfact
|
||||
register: df_h_out
|
||||
|
||||
- name: "{{ parser }} Check parser output"
|
||||
assert:
|
||||
that: "{{ item }}"
|
||||
with_items:
|
||||
- "{{ myfact is defined }}"
|
||||
- "{{ df_h_out['stdout'] is defined }}"
|
||||
- "{{ df_h_out['stdout_lines'] is defined }}"
|
||||
- "{{ df_h_out['parsed'] is defined }}"
|
||||
- "{{ df_h_out['parsed'][0][0][0]['Filesystem'] is defined }}"
|
|
@ -0,0 +1 @@
|
|||
Filesystem Size Used Avail Use Mounted_on {{ _headers_ }}
|
|
@ -0,0 +1,19 @@
|
|||
# template from https://github.com/google/textfsm/blob/master/examples/unix_ifcfg_template
|
||||
Value Required Interface ([^:]+)
|
||||
Value MTU (\d+)
|
||||
Value State ((in)?active)
|
||||
Value MAC ([\d\w:]+)
|
||||
Value List Inet ([\d\.]+)
|
||||
Value List Netmask (\S+)
|
||||
# Don't match interface local (fe80::/10) - achieved with excluding '%'.
|
||||
Value List Inet6 ([^%]+)
|
||||
Value List Prefix (\d+)
|
||||
|
||||
Start
|
||||
# Record interface record (if we have one).
|
||||
^\S+:.* -> Continue.Record
|
||||
# Collect data for new interface.
|
||||
^${Interface}:.* mtu ${MTU}
|
||||
^\s+ether ${MAC}
|
||||
^\s+inet6 ${Inet6} prefixlen ${Prefix}
|
||||
^\s+inet ${Inet} netmask ${Netmask}
|
|
@ -0,0 +1 @@
|
|||
Filesystem Size Used Avail Use Mounted_on {{ _headers_ }}
|
|
@ -0,0 +1,19 @@
|
|||
# template from https://github.com/google/textfsm/blob/master/examples/unix_ifcfg_template
|
||||
Value Required Interface ([^:]+)
|
||||
Value MTU (\d+)
|
||||
Value State ((in)?active)
|
||||
Value MAC ([\d\w:]+)
|
||||
Value List Inet ([\d\.]+)
|
||||
Value List Netmask (\S+)
|
||||
# Don't match interface local (fe80::/10) - achieved with excluding '%'.
|
||||
Value List Inet6 ([^%]+)
|
||||
Value List Prefix (\d+)
|
||||
|
||||
Start
|
||||
# Record interface record (if we have one).
|
||||
^\S+:.* -> Continue.Record
|
||||
# Collect data for new interface.
|
||||
^${Interface}:.* mtu ${MTU}
|
||||
^\s+ether ${MAC}
|
||||
^\s+inet6 ${Inet6} prefixlen ${Prefix}
|
||||
^\s+inet ${Inet} netmask ${Netmask}
|
|
@ -0,0 +1,3 @@
|
|||
{{ interface }} is {{ state }}
|
||||
admin state is {{ admin_state }}{{ ignore(".*") }}
|
||||
{{ var | set("extra_var") }}
|
|
@ -0,0 +1,24 @@
|
|||
---
|
||||
- example: Ethernet1/1 is up
|
||||
getval: '(?P<name>\S+) is (?P<oper_state>\S+)'
|
||||
result:
|
||||
"{{ name }}":
|
||||
name: "{{ name }}"
|
||||
state:
|
||||
operating: "{{ oper_state }}"
|
||||
shared: true
|
||||
|
||||
- example: admin state is up, Dedicated Interface
|
||||
getval: 'admin state is (?P<admin_state>\S+)'
|
||||
result:
|
||||
"{{ name }}":
|
||||
name: "{{ name }}"
|
||||
state:
|
||||
admin: "{{ admin_state }}"
|
||||
|
||||
- example: " Hardware: Ethernet, address: 5254.005a.f8b5 (bia 5254.005a.f8b5)"
|
||||
getval: '\s+Hardware: (?P<hardware>.*), address: (?P<mac>\S+)'
|
||||
result:
|
||||
"{{ name }}":
|
||||
hardware: "{{ hardware }}"
|
||||
mac_address: "{{ mac }}"
|
|
@ -0,0 +1,16 @@
|
|||
Value UPTIME ((\d+\s\w+.s.,?\s?){4})
|
||||
Value LAST_REBOOT_REASON (.+)
|
||||
Value OS (\d+.\d+(.+)?)
|
||||
Value BOOT_IMAGE (.*)
|
||||
Value PLATFORM (\w+)
|
||||
|
||||
Start
|
||||
^\s+(NXOS: version|system:\s+version)\s+${OS}\s*$$
|
||||
^\s+(NXOS|kickstart)\s+image\s+file\s+is:\s+${BOOT_IMAGE}\s*$$
|
||||
^\s+cisco\s+${PLATFORM}\s+[cC]hassis
|
||||
^\s+cisco\s+Nexus\d+\s+${PLATFORM}
|
||||
# Cisco N5K platform
|
||||
^\s+cisco\s+Nexus\s+${PLATFORM}\s+[cC]hassis
|
||||
^\s+cisco\s+.+-${PLATFORM}\s*
|
||||
^Kernel\s+uptime\s+is\s+${UPTIME}
|
||||
^\s+Reason:\s${LAST_REBOOT_REASON} -> Record
|
|
@ -0,0 +1 @@
|
|||
Filesystem Size Used Avail Use Mounted_on {{ _headers_ }}
|
|
@ -0,0 +1,19 @@
|
|||
# template from https://github.com/google/textfsm/blob/master/examples/unix_ifcfg_template
|
||||
Value Required Interface ([^:]+)
|
||||
Value MTU (\d+)
|
||||
Value State ((in)?active)
|
||||
Value MAC ([\d\w:]+)
|
||||
Value List Inet ([\d\.]+)
|
||||
Value List Netmask (\S+)
|
||||
# Don't match interface local (fe80::/10) - achieved with excluding '%'.
|
||||
Value List Inet6 ([^%]+)
|
||||
Value List Prefix (\d+)
|
||||
|
||||
Start
|
||||
# Record interface record (if we have one).
|
||||
^\S+:.* -> Continue.Record
|
||||
# Collect data for new interface.
|
||||
^${Interface}:.* mtu ${MTU}
|
||||
^\s+ether ${MAC}
|
||||
^\s+inet6 ${Inet6} prefixlen ${Prefix}
|
||||
^\s+inet ${Inet} netmask ${Netmask}
|
|
@ -0,0 +1,34 @@
|
|||
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
#
|
||||
# Compat for python2.7
|
||||
#
|
||||
|
||||
# One unittest needs to import builtins via __import__() so we need to have
|
||||
# the string that represents it
|
||||
try:
|
||||
import __builtin__
|
||||
except ImportError:
|
||||
BUILTINS = "builtins"
|
||||
else:
|
||||
BUILTINS = "__builtin__"
|
|
@ -0,0 +1,127 @@
|
|||
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
"""
|
||||
Compat module for Python3.x's unittest.mock module
|
||||
"""
|
||||
import _io
|
||||
import sys
|
||||
|
||||
# Python 2.7
|
||||
|
||||
# Note: Could use the pypi mock library on python3.x as well as python2.x. It
|
||||
# is the same as the python3 stdlib mock library
|
||||
|
||||
try:
|
||||
# Allow wildcard import because we really do want to import all of mock's
|
||||
# symbols into this compat shim
|
||||
# pylint: disable=wildcard-import,unused-wildcard-import
|
||||
from unittest.mock import *
|
||||
except ImportError:
|
||||
# Python 2
|
||||
# pylint: disable=wildcard-import,unused-wildcard-import
|
||||
try:
|
||||
from mock import *
|
||||
except ImportError:
|
||||
print("You need the mock library installed on python2.x to run tests")
|
||||
|
||||
|
||||
# Prior to 3.4.4, mock_open cannot handle binary read_data
|
||||
if sys.version_info >= (3,) and sys.version_info < (3, 4, 4):
|
||||
file_spec = None
|
||||
|
||||
def _iterate_read_data(read_data):
|
||||
# Helper for mock_open:
|
||||
# Retrieve lines from read_data via a generator so that separate calls to
|
||||
# readline, read, and readlines are properly interleaved
|
||||
sep = b"\n" if isinstance(read_data, bytes) else "\n"
|
||||
data_as_list = [l + sep for l in read_data.split(sep)]
|
||||
|
||||
if data_as_list[-1] == sep:
|
||||
# If the last line ended in a newline, the list comprehension will have an
|
||||
# extra entry that's just a newline. Remove this.
|
||||
data_as_list = data_as_list[:-1]
|
||||
else:
|
||||
# If there wasn't an extra newline by itself, then the file being
|
||||
# emulated doesn't have a newline to end the last line remove the
|
||||
# newline that our naive format() added
|
||||
data_as_list[-1] = data_as_list[-1][:-1]
|
||||
|
||||
for line in data_as_list:
|
||||
yield line
|
||||
|
||||
def mock_open(mock=None, read_data=""):
|
||||
"""
|
||||
A helper function to create a mock to replace the use of `open`. It works
|
||||
for `open` called directly or used as a context manager.
|
||||
|
||||
The `mock` argument is the mock object to configure. If `None` (the
|
||||
default) then a `MagicMock` will be created for you, with the API limited
|
||||
to methods or attributes available on standard file handles.
|
||||
|
||||
`read_data` is a string for the `read` methoddline`, and `readlines` of the
|
||||
file handle to return. This is an empty string by default.
|
||||
"""
|
||||
|
||||
def _readlines_side_effect(*args, **kwargs):
|
||||
if handle.readlines.return_value is not None:
|
||||
return handle.readlines.return_value
|
||||
return list(_data)
|
||||
|
||||
def _read_side_effect(*args, **kwargs):
|
||||
if handle.read.return_value is not None:
|
||||
return handle.read.return_value
|
||||
return type(read_data)().join(_data)
|
||||
|
||||
def _readline_side_effect():
|
||||
if handle.readline.return_value is not None:
|
||||
while True:
|
||||
yield handle.readline.return_value
|
||||
for line in _data:
|
||||
yield line
|
||||
|
||||
global file_spec
|
||||
if file_spec is None:
|
||||
|
||||
file_spec = list(
|
||||
set(dir(_io.TextIOWrapper)).union(set(dir(_io.BytesIO)))
|
||||
)
|
||||
|
||||
if mock is None:
|
||||
mock = MagicMock(name="open", spec=open)
|
||||
|
||||
handle = MagicMock(spec=file_spec)
|
||||
handle.__enter__.return_value = handle
|
||||
|
||||
_data = _iterate_read_data(read_data)
|
||||
|
||||
handle.write.return_value = None
|
||||
handle.read.return_value = None
|
||||
handle.readline.return_value = None
|
||||
handle.readlines.return_value = None
|
||||
|
||||
handle.read.side_effect = _read_side_effect
|
||||
handle.readline.side_effect = _readline_side_effect()
|
||||
handle.readlines.side_effect = _readlines_side_effect
|
||||
|
||||
mock.return_value = handle
|
||||
return mock
|
|
@ -0,0 +1,39 @@
|
|||
# (c) 2014, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
"""
|
||||
Compat module for Python2.7's unittest module
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
# Allow wildcard import because we really do want to import all of
|
||||
# unittests's symbols into this compat shim
|
||||
# pylint: disable=wildcard-import,unused-wildcard-import
|
||||
if sys.version_info < (2, 7):
|
||||
try:
|
||||
# Need unittest2 on python2.6
|
||||
from unittest2 import *
|
||||
except ImportError:
|
||||
print("You need unittest2 installed on python2.6.x to run tests")
|
||||
else:
|
||||
from unittest import *
|
|
@ -0,0 +1,116 @@
|
|||
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
|
||||
from ansible.errors import AnsibleParserError
|
||||
from ansible.parsing.dataloader import DataLoader
|
||||
from ansible.module_utils._text import to_bytes, to_text
|
||||
|
||||
|
||||
class DictDataLoader(DataLoader):
|
||||
def __init__(self, file_mapping=None):
|
||||
file_mapping = {} if file_mapping is None else file_mapping
|
||||
assert type(file_mapping) == dict
|
||||
|
||||
super(DictDataLoader, self).__init__()
|
||||
|
||||
self._file_mapping = file_mapping
|
||||
self._build_known_directories()
|
||||
self._vault_secrets = None
|
||||
|
||||
def load_from_file(self, path, cache=True, unsafe=False):
|
||||
path = to_text(path)
|
||||
if path in self._file_mapping:
|
||||
return self.load(self._file_mapping[path], path)
|
||||
return None
|
||||
|
||||
# TODO: the real _get_file_contents returns a bytestring, so we actually convert the
|
||||
# unicode/text it's created with to utf-8
|
||||
def _get_file_contents(self, path):
|
||||
path = to_text(path)
|
||||
if path in self._file_mapping:
|
||||
return (to_bytes(self._file_mapping[path]), False)
|
||||
else:
|
||||
raise AnsibleParserError("file not found: %s" % path)
|
||||
|
||||
def path_exists(self, path):
|
||||
path = to_text(path)
|
||||
return path in self._file_mapping or path in self._known_directories
|
||||
|
||||
def is_file(self, path):
|
||||
path = to_text(path)
|
||||
return path in self._file_mapping
|
||||
|
||||
def is_directory(self, path):
|
||||
path = to_text(path)
|
||||
return path in self._known_directories
|
||||
|
||||
def list_directory(self, path):
|
||||
ret = []
|
||||
path = to_text(path)
|
||||
for x in list(self._file_mapping.keys()) + self._known_directories:
|
||||
if x.startswith(path):
|
||||
if os.path.dirname(x) == path:
|
||||
ret.append(os.path.basename(x))
|
||||
return ret
|
||||
|
||||
def is_executable(self, path):
|
||||
# FIXME: figure out a way to make paths return true for this
|
||||
return False
|
||||
|
||||
def _add_known_directory(self, directory):
|
||||
if directory not in self._known_directories:
|
||||
self._known_directories.append(directory)
|
||||
|
||||
def _build_known_directories(self):
|
||||
self._known_directories = []
|
||||
for path in self._file_mapping:
|
||||
dirname = os.path.dirname(path)
|
||||
while dirname not in ("/", ""):
|
||||
self._add_known_directory(dirname)
|
||||
dirname = os.path.dirname(dirname)
|
||||
|
||||
def push(self, path, content):
|
||||
rebuild_dirs = False
|
||||
if path not in self._file_mapping:
|
||||
rebuild_dirs = True
|
||||
|
||||
self._file_mapping[path] = content
|
||||
|
||||
if rebuild_dirs:
|
||||
self._build_known_directories()
|
||||
|
||||
def pop(self, path):
|
||||
if path in self._file_mapping:
|
||||
del self._file_mapping[path]
|
||||
self._build_known_directories()
|
||||
|
||||
def clear(self):
|
||||
self._file_mapping = dict()
|
||||
self._known_directories = []
|
||||
|
||||
def get_basedir(self):
|
||||
return os.getcwd()
|
||||
|
||||
def set_vault_secrets(self, vault_secrets):
|
||||
self._vault_secrets = vault_secrets
|
|
@ -0,0 +1,12 @@
|
|||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
from ansible_collections.ansible.netcommon.tests.unit.compat.mock import (
|
||||
MagicMock,
|
||||
)
|
||||
from ansible.utils.path import unfrackpath
|
||||
|
||||
|
||||
mock_unfrackpath_noop = MagicMock(
|
||||
spec_set=unfrackpath, side_effect=lambda x, *args, **kwargs: x
|
||||
)
|
|
@ -0,0 +1,94 @@
|
|||
# (c) 2016, Matt Davis <mdavis@ansible.com>
|
||||
# (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com>
|
||||
#
|
||||
# This file is part of Ansible
|
||||
#
|
||||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import sys
|
||||
import json
|
||||
|
||||
from contextlib import contextmanager
|
||||
from io import BytesIO, StringIO
|
||||
from ansible_collections.ansible.netcommon.tests.unit.compat import unittest
|
||||
from ansible.module_utils.six import PY3
|
||||
from ansible.module_utils._text import to_bytes
|
||||
|
||||
|
||||
@contextmanager
|
||||
def swap_stdin_and_argv(stdin_data="", argv_data=tuple()):
|
||||
"""
|
||||
context manager that temporarily masks the test runner's values for stdin and argv
|
||||
"""
|
||||
real_stdin = sys.stdin
|
||||
real_argv = sys.argv
|
||||
|
||||
if PY3:
|
||||
fake_stream = StringIO(stdin_data)
|
||||
fake_stream.buffer = BytesIO(to_bytes(stdin_data))
|
||||
else:
|
||||
fake_stream = BytesIO(to_bytes(stdin_data))
|
||||
|
||||
try:
|
||||
sys.stdin = fake_stream
|
||||
sys.argv = argv_data
|
||||
|
||||
yield
|
||||
finally:
|
||||
sys.stdin = real_stdin
|
||||
sys.argv = real_argv
|
||||
|
||||
|
||||
@contextmanager
|
||||
def swap_stdout():
|
||||
"""
|
||||
context manager that temporarily replaces stdout for tests that need to verify output
|
||||
"""
|
||||
old_stdout = sys.stdout
|
||||
|
||||
if PY3:
|
||||
fake_stream = StringIO()
|
||||
else:
|
||||
fake_stream = BytesIO()
|
||||
|
||||
try:
|
||||
sys.stdout = fake_stream
|
||||
|
||||
yield fake_stream
|
||||
finally:
|
||||
sys.stdout = old_stdout
|
||||
|
||||
|
||||
class ModuleTestCase(unittest.TestCase):
|
||||
def setUp(self, module_args=None):
|
||||
if module_args is None:
|
||||
module_args = {
|
||||
"_ansible_remote_tmp": "/tmp",
|
||||
"_ansible_keep_remote_files": False,
|
||||
}
|
||||
|
||||
args = json.dumps(dict(ANSIBLE_MODULE_ARGS=module_args))
|
||||
|
||||
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
|
||||
self.stdin_swap = swap_stdin_and_argv(stdin_data=args)
|
||||
self.stdin_swap.__enter__()
|
||||
|
||||
def tearDown(self):
|
||||
# unittest doesn't have a clean place to use a context manager, so we have to enter/exit manually
|
||||
self.stdin_swap.__exit__(None, None, None)
|
|
@ -0,0 +1,42 @@
|
|||
# Ansible is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Ansible is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Make coding more python3-ish
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from ansible.module_utils._text import to_bytes
|
||||
|
||||
from ansible.parsing.vault import VaultSecret
|
||||
|
||||
|
||||
class TextVaultSecret(VaultSecret):
|
||||
"""A secret piece of text. ie, a password. Tracks text encoding.
|
||||
|
||||
The text encoding of the text may not be the default text encoding so
|
||||
we keep track of the encoding so we encode it to the same bytes."""
|
||||
|
||||
def __init__(self, text, encoding=None, errors=None, _bytes=None):
|
||||
super(TextVaultSecret, self).__init__()
|
||||
self.text = text
|
||||
self.encoding = encoding or "utf-8"
|
||||
self._bytes = _bytes
|
||||
self.errors = errors or "strict"
|
||||
|
||||
@property
|
||||
def bytes(self):
|
||||
"""The text encoded with encoding, unless we specifically set _bytes."""
|
||||
return self._bytes or to_bytes(
|
||||
self.text, encoding=self.encoding, errors=self.errors
|
||||
)
|
|
@ -0,0 +1,167 @@
|
|||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
import io
|
||||
import yaml
|
||||
|
||||
from ansible.module_utils.six import PY3
|
||||
from ansible.parsing.yaml.loader import AnsibleLoader
|
||||
from ansible.parsing.yaml.dumper import AnsibleDumper
|
||||
|
||||
|
||||
class YamlTestUtils(object):
|
||||
"""Mixin class to combine with a unittest.TestCase subclass."""
|
||||
|
||||
def _loader(self, stream):
|
||||
"""Vault related tests will want to override this.
|
||||
|
||||
Vault cases should setup a AnsibleLoader that has the vault password."""
|
||||
return AnsibleLoader(stream)
|
||||
|
||||
def _dump_stream(self, obj, stream, dumper=None):
|
||||
"""Dump to a py2-unicode or py3-string stream."""
|
||||
if PY3:
|
||||
return yaml.dump(obj, stream, Dumper=dumper)
|
||||
else:
|
||||
return yaml.dump(obj, stream, Dumper=dumper, encoding=None)
|
||||
|
||||
def _dump_string(self, obj, dumper=None):
|
||||
"""Dump to a py2-unicode or py3-string"""
|
||||
if PY3:
|
||||
return yaml.dump(obj, Dumper=dumper)
|
||||
else:
|
||||
return yaml.dump(obj, Dumper=dumper, encoding=None)
|
||||
|
||||
def _dump_load_cycle(self, obj):
|
||||
# Each pass though a dump or load revs the 'generation'
|
||||
# obj to yaml string
|
||||
string_from_object_dump = self._dump_string(obj, dumper=AnsibleDumper)
|
||||
|
||||
# wrap a stream/file like StringIO around that yaml
|
||||
stream_from_object_dump = io.StringIO(string_from_object_dump)
|
||||
loader = self._loader(stream_from_object_dump)
|
||||
# load the yaml stream to create a new instance of the object (gen 2)
|
||||
obj_2 = loader.get_data()
|
||||
|
||||
# dump the gen 2 objects directory to strings
|
||||
string_from_object_dump_2 = self._dump_string(
|
||||
obj_2, dumper=AnsibleDumper
|
||||
)
|
||||
|
||||
# The gen 1 and gen 2 yaml strings
|
||||
self.assertEqual(string_from_object_dump, string_from_object_dump_2)
|
||||
# the gen 1 (orig) and gen 2 py object
|
||||
self.assertEqual(obj, obj_2)
|
||||
|
||||
# again! gen 3... load strings into py objects
|
||||
stream_3 = io.StringIO(string_from_object_dump_2)
|
||||
loader_3 = self._loader(stream_3)
|
||||
obj_3 = loader_3.get_data()
|
||||
|
||||
string_from_object_dump_3 = self._dump_string(
|
||||
obj_3, dumper=AnsibleDumper
|
||||
)
|
||||
|
||||
self.assertEqual(obj, obj_3)
|
||||
# should be transitive, but...
|
||||
self.assertEqual(obj_2, obj_3)
|
||||
self.assertEqual(string_from_object_dump, string_from_object_dump_3)
|
||||
|
||||
def _old_dump_load_cycle(self, obj):
|
||||
"""Dump the passed in object to yaml, load it back up, dump again, compare."""
|
||||
stream = io.StringIO()
|
||||
|
||||
yaml_string = self._dump_string(obj, dumper=AnsibleDumper)
|
||||
self._dump_stream(obj, stream, dumper=AnsibleDumper)
|
||||
|
||||
yaml_string_from_stream = stream.getvalue()
|
||||
|
||||
# reset stream
|
||||
stream.seek(0)
|
||||
|
||||
loader = self._loader(stream)
|
||||
# loader = AnsibleLoader(stream, vault_password=self.vault_password)
|
||||
obj_from_stream = loader.get_data()
|
||||
|
||||
stream_from_string = io.StringIO(yaml_string)
|
||||
loader2 = self._loader(stream_from_string)
|
||||
# loader2 = AnsibleLoader(stream_from_string, vault_password=self.vault_password)
|
||||
obj_from_string = loader2.get_data()
|
||||
|
||||
stream_obj_from_stream = io.StringIO()
|
||||
stream_obj_from_string = io.StringIO()
|
||||
|
||||
if PY3:
|
||||
yaml.dump(
|
||||
obj_from_stream, stream_obj_from_stream, Dumper=AnsibleDumper
|
||||
)
|
||||
yaml.dump(
|
||||
obj_from_stream, stream_obj_from_string, Dumper=AnsibleDumper
|
||||
)
|
||||
else:
|
||||
yaml.dump(
|
||||
obj_from_stream,
|
||||
stream_obj_from_stream,
|
||||
Dumper=AnsibleDumper,
|
||||
encoding=None,
|
||||
)
|
||||
yaml.dump(
|
||||
obj_from_stream,
|
||||
stream_obj_from_string,
|
||||
Dumper=AnsibleDumper,
|
||||
encoding=None,
|
||||
)
|
||||
|
||||
yaml_string_stream_obj_from_stream = stream_obj_from_stream.getvalue()
|
||||
yaml_string_stream_obj_from_string = stream_obj_from_string.getvalue()
|
||||
|
||||
stream_obj_from_stream.seek(0)
|
||||
stream_obj_from_string.seek(0)
|
||||
|
||||
if PY3:
|
||||
yaml_string_obj_from_stream = yaml.dump(
|
||||
obj_from_stream, Dumper=AnsibleDumper
|
||||
)
|
||||
yaml_string_obj_from_string = yaml.dump(
|
||||
obj_from_string, Dumper=AnsibleDumper
|
||||
)
|
||||
else:
|
||||
yaml_string_obj_from_stream = yaml.dump(
|
||||
obj_from_stream, Dumper=AnsibleDumper, encoding=None
|
||||
)
|
||||
yaml_string_obj_from_string = yaml.dump(
|
||||
obj_from_string, Dumper=AnsibleDumper, encoding=None
|
||||
)
|
||||
|
||||
assert yaml_string == yaml_string_obj_from_stream
|
||||
assert (
|
||||
yaml_string
|
||||
== yaml_string_obj_from_stream
|
||||
== yaml_string_obj_from_string
|
||||
)
|
||||
assert (
|
||||
yaml_string
|
||||
== yaml_string_obj_from_stream
|
||||
== yaml_string_obj_from_string
|
||||
== yaml_string_stream_obj_from_stream
|
||||
== yaml_string_stream_obj_from_string
|
||||
)
|
||||
assert obj == obj_from_stream
|
||||
assert obj == obj_from_string
|
||||
assert obj == yaml_string_obj_from_stream
|
||||
assert obj == yaml_string_obj_from_string
|
||||
assert (
|
||||
obj
|
||||
== obj_from_stream
|
||||
== obj_from_string
|
||||
== yaml_string_obj_from_stream
|
||||
== yaml_string_obj_from_string
|
||||
)
|
||||
return {
|
||||
"obj": obj,
|
||||
"yaml_string": yaml_string,
|
||||
"yaml_string_from_stream": yaml_string_from_stream,
|
||||
"obj_from_stream": obj_from_stream,
|
||||
"obj_from_string": obj_from_string,
|
||||
"yaml_string_obj_from_string": yaml_string_obj_from_string,
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
Value uptime ((\d+\s\w+.s.,?\s?){4})
|
||||
Value last_reboot_reason (\w+)
|
||||
Value version (\d+.\d+(.+)?)
|
||||
Value boot_image (.*)
|
||||
Value platfrom (\w+)
|
||||
|
||||
Start
|
||||
^\s+NXOS: version\s${version}
|
||||
^\s+NXOS image file is:\s${boot_image}
|
||||
^\s+cisco Nexus\d+\s${platfrom}
|
||||
^Kernel uptime is\s${uptime}
|
||||
^\s+Reason:\s${last_reboot_reason} -> Record
|
|
@ -0,0 +1,38 @@
|
|||
Cisco Nexus Operating System (NX-OS) Software
|
||||
TAC support: http://www.cisco.com/tac
|
||||
Documents: http://www.cisco.com/en/US/products/ps9372/tsd_products_support_series_home.html
|
||||
Copyright (c) 2002-2018, Cisco Systems, Inc. All rights reserved.
|
||||
The copyrights to certain works contained herein are owned by
|
||||
other third parties and are used and distributed under license.
|
||||
Some parts of this software are covered under the GNU Public
|
||||
License. A copy of the license is available at
|
||||
http://www.gnu.org/licenses/gpl.html.
|
||||
|
||||
Nexus 9000v is a demo version of the Nexus Operating System
|
||||
|
||||
Software
|
||||
BIOS: version
|
||||
NXOS: version 9.2(2)
|
||||
BIOS compile time:
|
||||
NXOS image file is: bootflash:///nxos.9.2.2.bin
|
||||
NXOS compile time: 11/4/2018 21:00:00 [11/05/2018 06:11:06]
|
||||
|
||||
|
||||
Hardware
|
||||
cisco Nexus9000 9000v Chassis
|
||||
with 8035024 kB of memory.
|
||||
Processor Board ID 970MUM0NTLV
|
||||
|
||||
Device name: nxos101
|
||||
bootflash: 3509454 kB
|
||||
Kernel uptime is 33 day(s), 17 hour(s), 29 minute(s), 8 second(s)
|
||||
|
||||
Last reset
|
||||
Reason: Unknown
|
||||
System version:
|
||||
Service:
|
||||
|
||||
plugin
|
||||
Core Plugin, Ethernet Plugin
|
||||
|
||||
Active Package(s):
|
|
@ -0,0 +1,594 @@
|
|||
# (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from ansible.playbook.task import Task
|
||||
from ansible.template import Templar
|
||||
|
||||
from ansible_collections.ansible.utils.tests.unit.compat import unittest
|
||||
from ansible_collections.ansible.utils.tests.unit.compat.mock import (
|
||||
MagicMock,
|
||||
patch,
|
||||
)
|
||||
from ansible_collections.ansible.utils.tests.unit.mock.loader import (
|
||||
DictDataLoader,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.action.cli_parse import (
|
||||
ActionModule,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.modules.cli_parse import (
|
||||
DOCUMENTATION,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
|
||||
check_argspec,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.action.cli_parse import (
|
||||
ARGSPEC_CONDITIONALS,
|
||||
)
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers._base import (
|
||||
CliParserBase,
|
||||
)
|
||||
from ansible.module_utils.connection import (
|
||||
ConnectionError as AnsibleConnectionError,
|
||||
)
|
||||
|
||||
|
||||
class TestCli_Parse(unittest.TestCase):
|
||||
def setUp(self):
|
||||
task = MagicMock(Task)
|
||||
play_context = MagicMock()
|
||||
play_context.check_mode = False
|
||||
connection = MagicMock()
|
||||
fake_loader = DictDataLoader({})
|
||||
templar = Templar(loader=fake_loader)
|
||||
self._plugin = ActionModule(
|
||||
task=task,
|
||||
connection=connection,
|
||||
play_context=play_context,
|
||||
loader=fake_loader,
|
||||
templar=templar,
|
||||
shared_loader_obj=None,
|
||||
)
|
||||
self._plugin._task.action = "cli_parse"
|
||||
|
||||
@staticmethod
|
||||
def _load_fixture(filename):
|
||||
""" Load a fixture from the filesystem
|
||||
|
||||
:param filename: The name of the file to load
|
||||
:type filename: str
|
||||
:return: The file contents
|
||||
:rtype: str
|
||||
"""
|
||||
fixture_name = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", filename
|
||||
)
|
||||
with open(fixture_name) as fhand:
|
||||
return fhand.read()
|
||||
|
||||
def test_fn_debug(self):
|
||||
""" Confirm debug doesn't fail and return None
|
||||
"""
|
||||
msg = "some message"
|
||||
result = self._plugin._debug(msg)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_fn_ail_json(self):
|
||||
""" Confirm fail json replaces basic.py in msg
|
||||
"""
|
||||
msg = "text (basic.py)"
|
||||
with self.assertRaises(Exception) as error:
|
||||
self._plugin._fail_json(msg)
|
||||
self.assertEqual("text cli_parse", str(error.exception))
|
||||
|
||||
def test_fn_check_argspec_pass(self):
|
||||
""" Confirm a valid argspec passes
|
||||
"""
|
||||
kwargs = {
|
||||
"text": "text",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"command": "show version",
|
||||
},
|
||||
}
|
||||
valid, result, updated_params = check_argspec(
|
||||
DOCUMENTATION, "cli_parse module", schema_conditionals={}, **kwargs
|
||||
)
|
||||
self.assertEqual(valid, True)
|
||||
|
||||
def test_fn_check_argspec_fail_no_test_or_command(self):
|
||||
""" Confirm failed argpsec w/o text or command
|
||||
"""
|
||||
kwargs = {
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"command": "show version",
|
||||
}
|
||||
}
|
||||
valid, result, updated_params = check_argspec(
|
||||
DOCUMENTATION,
|
||||
"cli_parse module",
|
||||
schema_conditionals=ARGSPEC_CONDITIONALS,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
"one of the following is required: command, text", result["errors"]
|
||||
)
|
||||
|
||||
def test_fn_check_argspec_fail_no_parser_name(self):
|
||||
""" Confirm failed argspec no parser name
|
||||
"""
|
||||
kwargs = {"text": "anything", "parser": {"command": "show version"}}
|
||||
valid, result, updated_params = check_argspec(
|
||||
DOCUMENTATION,
|
||||
"cli_parse module",
|
||||
schema_conditionals=ARGSPEC_CONDITIONALS,
|
||||
**kwargs
|
||||
)
|
||||
self.assertEqual(
|
||||
"missing required arguments: name found in parser",
|
||||
result["errors"],
|
||||
)
|
||||
|
||||
def test_fn_extended_check_argspec_parser_name_not_coll(self):
|
||||
""" Confirm failed argpsec parser not collection format
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"text": "anything",
|
||||
"parser": {
|
||||
"command": "show version",
|
||||
"name": "not_collection_format",
|
||||
},
|
||||
}
|
||||
self._plugin._extended_check_argspec()
|
||||
self.assertTrue(self._plugin._result["failed"])
|
||||
self.assertIn("including collection", self._plugin._result["msg"])
|
||||
|
||||
def test_fn_extended_check_argspec_missing_tpath_or_command(self):
|
||||
""" Confirm failed argpsec missing template_path
|
||||
or command when text provided
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"text": "anything",
|
||||
"parser": {"name": "a.b.c"},
|
||||
}
|
||||
self._plugin._extended_check_argspec()
|
||||
self.assertTrue(self._plugin._result["failed"])
|
||||
self.assertIn(
|
||||
"provided when parsing text", self._plugin._result["msg"]
|
||||
)
|
||||
|
||||
def test_fn_load_parser_pass(self):
|
||||
""" Confirm each each of the parsers loads from the filesystem
|
||||
"""
|
||||
parser_names = ["json", "textfsm", "ttp", "xml"]
|
||||
for parser_name in parser_names:
|
||||
self._plugin._task.args = {
|
||||
"text": "anything",
|
||||
"parser": {"name": "ansible.utils." + parser_name},
|
||||
}
|
||||
parser = self._plugin._load_parser(task_vars=None)
|
||||
self.assertEqual(type(parser).__name__, "CliParser")
|
||||
self.assertTrue(hasattr(parser, "parse"))
|
||||
self.assertTrue(callable(parser.parse))
|
||||
|
||||
def test_fn_load_parser_fail(self):
|
||||
""" Confirm missing parser fails gracefully
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"text": "anything",
|
||||
"parser": {"name": "a.b.c"},
|
||||
}
|
||||
parser = self._plugin._load_parser(task_vars=None)
|
||||
self.assertIsNone(parser)
|
||||
self.assertTrue(self._plugin._result["failed"])
|
||||
self.assertIn("No module named", self._plugin._result["msg"])
|
||||
|
||||
def test_fn_set_parser_command_missing(self):
|
||||
""" Confirm parser/command is set if missing
|
||||
and command provided
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"command": "anything",
|
||||
"parser": {"name": "a.b.c"},
|
||||
}
|
||||
self._plugin._set_parser_command()
|
||||
self.assertEqual(
|
||||
self._plugin._task.args["parser"]["command"], "anything"
|
||||
)
|
||||
|
||||
def test_fn_set_parser_command_present(self):
|
||||
""" Confirm parser/command is not changed if provided
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"command": "anything",
|
||||
"parser": {"command": "something", "name": "a.b.c"},
|
||||
}
|
||||
self._plugin._set_parser_command()
|
||||
self.assertEqual(
|
||||
self._plugin._task.args["parser"]["command"], "something"
|
||||
)
|
||||
|
||||
def test_fn_set_parser_command_absent(self):
|
||||
""" Confirm parser/command is not added
|
||||
"""
|
||||
self._plugin._task.args = {"parser": {}}
|
||||
self._plugin._set_parser_command()
|
||||
self.assertNotIn("command", self._plugin._task.args["parser"])
|
||||
|
||||
def test_fn_set_text_present(self):
|
||||
""" Check task args text is set to stdout
|
||||
"""
|
||||
expected = "output"
|
||||
self._plugin._result["stdout"] = expected
|
||||
self._plugin._task.args = {}
|
||||
self._plugin._set_text()
|
||||
self.assertEqual(self._plugin._task.args["text"], expected)
|
||||
|
||||
def test_fn_set_text_absent(self):
|
||||
""" Check task args text is set to stdout
|
||||
"""
|
||||
self._plugin._result["stdout"] = None
|
||||
self._plugin._task.args = {}
|
||||
self._plugin._set_text()
|
||||
self.assertNotIn("text", self._plugin._task.args)
|
||||
|
||||
def test_fn_os_from_task_vars(self):
|
||||
""" Confirm os is set based on task vars
|
||||
"""
|
||||
checks = [
|
||||
("ansible_network_os", "cisco.nxos.nxos", "nxos"),
|
||||
("ansible_network_os", "NXOS", "nxos"),
|
||||
("ansible_distribution", "Fedora", "fedora"),
|
||||
(None, None, ""),
|
||||
]
|
||||
for check in checks:
|
||||
self._plugin._task_vars = {check[0]: check[1]}
|
||||
result = self._plugin._os_from_task_vars()
|
||||
self.assertEqual(result, check[2])
|
||||
|
||||
def test_fn_update_template_path_not_exist(self):
|
||||
""" Check the creation of the template_path if
|
||||
it doesn't exist in the user provided data
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"parser": {"command": "a command", "name": "a.b.c"}
|
||||
}
|
||||
self._plugin._task_vars = {"ansible_network_os": "cisco.nxos.nxos"}
|
||||
with self.assertRaises(Exception) as error:
|
||||
self._plugin._update_template_path("yaml")
|
||||
self.assertIn(
|
||||
"Could not find or access 'nxos_a_command.yaml'",
|
||||
str(error.exception),
|
||||
)
|
||||
|
||||
def test_fn_update_template_path_not_exist_os(self):
|
||||
""" Check the creation of the template_path if
|
||||
it doesn't exist in the user provided data
|
||||
name based on os provided in task
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"parser": {"command": "a command", "name": "a.b.c", "os": "myos"}
|
||||
}
|
||||
with self.assertRaises(Exception) as error:
|
||||
self._plugin._update_template_path("yaml")
|
||||
self.assertIn(
|
||||
"Could not find or access 'myos_a_command.yaml'",
|
||||
str(error.exception),
|
||||
)
|
||||
|
||||
def test_fn_update_template_path_mock_find_needle(self):
|
||||
""" Check the creation of the template_path
|
||||
mock the find needle fn so the template doesn't
|
||||
need to be in the default template folder
|
||||
"""
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.yaml"
|
||||
)
|
||||
self._plugin._find_needle = MagicMock()
|
||||
self._plugin._find_needle.return_value = template_path
|
||||
self._plugin._task.args = {
|
||||
"parser": {"command": "show version", "os": "nxos"}
|
||||
}
|
||||
self._plugin._update_template_path("yaml")
|
||||
self.assertEqual(
|
||||
self._plugin._task.args["parser"]["template_path"], template_path
|
||||
)
|
||||
|
||||
def test_fn_get_template_contents_pass(self):
|
||||
""" Check the retrieval of the template contents
|
||||
"""
|
||||
temp = tempfile.NamedTemporaryFile()
|
||||
contents = "abcdef"
|
||||
with open(temp.name, "w") as fileh:
|
||||
fileh.write(contents)
|
||||
|
||||
self._plugin._task.args = {"parser": {"template_path": temp.name}}
|
||||
result = self._plugin._get_template_contents()
|
||||
self.assertEqual(result, contents)
|
||||
|
||||
def test_fn_get_template_contents_missing(self):
|
||||
""" Check the retrieval of the template contents
|
||||
"""
|
||||
self._plugin._task.args = {"parser": {"template_path": "non-exist"}}
|
||||
with self.assertRaises(Exception) as error:
|
||||
self._plugin._get_template_contents()
|
||||
self.assertIn(
|
||||
"Failed to open template 'non-exist'", str(error.exception)
|
||||
)
|
||||
|
||||
def test_fn_get_template_contents_not_specified(self):
|
||||
""" Check the none when template_path not specified
|
||||
"""
|
||||
self._plugin._task.args = {"parser": {}}
|
||||
result = self._plugin._get_template_contents()
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_fn_prune_result_pass(self):
|
||||
""" Test the removal of stdout and stdout_lines from the _result
|
||||
"""
|
||||
self._plugin._result["stdout"] = "abc"
|
||||
self._plugin._result["stdout_lines"] = "abc"
|
||||
self._plugin._prune_result()
|
||||
self.assertNotIn("stdout", self._plugin._result)
|
||||
self.assertNotIn("stdout_lines", self._plugin._result)
|
||||
|
||||
def test_fn_prune_result_not_exist(self):
|
||||
""" Test the removal of stdout and stdout_lines from the _result
|
||||
"""
|
||||
self._plugin._prune_result()
|
||||
self.assertNotIn("stdout", self._plugin._result)
|
||||
self.assertNotIn("stdout_lines", self._plugin._result)
|
||||
|
||||
def test_fn_run_command_lx_rc0(self):
|
||||
""" Check run command for non network
|
||||
"""
|
||||
response = "abc"
|
||||
self._plugin._connection.socket_path = None
|
||||
self._plugin._low_level_execute_command = MagicMock()
|
||||
self._plugin._low_level_execute_command.return_value = {
|
||||
"rc": 0,
|
||||
"stdout": response,
|
||||
"stdout_lines": response,
|
||||
}
|
||||
self._plugin._task.args = {"command": "ls"}
|
||||
self._plugin._run_command()
|
||||
self.assertEqual(self._plugin._result["stdout"], response)
|
||||
self.assertEqual(self._plugin._result["stdout_lines"], response)
|
||||
|
||||
def test_fn_run_command_lx_rc1(self):
|
||||
""" Check run command for non network
|
||||
"""
|
||||
response = "abc"
|
||||
self._plugin._connection.socket_path = None
|
||||
self._plugin._low_level_execute_command = MagicMock()
|
||||
self._plugin._low_level_execute_command.return_value = {
|
||||
"rc": 1,
|
||||
"stdout": None,
|
||||
"stdout_lines": None,
|
||||
"stderr": response,
|
||||
}
|
||||
self._plugin._task.args = {"command": "ls"}
|
||||
self._plugin._run_command()
|
||||
self.assertTrue(self._plugin._result["failed"])
|
||||
self.assertEqual(self._plugin._result["msg"], response)
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_command_network(self, mock_rpc):
|
||||
""" Check run command for network
|
||||
"""
|
||||
expected = "abc"
|
||||
mock_rpc.return_value = expected
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
self._plugin._task.args = {"command": "command"}
|
||||
self._plugin._run_command()
|
||||
self.assertEqual(self._plugin._result["stdout"], expected)
|
||||
self.assertEqual(self._plugin._result["stdout_lines"], [expected])
|
||||
|
||||
def test_fn_run_command_not_specified(self):
|
||||
""" Check run command for network
|
||||
"""
|
||||
self._plugin._task.args = {"command": None}
|
||||
result = self._plugin._run_command()
|
||||
self.assertIsNone(result)
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_pass_w_fact(self, mock_rpc):
|
||||
""" Check full module run with valid params
|
||||
"""
|
||||
mock_out = self._load_fixture("nxos_show_version.txt")
|
||||
mock_rpc.return_value = mock_out
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.textfsm"
|
||||
)
|
||||
self._plugin._task.args = {
|
||||
"command": "show version",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"template_path": template_path,
|
||||
},
|
||||
"set_fact": "new_fact",
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
self.assertEqual(result["stdout"], mock_out)
|
||||
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
||||
self.assertEqual(result["parsed"][0]["version"], "9.2(2)")
|
||||
self.assertEqual(
|
||||
result["ansible_facts"]["new_fact"][0]["version"], "9.2(2)"
|
||||
)
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_pass_wo_fact(self, mock_rpc):
|
||||
""" Check full module run with valid params
|
||||
"""
|
||||
mock_out = self._load_fixture("nxos_show_version.txt")
|
||||
mock_rpc.return_value = mock_out
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.textfsm"
|
||||
)
|
||||
self._plugin._task.args = {
|
||||
"command": "show version",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"template_path": template_path,
|
||||
},
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
self.assertEqual(result["stdout"], mock_out)
|
||||
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
||||
self.assertEqual(result["parsed"][0]["version"], "9.2(2)")
|
||||
self.assertNotIn("ansible_facts", result)
|
||||
|
||||
def test_fn_run_fail_argspec(self):
|
||||
""" Check full module run with invalid params
|
||||
"""
|
||||
self._plugin._task.args = {
|
||||
"text": "anything",
|
||||
"parser": {
|
||||
"command": "show version",
|
||||
"name": "not_collection_format",
|
||||
},
|
||||
}
|
||||
self._plugin.run(task_vars=None)
|
||||
self.assertTrue(self._plugin._result["failed"])
|
||||
self.assertIn("including collection", self._plugin._result["msg"])
|
||||
|
||||
def test_fn_run_fail_command(self):
|
||||
""" Confirm clean fail with rc 1
|
||||
"""
|
||||
self._plugin._connection.socket_path = None
|
||||
self._plugin._low_level_execute_command = MagicMock()
|
||||
self._plugin._low_level_execute_command.return_value = {
|
||||
"rc": 1,
|
||||
"stdout": None,
|
||||
"stdout_lines": None,
|
||||
"stderr": None,
|
||||
}
|
||||
self._plugin._task.args = {
|
||||
"command": "ls",
|
||||
"parser": {"name": "a.b.c"},
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
expected = {
|
||||
"failed": True,
|
||||
"msg": None,
|
||||
"stdout": None,
|
||||
"stdout_lines": None,
|
||||
}
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
def test_fn_run_fail_missing_parser(self):
|
||||
"""Confirm clean fail with missing parser
|
||||
"""
|
||||
self._plugin._task.args = {"text": None, "parser": {"name": "a.b.c"}}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
self.assertEqual(result["failed"], True)
|
||||
self.assertIn("Error loading parser", result["msg"])
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_pass_missing_parser_constants(self, mock_rpc):
|
||||
""" Check full module run using parser w/o
|
||||
DEFAULT_TEMPLATE_EXTENSION or PROVIDE_TEMPLATE_CONTENTS
|
||||
defined in the parser
|
||||
"""
|
||||
mock_out = self._load_fixture("nxos_show_version.txt")
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
def parse(self, *_args, **kwargs):
|
||||
return {"parsed": mock_out}
|
||||
|
||||
self._plugin._load_parser = MagicMock()
|
||||
self._plugin._load_parser.return_value = CliParser(None, None, None)
|
||||
|
||||
mock_out = self._load_fixture("nxos_show_version.txt")
|
||||
mock_rpc.return_value = mock_out
|
||||
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_empty_parser.textfsm"
|
||||
)
|
||||
self._plugin._task.args = {
|
||||
"command": "show version",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"template_path": template_path,
|
||||
},
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
self.assertEqual(result["stdout"], mock_out)
|
||||
self.assertEqual(result["stdout_lines"], mock_out.splitlines())
|
||||
self.assertEqual(result["parsed"], mock_out)
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_pass_missing_parser_in_parser(self, mock_rpc):
|
||||
""" Check full module run using parser w/o
|
||||
a parser function defined in the parser
|
||||
defined in the parser
|
||||
"""
|
||||
mock_out = self._load_fixture("nxos_show_version.txt")
|
||||
|
||||
class CliParser(CliParserBase):
|
||||
pass
|
||||
|
||||
self._plugin._load_parser = MagicMock()
|
||||
self._plugin._load_parser.return_value = CliParser(None, None, None)
|
||||
|
||||
mock_out = self._load_fixture("nxos_show_version.textfsm")
|
||||
mock_rpc.return_value = mock_out
|
||||
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_empty_parser.textfsm"
|
||||
)
|
||||
self._plugin._task.args = {
|
||||
"command": "show version",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"template_path": template_path,
|
||||
},
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
with self.assertRaises(Exception) as error:
|
||||
self._plugin.run(task_vars=task_vars)
|
||||
self.assertIn("Unhandled", str(error.exception))
|
||||
|
||||
@patch("ansible.module_utils.connection.Connection.__rpc__")
|
||||
def test_fn_run_net_device_error(self, mock_rpc):
|
||||
""" Check full module run mock error from network device
|
||||
"""
|
||||
msg = "I was mocked"
|
||||
mock_rpc.side_effect = AnsibleConnectionError(msg)
|
||||
self._plugin._connection.socket_path = (
|
||||
tempfile.NamedTemporaryFile().name
|
||||
)
|
||||
self._plugin._task.args = {
|
||||
"command": "show version",
|
||||
"parser": {"name": "ansible.utils.textfsm"},
|
||||
}
|
||||
task_vars = {"inventory_hostname": "mockdevice"}
|
||||
result = self._plugin.run(task_vars=task_vars)
|
||||
self.assertEqual(result["failed"], True)
|
||||
self.assertEqual([msg], result["msg"])
|
|
@ -0,0 +1,4 @@
|
|||
Interface IP-Address OK? Method Status Protocol
|
||||
GigabitEthernet0/0 10.8.38.75 YES manual up up
|
||||
GigabitEthernet0/1 unassigned YES unset up up
|
||||
GigabitEthernet0/2 unassigned YES unset up up
|
|
@ -0,0 +1,39 @@
|
|||
an-nxos9k-01# show version
|
||||
Cisco Nexus Operating System (NX-OS) Software
|
||||
TAC support: http://www.cisco.com/tac
|
||||
Documents: http://www.cisco.com/en/US/products/ps9372/tsd_products_support_series_home.html
|
||||
Copyright (c) 2002-2017, Cisco Systems, Inc. All rights reserved.
|
||||
The copyrights to certain works contained herein are owned by
|
||||
other third parties and are used and distributed under license.
|
||||
Some parts of this software are covered under the GNU Public
|
||||
License. A copy of the license is available at
|
||||
http://www.gnu.org/licenses/gpl.html.
|
||||
|
||||
Nexus 9000v is a demo version of the Nexus Operating System
|
||||
|
||||
Software
|
||||
BIOS: version
|
||||
NXOS: version 7.0(3)I7(1)
|
||||
BIOS compile time:
|
||||
NXOS image file is: bootflash:///nxos.7.0.3.I7.1.bin
|
||||
NXOS compile time: 8/31/2017 14:00:00 [08/31/2017 22:29:32]
|
||||
|
||||
|
||||
Hardware
|
||||
cisco Nexus9000 9000v Chassis
|
||||
with 4041236 kB of memory.
|
||||
Processor Board ID 96NK4OUJH32
|
||||
|
||||
Device name: an-nxos9k-01
|
||||
bootflash: 3509454 kB
|
||||
Kernel uptime is 12 day(s), 23 hour(s), 48 minute(s), 10 second(s)
|
||||
|
||||
Last reset
|
||||
Reason: Unknown
|
||||
System version:
|
||||
Service:
|
||||
|
||||
plugin
|
||||
Core Plugin, Ethernet Plugin
|
||||
|
||||
Active Package(s):
|
|
@ -0,0 +1,16 @@
|
|||
Value BOOT_IMAGE (.*)
|
||||
Value UPTIME ((\d+\s\w+.s.,?\s?){4})
|
||||
Value LAST_REBOOT_REASON (.+)
|
||||
Value OS (\d+.\d+(.+)?)
|
||||
Value PLATFORM (\w+)
|
||||
|
||||
Start
|
||||
^\s*(NXOS: version|system:\s+version)\s+${OS}\s*$$
|
||||
^\s*(NXOS|kickstart)\s+image\s+file\s+is:\s+${BOOT_IMAGE}\s*$$
|
||||
^\s+cisco\s+${PLATFORM}\s+[cC]hassis
|
||||
^\s+cisco\s+Nexus\d+\s+${PLATFORM}
|
||||
# Cisco N5K platform
|
||||
^\s+cisco\s+Nexus\s+${PLATFORM}\s+[cC]hassis
|
||||
^\s+cisco\s+.+-${PLATFORM}\s*
|
||||
^Kernel\s+uptime\s+is\s+${UPTIME}
|
||||
^\s+Reason:\s${LAST_REBOOT_REASON} -> Record
|
|
@ -0,0 +1,7 @@
|
|||
<group>
|
||||
NXOS: version {{ os }}
|
||||
NXOS image file is: {{ boot_image }}
|
||||
Kernel uptime is {{ uptime | ORPHRASE }}
|
||||
cisco Nexus9000 {{ platform }} Chassis
|
||||
</group>
|
||||
"""
|
|
@ -0,0 +1,16 @@
|
|||
Value BOOT_IMAGE (.*)
|
||||
Value UPTIME ((\d+\s\w+.s.,?\s?){4})
|
||||
Value LAST_REBOOT_REASON (.+)
|
||||
Value OS (\d+.\d+(.+)?)
|
||||
Value PLATFORM (\w+)
|
||||
|
||||
Start
|
||||
^\s*(NXOS: version|system:\s+version)\s+${OS}\s*$$
|
||||
\s*(NXOS|kickstart)\s+image\s+file\s+is:\s+${BOOT_IMAGE}\s*$$
|
||||
^\s+cisco\s+${PLATFORM}\s+[cC]hassis
|
||||
^\s+cisco\s+Nexus\d+\s+${PLATFORM}
|
||||
# Cisco N5K platform
|
||||
^\s+cisco\s+Nexus\s+${PLATFORM}\s+[cC]hassis
|
||||
cisco\s+.+-${PLATFORM}\s*
|
||||
^Kernel\s+uptime\s+is\s+${UPTIME}
|
||||
^\s+Reason:\s${LAST_REBOOT_REASON} -> Record
|
|
@ -0,0 +1,44 @@
|
|||
# (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import json
|
||||
|
||||
from ansible_collections.ansible.utils.tests.unit.compat import unittest
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers.json_parser import (
|
||||
CliParser,
|
||||
)
|
||||
|
||||
|
||||
class TestJsonParser(unittest.TestCase):
|
||||
def test_json_parser(self):
|
||||
test_value = {
|
||||
"string": "This is a string",
|
||||
"list": ["This", "is", "a", "list"],
|
||||
"bool": True,
|
||||
"int": 27,
|
||||
"dict": {
|
||||
"This": "string",
|
||||
"is": ["l", "i", "s", "t"],
|
||||
"a": True,
|
||||
"dict": 42,
|
||||
},
|
||||
}
|
||||
task_args = {"text": json.dumps(test_value)}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
self.assertEqual(result, {"parsed": test_value})
|
||||
|
||||
def test_invalid_json(self):
|
||||
task_args = {"text": "Definitely not JSON"}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
# Errors are different between Python 2 and 3, so we have to be a bit roundabout.
|
||||
self.assertEqual(len(result), 1)
|
||||
assert "errors" in result
|
||||
self.assertEqual(len(result["errors"]), 1)
|
|
@ -0,0 +1,71 @@
|
|||
# (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.ansible.utils.tests.unit.compat import unittest
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers.textfsm_parser import (
|
||||
CliParser,
|
||||
)
|
||||
|
||||
textfsm = pytest.importorskip("textfsm")
|
||||
|
||||
|
||||
class TestTextfsmParser(unittest.TestCase):
|
||||
def test_textfsm_parser(self):
|
||||
nxos_cfg_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.cfg"
|
||||
)
|
||||
nxos_template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.textfsm"
|
||||
)
|
||||
|
||||
with open(nxos_cfg_path) as fhand:
|
||||
nxos_show_version_output = fhand.read()
|
||||
|
||||
task_args = {
|
||||
"text": nxos_show_version_output,
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"command": "show version",
|
||||
"template_path": nxos_template_path,
|
||||
},
|
||||
}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
parsed_output = [
|
||||
{
|
||||
"BOOT_IMAGE": "bootflash:///nxos.7.0.3.I7.1.bin",
|
||||
"LAST_REBOOT_REASON": "Unknown",
|
||||
"OS": "7.0(3)I7(1)",
|
||||
"PLATFORM": "9000v",
|
||||
"UPTIME": "12 day(s), 23 hour(s), 48 minute(s), 10 second(s)",
|
||||
}
|
||||
]
|
||||
self.assertEqual(result, {"parsed": parsed_output})
|
||||
|
||||
def test_textfsm_parser_invalid_parser(self):
|
||||
fake_path = "/ /I hope this doesn't exist"
|
||||
task_args = {
|
||||
"text": "",
|
||||
"parser": {
|
||||
"name": "ansible.utils.textfsm",
|
||||
"command": "show version",
|
||||
"template_path": fake_path,
|
||||
},
|
||||
}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
result = parser.parse()
|
||||
error = {
|
||||
"error": "error while reading template_path file {0}".format(
|
||||
fake_path
|
||||
)
|
||||
}
|
||||
self.assertEqual(result, error)
|
|
@ -0,0 +1,71 @@
|
|||
# (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.ansible.utils.tests.unit.compat import unittest
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers.ttp_parser import (
|
||||
CliParser,
|
||||
)
|
||||
|
||||
textfsm = pytest.importorskip("ttp")
|
||||
|
||||
|
||||
class TestTextfsmParser(unittest.TestCase):
|
||||
def test_ttp_parser(self):
|
||||
nxos_cfg_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.cfg"
|
||||
)
|
||||
nxos_template_path = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "nxos_show_version.ttp"
|
||||
)
|
||||
|
||||
with open(nxos_cfg_path) as fhand:
|
||||
nxos_show_version_output = fhand.read()
|
||||
|
||||
task_args = {
|
||||
"text": nxos_show_version_output,
|
||||
"parser": {
|
||||
"name": "ansible.utils.ttp",
|
||||
"command": "show version",
|
||||
"template_path": nxos_template_path,
|
||||
},
|
||||
}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
# import pdb; pdb.set_trace()
|
||||
parsed_output = [
|
||||
{
|
||||
"boot_image": "bootflash:///nxos.7.0.3.I7.1.bin",
|
||||
"os": "7.0(3)I7(1)",
|
||||
"platform": "9000v",
|
||||
"uptime": "12 day(s), 23 hour(s), 48 minute(s), 10 second(s)",
|
||||
}
|
||||
]
|
||||
self.assertEqual(result["parsed"][0][0], parsed_output)
|
||||
|
||||
def test_textfsm_parser_invalid_parser(self):
|
||||
fake_path = "/ /I hope this doesn't exist"
|
||||
task_args = {
|
||||
"text": "",
|
||||
"parser": {
|
||||
"name": "ansible.utils.ttp",
|
||||
"command": "show version",
|
||||
"template_path": fake_path,
|
||||
},
|
||||
}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
result = parser.parse()
|
||||
error = {
|
||||
"error": "error while reading template_path file {0}".format(
|
||||
fake_path
|
||||
)
|
||||
}
|
||||
self.assertEqual(result, error)
|
|
@ -0,0 +1,43 @@
|
|||
# (c) 2020 Ansible Project
|
||||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
from collections import OrderedDict
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.ansible.utils.tests.unit.compat import unittest
|
||||
from ansible_collections.ansible.utils.plugins.cli_parsers.xml_parser import (
|
||||
CliParser,
|
||||
)
|
||||
|
||||
xmltodict = pytest.importorskip("xmltodict")
|
||||
|
||||
|
||||
class TestXmlParser(unittest.TestCase):
|
||||
def test_valid_xml(self):
|
||||
xml = "<tag1><tag2 arg='foo'>text</tag2></tag1>"
|
||||
xml_dict = OrderedDict(
|
||||
tag1=OrderedDict(
|
||||
tag2=OrderedDict([("@arg", "foo"), ("#text", "text")])
|
||||
)
|
||||
)
|
||||
task_args = {"text": xml, "parser": {"os": "none"}}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
self.assertEqual(result["parsed"], xml_dict)
|
||||
|
||||
def test_invalid_xml(self):
|
||||
task_args = {"text": "Definitely not XML", "parser": {"os": "none"}}
|
||||
parser = CliParser(task_args=task_args, task_vars=[], debug=False)
|
||||
|
||||
result = parser.parse()
|
||||
self.assertEqual(len(result["errors"]), 1)
|
||||
self.assertEqual(
|
||||
result["errors"][0],
|
||||
"XML parser returned an error while parsing. Error: syntax error: line 1, column 0",
|
||||
)
|
Loading…
Reference in New Issue