debian-forge/osbuild/util/ostree.py
2024-03-05 16:10:27 +01:00

391 lines
12 KiB
Python

import collections
import contextlib
import glob
import json
import os
import re
import subprocess
import sys
import tempfile
import typing
# pylint doesn't understand the string-annotation below
from typing import Any, Dict, List, Tuple # pylint: disable=unused-import
from osbuild.util.rhsm import Subscriptions
from .types import PathLike
class Param:
"""rpm-ostree Treefile parameter"""
def __init__(self, value_type, mandatory=False):
self.type = value_type
self.mandatory = mandatory
def check(self, value):
origin = getattr(self.type, "__origin__", None)
if origin:
self.typecheck(value, origin)
if origin is list or origin is typing.List:
self.check_list(value, self.type)
else:
raise NotImplementedError(origin)
else:
self.typecheck(value, self.type)
@staticmethod
def check_list(value, tp):
inner = tp.__args__
for x in value:
Param.typecheck(x, inner)
@staticmethod
def typecheck(value, tp):
if isinstance(value, tp):
return
raise ValueError(f"{value} is not of {tp}")
class Treefile:
"""Representation of an rpm-ostree Treefile
The following parameters are currently supported,
presented together with the rpm-ostree compose
phase that they are used in.
- ref: commit
- repos: install
- selinux: install, postprocess, commit
- boot-location: postprocess
- etc-group-members: postprocess
- machineid-compat
- selinux-label-version: commit
NB: 'ref' and 'repos' are mandatory and must be
present, even if they are not used in the given
phase; they therefore have defaults preset.
"""
parameters = {
"ref": Param(str, True),
"repos": Param(List[str], True),
"selinux": Param(bool),
"boot-location": Param(str),
"etc-group-members": Param(List[str]),
"machineid-compat": Param(bool),
"initramfs-args": Param(List[str]),
"selinux-label-version": Param(int),
}
def __init__(self):
self._data = {}
self["ref"] = "osbuild/devel"
self["repos"] = ["osbuild"]
def __getitem__(self, key):
param = self.parameters.get(key)
if not param:
raise ValueError(f"Unknown param: {key}")
return self._data[key]
def __setitem__(self, key, value):
param = self.parameters.get(key)
if not param:
raise ValueError(f"Unknown param: {key}")
param.check(value)
self._data[key] = value
def dumps(self):
return json.dumps(self._data)
def dump(self, fp):
return json.dump(self._data, fp)
@contextlib.contextmanager
def as_tmp_file(self):
name = None
try:
fd, name = tempfile.mkstemp(suffix=".json",
text=True)
with os.fdopen(fd, "w+", encoding="utf8") as f:
self.dump(f)
yield name
finally:
if name:
os.unlink(name)
def setup_remote(repo, name, remote):
"""Configure an OSTree remote in a given repo"""
url = remote["url"]
gpg = remote.get("gpgkeys", [])
remote_add_args = []
if not gpg:
remote_add_args = ["--no-gpg-verify"]
if "contenturl" in remote:
remote_add_args.append(f"--contenturl={remote['contenturl']}")
if remote.get("secrets", {}).get("name") == "org.osbuild.rhsm.consumer":
secrets = Subscriptions.get_consumer_secrets()
remote_add_args.append(f"--set=tls-client-key-path={secrets['consumer_key']}")
remote_add_args.append(f"--set=tls-client-cert-path={secrets['consumer_cert']}")
cli("remote", "add", name, url,
*remote_add_args, repo=repo)
for key in gpg:
cli("remote", "gpg-import", "--stdin",
name, repo=repo, _input=key)
def rev_parse(repo: PathLike, ref: str) -> str:
"""Resolve an OSTree reference `ref` in the repository at `repo`"""
repo = os.fspath(repo)
if isinstance(repo, bytes):
repo = repo.decode("utf8")
r = subprocess.run(["ostree", "rev-parse", ref, f"--repo={repo}"],
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=False)
msg = r.stdout.strip()
if r.returncode != 0:
raise RuntimeError(msg)
return msg
def show(repo: PathLike, checksum: str) -> str:
"""Show the metada of an OSTree object pointed by `checksum` in the repository at `repo`"""
repo = os.fspath(repo)
if isinstance(repo, bytes):
repo = repo.decode("utf8")
r = subprocess.run(["ostree", "show", f"--repo={repo}", checksum],
encoding="utf8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=False)
msg = r.stdout.strip()
if r.returncode != 0:
raise RuntimeError(msg)
return msg
def pull_local(source_repo: PathLike, target_repo: PathLike, remote: str, ref: str):
"""Run ostree-pull local to copy commits around"""
extra_args = []
if remote:
extra_args.append(f'--remote={remote}')
cli("pull-local", source_repo, ref,
*extra_args,
repo=target_repo)
def cli(*args, _input=None, **kwargs):
"""Thin wrapper for running the ostree CLI"""
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
return subprocess.run(["ostree"] + args,
encoding="utf8",
stdout=subprocess.PIPE,
input=_input,
check=True)
def parse_input_commits(commits):
"""Parse ostree input commits and return the repo path and refs specified"""
data = commits["data"]
refs = data["refs"]
assert refs, "Need at least one commit"
return commits["path"], data["refs"]
def parse_deployment_option(root: PathLike, deployment: Dict) -> Tuple[str, str, str]:
"""Parse the deployment option and return the osname, ref, and serial
The `deployment` arg contains the following sub fields:
- osname: Name of the stateroot used in the deployment (ie. fedora-coreos)
- ref: OStree ref to used for the deployment (ie. fedora/aarch64/coreos/next)
- serial: The deployment serial (ie. 0)
- default: Boolean to determine whether the default ostree deployment should be used
"""
default_deployment = deployment.get("default")
if default_deployment:
filenames = glob.glob(os.path.join(root, 'ostree/deploy/*/deploy/*.0'))
if len(filenames) < 1:
raise ValueError("Could not find deployment")
if len(filenames) > 1:
raise ValueError(f"More than one deployment found: {filenames}")
# We pick up the osname, commit, and serial from the filesystem
# here. We'll return the detected commit as the ref in this
# since it's a valid substitute for all subsequent uses in
# the code base.
f = re.search("/ostree/deploy/(.*)/deploy/(.*)\\.([0-9])", filenames[0])
if not f:
raise ValueError("cannot find ostree deployment in {filenames[0]}")
osname = f.group(1)
commit = f.group(2)
serial = f.group(3)
return osname, commit, serial
osname = deployment["osname"]
ref = deployment["ref"]
serial = deployment.get("serial", 0)
return osname, ref, serial
def deployment_path(root: PathLike, osname: str = "", ref: str = "", serial: int = 0):
"""Return the path to a deployment given the parameters"""
base = os.path.join(root, "ostree")
repo = os.path.join(base, "repo")
stateroot = os.path.join(base, "deploy", osname)
commit = rev_parse(repo, ref)
sysroot = f"{stateroot}/deploy/{commit}.{serial}"
return sysroot
def parse_origin(origin: PathLike):
"""Parse the origin file and return the deployment type and imgref
Example container case: container-image-reference=ostree-remote-image:fedora:docker://quay.io/fedora/fedora-coreos:stable
Example ostree commit case: refspec=fedora:fedora/x86_64/coreos/stable
"""
deploy_type = ""
imgref = ""
with open(origin, "r", encoding="utf8") as f:
for line in f:
separated_line = line.split("=")
if separated_line[0] == "container-image-reference":
deploy_type = "container"
imgref = separated_line[1].rstrip()
break
if separated_line[0] == "refspec":
deploy_type = "ostree_commit"
imgref = separated_line[1].rstrip()
break
if deploy_type == "":
raise ValueError("Could not find 'container-image-reference' or 'refspec' in origin file")
if imgref == "":
raise ValueError("Could not find imgref in origin file")
return deploy_type, imgref
class PasswdLike:
"""Representation of a file with structure like /etc/passwd
If each line in a file contains a key-value pair separated by the
first colon on the line, it can be considered "passwd"-like. This
class can parse the the list, manipulate it, and export it to file
again.
"""
def __init__(self):
"""Initialize an empty PasswdLike object"""
self.db = {}
@classmethod
def from_file(cls, path: PathLike, allow_missing_file: bool = False):
"""Initialize a PasswdLike object from an existing file"""
ret = cls()
if allow_missing_file:
if not os.path.isfile(path):
return ret
with open(path, "r", encoding="utf8") as p:
ret.db = cls._passwd_lines_to_dict(p.readlines())
return ret
def merge_with_file(self, path: PathLike, allow_missing_file: bool = False):
"""Extend the database with entries from another file"""
if allow_missing_file:
if not os.path.isfile(path):
return
with open(path, "r", encoding="utf8") as p:
additional_passwd_dict = self._passwd_lines_to_dict(p.readlines())
for name, passwd_line in additional_passwd_dict.items():
if name not in self.db:
self.db[name] = passwd_line
def dump_to_file(self, path: PathLike):
"""Write the current database to a file"""
with open(path, "w", encoding="utf8") as p:
p.writelines(list(self.db.values()))
@staticmethod
def _passwd_lines_to_dict(lines):
"""Take a list of passwd lines and produce a "name": "line" dictionary"""
return {line.split(':')[0]: line for line in lines}
class SubIdsDB:
"""Represention of subordinate Ids database
Class to represent a mapping of a user name to subordinate ids,
like `/etc/subgid` and `/etc/subuid`.
"""
def __init__(self) -> None:
self.db: 'collections.OrderedDict[str, Any]' = collections.OrderedDict()
def read(self, fp) -> int:
idx = 0
for idx, line in enumerate(fp.readlines()):
line = line.strip()
if not line or line.startswith("#"):
continue
comps = line.split(":")
if len(comps) != 3:
print(f"WARNING: invalid line `{line}`", file=sys.stderr)
continue
name, uid, count = comps
self.db[name] = (uid, count)
return idx
def dumps(self) -> str:
"""Dump the database to a string"""
data = "\n".join([
f"{name}:{uid}:{count}\n"
for name, (uid, count) in self.db.items()
])
return data
def read_from(self, path: PathLike) -> int:
"""Read a file and add the entries to the database"""
with open(path, "r", encoding="utf8") as f:
return self.read(f)
def write_to(self, path: PathLike) -> None:
"""Write the database to a file"""
data = self.dumps()
with open(path, "w", encoding="utf8") as f:
f.write(data)
def __bool__(self) -> bool:
return bool(self.db)