stages: add root ssh options to bootc.install-to-filesystem
To start using the `bootc.instal-to-filesystem` stage we need to be able to test the generated images. This requires a login on the disk. Traditionally we did that via the `users` stage. But on a bootc system we do not want to modify the disk after bootc did the install to avoid messing with things like the selinux labels or (future) fsverity setups. So for now we will use the `--root-ssh-authorized-keys` feature to inject support for login. So this commit adds a new option to the stage called `root-ssh-authorized-keys`.
This commit is contained in:
parent
a7b4565445
commit
626077ffc0
2 changed files with 91 additions and 24 deletions
|
|
@ -11,6 +11,7 @@ Buildhost commands used: bootc
|
|||
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import osbuild.api
|
||||
from osbuild.util import containers
|
||||
|
|
@ -30,7 +31,16 @@ SCHEMA_2 = r"""
|
|||
}
|
||||
},
|
||||
"options": {
|
||||
"additionalProperties": false
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"root-ssh-authorized-keys": {
|
||||
"description": "array of SSH Public Keys to add to roots authorized_keys",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
"devices": {
|
||||
"type": "object",
|
||||
|
|
@ -42,23 +52,30 @@ SCHEMA_2 = r"""
|
|||
"""
|
||||
|
||||
|
||||
def main(inputs, paths):
|
||||
def main(options, inputs, paths):
|
||||
images = containers.parse_containers_input(inputs)
|
||||
assert len(images) == 1
|
||||
image = list(images.values())[0]
|
||||
|
||||
with containers.container_source(image) as (_, source):
|
||||
dst = paths["mounts"]
|
||||
subprocess.run(
|
||||
["bootc", "install", "to-filesystem",
|
||||
"--source-imgref", source,
|
||||
"--skip-fetch-check", "--generic-image",
|
||||
dst],
|
||||
check=True,
|
||||
)
|
||||
pargs = ["bootc", "install", "to-filesystem",
|
||||
"--source-imgref", source,
|
||||
"--skip-fetch-check", "--generic-image"]
|
||||
# customize root
|
||||
root_ssh_keys = options.get("root-ssh-authorized-keys", [])
|
||||
if root_ssh_keys:
|
||||
tmpf = tempfile.NamedTemporaryFile(prefix="bootc-ssh-auth-keys-")
|
||||
for key in root_ssh_keys:
|
||||
tmpf.write(key.encode("utf8") + b"\n")
|
||||
tmpf.flush()
|
||||
pargs.extend(["--root-ssh-authorized-keys", tmpf.name])
|
||||
# add target and go
|
||||
pargs.append(dst)
|
||||
subprocess.run(pargs, check=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args = osbuild.api.arguments()
|
||||
r = main(args["inputs"], args["paths"])
|
||||
r = main(args["options"], args["inputs"], args["paths"])
|
||||
sys.exit(r)
|
||||
|
|
|
|||
|
|
@ -1,17 +1,54 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import os.path
|
||||
import tempfile
|
||||
from contextlib import contextmanager
|
||||
from unittest.mock import call, patch
|
||||
from unittest.mock import Mock, call, patch
|
||||
|
||||
from osbuild.testutil.imports import import_module_from_path
|
||||
import pytest
|
||||
|
||||
STAGE_NAME = "org.osbuild.bootc.install-to-filesystem"
|
||||
|
||||
|
||||
@pytest.fixture(name="mocked_named_tmp")
|
||||
def mocked_named_tmp_fixture():
|
||||
fake_named_tmp = Mock()
|
||||
fake_named_tmp.name = "/tmp/fake-named-tmpfile-name"
|
||||
with patch("tempfile.NamedTemporaryFile", return_value=fake_named_tmp):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(name="mocked_temp_dir")
|
||||
def mocked_temp_dir_fixture(tmp_path):
|
||||
@contextmanager
|
||||
def _tmp_dir():
|
||||
yield tmp_path
|
||||
with patch("tempfile.TemporaryDirectory", side_effect=_tmp_dir):
|
||||
yield tmp_path
|
||||
|
||||
|
||||
FAKE_INPUTS = {
|
||||
"images": {
|
||||
"path": "/input/images/path",
|
||||
"data": {
|
||||
"archives": {
|
||||
"/input/images/path": {
|
||||
"format": "oci-archive",
|
||||
"name": "some-img-name",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("options,expected_args", [
|
||||
({}, []),
|
||||
({"root-ssh-authorized-keys": []}, []),
|
||||
({"root-ssh-authorized-keys": ["ssh-key"]}, ["--root-ssh-authorized-keys", "/tmp/fake-named-tmpfile-name"]),
|
||||
({"root-ssh-authorized-keys": ["key1", "key2"]}, ["--root-ssh-authorized-keys", "/tmp/fake-named-tmpfile-name"]),
|
||||
])
|
||||
@patch("subprocess.run")
|
||||
def test_bootc_install_to_fs(mock_run, tmp_path):
|
||||
stage_path = os.path.join(os.path.dirname(__file__), "../org.osbuild.bootc.install-to-filesystem")
|
||||
stage = import_module_from_path("bootc_install_to_fs_stage", stage_path)
|
||||
|
||||
def test_bootc_install_to_fs(mock_run, mocked_named_tmp, mocked_temp_dir, stage_module, options, expected_args): # pylint: disable=unused-argument
|
||||
inputs = {
|
||||
"images": {
|
||||
"path": "/input/images/path",
|
||||
|
|
@ -29,16 +66,29 @@ def test_bootc_install_to_fs(mock_run, tmp_path):
|
|||
"mounts": "/path/to/mounts",
|
||||
}
|
||||
|
||||
@contextmanager
|
||||
def faked_tmp_dir():
|
||||
yield tmp_path
|
||||
with patch("tempfile.TemporaryDirectory", side_effect=faked_tmp_dir):
|
||||
stage.main(inputs, paths)
|
||||
stage_module.main(options, inputs, paths)
|
||||
|
||||
assert len(mock_run.call_args_list) == 1
|
||||
assert mock_run.call_args_list == [
|
||||
call(["bootc", "install", "to-filesystem",
|
||||
"--source-imgref", f"oci-archive:{tmp_path}/image",
|
||||
"--source-imgref", f"oci-archive:{mocked_temp_dir}/image",
|
||||
"--skip-fetch-check", "--generic-image",
|
||||
"/path/to/mounts"], check=True)
|
||||
] + expected_args + ["/path/to/mounts"],
|
||||
check=True)
|
||||
]
|
||||
|
||||
|
||||
@patch("subprocess.run")
|
||||
def test_bootc_install_to_fs_write_root_ssh_keys(mock_run, stage_module): # pylint: disable=unused-argument
|
||||
paths = {
|
||||
"mounts": "/path/to/mounts",
|
||||
}
|
||||
options = {
|
||||
"root-ssh-authorized-keys": ["key1", "key2"],
|
||||
}
|
||||
|
||||
named_tmp = tempfile.NamedTemporaryFile(delete=False)
|
||||
with patch("tempfile.NamedTemporaryFile", return_value=named_tmp):
|
||||
stage_module.main(options, FAKE_INPUTS, paths)
|
||||
with open(named_tmp.name, encoding="utf8") as fp:
|
||||
assert "key1\nkey2\n" == fp.read()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue