osbuild: create API sockets in the thread they're used in
This might (hopefully) fix a race in destructing the asyncio.EventLoop that's used in all API classes, which leads to warnings about unhandled exceptions on CI. This also puts their creation closer to where the client-side sockets are created.
This commit is contained in:
parent
57b2c1e12d
commit
b9b2f99123
5 changed files with 38 additions and 46 deletions
|
|
@ -2,22 +2,22 @@ import array
|
|||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import sys
|
||||
|
||||
|
||||
from . import remoteloop
|
||||
|
||||
|
||||
class API:
|
||||
def __init__(self, sock, args, interactive):
|
||||
self.sock = sock
|
||||
def __init__(self, socket_address, args, interactive):
|
||||
self.socket_address = socket_address
|
||||
self.input = args
|
||||
self.interactive = interactive
|
||||
self._output = None
|
||||
self.event_loop = asyncio.new_event_loop()
|
||||
self.event_loop.add_reader(self.sock, self._dispatch)
|
||||
self.thread = threading.Thread(target=self._run_event_loop)
|
||||
|
||||
@property
|
||||
|
|
@ -38,7 +38,7 @@ class API:
|
|||
self._output = os.fdopen(fd)
|
||||
return out
|
||||
|
||||
def _setup_stdio(self, addr):
|
||||
def _setup_stdio(self, sock, addr):
|
||||
with self._prepare_input() as stdin, \
|
||||
self._prepare_output() as stdout:
|
||||
msg = {}
|
||||
|
|
@ -49,19 +49,22 @@ class API:
|
|||
msg['stdout'] = 1
|
||||
fds.append(stdout.fileno())
|
||||
msg['stderr'] = 2
|
||||
remoteloop.dump_fds(self.sock, msg, fds, addr=addr)
|
||||
remoteloop.dump_fds(sock, msg, fds, addr=addr)
|
||||
|
||||
def _dispatch(self):
|
||||
msg, addr = self.sock.recvfrom(1024)
|
||||
def _dispatch(self, sock):
|
||||
msg, addr = sock.recvfrom(1024)
|
||||
args = json.loads(msg)
|
||||
if args["method"] == 'setup-stdio':
|
||||
self._setup_stdio(addr)
|
||||
self._setup_stdio(sock, addr)
|
||||
|
||||
def _run_event_loop(self):
|
||||
# Set the thread-local event loop
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
# Run event loop until stopped
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
|
||||
import contextlib
|
||||
import os
|
||||
import platform
|
||||
import socket
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
|
@ -95,17 +93,6 @@ class BuildRoot:
|
|||
f"/run/osbuild/lib/runners/{self.runner}"
|
||||
] + argv, check=check, **kwargs)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def bound_socket(self, name):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock_path = os.path.join(self.api, name)
|
||||
sock.bind(os.path.join(self.api, name))
|
||||
try:
|
||||
yield sock
|
||||
finally:
|
||||
os.unlink(sock_path)
|
||||
sock.close()
|
||||
|
||||
def __del__(self):
|
||||
self.unmount()
|
||||
|
||||
|
|
|
|||
|
|
@ -80,10 +80,8 @@ class Stage:
|
|||
|
||||
sources_dir = f"{libdir}/sources" if libdir else "/usr/lib/osbuild/sources"
|
||||
|
||||
with build_root.bound_socket("osbuild") as osbuild_sock, \
|
||||
build_root.bound_socket("sources") as sources_sock, \
|
||||
API(osbuild_sock, args, interactive) as api, \
|
||||
sources.SourcesServer(sources_sock, sources_dir, source_options or {}):
|
||||
with API(f"{build_root.api}/osbuild", args, interactive) as api, \
|
||||
sources.SourcesServer(f"{build_root.api}/sources", sources_dir, source_options or {}):
|
||||
r = build_root.run(
|
||||
[f"/run/osbuild/lib/stages/{self.name}"],
|
||||
binds=[f"{tree}:/run/osbuild/tree"],
|
||||
|
|
@ -141,10 +139,8 @@ class Assembler:
|
|||
# buildroot we should remove this because it includes code from the host in the buildroot thus
|
||||
# violating our effort of reproducibility.
|
||||
ro_binds.append(f"{osbuild_module_path}:/run/osbuild/lib/assemblers/osbuild")
|
||||
with build_root.bound_socket("remoteloop") as loop_sock, \
|
||||
build_root.bound_socket("osbuild") as osbuild_sock, \
|
||||
remoteloop.LoopServer(loop_sock), \
|
||||
API(osbuild_sock, args, interactive) as api:
|
||||
with remoteloop.LoopServer(f"{build_root.api}/remoteloop"), \
|
||||
API(f"{build_root.api}/osbuild", args, interactive) as api:
|
||||
r = build_root.run(
|
||||
[f"/run/osbuild/lib/assemblers/{self.name}"],
|
||||
binds=binds,
|
||||
|
|
|
|||
|
|
@ -54,12 +54,11 @@ class LoopServer:
|
|||
object.
|
||||
"""
|
||||
|
||||
def __init__(self, sock):
|
||||
def __init__(self, socket_address):
|
||||
self.socket_address = socket_address
|
||||
self.devs = []
|
||||
self.sock = sock
|
||||
self.ctl = loop.LoopControl()
|
||||
self.event_loop = asyncio.new_event_loop()
|
||||
self.event_loop.add_reader(self.sock, self._dispatch)
|
||||
self.thread = threading.Thread(target=self._run_event_loop)
|
||||
|
||||
def _create_device(self, fd, dir_fd, offset=None, sizelimit=None):
|
||||
|
|
@ -81,8 +80,8 @@ class LoopServer:
|
|||
self.devs.append(lo)
|
||||
return lo.devname
|
||||
|
||||
def _dispatch(self):
|
||||
args, fds, addr = load_fds(self.sock, 1024)
|
||||
def _dispatch(self, sock):
|
||||
args, fds, addr = load_fds(sock, 1024)
|
||||
|
||||
fd = fds[args["fd"]]
|
||||
dir_fd = fds[args["dir_fd"]]
|
||||
|
|
@ -91,13 +90,16 @@ class LoopServer:
|
|||
|
||||
devname = self._create_device(fd, dir_fd, offset, sizelimit)
|
||||
ret = json.dumps({"devname": devname})
|
||||
self.sock.sendto(ret.encode('utf-8'), addr)
|
||||
sock.sendto(ret.encode('utf-8'), addr)
|
||||
|
||||
def _run_event_loop(self):
|
||||
# Set the thread-local event loop
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
# Run event loop until stopped
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
|
|||
|
|
@ -6,12 +6,11 @@ import threading
|
|||
|
||||
|
||||
class SourcesServer:
|
||||
def __init__(self, sock, sources_dir, source_options):
|
||||
self.sock = sock
|
||||
def __init__(self, socket_address, sources_dir, source_options):
|
||||
self.socket_address = socket_address
|
||||
self.sources_dir = sources_dir
|
||||
self.source_options = source_options
|
||||
self.event_loop = asyncio.new_event_loop()
|
||||
self.event_loop.add_reader(self.sock, self._dispatch)
|
||||
self.thread = threading.Thread(target=self._run_event_loop)
|
||||
|
||||
def _run_source(self, source, checksums):
|
||||
|
|
@ -29,16 +28,21 @@ class SourcesServer:
|
|||
|
||||
return json.loads(r.stdout)
|
||||
|
||||
def _dispatch(self):
|
||||
msg, addr = self.sock.recvfrom(8182)
|
||||
def _dispatch(self, sock):
|
||||
msg, addr = sock.recvfrom(8182)
|
||||
request = json.loads(msg)
|
||||
reply = self._run_source(request["source"], request["checksums"])
|
||||
msg = json.dumps(reply).encode("utf-8")
|
||||
self.sock.sendmsg([msg], [], 0, addr)
|
||||
sock.sendmsg([msg], [], 0, addr)
|
||||
|
||||
def _run_event_loop(self):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
sock.bind(self.socket_address)
|
||||
self.event_loop.add_reader(sock, self._dispatch, sock)
|
||||
asyncio.set_event_loop(self.event_loop)
|
||||
self.event_loop.run_forever()
|
||||
self.event_loop.remove_reader(sock)
|
||||
sock.close()
|
||||
|
||||
def __enter__(self):
|
||||
self.thread.start()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue