debian-forge/osbuild/sources.py
Christian Kellner 0c7284572e osbuild: auto-generate socket addresses for APIs
Rely on the ability of `BaseAPI` to auto-generate socket addresses
when none was provided. The `BuildRoot` does not rely on the
sockets being created in the `BuildRoot.api` directory anymore and
will instead bind-mount each individual socket address to the well
known location via the `BaseAPI.endpoint` identifier.
Convert all API providers to take the `socket_address` as an
optional keyword argument.
2020-07-27 12:50:38 +01:00

55 lines
1.6 KiB
Python

import json
import subprocess
from . import api
from .util import jsoncomm
class SourcesServer(api.BaseAPI):
    """API provider for the "sources" endpoint.

    For every request received, the executable named after the requested
    source under ``<libdir>/sources/`` is run, and the JSON it prints on
    stdout is sent back to the requester.
    """

    endpoint = "sources"

    def __init__(self, libdir, options, cache, output, *, socket_address=None):
        """Remember the settings handed to each source executable.

        Args:
            libdir: Directory containing the ``sources/`` executables.
            options: Mapping of source name to its options; a falsy value
                is replaced by an empty dict.
            cache: Base path; ``<cache>/<source>`` is passed to the source.
            output: Base path; ``<output>/<source>`` is passed to the source.
            socket_address: Optional address forwarded to the base class.
        """
        super().__init__(socket_address)
        self.options = options or {}
        self.libdir = libdir
        self.cache = cache
        self.output = output

    def _run_source(self, source, checksums):
        """Run one source executable and return its decoded JSON output.

        The executable receives a JSON document on stdin describing its
        options, cache and output paths, the requested checksums and the
        libdir. Its exit status is not inspected; only stdout is used.

        Returns:
            The object decoded from the executable's stdout, or a dict
            with an ``"error"`` key when stdout is not valid JSON.
        """
        payload = {
            "options": self.options.get(source, {}),
            "cache": f"{self.cache}/{source}",
            "output": f"{self.output}/{source}",
            "checksums": checksums,
            "libdir": self.libdir,
        }
        executable = f"{self.libdir}/sources/{source}"

        proc = subprocess.run(
            [executable],
            input=json.dumps(payload),
            stdout=subprocess.PIPE,
            encoding="utf-8",
            check=False,
        )

        raw = proc.stdout
        try:
            result = json.loads(raw)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            result = {"error": f"source returned malformed json: {raw}"}
        return result

    def _dispatch(self, server):
        """Handle a single request: receive, run the source, send the reply.

        The request must carry ``"source"`` and ``"checksums"`` keys. The
        second element returned by ``server.recv()`` is not used here.
        """
        request, _, sender = server.recv()
        source = request["source"]
        checksums = request["checksums"]
        response = self._run_source(source, checksums)
        server.send(response, destination=sender)
def get(source, checksums, api_path="/run/osbuild/api/sources"):
    """Ask the sources API for the given checksums of a source.

    Args:
        source: Name of the source to query.
        checksums: Checksums to request from that source.
        api_path: Socket path of the sources API to connect to.

    Returns:
        The reply received from the API.

    Raises:
        RuntimeError: If the reply contains an ``"error"`` entry; the
            message is prefixed with the source name.
    """
    request = {"source": source, "checksums": checksums}

    with jsoncomm.Socket.new_client(api_path) as client:
        client.send(request)
        # Only the first element of the received triple is needed.
        reply, _second, _third = client.recv()

        if "error" not in reply:
            return reply

        raise RuntimeError(f"{source}: " + reply["error"])