PR#3970: Add CLI with users with given permission

Merges #3970
https://pagure.io/koji/pull-request/3970

Fixes: #3950
https://pagure.io/koji/issue/3950
Add command to list users with a particular permission
This commit is contained in:
Tomas Kopecek 2024-03-11 13:12:33 +01:00
commit eba8de2473
4 changed files with 254 additions and 16 deletions

View file

@ -8129,10 +8129,17 @@ def anon_handle_list_users(goptions, session, args):
parser.add_option("--usertype", help="List users that have a given usertype " parser.add_option("--usertype", help="List users that have a given usertype "
"(e.g. NORMAL, HOST, GROUP)") "(e.g. NORMAL, HOST, GROUP)")
parser.add_option("--prefix", help="List users that have a given prefix") parser.add_option("--prefix", help="List users that have a given prefix")
parser.add_option("--perm", help="List users that have a given permission")
parser.add_option("--inherited-perm", action='store_true', default=False,
help="List of users that inherited specific perm")
(options, args) = parser.parse_args(args) (options, args) = parser.parse_args(args)
if len(args) > 0: if len(args) > 0:
parser.error("This command takes no arguments") parser.error("This command takes no arguments")
if options.inherited_perm and not options.perm:
parser.error("inherited_perm option must be used with perm option")
activate_session(session, goptions) activate_session(session, goptions)
if options.usertype: if options.usertype:
@ -8140,6 +8147,8 @@ def anon_handle_list_users(goptions, session, args):
usertype = koji.USERTYPES[options.usertype.upper()] usertype = koji.USERTYPES[options.usertype.upper()]
else: else:
error("Usertype %s doesn't exist" % options.usertype) error("Usertype %s doesn't exist" % options.usertype)
elif options.perm:
usertype = None
else: else:
usertype = koji.USERTYPES['NORMAL'] usertype = koji.USERTYPES['NORMAL']
@ -8148,6 +8157,15 @@ def anon_handle_list_users(goptions, session, args):
else: else:
prefix = None prefix = None
users_list = session.listUsers(userType=usertype, prefix=prefix) if options.perm:
if options.perm in [p['name'] for p in session.getAllPerms()]:
perm = options.perm
else:
error("Permission %s does not exists" % options.perm)
else:
perm = None
users_list = session.listUsers(userType=usertype, prefix=prefix, perm=perm,
inherited_perm=options.inherited_perm)
for user in users_list: for user in users_list:
print(user['name']) print(user['name'])

View file

