ansible.utils/plugins/filter/ipcut.py

165 lines
4.8 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2023 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
filter plugin file for ipaddr filters: ip_cut
"""
from __future__ import absolute_import, division, print_function
from functools import partial
from ansible.errors import AnsibleFilterError
from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
AnsibleArgSpecValidator,
)
from ansible_collections.ansible.utils.plugins.plugin_utils.base.ipaddr_utils import _need_netaddr
__metaclass__ = type
try:
from jinja2.filters import pass_environment
except ImportError:
from jinja2.filters import environmentfilter as pass_environment
try:
import netaddr
HAS_NETADDR = True
except ImportError:
# in this case, we'll make the filters return error messages (see bottom)
HAS_NETADDR = False
else:
class mac_linux(netaddr.mac_unix):
pass
mac_linux.word_fmt = "%.2x"
DOCUMENTATION = """
name: ipcut
author: Ashwini Mhatre (@amhatre)
version_added: "2.11.0"
short_description: This filter is designed to get 1st or last few bits of IP address.
description:
- This filter is designed to fetch 1st or last few bits of Ip address.
options:
value:
description:
- list of subnets or individual address or any other values input for ip_cut plugin
type: str
required: True
amount:
type: int
description: integer for arithmetic. Example -1,2,3
"""
EXAMPLES = r"""
#### examples
- name: Get first 64 bits of Ipv6 address
debug:
msg: "{{ '1234:4321:abcd:dcba::17' | ansible.utils.ipcut(64) }}"
- name: Get last 80 bits of Ipv6 address
debug:
msg: "{{ '1234:4321:abcd:dcba::17' | ansible.utils.ipcut(-80) }}"
# PLAY [IPCUT filter plugin examples] ************************************************************************************************
# TASK [Get first X bits of Ipv6 address] ********************************************************************************************
# ok: [localhost] => {
# "msg": "1234:4321:abcd:dcba"
# }
# TASK [Get last X bits of Ipv6 address] *********************************************************************************************
# ok: [localhost] => {
# "msg": "dcba:0:0:0:17"
# }
# PLAY RECAP *************************************************************************************************************************
# localhost : ok=2 changed=0 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0
"""
RETURN = """
data:
type: str
description:
- Returns result of portion of IP.
"""
@pass_environment
def _ipcut(*args, **kwargs):
"""Fetch first or last bits of IPV6 address"""
keys = ["value", "amount"]
data = dict(zip(keys, args[1:]))
data.update(kwargs)
aav = AnsibleArgSpecValidator(data=data, schema=DOCUMENTATION, name="ipmath")
valid, errors, updated_data = aav.validate()
if not valid:
raise AnsibleFilterError(errors)
return ipcut(**updated_data)
def ipcut(value, amount):
try:
ip = netaddr.IPAddress(value)
if ip.version == 6:
ip_bits = ip.bits().replace(":", "")
elif ip.version == 4:
ip_bits = ip.bits().replace(".", "")
else:
msg = "Unknown IP Address Version: {0}".format(ip.version)
raise AnsibleFilterError(msg)
except (netaddr.AddrFormatError, ValueError):
msg = "You must pass a valid IP address; {0} is invalid".format(value)
raise AnsibleFilterError(msg)
if not isinstance(amount, int):
msg = ("You must pass an integer for arithmetic; " "{0} is not a valid integer").format(
amount,
)
raise AnsibleFilterError(msg)
else:
if amount < 0:
ipsub = ip_bits[amount:]
else:
ipsub = ip_bits[0:amount]
if ip.version == 6:
ipv4_oct = []
for i in range(0, len(ipsub), 16):
oct_sub = i + 16
ipv4_oct.append(
hex(int(ipsub[i:oct_sub], 2)).replace("0x", ""),
)
result = str(":".join(ipv4_oct))
else: # ip.version == 4:
ipv4_oct = []
for i in range(0, len(ipsub), 8):
oct_sub = i + 8
ipv4_oct.append(
str(int(ipsub[i:oct_sub], 2)),
)
result = str(".".join(ipv4_oct))
return result
class FilterModule(object):
"""IP address and network manipulation filters"""
filter_map = {
# This filter is designed to fetch first or last bits of IPV6 address
"ipcut": _ipcut,
}
def filters(self):
"""ipcut filter"""
if HAS_NETADDR:
return self.filter_map
else:
return dict((f, partial(_need_netaddr, f)) for f in self.filter_map)