community.general/lib/ansible/utils/encrypt.py

161 lines
6.3 KiB
Python
Raw Normal View History

2015-01-09 15:37:31 +00:00
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
2015-04-13 20:28:01 +00:00
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
2015-01-09 15:37:31 +00:00
2015-10-02 04:35:22 +00:00
import os
import stat
import tempfile
import multiprocessing
2015-10-02 04:35:22 +00:00
import time
import warnings
2015-01-09 15:37:31 +00:00
# passlib is an optional dependency. do_encrypt() checks this flag before it
# touches passlib, so a missing install produces a clear error message.
PASSLIB_AVAILABLE = False
try:
    import passlib.hash
    PASSLIB_AVAILABLE = True
except Exception:
    # Any import-time failure (package missing, broken install) just means
    # hashing is unavailable. Catch Exception instead of using a bare
    # ``except:`` so KeyboardInterrupt and SystemExit still propagate.
    pass
2015-10-19 18:29:51 +00:00
# Reuse the ``display`` object exposed by the running ``__main__`` module when
# there is one; otherwise fall back to creating a new Display instance.
try:
    from __main__ import display
except ImportError:
    from ansible.utils.display import Display
    display = Display()
2015-10-02 04:35:22 +00:00
# keyczar is optional: KEYCZAR_AVAILABLE only becomes True once both keyczar
# imports below have succeeded.
KEYCZAR_AVAILABLE = False
try:
    try:
        # not every pycrypto release appears to provide this warning class
        from Crypto.pct_warnings import PowmInsecureWarning
    except ImportError:
        PowmInsecureWarning = RuntimeWarning

    with warnings.catch_warnings(record=True) as warning_handler:
        # Turn PowmInsecureWarning into an exception so that it can be
        # reported through ``display`` with a more helpful message.
        warnings.simplefilter("error", PowmInsecureWarning)
        try:
            import keyczar.errors as key_errors
            from keyczar.keys import AesKey
        except PowmInsecureWarning:
            display.system_warning(
                "The version of gmp you have installed has a known issue regarding "
                "timing vulnerabilities when used with pycrypto. "
                "If possible, you should update it (i.e. yum update gmp)."
            )
            # Retry the imports with all warnings silenced.
            warnings.resetwarnings()
            warnings.simplefilter("ignore")
            import keyczar.errors as key_errors
            from keyczar.keys import AesKey
        KEYCZAR_AVAILABLE = True
except ImportError:
    pass
from ansible import constants as C
2015-01-09 15:37:31 +00:00
from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text, to_bytes
2015-01-09 15:37:31 +00:00
# Only do_encrypt is exported by a star-import of this module.
__all__ = ['do_encrypt']
# Lock taken by key_for_hostname() around creation of the keys directory and
# of the per-host key files.
_LOCK = multiprocessing.Lock()
2015-01-09 15:37:31 +00:00
def do_encrypt(result, encrypt, salt_size=None, salt=None):
    """Hash ``result`` with the passlib algorithm named ``encrypt``.

    :arg result: the value to hash
    :arg encrypt: name of the attribute of ``passlib.hash`` to use
    :kwarg salt_size: when truthy, forwarded to passlib as ``salt_size``;
        takes precedence over ``salt``
    :kwarg salt: when truthy (and ``salt_size`` is not), used as the salt
        after being converted to the string type the handler asks for
    :returns: the resulting hash as a text string
    :raises AnsibleError: if passlib is not installed or the algorithm
        cannot be looked up on ``passlib.hash``
    """
    if not PASSLIB_AVAILABLE:
        raise AnsibleError("passlib must be installed to encrypt vars_prompt values")

    try:
        crypt = getattr(passlib.hash, encrypt)
    except Exception:
        # Exception (not a bare except) so that KeyboardInterrupt and
        # SystemExit are never reported as an unsupported algorithm.
        raise AnsibleError("passlib does not support '%s' algorithm" % encrypt)

    if salt_size:
        result = crypt.encrypt(result, salt_size=salt_size)
    elif salt:
        # The handler states whether it wants its salt as bytes or as text.
        if crypt._salt_is_bytes:
            salt = to_bytes(salt, encoding='ascii', errors='strict')
        else:
            salt = to_text(salt, encoding='ascii', errors='strict')
        result = crypt.encrypt(result, salt=salt)
    else:
        result = crypt.encrypt(result)

    # Hashes from passlib.hash should be represented as ascii strings of hex
    # digits so this should not traceback.  If it's not representable as such
    # we need to traceback and then blacklist such algorithms because it may
    # impact calling code.
    return to_text(result, errors='strict')
