"""
Pipeline inputs

A pipeline input provides data in various forms to a `Stage`, like
files, OSTree commits or trees. The content can either be obtained
via a `Source` or have been built by a `Pipeline`. Thus an `Input`
is the bridge between various types of content that originate from
different types of sources.

The acceptable origin of the data is determined by the `Input`
itself. What types of input are allowed and required is determined
by the `Stage`.

To osbuild itself this is all transparent. The only data visible to
osbuild is the path. The input options are just passed to the
`Input` as is and the result is forwarded to the `Stage`.
"""

import hashlib
import importlib
# `import importlib` alone does not guarantee that the `util` submodule
# is loaded; import it explicitly since `Input.run` relies on it.
import importlib.util
import json
import os
import subprocess
from typing import Dict, Optional, Tuple

from .objectstore import StoreServer


class Input:
    """
    A single input with its corresponding options.

    `info` must expose a `name` and a `path` attribute; `path` is the
    executable that is spawned by `run`. The `id` attribute is a
    SHA-256 hex digest over the name, origin, references and options
    and is recalculated whenever a reference is added.
    """

    def __init__(self, info, origin: str, options: Optional[Dict]):
        self.info = info
        self.origin = origin
        self.refs = {}
        self.options = options or {}
        self.id = self.calc_id()

    def add_reference(self, ref, options: Optional[Dict] = None):
        """Register `ref` (with optional per-reference options) and
        refresh `id` so that it reflects the new reference."""
        self.refs[ref] = options or {}
        self.id = self.calc_id()

    def calc_id(self):
        """Return the SHA-256 hex digest identifying this input.

        The digest covers, in this order, the JSON encoding (with
        sorted keys) of the name, origin, references and options.
        """
        m = hashlib.sha256()
        m.update(json.dumps(self.name, sort_keys=True).encode())
        m.update(json.dumps(self.origin, sort_keys=True).encode())
        m.update(json.dumps(self.refs, sort_keys=True).encode())
        m.update(json.dumps(self.options, sort_keys=True).encode())
        return m.hexdigest()

    @property
    def name(self) -> str:
        return self.info.name

    def run(self, storeapi: StoreServer) -> Tuple[str, Dict]:
        """Execute the input program and return its result.

        The program at `self.info.path` receives a JSON message on
        stdin (origin, references, options and the store API socket
        address) and must answer with a JSON object on stdout.

        Returns:
            A `(path, data)` tuple taken from the reply; `data`
            defaults to an empty dict when the reply has none.

        Raises:
            RuntimeError: if the reply is not a JSON object, if it
                contains an "error" entry, if the program exits with
                a non-zero code, or if the reply lacks a "path".
        """
        name = self.info.name

        msg = {
            # mandatory bits
            "origin": self.origin,
            "refs": self.refs,

            # global options
            "options": self.options,

            # API endpoints
            "api": {
                "store": storeapi.socket_address
            }
        }

        # We want the `osbuild` python package that contains this
        # very module, which might be different from the system wide
        # installed one, to be accessible to the Input programs so
        # we detect our origin and set the `PYTHONPATH` accordingly
        modorigin = importlib.util.find_spec("osbuild").origin
        modpath = os.path.dirname(modorigin)
        env = os.environ.copy()
        env["PYTHONPATH"] = os.path.dirname(modpath)

        # Only stdout is captured; stderr is inherited so diagnostics
        # of the input program show up on our own stderr. As a
        # consequence `r.stderr` is always `None` and must not be
        # used when building error messages.
        r = subprocess.run([self.info.path],
                           env=env,
                           input=json.dumps(msg),
                           stdout=subprocess.PIPE,
                           encoding="utf-8",
                           check=False)

        try:
            reply = json.loads(r.stdout)
        except ValueError:
            raise RuntimeError(
                f"{name}: error: invalid reply "
                f"(exit code {r.returncode}): {r.stdout!r}"
            ) from None

        # Valid JSON that is not an object (e.g. `null` or a list)
        # cannot carry "error" or "path"; reject it explicitly instead
        # of failing later with an unrelated TypeError.
        if not isinstance(reply, dict):
            raise RuntimeError(
                f"{name}: error: invalid reply "
                f"(exit code {r.returncode}): {r.stdout!r}"
            )

        if "error" in reply:
            raise RuntimeError(f"{name}: " + reply["error"])

        if r.returncode != 0:
            raise RuntimeError(f"{name}: error {r.returncode}")

        if "path" not in reply:
            raise RuntimeError(f"{name}: error: reply is missing 'path'")

        path, data = reply["path"], reply.get("data", {})

        return path, data