refactor get_user_perms

This commit is contained in:
Tomas Kopecek 2023-08-15 10:38:39 +02:00
parent e1ea85f184
commit f5f8e6c6a2
6 changed files with 85 additions and 58 deletions

View file

@ -2176,12 +2176,10 @@ def handle_list_permissions(goptions, session, args):
user = session.getUser(options.user)
if not user:
error("No such user: %s" % options.user)
all = set(session.getUserPerms(user['id']))
own = set(session.getUserPerms(user['id'], with_groups=False))
for p in all:
for p, groups in session.getUserPermsInheritance(user['id']).items():
p = {'name': p}
if p['name'] not in own:
p['description'] = '[inherited]'
if groups != [None]:
p['description'] = 'inherited from: %s' % ', '.join(groups)
perms.append(p)
else:
for p in session.getAllPerms():

View file

@ -782,29 +782,61 @@ def get_user_groups(user_id):
return groups
def get_user_perms(user_id, with_groups=True):
def get_user_perms(user_id, with_groups=True, inheritance_data=False):
"""
:param int user_id: User ID
:param bool with_groups: Add also permissions from all groups and their inheritance chain
:param bool inheritance_data: Return extended data about permissions sources
:returns list[str]: in case of inheritance_data=False
:returns dict[str, list[str]]: in case of inheritance_data=True - keys are permissions' names,
values list of groups which are in inheritance and provides
given permission.
"""
if inheritance_data and not with_groups:
raise koji.ParameterError("inheritance option implies with_groups")
# individual permissions
perms = {}
query = QueryProcessor(tables=['user_perms'], columns=['name'],
clauses=['active IS TRUE', 'user_id=%(user_id)s'],
joins=['permissions ON perm_id = permissions.id'],
values={'user_id': user_id})
result = query.execute()
perms = {r['name'] for r in result}
for perm in query.execute():
perms[perm['name']] = [None]
# inherited group permissions
if with_groups:
query = QueryProcessor(tables=['user_groups'], columns=['name'],
columns = ['permissions.name']
aliases = ['name']
joins = [
'user_perms ON user_perms.user_id = user_groups.group_id',
'permissions ON perm_id = permissions.id',
]
if inheritance_data:
# inheritance data adds one more join and as function
# can be called relatively often (e.g. in hub policy tests)
# it is a bit faster to ignore this join for "default" code path
columns.append('users.name')
aliases.append('group')
joins.append('users ON user_groups.group_id = users.id')
query = QueryProcessor(tables=['user_groups'],
columns=columns,
aliases=aliases,
clauses=[
'user_groups.active IS TRUE',
'user_perms.active IS TRUE',
'user_groups.user_id=%(user_id)s'],
joins=[
'user_perms ON user_perms.user_id = user_groups.group_id',
'permissions ON perm_id = permissions.id'],
joins=joins,
values={'user_id': user_id})
result = query.execute()
perms |= {r['name'] for r in result}
return list(perms)
for row in query.execute():
if inheritance_data:
perms.setdefault(row['name'], []).append(row['group'])
else:
# group name wouldn't be used in this case
perms.setdefault(row['name'], [])
if inheritance_data:
return perms
else:
return list(perms.keys())
def get_user_data(user_id):

View file

@ -13390,7 +13390,7 @@ class RootExports(object):
- userID: User ID or username. If no userID provided, current login user's
permissions will be listed."""
user_info = get_user(userID, strict=True)
return get_user_perms(user_info['id'], with_groups=with_groups)
return get_user_perms(user_info['id'], with_groups=with_groups, inheritance_data=False)
def getUserPermsInheritance(self, userID):
"""Get a dict of the permissions granted directly to user or inherited from groups
@ -13400,25 +13400,7 @@ class RootExports(object):
:returns dict[str, list[str]]: list of permissions with source (None/group)
"""
user_info = get_user(userID, strict=True)
perms = {}
for perm in get_user_perms(user_info['id'], with_groups=False):
perms[perm] = [None]
query = QueryProcessor(tables=['user_groups'],
columns=['permissions.name', 'users.name'],
aliases=['permission', 'group'],
clauses=[
'user_groups.active IS TRUE',
'user_perms.active IS TRUE',
'user_groups.user_id=%(user_id)s'],
joins=[
'user_perms ON user_perms.user_id = user_groups.group_id',
'permissions ON perm_id = permissions.id',
'users ON user_groups.group_id = users.id'],
values={'user_id': user_info['id']})
for row in query.execute():
perms.setdefault(row['permission'], []).append(row['group'])
return perms
return get_user_perms(user_info['id'], inheritance_data=True)
def getAllPerms(self):
"""Get a list of all permissions in the system. Returns a list of maps. Each

View file

@ -79,13 +79,14 @@ class TestListPermissions(utils.CliTestCase):
"""
self.options.quiet = False
self.session.getUser.return_value = self.userinfo
self.session.getUserPerms.return_value = []
self.session.getUserPermsInheritance.return_value = {}
handle_list_permissions(self.options, self.session, ['--user', self.user])
self.assert_console_message(stdout, expected)
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getUserPerms.assert_not_called()
self.session.getUserPermsInheritance.assert_called_once()
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()
@ -100,15 +101,16 @@ livecd
long-permission-appliance
repo
"""
perms = [p['name'] for p in self.all_perms[::1]]
self.session.getUserPerms.return_value = perms
perms = {p['name']: [None] for p in self.all_perms[::1]}
self.session.getUserPermsInheritance.return_value = perms
self.session.getUser.return_value = self.userinfo
handle_list_permissions(self.options, self.session, ['--user', self.user])
self.assert_console_message(stdout, expected)
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getUserPerms.assert_not_called()
self.session.getUserPermsInheritance.assert_called_once_with(self.userinfo['id'])
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()
@ -118,15 +120,16 @@ repo
expected = """build
repo
"""
perms = [p['name'] for p in self.all_perms[1:3]]
perms = {p['name']: [None] for p in self.all_perms[1:3]}
self.session.getLoggedInUser.return_value = {'id': 1, 'name': 'user'}
self.session.getUserPerms.return_value = perms
self.session.getUserPermsInheritance.return_value = perms
handle_list_permissions(self.options, self.session, ['--mine'])
self.assert_console_message(stdout, expected)
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getUserPerms.assert_not_called()
self.session.getUserPermsInheritance.assert_called_once()
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()

View file

@ -3,6 +3,7 @@ import unittest
import koji
from .utils import DBQueryTestCase
import kojihub
from kojihub.auth import get_user_perms
class TestGetUserPerms(unittest.TestCase):
@ -22,7 +23,7 @@ class TestGetUserPerms(unittest.TestCase):
def test_normal(self):
self.get_user.return_value = {'id': 123, 'name': 'testuser'}
kojihub.RootExports().getUserPerms(123)
self.get_user_perms.assert_called_once_with(123, with_groups=True)
self.get_user_perms.assert_called_once_with(123, with_groups=True, inheritance_data=False)
class TestGetUserPermsInheritance(DBQueryTestCase):
@ -41,18 +42,14 @@ class TestGetUserPermsInheritance(DBQueryTestCase):
self.get_user_perms.assert_not_called()
def test_normal(self):
self.get_user.return_value = {'id': 123, 'name': 'testuser'}
self.get_user_perms.return_value = ['test1', 'test2']
self.qp_execute_return_value = [
{'permission': 'test2', 'group': 'group1'},
{'permission': 'test3', 'group': 'group1'},
{'permission': 'test3', 'group': 'group2'},
]
result = kojihub.RootExports().getUserPermsInheritance(123)
self.assertEqual(result, {
data = {
'test1': [None],
'test2': [None, 'group1'],
'test3': ['group1', 'group2'],
})
}
self.get_user.return_value = {'id': 123, 'name': 'testuser'}
self.get_user_perms.return_value = data
result = kojihub.RootExports().getUserPermsInheritance(123)
self.assertEqual(result, data)
self.get_user.assert_called_once_with(123, strict=True)
self.get_user_perms.assert_called_once_with(123, with_groups=False)
self.get_user_perms.assert_called_once_with(123, inheritance_data=True)

View file

@ -725,16 +725,31 @@ class TestAuthSession(unittest.TestCase):
'user_groups.active IS TRUE',
'user_perms.active IS TRUE',
'user_groups.user_id=%(user_id)s']))
self.assertEqual(query.columns, ['name'])
self.assertEqual(query.columns, ['permissions.name'])
def test_get_user_perms_inherited(self):
self.query_execute.side_effect = [
[{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}],
[{'id': 3, 'name': 'perm3'}]
[{'name': 'perm3'}]
]
result = kojihub.auth.get_user_perms(1)
self.assertEqual(set(result), {'perm1', 'perm2', 'perm3'})
def test_get_user_perms_inherited_data(self):
self.query_execute.side_effect = [
[{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}],
[{'name': 'perm3', 'group': 'group_a'},
{'name': 'perm4', 'group': 'group_b'},
{'name': 'perm4', 'group': 'group_c'}]
]
result = kojihub.auth.get_user_perms(1, inheritance_data=True)
self.assertEqual(result, {
'perm1': [None],
'perm2': [None],
'perm3': ['group_a'],
'perm4': ['group_b', 'group_c'],
})
def test_logout_logged_not_owner(self):
s, _ = self.get_session()