2015-01-09 15:37:31 +00:00
2015-10-02 04:35:22 +00:00
def key_for_hostname(hostname):
    """Return the per-host AES key for ``hostname``, creating it if needed.

    Keys live one file per host under ``C.ACCELERATE_KEYS_DIR``.  A new key
    is generated and written when the host has no key file or when the file
    is more than two hours old; otherwise the stored key is read back.

    :arg hostname: host name; used as the file name of the key
    :returns: the key, either freshly generated by ``AesKey.Generate()`` or
        loaded with ``AesKey.Read()``
    :raises AnsibleError: if keyczar is not installed, if the keys directory
        path exists but is not a directory, or if the directory or the key
        file does not have the configured permissions
    """
    # fireball mode is an implementation of ansible firing up zeromq via SSH
    # to use no persistent daemons or key management

    if not KEYCZAR_AVAILABLE:
        raise AnsibleError("python-keyczar must be installed on the control machine to use accelerated modes")

    key_path = os.path.expanduser(C.ACCELERATE_KEYS_DIR)
    if not os.path.exists(key_path):
        # avoid race with multiple forks trying to create paths on host
        # but limit when locking is needed to creation only
        with _LOCK:
            if not os.path.exists(key_path):
                # use a temp directory and rename to ensure the directory
                # searched for only appears after permissions applied.
                tmp_dir = tempfile.mkdtemp(dir=os.path.dirname(key_path))
                os.chmod(tmp_dir, int(C.ACCELERATE_KEYS_DIR_PERMS, 8))
                os.rename(tmp_dir, key_path)
    elif not os.path.isdir(key_path):
        raise AnsibleError('ACCELERATE_KEYS_DIR is not a directory.')

    if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_DIR_PERMS, 8):
        raise AnsibleError('Incorrect permissions on the private key directory. Use `chmod 0%o %s` to correct this issue, and make sure any of the keys files contained within that directory are set to 0%o' % (int(C.ACCELERATE_KEYS_DIR_PERMS, 8), C.ACCELERATE_KEYS_DIR, int(C.ACCELERATE_KEYS_FILE_PERMS, 8)))

    key_path = os.path.join(key_path, hostname)

    # use new AES keys every 2 hours, which means fireball must not allow running for longer either
    max_key_age = 60 * 60 * 2

    def key_needs_refresh():
        # True when there is no key file yet or the existing one is too old.
        return not os.path.exists(key_path) or (time.time() - os.path.getmtime(key_path) > max_key_age)

    if key_needs_refresh():
        # avoid race with multiple forks trying to create key
        # but limit when locking is needed to creation only
        with _LOCK:
            # re-check now that the lock is held: another fork may have
            # written the key in the meantime
            if key_needs_refresh():
                key = AesKey.Generate()
                # use temp file to ensure file only appears once it has
                # desired contents and permissions
                with tempfile.NamedTemporaryFile(mode='w', dir=os.path.dirname(key_path), delete=False) as fh:
                    tmp_key_path = fh.name
                    fh.write(str(key))
                os.chmod(tmp_key_path, int(C.ACCELERATE_KEYS_FILE_PERMS, 8))
                os.rename(tmp_key_path, key_path)
                return key

    if stat.S_IMODE(os.stat(key_path).st_mode) != int(C.ACCELERATE_KEYS_FILE_PERMS, 8):
        raise AnsibleError('Incorrect permissions on the key file for this host. Use `chmod 0%o %s` to correct this issue.' % (int(C.ACCELERATE_KEYS_FILE_PERMS, 8), key_path))

    # A context manager closes the file even when AesKey.Read() raises.
    with open(key_path) as fh:
        return AesKey.Read(fh.read())
2015-10-02 04:35:22 +00:00
def keyczar_encrypt(key, msg):
    """Encrypt ``msg`` with ``key`` and return what ``key.Encrypt`` returns.

    :arg key: object providing an ``Encrypt(data)`` method
    :arg msg: text, which is encoded as UTF-8 before encryption, or a byte
        string, which is passed through unchanged
    """
    # Calling .encode() on a byte string fails on Python 3 (bytes has no such
    # method) and, for non-ASCII data, on Python 2 (implicit ascii decode),
    # so only genuine text is encoded.
    if not isinstance(msg, bytes):
        msg = msg.encode('utf-8')
    return key.Encrypt(msg)
def keyczar_decrypt(key, msg):
    """Decrypt ``msg`` with ``key``.

    Returns the result of ``key.Decrypt(msg)``.  A keyczar
    ``InvalidSignatureError`` is reported as an ``AnsibleError``.
    """
    try:
        plaintext = key.Decrypt(msg)
    except key_errors.InvalidSignatureError:
        raise AnsibleError("decryption failed")
    return plaintext