osbuild: add a mutex to the _jsonseq() writer
This commit fixes a race condition in the way the monitor works. The osbuild monitor can be called from multiple threads: monitor.log() is called from buildroot.py:run() but also from host.py:_stdout_ready(). When many messages are processed at once, this can lead to interleaved, out-of-order writes. We did not notice this so far because we got lucky and log() was only used for informational output. Now it is also used to transmit the jsonseq data, so interleaved writes result in broken JSON.

Closes: https://github.com/osbuild/image-builder-cli/issues/110
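As background, here is a minimal, self-contained sketch of the pattern the patch introduces (the JsonSeqWriter class below is illustrative only, not part of osbuild): the record separator, the JSON body and the trailing newline of each application/json-seq record have to be emitted as one atomic unit, so all three writes happen under the same lock.

import json
import sys
from threading import Lock
from typing import TextIO


class JsonSeqWriter:
    """Illustrative only: serialize whole json-seq records across threads."""

    def __init__(self, out: TextIO = sys.stdout) -> None:
        self.out = out
        self._mu = Lock()

    def write_record(self, entry: dict) -> None:
        # Without the lock, two threads can interleave the three writes and
        # produce e.g. '\x1e\x1e{"a": 1}{"b": 2}\n\n', which is no longer a
        # stream of parseable rfc7464 records.
        with self._mu:
            self.out.write("\x1e")
            json.dump(entry, self.out)
            self.out.write("\n")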
This commit is contained in:
parent 752f4af6b3
commit 035781ea1c

2 changed files with 34 additions and 4 deletions
@@ -16,6 +16,7 @@ import json
 import os
 import sys
 import time
+from threading import Lock
 from typing import Dict, Optional, Set, Union
 
 import osbuild
@@ -234,6 +235,7 @@ class BaseMonitor(abc.ABC):
     def result(self, result: Union[BuildResult, DownloadResult]) -> None:
         """Called when a module (stage/assembler) is done with its result"""
 
+    # note that this should be re-entrant
     def log(self, message: str, origin: Optional[str] = None):
         """Called for all module log outputs"""
 
@@ -316,6 +318,7 @@ class JSONSeqMonitor(BaseMonitor):
         self._ctx_ids: Set[str] = set()
         self._progress = Progress("pipelines/sources", total_steps)
         self._context = Context(origin="org.osbuild")
+        self._jsonseq_mu = Lock()
 
     def begin(self, pipeline: osbuild.Pipeline):
         self._context.set_pipeline(pipeline)
@@ -362,10 +365,11 @@ class JSONSeqMonitor(BaseMonitor):
         ))
 
     def _jsonseq(self, entry: dict) -> None:
-        # follow rfc7464 (application/json-seq)
-        self.out.write("\x1e")
-        json.dump(entry, self.out)
-        self.out.write("\n")
+        with self._jsonseq_mu:
+            # follow rfc7464 (application/json-seq)
+            self.out.write("\x1e")
+            json.dump(entry, self.out)
+            self.out.write("\n")
 
 
 def make(name: str, fd: int, total_steps: int) -> BaseMonitor:

@@ -7,6 +7,7 @@ import json
 import os
 import sys
 import tempfile
+import threading
 import time
 import unittest
 from collections import defaultdict
@@ -416,3 +417,28 @@ def test_jsonseq_download_unhappy(mocked_download, tmp_path):
     assert log[1]["result"]["name"] == "source org.osbuild.curl"
     assert log[1]["result"]["success"] is False
     assert log[1]["result"]["output"] == "RuntimeError: curl: error download ...\n error stack"
+
+
+def test_json_progress_monitor_handles_racy_writes(tmp_path):
+    output_path = tmp_path / "jsonseq.log"
+    with output_path.open("w") as fp:
+        mon = JSONSeqMonitor(fp.fileno(), 10)
+
+        def racy_write(s):
+            for i in range(20):
+                mon.log(f"{s}: {i}")
+                time.sleep(0.0001)
+        t1 = threading.Thread(target=racy_write, args=("msg from t1",))
+        t2 = threading.Thread(target=racy_write, args=("msg from t2",))
+        t1.start()
+        t2.start()
+        t1.join()
+        t2.join()
+    # ensure the file is valid jsonseq
+    with output_path.open() as fp:
+        for line in fp.readlines():
+            line = line.strip().strip("\x1e")
+            try:
+                json.loads(line)
+            except json.decoder.JSONDecodeError:
+                pytest.fail(f"the jsonseq stream is not valid json, got {line}")
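For completeness, a sketch of how a consumer of this stream (for example the image-builder-cli side referenced in the linked issue) might read the records back; read_jsonseq and its line-based framing are assumptions for illustration, not part of osbuild's API.

import json
from typing import Iterator, TextIO


def read_jsonseq(stream: TextIO) -> Iterator[dict]:
    # _jsonseq() terminates every record with "\n", so reading line by line
    # works; strip the rfc7464 record separator ("\x1e") before parsing.
    for line in stream:
        line = line.strip().lstrip("\x1e")
        if line:
            yield json.loads(line)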