community.general/lib/ansible/module_utils/gcp_utils.py

155 lines
5.2 KiB
Python

# Copyright (c), Google Inc, 2017
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
try:
import google.auth
import google.auth.compute_engine
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
HAS_GOOGLE_LIBRARIES = True
except ImportError:
HAS_GOOGLE_LIBRARIES = False
from ansible.module_utils.basic import AnsibleModule, env_fallback
import os
def navigate_hash(source, path, default=None):
    """Follow ``path`` through nested containers and return the value found.

    Args:
        source: The outermost container to read from (normally a dict).
        path: Sequence of keys to follow, outermost key first. An empty
            path returns ``source`` itself.
        default: Value returned when the path cannot be followed to its end.

    Returns:
        The value stored at ``path``. ``default`` is returned when a key is
        missing, or when an intermediate value cannot be searched at all
        (for example ``None`` or an integer).
    """
    current = source
    for key in path:
        try:
            missing = key not in current
        except TypeError:
            # ``current`` does not support membership tests (e.g. None), so
            # the remaining path cannot exist; treat it like a missing key.
            missing = True
        if missing:
            return default
        current = current[key]
    return current
class GcpRequestException(Exception):
    """Error raised by GcpSession when an HTTP request fails."""
# Handles all authentation and HTTP sessions for GCP API calls.
class GcpSession(object):
    """Authenticated HTTP access to a GCP API for an Ansible module.

    The session reads its configuration from ``module.params``
    (``auth_kind``, ``scopes``, ``service_account_email`` and
    ``service_account_file``) and tags every request with a User-Agent
    header derived from ``product``.
    """

    def __init__(self, module, product):
        """Store the module and product name, then validate the setup.

        Args:
            module: Module object exposing ``params`` and ``fail_json``.
            product: Product name embedded in the User-Agent header.
        """
        self.module = module
        self.product = product
        self._validate()

    def get(self, url, body=None):
        """Send a GET request; ``body`` is passed as the JSON payload."""
        return self._request('get', url, body)

    def post(self, url, body=None):
        """Send a POST request; ``body`` is passed as the JSON payload."""
        return self._request('post', url, body)

    def delete(self, url, body=None):
        """Send a DELETE request; ``body`` is passed as the JSON payload."""
        return self._request('delete', url, body)

    def put(self, url, body=None):
        """Send a PUT request; ``body`` is passed as the JSON payload."""
        return self._request('put', url, body)

    def session(self):
        """Return a new AuthorizedSession using the module's credentials.

        The credentials are scoped with ``module.params['scopes']``.
        """
        return AuthorizedSession(
            self._credentials().with_scopes(self.module.params['scopes']))

    def _request(self, method, url, body):
        """Issue one HTTP request through a fresh authorized session.

        Args:
            method: Name of the session method to call ('get', 'post',
                'delete' or 'put').
            url: Target URL.
            body: Object sent as the JSON payload (may be None).

        Returns:
            Whatever the underlying session method returns.

        Raises:
            GcpRequestException: If requests reports a RequestException.
        """
        try:
            send = getattr(self.session(), method)
            return send(url, json=body, headers=self._headers())
        except requests.exceptions.RequestException as inst:
            # Exceptions have no ``.message`` attribute on Python 3, so use
            # str(), which works on both Python 2 and Python 3.
            raise GcpRequestException(str(inst))

    def _validate(self):
        """Fail the module if libraries are missing or params conflict."""
        if not HAS_REQUESTS:
            self.module.fail_json(msg="Please install the requests library")

        if not HAS_GOOGLE_LIBRARIES:
            self.module.fail_json(msg="Please install the google-auth library")

        params = self.module.params
        if params['service_account_email'] is not None and params['auth_kind'] != 'machineaccount':
            self.module.fail_json(
                msg="Service Acccount Email only works with Machine Account-based authentication"
            )

        if params['service_account_file'] is not None and params['auth_kind'] != 'serviceaccount':
            self.module.fail_json(
                msg="Service Acccount File only works with Service Account-based authentication"
            )

    def _credentials(self):
        """Build the credentials object selected by ``auth_kind``.

        Calls ``module.fail_json`` when ``auth_kind`` is not one of
        'application', 'serviceaccount' or 'machineaccount'.
        """
        cred_type = self.module.params['auth_kind']
        if cred_type == 'application':
            # google.auth.default() also returns a project id; only the
            # credentials are used here.
            credentials, _project_id = google.auth.default()
            return credentials
        elif cred_type == 'serviceaccount':
            return service_account.Credentials.from_service_account_file(
                self.module.params['service_account_file'])
        elif cred_type == 'machineaccount':
            return google.auth.compute_engine.Credentials(
                self.module.params['service_account_email'])
        else:
            self.module.fail_json(msg="Credential type '%s' not implmented" % cred_type)

    def _headers(self):
        """Return the HTTP headers sent with every request."""
        return {
            'User-Agent': "Google-Ansible-MM-{0}".format(self.product)
        }
class GcpModule(AnsibleModule):
    """AnsibleModule that adds the shared GCP arguments to every module.

    The ``project``, ``auth_kind``, ``service_account_email``,
    ``service_account_file`` and ``scopes`` options are merged into the
    caller's ``argument_spec``, and ``service_account_email`` /
    ``service_account_file`` are declared mutually exclusive.
    """

    def __init__(self, *args, **kwargs):
        """Extend the caller's spec, then initialise AnsibleModule.

        Args:
            *args: Passed through unchanged to ``AnsibleModule.__init__``.
            **kwargs: Passed through to ``AnsibleModule.__init__`` after
                ``argument_spec`` and ``mutually_exclusive`` are extended.
        """
        arg_spec = kwargs.get('argument_spec') or {}

        # The shared options are merged last, so they take precedence over
        # same-named entries in the caller's spec.
        kwargs['argument_spec'] = self._merge_dictionaries(
            arg_spec,
            dict(
                project=dict(required=True, type='str'),
                auth_kind=dict(
                    required=False,
                    fallback=(env_fallback, ['GCP_AUTH_KIND']),
                    choices=['machineaccount', 'serviceaccount', 'application'],
                    type='str'),
                service_account_email=dict(
                    required=False,
                    fallback=(env_fallback, ['GCP_SERVICE_ACCOUNT_EMAIL']),
                    type='str'),
                service_account_file=dict(
                    required=False,
                    fallback=(env_fallback, ['GCP_SERVICE_ACCOUNT_FILE']),
                    type='path'),
                scopes=dict(
                    required=False,
                    fallback=(env_fallback, ['GCP_SCOPES']),
                    type='list')
            )
        )

        # list.append() returns None, so the list must be built first and
        # assigned afterwards. Copy it so the caller's list is not mutated.
        mutual = list(kwargs.get('mutually_exclusive') or [])
        mutual.append(['service_account_email', 'service_account_file'])
        kwargs['mutually_exclusive'] = mutual

        AnsibleModule.__init__(self, *args, **kwargs)

    def _merge_dictionaries(self, a, b):
        """Return a new dict holding ``a`` updated with ``b``.

        Neither input is modified; on duplicate keys the value from ``b``
        wins.
        """
        new = a.copy()
        new.update(b)
        return new