Add small unit test framework for testing fetch_url() based modules (#171)
* Convert integrated simple fetch_url() checking framework from hetzner_firewall tests to proper framework which can also be used by other modules. * Linting. * One more. * Use community.internal_test_tools collection.pull/227/head
parent
23de3feedd
commit
703daa9500
|
@ -7,6 +7,7 @@ unit_tests_dependencies:
|
||||||
- ansible.netcommon
|
- ansible.netcommon
|
||||||
- ansible.posix
|
- ansible.posix
|
||||||
- cisco.intersight
|
- cisco.intersight
|
||||||
|
- community.internal_test_tools
|
||||||
- community.kubernetes
|
- community.kubernetes
|
||||||
- google.cloud
|
- google.cloud
|
||||||
- ovirt.ovirt
|
- ovirt.ovirt
|
||||||
|
|
|
@ -7,210 +7,38 @@ __metaclass__ = type
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
|
||||||
|
FetchUrlCall,
|
||||||
|
BaseTestModule,
|
||||||
|
)
|
||||||
|
|
||||||
from ansible_collections.community.general.plugins.module_utils.hetzner import BASE_URL
|
from ansible_collections.community.general.plugins.module_utils.hetzner import BASE_URL
|
||||||
from ansible_collections.community.general.plugins.modules.net_tools import hetzner_firewall
|
from ansible_collections.community.general.plugins.modules.net_tools import hetzner_firewall
|
||||||
|
|
||||||
|
|
||||||
# ##########################################################
|
def create_params(parameter, *values):
|
||||||
# ## General test framework
|
assert len(values) > 1
|
||||||
|
result = []
|
||||||
import json
|
for i in range(1, len(values)):
|
||||||
|
result.append((parameter, values[i - 1], values[i]))
|
||||||
from mock import MagicMock
|
return result
|
||||||
from ansible_collections.community.general.tests.unit.plugins.modules.utils import set_module_args
|
|
||||||
from ansible.module_utils.six.moves.urllib.parse import parse_qs
|
|
||||||
|
|
||||||
|
|
||||||
class FetchUrlCall:
|
def flatten(list_of_lists):
|
||||||
def __init__(self, method, status):
|
result = []
|
||||||
assert method == method.upper(), \
|
for l in list_of_lists:
|
||||||
'HTTP method names are case-sensitive and should be upper-case (RFCs 7230 and 7231)'
|
result.extend(l)
|
||||||
self.method = method
|
return result
|
||||||
self.status = status
|
|
||||||
self.body = None
|
|
||||||
self.headers = {}
|
|
||||||
self.error_data = {}
|
|
||||||
self.expected_url = None
|
|
||||||
self.expected_headers = {}
|
|
||||||
self.form_parse = False
|
|
||||||
self.form_present = set()
|
|
||||||
self.form_values = {}
|
|
||||||
self.form_values_one = {}
|
|
||||||
|
|
||||||
def result(self, body):
|
|
||||||
self.body = body
|
|
||||||
assert self.error_data.get('body') is None, 'Error body must not be given'
|
|
||||||
return self
|
|
||||||
|
|
||||||
def result_str(self, str_body):
|
|
||||||
return self.result(str_body.encode('utf-8'))
|
|
||||||
|
|
||||||
def result_json(self, json_body):
|
|
||||||
return self.result(json.dumps(json_body).encode('utf-8'))
|
|
||||||
|
|
||||||
def result_error(self, msg, body=None):
|
|
||||||
self.error_data['msg'] = msg
|
|
||||||
if body is not None:
|
|
||||||
self.error_data['body'] = body
|
|
||||||
assert self.body is None, 'Result must not be given if error body is provided'
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_url(self, url):
|
|
||||||
self.expected_url = url
|
|
||||||
return self
|
|
||||||
|
|
||||||
def return_header(self, name, value):
|
|
||||||
assert value is not None
|
|
||||||
self.headers[name] = value
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_header(self, name, value):
|
|
||||||
self.expected_headers[name] = value
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_header_unset(self, name):
|
|
||||||
self.expected_headers[name] = None
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_form_present(self, key):
|
|
||||||
self.form_parse = True
|
|
||||||
self.form_present.append(key)
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_form_value(self, key, value):
|
|
||||||
self.form_parse = True
|
|
||||||
self.form_values[key] = [value]
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_form_value_absent(self, key):
|
|
||||||
self.form_parse = True
|
|
||||||
self.form_values[key] = []
|
|
||||||
return self
|
|
||||||
|
|
||||||
def expect_form_value_one_of(self, key, value):
|
|
||||||
self.form_parse = True
|
|
||||||
if key not in self.form_values_subset:
|
|
||||||
self.form_values_subset[key] = set()
|
|
||||||
self.form_values_subset[key].add(value)
|
|
||||||
return self
|
|
||||||
|
|
||||||
|
|
||||||
class FetchUrlProxy:
|
class TestHetznerFirewall(BaseTestModule):
|
||||||
def __init__(self, calls):
|
MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.general.plugins.modules.net_tools.hetzner_firewall.AnsibleModule'
|
||||||
self.calls = calls
|
MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.general.plugins.module_utils.hetzner.fetch_url'
|
||||||
self.index = 0
|
|
||||||
|
|
||||||
def _validate_form(self, call, data):
|
|
||||||
form = {}
|
|
||||||
if data is not None:
|
|
||||||
form = parse_qs(data, keep_blank_values=True)
|
|
||||||
for k in call.form_present:
|
|
||||||
assert k in form
|
|
||||||
for k, v in call.form_values.items():
|
|
||||||
if len(v) == 0:
|
|
||||||
assert k not in form
|
|
||||||
else:
|
|
||||||
assert form[k] == v
|
|
||||||
for k, v in call.form_values_one.items():
|
|
||||||
assert v <= set(form[k])
|
|
||||||
|
|
||||||
def _validate_headers(self, call, headers):
|
|
||||||
given_headers = {}
|
|
||||||
if headers is not None:
|
|
||||||
for k, v in headers.items():
|
|
||||||
given_headers[k.lower()] = v
|
|
||||||
for k, v in call.expected_headers:
|
|
||||||
if v is None:
|
|
||||||
assert k.lower() not in given_headers, \
|
|
||||||
'Header "{0}" specified for fetch_url call, but should not be'.format(k)
|
|
||||||
else:
|
|
||||||
assert given_headers.get(k.lower()) == v, \
|
|
||||||
'Header "{0}" specified for fetch_url call, but with wrong value'.format(k)
|
|
||||||
|
|
||||||
def __call__(self, module, url, data=None, headers=None, method=None,
|
|
||||||
use_proxy=True, force=False, last_mod_time=None, timeout=10,
|
|
||||||
use_gssapi=False, unix_socket=None, ca_path=None, cookies=None):
|
|
||||||
assert self.index < len(self.calls), 'Got more fetch_url calls than expected'
|
|
||||||
call = self.calls[self.index]
|
|
||||||
self.index += 1
|
|
||||||
|
|
||||||
# Validate call
|
|
||||||
assert method == call.method
|
|
||||||
if call.expected_url is not None:
|
|
||||||
assert url == call.expected_url, \
|
|
||||||
'Exepected URL does not match for fetch_url call'
|
|
||||||
if call.expected_headers:
|
|
||||||
self._validate_headers(call, headers)
|
|
||||||
if call.form_parse:
|
|
||||||
self._validate_form(call, data)
|
|
||||||
|
|
||||||
# Compose result
|
|
||||||
info = dict(status=call.status)
|
|
||||||
for k, v in call.headers.items():
|
|
||||||
info[k.lower()] = v
|
|
||||||
info.update(call.error_data)
|
|
||||||
res = object()
|
|
||||||
if call.body is not None:
|
|
||||||
res = MagicMock()
|
|
||||||
res.read = MagicMock(return_value=call.body)
|
|
||||||
return (res, info)
|
|
||||||
|
|
||||||
def assert_is_done(self):
|
|
||||||
assert self.index == len(self.calls), 'Got less fetch_url calls than expected'
|
|
||||||
|
|
||||||
|
|
||||||
class ModuleExitException(Exception):
|
|
||||||
def __init__(self, kwargs):
|
|
||||||
self.kwargs = kwargs
|
|
||||||
|
|
||||||
|
|
||||||
class ModuleFailException(Exception):
|
|
||||||
def __init__(self, kwargs):
|
|
||||||
self.kwargs = kwargs
|
|
||||||
|
|
||||||
|
|
||||||
def run_module(mocker, module, arguments, fetch_url):
|
|
||||||
def exit_json(module, **kwargs):
|
|
||||||
module._return_formatted(kwargs)
|
|
||||||
raise ModuleExitException(kwargs)
|
|
||||||
|
|
||||||
def fail_json(module, **kwargs):
|
|
||||||
module._return_formatted(kwargs)
|
|
||||||
raise ModuleFailException(kwargs)
|
|
||||||
|
|
||||||
mocker.patch('ansible_collections.community.general.plugins.module_utils.hetzner.fetch_url', fetch_url)
|
|
||||||
mocker.patch('ansible_collections.community.general.plugins.module_utils.hetzner.time.sleep', lambda duration: None)
|
|
||||||
mocker.patch('ansible_collections.community.general.plugins.modules.net_tools.hetzner_firewall.AnsibleModule.exit_json', exit_json)
|
|
||||||
mocker.patch('ansible_collections.community.general.plugins.modules.net_tools.hetzner_firewall.AnsibleModule.fail_json', fail_json)
|
|
||||||
set_module_args(arguments)
|
|
||||||
module.main()
|
|
||||||
|
|
||||||
|
|
||||||
def run_module_success(mocker, module, arguments, fetch_url_calls):
|
|
||||||
fetch_url = FetchUrlProxy(fetch_url_calls or [])
|
|
||||||
with pytest.raises(ModuleExitException) as e:
|
|
||||||
run_module(mocker, module, arguments, fetch_url)
|
|
||||||
fetch_url.assert_is_done()
|
|
||||||
return e.value.kwargs
|
|
||||||
|
|
||||||
|
|
||||||
def run_module_failed(mocker, module, arguments, fetch_url_calls):
|
|
||||||
fetch_url = FetchUrlProxy(fetch_url_calls or [])
|
|
||||||
with pytest.raises(ModuleFailException) as e:
|
|
||||||
run_module(mocker, module, arguments, fetch_url)
|
|
||||||
fetch_url.assert_is_done()
|
|
||||||
return e.value.kwargs
|
|
||||||
|
|
||||||
|
|
||||||
# ##########################################################
|
|
||||||
# ## Hetzner firewall tests
|
|
||||||
|
|
||||||
|
|
||||||
# Tests for state (absent and present)
|
# Tests for state (absent and present)
|
||||||
|
|
||||||
|
def test_absent_idempotency(self, mocker):
|
||||||
def test_absent_idempotency(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -238,9 +66,8 @@ def test_absent_idempotency(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_absent_changed(self, mocker):
|
||||||
def test_absent_changed(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -283,9 +110,8 @@ def test_absent_changed(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_present_idempotency(self, mocker):
|
||||||
def test_present_idempotency(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -313,9 +139,8 @@ def test_present_idempotency(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_present_changed(self, mocker):
|
||||||
def test_present_changed(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -358,12 +183,10 @@ def test_present_changed(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
|
||||||
# Tests for state (absent and present) with check mode
|
# Tests for state (absent and present) with check mode
|
||||||
|
|
||||||
|
def test_absent_idempotency_check(self, mocker):
|
||||||
def test_absent_idempotency_check(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -392,9 +215,8 @@ def test_absent_idempotency_check(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_absent_changed_check(self, mocker):
|
||||||
def test_absent_changed_check(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -423,9 +245,8 @@ def test_absent_changed_check(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_present_idempotency_check(self, mocker):
|
||||||
def test_present_idempotency_check(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -454,9 +275,8 @@ def test_present_idempotency_check(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_present_changed_check(self, mocker):
|
||||||
def test_present_changed_check(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -485,12 +305,10 @@ def test_present_changed_check(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
|
||||||
# Tests for port
|
# Tests for port
|
||||||
|
|
||||||
|
def test_port_idempotency(self, mocker):
|
||||||
def test_port_idempotency(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -520,9 +338,8 @@ def test_port_idempotency(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert result['firewall']['port'] == 'main'
|
assert result['firewall']['port'] == 'main'
|
||||||
|
|
||||||
|
def test_port_changed(self, mocker):
|
||||||
def test_port_changed(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -567,12 +384,10 @@ def test_port_changed(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert result['firewall']['port'] == 'main'
|
assert result['firewall']['port'] == 'main'
|
||||||
|
|
||||||
|
|
||||||
# Tests for whitelist_hos
|
# Tests for whitelist_hos
|
||||||
|
|
||||||
|
def test_whitelist_hos_idempotency(self, mocker):
|
||||||
def test_whitelist_hos_idempotency(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -602,9 +417,8 @@ def test_whitelist_hos_idempotency(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert result['firewall']['whitelist_hos'] is True
|
assert result['firewall']['whitelist_hos'] is True
|
||||||
|
|
||||||
|
def test_whitelist_hos_changed(self, mocker):
|
||||||
def test_whitelist_hos_changed(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -649,12 +463,11 @@ def test_whitelist_hos_changed(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert result['firewall']['whitelist_hos'] is True
|
assert result['firewall']['whitelist_hos'] is True
|
||||||
|
|
||||||
|
|
||||||
# Tests for wait_for_configured in getting status
|
# Tests for wait_for_configured in getting status
|
||||||
|
|
||||||
|
def test_wait_get(self, mocker):
|
||||||
def test_wait_get(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -697,9 +510,9 @@ def test_wait_get(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_wait_get_timeout(self, mocker):
|
||||||
def test_wait_get_timeout(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_failed(mocker, hetzner_firewall, {
|
result = self.run_module_failed(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -738,9 +551,8 @@ def test_wait_get_timeout(mocker):
|
||||||
])
|
])
|
||||||
assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
|
assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
|
||||||
|
|
||||||
|
def test_nowait_get(self, mocker):
|
||||||
def test_nowait_get(mocker):
|
result = self.run_module_failed(mocker, hetzner_firewall, {
|
||||||
result = run_module_failed(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -764,12 +576,11 @@ def test_nowait_get(mocker):
|
||||||
])
|
])
|
||||||
assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.'
|
assert result['msg'] == 'Firewall configuration cannot be read as it is not configured.'
|
||||||
|
|
||||||
|
|
||||||
# Tests for wait_for_configured in setting status
|
# Tests for wait_for_configured in setting status
|
||||||
|
|
||||||
|
def test_wait_update(self, mocker):
|
||||||
def test_wait_update(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -826,9 +637,9 @@ def test_wait_update(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_wait_update_timeout(self, mocker):
|
||||||
def test_wait_update_timeout(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -887,9 +698,8 @@ def test_wait_update_timeout(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert 'Timeout while waiting for firewall to be configured.' in result['warnings']
|
assert 'Timeout while waiting for firewall to be configured.' in result['warnings']
|
||||||
|
|
||||||
|
def test_nowait_update(self, mocker):
|
||||||
def test_nowait_update(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -932,11 +742,10 @@ def test_nowait_update(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
|
||||||
# Idempotency checks: different amount of input rules
|
# Idempotency checks: different amount of input rules
|
||||||
|
|
||||||
def test_input_rule_len_change_0_1(mocker):
|
def test_input_rule_len_change_0_1(self, mocker):
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -1010,9 +819,8 @@ def test_input_rule_len_change_0_1(mocker):
|
||||||
assert result['firewall']['status'] == 'active'
|
assert result['firewall']['status'] == 'active'
|
||||||
assert len(result['firewall']['rules']['input']) == 1
|
assert len(result['firewall']['rules']['input']) == 1
|
||||||
|
|
||||||
|
def test_input_rule_len_change_1_0(self, mocker):
|
||||||
def test_input_rule_len_change_1_0(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -1071,9 +879,8 @@ def test_input_rule_len_change_1_0(mocker):
|
||||||
assert result['firewall']['status'] == 'active'
|
assert result['firewall']['status'] == 'active'
|
||||||
assert len(result['firewall']['rules']['input']) == 0
|
assert len(result['firewall']['rules']['input']) == 0
|
||||||
|
|
||||||
|
def test_input_rule_len_change_1_2(self, mocker):
|
||||||
def test_input_rule_len_change_1_2(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -1169,25 +976,8 @@ def test_input_rule_len_change_1_2(mocker):
|
||||||
assert result['firewall']['status'] == 'active'
|
assert result['firewall']['status'] == 'active'
|
||||||
assert len(result['firewall']['rules']['input']) == 2
|
assert len(result['firewall']['rules']['input']) == 2
|
||||||
|
|
||||||
|
|
||||||
# Idempotency checks: change one value
|
# Idempotency checks: change one value
|
||||||
|
|
||||||
|
|
||||||
def create_params(parameter, *values):
|
|
||||||
assert len(values) > 1
|
|
||||||
result = []
|
|
||||||
for i in range(1, len(values)):
|
|
||||||
result.append((parameter, values[i - 1], values[i]))
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def flatten(list_of_lists):
|
|
||||||
result = []
|
|
||||||
for l in list_of_lists:
|
|
||||||
result.extend(l)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("parameter, before, after", flatten([
|
@pytest.mark.parametrize("parameter, before, after", flatten([
|
||||||
create_params('name', None, '', 'Test', 'Test', 'foo', '', None),
|
create_params('name', None, '', 'Test', 'Test', 'foo', '', None),
|
||||||
create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'),
|
create_params('ip_version', 'ipv4', 'ipv4', 'ipv6', 'ipv6'),
|
||||||
|
@ -1199,7 +989,7 @@ def flatten(list_of_lists):
|
||||||
create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None),
|
create_params('tcp_flags', None, 'syn', 'syn|fin', 'syn|fin', 'syn&fin', '', None),
|
||||||
create_params('action', 'accept', 'accept', 'discard', 'discard'),
|
create_params('action', 'accept', 'accept', 'discard', 'discard'),
|
||||||
]))
|
]))
|
||||||
def test_input_rule_value_change(mocker, parameter, before, after):
|
def test_input_rule_value_change(self, mocker, parameter, before, after):
|
||||||
input_call = {
|
input_call = {
|
||||||
'ip_version': 'ipv4',
|
'ip_version': 'ipv4',
|
||||||
'action': 'discard',
|
'action': 'discard',
|
||||||
|
@ -1278,7 +1068,7 @@ def test_input_rule_value_change(mocker, parameter, before, after):
|
||||||
after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter))
|
after_call.expect_form_value_absent('rules[input][0][{0}]'.format(parameter))
|
||||||
calls.append(after_call)
|
calls.append(after_call)
|
||||||
|
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -1298,17 +1088,15 @@ def test_input_rule_value_change(mocker, parameter, before, after):
|
||||||
assert len(result['firewall']['rules']['input']) == 1
|
assert len(result['firewall']['rules']['input']) == 1
|
||||||
assert result['firewall']['rules']['input'][0][parameter] == after
|
assert result['firewall']['rules']['input'][0][parameter] == after
|
||||||
|
|
||||||
|
|
||||||
# Idempotency checks: IP address normalization
|
# Idempotency checks: IP address normalization
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [
|
@pytest.mark.parametrize("ip_version, parameter, before_normalized, after_normalized, after", [
|
||||||
('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'),
|
('ipv4', 'src_ip', '1.2.3.4/32', '1.2.3.4/32', '1.2.3.4'),
|
||||||
('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'),
|
('ipv6', 'src_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3::4'),
|
||||||
('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'),
|
('ipv6', 'dst_ip', '1:2:3::4/128', '1:2:3::4/128', '1:2:3:0::4'),
|
||||||
('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'),
|
('ipv6', 'dst_ip', '::/0', '::/0', '0:0::0/0'),
|
||||||
])
|
])
|
||||||
def test_input_rule_ip_normalization(mocker, ip_version, parameter, before_normalized, after_normalized, after):
|
def test_input_rule_ip_normalization(self, mocker, ip_version, parameter, before_normalized, after_normalized, after):
|
||||||
assert ip_version in ('ipv4', 'ipv6')
|
assert ip_version in ('ipv4', 'ipv6')
|
||||||
assert parameter in ('src_ip', 'dst_ip')
|
assert parameter in ('src_ip', 'dst_ip')
|
||||||
input_call = {
|
input_call = {
|
||||||
|
@ -1384,7 +1172,7 @@ def test_input_rule_ip_normalization(mocker, ip_version, parameter, before_norma
|
||||||
after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized)
|
after_call.expect_form_value('rules[input][0][{0}]'.format(parameter), after_normalized)
|
||||||
calls.append(after_call)
|
calls.append(after_call)
|
||||||
|
|
||||||
result = run_module_success(mocker, hetzner_firewall, {
|
result = self.run_module_success(mocker, hetzner_firewall, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
|
|
@ -5,18 +5,23 @@ from __future__ import (absolute_import, division, print_function)
|
||||||
__metaclass__ = type
|
__metaclass__ = type
|
||||||
|
|
||||||
|
|
||||||
import pytest
|
from ansible_collections.community.internal_test_tools.tests.unit.utils.fetch_url_module_framework import (
|
||||||
|
FetchUrlCall,
|
||||||
|
BaseTestModule,
|
||||||
|
)
|
||||||
|
|
||||||
from ansible_collections.community.general.plugins.module_utils.hetzner import BASE_URL
|
from ansible_collections.community.general.plugins.module_utils.hetzner import BASE_URL
|
||||||
from ansible_collections.community.general.plugins.modules.net_tools import hetzner_firewall_info
|
from ansible_collections.community.general.plugins.modules.net_tools import hetzner_firewall_info
|
||||||
from .test_hetzner_firewall import FetchUrlCall, run_module_success, run_module_failed
|
|
||||||
|
|
||||||
|
|
||||||
|
class TestHetznerFirewallInfo(BaseTestModule):
|
||||||
|
MOCK_ANSIBLE_MODULEUTILS_BASIC_ANSIBLEMODULE = 'ansible_collections.community.general.plugins.modules.net_tools.hetzner_firewall_info.AnsibleModule'
|
||||||
|
MOCK_ANSIBLE_MODULEUTILS_URLS_FETCH_URL = 'ansible_collections.community.general.plugins.module_utils.hetzner.fetch_url'
|
||||||
|
|
||||||
# Tests for state (absent and present)
|
# Tests for state (absent and present)
|
||||||
|
|
||||||
|
def test_absent(self, mocker):
|
||||||
def test_absent(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall_info, {
|
||||||
result = run_module_success(mocker, hetzner_firewall_info, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -41,9 +46,8 @@ def test_absent(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_present(self, mocker):
|
||||||
def test_present(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall_info, {
|
||||||
result = run_module_success(mocker, hetzner_firewall_info, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -69,9 +73,8 @@ def test_present(mocker):
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
assert len(result['firewall']['rules']['input']) == 0
|
assert len(result['firewall']['rules']['input']) == 0
|
||||||
|
|
||||||
|
def test_present_w_rules(self, mocker):
|
||||||
def test_present_w_rules(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall_info, {
|
||||||
result = run_module_success(mocker, hetzner_firewall_info, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -125,12 +128,11 @@ def test_present_w_rules(mocker):
|
||||||
assert result['firewall']['rules']['input'][1]['dst_port'] is None
|
assert result['firewall']['rules']['input'][1]['dst_port'] is None
|
||||||
assert result['firewall']['rules']['input'][1]['action'] == 'discard'
|
assert result['firewall']['rules']['input'][1]['action'] == 'discard'
|
||||||
|
|
||||||
|
|
||||||
# Tests for wait_for_configured in getting status
|
# Tests for wait_for_configured in getting status
|
||||||
|
|
||||||
|
def test_wait_get(self, mocker):
|
||||||
def test_wait_get(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_success(mocker, hetzner_firewall_info, {
|
result = self.run_module_success(mocker, hetzner_firewall_info, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -170,9 +172,9 @@ def test_wait_get(mocker):
|
||||||
assert result['firewall']['server_ip'] == '1.2.3.4'
|
assert result['firewall']['server_ip'] == '1.2.3.4'
|
||||||
assert result['firewall']['server_number'] == 1
|
assert result['firewall']['server_number'] == 1
|
||||||
|
|
||||||
|
def test_wait_get_timeout(self, mocker):
|
||||||
def test_wait_get_timeout(mocker):
|
mocker.patch('time.sleep', lambda duration: None)
|
||||||
result = run_module_failed(mocker, hetzner_firewall_info, {
|
result = self.run_module_failed(mocker, hetzner_firewall_info, {
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
@ -210,9 +212,8 @@ def test_wait_get_timeout(mocker):
|
||||||
])
|
])
|
||||||
assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
|
assert result['msg'] == 'Timeout while waiting for firewall to be configured.'
|
||||||
|
|
||||||
|
def test_nowait_get(self, mocker):
|
||||||
def test_nowait_get(mocker):
|
result = self.run_module_success(mocker, hetzner_firewall_info, {
|
||||||
result = run_module_success(mocker, hetzner_firewall_info, {
|
|
||||||
'hetzner_user': '',
|
'hetzner_user': '',
|
||||||
'hetzner_password': '',
|
'hetzner_password': '',
|
||||||
'server_ip': '1.2.3.4',
|
'server_ip': '1.2.3.4',
|
||||||
|
|
|
@ -60,6 +60,7 @@ retry ansible-galaxy -vvv collection install ansible.posix
|
||||||
# https://github.com/CiscoDevNet/ansible-intersight/issues/9
|
# https://github.com/CiscoDevNet/ansible-intersight/issues/9
|
||||||
retry ansible-galaxy -vvv collection install cisco.intersight:1.0.4
|
retry ansible-galaxy -vvv collection install cisco.intersight:1.0.4
|
||||||
retry ansible-galaxy -vvv collection install community.crypto
|
retry ansible-galaxy -vvv collection install community.crypto
|
||||||
|
retry ansible-galaxy -vvv collection install community.internal_test_tools
|
||||||
retry ansible-galaxy -vvv collection install community.kubernetes
|
retry ansible-galaxy -vvv collection install community.kubernetes
|
||||||
retry ansible-galaxy -vvv collection install google.cloud
|
retry ansible-galaxy -vvv collection install google.cloud
|
||||||
retry ansible-galaxy -vvv collection install ovirt.ovirt
|
retry ansible-galaxy -vvv collection install ovirt.ovirt
|
||||||
|
|
Loading…
Reference in New Issue