[clone-tag] preserve build order and fix group cloning

This commit is contained in:
Yuming Zhu 2018-07-12 12:42:36 +08:00 committed by Mike McLean
parent 39f84abf32
commit b42bc3ebde
2 changed files with 828 additions and 103 deletions

View file

@ -3,7 +3,7 @@ from __future__ import division
import ast
import base64
from collections import OrderedDict
from collections import defaultdict, OrderedDict
import fnmatch
import json
import logging
@ -3248,11 +3248,11 @@ def handle_clone_tag(goptions, session, args):
help=_('Clone tag at a specific event'))
parser.add_option('--repo', type='int',
help=_('Clone tag at a specific repo event'))
parser.add_option("-v","--verbose", action="store_true",
parser.add_option("-v", "--verbose", action="store_true",
help=_("show changes"))
parser.add_option("-f","--force", action="store_true",
parser.add_option("-f", "--force", action="store_true",
help=_("override tag locks if necessary"))
parser.add_option("-n","--test", action="store_true", help=_("test mode"))
parser.add_option("-n", "--test", action="store_true", help=_("test mode"))
parser.add_option("--batch", type='int', default=1000, metavar='SIZE',
help=_("batch size of multicalls [0 to disable, default: %default]"))
(options, args) = parser.parse_args(args)
@ -3263,12 +3263,11 @@ def handle_clone_tag(goptions, session, args):
activate_session(session, goptions)
if not session.hasPerm('admin') and not options.test:
print(_("This action requires admin privileges"))
parser.error(_("This action requires admin privileges"))
return
if args[0] == args[1]:
sys.stdout.write('Source and destination tags must be different.\n')
return
parser.error(_('Source and destination tags must be different.'))
if options.batch < 0:
parser.error(_("batch size must be bigger than zero"))
@ -3285,12 +3284,10 @@ def handle_clone_tag(goptions, session, args):
srctag = session.getTag(args[0])
dsttag = session.getTag(args[1])
if not srctag:
sys.stdout.write("Unknown src-tag: %s\n" % args[0])
return
parser.error(_("Unknown src-tag: %s" % args[0]))
if (srctag['locked'] and not options.force) or (dsttag and dsttag['locked'] and not options.force):
print(_("Error: You are attempting to clone from or to a tag which is locked."))
print(_("Please use --force if this is what you really want to do."))
return
parser.error(_("Error: You are attempting to clone from or to a tag which is locked.\n"
"Please use --force if this is what you really want to do."))
# init debug lists.
chgpkglist=[]
@ -3299,8 +3296,7 @@ def handle_clone_tag(goptions, session, args):
# case of brand new dst-tag.
if not dsttag:
if not options.config:
print(_('Cannot create tag without specifying --config'))
return
parser.error(_('Cannot create tag without specifying --config'))
# create a new tag, copy srctag header.
if not options.test:
session.createTag(args[1], parent=None, arches=srctag['arches'],
@ -3308,20 +3304,29 @@ def handle_clone_tag(goptions, session, args):
locked=srctag['locked'],
maven_support=srctag['maven_support'],
maven_include_all=srctag['maven_include_all'])
newtag = session.getTag(args[1], strict=True) # store the new tag, need its asigned id.
# store the new tag, need its assigned id.
newtag = session.getTag(args[1], strict=True)
# get pkglist of src-tag, including inherited packages.
if options.pkgs:
srcpkgs = session.listPackages(tagID=srctag['id'], inherited=True, event=event.get('id'))
srcpkgs.sort(key = lambda x: x['package_name'])
srcpkgs = session.listPackages(tagID=srctag['id'],
inherited=True,
event=event.get('id'))
srcpkgs.sort(key=lambda x: x['package_name'])
if not options.test:
session.multicall = True
for pkgs in srcpkgs:
# for each package add one entry in the new tag.
chgpkglist.append(('[new]',pkgs['package_name'],pkgs['blocked'],pkgs['owner_name'],pkgs['tag_name']))
chgpkglist.append(('[new]',
pkgs['package_name'],
pkgs['blocked'],
pkgs['owner_name'],
pkgs['tag_name']))
if not options.test:
# add packages.
session.packageListAdd(newtag['name'],pkgs['package_name'],
owner=pkgs['owner_name'],block=pkgs['blocked'],
session.packageListAdd(newtag['name'],
pkgs['package_name'],
owner=pkgs['owner_name'],
block=pkgs['blocked'],
extra_arches=pkgs['extra_arches'])
if not options.test:
session.multiCall(batch=options.batch)
@ -3334,18 +3339,25 @@ def handle_clone_tag(goptions, session, args):
if not options.test:
session.multicall = True
for build in builds:
build['name'] = build['package_name'] # add missing 'name' field.
chgbldlist.append(('[new]', build['package_name'],
build['nvr'], koji.BUILD_STATES[build['state']],
build['owner_name'], build['tag_name']))
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[new]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# copy latest builds into new tag
if not options.test:
session.tagBuildBypass(newtag['name'], build, force=options.force)
session.tagBuildBypass(newtag['name'],
build,
force=options.force)
if not options.test:
session.multiCall(batch=options.batch)
if options.groups:
# Copy the group data
srcgroups = session.getTagGroups(srctag['name'], event=event.get('id'))
srcgroups = session.getTagGroups(srctag['name'],
event=event.get('id'))
if not options.test:
session.multicall = True
for group in srcgroups:
@ -3353,8 +3365,10 @@ def handle_clone_tag(goptions, session, args):
session.groupListAdd(newtag['name'], group['name'])
for pkg in group['packagelist']:
if not options.test:
session.groupPackageListAdd(newtag['name'], group['name'],
pkg['package'], block=pkg['blocked'])
session.groupPackageListAdd(newtag['name'],
group['name'],
pkg['package'],
block=pkg['blocked'])
chggrplist.append(('[new]', pkg['package'], group['name']))
if not options.test:
session.multiCall(batch=options.batch)
@ -3363,65 +3377,101 @@ def handle_clone_tag(goptions, session, args):
# get fresh list of packages & builds into maps.
srcpkgs = {}
dstpkgs = {}
srclblds = OrderedDict()
dstlblds = {}
srcbldsbypkg = defaultdict(OrderedDict)
dstbldsbypkg = defaultdict(OrderedDict)
srcgroups = {}
dstgroups = {}
if options.pkgs:
for pkg in session.listPackages(tagID=srctag['id'], inherited=True, event=event.get('id')):
for pkg in session.listPackages(tagID=srctag['id'],
inherited=True,
event=event.get('id')):
srcpkgs[pkg['package_name']] = pkg
for pkg in session.listPackages(tagID=dsttag['id'], inherited=True):
for pkg in session.listPackages(tagID=dsttag['id'],
inherited=True):
dstpkgs[pkg['package_name']] = pkg
if options.builds:
src_builds = list(reversed(session.listTagged(srctag['id'],
event=event.get('id'),
inherit=options.inherit_builds,
latest=options.latest_only)))
for build in src_builds:
srclblds[build['nvr']] = build
for build in session.getLatestBuilds(dsttag['name']):
dstlblds[build['nvr']] = build
for build in reversed(session.listTagged(srctag['id'],
event=event.get('id'),
inherit=options.inherit_builds,
latest=options.latest_only)):
srcbldsbypkg[build['package_name']][build['nvr']] = build
# get builds in dsttag without inheritance
for build in reversed(session.listTagged(dsttag['id'],
inherit=False,
latest=options.latest_only)):
dstbldsbypkg[build['package_name']][build['nvr']] = build
if options.groups:
for group in session.getTagGroups(srctag['name'], event=event.get('id')):
for group in session.getTagGroups(srctag['name'],
event=event.get('id')):
srcgroups[group['name']] = group
for group in session.getTagGroups(dsttag['name']):
dstgroups[group['name']] = group
#construct to-do lists.
paddlist = [] # list containing new packages to be added from src tag
# construct to-do lists.
paddlist = [] # list containing new packages to be added from src tag
for (package_name, pkg) in six.iteritems(srcpkgs):
if package_name not in dstpkgs:
paddlist.append(pkg)
paddlist.sort(key = lambda x: x['package_name'])
pdellist = [] # list containing packages no more present in dst tag
paddlist.sort(key=lambda x: x['package_name'])
pdellist = [] # list containing packages no more present in dst tag
for (package_name, pkg) in six.iteritems(dstpkgs):
if package_name not in srcpkgs:
pdellist.append(pkg)
pdellist.sort(key = lambda x: x['package_name'])
baddlist = [] # list containing new builds to be added from src tag
for (nvr, lbld) in six.iteritems(srclblds):
if nvr not in dstlblds:
baddlist.append(lbld)
baddlist.sort(key = lambda x: x['package_name'])
bdellist = [] # list containing new builds to be removed from dst tag
for (nvr, lbld) in six.iteritems(dstlblds):
if nvr not in srclblds:
bdellist.append(lbld)
bdellist.sort(key = lambda x: x['package_name'])
pdellist.sort(key=lambda x: x['package_name'])
baddlist = [] # list containing new builds to be added from src tag
bdellist = [] # list containing new builds to be removed from dst tag
for (pkg, dstblds) in six.iteritems(dstbldsbypkg):
if pkg not in srcbldsbypkg:
bdellist.extend(dstblds.values())
for (pkg, srcblds) in six.iteritems(srcbldsbypkg):
dstblds = dstbldsbypkg[pkg]
ablds = []
dblds = []
# firstly, remove builds from dst tag
removed_nvrs = set(dstblds.keys()) - set(srcblds.keys())
dnvrs = []
for (dstnvr, dstbld) in six.iteritems(dstblds):
if dstnvr in removed_nvrs:
dnvrs.append(dstnvr)
dblds.append(dstbld)
for dnvr in dnvrs:
del dstblds[dnvr]
# secondly, add builds from src tag and adjust the order
for (nvr, srcbld) in six.iteritems(srcblds):
found = False
dnvrs = []
for (dstnvr, dstbld) in six.iteritems(dstblds):
if nvr == dstnvr:
found = True
dnvrs.append(dstnvr)
break
# if latest_only, remove older builds, else keep them
elif not options.latest_only:
dnvrs.append(dstnvr)
dblds.append(dstbld)
for dnvr in dnvrs:
del dstblds[dnvr]
if not found:
ablds.append(srcbld)
baddlist.extend(ablds)
bdellist.extend(dblds)
baddlist.sort(key=lambda x: x['package_name'])
bdellist.sort(key=lambda x: x['package_name'])
gaddlist = [] # list containing new groups to be added from src tag
for (grpname, group) in six.iteritems(srcgroups):
if grpname not in dstgroups:
gaddlist.append(group)
gdellist = [] # list containing groups to be removed from src tag
gdellist = [] # list containing groups to be removed from src tag
for (grpname, group) in six.iteritems(dstgroups):
if grpname not in srcgroups:
gdellist.append(group)
grpchanges = {} # dict of changes to make in shared groups
grpchanges = {} # dict of changes to make in shared groups
for (grpname, group) in six.iteritems(srcgroups):
if grpname in dstgroups:
dstgroup = dstgroups[grpname]
grpchanges[grpname] = {'adds':[], 'dels':[]}
# Store whether group is inherited or not
grpchanges[grpname]['inherited'] = False
if group['tag_id'] != dsttag['id']:
if dstgroup['tag_id'] != dsttag['id']:
grpchanges[grpname]['inherited'] = True
srcgrppkglist=[]
dstgrppkglist=[]
@ -3439,27 +3489,57 @@ def handle_clone_tag(goptions, session, args):
if not options.test:
session.multicall = True
for pkg in paddlist:
chgpkglist.append(('[add]', pkg['package_name'],
pkg['blocked'], pkg['owner_name'],
pkg['tag_name']))
chgpkglist.append(('[add]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListAdd(dsttag['name'], pkg['package_name'],
session.packageListAdd(dsttag['name'],
pkg['package_name'],
owner=pkg['owner_name'],
block=pkg['blocked'],
extra_arches=pkg['extra_arches'])
if not options.test:
session.multiCall(batch=options.batch)
# DEL builds. To keep the order we should untag builds at first
if not options.test:
session.multicall = True
for build in bdellist:
# don't delete an inherited build.
if build['tag_name'] == dsttag['name']:
# add missing 'name' field
build['name'] = build['package_name']
chgbldlist.append(('[del]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# go on del builds from new tag.
if not options.test:
session.untagBuildBypass(dsttag['name'],
build,
force=options.force)
if not options.test:
session.multiCall(batch=options.batch)
# ADD builds.
if not options.test:
session.multicall = True
for build in baddlist:
build['name'] = build['package_name'] # add missing 'name' field.
chgbldlist.append(('[add]', build['package_name'], build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'], build['tag_name']))
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[add]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# copy latest builds into new tag.
if not options.test:
session.tagBuildBypass(dsttag['name'], build, force=options.force)
session.tagBuildBypass(dsttag['name'],
build,
force=options.force)
if not options.test:
session.multiCall(batch=options.batch)
# ADD groups.
@ -3467,10 +3547,15 @@ def handle_clone_tag(goptions, session, args):
session.multicall = True
for group in gaddlist:
if not options.test:
session.groupListAdd(dsttag['name'], group['name'], force=options.force)
session.groupListAdd(dsttag['name'],
group['name'],
force=options.force)
for pkg in group['packagelist']:
if not options.test:
session.groupPackageListAdd(dsttag['name'], group['name'], pkg['package'], force=options.force)
session.groupPackageListAdd(dsttag['name'],
group['name'],
pkg['package'],
force=options.force)
chggrplist.append(('[new]', pkg['package'], group['name']))
if not options.test:
session.multiCall(batch=options.batch)
@ -3481,22 +3566,10 @@ def handle_clone_tag(goptions, session, args):
for pkg in grpchanges[group]['adds']:
chggrplist.append(('[new]', pkg, group))
if not options.test:
session.groupPackageListAdd(dsttag['name'], group, pkg, force=options.force)
if not options.test:
session.multiCall(batch=options.batch)
# DEL builds.
if not options.test:
session.multicall = True
for build in bdellist:
# dont delete an inherited build.
if build['tag_name'] == dsttag['name']:
build['name'] = build['package_name'] # add missing 'name' field.
chgbldlist.append(('[del]', build['package_name'], build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'], build['tag_name']))
# go on del builds from new tag.
if not options.test:
session.untagBuildBypass(dsttag['name'], build, force=options.force)
session.groupPackageListAdd(dsttag['name'],
group,
pkg,
force=options.force)
if not options.test:
session.multiCall(batch=options.batch)
# DEL packages.
@ -3511,29 +3584,45 @@ def handle_clone_tag(goptions, session, args):
# delete only non-inherited packages.
for pkg in ninhrtpdellist:
# check if package have owned builds inside.
session.listTagged(dsttag['name'], package=pkg['package_name'], inherit=False)
session.listTagged(dsttag['name'],
package=pkg['package_name'],
inherit=False)
bump_builds = session.multiCall(batch=options.batch)
if not options.test:
session.multicall = True
for pkg, [builds] in zip(ninhrtpdellist, bump_builds):
#remove all its builds first if there are any.
# remove all its builds first if there are any.
for build in builds:
build['name'] = build['package_name'] #add missing 'name' field.
chgbldlist.append(('[del]', build['package_name'], build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'], build['tag_name']))
# add missing 'name' field.
build['name'] = build['package_name']
chgbldlist.append(('[del]',
build['package_name'],
build['nvr'],
koji.BUILD_STATES[build['state']],
build['owner_name'],
build['tag_name']))
# so delete latest build(s) from new tag.
if not options.test:
session.untagBuildBypass(dsttag['name'], build, force=options.force)
# now safe to remove package itselfm since we resolved its builds.
chgpkglist.append(('[del]', pkg['package_name'], pkg['blocked'],
pkg['owner_name'], pkg['tag_name']))
session.untagBuildBypass(dsttag['name'],
build,
force=options.force)
# now safe to remove package itself since we resolved its builds.
chgpkglist.append(('[del]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListRemove(dsttag['name'], pkg['package_name'], force=False)
session.packageListRemove(dsttag['name'],
pkg['package_name'],
force=False)
# mark as blocked inherited packages.
for pkg in inhrtpdellist:
chgpkglist.append(('[blk]', pkg['package_name'], pkg['blocked'],
pkg['owner_name'], pkg['tag_name']))
chgpkglist.append(('[blk]',
pkg['package_name'],
pkg['blocked'],
pkg['owner_name'],
pkg['tag_name']))
if not options.test:
session.packageListBlock(dsttag['name'], pkg['package_name'])
if not options.test:
@ -3545,7 +3634,9 @@ def handle_clone_tag(goptions, session, args):
# Only delete a group that isn't inherited
if group['tag_id'] == dsttag['id']:
if not options.test:
session.groupListRemove(dsttag['name'], group['name'], force=options.force)
session.groupListRemove(dsttag['name'],
group['name'],
force=options.force)
for pkg in group['packagelist']:
chggrplist.append(('[del]', pkg['package'], group['name']))
# mark as blocked inherited groups.
@ -3565,11 +3656,16 @@ def handle_clone_tag(goptions, session, args):
if not grpchanges[group]['inherited']:
chggrplist.append(('[del]', pkg, group))
if not options.test:
session.groupPackageListRemove(dsttag['name'], group, pkg, force=options.force)
session.groupPackageListRemove(dsttag['name'],
group,
pkg,
force=options.force)
else:
chggrplist.append(('[blk]', pkg, group))
if not options.test:
session.groupPackageListBlock(dsttag['name'], group, pkg)
session.groupPackageListBlock(dsttag['name'],
group,
pkg)
if not options.test:
session.multiCall(batch=options.batch)
# print final list of actions.
@ -3583,7 +3679,7 @@ def handle_clone_tag(goptions, session, args):
for changes in chgpkglist:
sys.stdout.write(pfmt % changes)
sys.stdout.write('\n')
sys.stdout.write(bfmt % ('Action', 'From/To Package', 'Latest Build(s)', 'State', 'Owner', 'From Tag'))
sys.stdout.write(bfmt % ('Action', 'From/To Package', 'Build(s)', 'State', 'Owner', 'From Tag'))
sys.stdout.write(bfmt % ('-'*7, '-'*28, '-'*40, '-'*10, '-'*10, '-'*10))
for changes in chgbldlist:
sys.stdout.write(bfmt % changes)

View file

@ -0,0 +1,629 @@
from __future__ import absolute_import
import mock
import six
from mock import call
try:
import unittest2 as unittest
except ImportError:
import unittest
from koji_cli.commands import handle_clone_tag
from . import utils
class TestCloneTag(utils.CliTestCase):
# Show long diffs in error output...
maxDiff = None
def setUp(self):
self.session = mock.MagicMock()
self.options = mock.MagicMock()
self.session.hasPerm.return_value = True
self.session.getTag.side_effect = [{'id': 1,
'locked': False},
{'id': 2,
'locked': False}]
self.activate_session = mock.patch(
'koji_cli.commands.activate_session').start()
self.error_format = """Usage: %s clone-tag [options] <src-tag> <dst-tag>
(Specify the --help global option for a list of other help options)
%s: error: {message}
""" % (self.progname, self.progname)
def tearDown(self):
mock.patch.stopall()
def test_handle_clone_tag_missing_arg(self):
args = ['some-tag']
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"This command takes two arguments: <src-tag> <dst-tag>"),
activate_session=None)
self.activate_session.assert_not_called()
def test_handle_clone_tag_not_admin(self):
args = ['src-tag', 'dst-tag']
self.session.hasPerm.return_value = False
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"This action requires admin privileges"),
activate_session=None)
self.activate_session.assert_called_once()
self.session.hasPerm.assert_called_once_with('admin')
def test_handle_clone_tag_same_tag(self):
args = ['src-tag', 'src-tag']
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"Source and destination tags must be different."),
activate_session=None)
self.activate_session.assert_called_once()
def test_handle_clone_tag_invalid_batch(self):
args = ['src-tag', 'dst-tag', '--batch=-1']
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"batch size must be bigger than zero"),
activate_session=None)
self.activate_session.assert_called_once()
def test_handle_clone_tag_no_srctag(self):
args = ['src-tag', 'dst-tag']
self.session.getTag.side_effect = [None, None]
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message("Unknown src-tag: src-tag"),
activate_session=None)
self.activate_session.assert_called_once()
self.activate_session.getTag.has_called([mock.call('src-tag'),
mock.call('dst-tag')])
def test_handle_clone_tag_locked(self):
args = ['src-tag', 'dst-tag']
self.session.getTag.side_effect = [{'id': 1,
'locked': True},
{'id': 2,
'locked': False}]
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"Error: You are attempting to clone from or to a tag which is locked.\n"
"Please use --force if this is what you really want to do."),
activate_session=None)
self.activate_session.assert_called_once()
self.activate_session.getTag.has_called([mock.call('src-tag'),
mock.call('dst-tag')])
def test_handle_clone_tag_no_config(self):
args = ['src-tag', 'dst-tag']
self.session.getTag.side_effect = [{'id': 1,
'locked': False},
None]
self.assert_system_exit(
handle_clone_tag,
self.options,
self.session,
args,
stderr=self.format_error_message(
"Cannot create tag without specifying --config"),
activate_session=None)
self.activate_session.assert_called_once()
self.activate_session.getTag.has_called([mock.call('src-tag'),
mock.call('dst-tag')])
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_new_dsttag(self, stdout):
args = ['src-tag', 'dst-tag', '--all', '-v']
self.session.listPackages.return_value = [{'package_id': 1,
'package_name': 'pkg1',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag',
'extra_arches': None},
{'package_id': 2,
'package_name': 'pkg2',
'blocked': True,
'owner_name': 'userB',
'tag_name': 'src-tag-p',
'extra_arches': 'arch3 arch4'},
{'package_id': 3,
'package_name': 'apkg',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag-p',
'extra_arches': 'arch4'}]
self.session.listTagged.return_value = [{'package_name': 'pkg1',
'nvr': 'pkg1-1.1-2',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-1.0-2',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-1.0-1',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg2',
'nvr': 'pkg2-1.0-1',
'state': 2,
'owner_name': 'b_owner',
'tag_name': 'src-tag-p'}
]
self.session.getTagGroups.return_value = [{'name': 'group1',
'packagelist': [
{'package': 'pkg1',
'blocked': False},
{'package': 'pkg2',
'blocked': False}]},
{'name': 'group2',
'packagelist': [
{'package': 'apkg',
'blocked': False},
{'package': 'bpkg',
'blocked': False}]
}]
self.session.getTag.side_effect = [{'id': 1,
'name': 'src-tag',
'arches': 'arch1 arch2',
'perm_id': 1,
'maven_support': False,
'maven_include_all': True,
'locked': False},
None,
{'id': 2,
'name': 'dst-tag',
'arches': 'arch1 arch2',
'perm_id': 1,
'maven_support': False,
'maven_include_all': True,
'locked': False}]
handle_clone_tag(self.options, self.session, args)
self.activate_session.assert_called_once()
self.session.assert_has_calls([call.hasPerm('admin'),
call.getTag('src-tag'),
call.getTag('dst-tag'),
call.createTag('dst-tag',
arches='arch1 arch2',
locked=False,
maven_include_all=True,
maven_support=False,
parent=None, perm=1),
call.getTag('dst-tag', strict=True),
call.listPackages(event=None,
inherited=True,
tagID=1),
call.packageListAdd('dst-tag', 'apkg',
block=False,
extra_arches='arch4',
owner='userA'),
call.packageListAdd('dst-tag', 'pkg1',
block=False,
extra_arches=None,
owner='userA'),
call.packageListAdd('dst-tag', 'pkg2',
block=True,
extra_arches='arch3 arch4',
owner='userB'),
call.multiCall(batch=1000),
call.listTagged(1, event=None,
inherit=None,
latest=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg2-1.0-1',
'package_name': 'pkg2', 'state': 2,
'tag_name': 'src-tag-p',
'name': 'pkg2'}, force=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.multiCall(batch=1000),
call.getTagGroups('src-tag',
event=None),
call.groupListAdd('dst-tag', 'group1'),
call.groupPackageListAdd('dst-tag',
'group1',
'pkg1',
block=False),
call.groupPackageListAdd('dst-tag',
'group1',
'pkg2',
block=False),
call.groupListAdd('dst-tag', 'group2'),
call.groupPackageListAdd('dst-tag',
'group2',
'apkg',
block=False),
call.groupPackageListAdd('dst-tag',
'group2',
'bpkg',
block=False),
call.multiCall(batch=1000)])
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
[new] apkg False userA src-tag-p
[new] pkg1 False userA src-tag
[new] pkg2 True userB src-tag-p
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[new] pkg2 pkg2-1.0-1 DELETED b_owner src-tag-p
[new] pkg1 pkg1-1.0-1 COMPLETE b_owner src-tag
[new] pkg1 pkg1-1.0-2 COMPLETE b_owner src-tag
[new] pkg1 pkg1-1.1-2 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
[new] pkg1 group1
[new] pkg2 group1
[new] apkg group2
[new] bpkg group2
""")
@mock.patch('sys.stdout', new_callable=six.StringIO)
def test_handle_clone_tag_existing_dsttag(self, stdout):
args = ['src-tag', 'dst-tag', '--all', '-v']
self.session.multiCall.return_value = []
self.session.listPackages.side_effect = [[{'package_id': 1,
'package_name': 'pkg1',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag',
'extra_arches': None},
{'package_id': 2,
'package_name': 'pkg2',
'blocked': True,
'owner_name': 'userB',
'tag_name': 'src-tag-p',
'extra_arches': 'arch3 arch4'},
{'package_id': 3,
'package_name': 'apkg',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag-p',
'extra_arches': 'arch4'}],
[{'package_id': 1,
'package_name': 'pkg1',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag',
'extra_arches': None},
{'package_id': 3,
'package_name': 'apkg',
'blocked': False,
'owner_name': 'userA',
'tag_name': 'src-tag-p',
'extra_arches': 'arch4'},
{'package_id': 4,
'package_name': 'bpkg',
'blocked': False,
'owner_name': 'userC',
'tag_name': 'src-tag',
'extra_arches': 'arch4'},
{'package_id': 5,
'package_name': 'cpkg',
'blocked': True,
'owner_name': 'userC',
'tag_name': 'src-tag-p',
'extra_arches': 'arch4'},
{'package_id': 6,
'package_name': 'dpkg',
'blocked': True,
'owner_name': 'userC',
'tag_name': 'src-tag',
'extra_arches': 'arch4'}
]]
self.session.listTagged.side_effect = [[{'package_name': 'pkg1',
'nvr': 'pkg1-1.1-2',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-1.0-2',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-0.1-1',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-1.0-1',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'src-tag'},
{'package_name': 'pkg2',
'nvr': 'pkg2-1.0-1',
'state': 2,
'owner_name': 'b_owner',
'tag_name': 'src-tag-p'}
],
[{'package_name': 'pkg1',
'nvr': 'pkg1-2.1-2',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'dst-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-1.0-1',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'dst-tag'},
{'package_name': 'pkg1',
'nvr': 'pkg1-0.1-1',
'state': 1,
'owner_name': 'b_owner',
'tag_name': 'dst-tag'},
{'package_name': 'pkg2',
'nvr': 'pkg2-1.0-1',
'state': 2,
'owner_name': 'b_owner',
'tag_name': 'dst-tag'}
]]
self.session.getTagGroups.side_effect = [[{'name': 'group1',
'tag_id': 1,
'packagelist': [
{'package': 'pkg1',
'blocked': False},
{'package': 'pkg2',
'blocked': False},
{'package': 'pkg3',
'blocked': False},
{'package': 'pkg4',
'blocked': False}
]},
{'name': 'group2',
'tag_id': 1,
'packagelist': [
{'package': 'apkg',
'blocked': False},
{'package': 'bpkg',
'blocked': False}]
}],
[{'name': 'group1',
'tag_id': 2,
'packagelist': [
{'package': 'pkg1',
'blocked': False},
{'package': 'pkg5',
'blocked': False}
]},
{'name': 'group2',
'tag_id': 3,
'packagelist': [
{'package': 'apkg',
'blocked': False},
{'package': 'cpkg',
'blocked': False}]},
{'name': 'group3',
'tag_id': 2,
'packagelist': [
{'package': 'cpkg',
'blocked': False},
{'package': 'dpkg',
'blocked': False}]},
{'name': 'group4',
'tag_id': 3,
'packagelist': [
{'package': 'epkg',
'blocked': False},
{'package': 'fpkg',
'blocked': False}]}
]]
self.session.getTag.side_effect = [{'id': 1,
'name': 'src-tag',
'arches': 'arch1 arch2',
'perm_id': 1,
'maven_support': False,
'maven_include_all': True,
'locked': False},
{'id': 2,
'name': 'dst-tag',
'arches': 'arch1 arch2',
'perm_id': 1,
'maven_support': False,
'maven_include_all': True,
'locked': False}]
handle_clone_tag(self.options, self.session, args)
self.activate_session.assert_called_once()
self.session.assert_has_calls([call.hasPerm('admin'),
call.getTag('src-tag'),
call.getTag('dst-tag'),
call.listPackages(event=None,
inherited=True,
tagID=1),
call.listPackages(inherited=True,
tagID=2),
call.listTagged(1, event=None,
inherit=None,
latest=None),
call.listTagged(2, inherit=False,
latest=None),
call.getTagGroups('src-tag',
event=None),
call.getTagGroups('dst-tag'),
call.packageListAdd('dst-tag', 'pkg2',
block=True,
extra_arches='arch3 arch4',
owner='userB'),
call.multiCall(batch=1000),
call.untagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-2.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'dst-tag',
'name': 'pkg1'}, force=None),
call.untagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-0.1-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'dst-tag',
'name': 'pkg1'}, force=None),
call.multiCall(batch=1000),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-0.1-1',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.0-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.tagBuildBypass('dst-tag', {
'owner_name': 'b_owner',
'nvr': 'pkg1-1.1-2',
'package_name': 'pkg1', 'state': 1,
'tag_name': 'src-tag',
'name': 'pkg1'}, force=None),
call.multiCall(batch=1000),
call.multiCall(batch=1000),
call.groupPackageListAdd('dst-tag',
'group1',
'pkg2',
force=None),
call.groupPackageListAdd('dst-tag',
'group1',
'pkg3',
force=None),
call.groupPackageListAdd('dst-tag',
'group1',
'pkg4',
force=None),
call.groupPackageListAdd('dst-tag',
'group2',
'bpkg',
force=None),
call.multiCall(batch=1000),
call.multiCall(batch=1000),
call.packageListBlock('dst-tag',
'bpkg'),
call.packageListBlock('dst-tag',
'cpkg'),
call.packageListBlock('dst-tag',
'dpkg'),
call.multiCall(batch=1000),
call.groupListBlock('dst-tag',
'group4'),
call.groupListRemove('dst-tag',
'group3',
force=None),
call.multiCall(batch=1000),
call.groupPackageListRemove('dst-tag',
'group1',
'pkg5',
force=None),
call.groupPackageListBlock('dst-tag',
'group2',
'cpkg'),
call.multiCall(batch=1000)])
self.assert_console_message(stdout, """
List of changes:
Action Package Blocked Owner From Tag
------- ---------------------------- ---------- ---------- ----------
[add] pkg2 True userB src-tag-p
[blk] bpkg False userC src-tag
[blk] cpkg True userC src-tag-p
[blk] dpkg True userC src-tag
Action From/To Package Build(s) State Owner From Tag
------- ---------------------------- ---------------------------------------- ---------- ---------- ----------
[del] pkg1 pkg1-2.1-2 COMPLETE b_owner dst-tag
[del] pkg1 pkg1-0.1-1 COMPLETE b_owner dst-tag
[add] pkg1 pkg1-0.1-1 COMPLETE b_owner src-tag
[add] pkg1 pkg1-1.0-2 COMPLETE b_owner src-tag
[add] pkg1 pkg1-1.1-2 COMPLETE b_owner src-tag
Action Package Group
------- ---------------------------- ----------------------------
[new] pkg2 group1
[new] pkg3 group1
[new] pkg4 group1
[new] bpkg group2
[blk] epkg group4
[blk] fpkg group4
[del] cpkg group3
[del] dpkg group3
[del] pkg5 group1
[blk] cpkg group2
""")
def test_handle_clone_tag_help(self):
self.assert_help(
handle_clone_tag,
"""Usage: %s clone-tag [options] <src-tag> <dst-tag>
(Specify the --help global option for a list of other help options)
Options:
-h, --help show this help message and exit
--config Copy config from the source to the dest tag
--groups Copy group information
--pkgs Copy package list from the source to the dest tag
--builds Tag builds into the dest tag
--all The same as --config --groups --pkgs --builds
--latest-only Tag only the latest build of each package
--inherit-builds Include all builds inherited into the source tag into the
dest tag
--ts=TS Clone tag at a specific timestamp
--event=EVENT Clone tag at a specific event
--repo=REPO Clone tag at a specific repo event
-v, --verbose show changes
-f, --force override tag locks if necessary
-n, --test test mode
--batch=SIZE batch size of multicalls [0 to disable, default: 1000]
""" % self.progname)
if __name__ == '__main__':
unittest.main()