From e77d95f4b771c5b455e9c2d6802fc58f7bad086f Mon Sep 17 00:00:00 2001
From: Christian Kellner
Date: Fri, 24 Apr 2020 17:40:17 +0200
Subject: [PATCH] osbuild: add meta module for metadata information

This new module contains utilities that help to introspect the parts
that constitute the inner workings of osbuild, i.e. its stages and the
assembler (which is also considered a type of stage in this context).

It contains the `StageInfo` class, which holds meta-information about
an individual stage, such as a short info string (`info`), a longer
description (`desc`) and its JSON schema. A new `Schema` class
represents schema data and has a `validate` method that can be used to
check that JSON data conforms to said schema. An `Index` class can be
used to obtain `StageInfo` and `Schema` objects for entities identified
via `klass` and `name`. A top-level `validate` function is introduced
that can validate manifest data.

Internally it uses the `jsonschema` package, so add that as a
requirement and install this dependency in the CI.
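
A rough usage sketch of the new API (the stage name and manifest path
below are only examples; the index root is assumed to be an osbuild
checkout containing `stages/`, `assemblers/` and `schemas/osbuild1.json`):

    import json
    from osbuild import meta

    index = meta.Index(".")

    # schema for a single stage; validates {"name": ..., "options": {...}}
    schema = index.get_schema("Stage", "org.osbuild.rpm")
    res = schema.validate({"name": "org.osbuild.rpm", "options": {}})

    # validate a complete manifest parsed from JSON
    with open("manifest.json") as f:
        manifest = json.load(f)

    result = meta.validate(manifest, index)
    if not result.valid:
        print(json.dumps(result.as_dict(), indent=2))
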
---
 .github/workflows/runtime-tests.yml |   8 +
 .github/workflows/tests.yml         |   5 +-
 osbuild/meta.py                     | 426 ++++++++++++++++++++++++++++
 requirements.txt                    |   1 +
 setup.py                            |   3 +
 5 files changed, 442 insertions(+), 1 deletion(-)
 create mode 100644 osbuild/meta.py
 create mode 100644 requirements.txt

diff --git a/.github/workflows/runtime-tests.yml b/.github/workflows/runtime-tests.yml
index f00340e4..5109cb51 100644
--- a/.github/workflows/runtime-tests.yml
+++ b/.github/workflows/runtime-tests.yml
@@ -31,6 +31,8 @@ jobs:
       uses: actions/setup-python@v1
       with:
         python-version: 3.7
+    - name: Install Python Packages
+      run: pip install jsonschema
     - name: "Run Pipeline Tests"
       run: sudo env "PATH=$PATH" make test-runtime
@@ -49,6 +51,8 @@ jobs:
       uses: actions/setup-python@v1
       with:
         python-version: 3.7
+    - name: Install Python Packages
+      run: pip install jsonschema
     - name: "Run Noop-Pipeline Tests"
       run: |
        for i in {0..2} ; do
@@ -75,6 +79,8 @@ jobs:
       uses: actions/setup-python@v1
       with:
         python-version: 3.7
+    - name: Install Python Packages
+      run: pip install jsonschema
     - name: "Run Assembler Tests"
       run: sudo env "PATH=$PATH" python3 -m unittest -v test.test_assemblers
@@ -95,5 +101,7 @@ jobs:
       uses: actions/setup-python@v1
       with:
         python-version: 3.7
+    - name: Install Python Packages
+      run: pip install jsonschema
     - name: "Run Stage Tests"
       run: sudo env "PATH=$PATH" python3 -m unittest -v test.test_stages
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 97537a43..870ead16 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -9,7 +9,7 @@ jobs:
     container: docker.io/library/python:3.7
     steps:
     - name: Install pylint
-      run: pip install pylint==2.4.1
+      run: pip install pylint==2.4.1 jsonschema
     - name: Clone repository
       uses: actions/checkout@v2
     - name: Run pylint
@@ -67,6 +67,9 @@ jobs:
       with:
         path: osbuild
 
+    - name: Install Dependencies
+      run: pip install jsonschema
+
     - name: Run test_osbuild
       run: |
         cd osbuild
diff --git a/osbuild/meta.py b/osbuild/meta.py
new file mode 100644
index 00000000..792edc77
--- /dev/null
+++ b/osbuild/meta.py
@@ -0,0 +1,426 @@
+"""Introspection and validation for osbuild
+
+This module contains utilities that help to introspect the parts
+that constitute the inner workings of osbuild, i.e. its stages
+and assembler (which is also considered a type of stage in
+this context). Additionally, it provides classes and functions
+to do schema validation of OSBuild manifests and stage options.
+
+A central `Index` class can be used to obtain stage and schema
+information. For the former, a `StageInfo` object is returned via
+`Index.get_stage_info`, which contains meta-information about
+the individual stages. Schemata, obtained via `Index.get_schema`,
+are represented via a `Schema` class that can in turn be used
+to validate the individual components.
+
+The high level `validate` function can be used to check a given
+manifest (parsed from JSON input in dictionary form) against all
+available schemata. The result is a `ValidationResult` which
+contains a single `ValidationError` for each error detected in
+the manifest. See the individual documentation for details.
+"""
+import ast
+import contextlib
+import copy
+import os
+import json
+from collections import deque
+from typing import Dict, Iterable, Optional
+
+import jsonschema
+
+
+FAILED_TITLE = "JSON Schema validation failed"
+FAILED_TYPEURI = "https://osbuild.org/validation-error"
+
+
+class ValidationError:
+    """Describes a single failed validation
+
+    Consists of a `message` member describing the error
+    that occurred and a `path` that points to the element
+    that caused the error.
+    Implements hashing, equality and less-than and thus
+    can be sorted and used in sets and dictionaries.
+    """
+
+    def __init__(self, message: str):
+        self.message = message
+        self.path = deque()
+
+    @classmethod
+    def from_exception(cls, ex):
+        err = cls(ex.message)
+        err.path = ex.absolute_path
+        return err
+
+    @property
+    def id(self):
+        if not self.path:
+            return "."
+
+        result = ""
+        for p in self.path:
+            if isinstance(p, str):
+                if " " in p:
+                    p = f"'{p}'"
+                result += "." + p
+            elif isinstance(p, int):
+                result += f"[{p}]"
+            else:
+                raise AssertionError("new type")
+
+        return result
+
+    def as_dict(self):
+        """Serializes this object as a dictionary
+
+        The `path` member will be serialized as a list of
+        components (string or integer) and `message` as the
+        human readable message string.
+        """
+        return {
+            "message": self.message,
+            "path": list(self.path)
+        }
+
+    def rebase(self, path: Iterable[str]):
+        """Prepend the `path` to `self.path`"""
+        rev = reversed(path)
+        self.path.extendleft(rev)
+
+    def __hash__(self):
+        return hash((self.id, self.message))
+
+    def __eq__(self, other: "ValidationError"):
+        if not isinstance(other, ValidationError):
+            raise ValueError("Need ValidationError")
+
+        if self.id != other.id:
+            return False
+        return self.message == other.message
+
+    def __lt__(self, other: "ValidationError"):
+        if not isinstance(other, ValidationError):
+            raise ValueError("Need ValidationError")
+
+        return self.id < other.id
+
+    def __str__(self):
+        return f"ValidationError: {self.message} [{self.id}]"
+
+
+class ValidationResult:
+    """Result of a JSON Schema validation"""
+
+    def __init__(self, origin: Optional[str]):
+        self.origin = origin
+        self.errors = set()
+
+    def fail(self, msg: str) -> ValidationError:
+        """Add a new `ValidationError` with `msg` as message"""
+        err = ValidationError(msg)
+        self.errors.add(err)
+        return err
+
+    def add(self, err: ValidationError):
+        """Add a `ValidationError` to the set of errors"""
+        self.errors.add(err)
+        return self
+
+    def merge(self, result: "ValidationResult", *, path=()):
+        """Merge all errors of `result` into this
+
+        Merge all the errors in `result` into this result,
+        adjusting their paths by pre-pending the
+        supplied `path`.
+        """
+        for err in result:
+            err = copy.deepcopy(err)
+            err.rebase(path)
+            self.errors.add(err)
+
+    def as_dict(self):
+        """Represent this result as a dictionary
+
+        If there are no errors, returns an empty dict;
+        otherwise it will contain a `type`, `title` and
+        `errors` field. The `title` is a human readable
+        description, the `type` is a URI identifying
+        the validation error type and `errors` is a list
+        of `ValidationError`s, in turn serialized as dicts.
+        Additionally, a `success` member is provided to
+        be compatible with pipeline build results.
+        """
+        errors = [e.as_dict() for e in self]
+        if not errors:
+            return {}
+
+        return {
+            "type": FAILED_TYPEURI,
+            "title": FAILED_TITLE,
+            "success": False,
+            "errors": errors
+        }
+
+    @property
+    def valid(self):
+        """Returns `True` if there are zero errors"""
+        return len(self) == 0
+
+    def __iadd__(self, error: ValidationError):
+        return self.add(error)
+
+    def __bool__(self):
+        return self.valid
+
+    def __len__(self):
+        return len(self.errors)
+
+    def __iter__(self):
+        return iter(sorted(self.errors))
+
+    def __str__(self):
+        return f"ValidationResult: {len(self)} error(s)"
+
+    def __getitem__(self, key):
+        if not isinstance(key, str):
+            raise ValueError("Only string keys allowed")
+
+        lst = list(filter(lambda e: e.id == key, self))
+        if not lst:
+            raise IndexError(f"{key} not found")
+
+        return lst
+
+
+class Schema:
+    """JSON Schema representation
+
+    Class that represents a JSON schema. The `data` attribute
+    contains the actual schema data itself. The `klass` and
+    (optional) `name` refer to the entity this schema belongs to.
+    The schema information can be used to validate data via
+    the `validate` method.
+
+    The class can be created with empty schema data. In that
+    case it represents missing schema information. Any call
+    to `validate` will then result in a failure.
+
+    The truth value of this object corresponds to it having
+    schema data.
+    """
+
+    def __init__(self, schema: Optional[Dict], name: Optional[str] = None):
+        self.data = schema
+        self.name = name
+        if schema:
+            self._validator = jsonschema.Draft7Validator(schema)
+
+    def validate(self, target) -> ValidationResult:
+        """Validate the `target` against this schema
+
+        If the schema information itself is missing, it
+        will return a `ValidationResult` in failed state,
+        with 'missing schema information' as the reason.
+        """
+        res = ValidationResult(self.name)
+
+        if not self.data:
+            res.fail("missing schema information")
+            return res
+
+        for error in self._validator.iter_errors(target):
+            res += ValidationError.from_exception(error)
+
+        return res
+
+    def __bool__(self):
+        return bool(self.data)
+
+
+class StageInfo:
+    """Meta information about a stage
+
+    Represents the information about an osbuild pipeline
+    stage or assembler (here also considered to be a stage).
+    Contains the short description (`desc`), a longer
+    description (`info`) and the JSON schema of valid options
+    (`opts`). The `schema` property combines these options
+    into a schema for a complete stage instance that can be
+    used, via the `Schema` class, to check the options of a
+    stage against it.
+
+    Normally this class is instantiated via its `load` method.
+    """
+
+    def __init__(self, klass: str, name: str, info: Dict):
+        self.name = name
+        self.type = klass
+
+        opts = info.get("STAGE_OPTS") or ""
+        self.info = info.get("STAGE_INFO")
+        self.desc = info.get("STAGE_DESC")
+        self.opts = json.loads("{" + opts + "}")
+
+    @property
+    def schema(self):
+        schema = {
+            "title": f"Pipeline {self.type}",
+            "type": "object",
+            "additionalProperties": False,
+            "properties": {
+                "name": {"type": "string"},
+                "options": {
+                    "type": "object",
+                    **self.opts
+                }
+            },
+            "required": ["name"]
+        }
+
+        # if there is a `definitions` node, it needs to be at
+        # the top level schema node, since the schema inside the
+        # stages is written as if it were the root node and
+        # so are the references
+        definitions = self.opts.get("definitions")
+        if definitions:
+            schema["definitions"] = definitions
+            del schema["properties"]["options"]["definitions"]
+
+        return schema
+
+    @classmethod
+    def load(cls, root, klass, name) -> Optional["StageInfo"]:
+        names = ['STAGE_INFO', 'STAGE_DESC', 'STAGE_OPTS']
+
+        def value(a):
+            v = a.value
+            if isinstance(v, ast.Str):
+                return v.s
+            return ""
+
+        def filter_type(lst, target):
+            return [x for x in lst if isinstance(x, target)]
+
+        def targets(a):
+            return [t.id for t in filter_type(a.targets, ast.Name)]
+
+        mapping = {
+            "Stage": "stages",
+            "Assembler": "assemblers"
+        }
+
+        if klass not in mapping:
+            raise ValueError(f"Unsupported type: {klass}")
+
+        base = mapping[klass]
+
+        path = os.path.join(root, base, name)
+        try:
+            with open(path) as f:
+                data = f.read()
+        except FileNotFoundError:
+            return None
+
+        tree = ast.parse(data, name)
+        assigns = filter_type(tree.body, ast.Assign)
+        targets = [(t, a) for a in assigns for t in targets(a)]
+        info = {k: value(v) for k, v in targets if k in names}
+        return cls(klass, name, info)
+
+
+class Index:
+    """Index of stages and assemblers
+
+    Class that can be used to get the meta information about
+    osbuild stages and assemblers as well as JSON schemata.
+    """
+
+    def __init__(self, path: str):
+        self.path = path
+        self._stage_info = {}
+        self._schemata = {}
+
+    def get_stage_info(self, klass, name) -> Optional[StageInfo]:
+        """Obtain `StageInfo` for a given stage or assembler"""
+
+        if (klass, name) not in self._stage_info:
+            info = StageInfo.load(self.path, klass, name)
+            self._stage_info[(klass, name)] = info
+
+        return self._stage_info[(klass, name)]
+
+    def get_schema(self, klass, name=None) -> Schema:
+        """Obtain a `Schema` for `klass` and `name` (optional)
+
+        Returns a `Schema` for the entity identified via `klass`
+        and `name` (if given). Always returns a `Schema` even if
+        no schema information could be found for the entity. In
+        that case the actual schema data for `Schema` will be
+        `None` and any validation will fail.
+        """
+        schema = self._schemata.get((klass, name))
+        if schema is not None:
+            return schema
+
+        if klass == "Manifest":
+            path = f"{self.path}/schemas/osbuild1.json"
+            with contextlib.suppress(FileNotFoundError):
+                with open(path, "r") as f:
+                    schema = json.load(f)
+        elif klass in ["Stage", "Assembler"]:
+            info = self.get_stage_info(klass, name)
+            if info:
+                schema = info.schema
+        else:
+            raise ValueError(f"Unknown klass: {klass}")
+
+        schema = Schema(schema, name or klass)
+        self._schemata[(klass, name)] = schema
+
+        return schema
+
+
+def validate(manifest: Dict, index: Index) -> ValidationResult:
+    """Validate an OSBuild manifest
+
+    This function will validate an OSBuild manifest, including
+    all its stages, its assembler and any nested build pipeline.
+    It will try to validate as much as possible and not stop on
+    errors. The result is a `ValidationResult` object that can
+    be used to check the overall validation status and iterate
+    over all the individual validation errors.
+    """
+
+    schema = index.get_schema("Manifest")
+    result = schema.validate(manifest)
+
+    # main pipeline
+    pipeline = manifest.get("pipeline", {})
+
+    # recursively validate the build pipeline as a "normal"
+    # pipeline in order to validate its stages and assembler
+    # options; for this it is being re-parented in a new plain
+    # {"pipeline": ...} dictionary. NB: Any nested structural
+    # errors might be detected twice, but are de-duplicated by
+    # the `ValidationResult.merge` call
+    build = pipeline.get("build", {}).get("pipeline")
+    if build:
+        res = validate({"pipeline": build}, index=index)
+        result.merge(res, path=["pipeline", "build"])
+
+    stages = pipeline.get("stages", [])
+    for i, stage in enumerate(stages):
+        name = stage["name"]
+        schema = index.get_schema("Stage", name)
+        res = schema.validate(stage)
+        result.merge(res, path=["pipeline", "stages", i])
+
+    asm = pipeline.get("assembler", {})
+    if asm:
+        name = asm["name"]
+        schema = index.get_schema("Assembler", name)
+        res = schema.validate(asm)
+        result.merge(res, path=["pipeline", "assembler"])
+
+    return result
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 00000000..d89304b1
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+jsonschema
diff --git a/setup.py b/setup.py
index 519a4b52..0612d119 100644
--- a/setup.py
+++ b/setup.py
@@ -6,6 +6,9 @@ setuptools.setup(
     description="A build system for OS images",
     packages=["osbuild", "osbuild.util"],
     license='Apache-2.0',
+    install_requires=[
+        "jsonschema",
+    ],
     entry_points={
         "console_scripts": [
             "osbuild = osbuild.main_cli:main_cli"