PR#3970: Add CLI with users with given permission

Merges #3970
https://pagure.io/koji/pull-request/3970

Fixes: #3950
https://pagure.io/koji/issue/3950
Add command to list users with a particular permission
This commit is contained in:
Tomas Kopecek 2024-03-11 13:12:33 +01:00
commit eba8de2473
4 changed files with 254 additions and 16 deletions

View file

@ -8129,10 +8129,17 @@ def anon_handle_list_users(goptions, session, args):
parser.add_option("--usertype", help="List users that have a given usertype "
"(e.g. NORMAL, HOST, GROUP)")
parser.add_option("--prefix", help="List users that have a given prefix")
parser.add_option("--perm", help="List users that have a given permission")
parser.add_option("--inherited-perm", action='store_true', default=False,
help="List of users that inherited specific perm")
(options, args) = parser.parse_args(args)
if len(args) > 0:
parser.error("This command takes no arguments")
if options.inherited_perm and not options.perm:
parser.error("inherited_perm option must be used with perm option")
activate_session(session, goptions)
if options.usertype:
@ -8140,6 +8147,8 @@ def anon_handle_list_users(goptions, session, args):
usertype = koji.USERTYPES[options.usertype.upper()]
else:
error("Usertype %s doesn't exist" % options.usertype)
elif options.perm:
usertype = None
else:
usertype = koji.USERTYPES['NORMAL']
@ -8148,6 +8157,15 @@ def anon_handle_list_users(goptions, session, args):
else:
prefix = None
users_list = session.listUsers(userType=usertype, prefix=prefix)
if options.perm:
if options.perm in [p['name'] for p in session.getAllPerms()]:
perm = options.perm
else:
error("Permission %s does not exists" % options.perm)
else:
perm = None
users_list = session.listUsers(userType=usertype, prefix=prefix, perm=perm,
inherited_perm=options.inherited_perm)
for user in users_list:
print(user['name'])

View file

@ -13117,31 +13117,66 @@ class RootExports(object):
dropGroupMember = staticmethod(drop_group_member)
getGroupMembers = staticmethod(get_group_members)
def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None):
def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None, perm=None,
inherited_perm=False):
"""List all users in the system.
userType can be an integer value from koji.USERTYPES (defaults to 0,
i.e. normal users). Returns a list of maps with the following keys:
userType can be an integer value from koji.USERTYPES (defaults to 0, i.e. normal users).
Returns a list of maps with the following keys:
- id
- name
- status
- usertype
- krb_principals
- permissions
If no users of the specified
type exist, return an empty list."""
fields = ('id', 'name', 'status', 'usertype', 'array_agg(krb_principal)')
aliases = ('id', 'name', 'status', 'usertype', 'krb_principals')
joins = ['LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id']
clauses = ['usertype = %(userType)i']
If no users of the specified type exist, return an empty list."""
if inherited_perm and not perm:
raise koji.GenericError('inherited_perm option must be used with perm option')
joins = []
if userType is None:
userType = list(koji.USERTYPES.values())
elif isinstance(userType, int):
userType = [userType]
else:
raise koji.ParameterError("userType must be integer or None")
clauses = ['usertype IN %(userType)s']
fields = [
('users.id', 'id'),
('users.name', 'name'),
('status', 'status'),
('usertype', 'usertype'),
('array_agg(krb_principal)', 'krb_principals'),
]
if perm:
fields.extend([
('permissions.name', 'permission_name'),
('permissions.id', 'permission_id'),
])
clauses.extend(['user_perms.active AND permissions.name = %(perm)s'])
if inherited_perm:
joins.extend(['LEFT JOIN user_groups ON user_id = users.id AND '
'user_groups.active IS TRUE',
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
'LEFT JOIN permissions ON perm_id = permissions.id'])
else:
joins.extend(['LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE',
'LEFT JOIN permissions ON perm_id = permissions.id'])
joins.append('LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id')
if prefix:
clauses.append("name ilike %(prefix)s || '%%'")
clauses.append("users.name ilike %(prefix)s || '%%'")
if queryOpts is None:
queryOpts = {}
if not queryOpts.get('group'):
queryOpts['group'] = 'users.id'
if perm:
queryOpts['group'] = 'users.id,permissions.id'
else:
queryOpts['group'] = 'users.id'
else:
raise koji.GenericError('queryOpts.group is not available for this API')
fields, aliases = zip(*fields)
query = QueryProcessor(columns=fields, aliases=aliases,
tables=['users'], joins=joins, clauses=clauses,
values=locals(), opts=queryOpts,

View file

@ -47,7 +47,8 @@ testuser
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['NORMAL'], prefix=None)
inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix=None)
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_prefix(self, stdout):
@ -65,7 +66,8 @@ testuser
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['NORMAL'], prefix='koji')
inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix='koji')
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_usertype(self, stdout):
@ -88,7 +90,8 @@ testhost
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['HOST'], prefix=None)
inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix=None)
self.session.getAllPerms.assert_not_called()
def test_list_users_with_usertype_non_existing(self):
arguments = ['--usertype', 'test']
@ -100,6 +103,7 @@ testhost
activate_session=None,
exit_code=1)
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_usertype_and_prefix(self, stdout):
@ -117,7 +121,95 @@ testhost
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['HOST'], prefix='test')
inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix='test')
self.session.getAllPerms.assert_not_called()
def test_list_users_with_arg(self):
arguments = ['test-arg']
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr=self.format_error_message("This command takes no arguments"),
stdout='',
activate_session=None,
exit_code=2)
self.activate_session.assert_not_called()
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
def test_list_users_not_exists_perm(self):
arguments = ['--perm', 'test-non-exist-perm']
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr="Permission test-non-exist-perm does not exists\n",
stdout='',
activate_session=None,
exit_code=1)
self.activate_session.assert_called_once_with(self.session, self.options)
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_called_once_with()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_empty_result_of_users(self, stdout):
perm = 'test-perm'
arguments = ['--perm', perm]
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.session.listUsers.return_value = []
rv = anon_handle_list_users(self.options, self.session, arguments)
actual = stdout.getvalue()
expected = """"""
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.activate_session.assert_called_once_with(self.session, self.options)
self.session.listUsers.assert_called_once_with(
inherited_perm=False, perm=perm, prefix=None, userType=None)
self.session.getAllPerms.assert_called_once_with()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_perms_valid_and_inherited_perm(self, stdout):
perm = 'test-perm'
arguments = ['--perm', perm, '--inherited-perm']
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.session.listUsers.return_value = [{
'id': 1, 'krb_principals': [],
'name': 'kojiadmin',
'permission': perm,
'status': 0,
'usertype': 0},
{'id': 4,
'krb_principals': [],
'name': 'testuser1234',
'permission': perm,
'status': 0,
'usertype': 2},
]
rv = anon_handle_list_users(self.options, self.session, arguments)
actual = stdout.getvalue()
expected = """kojiadmin
testuser1234
"""
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
inherited_perm=True, perm=perm, prefix=None, userType=None)
self.session.getAllPerms.assert_called_once_with()
self.activate_session.assert_called_once_with(self.session, self.options)
def test_list_users_inherited_perm_without_perm(self):
arguments = ['--inherited-perm']
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr=self.format_error_message(
"inherited_perm option must be used with perm option"),
stdout='',
activate_session=None,
exit_code=2)
self.activate_session.assert_not_called()
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
def test_anon_handle_list_users_help(self):
self.assert_help(
@ -130,4 +222,6 @@ Options:
--usertype=USERTYPE List users that have a given usertype (e.g. NORMAL,
HOST, GROUP)
--prefix=PREFIX List users that have a given prefix
--perm=PERM List users that have a given permission
--inherited-perm List of users that inherited specific perm
""" % self.progname)

