Merge PR#177 allow tasks to fail on some arches for images/lives/appliances

Merges #177
https://pagure.io/koji/pull-request/177

Fixes #47
Mike McLean 2017-01-12 14:28:45 -05:00
commit 12c7f8911e
5 changed files with 158 additions and 55 deletions


@ -57,7 +57,6 @@ from ConfigParser import ConfigParser
from fnmatch import fnmatch
from gzip import GzipFile
from optparse import OptionParser, SUPPRESS_HELP
from StringIO import StringIO
from yum import repoMDObject
#imports for LiveCD, LiveMedia, and Appliance handler
@ -2406,18 +2405,39 @@ class BuildLiveMediaTask(BuildImageTask):
bld_info = self.initImageBuild(name, version, release,
target_info, opts)
subtasks = {}
canfail = []
for arch in arches:
subtasks[arch] = self.subtask('createLiveMedia',
[name, version, release, arch, target_info, build_tag,
repo_info, ksfile, opts],
label='livemedia %s' % arch, arch=arch)
if arch in opts.get('optional_arches', []):
canfail.append(subtasks[arch])
self.logger.debug("Tasks that can fail: %r", canfail)
self.logger.debug("Got image subtasks: %r", subtasks)
self.logger.debug("Waiting on livemedia subtasks...")
results = self.wait(subtasks.values(), all=True, failany=True)
results = self.wait(subtasks.values(), all=True, failany=True, canfail=canfail)
# if everything failed, fail even if all subtasks are in canfail
self.logger.debug('subtask results: %r', results)
all_failed = True
for result in results.values():
if not isinstance(result, dict) or 'faultCode' not in result:
all_failed = False
break
if all_failed:
raise koji.GenericError("all subtasks failed")
# determine ignored arch failures
ignored_arches = set()
for arch in arches:
if arch in opts.get('optional_arches', []):
task_id = subtasks[arch]
result = results[task_id]
if isinstance(result, dict) and 'faultCode' in result:
ignored_arches.add(arch)
# wrap each image in an RPM if needed
spec_url = opts.get('specfile')
@ -2427,6 +2447,8 @@ class BuildLiveMediaTask(BuildImageTask):
subtask_id = subtasks[arch]
result = results[subtask_id]
tinfo = self.session.getTaskInfo(subtask_id)
if arch in ignored_arches:
continue
arglist = [spec_url, target_info, bld_info, tinfo,
{'repo_id': repo_info['id']}]
wrapper_tasks[arch] = self.subtask('wrapperRPM', arglist,
@ -2437,6 +2459,8 @@ class BuildLiveMediaTask(BuildImageTask):
# add wrapper rpm results into main results
for arch in arches:
if arch in ignored_arches:
continue
result = results[subtasks[arch]]
result2 = results2[wrapper_tasks[arch]]
result['rpmresults'] = result2

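To make the control flow above concrete: with canfail in play, a failed optional-arch subtask no longer raises out of self.wait(); it shows up in results as a fault dict, which the checks above then filter. A self-contained illustration of that filtering, with made-up task ids, arches and result payloads (not real koji output):

    # Made-up example of the result shapes handled above.
    subtasks = {'x86_64': 101, 's390x': 102}
    optional_arches = ['s390x']
    results = {
        101: {'task_id': 101, 'files': ['livemedia-x86_64.iso']},
        102: {'faultCode': 1014, 'faultString': 'image creation failed'},
    }

    # "all subtasks failed" check
    all_failed = all(isinstance(r, dict) and 'faultCode' in r
                     for r in results.values())

    # arches whose failures are ignored because they were optional
    ignored_arches = set(arch for arch in optional_arches
                         if 'faultCode' in results[subtasks[arch]])

    print(all_failed)             # False
    print(sorted(ignored_arches)) # ['s390x']

The wrapperRPM loops later in the hunk then skip anything in ignored_arches.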

@ -5477,12 +5477,15 @@ def handle_spin_livemedia(options, session, args):
help=_("SCM URL to spec file fragment to use to generate wrapper RPMs"))
parser.add_option("--skip-tag", action="store_true",
help=_("Do not attempt to tag package"))
parser.add_option("--can-fail", action="store", dest="optional_arches",
metavar="ARCH1,ARCH2,...", default="",
help=_("List of archs which are not blocking for build (separated by commas."))
(task_options, args) = parser.parse_args(args)
# Make sure the target and kickstart are specified.
if len(args) != 5:
parser.error(_("Five arguments are required: a name, a version, an" +
" architecture, a build target, and a relative path to" +
parser.error(_("Five arguments are required: a name, a version, a" +
" build target, an architecture, and a relative path to" +
" a kickstart file."))
assert False # pragma: no cover
_build_image(options, task_options, session, args, 'livemedia')
@ -5743,6 +5746,9 @@ def handle_image_build(options, session, args):
help=_("Create a scratch image"))
parser.add_option("--skip-tag", action="store_true",
help=_("Do not attempt to tag package"))
parser.add_option("--can-fail", action="store", dest="optional_arches",
metavar="ARCH1,ARCH2,...", default="",
help=_("List of archs which are not blocking for build (separated by commas."))
parser.add_option("--specfile", metavar="URL",
help=_("SCM URL to spec file fragment to use to generate wrapper RPMs"))
parser.add_option("--wait", action="store_true",
@ -5853,12 +5859,13 @@ def _build_image(options, task_opts, session, args, img_type):
ksfile = os.path.join(serverdir, os.path.basename(ksfile))
print
passthru_opts = [
'isoname', 'ksurl', 'ksversion', 'scratch', 'repo',
'release', 'skip_tag', 'vmem', 'vcpu', 'format', 'specfile',
'title', 'install_tree_url',
]
hub_opts = {}
hub_opts['optional_arches'] = task_opts.optional_arches.split(',')
passthru_opts = [
'format', 'install_tree_url', 'isoname', 'ksurl',
'ksversion', 'release', 'repo', 'scratch', 'skip_tag',
'specfile', 'title', 'vcpu', 'vmem',
]
for opt in passthru_opts:
val = getattr(task_opts, opt, None)
if val is not None:

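On the CLI side the new --can-fail flag is only plumbing: the comma-separated value is split into a list and sent to the hub as optional_arches. A standalone sketch of that plumbing, with made-up arch names and the gettext _() wrapper from the real option omitted:

    from optparse import OptionParser

    # Mirrors the option added above; the example invocation is made up.
    parser = OptionParser()
    parser.add_option("--can-fail", action="store", dest="optional_arches",
                      metavar="ARCH1,ARCH2,...", default="",
                      help="List of arches which are not blocking for build")
    opts, args = parser.parse_args(["--can-fail=s390x,ppc64le"])

    hub_opts = {'optional_arches': opts.optional_arches.split(',')}
    print(hub_opts)   # {'optional_arches': ['s390x', 'ppc64le']}

Note that with the empty-string default, split(',') yields [''] rather than an empty list; the builder only ever checks membership against real arch names, so the empty entry is harmless.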

@ -400,7 +400,7 @@ class Task(object):
params, method = xmlrpclib.loads(xml_request)
return params
def getResult(self):
def getResult(self, raise_fault=True):
query = """SELECT state,result FROM task WHERE id = %(id)i"""
r = _fetchSingle(query, vars(self))
if not r:
@ -410,17 +410,20 @@ class Task(object):
raise koji.GenericError, "Task %i is canceled" % self.id
elif koji.TASK_STATES[state] not in ['CLOSED', 'FAILED']:
raise koji.GenericError, "Task %i is not finished" % self.id
# If the result is a Fault, then loads will raise it
# This is probably what we want to happen.
# Note that you can't really 'return' a fault over xmlrpc, you
# can only 'raise' them.
# If you try to return a fault as a value, it gets reduced to
# a mere struct.
# f = Fault(1,"hello"); print dumps((f,))
if xml_result.find('<?xml', 0, 10) == -1:
#handle older base64 encoded data
xml_result = base64.decodestring(xml_result)
result, method = xmlrpclib.loads(xml_result)
try:
# If the result is a Fault, then loads will raise it
# This is normally what we want to happen
result, method = xmlrpclib.loads(xml_result)
except xmlrpclib.Fault, fault:
if raise_fault:
raise
# Note that you can't really return a fault over xmlrpc, except by
# raising it. We return a dictionary in the same format that
# multiCall does.
return {'faultCode': fault.faultCode, 'faultString': fault.faultString}
return result[0]
def getInfo(self, strict=True, request=False):
@ -10066,9 +10069,9 @@ class RootExports(object):
task = Task(taskId)
return task.getRequest()
def getTaskResult(self, taskId):
def getTaskResult(self, taskId, raise_fault=True):
task = Task(taskId)
return task.getResult()
return task.getResult(raise_fault=raise_fault)
def getTaskInfo(self, task_id, request=False):
"""Get information about a task"""
@ -11094,7 +11097,10 @@ class Host(object):
c.execute(q, locals())
return [finished, unfinished]
def taskWaitResults(self, parent, tasks):
def taskWaitResults(self, parent, tasks, canfail=None):
if canfail is None:
canfail = []
results = {}
# If we're getting results, we're done waiting
self.taskUnwait(parent)
c = context.cnx.cursor()
@ -11108,16 +11114,17 @@ class Host(object):
# Query all subtasks
tasks = []
c.execute(q, locals())
for id, state in c.fetchall():
for task_id, state in c.fetchall():
if state == canceled:
raise koji.GenericError, "Subtask canceled"
elif state in (closed, failed):
tasks.append(id)
tasks.append(task_id)
# Would use a dict, but xmlrpc requires the keys to be strings
results = []
for id in tasks:
task = Task(id)
results.append([id, task.getResult()])
for task_id in tasks:
task = Task(task_id)
raise_fault = (task_id not in canfail)
results.append([task_id, task.getResult(raise_fault=raise_fault)])
return results
def getHostTasks(self):
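As the comment in the hunk above notes, xmlrpc requires string keys, so taskWaitResults returns [task_id, result] pairs and the handler side converts them back with dict() (see the wait() change further down). For example, with made-up data:

    # Pairs as returned over xmlrpc by host.taskWaitResults; a failed
    # canfail subtask appears as a fault dict instead of raising.
    pairs = [[101, {'task_id': 101, 'files': ['livemedia-x86_64.iso']}],
             [102, {'faultCode': 1014, 'faultString': 'image creation failed'}]]
    results = dict(pairs)
    print(sorted(results.keys()))   # [101, 102]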
@ -11301,10 +11308,10 @@ class HostExports(object):
host.verify()
return host.taskWait(parent)
def taskWaitResults(self, parent, tasks):
def taskWaitResults(self, parent, tasks, canfail=None):
host = Host()
host.verify()
return host.taskWaitResults(parent, tasks)
return host.taskWaitResults(parent, tasks, canfail)
def subtask(self, method, arglist, parent, **opts):
host = Host()
@ -11445,6 +11452,9 @@ class HostExports(object):
task.assertHost(host.id)
logger.debug('scratch image results: %s' % results)
for sub_results in results.values():
if 'task_id' not in sub_results:
logger.warning('Task %s failed, no image available' % task_id)
continue
workdir = koji.pathinfo.task(sub_results['task_id'])
scratchdir = koji.pathinfo.scratch()
username = get_user(task.getOwner())['name']
@ -11825,6 +11835,9 @@ class HostExports(object):
moving the image to its final location.
"""
for sub_results in results.values():
if 'task_id' not in sub_results:
logger.warning('Task %s failed, no image available' % task_id)
continue
importImageInternal(task_id, build_id, sub_results)
if sub_results.has_key('rpmresults'):
rpm_results = sub_results['rpmresults']


@ -187,15 +187,20 @@ class BaseTaskHandler(object):
safe_rmtree(self.workdir, unmount=False, strict=True)
#os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir])
def wait(self, subtasks=None, all=False, failany=False):
def wait(self, subtasks=None, all=False, failany=False, canfail=None):
"""Wait on subtasks
subtasks is a list of integers (or an integer). If more than one subtask
is specified, then the default behavior is to return when any of those
tasks complete. However, if all is set to True, then it waits for all of
them to complete. If all and failany are both set to True, then each
finished task will be checked for failure, and a failure will cause all
of the unfinished tasks to be cancelled.
them to complete.
If all and failany are both set to True, then each finished task will
be checked for failure, and a failure will cause all of the unfinished
tasks to be cancelled.
If canfail is given a list of task ids, then those tasks can fail
without affecting the other tasks.
special values:
subtasks = None specify all subtasks
@ -206,6 +211,9 @@ class BaseTaskHandler(object):
the database and will send the subprocess corresponding to the
subtask a SIGUSR2 to wake it up when subtasks complete.
"""
if canfail is None:
canfail = []
if isinstance(subtasks, int):
# allow single integer w/o enclosing list
subtasks = [subtasks]
@ -221,6 +229,9 @@ class BaseTaskHandler(object):
if failany:
failed = False
for task in finished:
if task in canfail:
# no point in checking
continue
try:
self.session.getTaskResult(task)
except (koji.GenericError, xmlrpclib.Fault), task_error:
@ -243,9 +254,10 @@ class BaseTaskHandler(object):
self.logger.debug("...waking up")
self.logger.debug("Finished waiting")
if all:
return dict(self.session.host.taskWaitResults(self.id, subtasks))
else:
return dict(self.session.host.taskWaitResults(self.id, finished))
finished = subtasks
return dict(self.session.host.taskWaitResults(self.id, finished,
canfail=canfail))
def getUploadDir(self):
return koji.pathinfo.taskrelpath(self.id)
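In the failany loop above, finished tasks listed in canfail are simply skipped when scanning for failures, so only non-optional failures cancel the remaining work. A toy standalone version of that loop, with a stubbed session and made-up task ids (the real code catches koji.GenericError and xmlrpclib.Fault rather than RuntimeError):

    # Stub standing in for session.getTaskResult; ids and the failing
    # task are made up for illustration.
    class FakeSession(object):
        def getTaskResult(self, task_id):
            if task_id == 4:
                raise RuntimeError('task %s failed' % task_id)

    session = FakeSession()
    finished = [1, 2, 3, 4]
    canfail = [4]

    failed = False
    for task in finished:
        if task in canfail:
            continue   # failures here are tolerated
        try:
            session.getTaskResult(task)
        except RuntimeError:
            failed = True
            break

    print(failed)   # False: the only failure is a canfail task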
@ -390,17 +402,23 @@ class ForkTask(BaseTaskHandler):
os.spawnvp(os.P_NOWAIT, 'sleep', ['sleep', str(m)])
class WaitTestTask(BaseTaskHandler):
"""
Tests self.wait()
Starts a few tasks which just sleep. One of them will fail due to bad
arguments. As it is listed in 'canfail', it shouldn't affect the overall
CLOSED status.
"""
Methods = ['waittest']
_taskWeight = 0.1
def handler(self, count, seconds=10):
tasks = []
for i in xrange(count):
task_id = self.session.host.subtask(method='sleep',
arglist=[seconds],
label=str(i),
parent=self.id)
task_id = self.subtask(method='sleep', arglist=[seconds], label=str(i), parent=self.id)
tasks.append(task_id)
results = self.wait(all=True)
bad_task = self.subtask('sleep', ['BAD_ARG'], label='bad')
tasks.append(bad_task)
results = self.wait(subtasks=tasks, all=True, failany=True, canfail=[bad_task])
self.logger.info(pprint.pformat(results))


@ -1,14 +1,16 @@
import random
from io import StringIO
from os import path, makedirs
from shutil import rmtree
from tempfile import gettempdir
from unittest import TestCase
from mock import patch, Mock, call
from tempfile import gettempdir
from shutil import rmtree
from os import path, makedirs
from io import StringIO
import koji
from koji.tasks import scan_mounts, umount_all, safe_rmtree, BaseTaskHandler, FakeTask, SleepTask, ForkTask
from koji import BuildError, GenericError
from koji.tasks import BaseTaskHandler, FakeTask, ForkTask, SleepTask, \
WaitTestTask, scan_mounts, umount_all, \
safe_rmtree
def get_fake_mounts_file():
""" Returns contents of /prc/mounts in a file-like object
@ -271,7 +273,7 @@ class TasksTestCase(TestCase):
obj.session.host.taskWaitResults.return_value = taskWaitResults
self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234])
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])
def test_BaseTaskHandler_wait_some_not_done(self):
""" Tests that the wait function returns the one finished subtask results of
@ -296,7 +298,7 @@ class TasksTestCase(TestCase):
obj.session.host.taskWaitResults.return_value = taskWaitResults
self.assertEquals(obj.wait([1551234, 1591234]), dict(taskWaitResults))
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234])
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234], canfail=[])
@patch('signal.pause', return_value=None)
def test_BaseTaskHandler_wait_some_not_done_all_set(self, mock_signal_pause):
@ -336,7 +338,7 @@ class TasksTestCase(TestCase):
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
obj.session.host.taskWait.assert_has_calls([call(12345678), call(12345678)])
mock_signal_pause.assert_called_once_with()
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234])
obj.session.host.taskWaitResults.assert_called_once_with(12345678, [1551234, 1591234], canfail=[])
def test_BaseTaskHandler_wait_some_not_done_all_set_failany_set_failed_task(self):
""" Tests that the wait function raises an exception when one of the subtask fails when the failany flag is set
@ -348,11 +350,11 @@ class TasksTestCase(TestCase):
obj.session = Mock()
obj.session.host.taskSetWait.return_value = None
obj.session.host.taskWait.side_effect = [[[1551234], [1591234]], [[1551234, 1591234], []]]
obj.session.getTaskResult.side_effect = GenericError('Uh oh, we\'ve got a problem here!')
obj.session.getTaskResult.side_effect = koji.GenericError('Uh oh, we\'ve got a problem here!')
try:
obj.wait([1551234, 1591234], all=True, failany=True)
raise Exception('A GenericError was not raised.')
except GenericError as e:
except koji.GenericError as e:
self.assertEquals(e.message, 'Uh oh, we\'ve got a problem here!')
obj.session.host.taskSetWait.assert_called_once_with(12345678, [1551234, 1591234])
@ -509,7 +511,7 @@ class TasksTestCase(TestCase):
try:
obj.find_arch('noarch', host, None)
raise Exception('The BuildError Exception was not raised')
except BuildError as e:
except koji.BuildError as e:
self.assertEquals(e.message, 'No arch list for this host: test.domain.local')
def test_BaseTaskHandler_find_arch_noarch_bad_tag(self):
@ -524,7 +526,7 @@ class TasksTestCase(TestCase):
try:
obj.find_arch('noarch', host, tag)
raise Exception('The BuildError Exception was not raised')
except BuildError as e:
except koji.BuildError as e:
self.assertEquals(e.message, 'No arch list for tag: some_package-1.2-build')
def test_BaseTaskHandler_find_arch_noarch(self):
@ -550,7 +552,7 @@ class TasksTestCase(TestCase):
try:
obj.find_arch('noarch', host, tag)
raise Exception('The BuildError Exception was not raised')
except BuildError as e:
except koji.BuildError as e:
self.assertEquals(e.message, ('host test.domain.local (i386) does not support '
'any arches of tag some_package-1.2-build (aarch64, x86_64)'))
@ -646,7 +648,7 @@ class TasksTestCase(TestCase):
try:
obj.getRepo(8472)
raise Exception('The BuildError Exception was not raised')
except BuildError as e:
except koji.BuildError as e:
obj.session.getRepo.assert_called_once_with(8472)
self.assertEquals(e.message, 'no repo (and no target) for tag rhel-7.3-build')
@ -671,3 +673,42 @@ class TasksTestCase(TestCase):
obj = ForkTask(123, 'fork', [1, 20], None, None, (get_tmp_dir_path('ForkTask')))
obj.run()
mock_spawnvp.assert_called_once_with(1, 'sleep', ['sleep', '20'])
@patch('signal.pause', return_value=None)
@patch('time.sleep')
def test_WaitTestTask_handler(self, mock_sleep, mock_signal_pause):
""" Tests that the WaitTestTask handler can be instantiated and runs appropriately based on the input
Specifically, that forking works and canfail behaves correctly.
"""
self.mock_subtask_id = 1
def mock_subtask(method, arglist, id, **opts):
self.assertEqual(method, 'sleep')
task_id = self.mock_subtask_id
self.mock_subtask_id += 1
obj = SleepTask(task_id, 'sleep', arglist, None, None, (get_tmp_dir_path('SleepTask')))
obj.run()
return task_id
mock_taskWait = [
[[], [1, 2, 3, 4]],
[[3, 4], [1, 2]],
[[1, 2, 3, 4], []],
]
def mock_getTaskResult(task_id):
if task_id == 4:
raise koji.GenericError()
obj = WaitTestTask(123, 'waittest', [3], None, None, (get_tmp_dir_path('WaitTestTask')))
obj.session = Mock()
obj.session.host.subtask.side_effect = mock_subtask
obj.session.getTaskResult.side_effect = mock_getTaskResult
obj.session.host.taskWait.side_effect = mock_taskWait
obj.session.host.taskWaitResults.return_value = [ ['1', {}], ['2', {}], ['3', {}], ['4', {}], ]
obj.run()
#self.assertEqual(mock_sleep.call_count, 4)
obj.session.host.taskSetWait.assert_called_once()
obj.session.host.taskWait.assert_has_calls([call(123), call(123), call(123)])
# getTaskResult should be called in 2nd round only for task 3, as 4
# will be skipped as 'canfail'
obj.session.getTaskResult.assert_has_calls([call(3)])