We recently hit the issue that `osbuild` crashed with:
```
Unable to decode response body "Traceback (most recent call last):
File \"/usr/bin/osbuild\", line 33, in <module>
sys.exit(load_entry_point('osbuild==124', 'console_scripts', 'osbuild')())
File \"/usr/lib/python3.9/site-packages/osbuild/main_cli.py\", line 181, in osbuild_cli
r = manifest.build(
File \"/usr/lib/python3.9/site-packages/osbuild/pipeline.py\", line 477, in build
res = pl.run(store, monitor, libdir, debug_break, stage_timeout)
File \"/usr/lib/python3.9/site-packages/osbuild/pipeline.py\", line 376, in run
results = self.build_stages(store,
File \"/usr/lib/python3.9/site-packages/osbuild/pipeline.py\", line 348, in build_stages
r = stage.run(tree,
File \"/usr/lib/python3.9/site-packages/osbuild/pipeline.py\", line 213, in run
data = ipmgr.map(ip, store)
File \"/usr/lib/python3.9/site-packages/osbuild/inputs.py\", line 94, in map
reply, _ = client.call_with_fds(\"map\", {}, fds)
File \"/usr/lib/python3.9/site-packages/osbuild/host.py\", line 373, in call_with_fds
kind, data = self.protocol.decode_message(ret)
File \"/usr/lib/python3.9/site-packages/osbuild/host.py\", line 83, in decode_message
raise ProtocolError(\"message empty\")
osbuild.host.ProtocolError: message empty
cannot run osbuild: exit status 1" into osbuild result: invalid character 'T' looking for beginning of value
...
input/packages (org.osbuild.files): Traceback (most recent call last):
input/packages (org.osbuild.files): File "/usr/lib/osbuild/inputs/org.osbuild.files", line 226, in <module>
input/packages (org.osbuild.files): main()
input/packages (org.osbuild.files): File "/usr/lib/osbuild/inputs/org.osbuild.files", line 222, in main
input/packages (org.osbuild.files): service.main()
input/packages (org.osbuild.files): File "/usr/lib/python3.11/site-packages/osbuild/host.py", line 250, in main
input/packages (org.osbuild.files): self.serve()
input/packages (org.osbuild.files): File "/usr/lib/python3.11/site-packages/osbuild/host.py", line 284, in serve
input/packages (org.osbuild.files): self.sock.send(reply, fds=reply_fds)
input/packages (org.osbuild.files): File "/usr/lib/python3.11/site-packages/osbuild/util/jsoncomm.py", line 407, in send
input/packages (org.osbuild.files): n = self._socket.sendmsg([serialized], cmsg, 0)
input/packages (org.osbuild.files): ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
input/packages (org.osbuild.files): OSError: [Errno 90] Message too long
```
The underlying issue is that the reply of the `map()` call is too
big for the buffer that `jsoncomm` uses. This problem existed before
for the args of map and was fixed by introducing a temporary file
in https://github.com/osbuild/osbuild/pull/1331 (and similarly
before in https://github.com/osbuild/osbuild/pull/824).
This commit also writes the return values into a file. This should
fix the crash above and make the function more symmetrical as well.
Alternative/complementary version of
https://github.com/osbuild/osbuild/pull/1833
Closes: HMS-4537
149 lines
4.6 KiB
Python
149 lines
4.6 KiB
Python
"""
|
|
Pipeline inputs
|
|
|
|
A pipeline input provides data in various forms to a `Stage`, like
|
|
files, OSTree commits or trees. The content can either be obtained
|
|
via a `Source` or have been built by a `Pipeline`. Thus an `Input`
|
|
is the bridge between various types of content that originate from
|
|
different types of sources.
|
|
|
|
The acceptable origin of the data is determined by the `Input`
|
|
itself. What types of input are allowed and required is determined
|
|
by the `Stage`.
|
|
|
|
To osbuild itself this is all transparent. The only data visible to
|
|
osbuild is the path. The input options are just passed to the
|
|
`Input` as is and the result is forwarded to the `Stage`.
|
|
"""
|
|
|
|
import abc
|
|
import contextlib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from typing import Any, Dict, Optional, Tuple
|
|
|
|
from osbuild import host
|
|
from osbuild.util.types import PathLike
|
|
|
|
from .objectstore import ObjectStore, StoreClient, StoreServer
|
|
|
|
|
|
class Input:
    """
    One input of a stage together with its origin, options and references.

    The `id` attribute is a content hash over everything that influences
    what the input provides; it is recomputed whenever a reference is
    added.
    """

    def __init__(self, name, info, origin: str, options: Dict):
        self.name = name
        self.info = info
        self.origin = origin
        self.refs: Dict[str, Dict[str, Any]] = {}
        # A falsy `options` (e.g. None) is normalised to an empty dict.
        self.options = options or {}
        self.id = self.calc_id()

    def add_reference(self, ref, options: Optional[Dict] = None):
        """Register `ref` with its (optional) options and refresh `id`."""
        self.refs[ref] = options or {}
        self.id = self.calc_id()

    def calc_id(self):
        """Return the hex SHA-256 over info name, origin, refs and options."""
        # NB: the input `name` is deliberately left out of the hash: it is
        # either dictated by the stage (so not a real parameter) or picked
        # arbitrarily by the manifest generator, and in both cases changing
        # it does not change the contents.
        digest = hashlib.sha256()
        hashed_parts = (self.info.name, self.origin, self.refs, self.options)
        for part in hashed_parts:
            digest.update(json.dumps(part, sort_keys=True).encode())
        return digest.hexdigest()
