Inherit group permissions

Related: https://pagure.io/koji/issue/3849
This commit is contained in:
Tomas Kopecek 2023-06-19 09:57:04 +02:00
parent d324f601c8
commit 255ec21d00
7 changed files with 63 additions and 21 deletions

View file

@ -2170,15 +2170,19 @@ def handle_list_permissions(goptions, session, args):
parser.error("This command takes no arguments")
activate_session(session, goptions)
perms = []
if options.mine:
options.user = session.getLoggedInUser()['id']
if options.user:
user = session.getUser(options.user)
if not user:
error("No such user: %s" % options.user)
for p in session.getUserPerms(user['id']):
perms.append({'name': p})
elif options.mine:
for p in session.getPerms():
perms.append({'name': p})
all = set(session.getUserPerms(user['id']))
own = set(session.getUserPerms(user['id'], with_groups=False))
for p in all:
p = {'name': p}
if p['name'] not in own:
p['description'] = '[inherited]'
perms.append(p)
else:
for p in session.getAllPerms():
perms.append({'name': p['name'], 'description': p['description']})

View file

@ -17,6 +17,8 @@ referenced in :doc:`hub policies <defining_hub_policies>`. Note, that you need
to first understand the policy mechanism as most permissions are reflected in
policy rules.
Any user can be part of user group. User group permissions are inherited by the user.
Permission management
=====================

View file

@ -772,7 +772,7 @@ def get_user_groups(user_id):
are the group names"""
t_group = koji.USERTYPES['GROUP']
query = QueryProcessor(tables=['user_groups'], columns=['group_id', 'name'],
clauses=['active = TRUE', 'users.usertype=%(t_group)i',
clauses=['active IS TRUE', 'users.usertype=%(t_group)i',
'user_id=%(user_id)i'],
joins=['users ON group_id = users.id'],
values={'t_group': t_group, 'user_id': user_id})
@ -782,13 +782,30 @@ def get_user_groups(user_id):
return groups
def get_user_perms(user_id):
def get_user_perms(user_id, with_groups=True):
# individual permissions
query = QueryProcessor(tables=['user_perms'], columns=['name'],
clauses=['active = TRUE', 'user_id=%(user_id)s'],
clauses=['active IS TRUE', 'user_id=%(user_id)s'],
joins=['permissions ON perm_id = permissions.id'],
values={'user_id': user_id})
result = query.execute()
return [r['name'] for r in result]
perms = {r['name'] for r in result}
# inherited group permissions
if with_groups:
query = QueryProcessor(tables=['user_groups'], columns=['name'],
clauses=[
'user_groups.active IS TRUE',
'user_perms.active IS TRUE',
'user_groups.user_id=%(user_id)s'],
joins=[
'LEFT JOIN user_perms ON '
'user_perms.user_id = user_groups.group_id',
'permissions ON perm_id = permissions.id'],
values={'user_id': user_id})
result = query.execute()
perms |= {r['name'] for r in result}
return list(perms)
def get_user_data(user_id):

View file

@ -13384,13 +13384,13 @@ class RootExports(object):
"""Get a list of the permissions granted to the currently logged-in user."""
return context.session.getPerms()
def getUserPerms(self, userID=None):
def getUserPerms(self, userID=None, with_groups=True):
"""Get a list of the permissions granted to the user with the given ID/name.
Options:
- userID: User ID or username. If no userID provided, current login user's
permissions will be listed."""
user_info = get_user(userID, strict=True)
return get_user_perms(user_info['id'])
return get_user_perms(user_info['id'], with_groups=with_groups)
def getAllPerms(self):
"""Get a list of all permissions in the system. Returns a list of maps. Each

View file

@ -85,7 +85,7 @@ class TestListPermissions(utils.CliTestCase):
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()
@ -108,7 +108,7 @@ repo
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()
@ -119,14 +119,15 @@ repo
repo
"""
perms = [p['name'] for p in self.all_perms[1:3]]
self.session.getPerms.return_value = perms
self.session.getLoggedInUser.return_value = {'id': 1, 'name': 'user'}
self.session.getUserPerms.return_value = perms
handle_list_permissions(self.options, self.session, ['--mine'])
self.assert_console_message(stdout, expected)
self.activate_session_mock.assert_called_once()
self.session.getUser.assert_not_called()
self.session.getUserPerms.assert_not_called()
self.session.getPerms.assert_called_once()
self.session.getUser.assert_called_once()
self.session.getUserPerms.assert_called()
self.session.getPerms.assert_not_called()
self.session.getAllPerms.assert_not_called()
@mock.patch('sys.stdout', new_callable=six.StringIO)

View file

@ -21,4 +21,4 @@ class TestGetUserPerms(unittest.TestCase):
def test_normal(self):
self.get_user.return_value = {'id': 123, 'name': 'testuser'}
kojihub.RootExports().getUserPerms(123)
self.get_user_perms.assert_called_once_with(123)
self.get_user_perms.assert_called_once_with(123, with_groups=True)

View file

@ -703,19 +703,37 @@ class TestAuthSession(unittest.TestCase):
query = self.queries[0]
self.assertEqual(query.tables, ['user_groups'])
self.assertEqual(query.joins, ['users ON group_id = users.id'])
self.assertEqual(query.clauses, ['active = TRUE', 'user_id=%(user_id)i',
self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)i',
'users.usertype=%(t_group)i'])
self.assertEqual(query.columns, ['group_id', 'name'])
def test_get_user_perms(self):
"""auth.get_user_perms"""
kojihub.auth.get_user_perms(1)
self.assertEqual(len(self.queries), 1)
self.assertEqual(len(self.queries), 2)
query = self.queries[0]
self.assertEqual(query.tables, ['user_perms'])
self.assertEqual(query.joins, ['permissions ON perm_id = permissions.id'])
self.assertEqual(query.clauses, ['active = TRUE', 'user_id=%(user_id)s'])
self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)s'])
self.assertEqual(query.columns, ['name'])
query = self.queries[1]
self.assertEqual(query.tables, ['user_groups'])
self.assertEqual(query.joins, [
'LEFT JOIN user_perms ON user_perms.user_id = user_groups.group_id',
'permissions ON perm_id = permissions.id'])
self.assertEqual(sorted(query.clauses), sorted([
'user_groups.active IS TRUE',
'user_perms.active IS TRUE',
'user_groups.user_id=%(user_id)s']))
self.assertEqual(query.columns, ['name'])
def test_get_user_perms_inherited(self):
self.query_execute.side_effect = [
[{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}],
[{'id': 3, 'name': 'perm3'}]
]
result = kojihub.auth.get_user_perms(1)
self.assertEqual(set(result), {'perm1', 'perm2', 'perm3'})
def test_logout_logged_not_owner(self):
s, _ = self.get_session()