View file

@ -0,0 +1,91 @@
import unittest
import mock
import koji
import kojihub
QP = kojihub.QueryProcessor
class TestListUsers(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.exports = kojihub.RootExports()
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
side_effect=self.getQuery).start()
self.queries = []
def tearDown(self):
mock.patch.stopall()
def getQuery(self, *args, **kwargs):
query = QP(*args, **kwargs)
query.execute = mock.MagicMock()
query.executeOne = mock.MagicMock()
self.queries.append(query)
return query
def test_valid_default(self):
self.exports.listUsers()
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, ['usertype IN %(userType)s'])
def test_valid_userType_none_with_perm_and_prefix(self):
self.exports.listUsers(userType=None, perm='admin', prefix='koji')
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND user_perms.active IS TRUE',
'LEFT JOIN permissions ON perm_id = permissions.id',
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, [
'user_perms.active AND permissions.name = %(perm)s',
"users.name ilike %(prefix)s || '%%'",
'usertype IN %(userType)s',
])
def test_valid_userType_none_with_perm_inherited_perm_and_prefix(self):
self.exports.listUsers(userType=None, perm='admin', prefix='koji', inherited_perm=True)
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_groups ON user_id = users.id AND user_groups.active IS TRUE',
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
'LEFT JOIN permissions ON perm_id = permissions.id',
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, [
'user_perms.active AND permissions.name = %(perm)s',
"users.name ilike %(prefix)s || '%%'",
'usertype IN %(userType)s',
])
def test_inherited_perm_without_perm(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(userType=None, inherited_perm=True)
self.assertEqual('inherited_perm option must be used with perm option',
str(cm.exception))
self.assertEqual(len(self.queries), 0)
def test_wrong_queryopts_group(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(queryOpts={'group': 'test-column'})
self.assertEqual('queryOpts.group is not available for this API', str(cm.exception))
self.assertEqual(len(self.queries), 0)
def test_usertype_not_int_or_none(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(userType=[1])
self.assertEqual('userType must be integer or None', str(cm.exception))
self.assertEqual(len(self.queries), 0)