debian-forge/osbuild.py
Tom Gundersen f25cffa151 osbuild.py/pipeline: assign a unique id to every pipeline
This uniquely identifies a pipeline based on its content. Pipelines
are considered equal modulo whitespace and the order of object
elements.

The intention is that two runs of a pipeline with the same id
generate functionally equivalent ids. It is up to the writers
of stages and pipelines to ensure this property holds.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2019-07-03 13:11:37 +02:00

235 lines
7.5 KiB
Python

import hashlib
import json
import os
import subprocess
import sys
import tempfile
# Names exported by `from <this module> import *`.
__all__ = [
    "StageFailed",
    "BuildRoot",
    "Pipeline",
    "tmpfs",
]
# ANSI escape sequences: reset all attributes / switch to bold text.
RESET = "\033[0m"
BOLD = "\033[1m"

# Directory in which the helper files (stages, assemblers, osbuild-run) are
# looked up. Use the directory containing this module when a "stages" entry
# exists next to it, otherwise fall back to "<sys.prefix>/lib".
libdir = os.path.dirname(__file__)
if not os.path.exists(libdir + "/stages"):
    libdir = sys.prefix + "/lib"
class StageFailed(Exception):
    """Error describing a stage or assembler run that did not succeed."""

    def __init__(self, stage, returncode, output):
        """Record which stage failed, its exit status and what it printed.

        Args:
            stage: name of the stage or assembler that failed.
            returncode: exit status reported for the failed process.
            output: captured output of the failed process.
        """
        self.stage, self.returncode, self.output = stage, returncode, output
class tmpfs:
    """Context manager providing a freshly mounted tmpfs directory.

    A temporary directory is created below *path* and a tmpfs (mode 0755) is
    mounted on it. Entering the context yields the directory's path; leaving
    it lazily unmounts the tmpfs and removes the directory again.
    """

    def __init__(self, path="/run/osbuild"):
        """Create the mount point below *path* and mount a tmpfs on it.

        Raises:
            subprocess.CalledProcessError: if ``mount`` exits non-zero.
            OSError: if the directory cannot be created or ``mount`` cannot
                be executed.
        """
        # Initialise the state first so that unmount() and __del__() are
        # safe even if mkdtemp() itself fails.
        self.root = None
        self.mounted = False
        self.root = tempfile.mkdtemp(prefix="osbuild-tmpfs-", dir=path)
        try:
            subprocess.run(["mount", "-t", "tmpfs", "-o", "mode=0755", "tmpfs", self.root], check=True)
            self.mounted = True
        except (subprocess.CalledProcessError, OSError):
            # Remove the now useless mount point and report the failure to
            # the caller; silently continuing would hand out `None` as the
            # tree path from __enter__().
            self.unmount()
            raise

    def unmount(self):
        """Unmount the tmpfs (if mounted) and remove its mount point.

        Calling this more than once is harmless: after the first successful
        call `self.root` is `None` and later calls return immediately.

        Raises:
            subprocess.CalledProcessError: if ``umount`` exits non-zero.
        """
        root = getattr(self, "root", None)
        if not root:
            return
        if getattr(self, "mounted", False):
            subprocess.run(["umount", "--lazy", root], check=True)
            self.mounted = False
        os.rmdir(root)
        self.root = None

    def __del__(self):
        # Last-resort cleanup in case the object was not used as a context
        # manager or unmount() was never called.
        self.unmount()

    def __enter__(self):
        """Return the path of the mounted tmpfs."""
        return self.root

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.unmount()
class BuildRoot:
    """Read-only bind mount of the host's `/` used as a container root.

    A temporary directory is created below *path* and the host's root
    directory is bind-mounted read-only onto it. Stages and assemblers are
    then executed inside systemd-nspawn containers that use this directory
    as their root. The object is a context manager; leaving the context
    unmounts and removes the directory.
    """

    def __init__(self, path="/run/osbuild"):
        """Create the build root below *path*.

        Raises:
            subprocess.CalledProcessError: if ``mount`` exits non-zero.
            OSError: if the directory cannot be created or ``mount`` cannot
                be executed.
        """
        # Initialise the state first so that unmount() and __del__() are
        # safe even if mkdtemp() itself fails.
        self.root = None
        self.mounted = False
        self.root = tempfile.mkdtemp(prefix="osbuild-buildroot-", dir=path)
        try:
            subprocess.run(["mount", "-o", "bind,ro", "/", self.root], check=True)
            self.mounted = True
        except (subprocess.CalledProcessError, OSError):
            # Do not leave the empty mount point behind on failure.
            self.unmount()
            raise

        # systemd-nspawn silently removes some characters when choosing a
        # machine name from the directory name. The only one relevant for
        # us is '_', because all other characters used by
        # tempfile.mkdtemp() are allowed. Replace it with 'L's
        # (mkdtemp() only uses lower-case characters).
        self.machine_name = os.path.basename(self.root).replace("_", "L")

    def unmount(self):
        """Unmount the build root (if mounted) and remove its directory.

        Calling this more than once is harmless: after the first successful
        call `self.root` is `None` and later calls return immediately.

        Raises:
            subprocess.CalledProcessError: if ``umount`` exits non-zero.
        """
        root = getattr(self, "root", None)
        if not root:
            return
        if getattr(self, "mounted", False):
            subprocess.run(["umount", "--lazy", root], check=True)
            self.mounted = False
        os.rmdir(root)
        self.root = None

    def run(self, argv, binds=None, readonly_binds=None, *args, **kwargs):
        """Run *argv* via `osbuild-run` inside a systemd-nspawn container.

        Args:
            argv: list with the command (and arguments) handed to
                `/run/osbuild/osbuild-run` inside the container.
            binds: iterable of `--bind=` specifications (read-write).
            readonly_binds: iterable of `--bind-ro=` specifications.
            *args, **kwargs: passed through to `subprocess.run()`.

        Returns:
            The `subprocess.CompletedProcess` of the systemd-nspawn call.
        """
        command = [
            "systemd-nspawn",
            "--quiet",
            "--as-pid2",
            "--link-journal=no",
            "--volatile=yes",
            "--property=DeviceAllow=/dev/loop-control rw",
            "--property=DeviceAllow=block-loop rw",
            "--property=DeviceAllow=block-blkext rw",
            f"--machine={self.machine_name}",
            f"--directory={self.root}",
            f"--bind={libdir}/osbuild-run:/run/osbuild/osbuild-run",
            *[f"--bind={b}" for b in binds or ()],
            *[f"--bind-ro={b}" for b in readonly_binds or ()],
            "/run/osbuild/osbuild-run",
        ]
        return subprocess.run(command + argv, *args, **kwargs)

    def _get_system_resources_from_etc(self, stage_or_assembler):
        """Return the validated `systemResourcesFromEtc` list of a description.

        Every entry must lexically resolve to `/etc` or a path below it and
        must not contain ':' (which would let it name a different bind
        target). The check is purely lexical; symbolic links are not
        resolved.

        Raises:
            ValueError: if an entry violates one of the rules above.
        """
        resources = stage_or_assembler.get("systemResourcesFromEtc", [])
        for r in resources:
            # Normalise first so that neither "/etcfoo" nor "/etc/../root"
            # can slip through a plain string-prefix comparison.
            normalized = os.path.normpath(r)
            if normalized != "/etc" and not normalized.startswith("/etc/"):
                raise ValueError(f"{r} is not a resource in /etc/")
            if ":" in r:
                raise ValueError(f"{r} tries to bind to a different location")
        return resources

    def _run_module(self, name, args, binds, readonly_binds, interactive):
        """Run `/run/osbuild/<name>` in the container, feeding it *args* as JSON.

        Unless *interactive* is set, stdout and stderr are captured together.

        Returns:
            A dict with the module's "name" and its captured "output"
            (`None` when running interactively).

        Raises:
            StageFailed: if the process exits with a non-zero status.
        """
        try:
            r = self.run([f"/run/osbuild/{name}"],
                         binds=binds,
                         readonly_binds=readonly_binds,
                         input=json.dumps(args),
                         encoding="utf-8",
                         stdout=None if interactive else subprocess.PIPE,
                         stderr=None if interactive else subprocess.STDOUT,
                         check=True)
        except subprocess.CalledProcessError as error:
            raise StageFailed(name, error.returncode, error.stdout) from error

        return {
            "name": name,
            "output": r.stdout
        }

    def run_stage(self, stage, tree, input_dir=None, interactive=False):
        """Run one stage with *tree* bind-mounted read-write.

        Args:
            stage: stage description; "name" is required, "options" and
                "systemResourcesFromEtc" are optional.
            tree: host path bound to `/run/osbuild/tree`.
            input_dir: optional host path bound read-only to
                `/run/osbuild/input`.
            interactive: when true, the stage's output is not captured.

        Returns:
            A dict with the stage's "name" and its "output".

        Raises:
            StageFailed: if the stage exits with a non-zero status.
            ValueError: if a requested system resource is not acceptable.
        """
        name = stage["name"]
        args = {
            "tree": "/run/osbuild/tree",
            "options": stage.get("options", {})
        }

        robinds = [f"{libdir}/stages/{name}:/run/osbuild/{name}"]
        robinds.extend(self._get_system_resources_from_etc(stage))
        binds = [f"{tree}:/run/osbuild/tree", "/dev:/dev"]

        if input_dir:
            robinds.append(f"{input_dir}:/run/osbuild/input")
            args["input_dir"] = "/run/osbuild/input"

        return self._run_module(name, args, binds, robinds, interactive)

    def run_assembler(self, assembler, tree, input_dir=None, output_dir=None, interactive=False):
        """Run the assembler with *tree* bind-mounted read-only.

        Args:
            assembler: assembler description; "name" is required, "options"
                and "systemResourcesFromEtc" are optional.
            tree: host path bound read-only to `/run/osbuild/tree`.
            input_dir: optional host path bound read-only to
                `/run/osbuild/input`.
            output_dir: optional host path bound read-write to
                `/run/osbuild/output`; created if it does not exist.
            interactive: when true, the assembler's output is not captured.

        Returns:
            A dict with the assembler's "name" and its "output".

        Raises:
            StageFailed: if the assembler exits with a non-zero status.
            ValueError: if a requested system resource is not acceptable.
        """
        if output_dir:
            # exist_ok avoids the race between checking for and creating
            # the directory.
            os.makedirs(output_dir, exist_ok=True)

        name = assembler["name"]
        args = {
            "tree": "/run/osbuild/tree",
            "options": assembler.get("options", {}),
        }

        robinds = [
            f"{tree}:/run/osbuild/tree",
            f"{libdir}/assemblers/{name}:/run/osbuild/{name}"
        ]
        robinds.extend(self._get_system_resources_from_etc(assembler))
        binds = ["/dev:/dev"]

        if input_dir:
            robinds.append(f"{input_dir}:/run/osbuild/input")
            args["input_dir"] = "/run/osbuild/input"
        if output_dir:
            binds.append(f"{output_dir}:/run/osbuild/output")
            args["output_dir"] = "/run/osbuild/output"

        return self._run_module(name, args, binds, robinds, interactive)

    def __del__(self):
        # Last-resort cleanup in case the object was not used as a context
        # manager or unmount() was never called.
        self.unmount()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.unmount()
