From 1ed85dc790003228bd3d9c0a5336b5e9de903481 Mon Sep 17 00:00:00 2001 From: Christian Kellner Date: Mon, 7 Jun 2021 12:50:13 +0200 Subject: [PATCH] inputs: convert to host service Create a `InputService` class with an abstract method called `map`, meant to be implemented by all inputs. An `unmap` method may be optionally overridden by inputs to cleanup resources. Instantiate a `host.ServiceManager` in the `Stage.run` section and pass the to the host side input code so it can be used to spawn the input services. Convert all existing inputs to the new service framework. --- inputs/org.osbuild.files | 39 +++++++++------------- inputs/org.osbuild.noop | 35 ++++++++++++-------- inputs/org.osbuild.ostree | 46 +++++++++++++------------- inputs/org.osbuild.tree | 48 ++++++++++++--------------- osbuild/inputs.py | 69 ++++++++++++++++++++------------------- osbuild/pipeline.py | 6 +++- 6 files changed, 122 insertions(+), 121 deletions(-) diff --git a/inputs/org.osbuild.files b/inputs/org.osbuild.files index c1f08a25..358a2d2d 100755 --- a/inputs/org.osbuild.files +++ b/inputs/org.osbuild.files @@ -14,12 +14,10 @@ like `rpm.` to avoid namespace clashes. This is enforced via schema validation. """ - -import json import sys import subprocess -from osbuild.objectstore import StoreClient +from osbuild import inputs SCHEMA = r""" @@ -77,16 +75,12 @@ SCHEMA = r""" """ -def main(): - args = json.load(sys.stdin) - refs = args["refs"] - target = args["target"] +class FilesInput(inputs.InputService): - store = StoreClient(connect_to=args["api"]["store"]) - source = store.source("org.osbuild.files") + def map(self, store, _origin, refs, target, _options): - for checksum in refs: - try: + source = store.source("org.osbuild.files") + for checksum in refs: subprocess.run( [ "ln", @@ -95,21 +89,20 @@ def main(): ], check=True, ) - except subprocess.CalledProcessError as e: - json.dump({"error": e.output}, sys.stdout) - return 1 - reply = { - "path": target, - "data": { - "refs": refs + reply = { + "path": target, + "data": { + "refs": refs + } } - } + return reply - json.dump(reply, sys.stdout) - return 0 + +def main(): + service = FilesInput.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - r = main() - sys.exit(r) + main() diff --git a/inputs/org.osbuild.noop b/inputs/org.osbuild.noop index 87f6a358..2fafd857 100755 --- a/inputs/org.osbuild.noop +++ b/inputs/org.osbuild.noop @@ -7,31 +7,38 @@ it to the stage. """ -import json import os import sys import uuid +from osbuild import inputs SCHEMA = """ "additionalProperties": true """ +class NoopInput(inputs.InputService): + + def map(self, _store, _origin, refs, target, _options): + + uid = str(uuid.uuid4()) + path = os.path.join(target, uid) + os.makedirs(path) + + reply = { + "path": target, + "data": { + "refs": refs + } + } + return reply + + def main(): - args = json.load(sys.stdin) - refs = args["refs"] - target = args["target"] - - uid = str(uuid.uuid4()) - path = os.path.join(target, uid) - os.makedirs(path) - - data = {"path": path, "data": {"refs": refs}} - json.dump(data, sys.stdout) - return 0 + service = NoopInput.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - r = main() - sys.exit(r) + main() diff --git a/inputs/org.osbuild.ostree b/inputs/org.osbuild.ostree index 6f4731d8..e0e3fc41 100755 --- a/inputs/org.osbuild.ostree +++ b/inputs/org.osbuild.ostree @@ -17,7 +17,7 @@ import json import sys import subprocess -from osbuild.objectstore import StoreClient +from osbuild import inputs SCHEMA = """ @@ -99,31 +99,31 @@ def export(checksums, cache, output): } } - json.dump(reply, sys.stdout) + return reply + + +class OSTreeInput(inputs.InputService): + + def map(self, store, origin, refs, target, _options): + + if origin == "org.osbuild.pipeline": + for ref, options in refs.items(): + source = store.read_tree(ref) + with open(os.path.join(source, "compose.json"), "r") as f: + compose = json.load(f) + commit_id = compose["ostree-commit"] + reply = export({commit_id: options}, source, target) + else: + source = store.source("org.osbuild.ostree") + reply = export(refs, source, target) + + return reply def main(): - args = json.load(sys.stdin) - refs = args["refs"] - target = args["target"] - - origin = args["origin"] - store = StoreClient(connect_to=args["api"]["store"]) - - if origin == "org.osbuild.pipeline": - for ref, options in refs.items(): - source = store.read_tree(ref) - with open(os.path.join(source, "compose.json"), "r") as f: - compose = json.load(f) - commit_id = compose["ostree-commit"] - export({commit_id: options}, source, target) - else: - source = store.source("org.osbuild.ostree") - export(refs, source, target) - - return 0 + service = OSTreeInput.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - r = main() - sys.exit(r) + main() diff --git a/inputs/org.osbuild.tree b/inputs/org.osbuild.tree index f98bae0d..42249a29 100755 --- a/inputs/org.osbuild.tree +++ b/inputs/org.osbuild.tree @@ -8,11 +8,9 @@ in read only mode. If the id is `null` or the empty string it returns an empty tree. """ - -import json import sys -from osbuild.objectstore import StoreClient +from osbuild import inputs SCHEMA = """ @@ -52,33 +50,29 @@ SCHEMA = """ """ -def error(msg): - json.dump({"error": msg}, sys.stdout) - sys.exit(1) +class TreeInput(inputs.InputService): + + def map(self, store, _origin, refs, target, _options): + + # input verification *must* have been done via schema + # verification. It is expected that origin is a pipeline + # and we have exactly one reference, i.e. a pipeline id + pid, _ = refs.popitem() + + if pid: + path = store.read_tree_at(pid, target) + + if not path: + raise ValueError(f"Unknown pipeline '{pid}'") + + reply = {"path": target} + return reply def main(): - args = json.load(sys.stdin) - refs = args["refs"] - target = args["target"] - - # input verification *must* have been done via schema - # verification. It is expected that origin is a pipeline - # and we have exactly one reference, i.e. a pipeline id - pid, _ = refs.popitem() - - store = StoreClient(connect_to=args["api"]["store"]) - - if pid: - path = store.read_tree_at(pid, target) - - if not path: - error(f"Could not find pipeline with id '{pid}'") - - json.dump({"path": target}, sys.stdout) - return 0 + service = TreeInput.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - r = main() - sys.exit(r) + main() diff --git a/osbuild/inputs.py b/osbuild/inputs.py index c51c06aa..4a029a9c 100644 --- a/osbuild/inputs.py +++ b/osbuild/inputs.py @@ -16,17 +16,16 @@ osbuild is the path. The input options are just passed to the `Input` as is and the result is forwarded to the `Stage`. """ - +import abc import hashlib -import importlib import json import os -import subprocess from typing import Dict, Optional, Tuple +from osbuild import host from osbuild.util.types import PathLike -from .objectstore import StoreServer +from .objectstore import StoreClient, StoreServer class Input: @@ -59,13 +58,15 @@ class Input: m.update(json.dumps(self.options, sort_keys=True).encode()) return m.hexdigest() - def run(self, storeapi: StoreServer, root: PathLike) -> Tuple[str, Dict]: - name = self.info.name + def map(self, + mgr: host.ServiceManager, + storeapi: StoreServer, + root: PathLike) -> Tuple[str, Dict]: target = os.path.join(root, self.name) os.makedirs(target) - msg = { + args = { # mandatory bits "origin": self.origin, "refs": self.refs, @@ -81,32 +82,8 @@ class Input: } } - # We want the `osbuild` python package that contains this - # very module, which might be different from the system wide - # installed one, to be accessible to the Input programs so - # we detect our origin and set the `PYTHONPATH` accordingly - modorigin = importlib.util.find_spec("osbuild").origin - modpath = os.path.dirname(modorigin) - env = os.environ.copy() - env["PYTHONPATH"] = os.path.dirname(modpath) - - r = subprocess.run([self.info.path], - env=env, - input=json.dumps(msg), - stdout=subprocess.PIPE, - encoding="utf-8", - check=False) - - try: - reply = json.loads(r.stdout) - except ValueError: - raise RuntimeError(f"{name}: error: {r.stderr}") from None - - if "error" in reply: - raise RuntimeError(f"{name}: " + reply["error"]) - - if r.returncode != 0: - raise RuntimeError(f"{name}: error {r.returncode}") + client = mgr.start(f"input/{self.name}", self.info.path) + reply = client.call("map", args) path = reply["path"] @@ -116,3 +93,29 @@ class Input: reply["path"] = os.path.relpath(path, root) return reply + + +class InputService(host.Service): + """Input host service""" + + @abc.abstractmethod + def map(self, store, origin, refs, target, options): + pass + + def unmap(self): + pass + + def stop(self): + self.unmap() + + def dispatch(self, method: str, args, _fds): + if method == "map": + store = StoreClient(connect_to=args["api"]["store"]) + r = self.map(store, + args["origin"], + args["refs"], + args["target"], + args["options"]) + return r, None + + raise host.ProtocolError("Unknown method") diff --git a/osbuild/pipeline.py b/osbuild/pipeline.py index 1f6433eb..7e94b651 100644 --- a/osbuild/pipeline.py +++ b/osbuild/pipeline.py @@ -7,6 +7,7 @@ from typing import Dict, Iterator, List, Optional from .api import API from . import buildroot +from . import host from . import objectstore from . import remoteloop from .inputs import Input @@ -95,8 +96,11 @@ class Stage: storeapi = objectstore.StoreServer(store) cm.enter_context(storeapi) + mgr = host.ServiceManager(monitor=monitor) + cm.enter_context(mgr) + for key, ip in self.inputs.items(): - data = ip.run(storeapi, inputs_tmpdir) + data = ip.map(mgr, storeapi, inputs_tmpdir) inputs[key] = data api = API(args, monitor)