Add ipv4 plugin (#120)

Add IPV4 filter plugin

SUMMARY


ISSUE TYPE


New Module Pull Request

COMPONENT NAME

ADDITIONAL INFORMATION

Reviewed-by: Nathaniel Case <this.is@nathanielca.se>
Reviewed-by: Priyam Sahoo <None>
Reviewed-by: None <None>
pull/127/head^2
Ashwini Mhatre 2022-01-17 17:11:11 +05:30 committed by GitHub
parent 8d2ad3811f
commit 02f2c62386
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1074 additions and 0 deletions

View File

@ -24,6 +24,7 @@ Name | Description
[ansible.utils.from_xml](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.from_xml_filter.rst)|Convert given XML string to native python dictionary.
[ansible.utils.get_path](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.get_path_filter.rst)|Retrieve the value in a variable using a path
[ansible.utils.index_of](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.index_of_filter.rst)|Find the indices of items in a list matching some criteria
[ansible.utils.ipv4](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.ipv4_filter.rst)|To filter only Ipv4 addresses Ipv4 filter is used.
[ansible.utils.param_list_compare](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.param_list_compare_filter.rst)|Generate the final param list combining/comparing base and provided parameters.
[ansible.utils.to_paths](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.to_paths_filter.rst)|Flatten a complex object into a dictionary of paths and values
[ansible.utils.to_xml](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.to_xml_filter.rst)|Convert given JSON string to XML

View File

@ -0,0 +1,3 @@
---
minor_changes:
- Add ipv4 filter plugin.

View File

@ -0,0 +1,184 @@
.. _ansible.utils.ipv4_filter:
******************
ansible.utils.ipv4
******************
**To filter only Ipv4 addresses Ipv4 filter is used.**
Version added: 2.5.0
.. contents::
:local:
:depth: 1
Synopsis
--------
- Sometimes you need only IPv4 addresses. To filter only Ipv4 addresses Ipv4 filter is used.
Parameters
----------
.. raw:: html
<table border=0 cellpadding=0 class="documentation-table">
<tr>
<th colspan="1">Parameter</th>
<th>Choices/<font color="blue">Defaults</font></th>
<th>Configuration</th>
<th width="100%">Comments</th>
</tr>
<tr>
<td colspan="1">
<div class="ansibleOptionAnchor" id="parameter-"></div>
<b>query</b>
<a class="ansibleOptionLink" href="#parameter-" title="Permalink to this option"></a>
<div style="font-size: small">
<span style="color: purple">string</span>
</div>
</td>
<td>
<b>Default:</b><br/><div style="color: blue">""</div>
</td>
<td>
</td>
<td>
<div>You can provide a single argument to each ipv4() filter.</div>
<div>Example. query type &#x27;ipv6&#x27; to convert ipv4 into ipv6</div>
</td>
</tr>
<tr>
<td colspan="1">
<div class="ansibleOptionAnchor" id="parameter-"></div>
<b>value</b>
<a class="ansibleOptionLink" href="#parameter-" title="Permalink to this option"></a>
<div style="font-size: small">
<span style="color: purple">list</span>
/ <span style="color: purple">elements=string</span>
/ <span style="color: red">required</span>
</div>
</td>
<td>
</td>
<td>
</td>
<td>
<div>list of subnets or individual address or any other values input for ipv4 plugin</div>
</td>
</tr>
</table>
<br/>
Examples
--------
.. code-block:: yaml
#### examples
# Ipv4 filter plugin with different queries.
- name: Set value as input list
ansible.builtin.set_fact:
value:
- 192.24.2.1
- host.fqdn
- ::1
- ''
- 192.168.32.0/24
- fe80::100/10
- 42540766412265424405338506004571095040/64
- True
- name: IPv4 filter to filter Ipv4 Address
debug:
msg: "{{ value|ansible.utils.ipv4 }}"
- name: convert IPv4 addresses into IPv6 addresses.
debug:
msg: "{{ value|ansible.utils.ipv4('ipv6') }}"
- name: convert IPv4 addresses into IPv6 addresses.
debug:
msg: "{{ value|ansible.utils.ipv4('address') }}"
# PLAY [Ipv4 filter plugin with different queries.] ******************************************************************
# TASK [Set value as input list] ***************************************************************************************
# ok: [localhost] => {"ansible_facts": {"value": ["192.24.2.1", "host.fqdn", "::1", "", "192.168.32.0/24",
# "fe80::100/10", "42540766412265424405338506004571095040/64", true]}, "changed": false}
# TASK [IPv4 filter to filter Ipv4 Address] *******************************************************************
# ok: [localhost] => {
# "msg": [
# "192.24.2.1",
# "192.168.32.0/24"
# ]
# }
#
# TASK [convert IPv4 addresses into IPv6 addresses.] **********************************************************
# ok: [localhost] => {
# "msg": [
# "::ffff:192.24.2.1/128",
# "::ffff:192.168.32.0/120"
# ]
# }
#
# TASK [convert IPv4 addresses into IPv6 addresses.] **********************************************************
# ok: [localhost] => {
# "msg": [
# "192.24.2.1"
# ]
# }
Return Values
-------------
Common return values are documented `here <https://docs.ansible.com/ansible/latest/reference_appendices/common_return_values.html#common-return-values>`_, the following are the fields unique to this filter:
.. raw:: html
<table border=0 cellpadding=0 class="documentation-table">
<tr>
<th colspan="1">Key</th>
<th>Returned</th>
<th width="100%">Description</th>
</tr>
<tr>
<td colspan="1">
<div class="ansibleOptionAnchor" id="return-"></div>
<b>data</b>
<a class="ansibleOptionLink" href="#return-" title="Permalink to this return value"></a>
<div style="font-size: small">
<span style="color: purple">list</span>
/ <span style="color: purple">elements=string</span>
</div>
</td>
<td></td>
<td>
<div>Returns list with values valid for a particular query.</div>
<br/>
</td>
</tr>
</table>
<br/><br/>
Status
------
Authors
~~~~~~~
- Ashwini Mhatre (@amhatre)
.. hint::
Configuration entries for each entry type have a low to high priority order. For example, a variable that is lower in the list will override a variable that is higher up.

161
plugins/filter/ipv4.py Normal file
View File

@ -0,0 +1,161 @@
# -*- coding: utf-8 -*-
# Copyright 2021 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
filter plugin file for ipaddr filters: ipv4
"""
from __future__ import absolute_import, division, print_function
from functools import partial
from ansible_collections.ansible.utils.plugins.plugin_utils.base.ipaddr_utils import (
ipaddr,
_need_netaddr,
)
from ansible.errors import AnsibleFilterError
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
AnsibleArgSpecValidator,
)
__metaclass__ = type
try:
from jinja2.filters import pass_environment
except ImportError:
from jinja2.filters import environmentfilter as pass_environment
try:
import netaddr
except ImportError:
# in this case, we'll make the filters return error messages (see bottom)
netaddr = None
else:
class mac_linux(netaddr.mac_unix):
pass
mac_linux.word_fmt = "%.2x"
DOCUMENTATION = """
name: ipv4
author: Ashwini Mhatre (@amhatre)
version_added: "2.5.0"
short_description: To filter only Ipv4 addresses Ipv4 filter is used.
description:
- Sometimes you need only IPv4 addresses. To filter only Ipv4 addresses Ipv4 filter is used.
options:
value:
description:
- list of subnets or individual address or any other values input for ipv4 plugin
type: list
elements: str
required: True
query:
description:
- You can provide a single argument to each ipv4() filter.
- Example. query type 'ipv6' to convert ipv4 into ipv6
type: str
default: ''
notes:
"""
EXAMPLES = r"""
#### examples
# Ipv4 filter plugin with different queries.
- name: Set value as input list
ansible.builtin.set_fact:
value:
- 192.24.2.1
- host.fqdn
- ::1
- ''
- 192.168.32.0/24
- fe80::100/10
- 42540766412265424405338506004571095040/64
- True
- name: IPv4 filter to filter Ipv4 Address
debug:
msg: "{{ value|ansible.utils.ipv4 }}"
- name: convert IPv4 addresses into IPv6 addresses.
debug:
msg: "{{ value|ansible.utils.ipv4('ipv6') }}"
- name: convert IPv4 addresses into IPv6 addresses.
debug:
msg: "{{ value|ansible.utils.ipv4('address') }}"
# PLAY [Ipv4 filter plugin with different queries.] ******************************************************************
# TASK [Set value as input list] ***************************************************************************************
# ok: [localhost] => {"ansible_facts": {"value": ["192.24.2.1", "host.fqdn", "::1", "", "192.168.32.0/24",
# "fe80::100/10", "42540766412265424405338506004571095040/64", true]}, "changed": false}
# TASK [IPv4 filter to filter Ipv4 Address] *******************************************************************
# ok: [localhost] => {
# "msg": [
# "192.24.2.1",
# "192.168.32.0/24"
# ]
# }
#
# TASK [convert IPv4 addresses into IPv6 addresses.] **********************************************************
# ok: [localhost] => {
# "msg": [
# "::ffff:192.24.2.1/128",
# "::ffff:192.168.32.0/120"
# ]
# }
#
# TASK [convert IPv4 addresses into IPv6 addresses.] **********************************************************
# ok: [localhost] => {
# "msg": [
# "192.24.2.1"
# ]
# }
"""
RETURN = """
data:
type: list
elements: str
description:
- Returns list with values valid for a particular query.
"""
@pass_environment
def _ipv4(*args, **kwargs):
"""This filter is designed to return the input value if a query is True, and False if a query is False"""
keys = ["value", "query"]
data = dict(zip(keys, args[1:]))
data.update(kwargs)
aav = AnsibleArgSpecValidator(data=data, schema=DOCUMENTATION, name="ipv4")
valid, errors, updated_data = aav.validate()
if not valid:
raise AnsibleFilterError(errors)
return ipv4(**updated_data)
def ipv4(value, query=""):
return ipaddr(value, query, version=4, alias="ipv4")
class FilterModule(object):
"""IP address and network manipulation filters
"""
filter_map = {
# IP addresses and networks
"ipv4": _ipv4
}
def filters(self):
""" ipaddr filter """
if netaddr:
return self.filter_map
else:
return dict(
(f, partial(_need_netaddr, f)) for f in self.filter_map
)

View File

@ -0,0 +1,606 @@
# -*- coding: utf-8 -*-
# Copyright 2021 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The utils file for all ipaddr filters
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.utils.display import Display
import types
from ansible.errors import AnsibleFilterError
try:
import netaddr
except ImportError:
# in this case, we'll make the filters return error messages (see bottom)
netaddr = None
else:
class mac_linux(netaddr.mac_unix):
pass
mac_linux.word_fmt = "%.2x"
display = Display()
# ---- IP address and network query helpers ----
def _empty_ipaddr_query(v, vtype):
# We don't have any query to process, so just check what type the user
# expects, and return the IP address in a correct format
if v:
if vtype == "address":
return str(v.ip)
elif vtype == "network":
return str(v)
def _first_last(v):
if v.size == 2:
first_usable = int(netaddr.IPAddress(v.first))
last_usable = int(netaddr.IPAddress(v.last))
return first_usable, last_usable
elif v.size > 1:
first_usable = int(netaddr.IPAddress(v.first + 1))
last_usable = int(netaddr.IPAddress(v.last - 1))
return first_usable, last_usable
def _6to4_query(v, vtype, value):
if v.version == 4:
if v.size == 1:
ipconv = str(v.ip)
elif v.size > 1:
if v.ip != v.network:
ipconv = str(v.ip)
else:
return False
if ipaddr(ipconv, "public") or ipaddr(ipconv, "private"):
numbers = list(map(int, ipconv.split(".")))
try:
return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers)
except Exception:
pass
elif v.version == 6:
if vtype == "address":
if ipaddr(str(v), "2002::/16"):
return value
elif vtype == "network":
if v.ip != v.network:
if ipaddr(str(v.ip), "2002::/16"):
return value
return False
def _ip_query(v):
if v.size == 1:
return str(v.ip)
if v.size > 1:
# /31 networks in netaddr have no broadcast address
if v.ip != v.network or not v.broadcast:
return str(v.ip)
# For the first IPv6 address in a network, netaddr will return it as a network address, despite it being a valid host address.
elif v.version == 6 and v.ip == v.network:
return str(v.ip)
def _address_prefix_query(v):
if v.size > 2 and v.ip in (v.network, v.broadcast):
return False
return str(v.ip) + "/" + str(v.prefixlen)
def _bool_ipaddr_query(v):
if v:
return True
def _broadcast_query(v):
if v.size > 2:
return str(v.broadcast)
def _cidr_query(v):
return str(v)
def _cidr_lookup_query(v, iplist, value):
try:
if v in iplist:
return value
except Exception:
return False
def _first_usable_query(v, vtype):
if vtype == "address":
# Does it make sense to raise an error
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size == 2:
return str(netaddr.IPAddress(int(v.network)))
elif v.size > 1:
return str(netaddr.IPAddress(int(v.network) + 1))
def _host_query(v):
if v.size == 1:
return str(v)
elif v.size > 1:
if v.ip != v.network or not v.broadcast:
return str(v.ip) + "/" + str(v.prefixlen)
def _hostmask_query(v):
return str(v.hostmask)
def _int_query(v, vtype):
if vtype == "address":
return int(v.ip)
elif vtype == "network":
return str(int(v.ip)) + "/" + str(int(v.prefixlen))
def _ip_prefix_query(v):
if v.size == 2:
return str(v.ip) + "/" + str(v.prefixlen)
elif v.size > 1:
if v.ip != v.network:
return str(v.ip) + "/" + str(v.prefixlen)
def _ip_netmask_query(v):
if v.size == 2:
return str(v.ip) + " " + str(v.netmask)
elif v.size > 1:
if v.ip != v.network:
return str(v.ip) + " " + str(v.netmask)
def _ipv4_query(v, value):
if v.version == 6:
try:
return str(v.ipv4())
except Exception:
return False
else:
return value
def _ipv6_query(v, value):
if v.version == 4:
return str(v.ipv6())
else:
return value
def _last_usable_query(v, vtype):
if vtype == "address":
# Does it make sense to raise an error
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size > 1:
first_usable, last_usable = _first_last(v)
return str(netaddr.IPAddress(last_usable))
def _link_local_query(v, value):
v_ip = netaddr.IPAddress(str(v.ip))
if v.version == 4:
if ipaddr(str(v_ip), "169.254.0.0/24"):
return value
elif v.version == 6:
if ipaddr(str(v_ip), "fe80::/10"):
return value
def _loopback_query(v, value):
v_ip = netaddr.IPAddress(str(v.ip))
if v_ip.is_loopback():
return value
def _multicast_query(v, value):
if v.is_multicast():
return value
def _net_query(v):
if v.size > 1:
if v.ip == v.network:
return str(v.network) + "/" + str(v.prefixlen)
def _netmask_query(v):
return str(v.netmask)
def _network_query(v):
"""Return the network of a given IP or subnet"""
return str(v.network)
def _network_netmask_query(v):
return str(v.network) + " " + str(v.netmask)
def _network_wildcard_query(v):
return str(v.network) + " " + str(v.hostmask)
def _next_usable_query(v, vtype):
if vtype == "address":
# Does it make sense to raise an error
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size > 1:
first_usable, last_usable = _first_last(v)
next_ip = int(netaddr.IPAddress(int(v.ip) + 1))
if next_ip >= first_usable and next_ip <= last_usable:
return str(netaddr.IPAddress(int(v.ip) + 1))
def _peer_query(v, vtype):
if vtype == "address":
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size == 2:
return str(netaddr.IPAddress(int(v.ip) ^ 1))
if v.size == 4:
if int(v.ip) % 4 == 0:
raise AnsibleFilterError("Network address of /30 has no peer")
if int(v.ip) % 4 == 3:
raise AnsibleFilterError(
"Broadcast address of /30 has no peer"
)
return str(netaddr.IPAddress(int(v.ip) ^ 3))
raise AnsibleFilterError("Not a point-to-point network")
def _prefix_query(v):
return int(v.prefixlen)
def _previous_usable_query(v, vtype):
if vtype == "address":
# Does it make sense to raise an error
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size > 1:
first_usable, last_usable = _first_last(v)
previous_ip = int(netaddr.IPAddress(int(v.ip) - 1))
if previous_ip >= first_usable and previous_ip <= last_usable:
return str(netaddr.IPAddress(int(v.ip) - 1))
def _private_query(v, value):
if v.is_private():
return value
def _public_query(v, value):
v_ip = netaddr.IPAddress(str(v.ip))
if all(
[
v_ip.is_unicast(),
not v_ip.is_private(),
not v_ip.is_loopback(),
not v_ip.is_netmask(),
not v_ip.is_hostmask(),
]
):
return value
def _range_usable_query(v, vtype):
if vtype == "address":
# Does it make sense to raise an error
raise AnsibleFilterError("Not a network address")
elif vtype == "network":
if v.size > 1:
first_usable, last_usable = _first_last(v)
first_usable = str(netaddr.IPAddress(first_usable))
last_usable = str(netaddr.IPAddress(last_usable))
return "{0}-{1}".format(first_usable, last_usable)
def _revdns_query(v):
v_ip = netaddr.IPAddress(str(v.ip))
return v_ip.reverse_dns
def _size_query(v):
return v.size
def _size_usable_query(v):
if v.size == 1:
return 0
elif v.size == 2:
return 2
return v.size - 2
def _subnet_query(v):
return str(v.cidr)
def _type_query(v):
if v.size == 1:
return "address"
if v.size > 1:
if v.ip != v.network:
return "address"
else:
return "network"
def _unicast_query(v, value):
if v.is_unicast():
return value
def _version_query(v):
return v.version
def _wrap_query(v, vtype, value):
if v.version == 6:
if vtype == "address":
return "[" + str(v.ip) + "]"
elif vtype == "network":
return "[" + str(v.ip) + "]/" + str(v.prefixlen)
else:
return value
def ipaddr(value, query="", version=False, alias="ipaddr"):
""" Check if string is an IP address or network and filter it """
query_func_extra_args = {
"": ("vtype",),
"6to4": ("vtype", "value"),
"cidr_lookup": ("iplist", "value"),
"first_usable": ("vtype",),
"int": ("vtype",),
"ipv4": ("value",),
"ipv6": ("value",),
"last_usable": ("vtype",),
"link-local": ("value",),
"loopback": ("value",),
"lo": ("value",),
"multicast": ("value",),
"next_usable": ("vtype",),
"peer": ("vtype",),
"previous_usable": ("vtype",),
"private": ("value",),
"public": ("value",),
"unicast": ("value",),
"range_usable": ("vtype",),
"wrap": ("vtype", "value"),
}
query_func_map = {
"": _empty_ipaddr_query,
"6to4": _6to4_query,
"address": _ip_query,
"address/prefix": _address_prefix_query, # deprecate
"bool": _bool_ipaddr_query,
"broadcast": _broadcast_query,
"cidr": _cidr_query,
"cidr_lookup": _cidr_lookup_query,
"first_usable": _first_usable_query,
"gateway": _address_prefix_query, # deprecate
"gw": _address_prefix_query, # deprecate
"host": _host_query,
"host/prefix": _address_prefix_query, # deprecate
"hostmask": _hostmask_query,
"hostnet": _address_prefix_query, # deprecate
"int": _int_query,
"ip": _ip_query,
"ip/prefix": _ip_prefix_query,
"ip_netmask": _ip_netmask_query,
# 'ip_wildcard': _ip_wildcard_query, built then could not think of use case
"ipv4": _ipv4_query,
"ipv6": _ipv6_query,
"last_usable": _last_usable_query,
"link-local": _link_local_query,
"lo": _loopback_query,
"loopback": _loopback_query,
"multicast": _multicast_query,
"net": _net_query,
"next_usable": _next_usable_query,
"netmask": _netmask_query,
"network": _network_query,
"network_id": _network_query,
"network/prefix": _subnet_query,
"network_netmask": _network_netmask_query,
"network_wildcard": _network_wildcard_query,
"peer": _peer_query,
"prefix": _prefix_query,
"previous_usable": _previous_usable_query,
"private": _private_query,
"public": _public_query,
"range_usable": _range_usable_query,
"revdns": _revdns_query,
"router": _address_prefix_query, # deprecate
"size": _size_query,
"size_usable": _size_usable_query,
"subnet": _subnet_query,
"type": _type_query,
"unicast": _unicast_query,
"v4": _ipv4_query,
"v6": _ipv6_query,
"version": _version_query,
"wildcard": _hostmask_query,
"wrap": _wrap_query,
}
vtype = None
# Check if value is a list and parse each element
if isinstance(value, (list, tuple, types.GeneratorType)):
_ret = [ipaddr(element, str(query), version) for element in value]
return [item for item in _ret if item]
elif not value or value is True:
# TODO: Remove this check in a major version release of collection with porting guide
# TODO: and raise exception commented out below
display.warning(
"The value '%s' is not a valid IP address or network, passing this value to ipaddr filter"
" might result in breaking change in future." % value
)
return False
# Check if value is a number and convert it to an IP address
elif str(value).isdigit():
# We don't know what IP version to assume, so let's check IPv4 first,
# then IPv6
try:
if (not version) or (version and version == 4):
v = netaddr.IPNetwork("0.0.0.0/0")
v.value = int(value)
v.prefixlen = 32
elif version and version == 6:
v = netaddr.IPNetwork("::/0")
v.value = int(value)
v.prefixlen = 128
# IPv4 didn't work the first time, so it definitely has to be IPv6
except Exception:
try:
v = netaddr.IPNetwork("::/0")
v.value = int(value)
v.prefixlen = 128
# The value is too big for IPv6. Are you a nanobot?
except Exception:
return False
# We got an IP address, let's mark it as such
value = str(v)
vtype = "address"
# value has not been recognized, check if it's a valid IP string
else:
try:
v = netaddr.IPNetwork(value)
# value is a valid IP string, check if user specified
# CIDR prefix or just an IP address, this will indicate default
# output format
try:
address, prefix = value.split("/")
vtype = "network"
except Exception:
vtype = "address"
# value hasn't been recognized, maybe it's a numerical CIDR?
except Exception:
try:
address, prefix = value.split("/")
address.isdigit()
address = int(address)
prefix.isdigit()
prefix = int(prefix)
# It's not numerical CIDR, give up
except Exception:
return False
# It is something, so let's try and build a CIDR from the parts
try:
v = netaddr.IPNetwork("0.0.0.0/0")
v.value = address
v.prefixlen = prefix
# It's not a valid IPv4 CIDR
except Exception:
try:
v = netaddr.IPNetwork("::/0")
v.value = address
v.prefixlen = prefix
# It's not a valid IPv6 CIDR. Give up.
except Exception:
return False
# We have a valid CIDR, so let's write it in correct format
value = str(v)
vtype = "network"
# We have a query string but it's not in the known query types. Check if
# that string is a valid subnet, if so, we can check later if given IP
# address/network is inside that specific subnet
try:
# ?? 6to4 and link-local were True here before. Should they still?
if (
query
and (query not in query_func_map or query == "cidr_lookup")
and not str(query).isdigit()
and ipaddr(query, "network")
):
iplist = netaddr.IPSet([netaddr.IPNetwork(query)])
query = "cidr_lookup"
except Exception:
pass
# This code checks if value maches the IP version the user wants, ie. if
# it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()")
# If version does not match, return False
if version and v.version != version:
return False
extras = []
for arg in query_func_extra_args.get(query, tuple()):
extras.append(locals()[arg])
try:
return query_func_map[query](v, *extras)
except KeyError:
try:
float(query)
if v.size == 1:
if vtype == "address":
return str(v.ip)
elif vtype == "network":
return str(v)
elif v.size > 1:
try:
return str(v[query]) + "/" + str(v.prefixlen)
except Exception:
return False
else:
return value
except Exception:
raise AnsibleFilterError(
alias + ": unknown filter type: %s" % query
)
return False
def _need_netaddr(f_name, *args, **kwargs):
"""
verify python's netaddr for these filters to work
"""
raise AnsibleFilterError(
"The %s filter requires python's netaddr be "
"installed on the ansible controller" % f_name
)

View File

@ -2,3 +2,4 @@ jsonschema==3.2.0
textfsm
ttp
xmltodict
netaddr

View File

@ -5,3 +5,4 @@ ipaddress ; python_version < '3.0'
mock ; python_version < '3.5'
pytest-xdist
yamllint
netaddr

View File

@ -0,0 +1,35 @@
---
- name: set ipaddress list
ansible.builtin.set_fact:
value:
- 192.24.2.1
- host.fqdn
- ::1
- 192.168.32.0/24
- fe80::100/10
- "42540766412265424405338506004571095040/64"
- True
- name: ipv4 filter
ansible.builtin.set_fact:
result1: "{{ value|ansible.utils.ipv4 }}"
- name: Assert result for ipv4.
assert:
that: "{{ result1 == result1_val }}"
- name: convert ipv4 to ipv6 filter
ansible.builtin.set_fact:
result2: "{{ value|ansible.utils.ipv4('ipv6') }}"
- name: Assert result for ipv6.
assert:
that: "{{ result2 == result2_val }}"
- name: Ipv4 filter with address query
ansible.builtin.set_fact:
result3: "{{ value|ansible.utils.ipv4('address') }}"
- name: Assert result for ipv4 filter with address query.
assert:
that: "{{ result3 == result3_val }}"

View File

@ -0,0 +1,14 @@
---
- name: Recursively find all test files
find:
file_type: file
paths: "{{ role_path }}/tasks"
recurse: false
use_regex: true
patterns:
- '^(?!_|main).+$'
delegate_to: localhost
register: found
- include: "{{ item.path }}"
loop: "{{ found.files }}"

View File

@ -0,0 +1,11 @@
---
result1_val:
- "192.24.2.1"
- "192.168.32.0/24"
result2_val:
- "::ffff:192.24.2.1/128"
- "::ffff:192.168.32.0/120"
result3_val:
- "192.24.2.1"

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
# Copyright 2021 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
Unit test file for ipwrap filter plugin
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import unittest
from ansible_collections.ansible.utils.plugins.filter.ipv4 import _ipv4
VALID_DATA = [
"192.24.2.1",
"host.fqdn",
"::1",
"",
"192.168.32.0/24",
"fe80::100/10",
"42540766412265424405338506004571095040/64",
True,
]
VALID_OUTPUT = ["192.24.2.1", "192.168.32.0/24"]
VALID_OUTPUT1 = ["::ffff:192.24.2.1/128", "::ffff:192.168.32.0/120"]
VALID_OUTPUT2 = ["192.24.2.1"]
class TestIp4(unittest.TestCase):
def setUp(self):
pass
def test_ipv4_filter_empty_query(self):
"""Check ipv4 filter empty query"""
args = ["", VALID_DATA, ""]
result = _ipv4(*args)
self.assertEqual(result, VALID_OUTPUT)
def test_ipv4_ipv6_conversion(self):
"""Check ipv4 to ipv6 conversion"""
args = ["", VALID_DATA, "ipv6"]
result = _ipv4(*args)
self.assertEqual(result, VALID_OUTPUT1)
def test_ipv4_filter_address_query(self):
"""Check ipv4 filter address query"""
args = ["", VALID_DATA, "address"]
result = _ipv4(*args)
self.assertEqual(result, VALID_OUTPUT2)