proxmox_backup: refactor permission checking (#9239)

* proxmox_backup: refactor permission checking

* add changelog frag

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

* Update plugins/modules/proxmox_backup.py

For consistency

* Update plugins/modules/proxmox_backup.py

* yet another missing slash
pull/9344/head
Alexei Znamensky 2024-12-24 06:56:37 +13:00 committed by GitHub
parent 28f36ae25c
commit 005c8f50db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 61 additions and 114 deletions

View File

@ -0,0 +1,2 @@
minor_changes:
- proxmox_backup - refactor permission checking to improve code readability and maintainability (https://github.com/ansible-collections/community.general/pull/9239).

View File

@ -6,6 +6,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
__metaclass__ = type __metaclass__ = type
@ -230,13 +231,16 @@ backups:
type: str type: str
''' '''
from ansible_collections.community.general.plugins.module_utils.proxmox import (
proxmox_auth_argument_spec, ProxmoxAnsible)
from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.basic import AnsibleModule
import time import time
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.general.plugins.module_utils.proxmox import ProxmoxAnsible, proxmox_auth_argument_spec
def has_permission(permission_tree, permission, search_scopes, default=0, expected=1):
return any(permission_tree.get(scope, {}).get(permission, default) == expected for scope in search_scopes)
class ProxmoxBackupAnsible(ProxmoxAnsible): class ProxmoxBackupAnsible(ProxmoxAnsible):
@ -264,21 +268,20 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
for node in node_endpoints: for node in node_endpoints:
upid = self._post_vzdump(node, request_body) upid = self._post_vzdump(node, request_body)
if upid != "OK": if upid != "OK":
tasklog = ", ".join( tasklog = ", ".join(logentry["t"] for logentry in self._get_tasklog(node, upid))
[logentry["t"] for logentry in self._get_tasklog(node, upid)])
else: else:
tasklog = "" tasklog = ""
task_ids.extend( task_ids.extend([{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}])
[{"node": node, "upid": upid, "status": "unknown", "log": "%s" % tasklog}])
return task_ids return task_ids
def check_relevant_nodes(self, node): def check_relevant_nodes(self, node):
nodes = [item["node"] for item in self._get_resources( nodes = [
"node") if item["status"] == "online"] item["node"]
for item in self._get_resources("node")
if item["status"] == "online"
]
if node and node not in nodes: if node and node not in nodes:
self.module.fail_json( self.module.fail_json(msg="Node %s was specified, but does not exist on the cluster" % node)
msg="Node %s was specified, but does not exist on the cluster" %
node)
elif node: elif node:
return [node] return [node]
return nodes return nodes
@ -291,57 +294,28 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
performance, performance,
retention): retention):
# Check for Datastore.AllocateSpace in the permission tree # Check for Datastore.AllocateSpace in the permission tree
if "/" in permissions.keys() and permissions["/"].get( if not has_permission(permissions, "Datastore.AllocateSpace", search_scopes=["/", "/storage/", "/storage/" + storage]):
"Datastore.AllocateSpace", 0) == 1: self.module.fail_json(changed=False, msg="Insufficient permission: Datastore.AllocateSpace is missing")
pass
elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.AllocateSpace", 0) == 1: if (bandwidth or performance) and has_permission(permissions, "Sys.Modify", search_scopes=["/"], expected=0):
pass self.module.fail_json(changed=False, msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'")
elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.AllocateSpace", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permission: Datastore.AllocateSpace is missing")
if (bandwidth or performance) and permissions["/"].get(
"Sys.Modify", 0) == 0:
self.module.fail_json(
changed=False,
msg="Insufficient permission: Performance_tweaks and bandwidth require 'Sys.Modify' permission for '/'")
if retention: if retention:
if "/" in permissions.keys() and permissions["/"].get( if not has_permission(permissions, "Datastore.Allocate", search_scopes=["/", "/storage", "/storage/" + storage]):
"Datastore.Allocate", 0) == 1: self.module.fail_json(changed=False, msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing")
pass
elif "/storage" in permissions.keys() and permissions["/storage"].get("Datastore.Allocate", 0) == 1:
pass
elif "/storage/" + storage in permissions.keys() and permissions["/storage/" + storage].get("Datastore.Allocate", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permissions: Custom retention was requested, but Datastore.Allocate is missing")
def check_vmid_backup_permission(self, permissions, vmids, pool): def check_vmid_backup_permission(self, permissions, vmids, pool):
sufficient_permissions = False sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"])
if "/" in permissions.keys() and permissions["/"].get( if pool and not sufficient_permissions:
"VM.Backup", 0) == 1: sufficient_permissions = has_permission(permissions, "VM.Backup", search_scopes=["/pool/" + pool, "/pool/" + pool + "/vms"])
sufficient_permissions = True
elif "/vms" in permissions.keys() and permissions["/vms"].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
elif pool and "/pool/" + pool + "/vms" in permissions.keys() and permissions["/pool/" + pool + "/vms"].get(
"VM.Backup", 0) == 1:
sufficient_permissions = True
if not sufficient_permissions: if not sufficient_permissions:
# Since VM.Backup can be given for each vmid at a time, iterate through all of them # Since VM.Backup can be given for each vmid at a time, iterate through all of them
# and check, if the permission is set # and check, if the permission is set
failed_vmids = [] failed_vmids = []
for vm in vmids: for vm in vmids:
if "/vms/" + \ vm_path = "/vms/" + str(vm)
str(vm) in permissions.keys() and permissions["/vms/" + str(vm)].get("VM.Backup", 1) == 0: if has_permission(permissions, "VM.Backup", search_scopes=[vm_path], default=1, expected=0):
failed_vmids.append(str(vm)) failed_vmids.append(str(vm))
if failed_vmids: if failed_vmids:
self.module.fail_json( self.module.fail_json(
@ -351,23 +325,11 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
sufficient_permissions = True sufficient_permissions = True
# Finally, when no check succeeded, fail # Finally, when no check succeeded, fail
if not sufficient_permissions: if not sufficient_permissions:
self.module.fail_json( self.module.fail_json(changed=False, msg="Insufficient permissions: You do not have the VM.Backup permission")
changed=False,
msg="Insufficient permissions: You do not have the VM.Backup permission")
def check_general_backup_permission(self, permissions, pool): def check_general_backup_permission(self, permissions, pool):
if "/" in permissions.keys() and permissions["/"].get( if not has_permission(permissions, "VM.Backup", search_scopes=["/", "/vms"] + (["/pool/" + pool] if pool else [])):
"VM.Backup", 0) == 1: self.module.fail_json(changed=False, msg="Insufficient permissions: You dont have the VM.Backup permission")
pass
elif "/vms" in permissions.keys() and permissions["/vms"].get("VM.Backup", 0) == 1:
pass
elif pool and "/pool/" + pool in permissions.keys() and permissions["/pool/" + pool].get(
"VM.Backup", 0) == 1:
pass
else:
self.module.fail_json(
changed=False,
msg="Insufficient permissions: You dont have the VM.Backup permission")
def check_if_storage_exists(self, storage, node): def check_if_storage_exists(self, storage, node):
storages = self.get_storages(type=None) storages = self.get_storages(type=None)
@ -413,21 +375,19 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
status = self._get_taskok(node["node"], node["upid"]) status = self._get_taskok(node["node"], node["upid"])
if status["status"] == "stopped" and status["exitstatus"] == "OK": if status["status"] == "stopped" and status["exitstatus"] == "OK":
node["status"] = "success" node["status"] = "success"
if status["status"] == "stopped" and status["exitstatus"] in ( if status["status"] == "stopped" and status["exitstatus"] == "job errors":
"job errors",):
node["status"] = "failed" node["status"] = "failed"
except Exception as e: except Exception as e:
self.module.fail_json( self.module.fail_json(msg="Unable to retrieve API task ID from node %s: %s" % (node["node"], e))
msg="Unable to retrieve API task ID from node %s: %s" % if len([item for item in tasks if item["status"] != "unknown"]) == len(tasks):
(node["node"], e))
if len([item for item in tasks if item["status"]
!= "unknown"]) == len(tasks):
break break
if time.time() > start_time + timeout: if time.time() > start_time + timeout:
timeouted_nodes = [node["node"] timeouted_nodes = [
for node in tasks if node["status"] == "unknown"] node["node"]
failed_nodes = [node["node"] for node in tasks
for node in tasks if node["status"] == "failed"] if node["status"] == "unknown"
]
failed_nodes = [node["node"] for node in tasks if node["status"] == "failed"]
if failed_nodes: if failed_nodes:
self.module.fail_json( self.module.fail_json(
msg="Reached timeout while waiting for backup task. " msg="Reached timeout while waiting for backup task. "
@ -443,8 +403,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
error_logs = [] error_logs = []
for node in tasks: for node in tasks:
if node["status"] == "failed": if node["status"] == "failed":
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog( tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])])
node["node"], node["upid"])])
error_logs.append("%s: %s" % (node, tasklog)) error_logs.append("%s: %s" % (node, tasklog))
if error_logs: if error_logs:
self.module.fail_json( self.module.fail_json(
@ -453,9 +412,8 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
", ".join(error_logs)) ", ".join(error_logs))
for node in tasks: for node in tasks:
tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog( tasklog = ", ".join([logentry["t"] for logentry in self._get_tasklog(node["node"], node["upid"])])
node["node"], node["upid"])]) node["log"] = tasklog
node["log"] = "%s" % tasklog
# Finally, reattach ok tasks to show, that all nodes were contacted # Finally, reattach ok tasks to show, that all nodes were contacted
tasks.extend(ok_tasks) tasks.extend(ok_tasks)
@ -516,8 +474,7 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
# Create comma separated list from vmids, the API expects so # Create comma separated list from vmids, the API expects so
if request_body.get("vmid"): if request_body.get("vmid"):
request_body.update( request_body.update({"vmid": ",".join(str(vmid) for vmid in request_body["vmid"])})
{"vmid": ",".join([str(vmid) for vmid in request_body.get("vmid")])})
# remove whitespaces from option strings # remove whitespaces from option strings
for key in ("prune-backups", "performance"): for key in ("prune-backups", "performance"):
@ -550,26 +507,16 @@ class ProxmoxBackupAnsible(ProxmoxAnsible):
def main(): def main():
module_args = proxmox_auth_argument_spec() module_args = proxmox_auth_argument_spec()
backup_args = { backup_args = {
"backup_mode": {"type": "str", "default": "snapshot", "choices": [ "backup_mode": {"type": "str", "default": "snapshot", "choices": ["snapshot", "suspend", "stop"]},
"snapshot", "suspend", "stop"
]},
"bandwidth": {"type": "int"}, "bandwidth": {"type": "int"},
"change_detection_mode": {"type": "str", "choices": [ "change_detection_mode": {"type": "str", "choices": ["legacy", "data", "metadata"]},
"legacy", "data", "metadata" "compress": {"type": "str", "choices": ["0", "1", "gzip", "lzo", "zstd"]},
]},
"compress": {"type": "str", "choices": [
"0", "1", "gzip", "lzo", "zstd"
]},
"compression_threads": {"type": "int"}, "compression_threads": {"type": "int"},
"description": {"type": "str", "default": "{{guestname}}"}, "description": {"type": "str", "default": "{{guestname}}"},
"fleecing": {"type": "str"}, "fleecing": {"type": "str"},
"mode": {"type": "str", "required": True, "choices": [ "mode": {"type": "str", "required": True, "choices": ["include", "all", "pool"]},
"include", "all", "pool"
]},
"node": {"type": "str"}, "node": {"type": "str"},
"notification_mode": {"type": "str", "default": "auto", "choices": [ "notification_mode": {"type": "str", "default": "auto", "choices": ["auto", "legacy-sendmail", "notification-system"]},
"auto", "legacy-sendmail", "notification-system"
]},
"performance_tweaks": {"type": "str"}, "performance_tweaks": {"type": "str"},
"pool": {"type": "str"}, "pool": {"type": "str"},
"protected": {"type": "bool"}, "protected": {"type": "bool"},
@ -611,21 +558,19 @@ def main():
proxmox.check_vmids(module.params["vmids"]) proxmox.check_vmids(module.params["vmids"])
node_endpoints = proxmox.check_relevant_nodes(module.params["node"]) node_endpoints = proxmox.check_relevant_nodes(module.params["node"])
try: try:
result = proxmox.backup_create( result = proxmox.backup_create(module.params, module.check_mode, node_endpoints)
module.params, module.check_mode, node_endpoints)
except Exception as e: except Exception as e:
module.fail_json( module.fail_json(msg="Creating backups failed with exception: %s" % to_native(e))
msg="Creating backups failed with exception: %s" % to_native(e))
if module.check_mode: if module.check_mode:
module.exit_json(backups=result, changed=True, module.exit_json(backups=result, changed=True, msg="Backups would be created")
msg="Backups would be created")
elif len([entry for entry in result if entry["upid"] == "OK"]) == len(result): elif len([entry for entry in result if entry["upid"] == "OK"]) == len(result):
module.exit_json( module.exit_json(backups=result, changed=False, msg="Backup request sent to proxmox, no tasks created")
backups=result,
changed=False,
msg="Backup request sent to proxmox, no tasks created")
elif module.params["wait"]: elif module.params["wait"]:
module.exit_json(backups=result, changed=True, msg="Backups succeeded") module.exit_json(backups=result, changed=True, msg="Backups succeeded")
else: else:
module.exit_json(backups=result, changed=True, module.exit_json(backups=result, changed=True,
msg="Backup tasks created") msg="Backup tasks created")