412 lines
13 KiB
Python
412 lines
13 KiB
Python
import collections
|
|
import contextlib
|
|
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import typing
|
|
|
|
# pylint doesn't understand the string-annotation below
|
|
from typing import Any, Dict, List, Tuple # pylint: disable=unused-import
|
|
|
|
from osbuild.util.rhsm import Subscriptions
|
|
|
|
from .types import PathLike
|
|
|
|
|
|
class Param:
    """Type description of a single rpm-ostree Treefile parameter

    `value_type` is either a plain class (e.g. `str`) or a `typing`
    generic; of the generics only `List[...]` is supported. The
    `mandatory` flag is stored on the instance but not evaluated by
    this class.
    """

    def __init__(self, value_type, mandatory=False):
        self.type = value_type
        self.mandatory = mandatory

    def check(self, value):
        """Validate `value` against the declared type

        Raises `ValueError` on a type mismatch and `NotImplementedError`
        for generic types other than lists.
        """
        origin = getattr(self.type, "__origin__", None)

        # Plain (non-generic) types only need a direct isinstance check
        if not origin:
            self.typecheck(value, self.type)
            return

        # For generics the container itself is verified first ...
        self.typecheck(value, origin)

        # ... and then its elements; only lists are supported for that
        if origin is not list and origin is not typing.List:
            raise NotImplementedError(origin)
        self.check_list(value, self.type)

    @staticmethod
    def check_list(value, tp):
        """Validate every element of `value` against the type arguments of `tp`"""
        element_types = tp.__args__
        for element in value:
            Param.typecheck(element, element_types)

    @staticmethod
    def typecheck(value, tp):
        """Raise `ValueError` unless `value` is an instance of `tp`"""
        if not isinstance(value, tp):
            raise ValueError(f"{value} is not of {tp}")
|
|
|
|
|
|
class Treefile:
    """In-memory model of an rpm-ostree Treefile

    Supported parameters and the rpm-ostree compose phase(s)
    that they are used in:
      - ref: commit
      - repos: install
      - selinux: install, postprocess, commit
      - boot-location: postprocess
      - etc-group-members: postprocess
      - machineid-compat
      - selinux-label-version: commit

    'initramfs-args' is accepted as well.

    NB: 'ref' and 'repos' are mandatory and must always be
    present, even when the phase at hand does not use them;
    this is why the constructor presets defaults for both.
    """

    parameters = {
        "ref": Param(str, True),
        "repos": Param(List[str], True),
        "selinux": Param(bool),
        "boot-location": Param(str),
        "etc-group-members": Param(List[str]),
        "machineid-compat": Param(bool),
        "initramfs-args": Param(List[str]),
        "selinux-label-version": Param(int),
    }

    def __init__(self):
        self._data = {}
        # Defaults for the two mandatory parameters
        self["ref"] = "osbuild/devel"
        self["repos"] = ["osbuild"]

    def __getitem__(self, key):
        """Return the value stored for the known parameter `key`

        Raises `ValueError` for keys that are not valid parameters.
        """
        if self.parameters.get(key) is None:
            raise ValueError(f"Unknown param: {key}")
        return self._data[key]

    def __setitem__(self, key, value):
        """Type-check `value` and store it under the known parameter `key`

        Raises `ValueError` for unknown keys or values of the wrong type.
        """
        spec = self.parameters.get(key)
        if spec is None:
            raise ValueError(f"Unknown param: {key}")
        spec.check(value)
        self._data[key] = value

    def dumps(self):
        """Serialize the treefile to a JSON string"""
        return json.dumps(self._data)

    def dump(self, fp):
        """Serialize the treefile as JSON into the file object `fp`"""
        return json.dump(self._data, fp)

    @contextlib.contextmanager
    def as_tmp_file(self):
        """Write the treefile to a temporary `.json` file and yield its path

        The file is removed again when the context is left.
        """
        # Nothing to clean up if creating the file itself fails
        handle, path = tempfile.mkstemp(suffix=".json", text=True)
        try:
            with os.fdopen(handle, "w+", encoding="utf8") as stream:
                self.dump(stream)
            yield path
        finally:
            os.unlink(path)
