sources: port to host services
Port sources to also use the host services infrastructure that is used by inputs, devices and mounts. Sources are a bit different from the other services that they don't run for the duration of the stage but are run before anything is built. By using the same infrastructure we re-use the process management and inter process communcation. Additionally, this will forward all messages from sources to the existing monitoring framework. Adapt all existing sources and tests.
This commit is contained in:
parent
072b75d78e
commit
c902a7a754
7 changed files with 78 additions and 109 deletions
|
|
@ -135,7 +135,7 @@ def osbuild_cli():
|
|||
output_directory = args.output_directory
|
||||
with ObjectStore(args.store) as object_store:
|
||||
|
||||
manifest.download(object_store, args.libdir)
|
||||
manifest.download(object_store, monitor, args.libdir)
|
||||
|
||||
r = manifest.build(
|
||||
object_store,
|
||||
|
|
|
|||
|
|
@ -344,9 +344,10 @@ class Manifest:
|
|||
self.sources.append(source)
|
||||
return source
|
||||
|
||||
def download(self, store, libdir):
|
||||
def download(self, store, monitor, libdir):
|
||||
with host.ServiceManager(monitor=monitor) as mgr:
|
||||
for source in self.sources:
|
||||
source.download(store, libdir)
|
||||
source.download(mgr, store, libdir)
|
||||
|
||||
def build(self, store, monitor, libdir, output_directory):
|
||||
results = {"success": True}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,7 @@
|
|||
import abc
|
||||
import os
|
||||
import importlib
|
||||
import json
|
||||
import subprocess
|
||||
|
||||
from . import host
|
||||
from .objectstore import ObjectStore
|
||||
from .util.types import PathLike
|
||||
|
||||
|
|
@ -17,10 +16,11 @@ class Source:
|
|||
self.items = items or {}
|
||||
self.options = options
|
||||
|
||||
def download(self, store: ObjectStore, libdir: PathLike):
|
||||
def download(self, mgr: host.ServiceManager, store: ObjectStore, libdir: PathLike):
|
||||
source = self.info.name
|
||||
cache = os.path.join(store.store, "sources")
|
||||
msg = {
|
||||
|
||||
args = {
|
||||
"items": self.items,
|
||||
"options": self.options,
|
||||
"cache": cache,
|
||||
|
|
@ -29,29 +29,25 @@ class Source:
|
|||
"libdir": os.fspath(libdir)
|
||||
}
|
||||
|
||||
# We want the `osbuild` python package that contains this
|
||||
# very module, which might be different from the system wide
|
||||
# installed one, to be accessible to the Input programs so
|
||||
# we detect our origin and set the `PYTHONPATH` accordingly
|
||||
modorigin = importlib.util.find_spec("osbuild").origin
|
||||
modpath = os.path.dirname(modorigin)
|
||||
env = os.environ.copy()
|
||||
env["PYTHONPATH"] = os.path.dirname(modpath)
|
||||
client = mgr.start(f"source/{source}", self.info.path)
|
||||
|
||||
r = subprocess.run([self.info.path],
|
||||
env=env,
|
||||
input=json.dumps(msg),
|
||||
stdout=subprocess.PIPE,
|
||||
encoding="utf-8",
|
||||
check=False)
|
||||
reply = client.call("download", args)
|
||||
|
||||
try:
|
||||
reply = json.loads(r.stdout)
|
||||
except ValueError:
|
||||
raise RuntimeError(f"{source}: error: {r.stderr}") from None
|
||||
return reply
|
||||
|
||||
if "error" in reply:
|
||||
raise RuntimeError(f"{source}: " + reply["error"])
|
||||
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"{source}: error {r.returncode}")
|
||||
class SourceService(host.Service):
|
||||
"""Source host service"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def download(self, items, cache, options):
|
||||
pass
|
||||
|
||||
def dispatch(self, method: str, args, _fds):
|
||||
if method == "download":
|
||||
r = self.download(args["items"],
|
||||
args["cache"],
|
||||
args["options"])
|
||||
return r, None
|
||||
|
||||
raise host.ProtocolError("Unknown method")
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ up the download.
|
|||
|
||||
import concurrent.futures
|
||||
import itertools
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import subprocess
|
||||
|
|
@ -23,7 +22,7 @@ import sys
|
|||
import tempfile
|
||||
import time
|
||||
|
||||
from typing import Dict
|
||||
from osbuild import sources
|
||||
|
||||
from osbuild.util.checksum import verify_file
|
||||
from osbuild.util.rhsm import Subscriptions
|
||||
|
|
@ -156,47 +155,33 @@ def download(items, cache):
|
|||
|
||||
# check if url needs rhsm secrets
|
||||
if url.get("secrets", {}).get("name") == "org.osbuild.rhsm":
|
||||
try:
|
||||
# rhsm secrets only need to be retrieved once and can then be reused
|
||||
if subscriptions is None:
|
||||
subscriptions = Subscriptions.from_host_system()
|
||||
url["secrets"] = subscriptions.get_secrets(url.get("url"))
|
||||
except RuntimeError as e:
|
||||
json.dump({"error": e.args[0]}, sys.stdout)
|
||||
return 1
|
||||
|
||||
requested_urls.append(url)
|
||||
requested_checksums.append(checksum)
|
||||
|
||||
results = executor.map(fetch, requested_urls, requested_checksums, itertools.repeat(cache))
|
||||
|
||||
try:
|
||||
for _ in results:
|
||||
pass
|
||||
except RuntimeError as e:
|
||||
json.dump({"error": e.args[0]}, sys.stdout)
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def main(items: Dict, cache: str):
|
||||
class CurlSource(sources.SourceService):
|
||||
|
||||
def download(self, items, cache, _options):
|
||||
cache = os.path.join(cache, "org.osbuild.files")
|
||||
|
||||
if not items:
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
|
||||
os.makedirs(cache, exist_ok=True)
|
||||
res = download(items, cache)
|
||||
if res != 0:
|
||||
return res
|
||||
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
download(items, cache)
|
||||
|
||||
|
||||
def main():
|
||||
service = CurlSource.from_args(sys.argv[1:])
|
||||
service.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = json.load(sys.stdin)
|
||||
r = main(args["items"], args["cache"])
|
||||
sys.exit(r)
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -13,13 +13,13 @@ resource is decoded and written to the store.
|
|||
|
||||
import base64
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
from typing import Dict
|
||||
|
||||
from osbuild import sources
|
||||
from osbuild.util.checksum import verify_file
|
||||
|
||||
|
||||
|
|
@ -73,33 +73,26 @@ def process(items: Dict, cache: str, tmpdir):
|
|||
f.write(data)
|
||||
|
||||
if not verify_file(floating, checksum):
|
||||
json.dump({"error": f"checksum mismatch: {checksum}"}, sys.stdout)
|
||||
sys.exit(1)
|
||||
raise RuntimeError("Checksum mismatch for {}".format(checksum))
|
||||
|
||||
with contextlib.suppress(FileExistsError):
|
||||
os.rename(floating, target)
|
||||
|
||||
|
||||
def main(items: Dict, base: str):
|
||||
cache = os.path.join(base, "org.osbuild.files")
|
||||
class InlineSource(sources.SourceService):
|
||||
|
||||
if not items:
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
|
||||
try:
|
||||
def download(self, items, cache, _options):
|
||||
cache = os.path.join(cache, "org.osbuild.files")
|
||||
os.makedirs(cache, exist_ok=True)
|
||||
with tempfile.TemporaryDirectory(prefix=".unverified-", dir=base) as tmpdir:
|
||||
process(items, cache, tmpdir)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
json.dump({"error": str(e)}, sys.stdout)
|
||||
return 0
|
||||
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
with tempfile.TemporaryDirectory(prefix=".unverified-", dir=cache) as tmpdir:
|
||||
process(items, cache, tmpdir)
|
||||
|
||||
|
||||
def main():
|
||||
service = InlineSource.from_args(sys.argv[1:])
|
||||
service.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
source_args = json.load(sys.stdin)
|
||||
r = main(source_args["items"], source_args["cache"])
|
||||
sys.exit(r)
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -7,13 +7,12 @@ gpg keys are provided via `gpgkeys`.
|
|||
"""
|
||||
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import uuid
|
||||
|
||||
from typing import Dict
|
||||
from osbuild import sources
|
||||
|
||||
|
||||
SCHEMA = """
|
||||
|
|
@ -113,26 +112,19 @@ def download(items, cache):
|
|||
repo=repo_cache)
|
||||
|
||||
|
||||
def main(items: Dict, cache: str):
|
||||
class OSTreeSource(sources.SourceService):
|
||||
|
||||
def download(self, items, cache, _options):
|
||||
cache = os.path.join(cache, "org.osbuild.ostree")
|
||||
|
||||
if not items:
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
|
||||
os.makedirs(cache, exist_ok=True)
|
||||
try:
|
||||
download(items, cache)
|
||||
except subprocess.CalledProcessError as e:
|
||||
output = e.output.strip()
|
||||
json.dump({"error": output}, sys.stdout)
|
||||
return 1
|
||||
|
||||
json.dump({}, sys.stdout)
|
||||
return 0
|
||||
download(items, cache)
|
||||
|
||||
|
||||
def main():
|
||||
service = OSTreeSource.from_args(sys.argv[1:])
|
||||
service.main()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
source_args = json.load(sys.stdin)
|
||||
r = main(source_args["items"], source_args["cache"])
|
||||
sys.exit(r)
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import pytest
|
|||
import osbuild.objectstore
|
||||
import osbuild.meta
|
||||
import osbuild.sources
|
||||
from osbuild import host
|
||||
from .. import test
|
||||
|
||||
|
||||
|
|
@ -87,12 +88,13 @@ def make_test_cases():
|
|||
|
||||
|
||||
def check_case(source, case, store, libdir):
|
||||
with host.ServiceManager() as mgr:
|
||||
expects = case["expects"]
|
||||
if expects == "error":
|
||||
with pytest.raises(RuntimeError):
|
||||
source.download(store, libdir)
|
||||
with pytest.raises(host.RemoteError):
|
||||
source.download(mgr, store, libdir)
|
||||
elif expects == "success":
|
||||
source.download(store, libdir)
|
||||
source.download(mgr, store, libdir)
|
||||
else:
|
||||
raise ValueError(f"invalid expectation: {expects}")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue