debian-forge/osbuild/util/ostree.py
Martin Sehnoutka 8b0ea15817 stages: add org.osbuild.ostree.passwd
This stage takes /usr/lib/passwd and /usr/etc/passwd from an OSTree
checkout, merges them into one file, and store it as /etc/passwd in the
buildroot.

It does the same for /etc/group.

The reason for doing this is that there is an issue with unstable UIDs
and GIDs when creating OSTree commits from scratch. When there is a
package that creates a system user or a system group, it can change the
UID and GID of users and groups that are created later.

This is not a problem in traditional deployments because already created
users and groups never change their UIDs and GIDs, but with OSTree we
recreate the files from scratch and then replace the previous one so it
can actually change.

By copying the files to the build root before doing any other
operations, we can make sure that the UIDs and GIDs of already existing
users and groups won't change.

Co-author: Christian Kellner <christian@kellner.me>
2021-08-17 13:53:00 +02:00

188 lines
5.3 KiB
Python

import contextlib
import json
import os
import subprocess
import tempfile
import typing
from typing import List
from .types import PathLike
class Param:
"""rpm-ostree Treefile parameter"""
def __init__(self, value_type, mandatory=False):
self.type = value_type
self.mandatory = mandatory
def check(self, value):
origin = getattr(self.type, "__origin__", None)
if origin:
self.typecheck(value, origin)
if origin is list or origin is typing.List:
self.check_list(value, self.type)
else:
raise NotImplementedError(origin)
else:
self.typecheck(value, self.type)
@staticmethod
def check_list(value, tp):
inner = tp.__args__
for x in value:
Param.typecheck(x, inner)
@staticmethod
def typecheck(value, tp):
if isinstance(value, tp):
return
raise ValueError(f"{value} is not of {tp}")
class Treefile:
"""Representation of an rpm-ostree Treefile
The following parameters are currently supported,
presented together with the rpm-ostree compose
phase that they are used in.
- ref: commit
- repos: install
- selinux: install, postprocess, commit
- boot-location: postprocess
- etc-group-members: postprocess
- machineid-compat
NB: 'ref' and 'repos' are mandatory and must be
present, even if they are not used in the given
phase; they therefore have defaults preset.
"""
parameters = {
"ref": Param(str, True),
"repos": Param(List[str], True),
"selinux": Param(bool),
"boot-location": Param(str),
"etc-group-members": Param(List[str]),
"machineid-compat": Param(bool),
"initramfs-args": Param(List[str]),
}
def __init__(self):
self._data = {}
self["ref"] = "osbuild/devel"
self["repos"] = ["osbuild"]
def __getitem__(self, key):
param = self.parameters.get(key)
if not param:
raise ValueError(f"Unknown param: {key}")
return self._data[key]
def __setitem__(self, key, value):
param = self.parameters.get(key)
if not param:
raise ValueError(f"Unknown param: {key}")
param.check(value)
self._data[key] = value
def dumps(self):
return json.dumps(self._data)
def dump(self, fp):
return json.dump(self._data, fp)
@contextlib.contextmanager
def as_tmp_file(self):
name = None
try:
fd, name = tempfile.mkstemp(suffix=".json",
text=True)
with os.fdopen(fd, "w+") as f:
self.dump(f)
yield name
finally:
if name:
os.unlink(name)
def rev_parse(repo: PathLike, ref: str) -> str:
"""Resolve an OSTree reference `ref` in the repository at `repo`"""
repo = os.fspath(repo)
r = subprocess.run(["ostree", "rev-parse", ref, f"--repo={repo}"],
encoding="utf-8",
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=False)
msg = r.stdout.strip()
if r.returncode != 0:
raise RuntimeError(msg)
return msg
def deployment_path(root: PathLike, osname: str, ref: str, serial: int):
"""Return the path to a deployment given the parameters"""
base = os.path.join(root, "ostree")
repo = os.path.join(base, "repo")
stateroot = os.path.join(base, "deploy", osname)
commit = rev_parse(repo, ref)
sysroot = f"{stateroot}/deploy/{commit}.{serial}"
return sysroot
class PasswdLike:
"""Representation of a file with structure like /etc/passwd
If each line in a file contains a key-value pair separated by the
first colon on the line, it can be considered "passwd"-like. This
class can parse the the list, manipulate it, and export it to file
again.
"""
def __init__(self):
"""Initialize an empty PasswdLike object"""
self.db = dict()
@classmethod
def from_file(cls, path: PathLike, allow_missing_file: bool=False):
"""Initialize a PasswdLike object from an existing file"""
ret = cls()
if allow_missing_file:
if not os.path.isfile(path):
return ret
with open(path, "r") as p:
ret.db = cls._passwd_lines_to_dict(p.readlines())
return ret
def merge_with_file(self, path: PathLike, allow_missing_file: bool=False):
"""Extend the database with entries from another file"""
if allow_missing_file:
if not os.path.isfile(path):
return
with open(path, "r") as p:
additional_passwd_dict = self._passwd_lines_to_dict(p.readlines())
for name, passwd_line in additional_passwd_dict.items():
if name not in self.db:
self.db[name] = passwd_line
def dump_to_file(self, path: PathLike):
"""Write the current database to a file"""
with open(path, "w") as p:
p.writelines(list(self.db.values()))
@staticmethod
def _passwd_lines_to_dict(lines):
"""Take a list of passwd lines and produce a "name": "line" dictionary"""
return {line.split(':')[0]: line for line in lines}