|
|
|
|
|
|
def setup_remote(repo, name, remote):
    """Add the OSTree remote `name` to the repository at `repo`

    `remote` is a dictionary with a mandatory "url" and the optional
    keys "gpgkeys", "contenturl" and "secrets". Depending on the
    secrets provider named in "secrets", TLS client settings are taken
    either from the RHSM consumer secrets or from the
    `OSBUILD_SOURCES_OSTREE_*` environment variables.
    """

    url = remote["url"]
    gpg_keys = remote.get("gpgkeys", [])

    # Without any keys there is nothing to verify signatures against
    options = [] if gpg_keys else ["--no-gpg-verify"]

    if "contenturl" in remote:
        options.append(f"--contenturl={remote['contenturl']}")

    provider = remote.get("secrets", {}).get("name")
    if provider == "org.osbuild.rhsm.consumer":
        secrets = Subscriptions.get_consumer_secrets()
        options.append(f"--set=tls-client-key-path={secrets['consumer_key']}")
        options.append(f"--set=tls-client-cert-path={secrets['consumer_cert']}")
    elif provider == "org.osbuild.mtls":
        # Each environment variable that is set is passed on via its
        # corresponding `--set` option, in this order
        env_options = (
            ("OSBUILD_SOURCES_OSTREE_SSL_CA_CERT", "--set=tls-ca-path="),
            ("OSBUILD_SOURCES_OSTREE_SSL_CLIENT_CERT", "--set=tls-client-cert-path="),
            ("OSBUILD_SOURCES_OSTREE_SSL_CLIENT_KEY", "--set=tls-client-key-path="),
            ("OSBUILD_SOURCES_OSTREE_PROXY", "--set=proxy="),
        )
        for variable, prefix in env_options:
            setting = os.getenv(variable)
            if setting:
                options.append(prefix + setting)

        # Insecure mode is meant for development only
        insecure = os.getenv("OSBUILD_SOURCES_OSTREE_INSECURE", "")
        if insecure.lower() in ("true", "yes", "1"):
            options.append("--set=tls-permissive=true")

    cli("remote", "add", name, url, *options, repo=repo)

    # Import every GPG key for the freshly added remote
    for gpg_key in gpg_keys:
        cli("remote", "gpg-import", "--stdin", name,
            repo=repo, _input=gpg_key)
|
|
|
|
|
|
def rev_parse(repo: PathLike, ref: str) -> str:
    """Resolve the OSTree reference `ref` in the repository at `repo`

    Returns the stripped output of `ostree rev-parse`. If the command
    exits with a non-zero code, `RuntimeError` is raised carrying its
    combined stdout/stderr output.
    """

    repo_path = os.fspath(repo)
    if isinstance(repo_path, bytes):
        repo_path = repo_path.decode("utf8")

    command = ["ostree", "rev-parse", ref, f"--repo={repo_path}"]

    # stderr is folded into stdout so that a failure message is captured
    proc = subprocess.run(command,
                          encoding="utf8",
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          check=False)

    output = proc.stdout.strip()
    if proc.returncode == 0:
        return output

    raise RuntimeError(output)
|
|
|
|
|
|
def show(repo: PathLike, checksum: str) -> str:
    """Show the metadata of the OSTree object `checksum` in the repository at `repo`

    Returns the stripped output of `ostree show`. If the command exits
    with a non-zero code, `RuntimeError` is raised carrying its
    combined stdout/stderr output.
    """

    repo_path = os.fspath(repo)
    if isinstance(repo_path, bytes):
        repo_path = repo_path.decode("utf8")

    command = ["ostree", "show", f"--repo={repo_path}", checksum]

    # stderr is folded into stdout so that a failure message is captured
    proc = subprocess.run(command,
                          encoding="utf8",
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          check=False)

    output = proc.stdout.strip()
    if proc.returncode == 0:
        return output

    raise RuntimeError(output)
|
|
|
|
|
|
def pull_local(source_repo: PathLike, target_repo: PathLike, remote: str, ref: str):
    """Run `ostree pull-local` to copy the commit `ref` between repositories

    `ref` is pulled from the repository at `source_repo` into the one
    at `target_repo`. If `remote` is non-empty it is passed on via the
    `--remote` option.
    """

    # Both repositories may be given as `str`, `bytes` or `os.PathLike`,
    # but the command line is assembled from plain strings, so the
    # paths are converted to text up front.
    source = os.fsdecode(source_repo)
    target = os.fsdecode(target_repo)

    extra_args = []
    if remote:
        extra_args.append(f'--remote={remote}')

    cli("pull-local", source, ref,
        *extra_args,
        repo=target)
