diff --git a/lib/ansible/modules/network/cloudengine/ce_static_route.py b/lib/ansible/modules/network/cloudengine/ce_static_route.py
new file mode 100644
index 0000000000..eccdb7ef2f
--- /dev/null
+++ b/lib/ansible/modules/network/cloudengine/ce_static_route.py
@@ -0,0 +1,841 @@
+# This file is part of Ansible
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see .
+ANSIBLE_METADATA = {'status': ['preview'],
+ 'supported_by': 'community',
+ 'metadata_version': '1.0'}
+module: ce_static_route
+version_added: "2.4"
+short_description: Manages static route configuration on HUAWEI CloudEngine switches.
+ - Manages the static routes on HUAWEI CloudEngine switches.
+author: Yang yang (@CloudEngine-Ansible)
+ - If no vrf is supplied, vrf is set to default.
+ If I(state=absent), the route will be removed, regardless of the
+ non-required parameters.
+ prefix:
+ description:
+ - Destination ip address of static route.
+ required: true
+ mask:
+ description:
+ - Destination ip mask of static route.
+ required: true
+ aftype:
+ description:
+ - Destination ip address family type of static route.
+ required: true
+ choices: ['v4','v6']
+ next_hop:
+ description:
+ - Next hop address of static route.
+ required: false
+ default: null
+ nhp_interface:
+ description:
+ - Next hop interface full name of static route.
+ required: false
+ default: null
+ vrf:
+ description:
+ - VPN instance of destination ip address.
+ required: false
+ default: null
+ destvrf:
+ description:
+ - VPN instance of next hop ip address.
+ required: false
+ default: null
+ tag:
+ description:
+ - Route tag value (numeric).
+ required: false
+ default: null
+ description:
+ description:
+ - Name of the route. Used with the name parameter on the CLI.
+ required: false
+ default: null
+ pref:
+ description:
+ - Preference or administrative difference of route (range 1-255).
+ required: false
+ default: null
+ state:
+ description:
+ - Specify desired state of the resource.
+ required: false
+ choices: ['present','absent']
+ default: present
+- name: static route module test
+ hosts: cloudengine
+ connection: local
+ gather_facts: no
+ vars:
+ cli:
+ host: "{{ inventory_hostname }}"
+ port: "{{ ansible_ssh_port }}"
+ username: "{{ username }}"
+ password: "{{ password }}"
+ transport: cli
+ tasks:
+ - name: Config a ipv4 static route, next hop is an address and that it has the proper description
+ ce_static_route:
+ prefix:
+ mask: 24
+ next_hop:
+ description: 'Configured by Ansible'
+ aftype: v4
+ provider: "{{ cli }}"
+ - name: Config a ipv4 static route ,next hop is an interface and that it has the proper description
+ ce_static_route:
+ prefix:
+ mask: 24
+ next_hop: 10GE1/0/1
+ description: 'Configured by Ansible'
+ aftype: v4
+ provider: "{{ cli }}"
+ - name: Config a ipv6 static route, next hop is an address and that it has the proper description
+ ce_static_route:
+ prefix: fc00:0:0:2001::1
+ mask: 64
+ next_hop: fc00:0:0:2004::1
+ description: 'Configured by Ansible'
+ aftype: v6
+ provider: "{{ cli }}"
+ - name: Config a ipv4 static route, next hop is an interface and that it has the proper description
+ ce_static_route:
+ prefix: fc00:0:0:2001::1
+ mask: 64
+ next_hop: 10GE1/0/1
+ description: 'Configured by Ansible'
+ aftype: v6
+ provider: "{{ cli }}"
+ - name: Config a VRF and set ipv4 static route, next hop is an address and that it has the proper description
+ ce_static_route:
+ vrf: vpna
+ prefix:
+ mask: 24
+ next_hop:
+ description: 'Configured by Ansible'
+ aftype: v4
+ provider: "{{ cli }}"
+RETURN = '''
+ description: k/v pairs of parameters passed into module
+ returned: always
+ type: dict
+ sample: {"next_hop": "", "pref": "100",
+ "prefix": "", "mask": "24", "description": "testing",
+ "vrf": "_public_"}
+ description: k/v pairs of existing switchport
+ returned: always
+ type: dict
+ sample: {}
+ description: k/v pairs of switchport after module execution
+ returned: always
+ type: dict
+ sample: {"next_hop": "", "pref": "100",
+ "prefix": "", "mask": "24", "description": "testing",
+ "tag" : "null"}
+ description: command list sent to the device
+ returned: always
+ type: list
+ sample: ["ip route-static preference 100 description testing"]
+ description: check to see if a change was made on the device
+ returned: always
+ type: boolean
+ sample: true
+from xml.etree import ElementTree
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils.ce import get_nc_config, set_nc_config, ce_argument_spec
+ %s
+ %s
+ base
+ %s
+ %s
+ %s
+ %s
+ %s%s%s%s
+CE_NC_SET_TAG = """
+ %s
+ %s
+ base
+ %s
+ %s
+ %s
+ %s
+ %s
+def build_config_xml(xmlstr):
+ """build config xml"""
+ return ' ' + xmlstr + ' '
+def is_valid_v4addr(addr):
+ """check if ipv4 addr is valid"""
+ if addr.find('.') != -1:
+ addr_list = addr.split('.')
+ if len(addr_list) != 4:
+ return False
+ for each_num in addr_list:
+ if not each_num.isdigit():
+ return False
+ if int(each_num) > 255:
+ return False
+ return True
+ return False
+def is_valid_v6addr(addr):
+ """check if ipv6 addr is valid"""
+ if addr.find(':') != -1:
+ addr_list = addr.split(':')
+ if len(addr_list) > 6:
+ return False
+ if addr_list[1] != "":
+ return False
+ return True
+ return False
+def is_valid_tag(tag):
+ """check if the tag is valid"""
+ if not tag.isdigit():
+ return False
+ if int(tag) < 1 or int(tag) > 4294967295:
+ return False
+ return True
+def is_valid_preference(pref):
+ """check if the preference is valid"""
+ if pref.isdigit():
+ return int(pref) > 0 and int(pref) < 256
+ else:
+ return False
+def is_valid_description(description):
+ """check if the description is valid"""
+ if description.find('?') != -1:
+ return False
+ if len(description) < 1 or len(description) > 255:
+ return False
+ return True
+class StaticRoute(object):
+ """static route module"""
+ def __init__(self, argument_spec, ):
+ self.spec = argument_spec
+ self.module = None
+ self.init_module()
+ # static route info
+ self.prefix = self.module.params['prefix']
+ self.mask = self.module.params['mask']
+ self.aftype = self.module.params['aftype']
+ self.next_hop = self.module.params['next_hop']
+ self.nhp_interface = self.module.params['nhp_interface']
+ if self.nhp_interface is None:
+ self.nhp_interface = "Invalid0"
+ self.tag = self.module.params['tag']
+ self.description = self.module.params['description']
+ self.state = self.module.params['state']
+ self.pref = self.module.params['pref']
+ # vpn instance info
+ self.vrf = self.module.params['vrf']
+ if self.vrf is None:
+ self.vrf = "_public_"
+ self.destvrf = self.module.params['destvrf']
+ if self.destvrf is None:
+ self.destvrf = "_public_"
+ # state
+ self.changed = False
+ self.updates_cmd = list()
+ self.results = dict()
+ self.proposed = dict()
+ self.existing = dict()
+ self.end_state = dict()
+ self.static_routes_info = dict()
+ def init_module(self):
+ """init module"""
+ required_one_of = [["next_hop", "nhp_interface"]]
+ self.module = AnsibleModule(
+ argument_spec=self.spec, required_one_of=required_one_of, supports_check_mode=True)
+ def check_response(self, xml_str, xml_name):
+ """check if response message is already succeed."""
+ if "" not in xml_str:
+ self.module.fail_json(msg='Error: %s failed.' % xml_name)
+ def convert_len_to_mask(self, masklen):
+ """convert mask length to ip address mask, i.e. 24 to"""
+ mask_int = ["0"] * 4
+ length = int(masklen)
+ if length > 32:
+ self.module.fail_json(msg='IPv4 ipaddress mask length is invalid')
+ if length < 8:
+ mask_int[0] = str(int((0xFF << (8 - length % 8)) & 0xFF))
+ if length >= 8:
+ mask_int[0] = '255'
+ mask_int[1] = str(int((0xFF << (16 - (length % 16))) & 0xFF))
+ if length >= 16:
+ mask_int[1] = '255'
+ mask_int[2] = str(int((0xFF << (24 - (length % 24))) & 0xFF))
+ if length >= 24:
+ mask_int[2] = '255'
+ mask_int[3] = str(int((0xFF << (32 - (length % 32))) & 0xFF))
+ if length == 32:
+ mask_int[3] = '255'
+ return '.'.join(mask_int)
+ def convert_ip_prefix(self):
+ """convert prefix to real value i.e. to"""
+ if self.aftype == "v4":
+ if self.prefix.find('.') == -1:
+ return False
+ if self.mask == '32':
+ self.prefix = self.prefix
+ return True
+ if self.mask == '0':
+ self.prefix = ''
+ return True
+ addr_list = self.prefix.split('.')
+ length = len(addr_list)
+ if length > 4:
+ return False
+ for each_num in addr_list:
+ if not each_num.isdigit():
+ return False
+ if int(each_num) > 255:
+ return False
+ byte_len = 8
+ ip_len = int(self.mask) / byte_len
+ ip_bit = int(self.mask) % byte_len
+ else:
+ if self.prefix.find(':') == -1:
+ return False
+ if self.mask == '128':
+ self.prefix = self.prefix
+ return True
+ if self.mask == '0':
+ self.prefix = '::'
+ return True
+ addr_list = self.prefix.split(':')
+ length = len(addr_list)
+ if length > 6:
+ return False
+ byte_len = 16
+ ip_len = int(self.mask) / byte_len
+ ip_bit = int(self.mask) % byte_len
+ if self.aftype == "v4":
+ for i in range(ip_len + 1, length):
+ addr_list[i] = 0
+ else:
+ for i in range(length - ip_len, length):
+ addr_list[i] = 0
+ for j in range(0, byte_len - ip_bit):
+ if self.aftype == "v4":
+ addr_list[ip_len] = int(addr_list[ip_len]) & (0 << j)
+ else:
+ if addr_list[length - ip_len - 1] == "":
+ continue
+ addr_list[length - ip_len -
+ 1] = '0x%s' % addr_list[length - ip_len - 1]
+ addr_list[length - ip_len -
+ 1] = int(addr_list[length - ip_len - 1], 16) & (0 << j)
+ if self.aftype == "v4":
+ self.prefix = '%s.%s.%s.%s' % (addr_list[0], addr_list[1], addr_list[2], addr_list[3])
+ return True
+ else:
+ ipv6_addr_str = ""
+ for num in range(0, length - ip_len):
+ ipv6_addr_str += '%s:' % addr_list[num]
+ self.prefix = ipv6_addr_str
+ return True
+ def set_update_cmd(self):
+ """set update command"""
+ if not self.changed:
+ return
+ if self.aftype == "v4":
+ maskstr = self.convert_len_to_mask(self.mask)
+ else:
+ maskstr = self.mask
+ if self.next_hop is None:
+ next_hop = ''
+ else:
+ next_hop = self.next_hop
+ if self.vrf == "_public_":
+ vrf = ''
+ else:
+ vrf = self.vrf
+ if self.destvrf == "_public_":
+ destvrf = ''
+ else:
+ destvrf = self.destvrf
+ if self.nhp_interface == "Invalid0":
+ nhp_interface = ''
+ else:
+ nhp_interface = self.nhp_interface
+ if self.state == "present":
+ if self.vrf != "_public_":
+ if self.destvrf != "_public_":
+ self.updates_cmd.append('ip route-static vpn-instance %s %s %s vpn-instance %s %s'
+ % (vrf, self.prefix, maskstr, destvrf, next_hop))
+ else:
+ self.updates_cmd.append('ip route-static vpn-instance %s %s %s %s %s'
+ % (vrf, self.prefix, maskstr, nhp_interface, next_hop))
+ elif self.destvrf != "_public_":
+ self.updates_cmd.append('ip route-static %s %s vpn-instance %s %s'
+ % (self.prefix, maskstr, self.destvrf, next_hop))
+ else:
+ self.updates_cmd.append('ip route-static %s %s %s %s'
+ % (self.prefix, maskstr, nhp_interface, next_hop))
+ if self.pref:
+ self.updates_cmd.append(' preference %s' % (self.pref))
+ if self.tag:
+ self.updates_cmd.append(' tag %s' % (self.tag))
+ if self.description:
+ self.updates_cmd.append(' description %s' % (self.description))
+ if self.state == "absent":
+ if self.vrf != "_public_":
+ if self.destvrf != "_public_":
+ self.updates_cmd.append('undo ip route-static vpn-instance %s %s %s vpn-instance %s %s'
+ % (vrf, self.prefix, maskstr, destvrf, next_hop))
+ else:
+ self.updates_cmd.append('undo ip route-static vpn-instance %s %s %s %s %s'
+ % (vrf, self.prefix, maskstr, nhp_interface, next_hop))
+ elif self.destvrf != "_public_":
+ self.updates_cmd.append('undo ip route-static %s %s vpn-instance %s %s'
+ % (self.prefix, maskstr, self.destvrf, self.next_hop))
+ else:
+ self.updates_cmd.append('undo ip route-static %s %s %s %s'
+ % (self.prefix, maskstr, nhp_interface, next_hop))
+ def operate_static_route(self, version, prefix, mask, nhp_interface, next_hop, vrf, destvrf, state):
+ """operate ipv4 static route"""
+ description_xml = """\n"""
+ preference_xml = """\n"""
+ tag_xml = """\n"""
+ if next_hop is None:
+ next_hop = ''
+ if nhp_interface is None:
+ nhp_interface = "Invalid0"
+ if vrf is None:
+ vpn_instance = "_public_"
+ else:
+ vpn_instance = vrf
+ if destvrf is None:
+ dest_vpn_instance = "_public_"
+ else:
+ dest_vpn_instance = destvrf
+ if self.description:
+ description_xml = CE_NC_SET_DESCRIPTION % self.description
+ if self.pref:
+ preference_xml = CE_NC_SET_PREFERENCE % self.pref
+ if self.tag:
+ tag_xml = CE_NC_SET_TAG % self.tag
+ if state == "present":
+ configxmlstr = CE_NC_SET_STATIC_ROUTE % (
+ vpn_instance, version, prefix, mask, nhp_interface,
+ dest_vpn_instance, next_hop, description_xml, preference_xml, tag_xml)
+ else:
+ configxmlstr = CE_NC_DELETE_STATIC_ROUTE % (
+ vpn_instance, version, prefix, mask, nhp_interface, dest_vpn_instance, next_hop)
+ conf_str = build_config_xml(configxmlstr)
+ recv_xml = set_nc_config(self.module, conf_str)
+ self.check_response(recv_xml, "OPERATE_STATIC_ROUTE")
+ def get_static_route(self, state):
+ """get ipv4 static route"""
+ self.static_routes_info["sroute"] = list()
+ if state == 'absent':
+ else:
+ getxmlstr = CE_NC_GET_STATIC_ROUTE
+ xml_str = get_nc_config(self.module, getxmlstr)
+ if 'data/' in xml_str:
+ return
+ xml_str = xml_str.replace('\r', '').replace('\n', '').\
+ replace('xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"', "").\
+ replace('xmlns="http://www.huawei.com/netconf/vrp"', "")
+ root = ElementTree.fromstring(xml_str)
+ static_routes = root.findall(
+ "data/staticrt/staticrtbase/srRoutes/srRoute")
+ if static_routes:
+ for static_route in static_routes:
+ static_info = dict()
+ for static_ele in static_route:
+ if static_ele.tag in ["vrfName", "afType", "topologyName",
+ "prefix", "maskLength", "destVrfName",
+ "nexthop", "ifName", "preference", "description"]:
+ static_info[
+ static_ele.tag] = static_ele.text
+ if static_ele.tag == "tag":
+ if static_ele.text is not None:
+ static_info["tag"] = static_ele.text
+ else:
+ static_info["tag"] = "None"
+ self.static_routes_info["sroute"].append(static_info)
+ def check_params(self):
+ """check all input params"""
+ # check prefix and mask
+ if not self.mask.isdigit():
+ self.module.fail_json(msg='Error: Mask is invalid.')
+ # ipv4 check
+ if self.aftype == "v4":
+ if int(self.mask) > 32 or int(self.mask) < 0:
+ self.module.fail_json(
+ msg='Error: Ipv4 mask must be an integer between 1 and 32.')
+ # next_hop check
+ if self.next_hop:
+ if not is_valid_v4addr(self.next_hop):
+ self.module.fail_json(
+ msg='Error: The %s is not a valid address' % self.next_hop)
+ # ipv6 check
+ if self.aftype == "v6":
+ if int(self.mask) > 128 or int(self.mask) < 0:
+ self.module.fail_json(
+ msg='Error: Ipv6 mask must be an integer between 1 and 128.')
+ if self.next_hop:
+ if not is_valid_v6addr(self.next_hop):
+ self.module.fail_json(
+ msg='Error: The %s is not a valid address' % self.next_hop)
+ # description check
+ if self.description:
+ if not is_valid_description(self.description):
+ self.module.fail_json(
+ msg='Error: Dsecription length should be 1 - 35, and can not contain "?".')
+ # tag check
+ if self.tag:
+ if not is_valid_tag(self.tag):
+ self.module.fail_json(
+ msg='Error: Tag should be integer 1 - 4294967295.')
+ # preference check
+ if self.pref:
+ if not is_valid_preference(self.pref):
+ self.module.fail_json(
+ msg='Error: Preference should be integer 1 - 255.')
+ if self.nhp_interface != "Invalid0" and self.destvrf != "_public_":
+ self.module.fail_json(
+ msg='Error: Destination vrf dose no support next hop is interface.')
+ # convert prefix
+ if not self.convert_ip_prefix():
+ self.module.fail_json(
+ msg='Error: The %s is not a valid address' % self.prefix)
+ def set_ip_static_route(self):
+ """set ip static route"""
+ if not self.changed:
+ return
+ version = None
+ if self.aftype == "v4":
+ version = "ipv4unicast"
+ else:
+ version = "ipv6unicast"
+ self.operate_static_route(version, self.prefix, self.mask, self.nhp_interface,
+ self.next_hop, self.vrf, self.destvrf, self.state)
+ def is_prefix_exist(self, static_route, version):
+ """is prefix mask nex_thop exist"""
+ if static_route is None:
+ return False
+ if self.next_hop and self.nhp_interface:
+ return static_route["prefix"].lower() == self.prefix.lower() \
+ and static_route["maskLength"] == self.mask \
+ and static_route["afType"] == version \
+ and static_route["ifName"].lower() == self.nhp_interface.lower() \
+ and static_route["nexthop"].lower() == self.next_hop.lower()
+ if self.next_hop and not self.nhp_interface:
+ return static_route["prefix"].lower() == self.prefix.lower() \
+ and static_route["maskLength"] == self.mask \
+ and static_route["afType"] == version \
+ and static_route["nexthop"].lower() == self.next_hop.lower()
+ if not self.next_hop and self.nhp_interface:
+ return static_route["prefix"].lower() == self.prefix.lower() \
+ and static_route["maskLength"] == self.mask \
+ and static_route["afType"] == version \
+ and static_route["ifName"].lower() == self.nhp_interface.lower()
+ def get_ip_static_route(self):
+ """get ip static route"""
+ if self.aftype == "v4":
+ version = "ipv4unicast"
+ else:
+ version = "ipv6unicast"
+ change = False
+ self.get_static_route(self.state)
+ if self.state == 'present':
+ for static_route in self.static_routes_info["sroute"]:
+ if self.is_prefix_exist(static_route, version):
+ if self.vrf:
+ if static_route["vrfName"] != self.vrf:
+ change = True
+ if self.tag:
+ if static_route["tag"] != self.tag:
+ change = True
+ if self.destvrf:
+ if static_route["destVrfName"] != self.destvrf:
+ change = True
+ if self.description:
+ if static_route["description"] != self.description:
+ change = True
+ if self.pref:
+ if static_route["preference"] != self.pref:
+ change = True
+ if self.nhp_interface:
+ if static_route["ifName"].lower() != self.nhp_interface.lower():
+ change = True
+ if self.next_hop:
+ if static_route["nexthop"].lower() != self.next_hop.lower():
+ change = True
+ return change
+ else:
+ continue
+ change = True
+ else:
+ for static_route in self.static_routes_info["sroute"]:
+ if static_route["nexthop"] and self.next_hop:
+ if static_route["prefix"].lower() == self.prefix.lower() \
+ and static_route["maskLength"] == self.mask \
+ and static_route["nexthop"].lower() == self.next_hop.lower() \
+ and static_route["afType"] == version:
+ change = True
+ return change
+ if static_route["ifName"] and self.nhp_interface:
+ if static_route["prefix"].lower() == self.prefix.lower() \
+ and static_route["maskLength"] == self.mask \
+ and static_route["ifName"].lower() == self.nhp_interface.lower() \
+ and static_route["afType"] == version:
+ change = True
+ return change
+ else:
+ continue
+ change = False
+ return change
+ def get_proposed(self):
+ """get proposed information"""
+ self.proposed['prefix'] = self.prefix
+ self.proposed['mask'] = self.mask
+ self.proposed['afType'] = self.aftype
+ self.proposed['next_hop'] = self.next_hop
+ self.proposed['ifName'] = self.nhp_interface
+ self.proposed['vrfName'] = self.vrf
+ self.proposed['destVrfName'] = self.destvrf
+ if self.tag:
+ self.proposed['tag'] = self.tag
+ if self.description:
+ self.proposed['description'] = self.description
+ if self.pref is None:
+ self.proposed['preference'] = 60
+ else:
+ self.proposed['preference'] = self.pref
+ self.proposed['state'] = self.state
+ def get_existing(self):
+ """get existing information"""
+ change = self.get_ip_static_route()
+ self.existing['sroute'] = self.static_routes_info["sroute"]
+ self.changed = bool(change)
+ def get_end_state(self):
+ """get end state information"""
+ self.get_static_route(self.state)
+ self.end_state['sroute'] = self.static_routes_info["sroute"]
+ def work(self):
+ """worker"""
+ self.check_params()
+ self.get_existing()
+ self.get_proposed()
+ self.set_ip_static_route()
+ self.set_update_cmd()
+ self.get_end_state()
+ self.results['changed'] = self.changed
+ self.results['proposed'] = self.proposed
+ self.results['existing'] = self.existing
+ self.results['end_state'] = self.end_state
+ if self.changed:
+ self.results['updates'] = self.updates_cmd
+ else:
+ self.results['updates'] = list()
+ self.module.exit_json(**self.results)
+def main():
+ """main"""
+ argument_spec = dict(
+ prefix=dict(required=True, type='str'),
+ mask=dict(required=True, type='str'),
+ aftype=dict(choices=['v4', 'v6'], required=True),
+ next_hop=dict(required=False, type='str'),
+ nhp_interface=dict(required=False, type='str'),
+ vrf=dict(required=False, type='str'),
+ destvrf=dict(required=False, type='str'),
+ tag=dict(required=False, type='str'),
+ description=dict(required=False, type='str'),
+ pref=dict(required=False, type='str'),
+ state=dict(choices=['absent', 'present'],
+ default='present', required=False),
+ )
+ argument_spec.update(ce_argument_spec)
+ interface = StaticRoute(argument_spec)
+ interface.work()
+if __name__ == '__main__':
+ main()