PR#3970: Add CLI with users with given permission
Merges #3970 https://pagure.io/koji/pull-request/3970 Fixes: #3950 https://pagure.io/koji/issue/3950 Add command to list users with a particular permission
This commit is contained in:
commit
eba8de2473
4 changed files with 254 additions and 16 deletions
|
|
@ -8129,10 +8129,17 @@ def anon_handle_list_users(goptions, session, args):
|
|||
parser.add_option("--usertype", help="List users that have a given usertype "
|
||||
"(e.g. NORMAL, HOST, GROUP)")
|
||||
parser.add_option("--prefix", help="List users that have a given prefix")
|
||||
parser.add_option("--perm", help="List users that have a given permission")
|
||||
parser.add_option("--inherited-perm", action='store_true', default=False,
|
||||
help="List of users that inherited specific perm")
|
||||
(options, args) = parser.parse_args(args)
|
||||
|
||||
if len(args) > 0:
|
||||
parser.error("This command takes no arguments")
|
||||
|
||||
if options.inherited_perm and not options.perm:
|
||||
parser.error("inherited_perm option must be used with perm option")
|
||||
|
||||
activate_session(session, goptions)
|
||||
|
||||
if options.usertype:
|
||||
|
|
@ -8140,6 +8147,8 @@ def anon_handle_list_users(goptions, session, args):
|
|||
usertype = koji.USERTYPES[options.usertype.upper()]
|
||||
else:
|
||||
error("Usertype %s doesn't exist" % options.usertype)
|
||||
elif options.perm:
|
||||
usertype = None
|
||||
else:
|
||||
usertype = koji.USERTYPES['NORMAL']
|
||||
|
||||
|
|
@ -8148,6 +8157,15 @@ def anon_handle_list_users(goptions, session, args):
|
|||
else:
|
||||
prefix = None
|
||||
|
||||
users_list = session.listUsers(userType=usertype, prefix=prefix)
|
||||
if options.perm:
|
||||
if options.perm in [p['name'] for p in session.getAllPerms()]:
|
||||
perm = options.perm
|
||||
else:
|
||||
error("Permission %s does not exists" % options.perm)
|
||||
else:
|
||||
perm = None
|
||||
|
||||
users_list = session.listUsers(userType=usertype, prefix=prefix, perm=perm,
|
||||
inherited_perm=options.inherited_perm)
|
||||
for user in users_list:
|
||||
print(user['name'])
|
||||
|
|
|
|||
|
|
@ -13117,31 +13117,66 @@ class RootExports(object):
|
|||
dropGroupMember = staticmethod(drop_group_member)
|
||||
getGroupMembers = staticmethod(get_group_members)
|
||||
|
||||
def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None):
|
||||
def listUsers(self, userType=koji.USERTYPES['NORMAL'], prefix=None, queryOpts=None, perm=None,
|
||||
inherited_perm=False):
|
||||
"""List all users in the system.
|
||||
userType can be an integer value from koji.USERTYPES (defaults to 0,
|
||||
i.e. normal users). Returns a list of maps with the following keys:
|
||||
userType can be an integer value from koji.USERTYPES (defaults to 0, i.e. normal users).
|
||||
Returns a list of maps with the following keys:
|
||||
|
||||
- id
|
||||
- name
|
||||
- status
|
||||
- usertype
|
||||
- krb_principals
|
||||
- permissions
|
||||
|
||||
If no users of the specified
|
||||
type exist, return an empty list."""
|
||||
fields = ('id', 'name', 'status', 'usertype', 'array_agg(krb_principal)')
|
||||
aliases = ('id', 'name', 'status', 'usertype', 'krb_principals')
|
||||
joins = ['LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id']
|
||||
clauses = ['usertype = %(userType)i']
|
||||
If no users of the specified type exist, return an empty list."""
|
||||
if inherited_perm and not perm:
|
||||
raise koji.GenericError('inherited_perm option must be used with perm option')
|
||||
joins = []
|
||||
if userType is None:
|
||||
userType = list(koji.USERTYPES.values())
|
||||
elif isinstance(userType, int):
|
||||
userType = [userType]
|
||||
else:
|
||||
raise koji.ParameterError("userType must be integer or None")
|
||||
clauses = ['usertype IN %(userType)s']
|
||||
fields = [
|
||||
('users.id', 'id'),
|
||||
('users.name', 'name'),
|
||||
('status', 'status'),
|
||||
('usertype', 'usertype'),
|
||||
('array_agg(krb_principal)', 'krb_principals'),
|
||||
]
|
||||
if perm:
|
||||
fields.extend([
|
||||
('permissions.name', 'permission_name'),
|
||||
('permissions.id', 'permission_id'),
|
||||
])
|
||||
clauses.extend(['user_perms.active AND permissions.name = %(perm)s'])
|
||||
if inherited_perm:
|
||||
joins.extend(['LEFT JOIN user_groups ON user_id = users.id AND '
|
||||
'user_groups.active IS TRUE',
|
||||
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
|
||||
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
|
||||
'LEFT JOIN permissions ON perm_id = permissions.id'])
|
||||
else:
|
||||
joins.extend(['LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
|
||||
'user_perms.active IS TRUE',
|
||||
'LEFT JOIN permissions ON perm_id = permissions.id'])
|
||||
joins.append('LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id')
|
||||
if prefix:
|
||||
clauses.append("name ilike %(prefix)s || '%%'")
|
||||
clauses.append("users.name ilike %(prefix)s || '%%'")
|
||||
if queryOpts is None:
|
||||
queryOpts = {}
|
||||
if not queryOpts.get('group'):
|
||||
queryOpts['group'] = 'users.id'
|
||||
if perm:
|
||||
queryOpts['group'] = 'users.id,permissions.id'
|
||||
else:
|
||||
queryOpts['group'] = 'users.id'
|
||||
else:
|
||||
raise koji.GenericError('queryOpts.group is not available for this API')
|
||||
fields, aliases = zip(*fields)
|
||||
query = QueryProcessor(columns=fields, aliases=aliases,
|
||||
tables=['users'], joins=joins, clauses=clauses,
|
||||
values=locals(), opts=queryOpts,
|
||||
|
|
|
|||
|
|
@ -47,7 +47,8 @@ testuser
|
|||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
userType=koji.USERTYPES['NORMAL'], prefix=None)
|
||||
inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix=None)
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
@mock.patch('sys.stdout', new_callable=StringIO)
|
||||
def test_list_users_with_prefix(self, stdout):
|
||||
|
|
@ -65,7 +66,8 @@ testuser
|
|||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
userType=koji.USERTYPES['NORMAL'], prefix='koji')
|
||||
inherited_perm=False, perm=None, userType=koji.USERTYPES['NORMAL'], prefix='koji')
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
@mock.patch('sys.stdout', new_callable=StringIO)
|
||||
def test_list_users_with_usertype(self, stdout):
|
||||
|
|
@ -88,7 +90,8 @@ testhost
|
|||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
userType=koji.USERTYPES['HOST'], prefix=None)
|
||||
inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix=None)
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
def test_list_users_with_usertype_non_existing(self):
|
||||
arguments = ['--usertype', 'test']
|
||||
|
|
@ -100,6 +103,7 @@ testhost
|
|||
activate_session=None,
|
||||
exit_code=1)
|
||||
self.session.listUsers.assert_not_called()
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
@mock.patch('sys.stdout', new_callable=StringIO)
|
||||
def test_list_users_with_usertype_and_prefix(self, stdout):
|
||||
|
|
@ -117,7 +121,95 @@ testhost
|
|||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
userType=koji.USERTYPES['HOST'], prefix='test')
|
||||
inherited_perm=False, perm=None, userType=koji.USERTYPES['HOST'], prefix='test')
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
def test_list_users_with_arg(self):
|
||||
arguments = ['test-arg']
|
||||
self.assert_system_exit(
|
||||
anon_handle_list_users,
|
||||
self.options, self.session, arguments,
|
||||
stderr=self.format_error_message("This command takes no arguments"),
|
||||
stdout='',
|
||||
activate_session=None,
|
||||
exit_code=2)
|
||||
self.activate_session.assert_not_called()
|
||||
self.session.listUsers.assert_not_called()
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
def test_list_users_not_exists_perm(self):
|
||||
arguments = ['--perm', 'test-non-exist-perm']
|
||||
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
|
||||
self.assert_system_exit(
|
||||
anon_handle_list_users,
|
||||
self.options, self.session, arguments,
|
||||
stderr="Permission test-non-exist-perm does not exists\n",
|
||||
stdout='',
|
||||
activate_session=None,
|
||||
exit_code=1)
|
||||
self.activate_session.assert_called_once_with(self.session, self.options)
|
||||
self.session.listUsers.assert_not_called()
|
||||
self.session.getAllPerms.assert_called_once_with()
|
||||
|
||||
@mock.patch('sys.stdout', new_callable=StringIO)
|
||||
def test_list_users_with_empty_result_of_users(self, stdout):
|
||||
perm = 'test-perm'
|
||||
arguments = ['--perm', perm]
|
||||
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
|
||||
self.session.listUsers.return_value = []
|
||||
rv = anon_handle_list_users(self.options, self.session, arguments)
|
||||
actual = stdout.getvalue()
|
||||
expected = """"""
|
||||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.activate_session.assert_called_once_with(self.session, self.options)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
inherited_perm=False, perm=perm, prefix=None, userType=None)
|
||||
self.session.getAllPerms.assert_called_once_with()
|
||||
|
||||
@mock.patch('sys.stdout', new_callable=StringIO)
|
||||
def test_list_users_with_perms_valid_and_inherited_perm(self, stdout):
|
||||
perm = 'test-perm'
|
||||
arguments = ['--perm', perm, '--inherited-perm']
|
||||
self.session.getAllPerms.return_value = [{'name': 'test-perm'}, {'name': 'test-perm-2'}]
|
||||
self.session.listUsers.return_value = [{
|
||||
'id': 1, 'krb_principals': [],
|
||||
'name': 'kojiadmin',
|
||||
'permission': perm,
|
||||
'status': 0,
|
||||
'usertype': 0},
|
||||
{'id': 4,
|
||||
'krb_principals': [],
|
||||
'name': 'testuser1234',
|
||||
'permission': perm,
|
||||
'status': 0,
|
||||
'usertype': 2},
|
||||
]
|
||||
rv = anon_handle_list_users(self.options, self.session, arguments)
|
||||
actual = stdout.getvalue()
|
||||
expected = """kojiadmin
|
||||
testuser1234
|
||||
"""
|
||||
self.assertMultiLineEqual(actual, expected)
|
||||
self.assertEqual(rv, None)
|
||||
self.session.listUsers.assert_called_once_with(
|
||||
inherited_perm=True, perm=perm, prefix=None, userType=None)
|
||||
self.session.getAllPerms.assert_called_once_with()
|
||||
self.activate_session.assert_called_once_with(self.session, self.options)
|
||||
|
||||
def test_list_users_inherited_perm_without_perm(self):
|
||||
arguments = ['--inherited-perm']
|
||||
self.assert_system_exit(
|
||||
anon_handle_list_users,
|
||||
self.options, self.session, arguments,
|
||||
stderr=self.format_error_message(
|
||||
"inherited_perm option must be used with perm option"),
|
||||
stdout='',
|
||||
activate_session=None,
|
||||
exit_code=2)
|
||||
self.activate_session.assert_not_called()
|
||||
self.session.listUsers.assert_not_called()
|
||||
self.session.getAllPerms.assert_not_called()
|
||||
|
||||
def test_anon_handle_list_users_help(self):
|
||||
self.assert_help(
|
||||
|
|
@ -130,4 +222,6 @@ Options:
|
|||
--usertype=USERTYPE List users that have a given usertype (e.g. NORMAL,
|
||||
HOST, GROUP)
|
||||
--prefix=PREFIX List users that have a given prefix
|
||||
--perm=PERM List users that have a given permission
|
||||
--inherited-perm List of users that inherited specific perm
|
||||
""" % self.progname)
|
||||
|
|
|
|||
91
tests/test_hub/test_list_users.py
Normal file
91
tests/test_hub/test_list_users.py
Normal file
|
|
@ -0,0 +1,91 @@
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
import koji
|
||||
import kojihub
|
||||
|
||||
QP = kojihub.QueryProcessor
|
||||
|
||||
|
||||
class TestListUsers(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
self.exports = kojihub.RootExports()
|
||||
self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',
|
||||
side_effect=self.getQuery).start()
|
||||
self.queries = []
|
||||
|
||||
def tearDown(self):
|
||||
mock.patch.stopall()
|
||||
|
||||
def getQuery(self, *args, **kwargs):
|
||||
query = QP(*args, **kwargs)
|
||||
query.execute = mock.MagicMock()
|
||||
query.executeOne = mock.MagicMock()
|
||||
self.queries.append(query)
|
||||
return query
|
||||
|
||||
def test_valid_default(self):
|
||||
self.exports.listUsers()
|
||||
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, [
|
||||
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, ['usertype IN %(userType)s'])
|
||||
|
||||
def test_valid_userType_none_with_perm_and_prefix(self):
|
||||
self.exports.listUsers(userType=None, perm='admin', prefix='koji')
|
||||
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, [
|
||||
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND user_perms.active IS TRUE',
|
||||
'LEFT JOIN permissions ON perm_id = permissions.id',
|
||||
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, [
|
||||
'user_perms.active AND permissions.name = %(perm)s',
|
||||
"users.name ilike %(prefix)s || '%%'",
|
||||
'usertype IN %(userType)s',
|
||||
])
|
||||
|
||||
def test_valid_userType_none_with_perm_inherited_perm_and_prefix(self):
|
||||
self.exports.listUsers(userType=None, perm='admin', prefix='koji', inherited_perm=True)
|
||||
|
||||
self.assertEqual(len(self.queries), 1)
|
||||
query = self.queries[0]
|
||||
self.assertEqual(query.tables, ['users'])
|
||||
self.assertEqual(query.joins, [
|
||||
'LEFT JOIN user_groups ON user_id = users.id AND user_groups.active IS TRUE',
|
||||
'LEFT JOIN user_perms ON users.id = user_perms.user_id AND '
|
||||
'user_perms.active IS TRUE OR group_id =user_perms.user_id',
|
||||
'LEFT JOIN permissions ON perm_id = permissions.id',
|
||||
'LEFT JOIN user_krb_principals ON users.id = user_krb_principals.user_id'])
|
||||
self.assertEqual(query.clauses, [
|
||||
'user_perms.active AND permissions.name = %(perm)s',
|
||||
"users.name ilike %(prefix)s || '%%'",
|
||||
'usertype IN %(userType)s',
|
||||
])
|
||||
|
||||
def test_inherited_perm_without_perm(self):
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
self.exports.listUsers(userType=None, inherited_perm=True)
|
||||
self.assertEqual('inherited_perm option must be used with perm option',
|
||||
str(cm.exception))
|
||||
self.assertEqual(len(self.queries), 0)
|
||||
|
||||
def test_wrong_queryopts_group(self):
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
self.exports.listUsers(queryOpts={'group': 'test-column'})
|
||||
self.assertEqual('queryOpts.group is not available for this API', str(cm.exception))
|
||||
self.assertEqual(len(self.queries), 0)
|
||||
|
||||
def test_usertype_not_int_or_none(self):
|
||||
with self.assertRaises(koji.GenericError) as cm:
|
||||
self.exports.listUsers(userType=[1])
|
||||
self.assertEqual('userType must be integer or None', str(cm.exception))
|
||||
self.assertEqual(len(self.queries), 0)
|
||||
Loading…
Add table
Add a link
Reference in a new issue