def print_header(title, options, machine_name):
    """Print a banner for a pipeline step.

    The banner consists of an empty line, the bold *title* followed by
    *options* rendered as indented JSON (an empty object when *options* is
    falsy), a hint showing an `nsenter` command line for *machine_name*,
    and a trailing empty line.
    """
    print()
    heading = f"{RESET}{BOLD}{title}{RESET} "
    print(heading + json.dumps(options or {}, indent=2))
    hint = f"\t# nsenter -a --wd=/root -t `machinectl show {machine_name} -p Leader --value`"
    for line in ("Inspect with:", hint, ""):
        print(line)
class Pipeline:
    """A list of stages plus an optional assembler, identified by content."""

    def __init__(self, pipeline):
        """Build a pipeline from its description.

        The id is the SHA-256 hex digest of the description serialised as
        JSON with sorted keys, so it does not depend on the order of object
        members. The description must contain "stages"; "assembler" is
        optional.
        """
        canonical = json.dumps(pipeline, sort_keys=True)
        self.id = hashlib.sha256(canonical.encode()).hexdigest()
        self.stages = pipeline["stages"]
        self.assembler = pipeline.get("assembler")

    def run(self, input_dir, output_dir, interactive=False):
        """Run all stages in order, then the assembler if there is one.

        Everything runs in one build root and operates on one tmpfs tree.
        When *interactive* is set a header is printed before each step.
        The collected results are stored in `self.results` as a dict with a
        "stages" list and, if an assembler ran, an "assembler" entry.
        """
        stage_results = []
        results = {"stages": stage_results}

        with BuildRoot() as buildroot, tmpfs() as tree:
            for number, stage in enumerate(self.stages, start=1):
                stage_name = stage["name"]
                if interactive:
                    print_header(f"{number}. {stage_name}",
                                 stage.get("options", {}),
                                 buildroot.machine_name)
                stage_results.append(
                    buildroot.run_stage(stage, tree, input_dir, interactive))

            assembler = self.assembler
            if assembler:
                assembler_name = assembler["name"]
                if interactive:
                    print_header(f"Assembling: {assembler_name}",
                                 assembler.get("options", {}),
                                 buildroot.machine_name)
                results["assembler"] = buildroot.run_assembler(
                    assembler, tree, input_dir, output_dir, interactive)

        self.results = results