ansible.utils/plugins/filter/ipsubnet.py

344 lines
12 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2021 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
filter plugin file for ipaddr filters: ipsubnet
"""
from __future__ import absolute_import, division, print_function
from functools import partial
from ansible.errors import AnsibleFilterError
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
AnsibleArgSpecValidator,
)
from ansible_collections.ansible.utils.plugins.plugin_utils.base.ipaddr_utils import (
_need_netaddr,
ipaddr,
)
__metaclass__ = type
from ansible.module_utils._text import to_text
try:
from jinja2.filters import pass_environment
except ImportError:
from jinja2.filters import environmentfilter as pass_environment
try:
import netaddr
HAS_NETADDR = True
except ImportError:
# in this case, we'll make the filters return error messages (see bottom)
HAS_NETADDR = False
else:
class mac_linux(netaddr.mac_unix):
pass
mac_linux.word_fmt = "%.2x"
DOCUMENTATION = """
name: ipsubnet
author: Ashwini Mhatre (@amhatre)
version_added: "2.5.0"
short_description: This filter can be used to manipulate network subnets in several ways.
description:
- This filter can be used to manipulate network subnets in several ways.
options:
value:
description:
- subnets or individual address or any other values input for ipsubnet plugin
type: str
required: True
query:
description: |
You can provide query as 1st argument.
To check if a given string is a subnet, pass it through the filter without any arguments. If the given
string is an IP address, it will be converted into a subnet.
If you specify a subnet size as the first parameter of the ipsubnet() filter, and the subnet size is
smaller than the current one, you will get the number of subnets a given subnet can be split into.
type: str
default: ''
index:
description: |
The second argument of the ipsubnet() filter is an index number; by specifying it you can get a new subnet
with the specified index.
type: int
notes:
"""
EXAMPLES = r"""
#### examples
# Ipsubnet filter plugin with different queries.
vars:
address: 192.168.144.5
subnet: 192.168.0.0/16
ipv6_address: '2001:4860:4860::8888'
ipv6_subnet: '2600:1f1c:1b3:8f00::/56'
tasks:
- name: convert IP address to subnet
debug:
msg: '{{ address | ansible.utils.ipsubnet }}'
- name: check if a given string is a subnet
debug:
msg: '{{ subnet | ansible.utils.ipsubnet }}'
- name: Get the number of subnets a given subnet can be split into.
debug:
msg: '{{ subnet | ansible.utils.ipsubnet(20) }}'
- name: Get a 1st subnet
debug:
msg: '{{ subnet | ansible.utils.ipsubnet(20, 0) }}'
- name: Get a last subnet
debug:
msg: '{{ subnet | ansible.utils.ipsubnet(20, -1) }}'
- name: Get first IPv6 subnet that has prefix length /120
debug:
msg: '{{ ipv6_subnet | ansible.utils.ipsubnet(120, 0) }}'
- name: Get last subnet that has prefix length /120
debug:
msg: '{{ ipv6_subnet | ansible.utils.ipsubnet(120, -1) }}'
- name: Get biggest subnet that contains that given IP address.
debug:
msg: '{{ address | ansible.utils.ipsubnet(20) }}'
- name: Get 1st smaller subnet by specifying 0 as index number
debug:
msg: '{{ address | ansible.utils.ipsubnet(18, 0) }}'
- name: Get last subnet
debug:
msg: '{{ address | ansible.utils.ipsubnet(18, -1) }}'
- name: >-
The rank of the IP in the subnet (the IP is the 36870nth /32 of the
subnet)
debug:
msg: '{{ address | ansible.utils.ipsubnet(subnet) }}'
- name: The rank in the /24 that contain the address
debug:
msg: '{{ address | ansible.utils.ipsubnet(''192.168.144.0/24'') }}'
- name: An IP with the subnet in the first /30 in a /24
debug:
msg: '{{ ''192.168.144.1/30'' | ansible.utils.ipsubnet(''192.168.144.0/24'') }}'
- name: The fifth subnet /30 in a /24
debug:
msg: '{{ ''192.168.144.16/30'' | ansible.utils.ipsubnet(''192.168.144.0/24'') }}'
# PLAY [Ipsubnet filter plugin with different queries.] ****************************************************************
# TASK [convert IP address to subnet] *************************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:10
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.144.5/32"
# }
#
# TASK [check if a given string is a subnet] ******************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:15
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.0.0/16"
# }
#
# TASK [Get the number of subnets a given subnet can be split into.] ******************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:20
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "16"
# }
#
# TASK [Get a 1st subnet] *************************************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:25
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.0.0/20"
# }
#
# TASK [Get a last subnet] ************************************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:30
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.240.0/20"
# }
#
# TASK [Get biggest subnet that contains that given IP address.] **********************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:35
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.144.0/20"
# }
#
# TASK [Get 1st smaller subnet by specifying 0 as index number] ***********************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:40
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.128.0/18"
# }
#
# TASK [Get last subnet] **************************************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:45
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "192.168.144.4/31"
# }
#
# TASK [The rank of the IP in the subnet (the IP is the 36870nth /32 of the subnet)] **************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:50
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "36870"
# }
#
# TASK [The rank in the /24 that contain the address] *********************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:55
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "6"
# }
#
# TASK [An IP with the subnet in the first /30 in a /24] ******************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:60
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "1"
# }
#
# TASK [he fifth subnet /30 in a /24] *************************************************************************
# task path: /Users/amhatre/ansible-collections/playbooks/test_ipsubnet.yaml:65
# Loading collection ansible.utils from /Users/amhatre/ansible-collections/collections/ansible_collections/ansible/utils
# ok: [localhost] => {
# "msg": "5"
# }
"""
RETURN = """
data:
type: raw
description:
- Returns values valid for a particular query.
"""
@pass_environment
def _ipsubnet(*args, **kwargs):
"""Manipulate IPv4/IPv6 subnets"""
keys = ["value", "query", "index"]
data = dict(zip(keys, args[1:]))
data.update(kwargs)
aav = AnsibleArgSpecValidator(data=data, schema=DOCUMENTATION, name="ipsubnet")
valid, errors, updated_data = aav.validate()
if not valid:
raise AnsibleFilterError(errors)
return ipsubnet(**updated_data)
def ipsubnet(value, query="", index=None):
"""Manipulate IPv4/IPv6 subnets"""
vtype = ipaddr(value, "type")
if not vtype:
return False
elif vtype == "address":
v = ipaddr(value, "cidr")
elif vtype == "network":
v = ipaddr(value, "subnet")
value = netaddr.IPNetwork(v).cidr
vtype = ipaddr(value, "type")
if not query:
return to_text(value)
vtotalbits = 128 if value.version == 6 else 32
if query.isdigit():
query = int(query)
if query < 0 or query > vtotalbits:
return False
if index is None:
if vtype == "address":
return to_text(value.supernet(query)[0])
elif vtype == "network":
if query - value.prefixlen < 0:
msg = "Requested subnet size of {0} is invalid".format(
to_text(query),
)
raise AnsibleFilterError(msg)
return to_text(2 ** (query - value.prefixlen))
index = int(index)
if vtype == "address":
if index > vtotalbits + 1 - index or index < query - vtotalbits:
return False
return to_text(value.supernet(query)[index])
elif vtype == "network":
subnets = 2 ** (query - value.prefixlen)
if index < 0:
index = index + subnets
if index < 0 or index >= subnets:
# subnet index out of range
return False
return to_text(
netaddr.IPNetwork(
to_text(
netaddr.IPAddress(
value.network.value + (index << vtotalbits - query),
),
)
+ "/"
+ to_text(query),
),
)
else:
vtype = ipaddr(query, "type")
if vtype == "address":
v = ipaddr(query, "cidr")
elif vtype == "network":
v = ipaddr(query, "subnet")
else:
msg = "You must pass a valid subnet or IP address; {0} is invalid".format(
to_text(query),
)
raise AnsibleFilterError(msg)
query = netaddr.IPNetwork(v)
if (
value.value >> vtotalbits - query.prefixlen
== query.value >> vtotalbits - query.prefixlen
):
return to_text(
(
(
value.value
& 2 ** (value.prefixlen - query.prefixlen) - 1
<< (vtotalbits - value.prefixlen)
)
>> (vtotalbits - value.prefixlen)
)
+ 1,
)
msg = "{0} is not in the subnet {1}".format(value.cidr, query.cidr)
raise AnsibleFilterError(msg)
return False
class FilterModule(object):
"""IP address and network manipulation filters"""
filter_map = {
# IP addresses and networks
"ipsubnet": _ipsubnet,
}
def filters(self):
"""ipsubnet filter"""
if HAS_NETADDR:
return self.filter_map
else:
return dict((f, partial(_need_netaddr, f)) for f in self.filter_map)