koji-gc: Allow admins to force untagging builds.

* Add check for needed permsission and skip the tag if the user doesn't
  have the needed permission.

Fixes: https://pagure.io/koji/issue/2189
This commit is contained in:
Joe Talbott 2021-05-24 16:23:25 -04:00 committed by Tomas Kopecek
parent 3f0743a5e6
commit e68166b29d

View file

@ -233,6 +233,26 @@ def get_options():
return options, args
def check_perms(required, granted):
"""Check that the 'granted' permissions are sufficient with regard to the
'required' permissions.
:param required: a permission name.
:param granted: a list of granted permissions.
:returns: True if required permissions are met and False if not.
"""
# Nothing required
if required is None or not required:
return True
# Nothing granted
elif granted is None or not granted:
return False
elif required in granted:
return True
return False
def check_tag(name):
"""Check tag name against options and determine if we should process it
@ -860,8 +880,15 @@ def handle_prune():
raise Exception("Invalid action: %s" % action)
if options.debug:
pprint.pprint(policies.ruleset)
# get user info
user_perms = session.getPerms() #JOE
user = session.getLoggedInUser() #JOE
username = user['name']
# get tags
tags = session.listTags(perms=False, queryOpts={'order': 'name'})
tags = session.listTags(perms=True, queryOpts={'order': 'name'})
is_admin = session.hasPerm('admin')
untagged = {}
build_ids = {}
for taginfo in tags:
@ -887,6 +914,27 @@ def handle_prune():
if options.debug:
print("skipping locked tag: %s" % tagname)
continue
perm = taginfo['perm']
# Fail early if required permissions are missing.
has_tag_specific_perm = check_perms(perm, user_perms)
has_global_tag_perm = check_perms('tag', user_perms)
if (
not has_tag_specific_perm and
not has_global_tag_perm and
not is_admin
):
required_perms = '"{}"{}'.format(
perm if perm is not None else 'tag',
' or "admin"' if not is_admin and perm != "admin" else '')
print("Skipping tag "
"'{}' which requires {} but not found for user '{}'".format(
taginfo['name'],
required_perms,
username))
continue
if options.debug:
print("Pruning tag: %s" % tagname)
# get builds
@ -946,7 +994,7 @@ def handle_prune():
print("Untagging build %s from %s" % (nvr, tagname))
try:
session.untagBuildBypass(taginfo['id'], entry['build_id'],
force=bypass)
force=bypass or is_admin)
untagged.setdefault(nvr, {})[tagname] = 1
except (xmlrpc.client.Fault, koji.GenericError) as e:
print("Warning: untag operation failed: %s" % e)