From 0bd816c689d3478c20fdf8b65c4fb9a77cde98dd Mon Sep 17 00:00:00 2001 From: Christopher O'Brien Date: Tue, 9 Jul 2024 11:49:50 -0400 Subject: [PATCH] Make getGroupMembers anonymous and add getUserGroups --- kojihub/kojihub.py | 20 +++++++++- tests/test_hub/test_get_user_groups.py | 52 ++++++++++++++++++++++++++ tests/test_hub/test_user_groups.py | 8 ++-- 3 files changed, 75 insertions(+), 5 deletions(-) create mode 100644 tests/test_hub/test_get_user_groups.py diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index aea8d158..d9e2b374 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -9552,7 +9552,7 @@ def drop_group_member(group, user): def get_group_members(group): """Get the members of a group""" - context.session.assertPerm('admin') + ginfo = get_user(group) if not ginfo or ginfo['usertype'] != koji.USERTYPES['GROUP']: raise koji.GenericError("No such group: %s" % group) @@ -13409,6 +13409,24 @@ class RootExports(object): dropGroupMember = staticmethod(drop_group_member) getGroupMembers = staticmethod(get_group_members) + def getUserGroups(self, user): + """ + The groups associated with the given user + + :param user: a str (Kerberos principal or name) or an int (user id) + or a dict: + - id: User's ID + - name: User's name + - krb_principal: Kerberos principal + + :returns: a dict mapping member's group IDs to group names + + :raises: GenericError if the specified user is not found + """ + + uinfo = get_user(user, strict=True) + return get_user_groups(uinfo["id"]) + def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None, perm=None, inherited_perm=False): """List users in the system diff --git a/tests/test_hub/test_get_user_groups.py b/tests/test_hub/test_get_user_groups.py new file mode 100644 index 00000000..32675616 --- /dev/null +++ b/tests/test_hub/test_get_user_groups.py @@ -0,0 +1,52 @@ +import mock + +import koji +import kojihub +from .utils import DBQueryTestCase + + +class TestGetUserGroups(DBQueryTestCase): + + def setUp(self): + super(TestGetUserGroups, self).setUp() + self.exports = kojihub.RootExports() + + self.context = mock.patch('kojihub.kojihub.context').start() + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + mock.patch.stopall() + + def test_no_such_user(self): + user = 'test-user' + self.qp_execute_return_value = [] + with self.assertRaises(koji.GenericError) as cm: + self.exports.getUserGroups(user) + self.assertEqual(f"No such user: {user!r}", str(cm.exception)) + self.assertEqual(len(self.queries), 1) + + def test_valid(self): + get_user = mock.patch('kojihub.kojihub.get_user').start() + get_user.return_value = {'id': 23, 'usertype': 0} + + user = 'test-user' + self.qp_execute_return_value = [{'group_id': 123, 'name': 'grp_123'}, + {'group_id': 456, 'name': 'grp_456'}] + + grps = self.exports.getUserGroups(user) + + get_user.assert_called_once_with(user, strict=True) + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['user_groups']) + self.assertEqual(query.joins, ['users ON group_id = users.id']) + self.assertEqual(query.clauses, ['active IS TRUE', + 'user_id=%(user_id)i', + 'users.usertype=%(t_group)i']) + self.assertEqual(query.values, {'t_group': 2, + 'user_id': 23}) + self.assertEqual(query.columns, ['group_id', 'name']) + + self.assertEqual(grps, {123: 'grp_123', + 456: 'grp_456'}) diff --git a/tests/test_hub/test_user_groups.py b/tests/test_hub/test_user_groups.py index d007444c..746afdcc 100644 --- a/tests/test_hub/test_user_groups.py +++ b/tests/test_hub/test_user_groups.py @@ -262,11 +262,11 @@ class TestGrouplist(unittest.TestCase): def test_get_group_members(self): group, gid = 'test_group', 1 - # no permission - self.context.session.assertPerm.side_effect = koji.ActionNotAllowed - with self.assertRaises(koji.ActionNotAllowed): + # no permission needed, verify that it's not being checked + self.context.session.assertPerm.side_effect = Exception + with self.assertRaises(koji.GenericError): kojihub.get_group_members(group) - self.context.session.assertPerm.assert_called_with('admin') + self.context.session.assertPerm.assert_not_called() self.assertEqual(len(self.inserts), 0) self.assertEqual(len(self.updates), 0)