debian-forge/osbuild/sources.py
Christian Kellner fa9c288988 sources: source itself controls cache sub-dir
Instead of supplying the full cache dir, i.e. the directory in
the store where the source will place the fetched resources, to
the source, only supply the root folder of the cache and let
the source itself create the desired sub-directory. This allows
the source to determine what type of resource it provides. This
makes the final directory independent of the name of the source:
a `org.osbuild.curl` source can place file-like resource in the
`org.osbuild.files` sub-directory. Then the `org.osbuild.files`
input can be used to get those from the cache directory.
2021-02-12 19:27:08 +01:00

109 lines
3.3 KiB
Python

import os
import importlib
import json
import subprocess
from . import api
from .objectstore import ObjectStore
from .util import jsoncomm
from .util.types import PathLike
class Source:
"""
A single source with is corresponding options.
"""
def __init__(self, info, items, options) -> None:
self.info = info
self.items = items or {}
self.options = options
def download(self, store: ObjectStore, libdir: PathLike):
source = self.info.name
cache = os.path.join(store.store, "sources")
msg = {
"items": self.items,
"options": self.options,
"cache": cache,
"output": None,
"checksums": [],
"libdir": os.fspath(libdir)
}
# We want the `osbuild` python package that contains this
# very module, which might be different from the system wide
# installed one, to be accessible to the Input programs so
# we detect our origin and set the `PYTHONPATH` accordingly
modorigin = importlib.util.find_spec("osbuild").origin
modpath = os.path.dirname(modorigin)
env = os.environ.copy()
env["PYTHONPATH"] = os.path.dirname(modpath)
r = subprocess.run([self.info.path],
env=env,
input=json.dumps(msg),
stdout=subprocess.PIPE,
encoding="utf-8",
check=False)
try:
reply = json.loads(r.stdout)
except ValueError:
raise RuntimeError(f"{source}: error: {r.stderr}") from None
if "error" in reply:
raise RuntimeError(f"{source}: " + reply["error"])
if r.returncode != 0:
raise RuntimeError(f"{source}: error {r.returncode}")
class SourcesServer(api.BaseAPI):
endpoint = "sources"
def __init__(self, libdir, options, cache, output, *, socket_address=None):
super().__init__(socket_address)
self.libdir = libdir
self.cache = cache
self.output = output
self.options = options or {}
def _run_source(self, source, checksums):
msg = {
"items": {},
"options": self.options.get(source, {}),
"cache": self.cache,
"output": f"{self.output}/{source}",
"checksums": checksums,
"libdir": self.libdir
}
r = subprocess.run(
[f"{self.libdir}/sources/{source}"],
input=json.dumps(msg),
stdout=subprocess.PIPE,
encoding="utf-8",
check=False)
try:
return json.loads(r.stdout)
except ValueError:
return {"error": f"source returned malformed json: {r.stdout}"}
def _message(self, msg, fds, sock):
reply = self._run_source(msg["source"], msg["checksums"])
sock.send(reply)
def get(source, checksums, api_path="/run/osbuild/api/sources"):
with jsoncomm.Socket.new_client(api_path) as client:
msg = {
"source": source,
"checksums": checksums
}
client.send(msg)
reply, _, _ = client.recv()
if "error" in reply:
raise RuntimeError(f"{source}: " + reply["error"])
return reply