debian-forge/inputs/org.osbuild.tree
Christian Kellner 08bc9ab7d8 inputs: pre-defined input paths
Instead of bind-mounting each individual input into the container,
create a temporary directory that is used by all inputs and bind-
mount this to the well known location ("/run/osbuild/inputs"). The
temporary directory is then passed to the input so that it can
make the requested resources available relative to that directory.
This is enforced by the common input handling code.
Additionally, pass the well known input path via a new "paths" key
to the arguments dictionary passed to the stage.
2021-06-09 18:37:47 +01:00

84 lines
1.8 KiB
Python
Executable file

#!/usr/bin/python3
"""
Tree inputs
Open the tree produced by the pipeline supplied via the
first and only entry in `references`. The tree is opened
in read only mode. If the id is `null` or the empty
string it returns an empty tree.
"""
import json
import sys
from osbuild.objectstore import StoreClient
# JSON schema fragment describing the options this input accepts.
#
# The string holds only the members of a schema object, without the
# enclosing braces (presumably the consumer wraps it before parsing --
# confirm against the caller). It requires:
#   * "type":       must be "org.osbuild.tree"
#   * "origin":     must be "org.osbuild.pipeline"
#   * "references": either an array holding at most one string, or an
#                   object with exactly one property whose value is an
#                   empty object.
SCHEMA = """
"additionalProperties": false,
"required": ["type", "origin", "references"],
"properties": {
"type": {
"enum": ["org.osbuild.tree"]
},
"origin": {
"description": "The origin of the input (must be 'org.osbuild.pipeline')",
"type": "string",
"enum": ["org.osbuild.pipeline"]
},
"references": {
"description": "Exactly one pipeline identifier to use as tree input",
"oneOf": [{
"type": "array",
"additionalItems": false,
"items": [{
"type": "string"
}]
}, {
"type": "object",
"additionalProperties": false,
"patternProperties": {
".*": {
"type": "object",
"additionalProperties": false
}
},
"minProperties": 1,
"maxProperties": 1
}]
}
}
"""
def error(msg):
    """Report a failure on standard output and terminate the process.

    The message is emitted as the JSON object ``{"error": msg}`` on
    ``sys.stdout``; afterwards the interpreter exits with status 1, so
    this function never returns to its caller.
    """
    payload = {"error": msg}
    sys.stdout.write(json.dumps(payload))
    raise SystemExit(1)
def main():
    """Serve a single tree-input request read from standard input.

    The request is a JSON object; the keys read here are ``refs``,
    ``target`` and ``api.store``. One entry is popped from ``refs`` and
    its key is taken as the pipeline id. For a non-empty id the store
    client's ``read_tree_at(pid, target)`` is invoked, and a falsy
    result is reported through ``error()``, which exits with status 1.
    For an empty id the store is not queried at all.

    On success ``{"path": target}`` is written to standard output as
    JSON and 0 is returned.
    """
    request = json.load(sys.stdin)
    references = request["refs"]
    destination = request["target"]

    # Schema validation is expected to have happened before this point:
    # the origin is a pipeline and there is exactly one reference, whose
    # key is the pipeline id. The associated value is not used.
    pipeline_id, _ = references.popitem()

    client = StoreClient(connect_to=request["api"]["store"])

    # A falsy id skips the store lookup; `and` short-circuits exactly
    # like the nested conditionals it replaces.
    if pipeline_id and not client.read_tree_at(pipeline_id, destination):
        error(f"Could not find pipeline with id '{pipeline_id}'")

    json.dump({"path": destination}, sys.stdout)
    return 0
if __name__ == '__main__':
    # Run only when executed as a script; main()'s return value becomes
    # the process exit status.
    sys.exit(main())