#!/usr/bin/python3
"""Inputs for container images from the host storage

This is an input that pairs with the org.osbuild.containers-storage
source. It makes the host container storage available to a stage and
a container ID for the stage to use.

Unlike all the other input types, this does not refer to static
objects in the osbuild sources store but a dynamically mounted
container store that depends on the availability of the resource
(the specific container) on the host.
"""

import os
import sys

from osbuild import inputs
from osbuild.util import host
from osbuild.util.mnt import MountGuard

SCHEMA = r"""
"definitions": {
  "source-options": {
    "type": "object",
    "additionalProperties": false,
    "properties": {
      "name": {
        "type": "string",
        "description": "The name to use for the image"
      }
    }
  },
  "source-object-ref": {
    "type": "object",
    "additionalProperties": false,
    "minProperties": 1,
    "patternProperties": {
      ".*": {
        "$ref": "#/definitions/source-options"
      }
    }
  },
  "source-array-ref": {
    "type": "array",
    "minItems": 1,
    "items": {
      "type": "object",
      "additionalProperties": false,
      "required": ["id"],
      "properties": {
        "id": {
          "type": "string"
        },
        "options": {
          "$ref": "#/definitions/source-options"
        }
      }
    }
  },
  "source-origin": {
    "type": "string",
    "description": "When the origin of the input is a source",
    "enum": ["org.osbuild.source"]
  }
},
"additionalProperties": false,
"required": ["type", "origin", "references"],
"properties": {
  "type": {
    "enum": ["org.osbuild.containers-storage"]
  },
  "origin": {
    "description": "The org.osbuild.source origin case",
    "$ref": "#/definitions/source-origin"
  },
  "references": {
    "description": "Container image id",
    "oneOf": [
      {"$ref": "#/definitions/source-array-ref"},
      {"$ref": "#/definitions/source-object-ref"}
    ]
  }
}
"""


class ContainersStorageInput(inputs.InputService):
    """Input service exposing the host's container storage to a stage."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Tracks the mounts made by this service so unmap() can undo them.
        self.mg = MountGuard()
        # Host storage configuration; the code below reads
        # storage_conf["storage"] and storage_conf["storage"]["graphroot"].
        self.storage_conf = host.get_container_storage()

    def bind_mount_local_storage(self, target):
        """Mount the host storage graphroot at `<target>/storage`.

        The destination directory is created if it does not exist yet.
        """
        source = self.storage_conf["storage"]["graphroot"]
        dest = os.path.join(target, "storage")

        # bind mount the input directory to the destination
        os.makedirs(dest, exist_ok=True)
        self.mg.mount(source, dest)

    def map(self, store, origin, refs, target, _options):
        """Mount the host storage and describe every referenced image.

        `store`, `origin` and `_options` are accepted but not used here.

        Returns a reply dict with `target` as "path" and, under
        "data" -> "archives", one entry per key in `refs` carrying the
        "format", "name" and "storage" fields.

        Raises KeyError if a reference carries no "name".
        """
        self.bind_mount_local_storage(target)

        # The same storage section is attached to every image entry, so
        # look it up once instead of on every iteration.
        storage = self.storage_conf["storage"]

        images = {}
        for ref, data in refs.items():
            # NOTE(review): the schema does not list "name" as required,
            # yet a missing name raises KeyError here - confirm whether
            # that is intended.
            images[ref] = {
                "format": "containers-storage",
                "name": data["name"],
                "storage": storage,
            }

        reply = {
            "path": target,
            "data": {
                "archives": images
            }
        }
        return reply

    def unmap(self):
        """Undo the mounts tracked by the mount guard."""
        self.mg.umount()


def main():
    """Create the service from the command line arguments and run it."""
    service = ContainersStorageInput.from_args(sys.argv[1:])
    service.main()


if __name__ == '__main__':
    main()