diff --git a/osbuild/formats/v1.py b/osbuild/formats/v1.py index b8cd0e56..46e699c8 100644 --- a/osbuild/formats/v1.py +++ b/osbuild/formats/v1.py @@ -1,6 +1,7 @@ # Version 1 of the manifest description from typing import Dict +from osbuild.meta import Index, ValidationResult from ..pipeline import Pipeline, detect_host_runner @@ -31,3 +32,55 @@ def load(description: Dict, sources_options: Dict) -> Pipeline: pipeline.set_assembler(a["name"], a.get("options", {})) return pipeline + + +def validate(manifest: Dict, index: Index) -> ValidationResult: + """Validate a OSBuild manifest + + This function will validate a OSBuild manifest, including + all its stages and assembler and build manifests. It will + try to validate as much as possible and not stop on errors. + The result is a `ValidationResult` object that can be used + to check the overall validation status and iterate all the + individual validation errors. + """ + + schema = index.get_schema("Manifest") + result = schema.validate(manifest) + + # main pipeline + pipeline = manifest.get("pipeline", {}) + + # recursively validate the build pipeline as a "normal" + # pipeline in order to validate its stages and assembler + # options; for this it is being re-parented in a new plain + # {"pipeline": ...} dictionary. NB: Any nested structural + # errors might be detected twice, but de-duplicated by the + # `ValidationResult.merge` call + build = pipeline.get("build", {}).get("pipeline") + if build: + res = validate({"pipeline": build}, index=index) + result.merge(res, path=["pipeline", "build"]) + + stages = pipeline.get("stages", []) + for i, stage in enumerate(stages): + name = stage["name"] + schema = index.get_schema("Stage", name) + res = schema.validate(stage) + result.merge(res, path=["pipeline", "stages", i]) + + asm = pipeline.get("assembler", {}) + if asm: + name = asm["name"] + schema = index.get_schema("Assembler", name) + res = schema.validate(asm) + result.merge(res, path=["pipeline", "assembler"]) + + # sources + sources = manifest.get("sources", {}) + for name, source in sources.items(): + schema = index.get_schema("Source", name) + res = schema.validate(source) + result.merge(res, path=["sources", name]) + + return result diff --git a/osbuild/main_cli.py b/osbuild/main_cli.py index fbaa4481..0afa2c26 100644 --- a/osbuild/main_cli.py +++ b/osbuild/main_cli.py @@ -14,6 +14,7 @@ import sys import osbuild import osbuild.meta import osbuild.monitor +from osbuild.formats import v1 as fmt RESET = "\033[0m" @@ -106,7 +107,7 @@ def osbuild_cli(): # first thing after parsing is validation of the input index = osbuild.meta.Index(args.libdir) - res = osbuild.meta.validate(manifest, index) + res = fmt.validate(manifest, index) if not res: if args.json or args.inspect: json.dump(res.as_dict(), sys.stdout) diff --git a/osbuild/meta.py b/osbuild/meta.py index 81e37e73..691337b3 100644 --- a/osbuild/meta.py +++ b/osbuild/meta.py @@ -25,7 +25,7 @@ import copy import os import json from collections import deque -from typing import Dict, Iterable, List, Optional +from typing import Iterable, List, Optional import jsonschema @@ -429,55 +429,3 @@ class Index: self._schemata[(klass, name)] = schema return schema - - -def validate(manifest: Dict, index: Index) -> ValidationResult: - """Validate a OSBuild manifest - - This function will validate a OSBuild manifest, including - all its stages and assembler and build manifests. It will - try to validate as much as possible and not stop on errors. - The result is a `ValidationResult` object that can be used - to check the overall validation status and iterate all the - individual validation errors. - """ - - schema = index.get_schema("Manifest") - result = schema.validate(manifest) - - # main pipeline - pipeline = manifest.get("pipeline", {}) - - # recursively validate the build pipeline as a "normal" - # pipeline in order to validate its stages and assembler - # options; for this it is being re-parented in a new plain - # {"pipeline": ...} dictionary. NB: Any nested structural - # errors might be detected twice, but de-duplicated by the - # `ValidationResult.merge` call - build = pipeline.get("build", {}).get("pipeline") - if build: - res = validate({"pipeline": build}, index=index) - result.merge(res, path=["pipeline", "build"]) - - stages = pipeline.get("stages", []) - for i, stage in enumerate(stages): - name = stage["name"] - schema = index.get_schema("Stage", name) - res = schema.validate(stage) - result.merge(res, path=["pipeline", "stages", i]) - - asm = pipeline.get("assembler", {}) - if asm: - name = asm["name"] - schema = index.get_schema("Assembler", name) - res = schema.validate(asm) - result.merge(res, path=["pipeline", "assembler"]) - - # sources - sources = manifest.get("sources", {}) - for name, source in sources.items(): - schema = index.get_schema("Source", name) - res = schema.validate(source) - result.merge(res, path=["sources", name]) - - return result diff --git a/test/mod/test_osbuild.py b/test/mod/test_osbuild.py index f99279c1..b45311cf 100644 --- a/test/mod/test_osbuild.py +++ b/test/mod/test_osbuild.py @@ -11,6 +11,7 @@ import unittest import osbuild import osbuild.meta +from osbuild.formats import v1 as fmt from osbuild.monitor import NullMonitor from osbuild.pipeline import detect_host_runner from .. import test @@ -174,7 +175,7 @@ class TestDescriptions(unittest.TestCase): index = osbuild.meta.Index(os.curdir) # an empty manifest is OK - res = osbuild.meta.validate({}, index) + res = fmt.validate({}, index) self.assertEqual(res.valid, True) # something totally invalid (by Ondřej Budai) @@ -187,7 +188,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(totally_invalid, index) + res = fmt.validate(totally_invalid, index) self.assertEqual(res.valid, False) # The top-level 'osbuild' is an additional property self.assertEqual(len(res), 1) @@ -201,7 +202,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(no_runner, index) + res = fmt.validate(no_runner, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 1) # missing runner lst = res[".pipeline.build"] @@ -227,7 +228,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(no_runner_extra, index) + res = fmt.validate(no_runner_extra, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 3) lst = res[".pipeline.build.pipeline"] @@ -251,7 +252,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(stage_check, index) + res = fmt.validate(stage_check, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 2) lst = res[".pipeline.stages[0].options"] @@ -270,7 +271,7 @@ class TestDescriptions(unittest.TestCase): } } - res = osbuild.meta.validate(assembler_check, index) + res = fmt.validate(assembler_check, index) self.assertEqual(res.valid, False) self.assertEqual(len(res), 2) lst = res[".pipeline.assembler.options"]