diff --git a/osbuild/api.py b/osbuild/api.py index 345d1eee..21530bee 100644 --- a/osbuild/api.py +++ b/osbuild/api.py @@ -2,22 +2,22 @@ import array import asyncio import json import os +import socket +import sys import tempfile import threading -import sys from . import remoteloop class API: - def __init__(self, sock, args, interactive): - self.sock = sock + def __init__(self, socket_address, args, interactive): + self.socket_address = socket_address self.input = args self.interactive = interactive self._output = None self.event_loop = asyncio.new_event_loop() - self.event_loop.add_reader(self.sock, self._dispatch) self.thread = threading.Thread(target=self._run_event_loop) @property @@ -38,7 +38,7 @@ class API: self._output = os.fdopen(fd) return out - def _setup_stdio(self, addr): + def _setup_stdio(self, sock, addr): with self._prepare_input() as stdin, \ self._prepare_output() as stdout: msg = {} @@ -49,19 +49,22 @@ class API: msg['stdout'] = 1 fds.append(stdout.fileno()) msg['stderr'] = 2 - remoteloop.dump_fds(self.sock, msg, fds, addr=addr) + remoteloop.dump_fds(sock, msg, fds, addr=addr) - def _dispatch(self): - msg, addr = self.sock.recvfrom(1024) + def _dispatch(self, sock): + msg, addr = sock.recvfrom(1024) args = json.loads(msg) if args["method"] == 'setup-stdio': - self._setup_stdio(addr) + self._setup_stdio(sock, addr) def _run_event_loop(self): - # Set the thread-local event loop + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.bind(self.socket_address) + self.event_loop.add_reader(sock, self._dispatch, sock) asyncio.set_event_loop(self.event_loop) - # Run event loop until stopped self.event_loop.run_forever() + self.event_loop.remove_reader(sock) + sock.close() def __enter__(self): self.thread.start() diff --git a/osbuild/buildroot.py b/osbuild/buildroot.py index 1b869cf4..d3f0679d 100644 --- a/osbuild/buildroot.py +++ b/osbuild/buildroot.py @@ -1,8 +1,6 @@ -import contextlib import os import platform -import socket import shutil import subprocess import tempfile @@ -95,17 +93,6 @@ class BuildRoot: f"/run/osbuild/lib/runners/{self.runner}" ] + argv, check=check, **kwargs) - @contextlib.contextmanager - def bound_socket(self, name): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock_path = os.path.join(self.api, name) - sock.bind(os.path.join(self.api, name)) - try: - yield sock - finally: - os.unlink(sock_path) - sock.close() - def __del__(self): self.unmount() diff --git a/osbuild/pipeline.py b/osbuild/pipeline.py index a66e24e4..3253daaf 100644 --- a/osbuild/pipeline.py +++ b/osbuild/pipeline.py @@ -80,10 +80,8 @@ class Stage: sources_dir = f"{libdir}/sources" if libdir else "/usr/lib/osbuild/sources" - with build_root.bound_socket("osbuild") as osbuild_sock, \ - build_root.bound_socket("sources") as sources_sock, \ - API(osbuild_sock, args, interactive) as api, \ - sources.SourcesServer(sources_sock, sources_dir, source_options or {}): + with API(f"{build_root.api}/osbuild", args, interactive) as api, \ + sources.SourcesServer(f"{build_root.api}/sources", sources_dir, source_options or {}): r = build_root.run( [f"/run/osbuild/lib/stages/{self.name}"], binds=[f"{tree}:/run/osbuild/tree"], @@ -141,10 +139,8 @@ class Assembler: # buildroot we should remove this because it includes code from the host in the buildroot thus # violating our effort of reproducibility. ro_binds.append(f"{osbuild_module_path}:/run/osbuild/lib/assemblers/osbuild") - with build_root.bound_socket("remoteloop") as loop_sock, \ - build_root.bound_socket("osbuild") as osbuild_sock, \ - remoteloop.LoopServer(loop_sock), \ - API(osbuild_sock, args, interactive) as api: + with remoteloop.LoopServer(f"{build_root.api}/remoteloop"), \ + API(f"{build_root.api}/osbuild", args, interactive) as api: r = build_root.run( [f"/run/osbuild/lib/assemblers/{self.name}"], binds=binds, diff --git a/osbuild/remoteloop.py b/osbuild/remoteloop.py index 216d34e4..c37a1d1b 100644 --- a/osbuild/remoteloop.py +++ b/osbuild/remoteloop.py @@ -54,12 +54,11 @@ class LoopServer: object. """ - def __init__(self, sock): + def __init__(self, socket_address): + self.socket_address = socket_address self.devs = [] - self.sock = sock self.ctl = loop.LoopControl() self.event_loop = asyncio.new_event_loop() - self.event_loop.add_reader(self.sock, self._dispatch) self.thread = threading.Thread(target=self._run_event_loop) def _create_device(self, fd, dir_fd, offset=None, sizelimit=None): @@ -81,8 +80,8 @@ class LoopServer: self.devs.append(lo) return lo.devname - def _dispatch(self): - args, fds, addr = load_fds(self.sock, 1024) + def _dispatch(self, sock): + args, fds, addr = load_fds(sock, 1024) fd = fds[args["fd"]] dir_fd = fds[args["dir_fd"]] @@ -91,13 +90,16 @@ class LoopServer: devname = self._create_device(fd, dir_fd, offset, sizelimit) ret = json.dumps({"devname": devname}) - self.sock.sendto(ret.encode('utf-8'), addr) + sock.sendto(ret.encode('utf-8'), addr) def _run_event_loop(self): - # Set the thread-local event loop + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.bind(self.socket_address) + self.event_loop.add_reader(sock, self._dispatch, sock) asyncio.set_event_loop(self.event_loop) - # Run event loop until stopped self.event_loop.run_forever() + self.event_loop.remove_reader(sock) + sock.close() def __enter__(self): self.thread.start() diff --git a/osbuild/sources.py b/osbuild/sources.py index 084e6ada..53656a29 100644 --- a/osbuild/sources.py +++ b/osbuild/sources.py @@ -6,12 +6,11 @@ import threading class SourcesServer: - def __init__(self, sock, sources_dir, source_options): - self.sock = sock + def __init__(self, socket_address, sources_dir, source_options): + self.socket_address = socket_address self.sources_dir = sources_dir self.source_options = source_options self.event_loop = asyncio.new_event_loop() - self.event_loop.add_reader(self.sock, self._dispatch) self.thread = threading.Thread(target=self._run_event_loop) def _run_source(self, source, checksums): @@ -29,16 +28,21 @@ class SourcesServer: return json.loads(r.stdout) - def _dispatch(self): - msg, addr = self.sock.recvfrom(8182) + def _dispatch(self, sock): + msg, addr = sock.recvfrom(8182) request = json.loads(msg) reply = self._run_source(request["source"], request["checksums"]) msg = json.dumps(reply).encode("utf-8") - self.sock.sendmsg([msg], [], 0, addr) + sock.sendmsg([msg], [], 0, addr) def _run_event_loop(self): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + sock.bind(self.socket_address) + self.event_loop.add_reader(sock, self._dispatch, sock) asyncio.set_event_loop(self.event_loop) self.event_loop.run_forever() + self.event_loop.remove_reader(sock) + sock.close() def __enter__(self): self.thread.start()