From 035781ea1ca5ab79ddf73b1f89c72b7911a0e40b Mon Sep 17 00:00:00 2001
From: Michael Vogt
Date: Thu, 30 Jan 2025 13:08:16 +0100
Subject: [PATCH] osbuild: add a mutex to the _jsonseq() writer

This commit fixes a race/threading issue with the way the monitor
works. The osbuild monitor can be called from multiple threads,
e.g. in buildroot.py:run() monitor.log() is called but also in
host.py:_stdout_ready(). This can lead to out-of-order writes
when many messages need to be processed.

We did not notice this so far because we were lucky and also
log was just used for information. But now it is used to transmit
the jsonseq data which means out-of-order communication results
in broken json.

Closes: https://github.com/osbuild/image-builder-cli/issues/110
---
 osbuild/monitor.py       | 12 ++++++++----
 test/mod/test_monitor.py | 26 ++++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/osbuild/monitor.py b/osbuild/monitor.py
index d0f117fa..9ece10f3 100644
--- a/osbuild/monitor.py
+++ b/osbuild/monitor.py
@@ -16,6 +16,7 @@ import json
 import os
 import sys
 import time
+from threading import Lock
 from typing import Dict, Optional, Set, Union
 
 import osbuild
@@ -234,6 +235,7 @@ class BaseMonitor(abc.ABC):
     def result(self, result: Union[BuildResult, DownloadResult]) -> None:
         """Called when a module (stage/assembler) is done with its result"""
 
+    # note that this is called from multiple threads and must be thread-safe
     def log(self, message: str, origin: Optional[str] = None):
         """Called for all module log outputs"""
 
@@ -316,6 +318,7 @@ class JSONSeqMonitor(BaseMonitor):
         self._ctx_ids: Set[str] = set()
         self._progress = Progress("pipelines/sources", total_steps)
         self._context = Context(origin="org.osbuild")
+        self._jsonseq_mu = Lock()
 
     def begin(self, pipeline: osbuild.Pipeline):
         self._context.set_pipeline(pipeline)
@@ -362,10 +365,11 @@ class JSONSeqMonitor(BaseMonitor):
         ))
 
     def _jsonseq(self, entry: dict) -> None:
-        # follow rfc7464 (application/json-seq)
-        self.out.write("\x1e")
-        json.dump(entry, self.out)
-        self.out.write("\n")
+        with self._jsonseq_mu:
+            # follow rfc7464 (application/json-seq)
+            self.out.write("\x1e")
+            json.dump(entry, self.out)
+            self.out.write("\n")
 
 
 def make(name: str, fd: int, total_steps: int) -> BaseMonitor:
diff --git a/test/mod/test_monitor.py b/test/mod/test_monitor.py
index d8f3895e..148a4957 100644
--- a/test/mod/test_monitor.py
+++ b/test/mod/test_monitor.py
@@ -7,6 +7,7 @@ import json
 import os
 import sys
 import tempfile
+import threading
 import time
 import unittest
 from collections import defaultdict
@@ -416,3 +417,28 @@ def test_jsonseq_download_unhappy(mocked_download, tmp_path):
     assert log[1]["result"]["name"] == "source org.osbuild.curl"
     assert log[1]["result"]["success"] is False
     assert log[1]["result"]["output"] == "RuntimeError: curl: error download ...\n error stack"
+
+
+def test_json_progress_monitor_handles_racy_writes(tmp_path):
+    output_path = tmp_path / "jsonseq.log"
+    with output_path.open("w") as fp:
+        mon = JSONSeqMonitor(fp.fileno(), 10)
+
+        def racy_write(s):
+            for i in range(20):
+                mon.log(f"{s}: {i}")
+                time.sleep(0.0001)
+        t1 = threading.Thread(target=racy_write, args=("msg from t1",))
+        t2 = threading.Thread(target=racy_write, args=("msg from t2",))
+        t1.start()
+        t2.start()
+        t1.join()
+        t2.join()
+    # ensure the file is valid jsonseq
+    with output_path.open() as fp:
+        for line in fp.readlines():
+            line = line.strip().strip("\x1e")
+            try:
+                json.loads(line)
+            except json.decoder.JSONDecodeError:
+                pytest.fail(f"the jsonseq stream is not valid json, got {line}")