From c62e3467b566329b1344d5682070470662707e8d Mon Sep 17 00:00:00 2001 From: Jana Cupova Date: Tue, 9 Jan 2024 10:56:04 +0100 Subject: [PATCH] Add CLI with users with given permission Fixes: https://pagure.io/koji/issue/3950 --- cli/koji_cli/commands.py | 20 +++++- kojihub/kojihub.py | 57 +++++++++++++---- tests/test_cli/test_list_users.py | 102 ++++++++++++++++++++++++++++-- tests/test_hub/test_list_users.py | 91 ++++++++++++++++++++++++++ 4 files changed, 254 insertions(+), 16 deletions(-) create mode 100644 tests/test_hub/test_list_users.py diff --git a/cli/koji_cli/commands.py b/cli/koji_cli/commands.py index e44b1363..a7db87df 100644 --- a/cli/koji_cli/commands.py +++ b/cli/koji_cli/commands.py @@ -8129,10 +8129,17 @@ def anon_handle_list_users(goptions, session, args): parser.add_option("--usertype", help="List users that have a given usertype " "(e.g. NORMAL, HOST, GROUP)") parser.add_option("--prefix", help="List users that have a given prefix") + parser.add_option("--perm", help="List users that have a given permission") + parser.add_option("--inherited-perm", action='store_true', default=False, + help="List of users that inherited specific perm") (options, args) = parser.parse_args(args) if len(args) > 0: parser.error("This command takes no arguments") + + if options.inherited_perm and not options.perm: + parser.error("inherited_perm option must be used with perm option") + activate_session(session, goptions) if options.usertype: @@ -8140,6 +8147,8 @@ def anon_handle_list_users(goptions, session, args): usertype = koji.USERTYPES[options.usertype.upper()] else: error("Usertype %s doesn't exist" % options.usertype) + elif options.perm: + usertype = None else: usertype = koji.USERTYPES['NORMAL'] @@ -8148,6 +8157,15 @@ def anon_handle_list_users(goptions, session, args): else: prefix = None - users_list = session.listUsers(userType=usertype, prefix=prefix) + if options.perm: + if options.perm in [p['name'] for p in session.getAllPerms()]: + perm = options.perm + else: + error("Permission %s does not exists" % options.perm) + else: + perm = None + + users_list = session.listUsers(userType=usertype, prefix=prefix, perm=perm, + inherited_perm=options.inherited_perm) for user in users_list: print(user['name']) diff --git a/kojihub/kojihub.py b/kojihub/kojihub.py index f068169c..1b53ea4c 100644 --- a/kojihub/kojihub.py +++ b/kojihub/kojihub.py @@ -13117,31 +13117,66 @@ class RootExports(object): dropGroupMember = staticmethod(drop_group_member) getGroupMembers = staticmethod(get_group_members) - def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None): + def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None, perm=None, + inherited_perm=False): """List all users in the system. - userType can be an integer value from koji.USERTYPES (defaults to 0, - i.e. normal users). Returns a list of maps with the following keys: + userType can be an integer value from koji.USERTYPES (defaults to 0, i.e. normal users). + Returns a list of maps with the following keys: - id - name - status - usertype - krb_principals + - permissions - If no users of the specified - type exist, return an empty list.""" - fields = ('id', 'name', 'status', 'usertype', 'array_agg(krb_principal)') - aliases = ('id', 'name', 'status', 'usertype', 'krb_principals') - joins = ['LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'] - clauses = ['usertype = %(userType)i'] + If no users of the specified type exist, return an empty list.""" + if inherited_perm and not perm: + raise koji.GenericError('inherited_perm option must be used with perm option') + joins = [] + if userType is None: + userType = list(koji.USERTYPES.values()) + elif isinstance(userType, int): + userType = [userType] + else: + raise koji.ParameterError("userType must be integer or None") + clauses = ['usertype IN %(userType)s'] + fields = [ + ('users.id', 'id'), + ('users.name', 'name'), + ('status', 'status'), + ('usertype', 'usertype'), + ('array_agg(krb_principal)', 'krb_principals'), + ] + if perm: + fields.extend([ + ('permissions.name', 'permission_name'), + ('permissions.id', 'permission_id'), + ]) + clauses.extend(['user_perms.active AND permissions.name = %(perm)s']) + if inherited_perm: + joins.extend(['LEFT JOIN user_groups ON user_id = users.id AND ' + 'user_groups.active IS TRUE', + 'LEFT JOIN user_perms ON users.id = user_perms.user_id AND ' + 'user_perms.active IS TRUE OR group_id =user_perms.user_id', + 'LEFT JOIN permissions ON perm_id = permissions.id']) + else: + joins.extend(['LEFT JOIN user_perms ON users.id = user_perms.user_id AND ' + 'user_perms.active IS TRUE', + 'LEFT JOIN permissions ON perm_id = permissions.id']) + joins.append('LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id') if prefix: - clauses.append("name ilike %(prefix)s || '%%'") + clauses.append("users.name ilike %(prefix)s || '%%'") if queryOpts is None: queryOpts = {} if not queryOpts.get('group'): - queryOpts['group'] = 'users.id' + if perm: + queryOpts['group'] = 'users.id,permissions.id' + else: + queryOpts['group'] = 'users.id' else: raise koji.GenericError('queryOpts.group is not available for this API') + fields, aliases = zip(*fields) query = QueryProcessor(columns=fields, aliases=aliases, tables=['users'], joins=joins, clauses=clauses, values=locals(), opts=queryOpts, diff --git a/tests/test_cli/test_list_users.py b/tests/test_cli/test_list_users.py index 174b87a3..aed416ac 100644 --- a/tests/test_cli/test_list_users.py +++ b/tests/test_cli/test_list_users.py @@ -47,7 +47,8 @@ testuser self.assertMultiLineEqual(actual, expected) self.assertEqual(rv, None) self.session.listUsers.assert_called_once_with( - userType=koji.USERTYPES['NORMAL'], prefix=None) + inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix=None) + self.session.getAllPerms.assert_not_called() @mock.patch('sys.stdout', new_callable=StringIO) def test_list_users_with_prefix(self, stdout): @@ -65,7 +66,8 @@ testuser self.assertMultiLineEqual(actual, expected) self.assertEqual(rv, None) self.session.listUsers.assert_called_once_with( - userType=koji.USERTYPES['NORMAL'], prefix='koji') + inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix='koji') + self.session.getAllPerms.assert_not_called() @mock.patch('sys.stdout', new_callable=StringIO) def test_list_users_with_usertype(self, stdout): @@ -88,7 +90,8 @@ testhost self.assertMultiLineEqual(actual, expected) self.assertEqual(rv, None) self.session.listUsers.assert_called_once_with( - userType=koji.USERTYPES['HOST'], prefix=None) + inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix=None) + self.session.getAllPerms.assert_not_called() def test_list_users_with_usertype_non_existing(self): arguments = ['--usertype', 'test'] @@ -100,6 +103,7 @@ testhost activate_session=None, exit_code=1) self.session.listUsers.assert_not_called() + self.session.getAllPerms.assert_not_called() @mock.patch('sys.stdout', new_callable=StringIO) def test_list_users_with_usertype_and_prefix(self, stdout): @@ -117,7 +121,95 @@ testhost self.assertMultiLineEqual(actual, expected) self.assertEqual(rv, None) self.session.listUsers.assert_called_once_with( - userType=koji.USERTYPES['HOST'], prefix='test') + inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix='test') + self.session.getAllPerms.assert_not_called() + + def test_list_users_with_arg(self): + arguments = ['test-arg'] + self.assert_system_exit( + anon_handle_list_users, + self.options, self.session, arguments, + stderr=self.format_error_message("This command takes no arguments"), + stdout='', + activate_session=None, + exit_code=2) + self.activate_session.assert_not_called() + self.session.listUsers.assert_not_called() + self.session.getAllPerms.assert_not_called() + + def test_list_users_not_exists_perm(self): + arguments = ['--perm', 'test-non-exist-perm'] + self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}] + self.assert_system_exit( + anon_handle_list_users, + self.options, self.session, arguments, + stderr="Permission test-non-exist-perm does not exists\n", + stdout='', + activate_session=None, + exit_code=1) + self.activate_session.assert_called_once_with(self.session, self.options) + self.session.listUsers.assert_not_called() + self.session.getAllPerms.assert_called_once_with() + + @mock.patch('sys.stdout', new_callable=StringIO) + def test_list_users_with_empty_result_of_users(self, stdout): + perm = 'test-perm' + arguments = ['--perm', perm] + self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}] + self.session.listUsers.return_value = [] + rv = anon_handle_list_users(self.options, self.session, arguments) + actual = stdout.getvalue() + expected = """""" + self.assertMultiLineEqual(actual, expected) + self.assertEqual(rv, None) + self.activate_session.assert_called_once_with(self.session, self.options) + self.session.listUsers.assert_called_once_with( + inherited_perm=False, perm=perm, prefix=None, userType=None) + self.session.getAllPerms.assert_called_once_with() + + @mock.patch('sys.stdout', new_callable=StringIO) + def test_list_users_with_perms_valid_and_inherited_perm(self, stdout): + perm = 'test-perm' + arguments = ['--perm', perm, '--inherited-perm'] + self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}] + self.session.listUsers.return_value = [{ + 'id': 1, 'krb_principals': [], + 'name': 'kojiadmin', + 'permission': perm, + 'status': 0, + 'usertype': 0}, + {'id': 4, + 'krb_principals': [], + 'name': 'testuser1234', + 'permission': perm, + 'status': 0, + 'usertype': 2}, + ] + rv = anon_handle_list_users(self.options, self.session, arguments) + actual = stdout.getvalue() + expected = """kojiadmin +testuser1234 +""" + self.assertMultiLineEqual(actual, expected) + self.assertEqual(rv, None) + self.session.listUsers.assert_called_once_with( + inherited_perm=True, perm=perm, prefix=None, userType=None) + self.session.getAllPerms.assert_called_once_with() + self.activate_session.assert_called_once_with(self.session, self.options) + + def test_list_users_inherited_perm_without_perm(self): + arguments = ['--inherited-perm'] + self.assert_system_exit( + anon_handle_list_users, + self.options, self.session, arguments, + stderr=self.format_error_message( + "inherited_perm option must be used with perm option"), + stdout='', + activate_session=None, + exit_code=2) + self.activate_session.assert_not_called() + self.session.listUsers.assert_not_called() + self.session.getAllPerms.assert_not_called() def test_anon_handle_list_users_help(self): self.assert_help( @@ -130,4 +222,6 @@ Options: --usertype=USERTYPE List users that have a given usertype (e.g. NORMAL, HOST, GROUP) --prefix=PREFIX List users that have a given prefix + --perm=PERM List users that have a given permission + --inherited-perm List of users that inherited specific perm """ % self.progname) diff --git a/tests/test_hub/test_list_users.py b/tests/test_hub/test_list_users.py new file mode 100644 index 00000000..0295e45f --- /dev/null +++ b/tests/test_hub/test_list_users.py @@ -0,0 +1,91 @@ +import unittest + +import mock + +import koji +import kojihub + +QP = kojihub.QueryProcessor + + +class TestListUsers(unittest.TestCase): + + def setUp(self): + self.maxDiff = None + self.exports = kojihub.RootExports() + self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor', + side_effect=self.getQuery).start() + self.queries = [] + + def tearDown(self): + mock.patch.stopall() + + def getQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = mock.MagicMock() + query.executeOne = mock.MagicMock() + self.queries.append(query) + return query + + def test_valid_default(self): + self.exports.listUsers() + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, [ + 'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, ['usertype IN %(userType)s']) + + def test_valid_userType_none_with_perm_and_prefix(self): + self.exports.listUsers(userType=None, perm='admin', prefix='koji') + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, [ + 'LEFT JOIN user_perms ON users.id = user_perms.user_id AND user_perms.active IS TRUE', + 'LEFT JOIN permissions ON perm_id = permissions.id', + 'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, [ + 'user_perms.active AND permissions.name = %(perm)s', + "users.name ilike %(prefix)s || '%%'", + 'usertype IN %(userType)s', + ]) + + def test_valid_userType_none_with_perm_inherited_perm_and_prefix(self): + self.exports.listUsers(userType=None, perm='admin', prefix='koji', inherited_perm=True) + + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, [ + 'LEFT JOIN user_groups ON user_id = users.id AND user_groups.active IS TRUE', + 'LEFT JOIN user_perms ON users.id = user_perms.user_id AND ' + 'user_perms.active IS TRUE OR group_id =user_perms.user_id', + 'LEFT JOIN permissions ON perm_id = permissions.id', + 'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, [ + 'user_perms.active AND permissions.name = %(perm)s', + "users.name ilike %(prefix)s || '%%'", + 'usertype IN %(userType)s', + ]) + + def test_inherited_perm_without_perm(self): + with self.assertRaises(koji.GenericError) as cm: + self.exports.listUsers(userType=None, inherited_perm=True) + self.assertEqual('inherited_perm option must be used with perm option', + str(cm.exception)) + self.assertEqual(len(self.queries), 0) + + def test_wrong_queryopts_group(self): + with self.assertRaises(koji.GenericError) as cm: + self.exports.listUsers(queryOpts={'group': 'test-column'}) + self.assertEqual('queryOpts.group is not available for this API', str(cm.exception)) + self.assertEqual(len(self.queries), 0) + + def test_usertype_not_int_or_none(self): + with self.assertRaises(koji.GenericError) as cm: + self.exports.listUsers(userType=[1]) + self.assertEqual('userType must be integer or None', str(cm.exception)) + self.assertEqual(len(self.queries), 0)