debian-forge/osbuild/sources.py
Tom Gundersen 7817ae5e8b sources: add org.osbuild.files source
This source adds support for downloaded files. The files are
indexed by their content hash, and the only option is their URL.

The main use case for this will be downloading rpms, allowing depsolving
to be done outside of osbuild, network access to be restricted and
downloaded rpms to be reused between runs.

Each source is now passed two additional arguments, a cache directory
and an output directory. Both are in the source's namespace, and
the source is responsible for managing them. Each directory may
contain contents from previous runs, but neither is ever guaranteed
to do so.

Downloaded contents may be saved to the cache and reused between
runs, and the requested content should be written to the output dir.
If secrets are used, the source must only ever write contents to
the output that corresponds to the available secrets (rather than
contents from the cache from previous runs).

Each stage is passed an additional argument, a sources directory.
The directory is read-only, and contains a subdirectory named after
each used source, which will contain the requested contents when
the `Get()` call returns (if the source uses this functionality).

Based on a patch by Lars Karlitski.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2020-02-06 19:01:12 +01:00

81 lines
2.6 KiB
Python

import asyncio
import json
import socket
import subprocess
import threading
class SourcesServer:
    """Answer source requests received on a unix datagram socket.

    Used as a context manager: entering binds ``socket_address`` and starts
    a background thread running an asyncio event loop; leaving stops the
    loop, joins the thread and closes the loop. An instance can be entered
    only once, because the underlying thread cannot be restarted.

    Each request is a JSON object with the keys ``source`` and
    ``checksums``. The server runs the executable
    ``<sources_libdir>/<source>``, hands it a JSON document on stdin and
    sends whatever JSON the executable prints on stdout back to the
    requester. Failures are reported as ``{"error": "<description>"}``.
    """
    # pylint: disable=too-many-instance-attributes

    def __init__(self, socket_address, sources_libdir, options, cache, output, secrets=None):
        """Store the configuration; nothing is bound or started yet.

        Args:
            socket_address: Path the server socket is bound to.
            sources_libdir: Directory containing the source executables.
            options: Mapping of source name to its options (may be None).
            cache: Base directory; each source gets ``<cache>/<source>``.
            output: Base directory; each source gets ``<output>/<source>``.
            secrets: Mapping of source name to its secrets (may be None).
        """
        self.socket_address = socket_address
        self.sources_libdir = sources_libdir
        self.cache = cache
        self.output = output
        self.options = options or {}
        self.secrets = secrets or {}
        self.event_loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run_event_loop)
        self.barrier = threading.Barrier(2)
        # Bound server socket; created in __enter__, closed by the thread.
        self._sock = None

    def _run_source(self, source, checksums):
        """Run one source executable and return its decoded JSON reply.

        Returns a dict of the form ``{"error": ...}`` when the source name
        is unacceptable, the executable cannot be started, or its stdout is
        not valid JSON. The exit status of the executable is not checked.
        """
        # The name comes straight from a client request and is spliced into
        # an executable path, so refuse anything that could escape the
        # sources directory.
        if not isinstance(source, str) or source in ("", ".", "..") or "/" in source:
            return {"error": f"invalid source name: {source!r}"}

        msg = {
            "options": self.options.get(source, {}),
            "secrets": self.secrets.get(source, {}),
            "cache": f"{self.cache}/{source}",
            "output": f"{self.output}/{source}",
            "checksums": checksums,
        }

        try:
            r = subprocess.run(
                [f"{self.sources_libdir}/{source}"],
                input=json.dumps(msg),
                stdout=subprocess.PIPE,
                encoding="utf-8",
                check=False)
        except OSError as error:
            # Missing or non-executable source: tell the requester instead
            # of raising inside the event loop and leaving it without reply.
            return {"error": f"failed to run source {source!r}: {error}"}

        try:
            return json.loads(r.stdout)
        except ValueError:
            return {"error": f"source returned malformed json: {r.stdout}"}

    def _dispatch(self, sock):
        """Handle one datagram: decode the request, run the source, reply.

        A request that is not valid JSON, or lacks the ``source`` or
        ``checksums`` key, is answered with an error reply so the sender is
        never left waiting for a response.
        """
        msg, addr = sock.recvfrom(65536)

        try:
            request = json.loads(msg)
            source = request["source"]
            checksums = request["checksums"]
        except (ValueError, KeyError, TypeError) as error:
            reply = {"error": f"malformed request: {error!r}"}
        else:
            reply = self._run_source(source, checksums)

        sock.sendmsg([json.dumps(reply).encode("utf-8")], [], 0, addr)

    def _run_event_loop(self):
        """Thread body: serve requests until the event loop is stopped."""
        sock = self._sock
        try:
            # Rendezvous with __enter__ so it returns only once this thread
            # is up and about to start serving.
            self.barrier.wait()
            self.event_loop.add_reader(sock, self._dispatch, sock)
            asyncio.set_event_loop(self.event_loop)
            self.event_loop.run_forever()
            self.event_loop.remove_reader(sock)
        finally:
            sock.close()

    def __enter__(self):
        """Bind the socket, start the server thread and return ``self``.

        Raises:
            OSError: If the socket cannot be bound to ``socket_address``.
        """
        # Bind here rather than in the worker thread: a bind failure inside
        # the thread would kill it before it reached the barrier and leave
        # this method blocked on the barrier forever.
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            sock.bind(self.socket_address)
        except OSError:
            sock.close()
            raise
        self._sock = sock

        self.thread.start()
        self.barrier.wait()
        return self

    def __exit__(self, *args):
        """Stop the event loop, wait for the thread and release the loop."""
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        self.thread.join()
        # The loop owns file descriptors of its own; release them now that
        # nothing is running on it any more.
        self.event_loop.close()
def get(source, checksums, api_path="/run/osbuild/api/sources"):
    """Ask the sources API for content and return its decoded reply.

    Sends ``{"source": source, "checksums": checksums}`` as one JSON
    datagram to the unix socket at ``api_path`` and waits for a single
    datagram in response (at most 65536 bytes are read).

    Args:
        source: Name of the source to query.
        checksums: Checksums identifying the requested content.
        api_path: Path of the sources API socket.

    Returns:
        The JSON-decoded reply.

    Raises:
        RuntimeError: If the reply is an object containing an ``error`` key.
    """
    request = {
        "source": source,
        "checksums": checksums,
    }

    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
        # NOTE(review): this socket is never bound explicitly; on Linux,
        # SO_PASSCRED presumably makes the kernel autobind it so that the
        # server has an address to reply to - confirm against unix(7).
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
        sock.connect(api_path)
        sock.sendall(json.dumps(request).encode('utf-8'))
        reply = json.loads(sock.recv(65536))

    # Only a JSON object can carry an error field, and the field itself may
    # hold any JSON value, so format it rather than concatenating strings.
    if isinstance(reply, dict) and "error" in reply:
        raise RuntimeError(f"{source}: {reply['error']}")
    return reply