|
|
|
|
|
|
def cli(*args, _input=None, **kwargs):
    """Thin wrapper for running the ostree CLI

    Positional arguments are passed to `ostree` as they are; every
    keyword argument `key=value` is appended as `--key=value`. Path
    arguments may be given as `str`, `bytes` or `os.PathLike`. The
    assembled command line is echoed to stderr before it is executed.

    `_input`, if given, is fed to the process' stdin. stdout is
    captured and available on the returned `CompletedProcess`. A
    non-zero exit code raises `subprocess.CalledProcessError`.
    """

    def as_text(value):
        # `str.join` and the `--key=value` formatting below need real
        # strings; bytes would otherwise be rendered as "b'...'" and
        # path objects would make the join fail with a TypeError.
        if isinstance(value, (bytes, os.PathLike)):
            return os.fsdecode(value)
        return value

    argv = [as_text(arg) for arg in args]
    argv += [f'--{k}={as_text(v)}' for k, v in kwargs.items()]

    print("ostree " + " ".join(argv), file=sys.stderr)
    return subprocess.run(["ostree"] + argv,
                          encoding="utf8",
                          stdout=subprocess.PIPE,
                          input=_input,
                          check=True)
|
|
|
|
|
|
def parse_input_commits(commits):
    """Extract the repository path and the refs from an ostree commits input

    Returns the tuple `(commits["path"], commits["data"]["refs"])`;
    asserts that the refs are not empty.
    """
    refs = commits["data"]["refs"]
    assert refs, "Need at least one commit"
    return commits["path"], refs
|
|
|
|
|
|
def parse_deployment_option(root: PathLike, deployment: Dict) -> Tuple[str, str, str]:
    """Parse the deployment option and return the osname, ref, and serial

    The `deployment` arg contains the following sub fields:
      - osname: Name of the stateroot used in the deployment (ie. fedora-coreos)
      - ref: OStree ref to used for the deployment (ie. fedora/aarch64/coreos/next)
      - serial: The deployment serial (ie. 0)
      - default: Boolean to determine whether the default ostree deployment should be used

    If `default` is set, the single deployment below
    `<root>/ostree/deploy/*/deploy/` with serial 0 is looked up and its
    osname, commit and serial are returned (all as strings); the other
    sub fields are ignored. Otherwise `osname` and `ref` are mandatory
    and returned as given, together with `serial`, which defaults to
    the integer 0.

    Raises `ValueError` if the default deployment is requested but not
    exactly one deployment can be found, or if its path cannot be parsed.
    """

    if deployment.get("default"):
        filenames = glob.glob(os.path.join(root, 'ostree/deploy/*/deploy/*.0'))
        if not filenames:
            raise ValueError("Could not find deployment")
        if len(filenames) > 1:
            raise ValueError(f"More than one deployment found: {filenames}")

        # We pick up the osname, commit, and serial from the filesystem
        # here. We'll return the detected commit as the ref in this
        # since it's a valid substitute for all subsequent uses in
        # the code base.
        f = re.search(r"/ostree/deploy/(.*)/deploy/(.*)\.([0-9])", filenames[0])
        if not f:
            raise ValueError(f"cannot find ostree deployment in {filenames[0]}")
        osname = f.group(1)
        commit = f.group(2)
        serial = f.group(3)
        return osname, commit, serial

    osname = deployment["osname"]
    ref = deployment["ref"]
    serial = deployment.get("serial", 0)
    return osname, ref, serial
|
|
|
|
|
|
def deployment_path(root: PathLike, osname: str = "", ref: str = "", serial: int = 0):
    """Compute the path of a deployment below `root`

    `ref` is resolved to a commit in the repository at
    `<root>/ostree/repo`; the result is
    `<root>/ostree/deploy/<osname>/deploy/<commit>.<serial>`.
    """

    ostree_dir = os.path.join(root, "ostree")
    repo_dir = os.path.join(ostree_dir, "repo")
    stateroot_dir = os.path.join(ostree_dir, "deploy", osname)

    # Deployments are named after the commit checksum, not the ref
    checksum = rev_parse(repo_dir, ref)

    return f"{stateroot_dir}/deploy/{checksum}.{serial}"
|
|
|
|
|
|
def parse_origin(origin: PathLike):
    """Parse the origin file and return the deployment type and imgref

    The file is scanned line by line for the first
    `container-image-reference` or `refspec` key. Depending on which
    one is found, the returned deployment type is "container" or
    "ostree_commit"; the imgref is the value of that key.

    Example container case: container-image-reference=ostree-remote-image:fedora:docker://quay.io/fedora/fedora-coreos:stable
    Example ostree commit case: refspec=fedora:fedora/x86_64/coreos/stable

    Raises `ValueError` if neither key is present or its value is empty.
    """
    deploy_type = ""
    imgref = ""
    with open(origin, "r", encoding="utf8") as f:
        for line in f:
            # Only the first '=' separates key and value; any further
            # '=' is part of the value and must be preserved.
            key, _, value = line.partition("=")
            if key == "container-image-reference":
                deploy_type = "container"
            elif key == "refspec":
                deploy_type = "ostree_commit"
            else:
                continue
            imgref = value.rstrip()
            break

    if deploy_type == "":
        raise ValueError("Could not find 'container-image-reference' or 'refspec' in origin file")
    if imgref == "":
        raise ValueError("Could not find imgref in origin file")

    return deploy_type, imgref