@ -13117,31 +13117,66 @@ class RootExports(object):
dropGroupMember = staticmethod(drop_group_member) dropGroupMember = staticmethod(drop_group_member)
getGroupMembers = staticmethod(get_group_members) getGroupMembers = staticmethod(get_group_members)
def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None): def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None, perm=None,
inherited_perm=False):
"""List all users in the system. """List all users in the system.
userType can be an integer value from koji.USERTYPES (defaults to 0, userType can be an integer value from koji.USERTYPES (defaults to 0, i.e. normal users).
i.e. normal users). Returns a list of maps with the following keys: Returns a list of maps with the following keys:
- id - id
- name - name
- status - status
- usertype - usertype
- krb_principals - krb_principals
- permissions
If no users of the specified If no users of the specified type exist, return an empty list."""
type exist, return an empty list.""" if inherited_perm and not perm:
fields = ('id', 'name', 'status', 'usertype', 'array_agg(krb_principal)') raise koji.GenericError('inherited_perm option must be used with perm option')
aliases = ('id', 'name', 'status', 'usertype', 'krb_principals') joins = []
joins = ['LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'] if userType is None:
clauses = ['usertype = %(userType)i'] userType = list(koji.USERTYPES.values())
elif isinstance(userType, int):
userType = [userType]
else:
raise koji.ParameterError("userType must be integer or None")
clauses = ['usertype IN %(userType)s']
fields = [
('users.id', 'id'),
('users.name', 'name'),
('status', 'status'),
('usertype', 'usertype'),
('array_agg(krb_principal)', 'krb_principals'),
]
if perm:
fields.extend([
('permissions.name', 'permission_name'),
('permissions.id', 'permission_id'),
])
clauses.extend(['user_perms.active AND permissions.name = %(perm)s'])
if inherited_perm:
joins.extend(['LEFT JOIN user_groups ON user_id = users.id AND '
'user_groups.active IS TRUE',
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
'LEFT JOIN permissions ON perm_id = permissions.id'])
else:
joins.extend(['LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE',
'LEFT JOIN permissions ON perm_id = permissions.id'])
joins.append('LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id')
if prefix: if prefix:
clauses.append("name ilike %(prefix)s || '%%'") clauses.append("users.name ilike %(prefix)s || '%%'")
if queryOpts is None: if queryOpts is None:
queryOpts = {} queryOpts = {}
if not queryOpts.get('group'): if not queryOpts.get('group'):
queryOpts['group'] = 'users.id' if perm:
queryOpts['group'] = 'users.id,permissions.id'
else:
queryOpts['group'] = 'users.id'
else: else:
raise koji.GenericError('queryOpts.group is not available for this API') raise koji.GenericError('queryOpts.group is not available for this API')
fields, aliases = zip(*fields)
query = QueryProcessor(columns=fields, aliases=aliases, query = QueryProcessor(columns=fields, aliases=aliases,
tables=['users'], joins=joins, clauses=clauses, tables=['users'], joins=joins, clauses=clauses,
values=locals(), opts=queryOpts, values=locals(), opts=queryOpts,

View file

@ -47,7 +47,8 @@ testuser
self.assertMultiLineEqual(actual, expected) self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None) self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with( self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['NORMAL'], prefix=None) inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix=None)
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO) @mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_prefix(self, stdout): def test_list_users_with_prefix(self, stdout):
@ -65,7 +66,8 @@ testuser
self.assertMultiLineEqual(actual, expected) self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None) self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with( self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['NORMAL'], prefix='koji') inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix='koji')
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO) @mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_usertype(self, stdout): def test_list_users_with_usertype(self, stdout):
@ -88,7 +90,8 @@ testhost
self.assertMultiLineEqual(actual, expected) self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None) self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with( self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['HOST'], prefix=None) inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix=None)
self.session.getAllPerms.assert_not_called()
def test_list_users_with_usertype_non_existing(self): def test_list_users_with_usertype_non_existing(self):
arguments = ['--usertype', 'test'] arguments = ['--usertype', 'test']
@ -100,6 +103,7 @@ testhost
activate_session=None, activate_session=None,
exit_code=1) exit_code=1)
self.session.listUsers.assert_not_called() self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=StringIO) @mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_usertype_and_prefix(self, stdout): def test_list_users_with_usertype_and_prefix(self, stdout):
@ -117,7 +121,95 @@ testhost
self.assertMultiLineEqual(actual, expected) self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None) self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with( self.session.listUsers.assert_called_once_with(
userType=koji.USERTYPES['HOST'], prefix='test') inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix='test')
self.session.getAllPerms.assert_not_called()
def test_list_users_with_arg(self):
arguments = ['test-arg']
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr=self.format_error_message("This command takes no arguments"),
stdout='',
activate_session=None,
exit_code=2)
self.activate_session.assert_not_called()
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
def test_list_users_not_exists_perm(self):
arguments = ['--perm', 'test-non-exist-perm']
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr="Permission test-non-exist-perm does not exists\n",
stdout='',
activate_session=None,
exit_code=1)
self.activate_session.assert_called_once_with(self.session, self.options)
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_called_once_with()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_empty_result_of_users(self, stdout):
perm = 'test-perm'
arguments = ['--perm', perm]
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.session.listUsers.return_value = []
rv = anon_handle_list_users(self.options, self.session, arguments)
actual = stdout.getvalue()
expected = """"""
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.activate_session.assert_called_once_with(self.session, self.options)
self.session.listUsers.assert_called_once_with(
inherited_perm=False, perm=perm, prefix=None, userType=None)
self.session.getAllPerms.assert_called_once_with()
@mock.patch('sys.stdout', new_callable=StringIO)
def test_list_users_with_perms_valid_and_inherited_perm(self, stdout):
perm = 'test-perm'
arguments = ['--perm', perm, '--inherited-perm']
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
self.session.listUsers.return_value = [{
'id': 1, 'krb_principals': [],
'name': 'kojiadmin',
'permission': perm,
'status': 0,
'usertype': 0},
{'id': 4,
'krb_principals': [],
'name': 'testuser1234',
'permission': perm,
'status': 0,
'usertype': 2},
]
rv = anon_handle_list_users(self.options, self.session, arguments)
actual = stdout.getvalue()
expected = """kojiadmin
testuser1234
"""
self.assertMultiLineEqual(actual, expected)
self.assertEqual(rv, None)
self.session.listUsers.assert_called_once_with(
inherited_perm=True, perm=perm, prefix=None, userType=None)
self.session.getAllPerms.assert_called_once_with()
self.activate_session.assert_called_once_with(self.session, self.options)
def test_list_users_inherited_perm_without_perm(self):
arguments = ['--inherited-perm']
self.assert_system_exit(
anon_handle_list_users,
self.options, self.session, arguments,
stderr=self.format_error_message(
"inherited_perm option must be used with perm option"),
stdout='',
activate_session=None,
exit_code=2)
self.activate_session.assert_not_called()
self.session.listUsers.assert_not_called()
self.session.getAllPerms.assert_not_called()
def test_anon_handle_list_users_help(self): def test_anon_handle_list_users_help(self):
self.assert_help( self.assert_help(
@ -130,4 +222,6 @@ Options:
--usertype=USERTYPE List users that have a given usertype (e.g. NORMAL, --usertype=USERTYPE List users that have a given usertype (e.g. NORMAL,
HOST, GROUP) HOST, GROUP)
--prefix=PREFIX List users that have a given prefix --prefix=PREFIX List users that have a given prefix
--perm=PERM List users that have a given permission
--inherited-perm List of users that inherited specific perm
""" % self.progname) """ % self.progname)

View file

@ -0,0 +1,91 @@
import unittest
import mock
import koji
import kojihub
QP = kojihub.QueryProcessor
class TestListUsers(unittest.TestCase):
def setUp(self):
self.maxDiff = None
self.exports = kojihub.RootExports()
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
side_effect=self.getQuery).start()
self.queries = []
def tearDown(self):
mock.patch.stopall()
def getQuery(self, *args, **kwargs):
query = QP(*args, **kwargs)
query.execute = mock.MagicMock()
query.executeOne = mock.MagicMock()
self.queries.append(query)
return query
def test_valid_default(self):
self.exports.listUsers()
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, ['usertype IN %(userType)s'])
def test_valid_userType_none_with_perm_and_prefix(self):
self.exports.listUsers(userType=None, perm='admin', prefix='koji')
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND user_perms.active IS TRUE',
'LEFT JOIN permissions ON perm_id = permissions.id',
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, [
'user_perms.active AND permissions.name = %(perm)s',
"users.name ilike %(prefix)s || '%%'",
'usertype IN %(userType)s',
])
def test_valid_userType_none_with_perm_inherited_perm_and_prefix(self):
self.exports.listUsers(userType=None, perm='admin', prefix='koji', inherited_perm=True)
self.assertEqual(len(self.queries), 1)
query = self.queries[0]
self.assertEqual(query.tables, ['users'])
self.assertEqual(query.joins, [
'LEFT JOIN user_groups ON user_id = users.id AND user_groups.active IS TRUE',
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
'LEFT JOIN permissions ON perm_id = permissions.id',
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
self.assertEqual(query.clauses, [
'user_perms.active AND permissions.name = %(perm)s',
"users.name ilike %(prefix)s || '%%'",
'usertype IN %(userType)s',
])
def test_inherited_perm_without_perm(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(userType=None, inherited_perm=True)
self.assertEqual('inherited_perm option must be used with perm option',
str(cm.exception))
self.assertEqual(len(self.queries), 0)
def test_wrong_queryopts_group(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(queryOpts={'group': 'test-column'})
self.assertEqual('queryOpts.group is not available for this API', str(cm.exception))
self.assertEqual(len(self.queries), 0)
def test_usertype_not_int_or_none(self):
with self.assertRaises(koji.GenericError) as cm:
self.exports.listUsers(userType=[1])
self.assertEqual('userType must be integer or None', str(cm.exception))
self.assertEqual(len(self.queries), 0)