debian-forge/osbuild/sources.py
Michael Vogt 77a61da760 osbuild: drop libdir from download() methods
The libdir is passed down for sources but it is never used in
any of our sources. As this is confusing and we want to eventually
support multiple libdirs remove this code.

It looks like the libdir for sources was added a long time ago in 8423da3,
but there is no indication of if/how it is/was supposed to be used, and
AFAICT from going over the git history it was never used.

SourceService:dispatch() never sends "libdir" to the actual sources,
so it is not even technically an API break.
2024-08-26 19:58:55 +02:00

124 lines
3.6 KiB
Python

import abc
import contextlib
import hashlib
import json
import os
import tempfile
from typing import ClassVar, Dict
from . import host
from .objectstore import ObjectStore
class Source:
    """
    A single source together with its corresponding items and options.
    """

    def __init__(self, info, items, options) -> None:
        self.info = info
        # A falsy `items` (e.g. None) is replaced by a fresh empty dict.
        self.items = items if items else {}
        self.options = options
        # Kept for compatibility with the pipeline interface.
        self.build = None
        self.runner = None
        self.source_epoch = None

    def download(self, mgr: host.ServiceManager, store: ObjectStore):
        """Start the service for this source and invoke its "download" method.

        The items are written to a temporary file below `store.tmp` and
        handed over as a file descriptor. The reply of the call is returned
        as-is.
        """
        service_name = self.info.name
        cache_dir = os.path.join(store.store, "sources")
        request = {
            "options": self.options,
            "cache": cache_dir,
            "output": None,
            "checksums": [],
        }

        client = mgr.start(f"source/{service_name}", self.info.path)

        with self.make_items_file(store.tmp) as items_fd:
            return client.call_with_fds("download", request, [items_fd])

    @contextlib.contextmanager
    def make_items_file(self, tmp):
        """Yield the file descriptor of a temporary file holding the items.

        The file is created in `tmp`, filled with `self.items` encoded as
        JSON and rewound to the start before its descriptor is yielded. It
        is closed again once the context is left.
        """
        with tempfile.TemporaryFile("w+", dir=tmp, encoding="utf-8") as items_file:
            json.dump(self.items, items_file)
            items_file.seek(0)
            yield items_file.fileno()

    # "name", "id", "stages", "results" are only here to make a source look
    # like a pipeline to the monitor. This should be revisited at some point;
    # maybe the monitor should get first-class support for sources?
    #
    # In any case, sources can only be represented poorly by the monitor
    # right now: the source is called with download() for all items at once
    # and there is currently no way for a stage to report something
    # structured back to the host that runs the source, so it just downloads
    # all sources without any user-visible progress.
    @property
    def name(self):
        """Display name of the source: "source " followed by the info name."""
        source_name = self.info.name
        return f"source {source_name}"

    @property
    def id(self):
        """Hex SHA-256 digest over the JSON-encoded info name and items."""
        digest = hashlib.sha256()
        for part in (self.info.name, self.items):
            digest.update(json.dumps(part, sort_keys=True).encode())
        return digest.hexdigest()

    @property
    def stages(self):
        """A source has no stages; always a new empty list."""
        return list()
class SourceService(host.Service):
    """Source host service"""

    max_workers = 1

    content_type: ClassVar[str]
    """The content type of the source."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # All three are filled in while a "download" call is dispatched.
        self.cache = self.options = self.tmpdir = None

    @abc.abstractmethod
    def fetch_one(self, checksum, desc) -> None:
        """Fetch the single element identified by `checksum` and described by `desc`."""

    @abc.abstractmethod
    def fetch_all(self, items: Dict) -> None:
        """Fetch every item in `items`."""

    def exists(self, checksum, _desc) -> bool:
        """Return True if a regular file named `checksum` is in the cache."""
        cached_path = f"{self.cache}/{checksum}"
        return os.path.isfile(cached_path)

    @staticmethod
    def load_items(fds):
        """Return the JSON document read from the descriptor `fds.steal(0)`."""
        with os.fdopen(fds.steal(0)) as items_file:
            return json.load(items_file)

    def setup(self, args):
        """Prepare the per-content-type cache directory and store the options."""
        cache_dir = os.path.join(args["cache"], self.content_type)
        self.cache = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self.options = args["options"]

    def dispatch(self, method: str, args, fds):
        """Handle a call to the service; only "download" is known.

        For "download" the service is set up from `args`, a temporary
        ".unverified-" directory is created inside the cache and exposed as
        `self.tmpdir` while all items are fetched. Any other method raises
        `host.ProtocolError`.
        """
        if method != "download":
            raise host.ProtocolError("Unknown method")

        self.setup(args)
        with tempfile.TemporaryDirectory(prefix=".unverified-", dir=self.cache) as self.tmpdir:
            self.fetch_all(SourceService.load_items(fds))
        return None, None