diff --git a/README.md b/README.md index 091db35..61631a1 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Name | Description [ansible.utils.from_xml](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.from_xml_filter.rst)|Convert given XML string to native python dictionary. [ansible.utils.get_path](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.get_path_filter.rst)|Retrieve the value in a variable using a path [ansible.utils.index_of](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.index_of_filter.rst)|Find the indices of items in a list matching some criteria +[ansible.utils.ipv4](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.ipv4_filter.rst)|To filter only Ipv4 addresses Ipv4 filter is used. [ansible.utils.param_list_compare](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.param_list_compare_filter.rst)|Generate the final param list combining/comparing base and provided parameters. [ansible.utils.to_paths](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.to_paths_filter.rst)|Flatten a complex object into a dictionary of paths and values [ansible.utils.to_xml](https://github.com/ansible-collections/ansible.utils/blob/main/docs/ansible.utils.to_xml_filter.rst)|Convert given JSON string to XML diff --git a/changelogs/fragments/add_ipv4_filter_plugin.yaml b/changelogs/fragments/add_ipv4_filter_plugin.yaml new file mode 100644 index 0000000..92628e0 --- /dev/null +++ b/changelogs/fragments/add_ipv4_filter_plugin.yaml @@ -0,0 +1,3 @@ +--- +minor_changes: + - Add ipv4 filter plugin. diff --git a/docs/ansible.utils.ipv4_filter.rst b/docs/ansible.utils.ipv4_filter.rst new file mode 100644 index 0000000..8f490b2 --- /dev/null +++ b/docs/ansible.utils.ipv4_filter.rst @@ -0,0 +1,184 @@ +.. _ansible.utils.ipv4_filter: + + +****************** +ansible.utils.ipv4 +****************** + +**To filter only Ipv4 addresses Ipv4 filter is used.** + + +Version added: 2.5.0 + +.. contents:: + :local: + :depth: 1 + + +Synopsis +-------- +- Sometimes you need only IPv4 addresses. To filter only Ipv4 addresses Ipv4 filter is used. + + + + +Parameters +---------- + +.. raw:: html + + + + + + + + + + + + + + + + + + + + +
ParameterChoices/DefaultsConfigurationComments
+
+ query + +
+ string +
+
+ Default:
""
+
+ +
You can provide a single argument to each ipv4() filter.
+
Example. query type 'ipv6' to convert ipv4 into ipv6
+
+
+ value + +
+ list + / elements=string + / required +
+
+ + +
list of subnets or individual address or any other values input for ipv4 plugin
+
+
+ + + + +Examples +-------- + +.. code-block:: yaml + + #### examples + # Ipv4 filter plugin with different queries. + - name: Set value as input list + ansible.builtin.set_fact: + value: + - 192.24.2.1 + - host.fqdn + - ::1 + - '' + - 192.168.32.0/24 + - fe80::100/10 + - 42540766412265424405338506004571095040/64 + - True + - name: IPv4 filter to filter Ipv4 Address + debug: + msg: "{{ value|ansible.utils.ipv4 }}" + + - name: convert IPv4 addresses into IPv6 addresses. + debug: + msg: "{{ value|ansible.utils.ipv4('ipv6') }}" + + - name: convert IPv4 addresses into IPv6 addresses. + debug: + msg: "{{ value|ansible.utils.ipv4('address') }}" + + + # PLAY [Ipv4 filter plugin with different queries.] ****************************************************************** + # TASK [Set value as input list] *************************************************************************************** + # ok: [localhost] => {"ansible_facts": {"value": ["192.24.2.1", "host.fqdn", "::1", "", "192.168.32.0/24", + # "fe80::100/10", "42540766412265424405338506004571095040/64", true]}, "changed": false} + # TASK [IPv4 filter to filter Ipv4 Address] ******************************************************************* + # ok: [localhost] => { + # "msg": [ + # "192.24.2.1", + # "192.168.32.0/24" + # ] + # } + # + # TASK [convert IPv4 addresses into IPv6 addresses.] ********************************************************** + # ok: [localhost] => { + # "msg": [ + # "::ffff:192.24.2.1/128", + # "::ffff:192.168.32.0/120" + # ] + # } + # + # TASK [convert IPv4 addresses into IPv6 addresses.] ********************************************************** + # ok: [localhost] => { + # "msg": [ + # "192.24.2.1" + # ] + # } + + + +Return Values +------------- +Common return values are documented `here `_, the following are the fields unique to this filter: + +.. raw:: html + + + + + + + + + + + + +
KeyReturnedDescription
+
+ data + +
+ list + / elements=string +
+
+
Returns list with values valid for a particular query.
+
+
+

+ + +Status +------ + + +Authors +~~~~~~~ + +- Ashwini Mhatre (@amhatre) + + +.. hint:: + Configuration entries for each entry type have a low to high priority order. For example, a variable that is lower in the list will override a variable that is higher up. diff --git a/plugins/filter/ipv4.py b/plugins/filter/ipv4.py new file mode 100644 index 0000000..930eaf5 --- /dev/null +++ b/plugins/filter/ipv4.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Red Hat +# GNU General Public License v3.0+ +# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +""" +filter plugin file for ipaddr filters: ipv4 +""" +from __future__ import absolute_import, division, print_function +from functools import partial +from ansible_collections.ansible.utils.plugins.plugin_utils.base.ipaddr_utils import ( + ipaddr, + _need_netaddr, +) +from ansible.errors import AnsibleFilterError +from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import ( + AnsibleArgSpecValidator, +) + +__metaclass__ = type + + +try: + from jinja2.filters import pass_environment +except ImportError: + from jinja2.filters import environmentfilter as pass_environment + +try: + import netaddr +except ImportError: + # in this case, we'll make the filters return error messages (see bottom) + netaddr = None +else: + + class mac_linux(netaddr.mac_unix): + pass + + mac_linux.word_fmt = "%.2x" + +DOCUMENTATION = """ + name: ipv4 + author: Ashwini Mhatre (@amhatre) + version_added: "2.5.0" + short_description: To filter only Ipv4 addresses Ipv4 filter is used. + description: + - Sometimes you need only IPv4 addresses. To filter only Ipv4 addresses Ipv4 filter is used. + options: + value: + description: + - list of subnets or individual address or any other values input for ipv4 plugin + type: list + elements: str + required: True + query: + description: + - You can provide a single argument to each ipv4() filter. + - Example. query type 'ipv6' to convert ipv4 into ipv6 + type: str + default: '' + notes: +""" + +EXAMPLES = r""" +#### examples +# Ipv4 filter plugin with different queries. +- name: Set value as input list + ansible.builtin.set_fact: + value: + - 192.24.2.1 + - host.fqdn + - ::1 + - '' + - 192.168.32.0/24 + - fe80::100/10 + - 42540766412265424405338506004571095040/64 + - True +- name: IPv4 filter to filter Ipv4 Address + debug: + msg: "{{ value|ansible.utils.ipv4 }}" + +- name: convert IPv4 addresses into IPv6 addresses. + debug: + msg: "{{ value|ansible.utils.ipv4('ipv6') }}" + +- name: convert IPv4 addresses into IPv6 addresses. + debug: + msg: "{{ value|ansible.utils.ipv4('address') }}" + + +# PLAY [Ipv4 filter plugin with different queries.] ****************************************************************** +# TASK [Set value as input list] *************************************************************************************** +# ok: [localhost] => {"ansible_facts": {"value": ["192.24.2.1", "host.fqdn", "::1", "", "192.168.32.0/24", +# "fe80::100/10", "42540766412265424405338506004571095040/64", true]}, "changed": false} +# TASK [IPv4 filter to filter Ipv4 Address] ******************************************************************* +# ok: [localhost] => { +# "msg": [ +# "192.24.2.1", +# "192.168.32.0/24" +# ] +# } +# +# TASK [convert IPv4 addresses into IPv6 addresses.] ********************************************************** +# ok: [localhost] => { +# "msg": [ +# "::ffff:192.24.2.1/128", +# "::ffff:192.168.32.0/120" +# ] +# } +# +# TASK [convert IPv4 addresses into IPv6 addresses.] ********************************************************** +# ok: [localhost] => { +# "msg": [ +# "192.24.2.1" +# ] +# } + +""" + +RETURN = """ + data: + type: list + elements: str + description: + - Returns list with values valid for a particular query. +""" + + +@pass_environment +def _ipv4(*args, **kwargs): + """This filter is designed to return the input value if a query is True, and False if a query is False""" + keys = ["value", "query"] + data = dict(zip(keys, args[1:])) + data.update(kwargs) + aav = AnsibleArgSpecValidator(data=data, schema=DOCUMENTATION, name="ipv4") + valid, errors, updated_data = aav.validate() + if not valid: + raise AnsibleFilterError(errors) + return ipv4(**updated_data) + + +def ipv4(value, query=""): + return ipaddr(value, query, version=4, alias="ipv4") + + +class FilterModule(object): + """IP address and network manipulation filters + """ + + filter_map = { + # IP addresses and networks + "ipv4": _ipv4 + } + + def filters(self): + """ ipaddr filter """ + if netaddr: + return self.filter_map + else: + return dict( + (f, partial(_need_netaddr, f)) for f in self.filter_map + ) diff --git a/plugins/plugin_utils/base/ipaddr_utils.py b/plugins/plugin_utils/base/ipaddr_utils.py new file mode 100644 index 0000000..0b83537 --- /dev/null +++ b/plugins/plugin_utils/base/ipaddr_utils.py @@ -0,0 +1,606 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Red Hat +# GNU General Public License v3.0+ +# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +""" +The utils file for all ipaddr filters +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +from ansible.utils.display import Display +import types +from ansible.errors import AnsibleFilterError + +try: + import netaddr +except ImportError: + # in this case, we'll make the filters return error messages (see bottom) + netaddr = None +else: + + class mac_linux(netaddr.mac_unix): + pass + + mac_linux.word_fmt = "%.2x" + +display = Display() + + +# ---- IP address and network query helpers ---- +def _empty_ipaddr_query(v, vtype): + # We don't have any query to process, so just check what type the user + # expects, and return the IP address in a correct format + if v: + if vtype == "address": + return str(v.ip) + elif vtype == "network": + return str(v) + + +def _first_last(v): + if v.size == 2: + first_usable = int(netaddr.IPAddress(v.first)) + last_usable = int(netaddr.IPAddress(v.last)) + return first_usable, last_usable + elif v.size > 1: + first_usable = int(netaddr.IPAddress(v.first + 1)) + last_usable = int(netaddr.IPAddress(v.last - 1)) + return first_usable, last_usable + + +def _6to4_query(v, vtype, value): + if v.version == 4: + if v.size == 1: + ipconv = str(v.ip) + elif v.size > 1: + if v.ip != v.network: + ipconv = str(v.ip) + else: + return False + + if ipaddr(ipconv, "public") or ipaddr(ipconv, "private"): + numbers = list(map(int, ipconv.split("."))) + + try: + return "2002:{:02x}{:02x}:{:02x}{:02x}::1/48".format(*numbers) + except Exception: + pass + + elif v.version == 6: + if vtype == "address": + if ipaddr(str(v), "2002::/16"): + return value + elif vtype == "network": + if v.ip != v.network: + if ipaddr(str(v.ip), "2002::/16"): + return value + + return False + + +def _ip_query(v): + if v.size == 1: + return str(v.ip) + if v.size > 1: + # /31 networks in netaddr have no broadcast address + if v.ip != v.network or not v.broadcast: + return str(v.ip) + # For the first IPv6 address in a network, netaddr will return it as a network address, despite it being a valid host address. + elif v.version == 6 and v.ip == v.network: + return str(v.ip) + + +def _address_prefix_query(v): + if v.size > 2 and v.ip in (v.network, v.broadcast): + return False + return str(v.ip) + "/" + str(v.prefixlen) + + +def _bool_ipaddr_query(v): + if v: + return True + + +def _broadcast_query(v): + if v.size > 2: + return str(v.broadcast) + + +def _cidr_query(v): + return str(v) + + +def _cidr_lookup_query(v, iplist, value): + try: + if v in iplist: + return value + except Exception: + return False + + +def _first_usable_query(v, vtype): + if vtype == "address": + # Does it make sense to raise an error + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size == 2: + return str(netaddr.IPAddress(int(v.network))) + elif v.size > 1: + return str(netaddr.IPAddress(int(v.network) + 1)) + + +def _host_query(v): + if v.size == 1: + return str(v) + elif v.size > 1: + if v.ip != v.network or not v.broadcast: + return str(v.ip) + "/" + str(v.prefixlen) + + +def _hostmask_query(v): + return str(v.hostmask) + + +def _int_query(v, vtype): + if vtype == "address": + return int(v.ip) + elif vtype == "network": + return str(int(v.ip)) + "/" + str(int(v.prefixlen)) + + +def _ip_prefix_query(v): + if v.size == 2: + return str(v.ip) + "/" + str(v.prefixlen) + elif v.size > 1: + if v.ip != v.network: + return str(v.ip) + "/" + str(v.prefixlen) + + +def _ip_netmask_query(v): + if v.size == 2: + return str(v.ip) + " " + str(v.netmask) + elif v.size > 1: + if v.ip != v.network: + return str(v.ip) + " " + str(v.netmask) + + +def _ipv4_query(v, value): + if v.version == 6: + try: + return str(v.ipv4()) + except Exception: + return False + else: + return value + + +def _ipv6_query(v, value): + if v.version == 4: + return str(v.ipv6()) + else: + return value + + +def _last_usable_query(v, vtype): + if vtype == "address": + # Does it make sense to raise an error + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size > 1: + first_usable, last_usable = _first_last(v) + return str(netaddr.IPAddress(last_usable)) + + +def _link_local_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if v.version == 4: + if ipaddr(str(v_ip), "169.254.0.0/24"): + return value + + elif v.version == 6: + if ipaddr(str(v_ip), "fe80::/10"): + return value + + +def _loopback_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if v_ip.is_loopback(): + return value + + +def _multicast_query(v, value): + if v.is_multicast(): + return value + + +def _net_query(v): + if v.size > 1: + if v.ip == v.network: + return str(v.network) + "/" + str(v.prefixlen) + + +def _netmask_query(v): + return str(v.netmask) + + +def _network_query(v): + """Return the network of a given IP or subnet""" + return str(v.network) + + +def _network_netmask_query(v): + return str(v.network) + " " + str(v.netmask) + + +def _network_wildcard_query(v): + return str(v.network) + " " + str(v.hostmask) + + +def _next_usable_query(v, vtype): + if vtype == "address": + # Does it make sense to raise an error + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size > 1: + first_usable, last_usable = _first_last(v) + next_ip = int(netaddr.IPAddress(int(v.ip) + 1)) + if next_ip >= first_usable and next_ip <= last_usable: + return str(netaddr.IPAddress(int(v.ip) + 1)) + + +def _peer_query(v, vtype): + if vtype == "address": + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size == 2: + return str(netaddr.IPAddress(int(v.ip) ^ 1)) + if v.size == 4: + if int(v.ip) % 4 == 0: + raise AnsibleFilterError("Network address of /30 has no peer") + if int(v.ip) % 4 == 3: + raise AnsibleFilterError( + "Broadcast address of /30 has no peer" + ) + return str(netaddr.IPAddress(int(v.ip) ^ 3)) + raise AnsibleFilterError("Not a point-to-point network") + + +def _prefix_query(v): + return int(v.prefixlen) + + +def _previous_usable_query(v, vtype): + if vtype == "address": + # Does it make sense to raise an error + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size > 1: + first_usable, last_usable = _first_last(v) + previous_ip = int(netaddr.IPAddress(int(v.ip) - 1)) + if previous_ip >= first_usable and previous_ip <= last_usable: + return str(netaddr.IPAddress(int(v.ip) - 1)) + + +def _private_query(v, value): + if v.is_private(): + return value + + +def _public_query(v, value): + v_ip = netaddr.IPAddress(str(v.ip)) + if all( + [ + v_ip.is_unicast(), + not v_ip.is_private(), + not v_ip.is_loopback(), + not v_ip.is_netmask(), + not v_ip.is_hostmask(), + ] + ): + return value + + +def _range_usable_query(v, vtype): + if vtype == "address": + # Does it make sense to raise an error + raise AnsibleFilterError("Not a network address") + elif vtype == "network": + if v.size > 1: + first_usable, last_usable = _first_last(v) + first_usable = str(netaddr.IPAddress(first_usable)) + last_usable = str(netaddr.IPAddress(last_usable)) + return "{0}-{1}".format(first_usable, last_usable) + + +def _revdns_query(v): + v_ip = netaddr.IPAddress(str(v.ip)) + return v_ip.reverse_dns + + +def _size_query(v): + return v.size + + +def _size_usable_query(v): + if v.size == 1: + return 0 + elif v.size == 2: + return 2 + return v.size - 2 + + +def _subnet_query(v): + return str(v.cidr) + + +def _type_query(v): + if v.size == 1: + return "address" + if v.size > 1: + if v.ip != v.network: + return "address" + else: + return "network" + + +def _unicast_query(v, value): + if v.is_unicast(): + return value + + +def _version_query(v): + return v.version + + +def _wrap_query(v, vtype, value): + if v.version == 6: + if vtype == "address": + return "[" + str(v.ip) + "]" + elif vtype == "network": + return "[" + str(v.ip) + "]/" + str(v.prefixlen) + else: + return value + + +def ipaddr(value, query="", version=False, alias="ipaddr"): + """ Check if string is an IP address or network and filter it """ + + query_func_extra_args = { + "": ("vtype",), + "6to4": ("vtype", "value"), + "cidr_lookup": ("iplist", "value"), + "first_usable": ("vtype",), + "int": ("vtype",), + "ipv4": ("value",), + "ipv6": ("value",), + "last_usable": ("vtype",), + "link-local": ("value",), + "loopback": ("value",), + "lo": ("value",), + "multicast": ("value",), + "next_usable": ("vtype",), + "peer": ("vtype",), + "previous_usable": ("vtype",), + "private": ("value",), + "public": ("value",), + "unicast": ("value",), + "range_usable": ("vtype",), + "wrap": ("vtype", "value"), + } + + query_func_map = { + "": _empty_ipaddr_query, + "6to4": _6to4_query, + "address": _ip_query, + "address/prefix": _address_prefix_query, # deprecate + "bool": _bool_ipaddr_query, + "broadcast": _broadcast_query, + "cidr": _cidr_query, + "cidr_lookup": _cidr_lookup_query, + "first_usable": _first_usable_query, + "gateway": _address_prefix_query, # deprecate + "gw": _address_prefix_query, # deprecate + "host": _host_query, + "host/prefix": _address_prefix_query, # deprecate + "hostmask": _hostmask_query, + "hostnet": _address_prefix_query, # deprecate + "int": _int_query, + "ip": _ip_query, + "ip/prefix": _ip_prefix_query, + "ip_netmask": _ip_netmask_query, + # 'ip_wildcard': _ip_wildcard_query, built then could not think of use case + "ipv4": _ipv4_query, + "ipv6": _ipv6_query, + "last_usable": _last_usable_query, + "link-local": _link_local_query, + "lo": _loopback_query, + "loopback": _loopback_query, + "multicast": _multicast_query, + "net": _net_query, + "next_usable": _next_usable_query, + "netmask": _netmask_query, + "network": _network_query, + "network_id": _network_query, + "network/prefix": _subnet_query, + "network_netmask": _network_netmask_query, + "network_wildcard": _network_wildcard_query, + "peer": _peer_query, + "prefix": _prefix_query, + "previous_usable": _previous_usable_query, + "private": _private_query, + "public": _public_query, + "range_usable": _range_usable_query, + "revdns": _revdns_query, + "router": _address_prefix_query, # deprecate + "size": _size_query, + "size_usable": _size_usable_query, + "subnet": _subnet_query, + "type": _type_query, + "unicast": _unicast_query, + "v4": _ipv4_query, + "v6": _ipv6_query, + "version": _version_query, + "wildcard": _hostmask_query, + "wrap": _wrap_query, + } + + vtype = None + + # Check if value is a list and parse each element + if isinstance(value, (list, tuple, types.GeneratorType)): + _ret = [ipaddr(element, str(query), version) for element in value] + return [item for item in _ret if item] + + elif not value or value is True: + # TODO: Remove this check in a major version release of collection with porting guide + # TODO: and raise exception commented out below + display.warning( + "The value '%s' is not a valid IP address or network, passing this value to ipaddr filter" + " might result in breaking change in future." % value + ) + return False + + # Check if value is a number and convert it to an IP address + elif str(value).isdigit(): + + # We don't know what IP version to assume, so let's check IPv4 first, + # then IPv6 + try: + if (not version) or (version and version == 4): + v = netaddr.IPNetwork("0.0.0.0/0") + v.value = int(value) + v.prefixlen = 32 + elif version and version == 6: + v = netaddr.IPNetwork("::/0") + v.value = int(value) + v.prefixlen = 128 + + # IPv4 didn't work the first time, so it definitely has to be IPv6 + except Exception: + try: + v = netaddr.IPNetwork("::/0") + v.value = int(value) + v.prefixlen = 128 + + # The value is too big for IPv6. Are you a nanobot? + except Exception: + return False + + # We got an IP address, let's mark it as such + value = str(v) + vtype = "address" + + # value has not been recognized, check if it's a valid IP string + else: + try: + v = netaddr.IPNetwork(value) + + # value is a valid IP string, check if user specified + # CIDR prefix or just an IP address, this will indicate default + # output format + try: + address, prefix = value.split("/") + vtype = "network" + except Exception: + vtype = "address" + + # value hasn't been recognized, maybe it's a numerical CIDR? + except Exception: + try: + address, prefix = value.split("/") + address.isdigit() + address = int(address) + prefix.isdigit() + prefix = int(prefix) + + # It's not numerical CIDR, give up + except Exception: + return False + + # It is something, so let's try and build a CIDR from the parts + try: + v = netaddr.IPNetwork("0.0.0.0/0") + v.value = address + v.prefixlen = prefix + + # It's not a valid IPv4 CIDR + except Exception: + try: + v = netaddr.IPNetwork("::/0") + v.value = address + v.prefixlen = prefix + + # It's not a valid IPv6 CIDR. Give up. + except Exception: + return False + + # We have a valid CIDR, so let's write it in correct format + value = str(v) + vtype = "network" + + # We have a query string but it's not in the known query types. Check if + # that string is a valid subnet, if so, we can check later if given IP + # address/network is inside that specific subnet + try: + # ?? 6to4 and link-local were True here before. Should they still? + if ( + query + and (query not in query_func_map or query == "cidr_lookup") + and not str(query).isdigit() + and ipaddr(query, "network") + ): + iplist = netaddr.IPSet([netaddr.IPNetwork(query)]) + query = "cidr_lookup" + except Exception: + pass + + # This code checks if value maches the IP version the user wants, ie. if + # it's any version ("ipaddr()"), IPv4 ("ipv4()") or IPv6 ("ipv6()") + # If version does not match, return False + if version and v.version != version: + return False + + extras = [] + for arg in query_func_extra_args.get(query, tuple()): + extras.append(locals()[arg]) + try: + return query_func_map[query](v, *extras) + except KeyError: + try: + float(query) + if v.size == 1: + if vtype == "address": + return str(v.ip) + elif vtype == "network": + return str(v) + + elif v.size > 1: + try: + return str(v[query]) + "/" + str(v.prefixlen) + except Exception: + return False + + else: + return value + + except Exception: + raise AnsibleFilterError( + alias + ": unknown filter type: %s" % query + ) + + return False + + +def _need_netaddr(f_name, *args, **kwargs): + """ + verify python's netaddr for these filters to work + """ + raise AnsibleFilterError( + "The %s filter requires python's netaddr be " + "installed on the ansible controller" % f_name + ) diff --git a/requirements.txt b/requirements.txt index 6963d42..da5f477 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ jsonschema==3.2.0 textfsm ttp xmltodict +netaddr diff --git a/test-requirements.txt b/test-requirements.txt index e454ff0..8958f9f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -5,3 +5,4 @@ ipaddress ; python_version < '3.0' mock ; python_version < '3.5' pytest-xdist yamllint +netaddr diff --git a/tests/integration/targets/utils_ipaddr_filter/tasks/ipv4.yaml b/tests/integration/targets/utils_ipaddr_filter/tasks/ipv4.yaml new file mode 100644 index 0000000..2f7c754 --- /dev/null +++ b/tests/integration/targets/utils_ipaddr_filter/tasks/ipv4.yaml @@ -0,0 +1,35 @@ +--- +- name: set ipaddress list + ansible.builtin.set_fact: + value: + - 192.24.2.1 + - host.fqdn + - ::1 + - 192.168.32.0/24 + - fe80::100/10 + - "42540766412265424405338506004571095040/64" + - True + +- name: ipv4 filter + ansible.builtin.set_fact: + result1: "{{ value|ansible.utils.ipv4 }}" + +- name: Assert result for ipv4. + assert: + that: "{{ result1 == result1_val }}" + +- name: convert ipv4 to ipv6 filter + ansible.builtin.set_fact: + result2: "{{ value|ansible.utils.ipv4('ipv6') }}" + +- name: Assert result for ipv6. + assert: + that: "{{ result2 == result2_val }}" + +- name: Ipv4 filter with address query + ansible.builtin.set_fact: + result3: "{{ value|ansible.utils.ipv4('address') }}" + +- name: Assert result for ipv4 filter with address query. + assert: + that: "{{ result3 == result3_val }}" diff --git a/tests/integration/targets/utils_ipaddr_filter/tasks/main.yaml b/tests/integration/targets/utils_ipaddr_filter/tasks/main.yaml new file mode 100644 index 0000000..eb1a94b --- /dev/null +++ b/tests/integration/targets/utils_ipaddr_filter/tasks/main.yaml @@ -0,0 +1,14 @@ +--- +- name: Recursively find all test files + find: + file_type: file + paths: "{{ role_path }}/tasks" + recurse: false + use_regex: true + patterns: + - '^(?!_|main).+$' + delegate_to: localhost + register: found + +- include: "{{ item.path }}" + loop: "{{ found.files }}" diff --git a/tests/integration/targets/utils_ipaddr_filter/vars/main.yaml b/tests/integration/targets/utils_ipaddr_filter/vars/main.yaml new file mode 100644 index 0000000..b17b503 --- /dev/null +++ b/tests/integration/targets/utils_ipaddr_filter/vars/main.yaml @@ -0,0 +1,11 @@ +--- +result1_val: + - "192.24.2.1" + - "192.168.32.0/24" + +result2_val: + - "::ffff:192.24.2.1/128" + - "::ffff:192.168.32.0/120" + +result3_val: + - "192.24.2.1" diff --git a/tests/unit/plugins/filter/test_ipv4.py b/tests/unit/plugins/filter/test_ipv4.py new file mode 100644 index 0000000..44dac1b --- /dev/null +++ b/tests/unit/plugins/filter/test_ipv4.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 Red Hat +# GNU General Public License v3.0+ +# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +""" +Unit test file for ipwrap filter plugin +""" + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + +import unittest +from ansible_collections.ansible.utils.plugins.filter.ipv4 import _ipv4 + + +VALID_DATA = [ + "192.24.2.1", + "host.fqdn", + "::1", + "", + "192.168.32.0/24", + "fe80::100/10", + "42540766412265424405338506004571095040/64", + True, +] + + +VALID_OUTPUT = ["192.24.2.1", "192.168.32.0/24"] + +VALID_OUTPUT1 = ["::ffff:192.24.2.1/128", "::ffff:192.168.32.0/120"] + +VALID_OUTPUT2 = ["192.24.2.1"] + + +class TestIp4(unittest.TestCase): + def setUp(self): + pass + + def test_ipv4_filter_empty_query(self): + """Check ipv4 filter empty query""" + args = ["", VALID_DATA, ""] + result = _ipv4(*args) + self.assertEqual(result, VALID_OUTPUT) + + def test_ipv4_ipv6_conversion(self): + """Check ipv4 to ipv6 conversion""" + args = ["", VALID_DATA, "ipv6"] + result = _ipv4(*args) + self.assertEqual(result, VALID_OUTPUT1) + + def test_ipv4_filter_address_query(self): + """Check ipv4 filter address query""" + args = ["", VALID_DATA, "address"] + result = _ipv4(*args) + self.assertEqual(result, VALID_OUTPUT2)