debian-forge/osbuild/sources.py
Michael Vogt f214c69a98 osbuild: add workaround to integrate sources into progress reporting
This commit is somewhat poor, sorry for that. It mostly adds
workaround so that the osbuild sources can emit some progress
reporting as well. Without that the user experience is rather poor
and there is a long delay before any sort of progress can be
reported (even before the normal stages run).

With it the user experience is still not good but slightly better,
i.e. the progress monitor will report that the sources have
started downloading and curl will generated some log output. No
real progress unfortunately (sources subprogress will jump from
zero to 100%).
2024-03-12 16:44:12 +01:00

131 lines
3.9 KiB
Python

import abc
import contextlib
import hashlib
import json
import os
import tempfile
from typing import ClassVar, Dict, Tuple
from . import host
from .objectstore import ObjectStore
from .util.types import PathLike
class Source:
"""
A single source with is corresponding options.
"""
def __init__(self, info, items, options) -> None:
self.info = info
self.items = items or {}
self.options = options
# compat with pipeline
self.build = None
self.runner = None
self.source_epoch = None
def download(self, mgr: host.ServiceManager, store: ObjectStore, libdir: PathLike):
source = self.info.name
cache = os.path.join(store.store, "sources")
args = {
"options": self.options,
"cache": cache,
"output": None,
"checksums": [],
"libdir": os.fspath(libdir)
}
client = mgr.start(f"source/{source}", self.info.path)
with self.make_items_file(store.tmp) as fd:
fds = [fd]
reply = client.call_with_fds("download", args, fds)
return reply
@contextlib.contextmanager
def make_items_file(self, tmp):
with tempfile.TemporaryFile("w+", dir=tmp, encoding="utf-8") as f:
json.dump(self.items, f)
f.seek(0)
yield f.fileno()
# "name", "id", "stages", "results" is only here to make it looks like a
# pipeline for the monitor. This should be revisited at some point
# and maybe the monitor should get first-class support for
# sources?
#
# In any case, sources can be represented only poorly right now
# by the monitor because the source is called with download()
# for all items and there is no way for a stage right now to
# report something structured back to the host that runs the
# source so it just downloads all sources without any user
# visible progress right now
@property
def name(self):
return f"source {self.info.name}"
@property
def id(self):
m = hashlib.sha256()
m.update(json.dumps(self.info.name, sort_keys=True).encode())
m.update(json.dumps(self.items, sort_keys=True).encode())
return m.hexdigest()
@property
def stages(self):
return []
class SourceService(host.Service):
"""Source host service"""
max_workers = 1
content_type: ClassVar[str]
"""The content type of the source."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cache = None
self.options = None
self.tmpdir = None
@abc.abstractmethod
def fetch_one(self, checksum, desc) -> None:
"""Performs the actual fetch of an element described by its checksum and its descriptor"""
@abc.abstractmethod
def fetch_all(self, items: Dict) -> None:
"""Fetch all sources."""
def exists(self, checksum, _desc) -> bool:
"""Returns True if the item to download is in cache. """
return os.path.isfile(f"{self.cache}/{checksum}")
# pylint: disable=[no-self-use]
def transform(self, checksum, desc) -> Tuple:
"""Modify the input data before downloading. By default only transforms an item object to a Tupple."""
return checksum, desc
@staticmethod
def load_items(fds):
with os.fdopen(fds.steal(0)) as f:
items = json.load(f)
return items
def setup(self, args):
self.cache = os.path.join(args["cache"], self.content_type)
os.makedirs(self.cache, exist_ok=True)
self.options = args["options"]
def dispatch(self, method: str, args, fds):
if method == "download":
self.setup(args)
with tempfile.TemporaryDirectory(prefix=".unverified-", dir=self.cache) as self.tmpdir:
self.fetch_all(SourceService.load_items(fds))
return None, None
raise host.ProtocolError("Unknown method")