debian-forge/inputs/org.osbuild.tree
Christian Kellner eb1d17d8ac input: add references and origin
Currently all options for inputs are totally opaque to osbuild
itself. This is neat from a separation of concerns point of view
but has one major downside: osbuild can not verify the integrity
of the pipeline graph, i.e. if all inputs that need pipelines or
sources do indeed exist. Therefore introduce two generic fields
for inputs: `origin` and `references`. The former can either be
a source or a pipeline. The latter is an array of identifiers or
a dictionary where the keys are the identifiers and the values
are additional options for that id. The identifiers then refer
to either resources obtained via a source or a pipeline that has
already been built.
2021-02-06 12:04:30 +01:00

82 lines
1.7 KiB
Python
Executable file

#!/usr/bin/python3
"""
Tree inputs
Open the tree produced by the pipeline supplied via the
first and only entry in `references`. The tree is opened
in read only mode. If the id is `null` or the empty
string it returns an empty tree.
"""
import json
import sys
from osbuild.objectstore import StoreClient
# JSON-schema fragment describing the options accepted by this input.
# The string holds only the members of a schema object (there are no
# enclosing braces) -- presumably the consumer wraps it before parsing;
# confirm against the code that loads SCHEMA.
#
# - `origin` is required and must be the string 'org.osbuild.pipeline'.
# - `references` is required and must match exactly one of:
#     * an array whose only permitted item is a string (tuple-style
#       `items` with `additionalItems: false`), or
#     * an object with exactly one property, whose value is an object
#       that itself allows no properties.
#
# NOTE(review): the array form sets no `minItems`, so an empty array
# appears to satisfy this schema even though the description asks for
# exactly one identifier -- confirm whether that is intended.
SCHEMA = """
"additionalProperties": false,
"required": ["origin", "references"],
"properties": {
"origin": {
"description": "The origin of the input (must be 'org.osbuild.pipeline')",
"type": "string",
"enum": ["org.osbuild.pipeline"]
},
"references": {
"description": "Exactly one pipeline identifier to ues as tree input",
"oneOf": [{
"type": "array",
"additionalItems": false,
"items": [{
"type": "string"
}]
}, {
"type": "object",
"additionalProperties": false,
"patternProperties": {
".*": {
"type": "object",
"additionalProperties": false
}
},
"minProperties": 1,
"maxProperties": 1
}]
}
}
"""
def error(msg):
    """Report a failure as JSON on stdout and terminate the process.

    Writes the object ``{"error": msg}`` to ``sys.stdout`` and then
    exits with status 1; this function never returns.
    """
    report = {"error": msg}
    json.dump(report, sys.stdout)
    raise SystemExit(1)
def main():
    """Resolve the single pipeline reference to a tree path.

    Reads the request as JSON from stdin, takes the one pipeline id
    found in ``args["refs"]`` and asks the object store (reached via
    ``args["api"]["store"]``) for the matching tree. When the id is
    empty (``null`` or the empty string) a temporary directory with the
    prefix "empty" is requested from the store instead. The resulting
    path is written to stdout as ``{"path": ...}``.

    Returns:
        0 on success. Failures are reported through ``error``, which
        prints a JSON error object and exits with status 1.
    """
    args = json.load(sys.stdin)
    refs = args["refs"]

    # Input verification *must* have been done via schema
    # verification. It is expected that origin is a pipeline
    # and we have exactly one reference, i.e. a pipeline id.
    # The array form of the schema sets no minimum length, though,
    # so an empty set of references can still arrive here; report it
    # as a regular error rather than failing with a KeyError traceback
    # from popitem().
    if not refs:
        error("Expected exactly one pipeline reference, got none")

    pid, _ = refs.popitem()

    store = StoreClient(connect_to=args["api"]["store"])

    if not pid:
        # No pipeline requested: hand out a fresh directory from the
        # store (presumably empty -- see the module docstring).
        path = store.mkdtemp(prefix="empty")
    else:
        path = store.read_tree(pid)

    # A falsy path is treated as a failed lookup.
    if not path:
        error(f"Could not find pipeline with id '{pid}'")

    json.dump({"path": path}, sys.stdout)
    return 0
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    sys.exit(main())