[PR #9239/005c8f50 backport][stable-10] proxmox_backup: refactor permission checking (#9341)

proxmox_backup: refactor permission checking (#9239)

* proxmox_backup: refactor permission checking

* add changelog frag

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

For consistency

* Update plugins/modules/proxmox_backup.py

* yet another missing slash

(cherry picked from commit 005c8f50db)

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
pull/9346/head
patchback[bot] 2024-12-23 19:20:29 +01:00 committed by GitHub
parent 474c7a7240
commit 2d12c6678c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 114 deletions

View File

@ -0,0 +1,2 @@
minor_changes:
- proxmox_backup - refactor permission checking to improve code readability and maintainability (https://github.com/ansible-collections/community.general/pull/9239).

View File

@ -6,6 +6,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function
__metaclass__ = type
@ -230,13 +231,16 @@ backups:
type: str
'''
from ansible_collections.community.general.plugins.module_utils.proxmox import (
proxmox_auth_argument_spec, ProxmoxAnsible)
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.basic import AnsibleModule
import time
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.general.plugins.module_utils.proxmox import ProxmoxAnsible, proxmox_auth_argument_spec
def has_permission(permission_tree, permission, search_scopes, default=0, expected=1):
return any(permission_tree.get(scope, {}).get(permission, default) == expected for scope in search_scopes)
class ProxmoxBackupAnsible(ProxmoxAnsible):
@ -264,21 +268,20 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
for node in node_endpoints:
upid = self._post_vzdump(node, request_body)
if upid != "OK":
tasklog = ", ".join(
[logentry["t"] for logentry in self._get_tasklog(node, upid)])
tasklog = ", ".join(logentry["t"] for logentry in self._get_tasklog(node, upid))
else:
tasklog = ""
task_ids.extend(
[{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}])
task_ids.extend([{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}])
return task_ids
def check_relevant_nodes(self, node):
nodes = [item["node"] for item in self._get_resources(
"node") if item["status"] == "online"]
nodes = [
item["node"]
for item in self._get_resources("node")
if item["status"] == "online"
]
if node and node not in nodes:
self.module.fail_json(
msg="Node %s was specified, but does not exist on the cluster" %
node)
self.module.fail_json(msg="Node %s was specified, but does not exist on the cluster" % node)
elif node:
return [node]
return nodes
@ -291,57 +294,28 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
performance,
retention):
# Check for Datastore.AllocateSpace in the permission tree
if "/" in permissions.keys() and permissions["/"].get(
"Datastore.AllocateSpace", 0) == 1:
pass
elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.AllocateSpace", 0) == 1:
pass
elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.AllocateSpace", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permission: Datastore.AllocateSpace is missing")
if (bandwidth or performance) and permissions["/"].get(
"Sys.Modify", 0) == 0:
self.module.fail_json(
changed=False,
msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'")
if not has_permission(permissions, "Datastore.AllocateSpace", search_scopes=["/", "/storage/", "/storage/" + storage]):
self.module.fail_json(changed=False, msg="Insufficient permission: Datastore.AllocateSpace is missing")
if (bandwidth or performance) and has_permission(permissions, "Sys.Modify", search_scopes=["/"], expected=0):
self.module.fail_json(changed=False, msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'")
if retention:
if "/" in permissions.keys() and permissions["/"].get(
"Datastore.Allocate", 0) == 1:
pass
elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.Allocate", 0) == 1:
pass
elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.Allocate", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing")
if not has_permission(permissions, "Datastore.Allocate", search_scopes=["/", "/storage", "/storage/" + storage]):
self.module.fail_json(changed=False, msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing")
def check_vmid_backup_permission(self, permissions, vmids, pool):
sufficient_permissions = False
if "/" in permissions.keys() and permissions["/"].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
elif "/vms" in permissions.keys() and permissions["/vms"].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
elif pool and "/pool/" + pool + "/vms" in permissions.keys() and permissions["/pool/" + pool + "/vms"].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"])
if pool and not sufficient_permissions:
sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/pool/" + pool, "/pool/" + pool + "/vms"])
if not sufficient_permissions:
# Since VM.Backup can be given for each vmid at a time, iterate through all of them
# and check, if the permission is set
failed_vmids = []
for vm in vmids:
if "/vms/" + \
str(vm) in permissions.keys() and permissions["/vms/" + str(vm)].get("VM.Backup", 1) == 0:
vm_path = "/vms/" + str(vm)
if has_permission(permissions, "VM.Backup", search_scopes=[vm_path], default=1, expected=0):
failed_vmids.append(str(vm))
if failed_vmids:
self.module.fail_json(
@ -351,23 +325,11 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
sufficient_permissions = True
# Finally, when no check succeeded, fail
if not sufficient_permissions:
self.module.fail_json(
changed=False,
msg="Insufficient permissions: You do not have the VM.Backup permission")
self.module.fail_json(changed=False, msg="Insufficient permissions: You do not have the VM.Backup permission")
def check_general_backup_permission(self, permissions, pool):
if "/" in permissions.keys() and permissions["/"].get(
"VM.Backup", 0) == 1:
pass
elif "/vms" in permissions.keys() and permissions["/vms"].get("VM.Backup", 0) == 1:
pass
elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get(
"VM.Backup", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permissions: You dont have the VM.Backup permission")
if not has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"] + (["/pool/" + pool] if pool else [])):
self.module.fail_json(changed=False, msg="Insufficient permissions: You dont have the VM.Backup permission")
def check_if_storage_exists(self, storage, node):
storages = self.get_storages(type=None)
@ -413,21 +375,19 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
status = self._get_taskok(node["node"], node["upid"])
if status["status"] == "stopped" and status["exitstatus"] == "OK":
node["status"] = "success"
if status["status"] == "stopped" and status["exitstatus"] in (
"job errors",):
if status["status"] == "stopped" and status["exitstatus"] == "job errors":
node["status"] = "failed"
except Exception as e:
self.module.fail_json(
msg="Unable to retrieve API task ID from node %s: %s" %
(node["node"], e))
if len([item for item in tasks if item["status"]
!= "unknown"]) == len(tasks):
self.module.fail_json(msg="Unable to retrieve API task ID from node %s: %s" % (node["node"], e))
if len([item for item in tasks if item["status"] != "unknown"]) == len(tasks):
break
if time.time() > start_time + timeout:
timeouted_nodes = [node["node"]
for node in tasks if node["status"] == "unknown"]
failed_nodes = [node["node"]
for node in tasks if node["status"] == "failed"]
timeouted_nodes = [
node["node"]
for node in tasks
if node["status"] == "unknown"
]
failed_nodes = [node["node"] for node in tasks if node["status"] == "failed"]
if failed_nodes:
self.module.fail_json(
msg="Reached timeout while waiting for backup task. "
@ -443,8 +403,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
error_logs = []
for node in tasks:
if node["status"] == "failed":
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(
node["node"], node["upid"])])
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])])
error_logs.append("%s: %s" % (node, tasklog))
if error_logs:
self.module.fail_json(
@ -453,9 +412,8 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
", ".join(error_logs))
for node in tasks:
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(
node["node"], node["upid"])])
node["log"] = "%s" % tasklog
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])])
node["log"] = tasklog
# Finally, reattach ok tasks to show, that all nodes were contacted
tasks.extend(ok_tasks)
@ -516,8 +474,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
# Create comma separated list from vmids, the API expects so
if request_body.get("vmid"):
request_body.update(
{"vmid": ",".join([str(vmid) for vmid in request_body.get("vmid")])})
request_body.update({"vmid": ",".join(str(vmid) for vmid in request_body["vmid"])})
# remove whitespaces from option strings
for key in ("prune-backups", "performance"):
@ -550,26 +507,16 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
def main():
module_args = proxmox_auth_argument_spec()
backup_args = {
"backup_mode": {"type": "str", "default": "snapshot", "choices": [
"snapshot", "suspend", "stop"
]},
"backup_mode": {"type": "str", "default": "snapshot", "choices": ["snapshot", "suspend", "stop"]},
"bandwidth": {"type": "int"},
"change_detection_mode": {"type": "str", "choices": [
"legacy", "data", "metadata"
]},
"compress": {"type": "str", "choices": [
"0", "1", "gzip", "lzo", "zstd"
]},
"change_detection_mode": {"type": "str", "choices": ["legacy", "data", "metadata"]},
"compress": {"type": "str", "choices": ["0", "1", "gzip", "lzo", "zstd"]},
"compression_threads": {"type": "int"},
"description": {"type": "str", "default": "{{guestname}}"},
"fleecing": {"type": "str"},
"mode": {"type": "str", "required": True, "choices": [
"include", "all", "pool"
]},
"mode": {"type": "str", "required": True, "choices": ["include", "all", "pool"]},
"node": {"type": "str"},
"notification_mode": {"type": "str", "default": "auto", "choices": [
"auto", "legacy-sendmail", "notification-system"
]},
"notification_mode": {"type": "str", "default": "auto", "choices": ["auto", "legacy-sendmail", "notification-system"]},
"performance_tweaks": {"type": "str"},
"pool": {"type": "str"},
"protected": {"type": "bool"},
@ -611,21 +558,19 @@ def main():
proxmox.check_vmids(module.params["vmids"])
node_endpoints = proxmox.check_relevant_nodes(module.params["node"])
try:
result = proxmox.backup_create(
module.params, module.check_mode, node_endpoints)
result = proxmox.backup_create(module.params, module.check_mode, node_endpoints)
except Exception as e:
module.fail_json(
msg="Creating backups failed with exception: %s" % to_native(e))
module.fail_json(msg="Creating backups failed with exception: %s" % to_native(e))
if module.check_mode:
module.exit_json(backups=result, changed=True,
msg="Backups would be created")
module.exit_json(backups=result, changed=True, msg="Backups would be created")
elif len([entry for entry in result if entry["upid"] == "OK"]) == len(result):
module.exit_json(
backups=result,
changed=False,
msg="Backup request sent to proxmox, no tasks created")
module.exit_json(backups=result, changed=False, msg="Backup request sent to proxmox, no tasks created")
elif module.params["wait"]:
module.exit_json(backups=result, changed=True, msg="Backups succeeded")
else:
module.exit_json(backups=result, changed=True,
msg="Backup tasks created")