diff --git a/changelogs/fragments/9239-proxmox-backup-refactor.yml b/changelogs/fragments/9239-proxmox-backup-refactor.yml new file mode 100644 index 0000000000..4f73fe6dde --- /dev/null +++ b/changelogs/fragments/9239-proxmox-backup-refactor.yml @@ -0,0 +1,2 @@ +minor_changes: + - proxmox_backup - refactor permission checking to improve code readability and maintainability (https://github.com/ansible-collections/community.general/pull/9239). diff --git a/plugins/modules/proxmox_backup.py b/plugins/modules/proxmox_backup.py index 575d492bf9..0db2c4ad0e 100644 --- a/plugins/modules/proxmox_backup.py +++ b/plugins/modules/proxmox_backup.py @@ -6,6 +6,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function + __metaclass__ = type @@ -230,13 +231,16 @@ backups: type: str ''' -from ansible_collections.community.general.plugins.module_utils.proxmox import ( - proxmox_auth_argument_spec, ProxmoxAnsible) -from ansible.module_utils.common.text.converters import to_native -from ansible.module_utils.basic import AnsibleModule - import time +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.common.text.converters import to_native +from ansible_collections.community.general.plugins.module_utils.proxmox import ProxmoxAnsible, proxmox_auth_argument_spec + + +def has_permission(permission_tree, permission, search_scopes, default=0, expected=1): + return any(permission_tree.get(scope, {}).get(permission, default) == expected for scope in search_scopes) + class ProxmoxBackupAnsible(ProxmoxAnsible): @@ -264,21 +268,20 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): for node in node_endpoints: upid = self._post_vzdump(node, request_body) if upid != "OK": - tasklog = ", ".join( - [logentry["t"] for logentry in self._get_tasklog(node, upid)]) + tasklog = ", ".join(logentry["t"] for logentry in self._get_tasklog(node, upid)) else: tasklog = "" - task_ids.extend( - [{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}]) + task_ids.extend([{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}]) return task_ids def check_relevant_nodes(self, node): - nodes = [item["node"] for item in self._get_resources( - "node") if item["status"] == "online"] + nodes = [ + item["node"] + for item in self._get_resources("node") + if item["status"] == "online" + ] if node and node not in nodes: - self.module.fail_json( - msg="Node %s was specified, but does not exist on the cluster" % - node) + self.module.fail_json(msg="Node %s was specified, but does not exist on the cluster" % node) elif node: return [node] return nodes @@ -291,57 +294,28 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): performance, retention): # Check for Datastore.AllocateSpace in the permission tree - if "/" in permissions.keys() and permissions["/"].get( - "Datastore.AllocateSpace", 0) == 1: - pass - elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.AllocateSpace", 0) == 1: - pass - elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.AllocateSpace", 0) == 1: - pass - else: - self.module.fail_json( - changed=False, - msg="Insufficient permission: Datastore.AllocateSpace is missing") - if (bandwidth or performance) and permissions["/"].get( - "Sys.Modify", 0) == 0: - self.module.fail_json( - changed=False, - msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'") + if not has_permission(permissions, "Datastore.AllocateSpace", search_scopes=["/", "/storage/", "/storage/" + storage]): + self.module.fail_json(changed=False, msg="Insufficient permission: Datastore.AllocateSpace is missing") + + if (bandwidth or performance) and has_permission(permissions, "Sys.Modify", search_scopes=["/"], expected=0): + self.module.fail_json(changed=False, msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'") + if retention: - if "/" in permissions.keys() and permissions["/"].get( - "Datastore.Allocate", 0) == 1: - pass - elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.Allocate", 0) == 1: - pass - elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.Allocate", 0) == 1: - pass - else: - self.module.fail_json( - changed=False, - msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing") + if not has_permission(permissions, "Datastore.Allocate", search_scopes=["/", "/storage", "/storage/" + storage]): + self.module.fail_json(changed=False, msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing") def check_vmid_backup_permission(self, permissions, vmids, pool): - sufficient_permissions = False - if "/" in permissions.keys() and permissions["/"].get( - "VM.Backup", 0) == 1: - sufficient_permissions = True - elif "/vms" in permissions.keys() and permissions["/vms"].get( - "VM.Backup", 0) == 1: - sufficient_permissions = True - elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get( - "VM.Backup", 0) == 1: - sufficient_permissions = True - elif pool and "/pool/" + pool + "/vms" in permissions.keys() and permissions["/pool/" + pool + "/vms"].get( - "VM.Backup", 0) == 1: - sufficient_permissions = True + sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"]) + if pool and not sufficient_permissions: + sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/pool/" + pool, "/pool/" + pool + "/vms"]) if not sufficient_permissions: # Since VM.Backup can be given for each vmid at a time, iterate through all of them # and check, if the permission is set failed_vmids = [] for vm in vmids: - if "/vms/" + \ - str(vm) in permissions.keys() and permissions["/vms/" + str(vm)].get("VM.Backup", 1) == 0: + vm_path = "/vms/" + str(vm) + if has_permission(permissions, "VM.Backup", search_scopes=[vm_path], default=1, expected=0): failed_vmids.append(str(vm)) if failed_vmids: self.module.fail_json( @@ -351,23 +325,11 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): sufficient_permissions = True # Finally, when no check succeeded, fail if not sufficient_permissions: - self.module.fail_json( - changed=False, - msg="Insufficient permissions: You do not have the VM.Backup permission") + self.module.fail_json(changed=False, msg="Insufficient permissions: You do not have the VM.Backup permission") def check_general_backup_permission(self, permissions, pool): - if "/" in permissions.keys() and permissions["/"].get( - "VM.Backup", 0) == 1: - pass - elif "/vms" in permissions.keys() and permissions["/vms"].get("VM.Backup", 0) == 1: - pass - elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get( - "VM.Backup", 0) == 1: - pass - else: - self.module.fail_json( - changed=False, - msg="Insufficient permissions: You dont have the VM.Backup permission") + if not has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"] + (["/pool/" + pool] if pool else [])): + self.module.fail_json(changed=False, msg="Insufficient permissions: You dont have the VM.Backup permission") def check_if_storage_exists(self, storage, node): storages = self.get_storages(type=None) @@ -413,21 +375,19 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): status = self._get_taskok(node["node"], node["upid"]) if status["status"] == "stopped" and status["exitstatus"] == "OK": node["status"] = "success" - if status["status"] == "stopped" and status["exitstatus"] in ( - "job errors",): + if status["status"] == "stopped" and status["exitstatus"] == "job errors": node["status"] = "failed" except Exception as e: - self.module.fail_json( - msg="Unable to retrieve API task ID from node %s: %s" % - (node["node"], e)) - if len([item for item in tasks if item["status"] - != "unknown"]) == len(tasks): + self.module.fail_json(msg="Unable to retrieve API task ID from node %s: %s" % (node["node"], e)) + if len([item for item in tasks if item["status"] != "unknown"]) == len(tasks): break if time.time() > start_time + timeout: - timeouted_nodes = [node["node"] - for node in tasks if node["status"] == "unknown"] - failed_nodes = [node["node"] - for node in tasks if node["status"] == "failed"] + timeouted_nodes = [ + node["node"] + for node in tasks + if node["status"] == "unknown" + ] + failed_nodes = [node["node"] for node in tasks if node["status"] == "failed"] if failed_nodes: self.module.fail_json( msg="Reached timeout while waiting for backup task. " @@ -443,8 +403,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): error_logs = [] for node in tasks: if node["status"] == "failed": - tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog( - node["node"], node["upid"])]) + tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])]) error_logs.append("%s: %s" % (node, tasklog)) if error_logs: self.module.fail_json( @@ -453,9 +412,8 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): ", ".join(error_logs)) for node in tasks: - tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog( - node["node"], node["upid"])]) - node["log"] = "%s" % tasklog + tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])]) + node["log"] = tasklog # Finally, reattach ok tasks to show, that all nodes were contacted tasks.extend(ok_tasks) @@ -516,8 +474,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): # Create comma separated list from vmids, the API expects so if request_body.get("vmid"): - request_body.update( - {"vmid": ",".join([str(vmid) for vmid in request_body.get("vmid")])}) + request_body.update({"vmid": ",".join(str(vmid) for vmid in request_body["vmid"])}) # remove whitespaces from option strings for key in ("prune-backups", "performance"): @@ -550,26 +507,16 @@ class ProxmoxBackupAnsible(ProxmoxAnsible): def main(): module_args = proxmox_auth_argument_spec() backup_args = { - "backup_mode": {"type": "str", "default": "snapshot", "choices": [ - "snapshot", "suspend", "stop" - ]}, + "backup_mode": {"type": "str", "default": "snapshot", "choices": ["snapshot", "suspend", "stop"]}, "bandwidth": {"type": "int"}, - "change_detection_mode": {"type": "str", "choices": [ - "legacy", "data", "metadata" - ]}, - "compress": {"type": "str", "choices": [ - "0", "1", "gzip", "lzo", "zstd" - ]}, + "change_detection_mode": {"type": "str", "choices": ["legacy", "data", "metadata"]}, + "compress": {"type": "str", "choices": ["0", "1", "gzip", "lzo", "zstd"]}, "compression_threads": {"type": "int"}, "description": {"type": "str", "default": "{{guestname}}"}, "fleecing": {"type": "str"}, - "mode": {"type": "str", "required": True, "choices": [ - "include", "all", "pool" - ]}, + "mode": {"type": "str", "required": True, "choices": ["include", "all", "pool"]}, "node": {"type": "str"}, - "notification_mode": {"type": "str", "default": "auto", "choices": [ - "auto", "legacy-sendmail", "notification-system" - ]}, + "notification_mode": {"type": "str", "default": "auto", "choices": ["auto", "legacy-sendmail", "notification-system"]}, "performance_tweaks": {"type": "str"}, "pool": {"type": "str"}, "protected": {"type": "bool"}, @@ -611,21 +558,19 @@ def main(): proxmox.check_vmids(module.params["vmids"]) node_endpoints = proxmox.check_relevant_nodes(module.params["node"]) try: - result = proxmox.backup_create( - module.params, module.check_mode, node_endpoints) + result = proxmox.backup_create(module.params, module.check_mode, node_endpoints) except Exception as e: - module.fail_json( - msg="Creating backups failed with exception: %s" % to_native(e)) + module.fail_json(msg="Creating backups failed with exception: %s" % to_native(e)) + if module.check_mode: - module.exit_json(backups=result, changed=True, - msg="Backups would be created") + module.exit_json(backups=result, changed=True, msg="Backups would be created") + elif len([entry for entry in result if entry["upid"] == "OK"]) == len(result): - module.exit_json( - backups=result, - changed=False, - msg="Backup request sent to proxmox, no tasks created") + module.exit_json(backups=result, changed=False, msg="Backup request sent to proxmox, no tasks created") + elif module.params["wait"]: module.exit_json(backups=result, changed=True, msg="Backups succeeded") + else: module.exit_json(backups=result, changed=True, msg="Backup tasks created")