PR#3308: server-side clonetag

Merges #3308
https://pagure.io/koji/pull-request/3308

Fixes: #3307
https://pagure.io/koji/issue/3307
clone-tag rewrite
This commit is contained in:
Tester 2022-06-27 14:58:38 +02:00
commit cd6b4bebb5
6 changed files with 806 additions and 696 deletions

View file

@ -15,7 +15,6 @@ import time
import traceback
from datetime import datetime
from dateutil.tz import tzutc
from collections import OrderedDict, defaultdict
from optparse import SUPPRESS_HELP, OptionParser
import six
@ -3678,17 +3677,6 @@ def anon_handle_hostinfo(goptions, session, args):
error()
def _multicall_with_check(session, batch_size):
"""Helper for running multicall inside handle_clone_tag"""
err = False
for r in session.multiCall(batch=batch_size):
if isinstance(r, dict):
warn(r['faultString'])
err = True
if err:
error('Errors during the last call. Target tag could be inconsistent.')
def handle_clone_tag(goptions, session, args):
"[admin] Duplicate the contents of one tag onto another tag"
usage = "usage: %prog clone-tag [options] <src-tag> <dst-tag>"
@ -3696,10 +3684,12 @@ def handle_clone_tag(goptions, session, args):
parser = OptionParser(usage=get_usage_str(usage))
parser.add_option('--config', action='store_true',
help="Copy config from the source to the dest tag")
parser.add_option('--groups', action='store_true', help="Copy group information")
parser.add_option('--groups', action='store_true',
help="Copy group information")
parser.add_option('--pkgs', action='store_true',
help="Copy package list from the source to the dest tag")
parser.add_option('--builds', action='store_true', help="Tag builds into the dest tag")
parser.add_option('--builds', action='store_true',
help="Tag builds into the dest tag")
parser.add_option('--all', action='store_true',
help="The same as --config --groups --pkgs --builds")
parser.add_option('--latest-only', action='store_true',
@ -3708,16 +3698,20 @@ def handle_clone_tag(goptions, session, args):
help="Include all builds inherited into the source tag into the dest tag")
parser.add_option('--ts', type='int', metavar="TIMESTAMP",
help='Clone tag at last event before specific timestamp')
parser.add_option('--no-delete', action='store_false', dest="delete", default=True,
parser.add_option('--no-delete', action='store_false', dest="delete",
default=True,
help="Don't delete any existing content in dest tag.")
parser.add_option('--event', type='int', help='Clone tag at a specific event')
parser.add_option('--repo', type='int', help='Clone tag at a specific repo event')
parser.add_option("-v", "--verbose", action="store_true", help="show changes")
parser.add_option('--event', type='int',
help='Clone tag at a specific event')
parser.add_option('--repo', type='int',
help='Clone tag at a specific repo event')
parser.add_option("-v", "--verbose", action="store_true",
help=SUPPRESS_HELP)
parser.add_option("--notify", action="store_true", default=False,
help='Send tagging/untagging notifications')
parser.add_option("-f", "--force", action="store_true",
help="override tag locks if necessary")
parser.add_option("-n", "--test", action="store_true", help="test mode")
parser.add_option("-n", "--test", action="store_true", help=SUPPRESS_HELP)
parser.add_option("--batch", type='int', default=100, metavar='SIZE',
help="batch size of multicalls [0 to disable, default: %default]")
(options, args) = parser.parse_args(args)
@ -3743,6 +3737,9 @@ def handle_clone_tag(goptions, session, args):
event['timestr'] = time.asctime(time.localtime(event['ts']))
print("Cloning at event %(id)i (%(timestr)s)" % event)
if options.builds and not options.pkgs:
parser.error("--builds can't be used without also specifying --pkgs")
# store tags.
try:
srctag = session.getBuildConfig(args[0], event=event.get('id'))
@ -3756,456 +3753,30 @@ def handle_clone_tag(goptions, session, args):
parser.error("Error: You are attempting to clone from or to a tag which is locked.\n"
"Please use --force if this is what you really want to do.")
# init debug lists.
chgpkglist = []
chgbldlist = []
chggrplist = []
# case of brand new dst-tag.
if not dsttag:
# create a new tag, copy srctag header.
if not options.test:
if options.config:
session.createTag(args[1], parent=None, arches=srctag['arches'],
perm=srctag['perm_id'],
locked=srctag['locked'],
maven_support=srctag['maven_support'],
maven_include_all=srctag['maven_include_all'],
extra=srctag['extra'])
else:
session.createTag(args[1], parent=None)
# store the new tag, need its assigned id.
newtag = session.getTag(args[1], strict=True)
# get pkglist of src-tag, including inherited packages.
if options.pkgs:
srcpkgs = session.listPackages(tagID=srctag['id'],
inherited=True,
event=event.get('id'))
srcpkgs.sort(key=lambda x: x['package_name'])
if not options.test:
session.multicall = True
for pkgs in srcpkgs:
# for each package add one entry in the new tag.
chgpkglist.append(('[new]',
pkgs['package_name'],
pkgs['blocked'],
pkgs['owner_name'],
pkgs['tag_name']))
if not options.test:
# add packages.
session.packageListAdd(newtag['name'],
pkgs['package_name'],
owner=pkgs['owner_name'],
block=pkgs['blocked'],
extra_arches=pkgs['extra_arches'])
if not options.test:
_multicall_with_check(session, options.batch)
if options.builds:
# get --all latest builds from src tag
builds = reversed(session.listTagged(srctag['id'],
event=event.get('id'),
inherit=options.inherit_builds,
latest=options.latest_only))
if not options.test:
session.multicall = True
for build in builds:
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[new]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# copy latest builds into new tag
if not options.test:
session.tagBuildBypass(newtag['name'],
build,
force=options.force,
notify=options.notify)
if not options.test:
_multicall_with_check(session, options.batch)
if options.groups:
# Copy the group data
srcgroups = session.getTagGroups(srctag['name'],
event=event.get('id'))
if not options.test:
session.multicall = True
for group in srcgroups:
if not options.test:
session.groupListAdd(newtag['name'], group['name'])
for pkg in group['packagelist']:
if not options.test:
session.groupPackageListAdd(newtag['name'],
group['name'],
pkg['package'],
block=pkg['blocked'])
chggrplist.append(('[new]', pkg['package'], group['name']))
if not options.test:
_multicall_with_check(session, options.batch)
# case of existing dst-tag.
if options.test:
parser.error("server-side operation, test output is no longer available")
if dsttag:
if options.config and not options.test:
if dsttag['extra']:
session.editTag2(dsttag['id'], remove_extra=list(dsttag['extra'].keys()))
session.editTag2(dsttag['id'], parent=None, arches=srctag['arches'],
perm=srctag['perm_id'],
locked=srctag['locked'],
maven_support=srctag['maven_support'],
maven_include_all=srctag['maven_include_all'],
extra=srctag['extra'])
dsttag = session.getTag(dsttag['id'], strict=True)
# get fresh list of packages & builds into maps.
srcpkgs = {}
dstpkgs = {}
srcbldsbypkg = defaultdict(OrderedDict)
dstbldsbypkg = defaultdict(OrderedDict)
srcgroups = OrderedDict()
dstgroups = OrderedDict()
# we use OrderedDict so that these indexes preserve the order given to us
if options.pkgs:
for pkg in session.listPackages(tagID=srctag['id'],
inherited=True,
event=event.get('id')):
srcpkgs[pkg['package_name']] = pkg
for pkg in session.listPackages(tagID=dsttag['id'],
inherited=True):
dstpkgs[pkg['package_name']] = pkg
if options.builds:
# listTagged orders builds latest-first
# so reversing that gives us oldest-first
for build in reversed(session.listTagged(srctag['id'],
event=event.get('id'),
inherit=options.inherit_builds,
latest=options.latest_only)):
srcbldsbypkg[build['package_name']][build['nvr']] = build
# get builds in dsttag without inheritance.
# latest=False to get all builds even when latest_only = True,
# so that only the *latest* build per tag will live in.
for build in reversed(session.listTagged(dsttag['id'],
inherit=False,
latest=False)):
dstbldsbypkg[build['package_name']][build['nvr']] = build
if options.groups:
for group in session.getTagGroups(srctag['name'],
event=event.get('id')):
srcgroups[group['name']] = group
for group in session.getTagGroups(dsttag['name']):
dstgroups[group['name']] = group
# construct to-do lists.
paddlist = [] # list containing new packages to be added from src tag
for (package_name, pkg) in six.iteritems(srcpkgs):
if package_name not in dstpkgs:
paddlist.append(pkg)
paddlist.sort(key=lambda x: x['package_name'])
pdellist = [] # list containing packages no more present in dst tag
for (package_name, pkg) in six.iteritems(dstpkgs):
if package_name not in srcpkgs:
pdellist.append(pkg)
pdellist.sort(key=lambda x: x['package_name'])
baddlist = [] # list containing new builds to be added from src tag
bdellist = [] # list containing new builds to be removed from dst tag
if options.delete:
# remove builds for packages that are absent from src tag
for (pkg, dstblds) in six.iteritems(dstbldsbypkg):
if pkg not in srcbldsbypkg:
bdellist.extend(dstblds.values())
# add and/or remove builds from dst to match src contents and order
for (pkg, srcblds) in six.iteritems(srcbldsbypkg):
dstblds = dstbldsbypkg[pkg]
ablds = []
dblds = []
# firstly, deal with extra builds in dst
removed_nvrs = set(dstblds.keys()) - set(srcblds.keys())
bld_order = srcblds.copy()
if options.delete:
# mark the extra builds for deletion
dnvrs = []
for (dstnvr, dstbld) in six.iteritems(dstblds):
if dstnvr in removed_nvrs:
dnvrs.append(dstnvr)
dblds.append(dstbld)
# we also remove them from dstblds now so that they do not
# interfere with the order comparison below
for dnvr in dnvrs:
del dstblds[dnvr]
else:
# in the no-delete case, the extra builds should be forced
# to last in the tag
bld_order = OrderedDict()
for (dstnvr, dstbld) in six.iteritems(dstblds):
if dstnvr in removed_nvrs:
bld_order[dstnvr] = dstbld
for (nvr, srcbld) in six.iteritems(srcblds):
bld_order[nvr] = srcbld
# secondly, add builds from src tag and adjust the order
for (nvr, srcbld) in six.iteritems(bld_order):
found = False
out_of_order = []
# note that dstblds is trimmed as we go, so we are only
# considering the tail corresponding to where we are at
# in the srcblds loop
for (dstnvr, dstbld) in six.iteritems(dstblds):
if nvr == dstnvr:
found = True
break
else:
out_of_order.append(dstnvr)
dblds.append(dstbld)
for dnvr in out_of_order:
del dstblds[dnvr]
# these will be re-added in the proper order later
if found:
# remove it for next pass so we stay aligned with outer
# loop
del dstblds[nvr]
else:
# missing from dst, so we need to add it
ablds.append(srcbld)
baddlist.extend(ablds)
bdellist.extend(dblds)
baddlist.sort(key=lambda x: x['package_name'])
bdellist.sort(key=lambda x: x['package_name'])
gaddlist = [] # list containing new groups to be added from src tag
for (grpname, group) in six.iteritems(srcgroups):
if grpname not in dstgroups:
gaddlist.append(group)
gdellist = [] # list containing groups to be removed from src tag
for (grpname, group) in six.iteritems(dstgroups):
if grpname not in srcgroups:
gdellist.append(group)
grpchanges = OrderedDict() # dict of changes to make in shared groups
for (grpname, group) in six.iteritems(srcgroups):
if grpname in dstgroups:
dstgroup = dstgroups[grpname]
grpchanges[grpname] = {'adds': [], 'dels': []}
# Store whether group is inherited or not
grpchanges[grpname]['inherited'] = False
if dstgroup['tag_id'] != dsttag['id']:
grpchanges[grpname]['inherited'] = True
srcgrppkglist = []
dstgrppkglist = []
for pkg in group['packagelist']:
srcgrppkglist.append(pkg['package'])
for pkg in dstgroups[grpname]['packagelist']:
dstgrppkglist.append(pkg['package'])
for pkg in srcgrppkglist:
if pkg not in dstgrppkglist:
grpchanges[grpname]['adds'].append(pkg)
for pkg in dstgrppkglist:
if pkg not in srcgrppkglist:
grpchanges[grpname]['dels'].append(pkg)
# ADD new packages.
if not options.test:
session.multicall = True
for pkg in paddlist:
chgpkglist.append(('[add]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListAdd(dsttag['name'],
pkg['package_name'],
owner=pkg['owner_name'],
block=pkg['blocked'],
extra_arches=pkg['extra_arches'])
if not options.test:
_multicall_with_check(session, options.batch)
# DEL builds. To keep the order we should untag builds at first
if not options.test:
session.multicall = True
for build in bdellist:
# don't delete an inherited build.
if build['tag_name'] == dsttag['name']:
# add missing 'name' field
build['name'] = build['package_name']
chgbldlist.append(('[del]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# go on del builds from new tag.
if not options.test:
session.untagBuildBypass(dsttag['name'],
build,
force=options.force,
notify=options.notify)
if not options.test:
_multicall_with_check(session, options.batch)
# ADD builds.
if not options.test:
session.multicall = True
for build in baddlist:
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[add]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# copy latest builds into new tag.
if not options.test:
session.tagBuildBypass(dsttag['name'],
build,
force=options.force,
notify=options.notify)
if not options.test:
_multicall_with_check(session, options.batch)
# ADD groups.
if not options.test:
session.multicall = True
for group in gaddlist:
if not options.test:
session.groupListAdd(dsttag['name'],
group['name'],
force=options.force)
for pkg in group['packagelist']:
if not options.test:
session.groupPackageListAdd(dsttag['name'],
group['name'],
pkg['package'],
force=options.force)
chggrplist.append(('[new]', pkg['package'], group['name']))
if not options.test:
_multicall_with_check(session, options.batch)
# ADD group pkgs.
if not options.test:
session.multicall = True
for group in grpchanges:
for pkg in grpchanges[group]['adds']:
chggrplist.append(('[new]', pkg, group))
if not options.test:
session.groupPackageListAdd(dsttag['name'],
group,
pkg,
force=options.force)
if not options.test:
_multicall_with_check(session, options.batch)
if options.delete:
# DEL packages
ninhrtpdellist = []
inhrtpdellist = []
for pkg in pdellist:
if pkg['tag_name'] == dsttag['name']:
ninhrtpdellist.append(pkg)
else:
inhrtpdellist.append(pkg)
session.multicall = True
# delete only non-inherited packages.
for pkg in ninhrtpdellist:
# check if package have owned builds inside.
session.listTagged(dsttag['name'],
package=pkg['package_name'],
inherit=False)
bump_builds = session.multiCall(batch=options.batch)
if not options.test:
session.multicall = True
for pkg, [builds] in zip(ninhrtpdellist, bump_builds):
if isinstance(builds, dict):
error(builds['faultString'])
# remove all its builds first if there are any.
for build in builds:
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[del]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# so delete latest build(s) from new tag.
if not options.test:
session.untagBuildBypass(dsttag['name'],
build,
force=options.force,
notify=options.notify)
# now safe to remove package itself since we resolved its builds.
chgpkglist.append(('[del]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListRemove(dsttag['name'],
pkg['package_name'],
force=False)
# mark as blocked inherited packages.
for pkg in inhrtpdellist:
chgpkglist.append(('[blk]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListBlock(dsttag['name'], pkg['package_name'])
if not options.test:
_multicall_with_check(session, options.batch)
# DEL groups.
if not options.test:
session.multicall = True
for group in gdellist:
# Only delete a group that isn't inherited
if group['tag_id'] == dsttag['id']:
if not options.test:
session.groupListRemove(dsttag['name'],
group['name'],
force=options.force)
for pkg in group['packagelist']:
chggrplist.append(('[del]', pkg['package'], group['name']))
# mark as blocked inherited groups.
else:
if not options.test:
session.groupListBlock(dsttag['name'], group['name'])
for pkg in group['packagelist']:
chggrplist.append(('[blk]', pkg['package'], group['name']))
if not options.test:
_multicall_with_check(session, options.batch)
# DEL group pkgs.
if not options.test:
session.multicall = True
for group in grpchanges:
for pkg in grpchanges[group]['dels']:
# Only delete a group that isn't inherited
if not grpchanges[group]['inherited']:
chggrplist.append(('[del]', pkg, group))
if not options.test:
session.groupPackageListRemove(dsttag['name'],
group,
pkg,
force=options.force)
else:
chggrplist.append(('[blk]', pkg, group))
if not options.test:
session.groupPackageListBlock(dsttag['name'],
group,
pkg)
if not options.test:
_multicall_with_check(session, options.batch)
# print final list of actions.
if options.verbose:
pfmt = ' %-7s %-28s %-10s %-10s %-10s\n'
bfmt = ' %-7s %-28s %-40s %-10s %-10s %-10s\n'
gfmt = ' %-7s %-28s %-28s\n'
sys.stdout.write('\nList of changes:\n\n')
sys.stdout.write(pfmt % ('Action', 'Package', 'Blocked', 'Owner', 'From Tag'))
sys.stdout.write(pfmt % ('-' * 7, '-' * 28, '-' * 10, '-' * 10, '-' * 10))
for changes in chgpkglist:
sys.stdout.write(pfmt % changes)
sys.stdout.write('\n')
sys.stdout.write(bfmt %
('Action', 'From/To Package', 'Build(s)', 'State', 'Owner', 'From Tag'))
sys.stdout.write(bfmt % ('-' * 7, '-' * 28, '-' * 40, '-' * 10, '-' * 10, '-' * 10))
for changes in chgbldlist:
sys.stdout.write(bfmt % changes)
sys.stdout.write('\n')
sys.stdout.write(gfmt % ('Action', 'Package', 'Group'))
sys.stdout.write(gfmt % ('-' * 7, '-' * 28, '-' * 28))
for changes in chggrplist:
sys.stdout.write(gfmt % changes)
session.snapshotTagModify(srctag['id'], args[1],
config=options.config,
pkgs=options.pkgs,
builds=options.builds,
groups=options.groups,
latest_only=options.latest_only,
inherit_builds=options.inherit_builds,
remove=options.delete,
event=event.get('id'),
force=options.force)
else:
session.snapshotTag(srctag['id'], args[1],
config=options.config,
pkgs=options.pkgs,
builds=options.builds,
groups=options.groups,
latest_only=options.latest_only,
inherit_builds=options.inherit_builds,
event=event.get('id'),
force=options.force)
def handle_add_target(goptions, session, args):

View file

@ -48,6 +48,7 @@ import traceback
from urllib.parse import parse_qs
import xmlrpc.client
import zipfile
from collections import defaultdict, OrderedDict
import rpm
from psycopg2._psycopg import IntegrityError
@ -2051,7 +2052,7 @@ def grp_pkg_remove(taginfo, grpinfo, pkg_name):
def _grp_pkg_remove(taginfo, grpinfo, pkg_name):
"""grp_pkg_remove without permssion checks"""
"""grp_pkg_remove without permission checks"""
tag_id = get_tag_id(taginfo, strict=True)
grp_id = get_group_id(grpinfo, strict=True)
update = UpdateProcessor('group_package_listing', values=locals(),
@ -10560,6 +10561,14 @@ def importImageInternal(task_id, build_info, imgdata):
koji.plugin.run_callbacks('postImport', type='image', image=imgdata,
build=build_info, fullpath=fullpath)
def _delete_event_id():
"""Helper function to bump event"""
try:
del context.event_id
except AttributeError:
pass
#
# XMLRPC Methods
#
@ -11544,6 +11553,359 @@ class RootExports(object):
if notify:
tag_notification(True, None, tag, build, context.session.user_id)
def massTag(self, tag, builds):
"""
Substitute for tagBuildBypass - this call ignores every check, so special
'tag' permission is needed. It bypass all tag access checks and policies.
On error it will raise concrete exception
:param builds: list of build NVRs
:type builds: [str]
:returns: None
"""
context.session.assertPerm('tag')
tag = get_tag(tag, strict=True)
user = get_user(context.session.user_id, strict=True)
logger.debug("Tagging %d builds to %s on behalf of %s",
len(builds), tag['name'], user['name'])
start = time.time()
for build in builds:
binfo = get_build(build, strict=True)
_direct_tag_build(tag, binfo, user, force=True)
# ensure tagging order by updating event id
_delete_event_id()
length = time.time() - start
logger.debug("Tagged %d builds to %s in %.2f seconds", len(builds), tag['name'], length)
def snapshotTag(self, src, dst, config=True, pkgs=True, builds=True, groups=True,
latest_only=True, inherit_builds=True, event=None, force=False):
"""
Copy the tag and its current (or event) contents to new one. It doesn't copy inheritance.
Suitable for creating snapshots of tags. External repos are not linked.
Destination tag must not exist. For updating existing tags use snapshotTagModify
Calling user needs to have 'admin' or 'tag' permission.
:param [inst|str] src: source tag
:param [int|str] dst: destination tag
:param [bool] config: copy basic config (arches, permission, lock, maven_support,
maven_include_all, extra)
:param [bool] pkgs: copy package lists
:param [bool] builds: copy tagged builds
:param [bool] latest_only: copy only latest builds instead of all
:param [bool] inherit_builds: use inherited builds, not only explicitly tagged
:param [int] event: copy state of tag in given event id
:param [bool] force: use force for all underlying operations
:returns: None
"""
context.session.assertPerm('tag')
if builds and not pkgs:
raise koji.ParameterError("builds can't be used without pkgs in snapshotTag")
if get_tag(dst):
raise koji.GenericError("Target tag already exists")
src = get_tag(src, event=event, strict=True)
if src['locked'] and not force:
raise koji.GenericError("Source tag is locked, use force to copy")
if config:
dsttag = _create_tag(
dst,
parent=None, # should clone parent?
arches=src['arches'],
perm=src['perm_id'],
locked=src['locked'],
maven_support=src['maven_support'],
maven_include_all=src['maven_include_all'],
extra=src['extra'])
else:
dsttag = _create_tag(dst, parent=None)
# all update operations will reset event_id, so we've clear order of operations
_delete_event_id()
dst = get_tag(dsttag, strict=True)
logger.debug("Cloning %s to %s", src['name'], dst['name'])
# package lists
if pkgs:
logger.debug("Cloning package list to %s", dst['name'])
start = time.time()
for pkg in self.listPackages(tagID=src['id'], event=event, inherited=True):
_direct_pkglist_add(
taginfo=dst['id'],
pkginfo=pkg['package_name'],
owner=pkg['owner_name'],
block=pkg['blocked'],
extra_arches=pkg['extra_arches'],
force=True,
update=False)
_delete_event_id()
length = time.time() - start
logger.debug("Cloned packages to %s in %.2f seconds", dst['name'], length)
# builds
if builds:
builds = readTaggedBuilds(tag=src['id'], inherit=inherit_builds,
event=event, latest=latest_only)
self.massTag(dst['id'], list(reversed(builds)))
# groups
if groups:
logger.debug("Cloning groups to %s", dst['name'])
start = time.time()
for group in readTagGroups(tag=src['id'], event=event):
_grplist_add(dst['id'], group['name'], block=group['blocked'], force=True)
_delete_event_id()
for pkg in group['packagelist']:
_grp_pkg_add(dst['id'], group['name'], pkg['package'],
block=pkg['blocked'], force=True)
_delete_event_id()
for group_req in group['grouplist']:
_grp_req_add(dst['id'], group['name'], group_req['name'],
block=group_req['blocked'], force=True)
_delete_event_id()
length = time.time() - start
logger.debug("Cloned groups to %s in %.2f seconds", dst['name'], length)
_delete_event_id()
def snapshotTagModify(self, src, dst, config=True, pkgs=True, builds=True, groups=True,
latest_only=True, inherit_builds=True, event=None, force=False,
remove=False):
"""
Copy the tag and its current (or event) contents to existing one. It doesn't copy
inheritance. Suitable for creating snapshots of tags. External repos are not linked.
Destination tag must not exist. For creating new snapshots use snapshotTag
Calling user needs to have 'admin' or 'tag' permission.
:param [int|str] src: source tag
:param [int|str] dst: destination tag
:param bool config: copy basic config (arches, permission, lock, maven_support,
maven_include_all, extra)
:param bool pkgs: copy package lists
:param bool builds: copy tagged builds
:param bool latest_only: copy only latest builds instead of all
:param bool inherit_builds: use inherited builds, not only explicitly tagged
:param int event: copy state of tag in given event id
:param bool force: use force for all underlying operations
:param remove: remove builds/groups/
:returns: None
"""
context.session.assertPerm('tag')
if builds and not pkgs:
# It is necessarily not true (current pkgs can already cover all new builds),
# but it seems to be more consistent to require it anyway.
raise koji.ParameterError("builds can't be used without pkgs in snapshotTag")
src = get_tag(src, event=event, strict=True)
dst = get_tag(dst, strict=True)
if (src['locked'] or dst['locked']) and not force:
raise koji.GenericError("Source or destination tag is locked, use force to copy")
user = get_user(context.session.user_id, strict=True)
if config:
if dst['extra']:
remove_extra = list(set(dst['extra'].keys()) - set(src['extra'].keys()))
else:
remove_extra = []
edit_tag(dst['id'], parent=None, arches=src['arches'],
perm=src['perm_id'], locked=src['locked'],
maven_support=src['maven_support'],
maven_include_all=src['maven_include_all'],
extra=src['extra'],
remove_extra=remove_extra)
_delete_event_id()
dst = get_tag(dst['id'], strict=True)
if pkgs:
srcpkgs = {}
dstpkgs = {}
for pkg in self.listPackages(tagID=src['id'], event=event, inherited=True):
srcpkgs[pkg['package_name']] = pkg
for pkg in self.listPackages(tagID=dst['id'], inherited=True):
dstpkgs[pkg['package_name']] = pkg
for pkg_name in set(dstpkgs.keys()) - set(srcpkgs.keys()):
pkg = dstpkgs[pkg_name]
_direct_pkglist_add(dst,
pkg_name,
owner=pkg['owner_name'],
block=True,
force=True,
update=True,
extra_arches=pkg['extra_arches'])
_delete_event_id()
for pkg_name in set(srcpkgs.keys()) - set(dstpkgs.keys()):
pkg = srcpkgs[pkg_name]
_direct_pkglist_add(dst,
pkg_name,
owner=pkg['owner_name'],
block=pkg['blocked'],
update=False,
force=True,
extra_arches=pkg['extra_arches'])
_delete_event_id()
if builds:
srcbldsbypkg = defaultdict(OrderedDict)
dstbldsbypkg = defaultdict(OrderedDict)
# listTagged orders builds latest-first
# so reversing that gives us oldest-first
for build in reversed(readTaggedBuilds(src['id'], event=event, inherit=inherit_builds,
latest=latest_only)):
srcbldsbypkg[build['package_name']][build['nvr']] = build
# get builds in dst without inheritance.
# latest=False to get all builds even when latest_only = True,
# so that only the *latest* build per tag will live in.
for build in reversed(readTaggedBuilds(dst['id'], inherit=False, latest=False)):
dstbldsbypkg[build['package_name']][build['nvr']] = build
if remove:
for (pkg, dstblds) in dstbldsbypkg.items():
if pkg not in srcbldsbypkg:
# untag all builds for packages which are not in dst
for build in dstblds:
# don't untag inherited builds
if build['tag_name'] == dst['name']:
_direct_untag_build(dst, build, user, force=force)
_delete_event_id()
# add and/or remove builds from dst to match src contents and order
for (pkg, srcblds) in srcbldsbypkg.items():
dstblds = dstbldsbypkg[pkg]
# firstly, deal with extra builds in dst
removed_nvrs = set(dstblds.keys()) - set(srcblds.keys())
bld_order = srcblds.copy()
if remove:
# mark the extra builds for deletion
dnvrs = []
for (dstnvr, dstbld) in dstblds.items():
if dstnvr in removed_nvrs:
dnvrs.append(dstnvr)
if dstbld['tag_name'] == dst['name']:
_untag_build(dst['name'], dstbld, force=force)
_delete_event_id()
# we also remove them from dstblds now so that they do not
# interfere with the order comparison below
for dnvr in dnvrs:
del dstblds[dnvr]
else:
# in the no-removal case, the extra builds should be forced
# to last in the tag
bld_order = OrderedDict()
for (dstnvr, dstbld) in dstblds.items():
if dstnvr in removed_nvrs:
bld_order[dstnvr] = dstbld
for (nvr, srcbld) in srcblds.items():
bld_order[nvr] = srcbld
# secondly, add builds from src tag and adjust the order
for (nvr, srcbld) in bld_order.items():
found = False
out_of_order = []
# note that dstblds is trimmed as we go, so we are only
# considering the tail corresponding to where we are at
# in the srcblds loop
for (dstnvr, dstbld) in dstblds.items():
if nvr == dstnvr:
found = True
break
else:
out_of_order.append(dstnvr)
if dstbld['tag_name'] == dst['name']:
_untag_build(dst['name'], dstbld, force=force)
_delete_event_id()
for dnvr in out_of_order:
del dstblds[dnvr]
# these will be re-added in the proper order later
if found:
# remove it for next pass so we stay aligned with outer
# loop
del dstblds[nvr]
else:
# missing from dst, so we need to add it
_direct_tag_build(dst, srcbld, user, force=force)
_delete_event_id()
if groups:
srcgroups = OrderedDict()
dstgroups = OrderedDict()
for group in readTagGroups(src['name'], event=event, incl_blocked=True):
srcgroups[group['name']] = group
for group in readTagGroups(dst['name'], incl_blocked=True):
dstgroups[group['name']] = group
for (grpname, group) in srcgroups.items():
if grpname not in dstgroups or group['blocked'] != dstgroups[grpname]['blocked']:
_grplist_add(dst['id'], group['name'], block=group['blocked'], force=force)
_delete_event_id()
if remove:
for (grpname, group) in dstgroups.items():
if grpname not in srcgroups:
if group['tag_id'] == dst['id']:
# not inherited
_grplist_remove(dst['id'], group['id'], force=force)
else:
# block inherited groups
_grplist_add(dst['id'], group['name'], block=True, force=force)
_delete_event_id()
for (grpname, group) in srcgroups.items():
if grpname in dstgroups:
srcgrppkglist = {}
dstgrppkglist = {}
for pkg in group['packagelist']:
srcgrppkglist[pkg['package']] = pkg
for pkg in dstgroups[grpname]['packagelist']:
dstgrppkglist[pkg['package']] = pkg
for pkg in srcgrppkglist.values():
if pkg['package'] not in dstgrppkglist:
_grp_pkg_add(dst['name'], grpname, pkg['package'],
force=force, block=False)
_delete_event_id()
srcgrpreqlist = {}
dstgrpreqlist = {}
for grp in group['grouplist']:
srcgrpreqlist[grp['name']] = grp
for grp in dstgroups[grpname]['grouplist']:
dstgrpreqlist[grp['name']] = grp
for grp in srcgrpreqlist.values():
if grp['name'] not in dstgrpreqlist:
_grp_req_add(dst['name'], grpname, grp['name'],
force=force, block=grp['blocked'])
_delete_event_id()
if remove:
for pkgname, pkg in dstgrppkglist.items():
if pkg['blocked']:
continue
if srcgrppkglist.get(pkgname, {}).get('blocked'):
_grp_pkg_add(dst['id'], grpname, pkg['package'],
block=True, force=force)
_delete_event_id()
elif pkgname not in srcgrppkglist and pkg['tag_id'] == dst['id']:
_grp_pkg_remove(dst['name'], grpname, pkg['package'], force=force)
_delete_event_id()
for grp in dstgrpreqlist.values():
if grp['blocked']:
continue
if grp['name'] not in srcgrpreqlist:
if grp['group_id'] == dst['id']:
_grp_req_remove(dst['name'], grpname, grp['name'], force=force)
else:
_grp_req_add(dst['name'], grpname, grp['name'],
block=True, force=force)
_delete_event_id()
def moveBuild(self, tag1, tag2, build, force=False):
"""Move a build from tag1 to tag2

View file

@ -200,81 +200,12 @@ clone-tag will create the destination tag if it does not already exist
self.session.assert_has_calls([call.hasPerm('admin'),
call.getBuildConfig('src-tag', event=None),
call.getTag('dst-tag'),
call.createTag('dst-tag', arches='arch1 arch2',
locked=False, maven_include_all=True,
maven_support=False, parent=None, perm=1,
extra={}),
call.getTag('dst-tag', strict=True),
call.listPackages(event=None, inherited=True, tagID=1),
call.packageListAdd('dst-tag', 'apkg', block=False,
extra_arches='arch4', owner='userA'),
call.packageListAdd('dst-tag', 'pkg1', block=False,
extra_arches=None, owner='userA'),
call.packageListAdd('dst-tag', 'pkg2', block=True,
extra_arches='arch3 arch4',
owner='userB'),
call.multiCall(batch=100),
call.listTagged(1, event=None, inherit=None, latest=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg2-1.0-1',
'package_name': 'pkg2', 'state': 2,
'tag_name': 'src-tag-p',
'name': 'pkg2'}, force=None, notify=False),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.multiCall(batch=100),
call.getTagGroups('src-tag', event=None),
call.groupListAdd('dst-tag', 'group1'),
call.groupPackageListAdd('dst-tag', 'group1', 'pkg1',
block=False),
call.groupPackageListAdd('dst-tag', 'group1', 'pkg2',
block=False),
call.groupListAdd('dst-tag', 'group2'),
call.groupPackageListAdd('dst-tag', 'group2', 'apkg',
block=False),
call.groupPackageListAdd('dst-tag', 'group2', 'bpkg',
block=False),
call.multiCall(batch=100)])
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
[new] apkg False userA src-tag-p
[new] pkg1 False userA src-tag
[new] pkg2 True userB src-tag-p
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[new] pkg2 pkg2-1.0-1 DELETED b_owner src-tag-p
[new] pkg1 pkg1-1.0-1 COMPLETE b_owner src-tag
[new] pkg1 pkg1-1.0-2 COMPLETE b_owner src-tag
[new] pkg1 pkg1-1.1-2 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
[new] pkg1 group1
[new] pkg2 group1
[new] apkg group2
[new] bpkg group2
""")
call.snapshotTag(1, 'dst-tag',
builds=True, config=True, event=None,
force=None, groups=True,
inherit_builds=None, latest_only=None,
pkgs=True),
])
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_existing_dsttag(self, stdout):
@ -460,114 +391,13 @@ List of changes:
self.session.assert_has_calls([call.hasPerm('admin'),
call.getBuildConfig('src-tag', event=None),
call.getTag('dst-tag'),
call.editTag2(2, arches='arch1 arch2',
extra={}, locked=False,
maven_include_all=True,
maven_support=False,
parent=None, perm=1),
call.getTag(2, strict=True),
call.listPackages(event=None, inherited=True, tagID=1),
call.listPackages(inherited=True, tagID=2),
call.listTagged(1, event=None, inherit=None, latest=None),
call.listTagged(2, inherit=False, latest=False),
call.getTagGroups('src-tag', event=None),
call.getTagGroups('dst-tag'),
call.packageListAdd('dst-tag', 'pkg2', block=True,
extra_arches='arch3 arch4',
owner='userB'),
call.multiCall(batch=100),
call.untagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-2.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'dst-tag',
'name': 'pkg1'}, force=None, notify=False),
call.untagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-0.1-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'dst-tag',
'name': 'pkg1'}, force=None, notify=False),
call.untagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg3-1.0-1',
'package_name': 'pkg3', 'state': 1,
'tag_name': 'dst-tag',
'name': 'pkg3'}, force=None, notify=False),
call.multiCall(batch=100),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-0.1-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None, notify=False),
call.multiCall(batch=100),
call.multiCall(batch=100),
call.groupPackageListAdd('dst-tag', 'group1', 'pkg2',
force=None),
call.groupPackageListAdd('dst-tag', 'group1', 'pkg3',
force=None),
call.groupPackageListAdd('dst-tag', 'group1', 'pkg4',
force=None),
call.groupPackageListAdd('dst-tag', 'group2', 'bpkg',
force=None),
call.multiCall(batch=100),
call.multiCall(batch=100),
call.packageListBlock('dst-tag', 'bpkg'),
call.packageListBlock('dst-tag', 'cpkg'),
call.packageListBlock('dst-tag', 'dpkg'),
call.multiCall(batch=100),
call.groupListRemove('dst-tag', 'group3', force=None),
call.groupListBlock('dst-tag', 'group4'),
call.multiCall(batch=100),
call.groupPackageListRemove('dst-tag', 'group1', 'pkg5',
force=None),
call.groupPackageListBlock('dst-tag', 'group2', 'cpkg'),
call.multiCall(batch=100)])
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
[add] pkg2 True userB src-tag-p
[blk] bpkg False userC src-tag
[blk] cpkg True userC src-tag-p
[blk] dpkg True userC src-tag
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[del] pkg1 pkg1-2.1-2 COMPLETE b_owner dst-tag
[del] pkg1 pkg1-0.1-1 COMPLETE b_owner dst-tag
[del] pkg3 pkg3-1.0-1 COMPLETE b_owner dst-tag
[add] pkg1 pkg1-0.1-1 COMPLETE b_owner src-tag
[add] pkg1 pkg1-1.0-2 COMPLETE b_owner src-tag
[add] pkg1 pkg1-1.1-2 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
[new] pkg2 group1
[new] pkg3 group1
[new] pkg4 group1
[new] bpkg group2
[del] cpkg group3
[del] dpkg group3
[blk] epkg group4
[blk] fpkg group4
[del] pkg5 group1
[blk] cpkg group2
""")
call.snapshotTagModify(1, 'dst-tag',
builds=True, config=True,
event=None, force=None,
groups=True, inherit_builds=None,
latest_only=None, pkgs=True,
remove=True)
])
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_existing_dsttag_nodelete(self, stdout):
@ -622,21 +452,6 @@ List of changes:
'extra': {}}]
handle_clone_tag(self.options, self.session, args)
self.activate_session.assert_called_once()
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[add] pkg pkg-0.1-1 COMPLETE b_owner src-tag
[add] pkg pkg-1.0-21 COMPLETE b_owner src-tag
[add] pkg pkg-1.0-23 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
""")
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_existing_dsttag_nodelete_1(self, stdout):
@ -710,20 +525,6 @@ List of changes:
'extra': {}}
handle_clone_tag(self.options, self.session, args)
self.activate_session.assert_called_once()
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[add] pkg pkg-1.0-21 COMPLETE b_owner src-tag
[add] pkg pkg-1.0-23 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
""")
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_existing_dsttag_nodelete_2(self, stdout):
@ -791,19 +592,6 @@ List of changes:
]
handle_clone_tag(self.options, self.session, args)
self.activate_session.assert_called_once()
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[add] pkg pkg-1.0-23 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
""")
def test_handle_clone_tag_help(self):
self.assert_help(
@ -826,10 +614,8 @@ Options:
--no-delete Don't delete any existing content in dest tag.
--event=EVENT Clone tag at a specific event
--repo=REPO Clone tag at a specific repo event
-v, --verbose show changes
--notify Send tagging/untagging notifications
-f, --force override tag locks if necessary
-n, --test test mode
--batch=SIZE batch size of multicalls [0 to disable, default: 100]
""" % self.progname)

View file

@ -0,0 +1,53 @@
import mock
import unittest
import koji
import kojihub
class TestDeleteEventId(unittest.TestCase):
@mock.patch('kojihub.context')
def test_delete_event_id(self, context):
kojihub.context.event_id = 123
kojihub._delete_event_id()
self.assertFalse(hasattr(context, 'event_id'))
@mock.patch('kojihub.context')
def test_delete_event_id_none(self, context):
kojihub._delete_event_id()
self.assertFalse(hasattr(context, 'event_id'))
class TestMassTag(unittest.TestCase):
def setUp(self):
self.get_tag = mock.patch('kojihub.get_tag').start()
self.get_build = mock.patch('kojihub.get_build').start()
self.get_user = mock.patch('kojihub.get_user').start()
self._direct_tag_build = mock.patch('kojihub._direct_tag_build').start()
self._delete_event_id = mock.patch('kojihub._delete_event_id').start()
self.context = mock.patch('kojihub.context').start()
self.context.session.assertPerm = mock.MagicMock()
self.hub = kojihub.RootExports()
def tearDown(self):
mock.patch.stopall()
def test_no_permission(self):
self.context.session.assertPerm.side_effect = koji.ActionNotAllowed
with self.assertRaises(koji.ActionNotAllowed):
self.hub.massTag('tag', ['n-v-r1'])
self.context.session.assertPerm.assert_called_once_with('tag')
def test_non_existent_tag(self):
self.hub.massTag('non-existent-tag', ['n-v-r-1', 'n-v-r-2'])
def test_non_existent_build(self):
self.hub.massTag('tag', ['non-existent-nvr'])
def test_correct_tagging_mixed_build_id_nvr(self):
self.hub.massTag('tag', ['n-v-r1', 123])
def test_correct_tagging_tag_id(self):
self.hub.massTag(1234, ['n-v-r1', 123])
def test_correct_tagging_tag_dict(self):
self.hub.massTag({'id': 1234, 'name': 'tag'}, ['n-v-r1', 123])

View file

@ -0,0 +1,120 @@
import mock
import unittest
import koji
import kojihub
class TestSnapshotTag(unittest.TestCase):
def setUp(self):
self._create_tag = mock.patch('kojihub._create_tag').start()
self.get_tag = mock.patch('kojihub.get_tag').start()
self.get_build = mock.patch('kojihub.get_build').start()
self.get_user = mock.patch('kojihub.get_user').start()
self._direct_tag_build = mock.patch('kojihub._direct_tag_build').start()
self._direct_pkglist_add = mock.patch('kojihub._direct_pkglist_add').start()
self._delete_event_id = mock.patch('kojihub._delete_event_id').start()
self._grplist_add = mock.patch('kojihub._grplist_add').start()
self._grp_pkg_add = mock.patch('kojihub._grp_pkg_add').start()
self._grp_req_add = mock.patch('kojihub._grp_req_add').start()
self.readTagGroups = mock.patch('kojihub.readTagGroups').start()
self.readTaggedBuilds = mock.patch('kojihub.readTaggedBuilds').start()
self.context = mock.patch('kojihub.context').start()
self.context.session.assertPerm = mock.MagicMock()
self.hub = kojihub.RootExports()
self.hub.listPackages = mock.MagicMock()
self.hub.massTag = mock.MagicMock()
def tearDown(self):
mock.patch.stopall()
def test_no_permission(self):
self.context.session.assertPerm.side_effect = koji.ActionNotAllowed
with self.assertRaises(koji.ActionNotAllowed):
self.hub.snapshotTag('src', 'dst')
self.context.session.assertPerm.assert_called_once_with('tag')
def test_builds_without_pkgs(self):
with self.assertRaises(koji.ParameterError):
self.hub.snapshotTag('src', 'dst', builds=True, pkgs=False)
def test_existing_dst(self):
self.get_tag.side_effect = [{'id': 1}, {'id': 2}]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTag('src', 'dst')
self.assertEqual("Target tag already exists", str(cm.exception))
def test_locked_without_force(self):
self.get_tag.side_effect = [None, {'id': 1, 'locked': True}]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTag('src', 'dst')
self.assertEqual("Source tag is locked, use force to copy", str(cm.exception))
def test_correct_all(self):
src = {
'id': 1,
'name': 'src',
'parent': 2,
'locked': True,
'arches': 'x86_64 s390x',
'perm_id': 3,
'maven_support': True,
'maven_include_all': False,
'extra': {'extra_field': 'text'},
}
dst = src.copy()
dst['id'] = 11
dst['name'] = 'dst'
pkg = {
'package_name': 'pkg1',
'owner_name': 'owner',
'blocked': False,
'extra_arches': None,
}
build = {
'id': 21,
'nvr': 'n-v-r',
}
self.get_tag.side_effect = [
None, # non-existing dst
src, # retrieve src
dst, # retrieve created dst
]
self._create_tag.return_value = dst['id']
self.hub.listPackages.return_value = [pkg]
self.readTaggedBuilds.return_value = [build]
self.readTagGroups.return_value = [
{
'id': 1,
'name': 'group',
'blocked': False,
'packagelist': [{'package': 'pkg', 'blocked': False}],
'grouplist': [{'name': 'group2', 'blocked': False}],
}
]
# call
self.hub.snapshotTag('src', 'dst', force=True)
self._create_tag.assert_called_once_with('dst', parent=None, arches=src['arches'],
perm=src['perm_id'], locked=src['locked'],
maven_support=src['maven_support'],
maven_include_all=src['maven_include_all'],
extra=src['extra'])
self.get_tag.assert_has_calls([
mock.call('dst'),
mock.call('src', event=None, strict=True),
mock.call(dst['id'], strict=True),
])
self.hub.listPackages.assert_called_once_with(tagID=src['id'], event=None, inherited=True)
self._direct_pkglist_add.assert_called_once_with(
taginfo=dst['id'],
pkginfo=pkg['package_name'],
owner=pkg['owner_name'],
block=pkg['blocked'],
extra_arches=pkg['extra_arches'],
force=True,
update=False,
)
self.readTaggedBuilds.assert_called_once_with(tag=src['id'], inherit=True, event=None, latest=True)
self.hub.massTag.assert_called_once_with(dst['id'], [build])

View file

@ -0,0 +1,218 @@
import mock
import unittest
import koji
import kojihub
class TestSnapshotTagModify(unittest.TestCase):
def setUp(self):
self._create_tag = mock.patch('kojihub._create_tag').start()
self.get_tag = mock.patch('kojihub.get_tag').start()
self.get_build = mock.patch('kojihub.get_build').start()
self.get_user = mock.patch('kojihub.get_user').start()
self._direct_tag_build = mock.patch('kojihub._direct_tag_build').start()
self._direct_untag_build = mock.patch('kojihub._direct_untag_build').start()
self._tag_build = mock.patch('kojihub._tag_build').start()
self._untag_build = mock.patch('kojihub._untag_build').start()
self._direct_pkglist_add = mock.patch('kojihub._direct_pkglist_add').start()
self._delete_event_id = mock.patch('kojihub._delete_event_id').start()
self._grplist_add = mock.patch('kojihub._grplist_add').start()
self._grplist_remove = mock.patch('kojihub._grplist_remove').start()
self._grp_pkg_add = mock.patch('kojihub._grp_pkg_add').start()
self._grp_pkg_remove = mock.patch('kojihub._grp_pkg_remove').start()
self._grp_req_add = mock.patch('kojihub._grp_req_add').start()
self._grp_req_remove = mock.patch('kojihub._grp_req_remove').start()
self.readTagGroups = mock.patch('kojihub.readTagGroups').start()
self.readTaggedBuilds = mock.patch('kojihub.readTaggedBuilds').start()
self.context = mock.patch('kojihub.context').start()
self.context.session.assertPerm = mock.MagicMock()
self.edit_tag = mock.patch('kojihub.edit_tag').start()
self.hub = kojihub.RootExports()
self.hub.listPackages = mock.MagicMock()
self.hub.massTag = mock.MagicMock()
def tearDown(self):
mock.patch.stopall()
def test_no_permission(self):
self.context.session.assertPerm.side_effect = koji.ActionNotAllowed
with self.assertRaises(koji.ActionNotAllowed):
self.hub.snapshotTagModify('src', 'dst')
self.context.session.assertPerm.assert_called_once_with('tag')
def test_builds_without_pkgs(self):
with self.assertRaises(koji.ParameterError):
self.hub.snapshotTagModify('src', 'dst', builds=True, pkgs=False)
def test_nonexisting_dst(self):
self.get_tag.side_effect = [{'id': 1, 'locked': False}, koji.GenericError('xx')]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTagModify('src', 'dst')
self.assertEqual("xx", str(cm.exception))
def test_locked_without_force_both(self):
self.get_tag.side_effect = [{'id': 1, 'locked': True}, {'id': 2, 'locked': True}]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTagModify('src', 'dst')
self.assertEqual("Source or destination tag is locked, use force to copy", str(cm.exception))
def test_locked_without_force_src(self):
self.get_tag.side_effect = [{'id': 1, 'locked': True}, {'id': 2, 'locked': False}]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTagModify('src', 'dst')
self.assertEqual("Source or destination tag is locked, use force to copy", str(cm.exception))
def test_locked_without_force_dst(self):
self.get_tag.side_effect = [{'id': 1, 'locked': False}, {'id': 2, 'locked': True}]
with self.assertRaises(koji.GenericError) as cm:
self.hub.snapshotTagModify('src', 'dst')
self.assertEqual("Source or destination tag is locked, use force to copy", str(cm.exception))
def test_correct_all(self):
src = {
'id': 1,
'name': 'src',
'parent': 2,
'locked': True,
'arches': 'x86_64 s390x',
'perm_id': 3,
'maven_support': True,
'maven_include_all': False,
'extra': {'extra_field': 'text'},
}
dst = src.copy()
dst['id'] = 11
dst['name'] = 'dst'
pkg1 = {
'tag_id': src['id'],
'package_name': 'pkg1',
'owner_name': 'owner',
'blocked': False,
'extra_arches': None,
}
pkg2 = {
'tag_id': dst['id'],
'package_name': 'pkg2',
'owner_name': 'owner',
'blocked': False,
'extra_arches': None,
}
build = {
'id': 21,
'nvr': 'n-v-r',
'package_name': pkg1['package_name'],
'tag_name': 'src',
}
build2 = {
'id': 22,
'nvr': 'n-v-r2',
'package_name': pkg1['package_name'],
'tag_name': 'dst',
}
user = {
'id': 321,
'name': 'username',
}
src_group1 = {
'id': 1,
'name': 'group1',
'blocked': False,
'packagelist': [
{
'package': pkg1['package_name'],
'tag_id': src['id'],
'blocked': False,
}
],
'grouplist': [{'group_id': 5, 'name': 'group5', 'blocked': False}],
'inherited': False,
}
src_group2 = {
'id': 2,
'name': 'group2',
'blocked': False,
'packagelist': [],
'grouplist': [],
'inherited': False,
}
dst_group1 = {
'id': 3,
'name': 'group1',
'blocked': False,
'packagelist': [
{
'package': pkg2['package_name'],
'tag_id': dst['id'],
'blocked': False,
}
],
'grouplist': [{'group_id': 4, 'name': 'group4', 'blocked': False}],
'inherited': False,
}
self.get_tag.side_effect = [
src, # src
dst, # dst
dst, # edited dst
]
self.get_user.return_value = user
self._create_tag.return_value = dst['id']
self.hub.listPackages.side_effect = [[pkg1], [pkg2]]
self.readTaggedBuilds.side_effect = [[build], [build2]]
self.readTagGroups.side_effect = [[src_group1, src_group2], [dst_group1]]
self.context.session.user_id = user['id']
# call
self.hub.snapshotTagModify('src', 'dst', force=True, remove=True)
# tests
self._create_tag.assert_not_called()
self.get_tag.assert_has_calls([
mock.call('src', event=None, strict=True),
mock.call('dst', strict=True),
mock.call(dst['id'], strict=True),
])
self.get_user.assert_called_once_with(user['id'], strict=True)
self.edit_tag.assert_called_once_with(dst['id'], parent=None, arches=src['arches'],
perm=src['perm_id'], locked=src['locked'],
maven_support=src['maven_support'],
maven_include_all=src['maven_include_all'],
extra=src['extra'], remove_extra=[])
self.hub.listPackages.assert_has_calls([
mock.call(tagID=src['id'], event=None, inherited=True),
mock.call(tagID=dst['id'], inherited=True)
])
self._direct_pkglist_add.assert_has_calls([
# remove additional package
mock.call(dst,
pkg2['package_name'],
owner=pkg2['owner_name'],
block=True,
extra_arches=pkg2['extra_arches'],
force=True,
update=True),
# add missing package
mock.call(dst,
pkg1['package_name'],
owner=pkg1['owner_name'],
block=pkg1['blocked'],
extra_arches=pkg1['extra_arches'],
force=True,
update=False),
])
self.readTaggedBuilds.assert_has_calls([
mock.call(src['id'], event=None, inherit=True, latest=True),
mock.call(dst['id'], inherit=False, latest=False),
])
self._direct_untag_build.assert_not_called()
self._untag_build.assert_called_once_with('dst', build2, force=True)
self._direct_tag_build.assert_called_once_with(dst, build, user, force=True)
self._grp_pkg_add.assert_called_once_with('dst', 'group1', pkg1['package_name'],
block=False, force=True)
self._grp_req_add.assert_has_calls([
mock.call('dst', 'group1', 'group5', block=False, force=True),
mock.call('dst', 'group1', 'group4', block=True, force=True),
])
self._grplist_add.assert_called_once_with(dst['id'], 'group2', block=False, force=True)
self._grplist_remove.assert_not_called()
self.hub.massTag.assert_not_called()