From b65211a94d8440f04b8309d7bb11e9196c2be29c Mon Sep 17 00:00:00 2001 From: Christian Kellner Date: Wed, 16 Dec 2020 16:03:41 +0100 Subject: [PATCH] formats/v1: move validation logic here The validation of the manifest descritpion is eo ipso format specific and thus belongs into the format specific module. Adapt all usages throughout the codebase to directly use the version 1 specific function. --- osbuild/formats/v1.py | 53 +++++++++++++++++++++++++++++++++++++++ osbuild/main_cli.py | 3 ++- osbuild/meta.py | 54 +--------------------------------------- test/mod/test_osbuild.py | 13 +++++----- 4 files changed, 63 insertions(+), 60 deletions(-) diff --git a/osbuild/formats/v1.py b/osbuild/formats/v1.py index b8cd0e56..46e699c8 100644 --- a/osbuild/formats/v1.py +++ b/osbuild/formats/v1.py @@ -1,6 +1,7 @@ # Version 1 of the manifest description from typing import Dict +from osbuild.meta import Index, ValidationResult from ..pipeline import Pipeline, detect_host_runner @@ -31,3 +32,55 @@ def load(description: Dict, sources_options: Dict) -> Pipeline: pipeline.set_assembler(a["name"], a.get("options", {})) return pipeline + + +def validate(manifest: Dict, index: Index) -> ValidationResult: + """Validate a OSBuild manifest + + This function will validate a OSBuild manifest, including + all its stages and assembler and build manifests. It will + try to validate as much as possible and not stop on errors. + The result is a `ValidationResult` object that can be used + to check the overall validation status and iterate all the + individual validation errors. + """ + + schema = index.get_schema("Manifest") + result = schema.validate(manifest) + + # main pipeline + pipeline = manifest.get("pipeline", {}) + + # recursively validate the build pipeline as a "normal" + # pipeline in order to validate its stages and assembler + # options; for this it is being re-parented in a new plain + # {"pipeline": ...} dictionary. NB: Any nested structural + # errors might be detected twice, but de-duplicated by the + # `ValidationResult.merge` call + build = pipeline.get("build", {}).get("pipeline") + if build: + res = validate({"pipeline": build}, index=index) + result.merge(res, path=["pipeline", "build"]) + + stages = pipeline.get("stages", []) + for i, stage in enumerate(stages): + name = stage["name"] + schema = index.get_schema("Stage", name) + res = schema.validate(stage) + result.merge(res, path=["pipeline", "stages", i]) + + asm = pipeline.get("assembler", {}) + if asm: + name = asm["name"] + schema = index.get_schema("Assembler", name) + res = schema.validate(asm) + result.merge(res, path=["pipeline", "assembler"]) + + # sources + sources = manifest.get("sources", {}) + for name, source in sources.items(): + schema = index.get_schema("Source", name) + res = schema.validate(source) + result.merge(res, path=["sources", name]) + + return result diff --git a/osbuild/main_cli.py b/osbuild/main_cli.py index fbaa4481..0afa2c26 100644 --- a/osbuild/main_cli.py +++ b/osbuild/main_cli.py @@ -14,6 +14,7 @@ import sys import osbuild import osbuild.meta import osbuild.monitor +from osbuild.formats import v1 as fmt RESET = "\033[0m" @@ -106,7 +107,7 @@ def osbuild_cli(): # first thing after parsing is validation of the input index = osbuild.meta.Index(args.libdir) - res = osbuild.meta.validate(manifest, index) + res = fmt.validate(manifest, index) if not res: if args.json or args.inspect: json.dump(res.as_dict(), sys.stdout) diff --git a/osbuild/meta.py b/osbuild/meta.py index 81e37e73..691337b3 100644 --- a/osbuild/meta.py +++ b/osbuild/meta.py @@ -25,7 +25,7 @@ import copy import os import json from collections import deque -from typing import Dict, Iterable, List, Optional +from typing import Iterable, List, Optional import jsonschema @@ -429,55 +429,3 @@ class Index: self._schemata[(klass, name)] = schema return schema - - -def validate(manifest: Dict, index: Index) -> ValidationResult: - """Validate a OSBuild manifest - - This function will validate a OSBuild manifest, including - all its stages and assembler and build manifests. It will - try to validate as much as possible and not stop on errors. - The result is a `ValidationResult` object that can be used - to check the overall validation status and iterate all the - individual validation errors. - """ - - schema = index.get_schema("Manifest") - result = schema.validate(manifest) - - # main pipeline - pipeline = manifest.get("pipeline", {}) - - # recursively validate the build pipeline as a "normal" - # pipeline in order to validate its stages and assembler - # options; for this it is being re-parented in a new plain - # {"pipeline": ...} dictionary. NB: Any nested structural - # errors might be detected twice, but de-duplicated by the - # `ValidationResult.merge` call - build = pipeline.get("build", {}).get("pipeline") - if build: - res = validate({"pipeline": build}, index=index) - result.merge(res, path=["pipeline", "build"]) - - stages = pipeline.get("stages", []) - for i, stage in enumerate(stages): - name = stage["name"] - schema = index.get_schema("Stage", name) - res = schema.validate(stage) - result.merge(res, path=["pipeline", "stages", i]) - - asm = pipeline.get("assembler", {}) - if asm: - name = asm["name"] - schema = index.get_schema("Assembler", name) - res = schema.validate(asm) - result.merge(res, path=["pipeline", "assembler"]) - - # sources - sources = manifest.get("sources", {}) - for name, source in sources.items(): - schema = index.get_schema("Source", name) - res = schema.validate(source) - result.merge(res, path=["sources", name]) - - return result diff --git a/test/mod/test_osbuild.py b/test/mod/test_osbuild.py index f99279c1..b45311cf 100644 --- a/test/mod/test_osbuild.py +++ b/test/mod/test_osbuild.py @@ -11,6 +11,7 @@ import unittest import osbuild import osbuild.meta +from osbuild.formats import v1 as fmt from osbuild.monitor import NullMonitor from osbuild.pipeline import detect_host_runner from .. import test @@ -174,7 +175,7 @@ class TestDescriptions(unittest.TestCase): index = osbuild.meta.Index(os.curdir) # an empty manifest is OK - res = osbuild.meta.validate({}, index) + res = fmt.validate({}, index) self.assertEqual(res.valid, True) # something totally invalid (by Ondřej Budai) @@ -187,7 +188,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(totally_invalid, index) + res = fmt.validate(totally_invalid, index) self.assertEqual(res.valid, False) # The top-level 'osbuild' is an additional property self.assertEqual(len(res), 1) @@ -201,7 +202,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(no_runner, index) + res = fmt.validate(no_runner, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 1) # missing runner lst = res[".pipeline.build"] @@ -227,7 +228,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(no_runner_extra, index) + res = fmt.validate(no_runner_extra, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 3) lst = res[".pipeline.build.pipeline"] @@ -251,7 +252,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(stage_check, index) + res = fmt.validate(stage_check, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 2) lst = res[".pipeline.stages[0].options"] @@ -270,7 +271,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(assembler_check, index) + res = fmt.validate(assembler_check, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 2) lst = res[".pipeline.assembler.options"]