|
|
|
|
|
|
class InputManager:
    """
    Maps `Input`s below a common root directory via their input services.

    For every mapped input the (post-processed) reply of the service is
    remembered in `inputs`, keyed by the input name.
    """

    def __init__(self, mgr: host.ServiceManager, storeapi: StoreServer, root: PathLike) -> None:
        self.service_manager = mgr
        self.storeapi = storeapi
        self.root = root
        # Replies of the input services (dicts), keyed by input name.
        self.inputs: Dict[str, Dict[str, Any]] = {}

    def map(self, ip: Input, store: ObjectStore) -> Dict[str, Any]:
        """
        Map `ip` into `<root>/<ip.name>` and return the service's reply.

        The arguments and the reply are exchanged with the input service
        through two temporary files (created in `store.tmp`) whose file
        descriptors are passed along with the "map" call, so neither is
        limited by the size of a single socket message.

        Returns the reply dict with its "path" entry rewritten to be
        relative to the manager's root.

        Raises `RuntimeError` if the path returned by the service does not
        start with the manager's root. `os.makedirs` raises if the target
        directory already exists.
        """
        root = os.fspath(self.root)
        target = os.path.join(root, ip.name)
        os.makedirs(target)

        args = {
            # mandatory bits
            "origin": ip.origin,
            "refs": ip.refs,

            "target": target,

            # global options
            "options": ip.options,

            # API endpoints
            "api": {
                "store": self.storeapi.socket_address
            }
        }

        with make_args_and_reply_files(store.tmp, args) as (fd_args, fd_reply):
            fds = [fd_args, fd_reply]
            client = self.service_manager.start(f"input/{ip.name}", ip.info.path)
            # The in-band reply is ignored; the real one is in the reply file.
            _, _ = client.call_with_fds("map", {}, fds)
            # The service wrote through a descriptor that shares its file
            # offset with ours. Rewind explicitly instead of relying on the
            # service having done so, otherwise we would read from the end.
            os.lseek(fd_reply, 0, os.SEEK_SET)
            # Read via a duplicate so closing `f` does not close the
            # descriptor owned by the temporary file object.
            with os.fdopen(os.dup(fd_reply)) as f:
                reply = json.loads(f.read())

        path = reply["path"]

        if not path.startswith(root):
            raise RuntimeError(f"returned {path} has wrong prefix")

        reply["path"] = os.path.relpath(path, root)

        self.inputs[ip.name] = reply

        return reply
|
|
|
|
|
|
@contextlib.contextmanager
def make_args_and_reply_files(tmp, args):
    """
    Yield the file descriptors of two temporary files created in `tmp`.

    The first file holds `args` serialized as JSON and is positioned at
    its start; the second one is empty and meant to receive the reply.
    Both files are closed when the context is left.
    """
    with contextlib.ExitStack() as stack:
        args_file = stack.enter_context(
            tempfile.TemporaryFile("w+", dir=tmp, encoding="utf-8"))
        reply_file = stack.enter_context(
            tempfile.TemporaryFile("w+", dir=tmp, encoding="utf-8"))

        args_file.write(json.dumps(args))
        # seek() flushes the buffered text and rewinds the descriptor so a
        # reader starts at the beginning of the JSON document.
        args_file.seek(0)

        yield args_file.fileno(), reply_file.fileno()
|
|
|
|
|
|
class InputService(host.Service):
    """Input host service

    Base class for input services: subclasses implement `map()`, while
    `dispatch()` translates the "map" call into a call of that method.
    """

    @abc.abstractmethod
    def map(self, store, origin, refs, target, options):
        """Provide the input; the return value is sent back as JSON."""

    def unmap(self):
        """Hook called from `stop()`; does nothing by default."""

    def stop(self):
        self.unmap()

    def dispatch(self, method: str, _, fds):
        """
        Handle the "map" method; anything else is a `host.ProtocolError`.

        The arguments are read as JSON from fds[0] and the JSON encoded
        result of `map()` is written to fds[1]. The in-band reply is
        always the placeholder `"{}"` without file descriptors.
        """
        if method != "map":
            raise host.ProtocolError("Unknown method")

        # Arguments and reply travel through files instead of the message
        # itself. This avoids running into EMSGSIZE.
        with os.fdopen(fds.steal(0)) as args_file:
            args = json.load(args_file)

        store = StoreClient(connect_to=args["api"]["store"])
        result = self.map(
            store,
            args["origin"],
            args["refs"],
            args["target"],
            args["options"],
        )

        with os.fdopen(fds.steal(1), "w") as reply_file:
            reply_file.write(json.dumps(result))
            # Flush and rewind so the reply can be read from the start.
            reply_file.seek(0)

        return "{}", None
|