|
|
|
|
|
|
class PasswdLike:
    """Representation of a file with structure like /etc/passwd

    If each line in a file contains a key-value pair separated by the
    first colon on the line, it can be considered "passwd"-like. This
    class can parse such a file, manipulate the entries, and export
    them to a file again.

    The entries are kept in `db`, a dictionary that maps the key (the
    text before the first colon) to the complete, newline-terminated
    line.
    """

    def __init__(self):
        """Initialize an empty PasswdLike object"""
        self.db = {}

    @classmethod
    def from_file(cls, path: PathLike, allow_missing_file: bool = False):
        """Initialize a PasswdLike object from an existing file

        If `allow_missing_file` is set and `path` is not a regular
        file, an empty object is returned instead.
        """
        ret = cls()
        if allow_missing_file and not os.path.isfile(path):
            return ret

        with open(path, "r", encoding="utf8") as p:
            ret.db = cls._passwd_lines_to_dict(p.readlines())
        return ret

    def merge_with_file(self, path: PathLike, allow_missing_file: bool = False):
        """Extend the database with entries from another file

        Entries whose name is already present in the database are
        kept as they are; only new names are added. If
        `allow_missing_file` is set and `path` is not a regular file,
        nothing is done.
        """
        if allow_missing_file and not os.path.isfile(path):
            return

        with open(path, "r", encoding="utf8") as p:
            additional_passwd_dict = self._passwd_lines_to_dict(p.readlines())

        for name, passwd_line in additional_passwd_dict.items():
            self.db.setdefault(name, passwd_line)

    def dump_to_file(self, path: PathLike):
        """Write the current database to a file"""
        with open(path, "w", encoding="utf8") as p:
            p.writelines(self.db.values())

    @staticmethod
    def _passwd_lines_to_dict(lines):
        """Take a list of passwd lines and produce a "name": "line" dictionary

        The last line of a file may lack the trailing newline; it is
        added here, since the lines are written back verbatim and an
        unterminated entry would otherwise be glued to the entry that
        follows it after a merge.
        """
        return {
            line.split(':')[0]: line if line.endswith("\n") else line + "\n"
            for line in lines
        }
|
|
|
|
|
|
class SubIdsDB:
    """Representation of a subordinate ids database

    Class to represent a mapping of a user name to subordinate ids,
    like `/etc/subgid` and `/etc/subuid`.

    The entries are kept in `db`, an ordered dictionary that maps the
    name to a `(uid, count)` tuple; both values are stored as the
    strings found in the file.
    """

    def __init__(self) -> None:
        self.db: 'collections.OrderedDict[str, Any]' = collections.OrderedDict()

    def read(self, fp) -> int:
        """Read entries from the file object `fp` and add them to the database

        Empty lines and lines starting with `#` are skipped. Lines
        that do not consist of exactly three colon separated fields
        are reported on stderr and skipped as well. An entry for a
        name that is already present replaces the existing one.

        Returns the zero-based index of the last line read, or 0 if
        there were no lines at all.
        NOTE(review): this is not a line or entry count - confirm
        what callers expect.
        """
        idx = 0
        for idx, line in enumerate(fp.readlines()):
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            comps = line.split(":")
            if len(comps) != 3:
                print(f"WARNING: invalid line `{line}`", file=sys.stderr)
                continue
            name, uid, count = comps
            self.db[name] = (uid, count)
        return idx

    def dumps(self) -> str:
        """Dump the database to a string

        Each entry becomes one newline-terminated `name:uid:count` line.
        """
        # Every line already carries its own newline, so the lines are
        # simply concatenated; joining them with a further "\n" would
        # put an empty line between any two entries.
        return "".join(
            f"{name}:{uid}:{count}\n"
            for name, (uid, count) in self.db.items()
        )

    def read_from(self, path: PathLike) -> int:
        """Read a file and add the entries to the database"""
        with open(path, "r", encoding="utf8") as f:
            return self.read(f)

    def write_to(self, path: PathLike) -> None:
        """Write the database to a file"""
        data = self.dumps()
        with open(path, "w", encoding="utf8") as f:
            f.write(data)

    def __bool__(self) -> bool:
        """Return whether the database contains any entries"""
        return bool(self.db)
|