debian-forge/osbuild/sources.py
Christian Kellner 1896047bae sources: pass the library dir to the sources
The idea is that source can themselves spawn other modules, esp.
new secrets modules. For this they need to know the library dir,
aka 'libdir' throughout the osbuild source. Therefore change the
SourceServer to directly get the library directory instead of
just the sub-directory to the sources. Then pass the library
directory to via the JSON API to the source.
Adjust all usage of the SourceServer, including the tests.
2020-05-20 14:43:33 +02:00

88 lines
2.7 KiB
Python

import asyncio
import json
import subprocess
import threading
from .util import jsoncomm
class SourcesServer:
# pylint: disable=too-many-instance-attributes
def __init__(self, socket_address, libdir, options, cache, output, secrets=None):
self.socket_address = socket_address
self.libdir = libdir
self.cache = cache
self.output = output
self.options = options or {}
self.secrets = secrets or {}
self.barrier = threading.Barrier(2)
self.event_loop = None
self.thread = None
def _run_source(self, source, checksums):
msg = {
"options": self.options.get(source, {}),
"secrets": self.secrets.get(source, {}),
"cache": f"{self.cache}/{source}",
"output": f"{self.output}/{source}",
"checksums": checksums,
"libdir": self.libdir
}
r = subprocess.run(
[f"{self.libdir}/sources/{source}"],
input=json.dumps(msg),
stdout=subprocess.PIPE,
encoding="utf-8",
check=False)
try:
return json.loads(r.stdout)
except ValueError:
return {"error": f"source returned malformed json: {r.stdout}"}
def _dispatch(self, server):
request, _, addr = server.recv()
reply = self._run_source(request["source"], request["checksums"])
server.send(reply, destination=addr)
def _run_event_loop(self):
with jsoncomm.Socket.new_server(self.socket_address) as server:
self.barrier.wait()
self.event_loop.add_reader(server, self._dispatch, server)
asyncio.set_event_loop(self.event_loop)
self.event_loop.run_forever()
self.event_loop.remove_reader(server)
def __enter__(self):
# We are not re-entrant, so complain if re-entered.
assert self.event_loop is None
self.event_loop = asyncio.new_event_loop()
self.thread = threading.Thread(target=self._run_event_loop)
self.barrier.reset()
self.thread.start()
self.barrier.wait()
return self
def __exit__(self, *args):
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
self.thread.join()
self.event_loop.close()
self.thread = None
self.event_loop = None
def get(source, checksums, api_path="/run/osbuild/api/sources"):
with jsoncomm.Socket.new_client(api_path) as client:
msg = {
"source": source,
"checksums": checksums
}
client.send(msg)
reply, _, _ = client.recv()
if "error" in reply:
raise RuntimeError(f"{source}: " + reply["error"])
return reply