#!/usr/bin/python3

"""Manifest-Pre-Processor

This manifest-pre-processor takes a path to a manifest, loads it, runs
various pre-processing options and then produces a resulting manifest,
written to a specified filename (or stdout if the filename is "-").

Manifest format versions "1" and "2" are supported.
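
For example, a typical invocation might look like this (the script name and
paths are illustrative):

```
$ ./osbuild-mpp manifest.mpp.json manifest.json
$ ./osbuild-mpp -D rootfs_size=4096 manifest.mpp.json -
```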

Pipeline Import:

This tool imports a pipeline from another file and inserts it into a manifest
at the same position the import instruction is located. Sources from the
imported manifest are merged with the existing sources.

The parameters for this pre-processor for format version "1" look like this:

```
...
"mpp-import-pipeline": {
  "path": "./manifest.json"
}
...
```

For format version "2", the `id` of the pipeline to import from the referenced
manifest must also be specified:

```
...
"mpp-import-pipeline": {
  "path": "./manifest.json",
  "id": "build"
}
...
```

Depsolving:

This tool adjusts the `org.osbuild.rpm` stage. It consumes the `mpp-depsolve`
option and produces a package-list and source-entries.

It supports versions "1" and "2" of the manifest description format.

The parameters for this pre-processor, version "1", look like this:

```
...
{
  "name": "org.osbuild.rpm",
  ...
  "options": {
    ...
    "mpp-depsolve": {
      "architecture": "x86_64",
      "module-platform-id": "f32",
      "baseurl": "http://mirrors.kernel.org/fedora/releases/32/Everything/x86_64/os",
      "repos": [
        {
          "id": "default",
          "metalink": "https://mirrors.fedoraproject.org/metalink?repo=fedora-32&arch=$basearch"
        }
      ],
      "packages": [
        "@core",
        "dracut-config-generic",
        "grub2-pc",
        "kernel"
      ],
      "excludes": [
        (optional excludes)
      ]
    }
  }
}
...
```

The parameters for this pre-processor, version "2", look like this:

```
...
{
  "type": "org.osbuild.rpm",
  ...
  "inputs": {
    "packages": {
      "mpp-depsolve": {
        (see above)
      }
    }
  }
}
...
```
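
The resolved packages are pinned by checksum: version "1" appends them to the
stage's `packages` option, version "2" adds them as `references` to the
`packages` input, and a matching entry is added to the manifest's sources.
For illustration, a pinned source entry in a version "2" manifest would look
roughly like this (checksum and URL are illustrative):

```
"sources": {
  "org.osbuild.curl": {
    "items": {
      "sha256:<hex>": "http://mirror.example.com/.../kernel-5.6.6-300.fc32.x86_64.rpm"
    }
  }
}
```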

String expansion:

You can expand strings using Python f-string formatting and variables. The
variables can be set in the "mpp-vars" toplevel dict (which is removed from
the final result) or overridden with the --define command-line option.

Example:

```
{
  "mpp-vars": {
    "variable": "some string",
    "rootfs_size": 20480
  },
  ...
  {
    "foo": "a value",
    "bar": { "mpp-format-string": "This expands {variable} but can also eval like {variable.upper()}" },
    "disk_size": { "mpp-format-int": "{rootfs_size * 512}" }
  }
  ...
}
```
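
After depsolving, an additional "rpms" variable is provided, mapping each
pipeline to the file names of the packages that will get installed. This
allows you to format strings based on what will get installed; a common
example is extracting the kernel version for the dracut stage. A minimal
sketch (the slicing assumes an rpm file name like "kernel-<version>.x86_64.rpm"):

```
{ "mpp-format-string": "{rpms['stages']['kernel'][len('kernel-'):-len('.x86_64.rpm')]}" }
```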

"""
import argparse
import collections
import contextlib
import json
import os
import pathlib
import sys
import tempfile
import urllib.parse
from typing import Dict

import dnf
import hawkey

from osbuild.util.rhsm import Subscriptions


def element_enter(element, key, default):
    """Return element[key], initializing it with a copy of default if missing."""
    if key not in element:
        element[key] = default.copy()
    return element[key]


class DepSolver:
    def __init__(self, cachedir, persistdir):
        self.cachedir = cachedir
        self.persistdir = persistdir
        self.basedir = None

        self.subscriptions = None
        self.secrets = {}

        self.base = dnf.Base()

    def reset(self, basedir):
        base = self.base
        base.reset(goal=True, repos=True, sack=True)
        self.secrets.clear()

        if self.cachedir:
            base.conf.cachedir = self.cachedir
        base.conf.config_file_path = "/dev/null"
        base.conf.persistdir = self.persistdir

        self.base = base
        self.basedir = basedir

    def setup(self, arch, module_platform_id, ignore_weak_deps):
        base = self.base

        base.conf.module_platform_id = module_platform_id
        base.conf.substitutions['arch'] = arch
        base.conf.substitutions['basearch'] = dnf.rpm.basearch(arch)
        base.conf.install_weak_deps = not ignore_weak_deps

    def expand_baseurl(self, baseurl):
        """Expand non-URIs as paths relative to basedir into a file:/// URI"""
        basedir = self.basedir
        try:
            result = urllib.parse.urlparse(baseurl)
            if not result.scheme:
                path = basedir.joinpath(baseurl)
                return path.as_uri()
        except:  # pylint: disable=bare-except
            pass
        return baseurl

    def get_secrets(self, url, desc):
        if not desc:
            return None

        name = desc.get("name")
        if name != "org.osbuild.rhsm":
            raise ValueError(f"Unknown secret type: {name}")

        try:
            # rhsm secrets only need to be retrieved once and can then be reused
            if not self.subscriptions:
                self.subscriptions = Subscriptions.from_host_system()
            secrets = self.subscriptions.get_secrets(url)
        except RuntimeError as e:
            raise ValueError(f"Error getting secrets: {e.args[0]}") from None

        secrets["type"] = "org.osbuild.rhsm"

        return secrets

    def add_repo(self, desc, baseurl):
        repo = dnf.repo.Repo(desc["id"], self.base.conf)
        url = None
        url_keys = ["baseurl", "metalink", "mirrorlist"]
        skip_keys = ["id", "secrets"]
        supported = ["baseurl", "metalink", "mirrorlist",
                     "enabled", "metadata_expire", "gpgcheck", "username", "password", "priority",
                     "sslverify", "sslcacert", "sslclientkey", "sslclientcert"]

        for key in desc.keys():
            if key in skip_keys:
                continue  # We handled this already

            if key in url_keys:
                url = desc[key]
            if key in supported:
                value = desc[key]
                if key == "baseurl":
                    value = self.expand_baseurl(value)
                setattr(repo, key, value)
            else:
                raise ValueError(f"Unknown repo config option {key}")

        if not url:
            url = self.expand_baseurl(baseurl)

        if not url:
            raise ValueError("repo description does not contain baseurl, metalink, or mirrorlist keys")

        secrets = self.get_secrets(url, desc.get("secrets"))

        if secrets:
            if "ssl_ca_cert" in secrets:
                repo.sslcacert = secrets["ssl_ca_cert"]
            if "ssl_client_key" in secrets:
                repo.sslclientkey = secrets["ssl_client_key"]
            if "ssl_client_cert" in secrets:
                repo.sslclientcert = secrets["ssl_client_cert"]
            self.secrets[repo.id] = secrets["type"]

        self.base.repos.add(repo)

        return repo

    def resolve(self, packages, excludes):
        base = self.base

        base.reset(goal=True, sack=True)
        base.fill_sack(load_system_repo=False)

        base.install_specs(packages, exclude=excludes)
        base.resolve()

        deps = []
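
        # Each resulting entry is shaped to match what the files/curl sources
        # expect, e.g. (illustrative values):
        #   {"checksum": "sha256:<hex>",
        #    "name": "kernel",
        #    "url": "http://.../kernel-5.6.6-300.fc32.x86_64.rpm",
        #    "secrets": "org.osbuild.rhsm"}  # only present for rhsm repos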

        for tsi in base.transaction:
            if tsi.action not in dnf.transaction.FORWARD_ACTIONS:
                continue

            checksum_type = hawkey.chksum_name(tsi.pkg.chksum[0])
            checksum_hex = tsi.pkg.chksum[1].hex()

            path = tsi.pkg.relativepath
            reponame = tsi.pkg.reponame
            baseurl = self.base.repos[reponame].baseurl[0]
            # The path often starts with a "/", even though it's meant to be
            # relative to `baseurl`. Strip any leading slashes, but ensure
            # there's exactly one between `baseurl` and the path.
            url = urllib.parse.urljoin(baseurl + "/", path.lstrip("/"))
            secret = self.secrets.get(reponame)

            pkg = {
                "checksum": f"{checksum_type}:{checksum_hex}",
                "name": tsi.pkg.name,
                "url": url,
            }

            if secret:
                pkg["secrets"] = secret
            deps.append(pkg)

        return deps


class ManifestFile:
    @staticmethod
    def load(path):
        with open(path) as f:
            return ManifestFile.load_from_fd(f, path)

    @staticmethod
    def load_from_fd(f, path):
        # We use OrderedDict to preserve key order (for Python < 3.7)
        data = json.load(f, object_pairs_hook=collections.OrderedDict)

        version = int(data.get("version", "1"))
        if version == 1:
            return ManifestFileV1(path, data)
        if version == 2:
            return ManifestFileV2(path, data)
        raise ValueError(f"Unknown manifest version {version}")

    def __init__(self, path, root, version):
        self.path = pathlib.Path(path)
        self.basedir = self.path.parent
        self.root = root
        self.version = version
        self.sources = element_enter(self.root, "sources", {})
        self.source_urls = {}

        self.vars = {}
        if "mpp-vars" in root:
            self.vars = root["mpp-vars"]
            del root["mpp-vars"]

    def set_vars(self, args):
        # Each argument is either "key=<json-value>" or a bare "key",
        # which sets the variable to true
        for arg in args:
            if "=" in arg:
                key, value_s = arg.split("=", 1)
                value = json.loads(value_s)
            else:
                key = arg
                value = True
            self.vars[key] = value

    def load_import(self, path, search_dirs):
        m = self.find_and_load_manifest(path, search_dirs)
        if m.version != self.version:
            raise ValueError(f"Incompatible manifest version {m.version}")
        return m

    def find_and_load_manifest(self, path, dirs):
        for p in [self.basedir] + dirs:
            with contextlib.suppress(FileNotFoundError):
                fullpath = os.path.join(p, path)
                with open(fullpath, "r") as f:
                    return ManifestFile.load_from_fd(f, path)

        raise FileNotFoundError(f"Could not find manifest '{path}'")

    def depsolve(self, solver: DepSolver, desc: Dict):
        repos = desc.get("repos", [])
        packages = desc.get("packages", [])
        excludes = desc.get("excludes", [])
        baseurl = desc.get("baseurl")

        if not packages:
            return []

        solver.reset(self.basedir)
        # Configure the solver from the mpp-depsolve options documented
        # above ("ignore-weak-deps" is an optional extension here)
        solver.setup(desc["architecture"],
                     desc["module-platform-id"],
                     bool(desc.get("ignore-weak-deps", False)))

        for repo in repos:
            solver.add_repo(repo, baseurl)

        return solver.resolve(packages, excludes)

    def add_packages(self, deps, pipeline_name):
        checksums = []

        pkginfos = {}

        for dep in deps:
            name, checksum, url = dep["name"], dep["checksum"], dep["url"]

            # Remember the rpm file name for the "rpms" expansion variable
            pkginfos[name] = urllib.parse.urlparse(url).path.rsplit("/", 1)[1]

            secrets = dep.get("secrets")
            if secrets:
                data = {
                    "url": url,
                    "secrets": secrets
                }
            else:
                data = url

            self.source_urls[checksum] = data
            checksums.append(checksum)

        if "rpms" not in self.vars:
            self.vars["rpms"] = {}
        self.vars["rpms"][pipeline_name] = pkginfos

        return checksums

    def sort_urls(self):
        def get_sort_key(item):
            key = item[1]
            if isinstance(key, dict):
                key = key["url"]
            return key

        urls = self.source_urls
        if not urls:
            return urls

        urls_sorted = sorted(urls.items(), key=get_sort_key)
        urls.clear()
        urls.update(collections.OrderedDict(urls_sorted))

    def write(self, file, sort_keys=False):
        self.sort_urls()
        json.dump(self.root, file, indent=2, sort_keys=sort_keys)
        file.write("\n")

    def _process_format(self, node):
        def _is_format(node):
            if not isinstance(node, dict):
                return False
            return "mpp-format-string" in node or "mpp-format-int" in node

        def _eval_format(node, local_vars):
            if "mpp-format-string" in node:
                is_int = False
                format_string = node["mpp-format-string"]
            else:
                is_int = True
                format_string = node["mpp-format-int"]
            # Evaluate as a python f-string with the mpp variables in scope;
            # this intentionally allows arbitrary expressions
            res = eval(f"f'''{format_string}'''", local_vars)  # pylint: disable=eval-used
            if is_int:
                return int(res)
            return res

        if isinstance(node, dict):
            for key in list(node.keys()):
                value = node[key]
                if _is_format(value):
                    node[key] = _eval_format(value, self.vars)
                else:
                    self._process_format(value)
        if isinstance(node, list):
            for i, value in enumerate(node):
                if _is_format(value):
                    node[i] = _eval_format(value, self.vars)
                else:
                    self._process_format(value)

    def process_format(self):
        self._process_format(self.root)


class ManifestFileV1(ManifestFile):
    def __init__(self, path, data):
        super().__init__(path, data, 1)
        self.pipeline = element_enter(self.root, "pipeline", {})

        files = element_enter(self.sources, "org.osbuild.files", {})
        self.source_urls = element_enter(files, "urls", {})

    def _process_import(self, build, search_dirs):
        mpp = build.get("mpp-import-pipeline")
        if not mpp:
            return

        path = mpp["path"]
        imp = self.load_import(path, search_dirs)

        self.vars.update(imp.vars)

        # We only support importing manifests with URL sources. Other sources
        # are not supported, yet. This can be extended in the future, but we
        # should maybe rather try to make sources generic (and repeatable?),
        # so we can deal with any future sources here as well.
        assert list(imp.sources.keys()) == ["org.osbuild.files"]

        # We import `sources` from the manifest, as well as a pipeline
        # description from the `pipeline` entry. Make sure nothing else is in
        # the manifest, so we do not accidentally miss new features.
        assert sorted(imp.root.keys()) == ["pipeline", "sources"]

        # Now with everything imported and verified, we can merge the pipeline
        # back into the original manifest. We take all URLs and merge them in
        # the pinned url-array, and then we take the pipeline and simply
        # override any original pipeline at the position where the import was
        # declared.

        self.source_urls.update(imp.source_urls)

        build["pipeline"] = imp.pipeline
        del build["mpp-import-pipeline"]

    def process_imports(self, search_dirs):
        current = self.root
        while current:
            self._process_import(current, search_dirs)
            current = current.get("pipeline", {}).get("build")

    def _process_depsolve(self, solver, stage, pipeline_name):
        if stage.get("name", "") != "org.osbuild.rpm":
            return
        options = stage.get("options")
        if not options:
            return
        mpp = options.get("mpp-depsolve")
        if not mpp:
            return

        del options["mpp-depsolve"]

        packages = element_enter(options, "packages", [])

        deps = self.depsolve(solver, mpp)
        checksums = self.add_packages(deps, pipeline_name)

        packages += checksums

    def process_depsolves(self, solver, pipeline=None, depth=0):
        if pipeline is None:
            pipeline = self.pipeline

        # Key the "rpms" variable by pipeline: the outermost pipeline is
        # "stages", its build pipeline "build", deeper ones "build2", ...
        if depth == 0:
            pipeline_name = "stages"
        elif depth == 1:
            pipeline_name = "build"
        else:
            pipeline_name = "build" + str(depth)

        stages = element_enter(pipeline, "stages", [])
        for stage in stages:
            self._process_depsolve(solver, stage, pipeline_name)
        build = pipeline.get("build")
        if build:
            if "pipeline" in build:
                self.process_depsolves(solver, build["pipeline"], depth + 1)


class ManifestFileV2(ManifestFile):
    def __init__(self, path, data):
        super().__init__(path, data, 2)
        self.pipelines = element_enter(self.root, "pipelines", {})

        files = element_enter(self.sources, "org.osbuild.curl", {})
        self.source_urls = element_enter(files, "items", {})

    def get_pipeline_by_name(self, name):
        for pipeline in self.pipelines:
            if pipeline["name"] == name:
                return pipeline

        raise ValueError(f"Pipeline '{name}' not found in {self.path}")

    def _process_import(self, pipeline, search_dirs):
        mpp = pipeline.get("mpp-import-pipeline")
        if not mpp:
            return

        path = mpp["path"]
        imp = self.load_import(path, search_dirs)

        self.vars.update(imp.vars)

        for source, desc in imp.sources.items():
            target = self.sources.get(source)
            if not target:
                # new source, just copy everything
                self.sources[source] = desc
                continue

            # existing source, merge the options and items
            if desc.get("options"):
                options = element_enter(target, "options", {})
                options.update(desc["options"])

            items = element_enter(target, "items", {})
            items.update(desc.get("items", {}))

        del pipeline["mpp-import-pipeline"]
        target = imp.get_pipeline_by_name(mpp["id"])
        pipeline.update(target)

    def process_imports(self, search_dirs):
        for pipeline in self.pipelines:
            self._process_import(pipeline, search_dirs)

    def _process_depsolve(self, solver, stage, pipeline_name):
        if stage.get("type", "") != "org.osbuild.rpm":
            return
        inputs = element_enter(stage, "inputs", {})
        packages = element_enter(inputs, "packages", {})
        mpp = packages.get("mpp-depsolve")
        if not mpp:
            return

        del packages["mpp-depsolve"]

        refs = element_enter(packages, "references", {})

        deps = self.depsolve(solver, mpp)
        checksums = self.add_packages(deps, pipeline_name)

        for checksum in checksums:
            refs[checksum] = {}

    def process_depsolves(self, solver):
        for pipeline in self.pipelines:
            name = pipeline.get("name", "")
            stages = element_enter(pipeline, "stages", [])
            for stage in stages:
                self._process_depsolve(solver, stage, name)


def main():
    parser = argparse.ArgumentParser(description="Manifest pre-processor")
    parser.add_argument(
        "--dnf-cache",
        metavar="PATH",
        type=os.path.abspath,
        default=None,
        help="Path to DNF cache-directory to use",
    )
    parser.add_argument(
        "-I", "--import-dir",
        dest="searchdirs",
        default=[],
        action="append",
        help="Additional directory to search for imported manifests",
    )
    parser.add_argument(
        "--sort-keys",
        dest="sort_keys",
        action="store_true",
        help="Sort keys in the generated JSON",
    )
    parser.add_argument(
        "-D", "--define",
        dest="vars",
        action="append",
        help="Set/override a variable, format is key=<json-value>",
    )
    parser.add_argument(
        "src",
        metavar="SRCPATH",
        help="Input manifest",
    )
    parser.add_argument(
        "dst",
        metavar="DESTPATH",
        help="Output manifest",
    )

    args = parser.parse_args(sys.argv[1:])

    m = ManifestFile.load(args.src)

    # First, resolve all imports
    m.process_imports(args.searchdirs)

    # Command-line definitions override variables from the main and
    # imported manifests
    if args.vars:
        m.set_vars(args.vars)

    with tempfile.TemporaryDirectory() as persistdir:
        solver = DepSolver(args.dnf_cache, persistdir)
        m.process_depsolves(solver)

    m.process_format()

    with sys.stdout if args.dst == "-" else open(args.dst, "w") as f:
        m.write(f, args.sort_keys)


if __name__ == "__main__":
    main()