This changes the sources module to explicitly cleanup event-loops.
Additionally, the implementation is protected against re-entrency which
we do not support (and do not need).
We did occasionally get the following exception when running
source-servers:
/usr/lib/python3.8/asyncio/base_events.py:654: ResourceWarning: unclosed event loop <_UnixSelectorEventLoop running=False closed=False debug=False>
_warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Exception ignored in: <function BaseEventLoop.__del__ at 0x7f92589d14c0>
Traceback (most recent call last):
File "/usr/lib/python3.8/asyncio/base_events.py", line 656, in __del__
self.close()
File "/usr/lib/python3.8/asyncio/unix_events.py", line 58, in close
super().close()
File "/usr/lib/python3.8/asyncio/selector_events.py", line 92, in close
self._close_self_pipe()
File "/usr/lib/python3.8/asyncio/selector_events.py", line 99, in _close_self_pipe
self._remove_reader(self._ssock.fileno())
File "/usr/lib/python3.8/asyncio/selector_events.py", line 274, in _remove_reader
key = self._selector.get_key(fd)
File "/usr/lib/python3.8/selectors.py", line 190, in get_key
return mapping[fileobj]
File "/usr/lib/python3.8/selectors.py", line 71, in __getitem__
fd = self._selector._fileobj_lookup(fileobj)
File "/usr/lib/python3.8/selectors.py", line 225, in _fileobj_lookup
return _fileobj_to_fd(fileobj)
File "/usr/lib/python3.8/selectors.py", line 42, in _fileobj_to_fd
raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1
This is triggered when an event-loop is not closed explicitly via
`event_loop.close()`. It then tries to cleanup explicitly. The problem
here is that python has no knowledge of in which order it should
collect GC'ed objects. This might end up more or less random. Therefore,
file-descriptors might be closed in arbitrary order, leading to the
event-loop being unable to unregister its internal objects.
I am not entirely sure whether this is the case here. However, the error
definitely triggers on the internal event-loop socketpair, which there
is no other external access to. Furthermore, this socketpair is only set
to -1 in its own __del__ function. So unless we have a memory
corruption, I see nothing else that could trigger this.
With this fix in place, I can run `test_sources.py` in a loop without
triggering the bug.
It is quite likely that our other `*Server` classes need the same fix. I
did not verify, yet.
87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
import asyncio
|
|
import json
|
|
import subprocess
|
|
import threading
|
|
from .util import jsoncomm
|
|
|
|
|
|
class SourcesServer:
|
|
# pylint: disable=too-many-instance-attributes
|
|
def __init__(self, socket_address, sources_libdir, options, cache, output, secrets=None):
|
|
self.socket_address = socket_address
|
|
self.sources_libdir = sources_libdir
|
|
self.cache = cache
|
|
self.output = output
|
|
self.options = options or {}
|
|
self.secrets = secrets or {}
|
|
self.barrier = threading.Barrier(2)
|
|
self.event_loop = None
|
|
self.thread = None
|
|
|
|
def _run_source(self, source, checksums):
|
|
msg = {
|
|
"options": self.options.get(source, {}),
|
|
"secrets": self.secrets.get(source, {}),
|
|
"cache": f"{self.cache}/{source}",
|
|
"output": f"{self.output}/{source}",
|
|
"checksums": checksums
|
|
}
|
|
|
|
r = subprocess.run(
|
|
[f"{self.sources_libdir}/{source}"],
|
|
input=json.dumps(msg),
|
|
stdout=subprocess.PIPE,
|
|
encoding="utf-8",
|
|
check=False)
|
|
|
|
try:
|
|
return json.loads(r.stdout)
|
|
except ValueError:
|
|
return {"error": f"source returned malformed json: {r.stdout}"}
|
|
|
|
def _dispatch(self, server):
|
|
request, _, addr = server.recv()
|
|
reply = self._run_source(request["source"], request["checksums"])
|
|
server.send(reply, destination=addr)
|
|
|
|
def _run_event_loop(self):
|
|
with jsoncomm.Socket.new_server(self.socket_address) as server:
|
|
self.barrier.wait()
|
|
self.event_loop.add_reader(server, self._dispatch, server)
|
|
asyncio.set_event_loop(self.event_loop)
|
|
self.event_loop.run_forever()
|
|
self.event_loop.remove_reader(server)
|
|
|
|
def __enter__(self):
|
|
# We are not re-entrant, so complain if re-entered.
|
|
assert self.event_loop is None
|
|
|
|
self.event_loop = asyncio.new_event_loop()
|
|
self.thread = threading.Thread(target=self._run_event_loop)
|
|
|
|
self.barrier.reset()
|
|
self.thread.start()
|
|
self.barrier.wait()
|
|
|
|
return self
|
|
|
|
def __exit__(self, *args):
|
|
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
|
|
self.thread.join()
|
|
self.event_loop.close()
|
|
|
|
self.thread = None
|
|
self.event_loop = None
|
|
|
|
|
|
def get(source, checksums, api_path="/run/osbuild/api/sources"):
|
|
with jsoncomm.Socket.new_client(api_path) as client:
|
|
msg = {
|
|
"source": source,
|
|
"checksums": checksums
|
|
}
|
|
client.send(msg)
|
|
reply, _, _ = client.recv()
|
|
if "error" in reply:
|
|
raise RuntimeError(f"{source}: " + reply["error"])
|
|
return reply
|