Delete onepassword_ssh_key.py
parent
786d212b28
commit
e17ff7e232
|
@ -1,148 +0,0 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Copyright (c) 2023, Ansible Project
|
|
||||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
||||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
||||||
|
|
||||||
from __future__ import absolute_import, division, print_function
|
|
||||||
|
|
||||||
__metaclass__ = type
|
|
||||||
|
|
||||||
DOCUMENTATION = """
|
|
||||||
name: onepassword_ssh_key
|
|
||||||
author:
|
|
||||||
- Mohammed Babelly (@mohammedbabelly20)
|
|
||||||
requirements:
|
|
||||||
- C(op) 1Password command line utility version 2 or later.
|
|
||||||
short_description: Fetch SSH Keys stored in 1Password
|
|
||||||
version_added: "10.3.0"
|
|
||||||
description:
|
|
||||||
- P(community.general.onepassword_ssh_key#lookup) wraps C(op) command line utility to fetch ssh keys from 1Password.
|
|
||||||
notes:
|
|
||||||
- By default, it returns the private key value in PKCS#8 format, unless O(ssh_format=true) is passed.
|
|
||||||
- The pluging works only for C(SSHKEY) type items.
|
|
||||||
- This plugin requires C(op) version 2 or later.
|
|
||||||
|
|
||||||
options:
|
|
||||||
_terms:
|
|
||||||
description: Identifier(s) (case-insensitive UUID or name) of item(s) to retrieve.
|
|
||||||
required: true
|
|
||||||
type: list
|
|
||||||
elements: string
|
|
||||||
ssh_format:
|
|
||||||
description: Output key in SSH format if true. Otherwise, outputs in the default format.
|
|
||||||
required: false
|
|
||||||
default: false
|
|
||||||
type: bool
|
|
||||||
|
|
||||||
extends_documentation_fragment:
|
|
||||||
- community.general.onepassword
|
|
||||||
- community.general.onepassword.lookup
|
|
||||||
"""
|
|
||||||
|
|
||||||
EXAMPLES = """
|
|
||||||
- name: Retrieve the private key of ssh key from 1Password
|
|
||||||
ansible.builtin.debug:
|
|
||||||
var: lookup('community.general.onepassword_ssh_key', 'SSH Key', ssh_format=true)
|
|
||||||
"""
|
|
||||||
|
|
||||||
RETURN = """
|
|
||||||
_raw:
|
|
||||||
description: Private key of SSH key
|
|
||||||
type: list
|
|
||||||
elements: string
|
|
||||||
"""
|
|
||||||
import json
|
|
||||||
|
|
||||||
from ansible_collections.community.general.plugins.lookup.onepassword import (
|
|
||||||
OnePass,
|
|
||||||
OnePassCLIv2,
|
|
||||||
)
|
|
||||||
from ansible.errors import AnsibleLookupError
|
|
||||||
from ansible.module_utils.common.text.converters import to_bytes
|
|
||||||
from ansible.plugins.lookup import LookupBase
|
|
||||||
|
|
||||||
|
|
||||||
class OnePassCLIv2SSHKey(OnePassCLIv2):
|
|
||||||
|
|
||||||
def _get_raw(self, item_id, vault=None, token=None):
|
|
||||||
args = ["item", "get", item_id, "--format", "json"]
|
|
||||||
if vault is not None:
|
|
||||||
args = [*args, f"--vault={vault}"]
|
|
||||||
|
|
||||||
if self.service_account_token:
|
|
||||||
if vault is None:
|
|
||||||
raise AnsibleLookupError(
|
|
||||||
"'vault' is required with 'service_account_token'"
|
|
||||||
)
|
|
||||||
|
|
||||||
environment_update = {
|
|
||||||
"OP_SERVICE_ACCOUNT_TOKEN": self.service_account_token
|
|
||||||
}
|
|
||||||
return self._run(args, environment_update=environment_update)
|
|
||||||
|
|
||||||
if token is not None:
|
|
||||||
args = [*args, to_bytes("--session=") + token]
|
|
||||||
|
|
||||||
return self._run(args)
|
|
||||||
|
|
||||||
def get_ssh_key(self, item_id, vault=None, token=None, ssh_format=False):
|
|
||||||
rc, out, err = self._get_raw(item_id, vault, token)
|
|
||||||
|
|
||||||
data = json.loads(out)
|
|
||||||
|
|
||||||
if data.get("category") != "SSH_KEY":
|
|
||||||
raise AnsibleLookupError(f"Item {item_id} is not SSH Key")
|
|
||||||
|
|
||||||
private_key_field = next(
|
|
||||||
(
|
|
||||||
field
|
|
||||||
for field in data.get("fields", {})
|
|
||||||
if field.get("id") == "private_key" and field.get("type") == "SSHKEY"
|
|
||||||
),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
if not private_key_field:
|
|
||||||
raise AnsibleLookupError(f"No private key found for item {item_id}.")
|
|
||||||
|
|
||||||
if ssh_format:
|
|
||||||
return (
|
|
||||||
private_key_field.get("ssh_formats", {})
|
|
||||||
.get("openssh", {})
|
|
||||||
.get("value", "")
|
|
||||||
)
|
|
||||||
return private_key_field.get("value", "")
|
|
||||||
|
|
||||||
|
|
||||||
class LookupModule(LookupBase):
|
|
||||||
def run(self, terms, variables=None, **kwargs):
|
|
||||||
self.set_options(var_options=variables, direct=kwargs)
|
|
||||||
|
|
||||||
ssh_format = kwargs.get("ssh_format")
|
|
||||||
vault = self.get_option("vault")
|
|
||||||
subdomain = self.get_option("subdomain")
|
|
||||||
domain = self.get_option("domain", "1password.com")
|
|
||||||
username = self.get_option("username")
|
|
||||||
secret_key = self.get_option("secret_key")
|
|
||||||
master_password = self.get_option("master_password")
|
|
||||||
service_account_token = self.get_option("service_account_token")
|
|
||||||
account_id = self.get_option("account_id")
|
|
||||||
connect_host = self.get_option("connect_host")
|
|
||||||
connect_token = self.get_option("connect_token")
|
|
||||||
|
|
||||||
op = OnePass(
|
|
||||||
subdomain=subdomain,
|
|
||||||
domain=domain,
|
|
||||||
username=username,
|
|
||||||
secret_key=secret_key,
|
|
||||||
master_password=master_password,
|
|
||||||
service_account_token=service_account_token,
|
|
||||||
account_id=account_id,
|
|
||||||
connect_host=connect_host,
|
|
||||||
connect_token=connect_token,
|
|
||||||
cli_class=OnePassCLIv2SSHKey,
|
|
||||||
)
|
|
||||||
op.assert_logged_in()
|
|
||||||
|
|
||||||
return [
|
|
||||||
op._cli.get_ssh_key(term, vault, ssh_format=ssh_format) for term in terms
|
|
||||||
]
|
|
Loading…
Reference in New Issue