bitwarden lookup: add options to filter by collection_name and validate number of results (#9728)

* feat(lookups/bitwarden): add collection_name filter

* feat(lookups/bitwarden): add result_count check

* docs(lookups/bitwarden): add changelog fragment

* Update changelogs/fragments/9728-bitwarden-collection-name-filter.yml

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Felix Fontein <felix@fontein.de>

* fix(lookups/bitwarden): fix result_count check for multiple terms

* fix(lookups/bitwarden): Enforce mutual exclusion of 'collection_name' and 'collection_id'

* formatting(lookups/bitwarden): remove trailing whitespace

* Update plugins/lookup/bitwarden.py

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>

* Update plugins/lookup/bitwarden.py

Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>

* formatting(lookups/bitwarden): remove trailing whitespace

---------

Co-authored-by: Felix Fontein <felix@fontein.de>
Co-authored-by: Alexei Znamensky <103110+russoz@users.noreply.github.com>
pull/9750/head
Jonas 2025-02-16 12:11:04 +01:00 committed by GitHub
parent ba25229482
commit 410999dffa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 140 additions and 5 deletions

View File

@ -0,0 +1,2 @@
minor_changes:
- bitwarden lookup plugin - add new option ``collection_name`` to filter results by collection name, and new option ``result_count`` to validate number of results (https://github.com/ansible-collections/community.general/pull/9728).

View File

@ -37,9 +37,17 @@ DOCUMENTATION = """
description: Field to fetch. Leave unset to fetch whole response.
type: str
collection_id:
description: Collection ID to filter results by collection. Leave unset to skip filtering.
description:
- Collection ID to filter results by collection. Leave unset to skip filtering.
- O(collection_id) and O(collection_name) are mutually exclusive.
type: str
version_added: 6.3.0
collection_name:
description:
- Collection name to filter results by collection. Leave unset to skip filtering.
- O(collection_id) and O(collection_name) are mutually exclusive.
type: str
version_added: 10.4.0
organization_id:
description: Organization ID to filter results by organization. Leave unset to skip filtering.
type: str
@ -48,6 +56,12 @@ DOCUMENTATION = """
description: Pass session key instead of reading from env.
type: str
version_added: 8.4.0
result_count:
description:
- Number of results expected for the lookup query. Task will fail if O(result_count)
is set but does not match the number of query results. Leave empty to skip this check.
type: int
version_added: 10.4.0
"""
EXAMPLES = """
@ -85,6 +99,16 @@ EXAMPLES = """
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
- name: "Get all Bitwarden records from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', None, collection_name='my_collections/test_collection') }}
- name: "Get Bitwarden record named 'a_test', ensure there is exactly one match"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', result_count=1) }}
"""
RETURN = """
@ -99,7 +123,7 @@ RETURN = """
from subprocess import Popen, PIPE
from ansible.errors import AnsibleError
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.parsing.ajson import AnsibleJSONDecoder
from ansible.plugins.lookup import LookupBase
@ -211,6 +235,24 @@ class Bitwarden(object):
return field_matches
def get_collection_ids(self, collection_name: str, organization_id=None) -> list[str]:
"""Return matching IDs of collections whose name is equal to collection_name."""
# Prepare set of params for Bitwarden CLI
params = ['list', 'collections', '--search', collection_name]
if organization_id:
params.extend(['--organizationid', organization_id])
out, err = self._run(params)
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
# Filter to only return the ID of a collections with exactly matching name
return [item['id'] for item in initial_matches
if str(item.get('name')).lower() == collection_name.lower()]
class LookupModule(LookupBase):
@ -219,7 +261,9 @@ class LookupModule(LookupBase):
field = self.get_option('field')
search_field = self.get_option('search')
collection_id = self.get_option('collection_id')
collection_name = self.get_option('collection_name')
organization_id = self.get_option('organization_id')
result_count = self.get_option('result_count')
_bitwarden.session = self.get_option('bw_session')
if not _bitwarden.unlocked:
@ -228,7 +272,27 @@ class LookupModule(LookupBase):
if not terms:
terms = [None]
return [_bitwarden.get_field(field, term, search_field, collection_id, organization_id) for term in terms]
if collection_name and collection_id:
raise AnsibleOptionsError("'collection_name' and 'collection_id' are mutually exclusive!")
elif collection_name:
collection_ids = _bitwarden.get_collection_ids(collection_name, organization_id)
if not collection_ids:
raise BitwardenException("No matching collections found!")
else:
collection_ids = [collection_id]
results = [
_bitwarden.get_field(field, term, search_field, collection_id, organization_id)
for collection_id in collection_ids
for term in terms
]
for result in results:
if result_count is not None and len(result) != result_count:
raise BitwardenException(
f"Number of results doesn't match result_count! ({len(result)} != {result_count})")
return results
_bitwarden = Bitwarden()

View File

@ -13,7 +13,7 @@ from ansible_collections.community.general.tests.unit.compat.mock import patch
from ansible.errors import AnsibleError
from ansible.module_utils import six
from ansible.plugins.loader import lookup_loader
from ansible_collections.community.general.plugins.lookup.bitwarden import Bitwarden
from ansible_collections.community.general.plugins.lookup.bitwarden import Bitwarden, BitwardenException
from ansible.parsing.ajson import AnsibleJSONEncoder
MOCK_COLLECTION_ID = "3b12a9da-7c49-40b8-ad33-aede017a7ead"
@ -131,7 +131,21 @@ MOCK_RECORDS = [
"reprompt": 0,
"revisionDate": "2024-14-15T11:30:00.000Z",
"type": 1
}
},
{
"object": "collection",
"id": MOCK_COLLECTION_ID,
"organizationId": MOCK_ORGANIZATION_ID,
"name": "MOCK_COLLECTION",
"externalId": None
},
{
"object": "collection",
"id": "3b12a9da-7c49-40b8-ad33-aede017a8ead",
"organizationId": "3b12a9da-7c49-40b8-ad33-aede017a9ead",
"name": "some/other/collection",
"externalId": None
},
]
@ -164,6 +178,9 @@ class MockBitwarden(Bitwarden):
items = []
for item in MOCK_RECORDS:
if item.get('object') != 'item':
continue
if search_value and not re.search(search_value, item.get('name')):
continue
if collection_to_filter and collection_to_filter not in item.get('collectionIds', []):
@ -172,6 +189,35 @@ class MockBitwarden(Bitwarden):
continue
items.append(item)
return AnsibleJSONEncoder().encode(items), ''
elif args[1] == 'collections':
try:
search_value = args[args.index('--search') + 1]
except ValueError:
search_value = None
try:
collection_to_filter = args[args.index('--collectionid') + 1]
except ValueError:
collection_to_filter = None
try:
organization_to_filter = args[args.index('--organizationid') + 1]
except ValueError:
organization_to_filter = None
collections = []
for item in MOCK_RECORDS:
if item.get('object') != 'collection':
continue
if search_value and not re.search(search_value, item.get('name')):
continue
if collection_to_filter and collection_to_filter not in item.get('collectionIds', []):
continue
if organization_to_filter and item.get('organizationId') != organization_to_filter:
continue
collections.append(item)
return AnsibleJSONEncoder().encode(collections), ''
return '[]', ''
@ -261,3 +307,26 @@ class TestLookupModule(unittest.TestCase):
def test_bitwarden_plugin_full_collection_organization(self):
self.assertEqual([MOCK_RECORDS[0], MOCK_RECORDS[2]], self.lookup.run(None,
collection_id=MOCK_COLLECTION_ID, organization_id=MOCK_ORGANIZATION_ID)[0])
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_collection_name_filter(self):
# all passwords from MOCK_COLLECTION
self.assertEqual([MOCK_RECORDS[0], MOCK_RECORDS[2]], self.lookup.run(None,
collection_name="MOCK_COLLECTION")[0])
# Existing collection, no results
self.assertEqual([], self.lookup.run(None, collection_name="some/other/collection")[0])
# Non-Existent collection
with self.assertRaises(BitwardenException):
self.lookup.run(None, collection_name="nonexistent")
@patch('ansible_collections.community.general.plugins.lookup.bitwarden._bitwarden', new=MockBitwarden())
def test_bitwarden_plugin_result_count_check(self):
self.lookup.run(None, collection_id=MOCK_COLLECTION_ID, organization_id=MOCK_ORGANIZATION_ID, result_count=2)
with self.assertRaises(BitwardenException):
self.lookup.run(None, collection_id=MOCK_COLLECTION_ID, organization_id=MOCK_ORGANIZATION_ID,
result_count=1)
self.lookup.run(None, organization_id=MOCK_ORGANIZATION_ID, result_count=3)
with self.assertRaises(BitwardenException):
self.lookup.run(None, organization_id=MOCK_ORGANIZATION_ID, result_count=0)