# (c) 2012-2014, Michael DeHaan
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import copy
import os
import json
import subprocess
import tempfile
from yaml import YAMLError

from ansible.compat.six import text_type, string_types
from ansible.errors import AnsibleFileNotFound, AnsibleParserError, AnsibleError
from ansible.errors.yaml_strings import YAML_SYNTAX_ERROR
from ansible.module_utils.basic import is_executable
from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file
from ansible.parsing.quoting import unquote
from ansible.parsing.yaml.loader import AnsibleLoader
from ansible.parsing.yaml.objects import AnsibleBaseYAMLObject, AnsibleUnicode
from ansible.utils.path import unfrackpath

try:
    from __main__ import display
except ImportError:
    from ansible.utils.display import Display
    display = Display()


class DataLoader():

    '''
    The DataLoader class is used to load and parse YAML or JSON content,
    either from a given file name or from a string that was previously
    read in through other means. A Vault password can be specified, and
    any vault-encrypted files will be decrypted.

    Data read from files will also be cached, so the file will never be
    read from disk more than once.

    Usage:

        dl = DataLoader()
        # optionally: dl.set_vault_password('foo')
        ds = dl.load('...')
        ds = dl.load_from_file('/path/to/file')
    '''

    def __init__(self):
        # directory that relative paths are resolved against (see path_dwim)
        self._basedir = '.'
        # maps resolved file name -> parsed data structure
        self._FILE_CACHE = dict()
        # paths of decrypted temp files handed out by get_real_file()
        self._tempfiles = set()

        # initialize the vault stuff with an empty password
        self.set_vault_password(None)

    def set_vault_password(self, vault_password):
        ''' Stores the vault password and rebuilds the VaultLib that uses it. '''
        self._vault_password = vault_password
        self._vault = VaultLib(password=vault_password)

    def load(self, data, file_name='<string>', show_content=True):
        '''
        Creates a python datastructure from the given data, which can be either
        a JSON or YAML string.

        :arg data: The JSON or YAML text to parse.
        :kwarg file_name: Name handed to the YAML loader and used when
            reporting syntax errors.
        :kwarg show_content: Passed through to the AnsibleParserError raised
            on a YAML syntax error.
        :returns: The parsed data structure.
        :raises AnsibleParserError: when the data is neither valid JSON nor
            valid YAML.
        '''
        new_data = None
        try:
            # we first try to load this data as JSON
            new_data = json.loads(data)
        except Exception:
            # must not be JSON, let the rest try.  Only real errors are
            # caught here so that KeyboardInterrupt/SystemExit propagate.
            if isinstance(data, AnsibleUnicode):
                # The PyYAML's libyaml bindings use PyUnicode_CheckExact so
                # they are unable to cope with our subclass.
                # Unwrap and re-wrap the unicode so we can keep track of line
                # numbers
                in_data = text_type(data)
            else:
                in_data = data

            try:
                new_data = self._safe_load(in_data, file_name=file_name)
            except YAMLError as yaml_exc:
                # always raises AnsibleParserError
                self._handle_error(yaml_exc, file_name, show_content)

            if isinstance(data, AnsibleUnicode):
                new_data = AnsibleUnicode(new_data)
                new_data.ansible_pos = data.ansible_pos

        return new_data

    def load_from_file(self, file_name):
        '''
        Loads data from a file, which can contain either JSON or YAML.

        :arg file_name: Path of the file; resolved with path_dwim().
        :returns: A deep copy of the parsed (and cached) data structure.
        '''
        file_name = self.path_dwim(file_name)

        # if the file has already been read in and cached, we'll
        # return those results to avoid more file/vault operations
        if file_name in self._FILE_CACHE:
            parsed_data = self._FILE_CACHE[file_name]
        else:
            # read the file contents and load the data structure from them
            (b_file_data, show_content) = self._get_file_contents(file_name)

            file_data = to_text(b_file_data, errors='surrogate_or_strict')
            parsed_data = self.load(data=file_data, file_name=file_name, show_content=show_content)

            # cache the file contents for next time
            self._FILE_CACHE[file_name] = parsed_data

        # return a deep copy here, so the cache is not affected
        return copy.deepcopy(parsed_data)

    def path_exists(self, path):
        ''' Returns True if the path (resolved with path_dwim) exists. '''
        path = self.path_dwim(path)
        return os.path.exists(to_bytes(path, errors='surrogate_or_strict'))

    def is_file(self, path):
        ''' Returns True if the resolved path is a regular file or os.devnull. '''
        path = self.path_dwim(path)
        return os.path.isfile(to_bytes(path, errors='surrogate_or_strict')) or path == os.devnull

    def is_directory(self, path):
        ''' Returns True if the resolved path is a directory. '''
        path = self.path_dwim(path)
        return os.path.isdir(to_bytes(path, errors='surrogate_or_strict'))

    def list_directory(self, path):
        ''' Returns the os.listdir() result for the resolved path. '''
        path = self.path_dwim(path)
        return os.listdir(path)

    def is_executable(self, path):
        '''is the given path executable?'''
        path = self.path_dwim(path)
        return is_executable(path)

    def _safe_load(self, stream, file_name=None):
        ''' Implements yaml.safe_load(), except using our custom loader class. '''

        loader = AnsibleLoader(stream, file_name, self._vault_password)
        try:
            return loader.get_single_data()
        finally:
            try:
                loader.dispose()
            except AttributeError:
                pass  # older versions of yaml don't have dispose function, ignore

    def _get_file_contents(self, file_name):
        '''
        Reads the file contents from the given file name, and will decrypt them
        if they are found to be vault-encrypted.

        :arg file_name: Path of the file to read.
        :returns: A tuple ``(data, show_content)``; ``show_content`` is False
            when the data was vault-encrypted.
        :raises AnsibleParserError: for an invalid file name or a read error.
        :raises AnsibleFileNotFound: when the path does not exist or is not a file.
        '''
        if not file_name or not isinstance(file_name, string_types):
            raise AnsibleParserError("Invalid filename: '%s'" % str(file_name))

        b_file_name = to_bytes(file_name)
        if not self.path_exists(b_file_name) or not self.is_file(b_file_name):
            raise AnsibleFileNotFound("the file_name '%s' does not exist, or is not readable" % file_name)

        show_content = True
        try:
            with open(b_file_name, 'rb') as f:
                data = f.read()
                if is_encrypted(data):
                    data = self._vault.decrypt(data, filename=b_file_name)
                    show_content = False

            return (data, show_content)

        except (IOError, OSError) as e:
            raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, str(e)))

    def _handle_error(self, yaml_exc, file_name, show_content):
        '''
        Optionally constructs an object (AnsibleBaseYAMLObject) to encapsulate the
        file name/position where a YAML exception occurred, and raises an AnsibleParserError
        to display the syntax exception information.
        '''

        # if the YAML exception contains a problem mark, use it to construct
        # an object the error class can use to display the faulty line.
        # The attribute may exist but be None, so test the value and not
        # merely its presence.
        err_obj = None
        problem_mark = getattr(yaml_exc, 'problem_mark', None)
        if problem_mark is not None:
            err_obj = AnsibleBaseYAMLObject()
            err_obj.ansible_pos = (file_name, problem_mark.line + 1, problem_mark.column + 1)

        raise AnsibleParserError(YAML_SYNTAX_ERROR, obj=err_obj, show_content=show_content)

    def get_basedir(self):
        ''' returns the current basedir '''
        return self._basedir

    def set_basedir(self, basedir):
        ''' sets the base directory, used to find files when a relative path is given '''

        # a None basedir is ignored, leaving the current value in place
        if basedir is not None:
            self._basedir = to_text(basedir)

    def path_dwim(self, given):
        '''
        make relative paths work like folks expect.

        Absolute paths are normalised, ``~`` paths are user-expanded, and
        anything else is joined onto the current basedir.
        '''

        given = unquote(given)
        given = to_text(given, errors='surrogate_or_strict')

        if given.startswith(u"/"):
            return os.path.abspath(given)
        elif given.startswith(u"~"):
            return os.path.abspath(os.path.expanduser(given))
        else:
            basedir = to_text(self._basedir, errors='surrogate_or_strict')
            return os.path.abspath(os.path.join(basedir, given))

    def path_dwim_relative(self, path, dirname, source):
        '''
        find one file in either a role or playbook dir with or without
        explicitly named dirname subdirs

        Used in action plugins and lookups to find supplemental files that
        could be in either place.

        :returns: The first candidate path that exists, or the last candidate
            tried when none of them exist.
        '''

        def _exists(candidate_path):
            return os.path.exists(to_bytes(candidate_path, errors='surrogate_or_strict'))

        search = []
        isrole = False

        # I have full path, nothing else needs to be looked at
        if source.startswith('~') or source.startswith(os.path.sep):
            search.append(self.path_dwim(source))
        else:
            # base role/play path + templates/files/vars + relative filename
            search.append(os.path.join(path, dirname, source))
            basedir = unfrackpath(path)

            # is it a role and if so make sure you get correct base path
            in_tasks_dir = path.endswith('tasks')
            if (in_tasks_dir and _exists(os.path.join(path, 'main.yml'))) \
                    or _exists(os.path.join(path, 'tasks/main.yml')):
                isrole = True
                if in_tasks_dir:
                    basedir = unfrackpath(os.path.dirname(path))

            # temporarily resolve against the role/play basedir; always put
            # the loader's own basedir back, even if path_dwim() raises
            cur_basedir = self._basedir
            self.set_basedir(basedir)
            try:
                # resolved base role/play path + templates/files/vars + relative filename
                search.append(self.path_dwim(os.path.join(basedir, dirname, source)))
            finally:
                self.set_basedir(cur_basedir)

            if isrole and not source.endswith(dirname):
                # look in role's tasks dir w/o dirname
                search.append(self.path_dwim(os.path.join(basedir, 'tasks', source)))

            # try to create absolute path for loader basedir + templates/files/vars + filename
            search.append(self.path_dwim(os.path.join(dirname, source)))
            search.append(self.path_dwim(os.path.join(basedir, source)))

            # try to create absolute path for loader basedir + filename
            search.append(self.path_dwim(source))

        # search is never empty here, so candidate is always bound
        for candidate in search:
            if _exists(candidate):
                break

        return candidate

    def path_dwim_relative_stack(self, paths, dirname, source):
        '''
        find one file in first path in stack taking roles into account and adding play basedir as fallback

        :arg paths: A list of text strings which are the paths to look for the filename in.
        :arg dirname: A text string representing a directory.  The directory
            is prepended to the source to form the path to search for.
        :arg source: A text string which is the filename to search for
        :rtype: A text string
        :returns: An absolute path to the filename ``source``, or None when
            ``source`` is None or no candidate exists.
        '''
        b_dirname = to_bytes(dirname)
        b_source = to_bytes(source)

        result = None
        if source is None:
            display.warning('Invalid request to find a file that matches a "null" value')
        elif source and (source.startswith('~') or source.startswith(os.path.sep)):
            # path is absolute, no relative needed, check existence and return source
            test_path = unfrackpath(b_source)
            if os.path.exists(to_bytes(test_path, errors='surrogate_or_strict')):
                # normalise to text so both branches return the documented type
                result = to_text(test_path, errors='surrogate_or_strict')
        else:
            search = []
            # first path component of source, compared as bytes against
            # b_dirname (bytes never compare equal to text on Python 3)
            b_source_head = b_source.split(b'/')[0]

            display.debug(u'evaluation_path:\n\t%s' % '\n\t'.join(paths))
            for path in paths:
                upath = unfrackpath(path)
                b_upath = to_bytes(upath, errors='surrogate_or_strict')
                b_mydir = os.path.dirname(b_upath)

                # if path is in role and 'tasks' not there already, add it into the search
                if (b_upath.endswith(b'tasks') and os.path.exists(os.path.join(b_upath, b'main.yml'))) \
                        or os.path.exists(os.path.join(b_upath, b'tasks/main.yml')) \
                        or os.path.exists(os.path.join(b_mydir, b'tasks/main.yml')):
                    if b_mydir.endswith(b'tasks'):
                        search.append(os.path.join(os.path.dirname(b_mydir), b_dirname, b_source))
                        search.append(os.path.join(b_mydir, b_source))
                    else:
                        # don't add dirname if user already is using it in source
                        if b_source_head != b_dirname:
                            search.append(os.path.join(b_upath, b_dirname, b_source))
                        search.append(os.path.join(b_upath, b_source))

                elif b_dirname not in b_source.split(b'/'):
                    # don't add dirname if user already is using it in source
                    if b_source_head != b_dirname:
                        search.append(os.path.join(b_upath, b_dirname, b_source))
                    search.append(os.path.join(b_upath, b_source))

            # always append basedir as last resort
            # don't add dirname if user already is using it in source
            b_basedir = to_bytes(self.get_basedir())
            if b_source_head != b_dirname:
                search.append(os.path.join(b_basedir, b_dirname, b_source))
            search.append(os.path.join(b_basedir, b_source))

            display.debug(u'search_path:\n\t%s' % to_text(b'\n\t'.join(search)))
            for b_candidate in search:
                display.vvvvv(u'looking for "%s" at "%s"' % (source, to_text(b_candidate)))
                if os.path.exists(b_candidate):
                    result = to_text(b_candidate)
                    break

        return result

    def _create_content_tempfile(self, content):
        '''
        Create a tempfile containing defined content

        :returns: The path of the temporary file.
        '''
        # convert first so a conversion failure cannot leak an open
        # descriptor or leave a stray temp file behind
        b_content = to_bytes(content)

        fd, content_tempfile = tempfile.mkstemp()
        f = os.fdopen(fd, 'wb')
        try:
            f.write(b_content)
        except Exception as err:
            os.remove(content_tempfile)
            raise Exception(err)
        finally:
            f.close()
        return content_tempfile

    def get_real_file(self, file_path):
        """
        If the file is vault encrypted return a path to a temporary decrypted file
        If the file is not encrypted then the path is returned
        Temporary files are recorded in self._tempfiles so they can be removed
        with cleanup_tmp_file() or cleanup_all_tmp_files().

        :raises AnsibleParserError: for an invalid file name, a missing vault
            password, or a read error.
        :raises AnsibleFileNotFound: when the path does not exist or is not a file.
        """

        if not file_path or not isinstance(file_path, string_types):
            raise AnsibleParserError("Invalid filename: '%s'" % to_native(file_path))

        b_file_path = to_bytes(file_path, errors='surrogate_or_strict')
        if not self.path_exists(b_file_path) or not self.is_file(b_file_path):
            raise AnsibleFileNotFound("the file_name '%s' does not exist, or is not readable" % to_native(file_path))

        if not self._vault:
            self._vault = VaultLib(password="")

        real_path = self.path_dwim(file_path)

        try:
            with open(to_bytes(real_path), 'rb') as f:
                # Limit how much of the file is read since we do not know
                # whether this is a vault file and therefore it could be very
                # large.
                if is_encrypted_file(f, count=len(b_HEADER)):
                    # if the file is encrypted and no password was specified,
                    # the decrypt call would throw an error, but we check first
                    # since the decrypt function doesn't know the file name.
                    # Checking before f.read() avoids loading a file we
                    # cannot decrypt anyway.
                    if not self._vault_password:
                        raise AnsibleParserError("A vault password must be specified to decrypt %s" % file_path)

                    data = f.read()
                    data = self._vault.decrypt(data, filename=real_path)
                    # Make a temp file
                    real_path = self._create_content_tempfile(data)
                    self._tempfiles.add(real_path)

            return real_path

        except (IOError, OSError) as e:
            raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (to_native(real_path), to_native(e)))

    def cleanup_tmp_file(self, file_path):
        """
        Removes any temporary files created from a previous call to
        get_real_file. file_path must be the path returned from a
        previous call to get_real_file.
        """
        # paths this loader did not create are left untouched
        if file_path in self._tempfiles:
            os.unlink(file_path)
            self._tempfiles.remove(file_path)

    def cleanup_all_tmp_files(self):
        ''' Best-effort removal of every temp file created by get_real_file(). '''
        # iterate over a snapshot: cleanup_tmp_file() removes entries from
        # self._tempfiles, and mutating a set while iterating it raises
        # RuntimeError
        for f in list(self._tempfiles):
            try:
                self.cleanup_tmp_file(f)
            except Exception as e:
                display.warning("Unable to cleanup temp file '%s': %s" % (to_native(f), to_native(e)))