From c902a7a754c352d0b69c368945440393400098a2 Mon Sep 17 00:00:00 2001 From: Christian Kellner Date: Thu, 16 Sep 2021 17:05:27 +0200 Subject: [PATCH] sources: port to host services Port sources to also use the host services infrastructure that is used by inputs, devices and mounts. Sources are a bit different from the other services that they don't run for the duration of the stage but are run before anything is built. By using the same infrastructure we re-use the process management and inter process communcation. Additionally, this will forward all messages from sources to the existing monitoring framework. Adapt all existing sources and tests. --- osbuild/main_cli.py | 2 +- osbuild/pipeline.py | 7 +++--- osbuild/sources.py | 50 ++++++++++++++++++-------------------- sources/org.osbuild.curl | 49 +++++++++++++------------------------ sources/org.osbuild.inline | 33 ++++++++++--------------- sources/org.osbuild.ostree | 28 ++++++++------------- test/run/test_sources.py | 18 ++++++++------ 7 files changed, 78 insertions(+), 109 deletions(-) diff --git a/osbuild/main_cli.py b/osbuild/main_cli.py index fc9aab17..5ca2b9f9 100644 --- a/osbuild/main_cli.py +++ b/osbuild/main_cli.py @@ -135,7 +135,7 @@ def osbuild_cli(): output_directory = args.output_directory with ObjectStore(args.store) as object_store: - manifest.download(object_store, args.libdir) + manifest.download(object_store, monitor, args.libdir) r = manifest.build( object_store, diff --git a/osbuild/pipeline.py b/osbuild/pipeline.py index 853397c0..f0499bbc 100644 --- a/osbuild/pipeline.py +++ b/osbuild/pipeline.py @@ -344,9 +344,10 @@ class Manifest: self.sources.append(source) return source - def download(self, store, libdir): - for source in self.sources: - source.download(store, libdir) + def download(self, store, monitor, libdir): + with host.ServiceManager(monitor=monitor) as mgr: + for source in self.sources: + source.download(mgr, store, libdir) def build(self, store, monitor, libdir, output_directory): results = {"success": True} diff --git a/osbuild/sources.py b/osbuild/sources.py index 5015eb58..d15eca95 100644 --- a/osbuild/sources.py +++ b/osbuild/sources.py @@ -1,8 +1,7 @@ +import abc import os -import importlib -import json -import subprocess +from . import host from .objectstore import ObjectStore from .util.types import PathLike @@ -17,10 +16,11 @@ class Source: self.items = items or {} self.options = options - def download(self, store: ObjectStore, libdir: PathLike): + def download(self, mgr: host.ServiceManager, store: ObjectStore, libdir: PathLike): source = self.info.name cache = os.path.join(store.store, "sources") - msg = { + + args = { "items": self.items, "options": self.options, "cache": cache, @@ -29,29 +29,25 @@ class Source: "libdir": os.fspath(libdir) } - # We want the `osbuild` python package that contains this - # very module, which might be different from the system wide - # installed one, to be accessible to the Input programs so - # we detect our origin and set the `PYTHONPATH` accordingly - modorigin = importlib.util.find_spec("osbuild").origin - modpath = os.path.dirname(modorigin) - env = os.environ.copy() - env["PYTHONPATH"] = os.path.dirname(modpath) + client = mgr.start(f"source/{source}", self.info.path) - r = subprocess.run([self.info.path], - env=env, - input=json.dumps(msg), - stdout=subprocess.PIPE, - encoding="utf-8", - check=False) + reply = client.call("download", args) - try: - reply = json.loads(r.stdout) - except ValueError: - raise RuntimeError(f"{source}: error: {r.stderr}") from None + return reply - if "error" in reply: - raise RuntimeError(f"{source}: " + reply["error"]) - if r.returncode != 0: - raise RuntimeError(f"{source}: error {r.returncode}") +class SourceService(host.Service): + """Source host service""" + + @abc.abstractmethod + def download(self, items, cache, options): + pass + + def dispatch(self, method: str, args, _fds): + if method == "download": + r = self.download(args["items"], + args["cache"], + args["options"]) + return r, None + + raise host.ProtocolError("Unknown method") diff --git a/sources/org.osbuild.curl b/sources/org.osbuild.curl index 6ba1239f..dbc7d3df 100755 --- a/sources/org.osbuild.curl +++ b/sources/org.osbuild.curl @@ -15,7 +15,6 @@ up the download. import concurrent.futures import itertools -import json import math import os import subprocess @@ -23,7 +22,7 @@ import sys import tempfile import time -from typing import Dict +from osbuild import sources from osbuild.util.checksum import verify_file from osbuild.util.rhsm import Subscriptions @@ -156,47 +155,33 @@ def download(items, cache): # check if url needs rhsm secrets if url.get("secrets", {}).get("name") == "org.osbuild.rhsm": - try: - # rhsm secrets only need to be retrieved once and can then be reused - if subscriptions is None: - subscriptions = Subscriptions.from_host_system() - url["secrets"] = subscriptions.get_secrets(url.get("url")) - except RuntimeError as e: - json.dump({"error": e.args[0]}, sys.stdout) - return 1 + # rhsm secrets only need to be retrieved once and can then be reused + if subscriptions is None: + subscriptions = Subscriptions.from_host_system() + url["secrets"] = subscriptions.get_secrets(url.get("url")) requested_urls.append(url) requested_checksums.append(checksum) results = executor.map(fetch, requested_urls, requested_checksums, itertools.repeat(cache)) - try: - for _ in results: - pass - except RuntimeError as e: - json.dump({"error": e.args[0]}, sys.stdout) - return 1 - - return 0 + for _ in results: + pass -def main(items: Dict, cache: str): - cache = os.path.join(cache, "org.osbuild.files") +class CurlSource(sources.SourceService): - if not items: - json.dump({}, sys.stdout) - return 0 + def download(self, items, cache, _options): + cache = os.path.join(cache, "org.osbuild.files") + os.makedirs(cache, exist_ok=True) - os.makedirs(cache, exist_ok=True) - res = download(items, cache) - if res != 0: - return res + download(items, cache) - json.dump({}, sys.stdout) - return 0 + +def main(): + service = CurlSource.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - args = json.load(sys.stdin) - r = main(args["items"], args["cache"]) - sys.exit(r) + main() diff --git a/sources/org.osbuild.inline b/sources/org.osbuild.inline index 4e33a53f..74206793 100755 --- a/sources/org.osbuild.inline +++ b/sources/org.osbuild.inline @@ -13,13 +13,13 @@ resource is decoded and written to the store. import base64 import contextlib -import json import os import sys import tempfile from typing import Dict +from osbuild import sources from osbuild.util.checksum import verify_file @@ -73,33 +73,26 @@ def process(items: Dict, cache: str, tmpdir): f.write(data) if not verify_file(floating, checksum): - json.dump({"error": f"checksum mismatch: {checksum}"}, sys.stdout) - sys.exit(1) + raise RuntimeError("Checksum mismatch for {}".format(checksum)) with contextlib.suppress(FileExistsError): os.rename(floating, target) -def main(items: Dict, base: str): - cache = os.path.join(base, "org.osbuild.files") +class InlineSource(sources.SourceService): - if not items: - json.dump({}, sys.stdout) - return 0 - - try: + def download(self, items, cache, _options): + cache = os.path.join(cache, "org.osbuild.files") os.makedirs(cache, exist_ok=True) - with tempfile.TemporaryDirectory(prefix=".unverified-", dir=base) as tmpdir: - process(items, cache, tmpdir) - except Exception as e: # pylint: disable=broad-except - json.dump({"error": str(e)}, sys.stdout) - return 0 - json.dump({}, sys.stdout) - return 0 + with tempfile.TemporaryDirectory(prefix=".unverified-", dir=cache) as tmpdir: + process(items, cache, tmpdir) + + +def main(): + service = InlineSource.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - source_args = json.load(sys.stdin) - r = main(source_args["items"], source_args["cache"]) - sys.exit(r) + main() diff --git a/sources/org.osbuild.ostree b/sources/org.osbuild.ostree index 586d51bb..07ec4938 100755 --- a/sources/org.osbuild.ostree +++ b/sources/org.osbuild.ostree @@ -7,13 +7,12 @@ gpg keys are provided via `gpgkeys`. """ -import json import os import sys import subprocess import uuid -from typing import Dict +from osbuild import sources SCHEMA = """ @@ -113,26 +112,19 @@ def download(items, cache): repo=repo_cache) -def main(items: Dict, cache: str): - cache = os.path.join(cache, "org.osbuild.ostree") +class OSTreeSource(sources.SourceService): - if not items: - json.dump({}, sys.stdout) - return 0 + def download(self, items, cache, _options): + cache = os.path.join(cache, "org.osbuild.ostree") + os.makedirs(cache, exist_ok=True) - os.makedirs(cache, exist_ok=True) - try: download(items, cache) - except subprocess.CalledProcessError as e: - output = e.output.strip() - json.dump({"error": output}, sys.stdout) - return 1 - json.dump({}, sys.stdout) - return 0 + +def main(): + service = OSTreeSource.from_args(sys.argv[1:]) + service.main() if __name__ == '__main__': - source_args = json.load(sys.stdin) - r = main(source_args["items"], source_args["cache"]) - sys.exit(r) + main() diff --git a/test/run/test_sources.py b/test/run/test_sources.py index b4d0981c..66002e56 100644 --- a/test/run/test_sources.py +++ b/test/run/test_sources.py @@ -17,6 +17,7 @@ import pytest import osbuild.objectstore import osbuild.meta import osbuild.sources +from osbuild import host from .. import test @@ -87,14 +88,15 @@ def make_test_cases(): def check_case(source, case, store, libdir): - expects = case["expects"] - if expects == "error": - with pytest.raises(RuntimeError): - source.download(store, libdir) - elif expects == "success": - source.download(store, libdir) - else: - raise ValueError(f"invalid expectation: {expects}") + with host.ServiceManager() as mgr: + expects = case["expects"] + if expects == "error": + with pytest.raises(host.RemoteError): + source.download(mgr, store, libdir) + elif expects == "success": + source.download(mgr, store, libdir) + else: + raise ValueError(f"invalid expectation: {expects}") @pytest.fixture(name="tmpdir")