move admin force usage to assert_policy

Fixes: https://pagure.io/koji/issue/1930
This commit is contained in:
Tomas Kopecek 2020-03-24 16:06:55 +01:00
parent ee615ed0e7
commit f55c9e4d25
2 changed files with 39 additions and 71 deletions

View file

@ -991,13 +991,7 @@ def _direct_pkglist_add(taginfo, pkginfo, owner, block, extra_arches, force,
if policy:
context.session.assertLogin()
policy_data = {'tag': tag_id, 'action': action, 'package': pkginfo, 'force': force}
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
assert_policy('package_list', policy_data)
else:
pkg_name = pkg and pkg['name'] or pkginfo
logger.info("Package list add %s/%s policy overriden by %s" % (
tag['name'], pkg_name, context.session.user_data['name']))
assert_policy('package_list', policy_data, force=force)
if not pkg:
pkg = lookup_package(pkginfo, create=True)
# validate arches before running callbacks
@ -1076,11 +1070,7 @@ def _direct_pkglist_remove(taginfo, pkginfo, force=False, policy=False):
context.session.assertLogin()
policy_data = {'tag': tag['id'], 'action': 'remove', 'package': pkg['id'], 'force': force}
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
assert_policy('package_list', policy_data)
else:
logger.info("Package list %s/%s remove policy overriden by %s" % (
tag['name'], pkg['name'], context.session.user_data['name']))
assert_policy('package_list', policy_data, force=force)
user = get_user(context.session.user_id)
koji.plugin.run_callbacks(
@ -1112,11 +1102,7 @@ def pkglist_unblock(taginfo, pkginfo, force=False):
context.session.assertLogin()
policy_data = {'tag': tag['id'], 'action': 'unblock', 'package': pkg['id'], 'force': force}
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
assert_policy('package_list', policy_data)
else:
logger.info("Package list %s/%s unblock policy overriden by %s" % (
tag['name'], pkg['name'], context.session.user_data['name']))
assert_policy('package_list', policy_data, force=force)
user = get_user(context.session.user_id)
koji.plugin.run_callbacks(
'prePackageListChange', action='unblock', tag=tag, package=pkg, user=user)
@ -9724,7 +9710,7 @@ class PolicyTest(koji.policy.BaseSimpleTest):
return result.lower() in ('yes', 'true', 'allow')
def check_policy(name, data, default='deny', strict=False):
def check_policy(name, data, default='deny', strict=False, force=False):
"""Check data against the named policy
This assumes the policy actions consist of:
@ -9735,6 +9721,7 @@ def check_policy(name, data, default='deny', strict=False):
access: True if the policy result is allow, false otherwise
reason: reason for the access
If strict is True, will raise ActionNotAllowed if the action is not 'allow'
If force is True, policy will pass, but action will be logged
"""
ruleset = context.policy.get(name)
if not ruleset:
@ -9765,6 +9752,9 @@ def check_policy(name, data, default='deny', strict=False):
if result != 'deny':
reason = 'error in policy'
logger.error("Invalid action in policy %s, rule: %s", name, lastrule)
if force and context.session.assertPerm('admin'):
logger.info("Policy %s overriden by force: %s" % (name, context.session.user_data["name"]))
return True, "overriden by force"
if not strict:
return False, reason
err_str = "policy violation (%s)" % name
@ -9775,7 +9765,7 @@ def check_policy(name, data, default='deny', strict=False):
raise koji.ActionNotAllowed(err_str)
def assert_policy(name, data, default='deny'):
def assert_policy(name, data, default='deny', force=False):
"""Enforce the named policy
This assumes the policy actions consist of:
@ -9783,7 +9773,7 @@ def assert_policy(name, data, default='deny'):
deny
Raises ActionNotAllowed if policy result is not allow
"""
check_policy(name, data, default=default, strict=True)
check_policy(name, data, default=default, strict=True, force=force)
def rpmdiff(basepath, rpmlist, hashes):
@ -10625,14 +10615,11 @@ class RootExports(object):
Retagging is not allowed unless force is true. (retagging changes the order
of entries will affect which build is the latest)
"""
if force:
context.session.assertPerm('admin')
else:
context.session.assertPerm('tag')
tag_id = get_tag(tag, strict=True)['id']
build_id = get_build(build, strict=True)['id']
policy_data = {'tag': tag_id, 'build': build_id, 'fromtag': None, 'operation': 'tag'}
assert_policy('tag', policy_data)
context.session.assertPerm('tag')
tag_id = get_tag(tag, strict=True)['id']
build_id = get_build(build, strict=True)['id']
policy_data = {'tag': tag_id, 'build': build_id, 'fromtag': None, 'operation': 'tag'}
assert_policy('tag', policy_data, force=force)
_tag_build(tag, build, force=force)
if notify:
tag_notification(True, tag, None, build, context.session.user_id)
@ -10695,12 +10682,8 @@ class RootExports(object):
else:
policy_data['operation'] = 'move'
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
assert_policy('tag', policy_data)
# XXX - we're running this check twice, here and in host.tagBuild (called by the task)
else:
logger.info("Tag policy %s/%s overriden by %s" % (
tag['name'], build['nvr'], context.session.user_data['name']))
# XXX - we're running this check twice, here and in host.tagBuild (called by the task)
assert_policy('tag', policy_data, force=force)
# spawn the tagging task
return make_task('tagBuild', [tag_id, build_id, force, fromtag_id], priority=10)
@ -10718,11 +10701,7 @@ class RootExports(object):
policy_data['operation'] = 'untag'
try:
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
assert_policy('tag', policy_data)
else:
logger.info("Untag policy %s/%s overriden by %s" % (
tag, build, context.session.user_data['name']))
assert_policy('tag', policy_data, force=force)
_untag_build(tag, build, strict=strict, force=force)
tag_notification(True, None, tag, build, user_id)
except Exception:
@ -10737,14 +10716,11 @@ class RootExports(object):
Unlike tagBuild, this does not create a task
No return value"""
if force:
context.session.assertPerm('admin')
else:
context.session.assertPerm('tag')
tag_id = get_tag(tag, strict=True)['id']
build_id = get_build(build, strict=True)['id']
policy_data = {'tag': None, 'build': build_id, 'fromtag': tag_id, 'operation': 'untag'}
assert_policy('tag', policy_data)
context.session.assertPerm('tag')
tag_id = get_tag(tag, strict=True)['id']
build_id = get_build(build, strict=True)['id']
policy_data = {'tag': None, 'build': build_id, 'fromtag': tag_id, 'operation': 'untag'}
assert_policy('tag', policy_data, force=force)
_untag_build(tag, build, strict=strict, force=force)
if notify:
tag_notification(True, None, tag, build, context.session.user_id)
@ -10794,15 +10770,11 @@ class RootExports(object):
# policy check
policy_data = {'tag': tag2, 'fromtag': tag1, 'operation': 'move'}
# don't check policy for admins using force
if not (force and context.session.hasPerm('admin')):
for build in build_list:
policy_data['build'] = build['id']
assert_policy('tag', policy_data)
# XXX - we're running this check twice, here and in host.tagBuild (called by the
# task)
else:
logger.info("Tag move policy %s/%s overriden by %s" % (
tag2, package, context.session.user_data['name']))
for build in build_list:
policy_data['build'] = build['id']
assert_policy('tag', policy_data)
# XXX - we're running this check twice, here and in host.tagBuild (called by the
# task)
wait_on = []
tasklist = []
@ -13997,12 +13969,7 @@ class HostExports(object):
else:
policy_data['operation'] = 'move'
# don't check policy for admins using force
perms = koji.auth.get_user_perms(user_id)
if not force or 'admin' not in perms:
assert_policy('tag', policy_data)
if force and 'admin' in perms:
logger.info("Tag build %s/%s policy overriden by %s" % (
tag, build['nvr'], context.session.user_data['name']))
assert_policy('tag', policy_data, force=force)
# package list check
pkgs = readPackageList(tagID=tag_id, pkgID=pkg_id, inherit=True)
pkg_error = None
@ -14013,7 +13980,7 @@ class HostExports(object):
if pkg_error:
if force and context.session.hasPerm('admin'):
pkglist_add(tag_id, pkg_id, force=True, block=False)
logger.info("Package added %s/%s by %s" % (
logger.info("Package added %s/%s by %s by force" % (
tag, build['nvr'], context.session.user_data['name']))
else:
raise koji.TagError(pkg_error)

View file

@ -73,7 +73,7 @@ class TestPkglistBlock(unittest.TestCase):
get_tag.assert_called_once_with('tag', strict=True)
lookup_package.assert_called_once_with('pkg', strict=True)
assert_policy.assert_called_once_with('package_list', {'tag': tag['id'],
'action': 'unblock', 'package': pkg['id'], 'force': False})
'action': 'unblock', 'package': pkg['id'], 'force': False}, force=False)
self.assertEqual(readPackageList.call_count, 2)
readPackageList.assert_has_calls([
mock.call(tag['id'], pkgID=pkg['id'], inherit=True),
@ -109,7 +109,7 @@ class TestPkglistBlock(unittest.TestCase):
get_tag.assert_called_once_with('tag', strict=True)
lookup_package.assert_called_once_with('pkg', strict=True)
assert_policy.assert_called_once_with('package_list', {'tag': tag_id,
'action': 'unblock', 'package': pkg_id, 'force': False})
'action': 'unblock', 'package': pkg_id, 'force': False}, force=False)
readPackageList.assert_called_once_with(tag_id, pkgID=pkg_id, inherit=True)
_pkglist_add.assert_called_once_with(tag_id, pkg_id, owner_id, False, '')
_pkglist_remove.assert_not_called()
@ -133,7 +133,7 @@ class TestPkglistBlock(unittest.TestCase):
get_tag.assert_called_once_with('tag', strict=True)
lookup_package.assert_called_once_with('pkg', strict=True)
assert_policy.assert_called_once_with('package_list', {'tag': tag_id,
'action': 'unblock', 'package': pkg_id, 'force': False})
'action': 'unblock', 'package': pkg_id, 'force': False}, force=False)
readPackageList.assert_called_once_with(tag_id, pkgID=pkg_id, inherit=True)
_pkglist_add.assert_not_called()
_pkglist_remove.assert_not_called()
@ -162,7 +162,7 @@ class TestPkglistBlock(unittest.TestCase):
get_tag.assert_called_once_with('tag', strict=True)
lookup_package.assert_called_once_with('pkg', strict=True)
assert_policy.assert_called_once_with('package_list', {'tag': tag_id,
'action': 'unblock', 'package': pkg_id, 'force': False})
'action': 'unblock', 'package': pkg_id, 'force': False}, force=False)
readPackageList.assert_called_once_with(tag_id, pkgID=pkg_id, inherit=True)
_pkglist_add.assert_not_called()
_pkglist_remove.assert_not_called()
@ -224,7 +224,7 @@ class TestPkglistBlock(unittest.TestCase):
mock.call(112233),
])
assert_policy.assert_called_once_with('package_list', {'tag': tag['id'],
'action': 'add', 'package': pkg['name'], 'force': False})
'action': 'add', 'package': pkg['name'], 'force': False}, force=False)
self.assertEqual(self.run_callbacks.call_count, 2)
self.run_callbacks.assert_has_calls([
mock.call('prePackageListChange', action='add', tag=tag,
@ -341,7 +341,7 @@ class TestPkglistBlock(unittest.TestCase):
mock.call(112233),
])
assert_policy.assert_called_once_with('package_list', {'tag': tag['id'],
'action': 'add', 'package': pkg['name'], 'force': False})
'action': 'add', 'package': pkg['name'], 'force': False}, force=False)
self.assertEqual(self.run_callbacks.call_count, 2)
self.run_callbacks.assert_has_calls([
mock.call('prePackageListChange', action='add', tag=tag,
@ -398,7 +398,7 @@ class TestPkglistBlock(unittest.TestCase):
mock.call(112233),
])
assert_policy.assert_called_once_with('package_list', {'tag': tag['id'],
'action': 'add', 'package': pkg['name'], 'force': False})
'action': 'add', 'package': pkg['name'], 'force': False}, force=False)
self.run_callbacks.assert_called_once_with(
'prePackageListChange', action='add', tag=tag,
package=pkg, owner=user['id'], block=block,
@ -447,7 +447,8 @@ class TestPkglistBlock(unittest.TestCase):
mock.call(112233),
])
# force + admin
assert_policy.assert_not_called()
assert_policy.assert_called_once_with('package_list',
{'tag': 1, 'action': 'add', 'package': 'pkg', 'force': True}, force=True)
self.assertEqual(self.run_callbacks.call_count, 2)
self.run_callbacks.assert_has_calls([