Check for the rpm database in $tree/usr/share/rpm and explicitly also in $tree/var/lib/rpm and if the respective location exists pass it as argument to rpm. This should fix the situation where the default database on the host is in a different location than in the tree. Fedora < 36 and RHEL have the location in /var but Fedora starting 36 and rpm-ostree have it in /usr.
2744 lines
85 KiB
Python
Executable file
2744 lines
85 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import argparse
|
|
import configparser
|
|
import contextlib
|
|
import errno
|
|
import functools
|
|
import glob
|
|
import mimetypes
|
|
import operator
|
|
import json
|
|
import os
|
|
import platform
|
|
import re
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import xml.etree.ElementTree
|
|
import yaml
|
|
|
|
from collections import OrderedDict
|
|
from typing import Dict
|
|
|
|
from osbuild import loop
|
|
|
|
|
|
def run_ostree(*args, _input=None, _check=True, **kwargs):
|
|
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
|
|
print("ostree " + " ".join(args), file=sys.stderr)
|
|
res = subprocess.run(["ostree"] + args,
|
|
encoding="utf-8",
|
|
stdout=subprocess.PIPE,
|
|
input=_input,
|
|
check=_check)
|
|
return res
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
|
|
while True:
|
|
lo = loop.Loop(ctl.get_unbound())
|
|
try:
|
|
lo.set_fd(fd)
|
|
except OSError as e:
|
|
lo.close()
|
|
if e.errno == errno.EBUSY:
|
|
continue
|
|
raise e
|
|
try:
|
|
lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
|
|
except BlockingIOError:
|
|
lo.clear_fd()
|
|
lo.close()
|
|
continue
|
|
break
|
|
try:
|
|
yield lo
|
|
finally:
|
|
lo.close()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def loop_open(ctl, image, *, offset=None, size=None):
|
|
with open(image, "rb") as f:
|
|
fd = f.fileno()
|
|
with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
|
|
yield os.path.join("/dev", lo.devname)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def open_image(ctl, image, fmt):
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
|
|
if fmt["type"] != "raw":
|
|
target = os.path.join(tmp, "image.raw")
|
|
# A bug exists in qemu that causes the conversion to raw to fail
|
|
# on aarch64 systems with a LOT of CPUs. A workaround is to use
|
|
# a single coroutine to do the conversion. It doesn't slow down
|
|
# the conversion by much, but it hangs about half the time without
|
|
# the limit set. 😢
|
|
# Bug: https://bugs.launchpad.net/qemu/+bug/1805256
|
|
if platform.machine() == 'aarch64':
|
|
subprocess.run(
|
|
["qemu-img", "convert", "-m", "1", "-O", "raw", image, target],
|
|
check=True
|
|
)
|
|
else:
|
|
subprocess.run(
|
|
["qemu-img", "convert", "-O", "raw", image, target],
|
|
check=True
|
|
)
|
|
else:
|
|
target = image
|
|
|
|
size = os.stat(target).st_size
|
|
|
|
with loop_open(ctl, target, offset=0, size=size) as dev:
|
|
yield target, dev
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mount_at(device, mountpoint, options=[], extra=[]):
|
|
opts = ",".join(["ro"] + options)
|
|
subprocess.run(["mount", "-o", opts] + extra + [device, mountpoint], check=True)
|
|
try:
|
|
yield mountpoint
|
|
finally:
|
|
subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def mount(device, options=None):
|
|
options = options or []
|
|
opts = ",".join(["ro"] + options)
|
|
with tempfile.TemporaryDirectory() as mountpoint:
|
|
subprocess.run(["mount", "-o", opts, device, mountpoint], check=True)
|
|
try:
|
|
yield mountpoint
|
|
finally:
|
|
subprocess.run(["umount", "--lazy", mountpoint], check=True)
|
|
|
|
|
|
def parse_environment_vars(s):
|
|
r = {}
|
|
for line in s.split("\n"):
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == '#':
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
r[key] = value.strip('"')
|
|
return r
|
|
|
|
|
|
# Parses output of `systemctl list-unit-files`
|
|
def parse_unit_files(s, expected_state):
|
|
r = []
|
|
for line in s.split("\n")[1:]:
|
|
try:
|
|
unit, state, *_ = line.split()
|
|
except ValueError:
|
|
pass
|
|
if state != expected_state:
|
|
continue
|
|
r.append(unit)
|
|
|
|
return r
|
|
|
|
|
|
def subprocess_check_output(argv, parse_fn=None):
|
|
try:
|
|
output = subprocess.check_output(argv, encoding="utf-8")
|
|
except subprocess.CalledProcessError as e:
|
|
sys.stderr.write(f"--- Output from {argv}:\n")
|
|
sys.stderr.write(e.stdout)
|
|
sys.stderr.write("\n--- End of the output\n")
|
|
raise
|
|
|
|
return parse_fn(output) if parse_fn else output
|
|
|
|
|
|
def read_image_format(device):
|
|
"""
|
|
Read image format.
|
|
|
|
Returns: dictionary with at least one key 'type'. 'type' value is a string
|
|
representing the format of the image. In case the type is 'qcow2', the returned
|
|
dictionary contains second key 'compat' with a string value representing
|
|
the compatibility version of the 'qcow2' image.
|
|
|
|
An example return value:
|
|
{
|
|
"compat": "1.1",
|
|
"type": "qcow2"
|
|
}
|
|
"""
|
|
qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads)
|
|
format = qemu["format"]
|
|
result = {"type": format}
|
|
if format == "qcow2":
|
|
result["compat"] = qemu["format-specific"]["data"]["compat"]
|
|
return result
|
|
|
|
|
|
def read_partition(device, partition):
|
|
"""
|
|
Read block device attributes using 'blkid' and extend the passed 'partition'
|
|
dictionary.
|
|
|
|
Returns: the 'partition' dictionary provided as an argument, extended with
|
|
'label', 'uuid' and 'fstype' keys and their values.
|
|
"""
|
|
res = subprocess.run(["blkid", "-c", "/dev/null", "--output", "export",
|
|
device],
|
|
check=False, encoding="utf-8",
|
|
stdout=subprocess.PIPE)
|
|
if res.returncode == 0:
|
|
blkid = parse_environment_vars(res.stdout)
|
|
else:
|
|
blkid = {}
|
|
|
|
partition["label"] = blkid.get("LABEL") # doesn't exist for mbr
|
|
partition["uuid"] = blkid.get("UUID")
|
|
partition["fstype"] = blkid.get("TYPE")
|
|
return partition
|
|
|
|
|
|
def read_partition_table(device):
|
|
"""
|
|
Read information related to found partitions and partitioning table from
|
|
the device.
|
|
|
|
Returns: dictionary with three keys - 'partition-table', 'partition-table-id'
|
|
and 'partitions'.
|
|
'partition-table' value is a string with the type of the partition table or 'None'.
|
|
'partition-table-id' value is a string with the ID of the partition table or 'None'.
|
|
'partitions' value is a list of dictionaries representing found partitions.
|
|
|
|
An example return value:
|
|
{
|
|
"partition-table": "gpt",
|
|
"partition-table-id": "DA237A6F-F0D4-47DF-BB50-007E00DB347C",
|
|
"partitions": [
|
|
{
|
|
"bootable": false,
|
|
"partuuid": "64AF1EC2-0328-406A-8F36-83016E6DD858",
|
|
"size": 1048576,
|
|
"start": 1048576,
|
|
"type": "21686148-6449-6E6F-744E-656564454649",
|
|
},
|
|
{
|
|
"bootable": false,
|
|
"partuuid": "D650D523-06F6-4B90-9204-8F998FE9703C",
|
|
"size": 6442450944,
|
|
"start": 2097152,
|
|
"type": "0FC63DAF-8483-4772-8E79-3D69D8477DE4",
|
|
}
|
|
]
|
|
}
|
|
"""
|
|
partitions = []
|
|
info = {"partition-table": None,
|
|
"partition-table-id": None,
|
|
"partitions": partitions}
|
|
try:
|
|
sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
|
|
except subprocess.CalledProcessError:
|
|
# This handles a case, when the device does contain a filesystem,
|
|
# but there is no partition table.
|
|
partitions.append(read_partition(device, {}))
|
|
return info
|
|
|
|
ptable = sfdisk["partitiontable"]
|
|
assert ptable["unit"] == "sectors"
|
|
is_dos = ptable["label"] == "dos"
|
|
ssize = ptable.get("sectorsize", 512)
|
|
|
|
for i, p in enumerate(ptable["partitions"]):
|
|
|
|
partuuid = p.get("uuid")
|
|
if not partuuid and is_dos:
|
|
# For dos/mbr partition layouts the partition uuid
|
|
# is generated. Normally this would be done by
|
|
# udev+blkid, when the partition table is scanned.
|
|
# 'sfdisk' prefixes the partition id with '0x' but
|
|
# 'blkid' does not; remove it to mimic 'blkid'
|
|
table_id = ptable['id'][2:]
|
|
partuuid = "%.33s-%02x" % (table_id, i+1)
|
|
|
|
partitions.append({
|
|
"bootable": p.get("bootable", False),
|
|
"type": p["type"],
|
|
"start": p["start"] * ssize,
|
|
"size": p["size"] * ssize,
|
|
"partuuid": partuuid
|
|
})
|
|
|
|
info["partitions"] = sorted(info["partitions"], key=operator.itemgetter("partuuid"))
|
|
info["partition-table"] = ptable["label"]
|
|
info["partition-table-id"] = ptable["id"]
|
|
|
|
return info
|
|
|
|
|
|
def read_bootloader_type(device):
|
|
"""
|
|
Read bootloader type from the provided device.
|
|
|
|
Returns: string representing the found bootloader. Function can return two values:
|
|
- 'grub'
|
|
- 'unknown'
|
|
"""
|
|
with open(device, "rb") as f:
|
|
if b"GRUB" in f.read(512):
|
|
return "grub"
|
|
else:
|
|
return "unknown"
|
|
|
|
|
|
def read_boot_entries(boot_dir):
|
|
"""
|
|
Read boot entries.
|
|
|
|
Returns: list of dictionaries representing configured boot entries.
|
|
|
|
An example return value:
|
|
[
|
|
{
|
|
"grub_arg": "--unrestricted",
|
|
"grub_class": "kernel",
|
|
"grub_users": "$grub_users",
|
|
"id": "rhel-20210429130346-0-rescue-c116920b13f44c59846f90b1057605bc",
|
|
"initrd": "/boot/initramfs-0-rescue-c116920b13f44c59846f90b1057605bc.img",
|
|
"linux": "/boot/vmlinuz-0-rescue-c116920b13f44c59846f90b1057605bc",
|
|
"options": "$kernelopts",
|
|
"title": "Red Hat Enterprise Linux (0-rescue-c116920b13f44c59846f90b1057605bc) 8.4 (Ootpa)",
|
|
"version": "0-rescue-c116920b13f44c59846f90b1057605bc"
|
|
},
|
|
{
|
|
"grub_arg": "--unrestricted",
|
|
"grub_class": "kernel",
|
|
"grub_users": "$grub_users",
|
|
"id": "rhel-20210429130346-4.18.0-305.el8.x86_64",
|
|
"initrd": "/boot/initramfs-4.18.0-305.el8.x86_64.img $tuned_initrd",
|
|
"linux": "/boot/vmlinuz-4.18.0-305.el8.x86_64",
|
|
"options": "$kernelopts $tuned_params",
|
|
"title": "Red Hat Enterprise Linux (4.18.0-305.el8.x86_64) 8.4 (Ootpa)",
|
|
"version": "4.18.0-305.el8.x86_64"
|
|
}
|
|
]
|
|
"""
|
|
entries = []
|
|
for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
|
|
with open(conf) as f:
|
|
entries.append(dict(line.strip().split(" ", 1) for line in f))
|
|
|
|
return sorted(entries, key=lambda e: e["title"])
|
|
|
|
|
|
def rpm_verify(tree):
|
|
"""
|
|
Read the output of 'rpm --verify'.
|
|
|
|
Returns: dictionary with two keys 'changed' and 'missing'.
|
|
'changed' value is a dictionary with the keys representing modified files from
|
|
installed RPM packages and values representing types of applied modifications.
|
|
'missing' value is a list of strings prepresenting missing values owned by
|
|
installed RPM packages.
|
|
|
|
An example return value:
|
|
{
|
|
"changed": {
|
|
"/etc/chrony.conf": "S.5....T.",
|
|
"/etc/cloud/cloud.cfg": "S.5....T.",
|
|
"/etc/nsswitch.conf": "....L....",
|
|
"/etc/openldap/ldap.conf": ".......T.",
|
|
"/etc/pam.d/fingerprint-auth": "....L....",
|
|
"/etc/pam.d/password-auth": "....L....",
|
|
"/etc/pam.d/postlogin": "....L....",
|
|
"/etc/pam.d/smartcard-auth": "....L....",
|
|
"/etc/pam.d/system-auth": "....L....",
|
|
"/etc/rhsm/rhsm.conf": "..5....T.",
|
|
"/etc/sudoers": "S.5....T.",
|
|
"/etc/systemd/logind.conf": "S.5....T."
|
|
},
|
|
"missing": [
|
|
"/etc/udev/rules.d/70-persistent-ipoib.rules",
|
|
"/run/cloud-init",
|
|
"/run/rpcbind",
|
|
"/run/setrans",
|
|
"/run/tuned"
|
|
]
|
|
}
|
|
"""
|
|
# cannot use `rpm --root` here, because rpm uses passwd from the host to
|
|
# verify user and group ownership:
|
|
# https://github.com/rpm-software-management/rpm/issues/882
|
|
rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
|
|
stdout=subprocess.PIPE, encoding="utf-8")
|
|
|
|
changed = {}
|
|
missing = []
|
|
for line in rpm.stdout:
|
|
# format description in rpm(8), under `--verify`
|
|
attrs = line[:9]
|
|
if attrs == "missing ":
|
|
missing.append(line[12:].rstrip())
|
|
else:
|
|
changed[line[13:].rstrip()] = attrs
|
|
|
|
# ignore return value, because it returns non-zero when it found changes
|
|
rpm.wait()
|
|
|
|
return {
|
|
"missing": sorted(missing),
|
|
"changed": changed
|
|
}
|
|
|
|
|
|
def rpm_not_installed_docs(tree, is_ostree):
|
|
"""
|
|
Gathers information on documentation, which is part of RPM packages,
|
|
but was not installed.
|
|
|
|
Returns: list of documentation files, which are normally a part of
|
|
the installed RPM packages, but were not installed (e.g. due to using
|
|
'--excludedocs' option when executing 'rpm' command).
|
|
|
|
An example return value:
|
|
[
|
|
"/usr/share/man/man1/sdiff.1.gz",
|
|
"/usr/share/man/man1/seapplet.1.gz",
|
|
"/usr/share/man/man1/secon.1.gz",
|
|
"/usr/share/man/man1/secret-tool.1.gz",
|
|
"/usr/share/man/man1/sed.1.gz",
|
|
"/usr/share/man/man1/seq.1.gz"
|
|
]
|
|
"""
|
|
# check not installed Docs (e.g. when RPMs are installed with --excludedocs)
|
|
not_installed_docs = []
|
|
cmd = ["rpm", "--root", tree, "-qad", "--state"]
|
|
if os.path.exists(os.path.join(tree, "usr/share/rpm")):
|
|
cmd += ["--dbpath", "/usr/share/rpm"]
|
|
elif os.path.exists(os.path.join(tree, "var/lib/rpm")):
|
|
cmd += ["--dbpath", "/var/lib/rpm"]
|
|
output = subprocess_check_output(cmd)
|
|
for line in output.splitlines():
|
|
if line.startswith("not installed"):
|
|
not_installed_docs.append(line.split()[-1])
|
|
|
|
return sorted(not_installed_docs)
|
|
|
|
|
|
def rpm_packages(tree, is_ostree):
|
|
"""
|
|
Read NVRs of RPM packages installed on the system.
|
|
|
|
Returns: sorted list of strings representing RPM packages installed
|
|
on the system.
|
|
|
|
An example return value:
|
|
[
|
|
"NetworkManager-1.30.0-7.el8.x86_64",
|
|
"PackageKit-glib-1.1.12-6.el8.x86_64",
|
|
"PackageKit-gtk3-module-1.1.12-6.el8.x86_64",
|
|
"abattis-cantarell-fonts-0.0.25-6.el8.noarch",
|
|
"acl-2.2.53-1.el8.x86_64",
|
|
"adobe-mappings-cmap-20171205-3.el8.noarch",
|
|
"adobe-mappings-cmap-deprecated-20171205-3.el8.noarch",
|
|
"adobe-mappings-pdf-20180407-1.el8.noarch",
|
|
"adwaita-cursor-theme-3.28.0-2.el8.noarch",
|
|
"adwaita-icon-theme-3.28.0-2.el8.noarch",
|
|
"alsa-lib-1.2.4-5.el8.x86_64"
|
|
]
|
|
"""
|
|
cmd = ["rpm", "--root", tree, "-qa"]
|
|
if os.path.exists(os.path.join(tree, "usr/share/rpm")):
|
|
cmd += ["--dbpath", "/usr/share/rpm"]
|
|
elif os.path.exists(os.path.join(tree, "var/lib/rpm")):
|
|
cmd += ["--dbpath", "/var/lib/rpm"]
|
|
output = subprocess_check_output(cmd)
|
|
pkgs = subprocess_check_output(cmd, str.split)
|
|
return list(sorted(pkgs))
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def change_root(root):
|
|
real_root = os.open("/", os.O_RDONLY)
|
|
try:
|
|
os.chroot(root)
|
|
yield None
|
|
finally:
|
|
os.fchdir(real_root)
|
|
os.chroot(".")
|
|
os.close(real_root)
|
|
|
|
|
|
def read_services(tree, state):
|
|
"""
|
|
Read the list of systemd services on the system in the given state.
|
|
|
|
Returns: alphabetically sorted list of strings representing systemd services
|
|
in the given state.
|
|
The returned list may be empty.
|
|
|
|
An example return value:
|
|
[
|
|
"arp-ethers.service",
|
|
"canberra-system-bootup.service",
|
|
"canberra-system-shutdown-reboot.service",
|
|
"canberra-system-shutdown.service",
|
|
"chrony-dnssrv@.timer",
|
|
"chrony-wait.service"
|
|
]
|
|
"""
|
|
services_state = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state)))
|
|
|
|
# Since systemd v246, some services previously reported as "enabled" /
|
|
# "disabled" are now reported as "alias". There is no systemd command, that
|
|
# would take an "alias" unit and report its state as enabled/disabled
|
|
# and could run on a different tree (with "--root" option).
|
|
# To make the produced list of services in the given state consistent on
|
|
# pre/post v246 systemd versions, check all "alias" units and append them
|
|
# to the list, if their target is also listed in 'services_state'.
|
|
if state != "alias":
|
|
services_alias = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias")))
|
|
|
|
for alias in services_alias:
|
|
# The service may be in one of the following places (output of
|
|
# "systemd-analyze unit-paths", it should not change too often).
|
|
unit_paths = [
|
|
"/etc/systemd/system.control",
|
|
"/run/systemd/system.control",
|
|
"/run/systemd/transient",
|
|
"/run/systemd/generator.early",
|
|
"/etc/systemd/system",
|
|
"/run/systemd/system",
|
|
"/run/systemd/generator",
|
|
"/usr/local/lib/systemd/system",
|
|
"/usr/lib/systemd/system",
|
|
"/run/systemd/generator.late"
|
|
]
|
|
|
|
with change_root(tree):
|
|
for path in unit_paths:
|
|
unit_path = os.path.join(path, alias)
|
|
if os.path.exists(unit_path):
|
|
real_unit_path = os.path.realpath(unit_path)
|
|
# Skip the alias, if there was a symlink cycle.
|
|
# When symbolic link cycles occur, the returned path will
|
|
# be one member of the cycle, but no guarantee is made about
|
|
# which member that will be.
|
|
if os.path.islink(real_unit_path):
|
|
continue
|
|
|
|
# Append the alias unit to the list, if its target is
|
|
# already there.
|
|
if os.path.basename(real_unit_path) in services_state:
|
|
services_state.append(alias)
|
|
|
|
# deduplicate and sort
|
|
services_state = list(set(services_state))
|
|
services_state.sort()
|
|
|
|
return services_state
|
|
|
|
|
|
def read_default_target(tree):
|
|
"""
|
|
Read the default systemd target.
|
|
|
|
Returns: string representing the default systemd target.
|
|
|
|
An example return value:
|
|
"multi-user.target"
|
|
"""
|
|
return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip()
|
|
|
|
|
|
def read_firewall_default_zone(tree):
|
|
"""
|
|
Read the name of the default firewall zone
|
|
|
|
Returns: a string with the zone name. If the firewall configuration doesn't
|
|
exist, an empty string is returned.
|
|
|
|
An example return value:
|
|
"trusted"
|
|
"""
|
|
try:
|
|
with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
|
|
conf = parse_environment_vars(f.read())
|
|
return conf["DefaultZone"]
|
|
except FileNotFoundError:
|
|
return ""
|
|
|
|
|
|
def read_firewall_zone(tree):
|
|
"""
|
|
Read enabled services from the configuration of the default firewall zone.
|
|
|
|
Returns: list of strings representing enabled services in the firewall.
|
|
The returned list may be empty.
|
|
|
|
An example return value:
|
|
[
|
|
"ssh",
|
|
"dhcpv6-client",
|
|
"cockpit"
|
|
]
|
|
"""
|
|
default = read_firewall_default_zone(tree)
|
|
if default == "":
|
|
default = "public"
|
|
|
|
r = []
|
|
try:
|
|
root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot()
|
|
except FileNotFoundError:
|
|
root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot()
|
|
|
|
for element in root.findall("service"):
|
|
r.append(element.get("name"))
|
|
|
|
return r
|
|
|
|
|
|
def read_fstab(tree):
|
|
"""
|
|
Read the content of /etc/fstab.
|
|
|
|
Returns: list of all uncommented lines read from the configuration file
|
|
represented as a list of values split by whitespaces.
|
|
The returned list may be empty.
|
|
|
|
An example return value:
|
|
[
|
|
[
|
|
"UUID=6d066eb4-e4c1-4472-91f9-d167097f48d1",
|
|
"/",
|
|
"xfs",
|
|
"defaults",
|
|
"0",
|
|
"0"
|
|
]
|
|
]
|
|
"""
|
|
result = []
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/fstab") as f:
|
|
result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")])
|
|
return result
|
|
|
|
|
|
def read_rhsm(tree):
|
|
"""
|
|
Read configuration changes possible via org.osbuild.rhsm stage
|
|
and in addition also the whole content of /etc/rhsm/rhsm.conf.
|
|
|
|
Returns: returns dictionary with two keys - 'dnf-plugins' and 'rhsm.conf'.
|
|
'dnf-plugins' value represents configuration of 'product-id' and
|
|
'subscription-manager' DNF plugins.
|
|
'rhsm.conf' value is a dictionary representing the content of the RHSM
|
|
configuration file.
|
|
The returned dictionary may be empty.
|
|
|
|
An example return value:
|
|
{
|
|
"dnf-plugins": {
|
|
"product-id": {
|
|
"enabled": true
|
|
},
|
|
"subscription-manager": {
|
|
"enabled": true
|
|
}
|
|
},
|
|
"rhsm.conf": {
|
|
"logging": {
|
|
"default_log_level": "INFO"
|
|
},
|
|
"rhsm": {
|
|
"auto_enable_yum_plugins": "1",
|
|
"baseurl": "https://cdn.redhat.com",
|
|
"ca_cert_dir": "/etc/rhsm/ca/",
|
|
"consumercertdir": "/etc/pki/consumer",
|
|
"entitlementcertdir": "/etc/pki/entitlement",
|
|
"full_refresh_on_yum": "0",
|
|
"inotify": "1",
|
|
"manage_repos": "0",
|
|
"package_profile_on_trans": "0",
|
|
"pluginconfdir": "/etc/rhsm/pluginconf.d",
|
|
"plugindir": "/usr/share/rhsm-plugins",
|
|
"productcertdir": "/etc/pki/product",
|
|
"repo_ca_cert": "/etc/rhsm/ca/redhat-uep.pem",
|
|
"repomd_gpg_url": "",
|
|
"report_package_profile": "1"
|
|
},
|
|
"rhsmcertd": {
|
|
"auto_registration": "1",
|
|
"auto_registration_interval": "60",
|
|
"autoattachinterval": "1440",
|
|
"certcheckinterval": "240",
|
|
"disable": "0",
|
|
"splay": "1"
|
|
},
|
|
"server": {
|
|
"hostname": "subscription.rhsm.redhat.com",
|
|
"insecure": "0",
|
|
"no_proxy": "",
|
|
"port": "443",
|
|
"prefix": "/subscription",
|
|
"proxy_hostname": "",
|
|
"proxy_password": "",
|
|
"proxy_port": "",
|
|
"proxy_scheme": "http",
|
|
"proxy_user": "",
|
|
"ssl_verify_depth": "3"
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
# Check RHSM DNF plugins configuration and allowed options
|
|
dnf_plugins_config = {
|
|
"product-id": f"{tree}/etc/dnf/plugins/product-id.conf",
|
|
"subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf"
|
|
}
|
|
|
|
for plugin_name, plugin_path in dnf_plugins_config.items():
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(plugin_path) as f:
|
|
parser = configparser.ConfigParser()
|
|
parser.read_file(f)
|
|
# only read "enabled" option from "main" section
|
|
with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError):
|
|
# get the value as the first thing, in case it raises an exception
|
|
enabled = parser.getboolean("main", "enabled")
|
|
|
|
try:
|
|
dnf_plugins_dict = result["dnf-plugins"]
|
|
except KeyError as _:
|
|
dnf_plugins_dict = result["dnf-plugins"] = {}
|
|
|
|
try:
|
|
plugin_dict = dnf_plugins_dict[plugin_name]
|
|
except KeyError as _:
|
|
plugin_dict = dnf_plugins_dict[plugin_name] = {}
|
|
|
|
plugin_dict["enabled"] = enabled
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
rhsm_conf = {}
|
|
with open(f"{tree}/etc/rhsm/rhsm.conf") as f:
|
|
parser = configparser.ConfigParser()
|
|
parser.read_file(f)
|
|
for section in parser.sections():
|
|
section_dict = {}
|
|
section_dict.update(parser[section])
|
|
if section_dict:
|
|
rhsm_conf[section] = section_dict
|
|
|
|
result["rhsm.conf"] = rhsm_conf
|
|
|
|
return result
|
|
|
|
|
|
def read_sysconfig(tree):
|
|
"""
|
|
Read selected configuration files from /etc/sysconfig.
|
|
|
|
Currently supported sysconfig files are:
|
|
- 'kernel' - /etc/sysconfig/kernel
|
|
- 'network' - /etc/sysconfig/network
|
|
- 'network-scripts' - /etc/sysconfig/network-scripts/ifcfg-*
|
|
|
|
Returns: dictionary with the keys being the supported types of sysconfig
|
|
configurations read by the function. Values of 'kernel' and 'network' keys
|
|
are a dictionaries containing key/values read from the respective
|
|
configuration files. Value of 'network-scripts' key is a dictionary with
|
|
the keys corresponding to the suffix of each 'ifcfg-*' configuration file
|
|
and their values holding dictionaries with all key/values read from the
|
|
configuration file.
|
|
The returned dictionary may be empty.
|
|
|
|
An example return value:
|
|
{
|
|
"kernel": {
|
|
"DEFAULTKERNEL": "kernel",
|
|
"UPDATEDEFAULT": "yes"
|
|
},
|
|
"network": {
|
|
"NETWORKING": "yes",
|
|
"NOZEROCONF": "yes"
|
|
},
|
|
"network-scripts": {
|
|
"ens3": {
|
|
"BOOTPROTO": "dhcp",
|
|
"BROWSER_ONLY": "no",
|
|
"DEFROUTE": "yes",
|
|
"DEVICE": "ens3",
|
|
"IPV4_FAILURE_FATAL": "no",
|
|
"IPV6INIT": "yes",
|
|
"IPV6_AUTOCONF": "yes",
|
|
"IPV6_DEFROUTE": "yes",
|
|
"IPV6_FAILURE_FATAL": "no",
|
|
"NAME": "ens3",
|
|
"ONBOOT": "yes",
|
|
"PROXY_METHOD": "none",
|
|
"TYPE": "Ethernet",
|
|
"UUID": "106f1b31-7093-41d6-ae47-1201710d0447"
|
|
},
|
|
"eth0": {
|
|
"BOOTPROTO": "dhcp",
|
|
"DEVICE": "eth0",
|
|
"IPV6INIT": "no",
|
|
"ONBOOT": "yes",
|
|
"PEERDNS": "yes",
|
|
"TYPE": "Ethernet",
|
|
"USERCTL": "yes"
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
sysconfig_paths = {
|
|
"kernel": f"{tree}/etc/sysconfig/kernel",
|
|
"network": f"{tree}/etc/sysconfig/network"
|
|
}
|
|
# iterate through supported configs
|
|
for name, path in sysconfig_paths.items():
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(path) as f:
|
|
# if file exists start with empty array of values
|
|
result[name] = parse_environment_vars(f.read())
|
|
|
|
# iterate through all files in /etc/sysconfig/network-scripts
|
|
network_scripts = {}
|
|
files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*")
|
|
for file in files:
|
|
ifname = os.path.basename(file).lstrip("ifcfg-")
|
|
with open(file) as f:
|
|
network_scripts[ifname] = parse_environment_vars(f.read())
|
|
|
|
if network_scripts:
|
|
result["network-scripts"] = network_scripts
|
|
|
|
return result
|
|
|
|
|
|
def read_hosts(tree):
|
|
"""
|
|
Read non-empty lines of /etc/hosts.
|
|
|
|
Returns: list of strings for all uncommented lines in the configuration file.
|
|
The returned list may be empty.
|
|
|
|
An example return value:
|
|
[
|
|
"127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4",
|
|
"::1 localhost localhost.localdomain localhost6 localhost6.localdomain6"
|
|
]
|
|
"""
|
|
result = []
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/hosts") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
result.append(line)
|
|
return result
|
|
|
|
|
|
def read_logind_config(config_path):
|
|
"""
|
|
Read all uncommented key/values from the 'Login" section of system-logind
|
|
configuration file.
|
|
|
|
Returns: dictionary with key/values read from the configuration file.
|
|
The returned dictionary may be empty.
|
|
|
|
An example return value:
|
|
{
|
|
"NAutoVTs": "0"
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
with open(config_path) as f:
|
|
parser = configparser.RawConfigParser()
|
|
# prevent conversion of the option name to lowercase
|
|
parser.optionxform = lambda option: option
|
|
parser.read_file(f)
|
|
with contextlib.suppress(configparser.NoSectionError):
|
|
result.update(parser["Login"])
|
|
return result
|
|
|
|
|
|
def read_logind_configs(tree):
|
|
"""
|
|
Read all systemd-logind *.conf files from a predefined list of paths and
|
|
parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/systemd/logind.conf"
|
|
- "/etc/systemd/logind.conf.d/*.conf"
|
|
- "/usr/lib/systemd/logind.conf.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_logind_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/systemd/logind.conf": {
|
|
"NAutoVTs": "0"
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/systemd/logind.conf",
|
|
"/etc/systemd/logind.conf.d/*.conf",
|
|
"/usr/lib/systemd/logind.conf.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_logind_config)
|
|
|
|
|
|
def read_locale(tree):
|
|
"""
|
|
Read all uncommented key/values set in /etc/locale.conf.
|
|
|
|
Returns: dictionary with key/values read from the configuration file.
|
|
The returned dictionary may be empty.
|
|
|
|
An example return value:
|
|
{
|
|
"LANG": "en_US"
|
|
}
|
|
"""
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/locale.conf") as f:
|
|
return parse_environment_vars(f.read())
|
|
|
|
|
|
def read_selinux_info(tree, is_ostree):
|
|
"""
|
|
Read information related to SELinux.
|
|
|
|
Returns: dictionary with two keys - 'policy' and 'context-mismatch'.
|
|
'policy' value corresponds to the value returned by read_selinux_conf().
|
|
'context-mismatch' value corresponds to the value returned by
|
|
read_selinux_ctx_mismatch().
|
|
The returned dictionary may be empty. Keys with empty values are omitted.
|
|
|
|
An example return value:
|
|
{
|
|
"context-mismatch": [
|
|
{
|
|
"actual": "system_u:object_r:root_t:s0",
|
|
"expected": "system_u:object_r:device_t:s0",
|
|
"filename": "/dev"
|
|
},
|
|
{
|
|
"actual": "system_u:object_r:root_t:s0",
|
|
"expected": "system_u:object_r:default_t:s0",
|
|
"filename": "/proc"
|
|
}
|
|
],
|
|
"policy": {
|
|
"SELINUX": "permissive",
|
|
"SELINUXTYPE": "targeted"
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
policy = read_selinux_conf(tree)
|
|
if policy:
|
|
result["policy"] = policy
|
|
|
|
with contextlib.suppress(subprocess.CalledProcessError):
|
|
ctx_mismatch = read_selinux_ctx_mismatch(tree, is_ostree)
|
|
if ctx_mismatch:
|
|
result["context-mismatch"] = ctx_mismatch
|
|
|
|
return result
|
|
|
|
|
|
def read_selinux_conf(tree):
|
|
"""
|
|
Read all uncommented key/values set in /etc/selinux/config.
|
|
|
|
Returns: dictionary with key/values read from the configuration
|
|
file.
|
|
|
|
An example of returned value:
|
|
{
|
|
"SELINUX": "enforcing",
|
|
"SELINUXTYPE": "targeted"
|
|
}
|
|
"""
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/selinux/config") as f:
|
|
return parse_environment_vars(f.read())
|
|
|
|
|
|
def read_selinux_ctx_mismatch(tree, is_ostree):
|
|
"""
|
|
Read any mismatch in selinux context of files on the image.
|
|
|
|
Returns: list of dictionaries as described below. If there
|
|
are no mismatches between used and expected selinux context,
|
|
then an empty list is returned.
|
|
|
|
If the checked 'tree' is ostree, then the path '/etc' is
|
|
excluded from the check. This is beause it is bind-mounted
|
|
from /usr/etc and therefore has incorrect selinux context
|
|
for its filesystem path.
|
|
|
|
An example of returned value:
|
|
[
|
|
{
|
|
"actual": "system_u:object_r:root_t:s0",
|
|
"expected": "system_u:object_r:device_t:s0",
|
|
"filename": "/dev"
|
|
},
|
|
{
|
|
"actual": "system_u:object_r:root_t:s0",
|
|
"expected": "system_u:object_r:default_t:s0",
|
|
"filename": "/proc"
|
|
}
|
|
]
|
|
"""
|
|
result = []
|
|
|
|
# The binary policy that should be used is on the image and has name "policy.X"
|
|
# where the "X" is a number. There may be more than one policy files.
|
|
# In the usual case, the policy with the highest number suffix should be used.
|
|
policy_files = glob.glob(f"{tree}/etc/selinux/targeted/policy/policy.*")
|
|
policy_files = sorted(policy_files, reverse=True)
|
|
|
|
if policy_files:
|
|
CMD = [
|
|
"setfiles",
|
|
"-r", f"{tree}",
|
|
"-nvF",
|
|
"-c", policy_files[0], # take the policy with the highest number
|
|
f"{tree}/etc/selinux/targeted/contexts/files/file_contexts",
|
|
f"{tree}"
|
|
]
|
|
|
|
if is_ostree:
|
|
# exclude /etc from being checked when the tree is ostree, because
|
|
# it is just bind-mounted from /usr/etc and has incorrect selinux
|
|
# context for /etc path
|
|
CMD.extend(["-e", f"{tree}/etc"])
|
|
|
|
output = subprocess_check_output(CMD)
|
|
|
|
# output are lines such as:
|
|
# Would relabel /tmp/tmpwrozmb47/dev from system_u:object_r:root_t:s0 to system_u:object_r:device_t:s0\n
|
|
setfiles_pattern = r"Would\s+relabel\s+(?P<filename>.+)\s+from\s+(?P<actual>.+)\s+to\s+(?P<expected>.+)"
|
|
setfiles_re = re.compile(setfiles_pattern)
|
|
|
|
for line in output.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
match = setfiles_re.match(line)
|
|
# do not silently ignore changes of 'setfiles' output
|
|
if not match:
|
|
raise RuntimeError(f"could not match line '{line}' with pattern '{setfiles_pattern}'")
|
|
parsed_line = {
|
|
"filename": match.group("filename")[len(tree):],
|
|
"actual": match.group("actual"),
|
|
"expected": match.group("expected")
|
|
}
|
|
result.append(parsed_line)
|
|
|
|
# sort the list to make it consistent across runs
|
|
result.sort(key=lambda x: x.get("filename"))
|
|
|
|
return result
|
|
|
|
|
|
def _read_glob_paths_with_parser(tree, glob_paths, parser_func):
|
|
"""
|
|
Use 'parser_func' to read all files obtained by using all 'glob_paths'
|
|
globbing patterns under the 'tree' path.
|
|
|
|
The 'glob_paths' is a list string patterns accepted by glob.glob().
|
|
The 'parser_func' function is expected to take a single string argument
|
|
containing the absolute path to a configuration file which should be parsed.
|
|
Its return value can be arbitrary representation of the parsed
|
|
configuration.
|
|
|
|
Returns: dictionary with the keys corresponding to directories, which
|
|
contain configuration files mathing the provided glob pattern. Value of
|
|
each key is another dictionary with keys representing each filename and
|
|
values being the parsed configuration representation as returned by the
|
|
provided 'parser_func' function.
|
|
|
|
An example return value for dracut configuration paths and parser:
|
|
{
|
|
"/etc/dracut.conf.d": {
|
|
"sgdisk.conf": {
|
|
"install_items": " sgdisk "
|
|
},
|
|
},
|
|
"/usr/lib/dracut/dracut.conf.d": {
|
|
"xen.conf": {
|
|
"add_drivers": " xen-netfront xen-blkfront "
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
for glob_path in glob_paths:
|
|
glob_path_result = {}
|
|
|
|
files = glob.glob(f"{tree}{glob_path}")
|
|
for file in files:
|
|
config = parser_func(file)
|
|
if config:
|
|
filename = os.path.basename(file)
|
|
glob_path_result[filename] = config
|
|
|
|
if glob_path_result:
|
|
checked_path = os.path.dirname(glob_path)
|
|
result[checked_path] = glob_path_result
|
|
|
|
return result
|
|
|
|
|
|
def read_modprobe_config(config_path):
|
|
"""
|
|
Read a specific modprobe configuragion file and for now, extract only
|
|
blacklisted kernel modules.
|
|
|
|
Returns: dictionary with the keys corresponding to specific modprobe
|
|
commands and values being the values of these commands.
|
|
|
|
An example return value:
|
|
{
|
|
"blacklist": [
|
|
"nouveau"
|
|
]
|
|
}
|
|
"""
|
|
file_result = {}
|
|
|
|
BLACKLIST_CMD = "blacklist"
|
|
|
|
with open(config_path) as f:
|
|
# The format of files under modprobe.d: one command per line,
|
|
# with blank lines and lines starting with '#' ignored.
|
|
# A '\' at the end of a line causes it to continue on the next line.
|
|
line_to_be_continued = ""
|
|
for line in f:
|
|
line = line.strip()
|
|
# line is not blank
|
|
if line:
|
|
# comment, skip it
|
|
if line[0] == "#":
|
|
continue
|
|
# this line continues on the following line
|
|
if line[-1] == "\\":
|
|
line_to_be_continued += line[:-1]
|
|
continue
|
|
# this line ends here
|
|
else:
|
|
# is this line continuation of the previous one?
|
|
if line_to_be_continued:
|
|
line = line_to_be_continued + line
|
|
line_to_be_continued = ""
|
|
cmd, cmd_args = line.split(' ', 1)
|
|
# we care only about blacklist command for now
|
|
if cmd == BLACKLIST_CMD:
|
|
modules_list = file_result[BLACKLIST_CMD] = []
|
|
modules_list.append(cmd_args)
|
|
|
|
return file_result
|
|
|
|
|
|
def read_modprobe_configs(tree):
|
|
"""
|
|
Read all modprobe *.conf files from a predefined list of paths and extract
|
|
supported commands. For now, extract only blacklisted kernel modules.
|
|
|
|
The searched paths are:
|
|
- "/etc/modprobe.d/*.conf"
|
|
- "/usr/lib/modprobe.d/*.conf"
|
|
- "/usr/local/lib/modprobe.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_modprobe_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/usr/lib/modprobe.d": {
|
|
"blacklist-nouveau.conf": {
|
|
"blacklist": [
|
|
"nouveau"
|
|
]
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/modprobe.d/*.conf",
|
|
"/usr/lib/modprobe.d/*.conf",
|
|
"/usr/local/lib/modprobe.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_modprobe_config)
|
|
|
|
|
|
def read_cloud_init_config(config_path):
|
|
"""
|
|
Read the specific cloud-init configuration file.
|
|
|
|
Returns: dictionary representing the cloud-init configuration.
|
|
|
|
An example return value:
|
|
{
|
|
"cloud_config_modules": [
|
|
"mounts",
|
|
"locale",
|
|
"set-passwords",
|
|
"rh_subscription",
|
|
"yum-add-repo",
|
|
"package-update-upgrade-install",
|
|
"timezone",
|
|
"puppet",
|
|
"chef",
|
|
"salt-minion",
|
|
"mcollective",
|
|
"disable-ec2-metadata",
|
|
"runcmd"
|
|
],
|
|
"cloud_final_modules": [
|
|
"rightscale_userdata",
|
|
"scripts-per-once",
|
|
"scripts-per-boot",
|
|
"scripts-per-instance",
|
|
"scripts-user",
|
|
"ssh-authkey-fingerprints",
|
|
"keys-to-console",
|
|
"phone-home",
|
|
"final-message",
|
|
"power-state-change"
|
|
],
|
|
"cloud_init_modules": [
|
|
"disk_setup",
|
|
"migrator",
|
|
"bootcmd",
|
|
"write-files",
|
|
"growpart",
|
|
"resizefs",
|
|
"set_hostname",
|
|
"update_hostname",
|
|
"update_etc_hosts",
|
|
"rsyslog",
|
|
"users-groups",
|
|
"ssh"
|
|
],
|
|
"disable_root": 1,
|
|
"disable_vmware_customization": false,
|
|
"mount_default_fields": [
|
|
null,
|
|
null,
|
|
"auto",
|
|
"defaults,nofail,x-systemd.requires=cloud-init.service",
|
|
"0",
|
|
"2"
|
|
],
|
|
"resize_rootfs_tmp": "/dev",
|
|
"ssh_deletekeys": 1,
|
|
"ssh_genkeytypes": null,
|
|
"ssh_pwauth": 0,
|
|
"syslog_fix_perms": null,
|
|
"system_info": {
|
|
"default_user": {
|
|
"gecos": "Cloud User",
|
|
"groups": [
|
|
"adm",
|
|
"systemd-journal"
|
|
],
|
|
"lock_passwd": true,
|
|
"name": "ec2-user",
|
|
"shell": "/bin/bash",
|
|
"sudo": [
|
|
"ALL=(ALL) NOPASSWD:ALL"
|
|
]
|
|
},
|
|
"distro": "rhel",
|
|
"paths": {
|
|
"cloud_dir": "/var/lib/cloud",
|
|
"templates_dir": "/etc/cloud/templates"
|
|
},
|
|
"ssh_svcname": "sshd"
|
|
},
|
|
"users": [
|
|
"default"
|
|
]
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(config_path) as f:
|
|
config = yaml.safe_load(f)
|
|
result.update(config)
|
|
|
|
return result
|
|
|
|
|
|
def read_cloud_init_configs(tree):
|
|
"""
|
|
Read all cloud-init *.cfg files from a predefined list of paths and parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/cloud/cloud.cfg"
|
|
- "/etc/cloud/cloud.cfg.d/*.cfg"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_cloud_init_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/cloud.cfg.d":
|
|
"ec2.cfg": {
|
|
"default_user": {
|
|
"name": "ec2-user"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/cloud/cloud.cfg",
|
|
"/etc/cloud/cloud.cfg.d/*.cfg"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_cloud_init_config)
|
|
|
|
|
|
def read_dracut_config(config_path):
|
|
"""
|
|
Read specific dracut configuration file.
|
|
|
|
Returns: dictionary representing the uncommented configuration options read
|
|
from the file.
|
|
|
|
An example return value:
|
|
{
|
|
"install_items": " sgdisk "
|
|
"add_drivers": " xen-netfront xen-blkfront "
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
with open(config_path) as f:
|
|
# dracut configuration key/values delimiter is '=' or '+='
|
|
for line in f:
|
|
line = line.strip()
|
|
# A '#' indicates the beginning of a comment; following
|
|
# characters, up to the end of the line are not interpreted.
|
|
line_comment = line.split("#", 1)
|
|
line = line_comment[0]
|
|
if line:
|
|
key, value = line.split("=", 1)
|
|
if key[-1] == "+":
|
|
key = key[:-1]
|
|
result[key] = value.strip('"')
|
|
|
|
return result
|
|
|
|
|
|
def read_dracut_configs(tree):
|
|
"""
|
|
Read all dracut *.conf files from a predefined list of paths and parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/dracut.conf.d/*.conf"
|
|
- "/usr/lib/dracut/dracut.conf.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_dracut_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/dracut.conf.d": {
|
|
"sgdisk.conf": {
|
|
"install_items": " sgdisk "
|
|
},
|
|
},
|
|
"/usr/lib/dracut/dracut.conf.d": {
|
|
"xen.conf": {
|
|
"add_drivers": " xen-netfront xen-blkfront "
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/dracut.conf.d/*.conf",
|
|
"/usr/lib/dracut/dracut.conf.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_dracut_config)
|
|
|
|
|
|
def read_keyboard_conf(tree):
|
|
"""
|
|
Read keyboard configuration for vconsole and X11.
|
|
|
|
Returns: dictionary with at most two keys 'X11' and 'vconsole'.
|
|
'vconsole' value is a dictionary representing configuration read from
|
|
/etc/vconsole.conf.
|
|
'X11' value is a dictionary with at most two keys 'layout' and 'variant',
|
|
which values are extracted from X11 keyborad configuration.
|
|
|
|
An example return value:
|
|
{
|
|
"X11": {
|
|
"layout": "us"
|
|
},
|
|
"vconsole": {
|
|
"FONT": "eurlatgr",
|
|
"KEYMAP": "us"
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
# read virtual console configuration
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/vconsole.conf") as f:
|
|
values = parse_environment_vars(f.read())
|
|
if values:
|
|
result["vconsole"] = values
|
|
|
|
# read X11 keyboard configuration
|
|
with contextlib.suppress(FileNotFoundError):
|
|
# Example file content:
|
|
#
|
|
# Section "InputClass"
|
|
# Identifier "system-keyboard"
|
|
# MatchIsKeyboard "on"
|
|
# Option "XkbLayout" "us,sk"
|
|
# Option "XkbVariant" ",qwerty"
|
|
# EndSection
|
|
x11_config = {}
|
|
match_options_dict = {
|
|
"layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection',
|
|
"variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection'
|
|
}
|
|
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f:
|
|
config = f.read()
|
|
for option, pattern in match_options_dict.items():
|
|
match = re.search(pattern, config, re.DOTALL)
|
|
if match and match.group(1):
|
|
x11_config[option] = match.group(1)
|
|
|
|
if x11_config:
|
|
result["X11"] = x11_config
|
|
|
|
return result
|
|
|
|
|
|
def read_chrony_conf(tree):
|
|
"""
|
|
Read specific directives from Chrony configuration. Currently parsed
|
|
directives are:
|
|
- 'server'
|
|
- 'pool'
|
|
- 'peer'
|
|
- 'leapsectz'
|
|
|
|
Returns: dictionary with the keys representing parsed directives from Chrony
|
|
configuration. Value of each key is a list of strings containing arguments
|
|
provided with each occurance of the directive in the configuration.
|
|
|
|
An example return value:
|
|
{
|
|
"leapsectz": [
|
|
"right/UTC"
|
|
],
|
|
"pool": [
|
|
"2.rhel.pool.ntp.org iburst"
|
|
],
|
|
"server": [
|
|
"169.254.169.123 prefer iburst minpoll 4 maxpoll 4"
|
|
]
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
parsed_directives = ["server", "pool", "peer", "leapsectz"]
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/chrony.conf") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
# skip comments
|
|
if line[0] in ["!", ";", "#", "%"]:
|
|
continue
|
|
split_line = line.split()
|
|
if split_line[0] in parsed_directives:
|
|
try:
|
|
directive_list = result[split_line[0]]
|
|
except KeyError:
|
|
directive_list = result[split_line[0]] = []
|
|
directive_list.append(" ".join(split_line[1:]))
|
|
|
|
return result
|
|
|
|
|
|
def read_systemd_service_dropin(dropin_dir_path):
|
|
"""
|
|
Read systemd .service unit drop-in configurations.
|
|
|
|
Returns: dictionary representing the combined drop-in configurations.
|
|
|
|
An example return value:
|
|
{
|
|
"Service": {
|
|
"Environment": "NM_CLOUD_SETUP_EC2=yes"
|
|
}
|
|
}
|
|
"""
|
|
# read all unit drop-in configurations
|
|
config_files = glob.glob(f"{dropin_dir_path}/*.conf")
|
|
parser = configparser.RawConfigParser()
|
|
# prevent conversion of the opion name to lowercase
|
|
parser.optionxform = lambda option: option
|
|
parser.read(config_files)
|
|
|
|
dropin_config = {}
|
|
for section in parser.sections():
|
|
section_config = {}
|
|
section_config.update(parser[section])
|
|
if section_config:
|
|
dropin_config[section] = section_config
|
|
|
|
return dropin_config
|
|
|
|
|
|
def read_systemd_service_dropins(tree):
|
|
"""
|
|
Read all systemd .service unit config files from a predefined list of paths
|
|
and parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/systemd/system/*.service.d"
|
|
- "/usr/lib/systemd/system/*.service.d"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_systemd_service_dropin()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/systemd/system": {
|
|
"nm-cloud-setup.service.d": {
|
|
"Service": {
|
|
"Environment": "NM_CLOUD_SETUP_EC2=yes"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/systemd/system/*.service.d",
|
|
"/usr/lib/systemd/system/*.service.d"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_systemd_service_dropin)
|
|
|
|
|
|
def read_tmpfilesd_config(config_path):
|
|
"""
|
|
Read tmpfiles.d configuration files.
|
|
|
|
Returns: list of strings representing uncommented lines read from the
|
|
configuration file.
|
|
|
|
An example return value:
|
|
[
|
|
"x /tmp/.sap*",
|
|
"x /tmp/.hdb*lock",
|
|
"x /tmp/.trex*lock"
|
|
]
|
|
"""
|
|
file_lines = []
|
|
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
file_lines.append(line)
|
|
|
|
return file_lines
|
|
|
|
|
|
def read_tmpfilesd_configs(tree):
|
|
"""
|
|
Read all tmpfiles.d *.conf files from a predefined list of paths and parse
|
|
them.
|
|
|
|
The searched paths are:
|
|
- "/etc/tmpfiles.d/*.conf"
|
|
- "/usr/lib/tmpfiles.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_tmpfilesd_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/tmpfiles.d": {
|
|
"sap.conf": [
|
|
"x /tmp/.sap*",
|
|
"x /tmp/.hdb*lock",
|
|
"x /tmp/.trex*lock"
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/tmpfiles.d/*.conf",
|
|
"/usr/lib/tmpfiles.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_tmpfilesd_config)
|
|
|
|
|
|
def read_tuned_profile(tree):
|
|
"""
|
|
Read the Tuned active profile and profile mode.
|
|
|
|
Returns: dictionary with at most two keys 'active_profile' and 'profile_mode'.
|
|
Value of each key is a string representing respective tuned configuration
|
|
value.
|
|
|
|
An example return value:
|
|
{
|
|
"active_profile": "sap-hana",
|
|
"profile_mode": "manual"
|
|
}
|
|
"""
|
|
result = {}
|
|
config_files = ["active_profile", "profile_mode"]
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
for config_file in config_files:
|
|
with open(f"{tree}/etc/tuned/{config_file}") as f:
|
|
value = f.read()
|
|
value = value.strip()
|
|
if value:
|
|
result[config_file] = value
|
|
|
|
return result
|
|
|
|
|
|
def read_sysctld_config(config_path):
|
|
"""
|
|
Read sysctl configuration file.
|
|
|
|
Returns: list of strings representing uncommented lines read from the
|
|
configuration file.
|
|
|
|
An example return value:
|
|
[
|
|
"kernel.pid_max = 4194304",
|
|
"vm.max_map_count = 2147483647"
|
|
]
|
|
"""
|
|
values = []
|
|
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
# skip comments
|
|
if line[0] in ["#", ";"]:
|
|
continue
|
|
values.append(line)
|
|
|
|
return values
|
|
|
|
|
|
def read_sysctld_configs(tree):
|
|
"""
|
|
Read all sysctl.d *.conf files from a predefined list of paths and parse
|
|
them.
|
|
|
|
The searched paths are:
|
|
- "/etc/sysctl.d/*.conf",
|
|
- "/usr/lib/sysctl.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_sysctld_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/sysctl.d": {
|
|
"sap.conf": [
|
|
"kernel.pid_max = 4194304",
|
|
"vm.max_map_count = 2147483647"
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/sysctl.d/*.conf",
|
|
"/usr/lib/sysctl.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_sysctld_config)
|
|
|
|
|
|
def read_security_limits_config(config_path):
|
|
"""
|
|
Read all configuration files from /etc/security/limits.d.
|
|
|
|
Returns: dictionary with the keys representing names of configuration files
|
|
from /etc/security/limits.d. Value of each key is a dictionary representing
|
|
uncommented configuration values read from the configuration file.
|
|
|
|
An example return value:
|
|
[
|
|
{
|
|
"domain": "@sapsys",
|
|
"item": "nofile",
|
|
"type": "hard",
|
|
"value": "65536"
|
|
},
|
|
{
|
|
"domain": "@sapsys",
|
|
"item": "nofile",
|
|
"type": "soft",
|
|
"value": "65536"
|
|
}
|
|
]
|
|
"""
|
|
values = []
|
|
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
# the '#' character introduces a comment - after which the rest of the line is ignored
|
|
split_line = line.split("#", 1)
|
|
line = split_line[0]
|
|
if not line:
|
|
continue
|
|
# Syntax of a line is "<domain> <type> <item> <value>"
|
|
domain, limit_type, item, value = line.split()
|
|
values.append({
|
|
"domain": domain,
|
|
"type": limit_type,
|
|
"item": item,
|
|
"value": value
|
|
})
|
|
|
|
return values
|
|
|
|
|
|
def read_security_limits_configs(tree):
|
|
"""
|
|
Read all security limits *.conf files from a predefined list of paths and
|
|
parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/security/limits.conf"
|
|
- "/etc/security/limits.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_security_limits_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/security/limits.d": {
|
|
"99-sap.conf": [
|
|
{
|
|
"domain": "@sapsys",
|
|
"item": "nofile",
|
|
"type": "hard",
|
|
"value": "65536"
|
|
},
|
|
{
|
|
"domain": "@sapsys",
|
|
"item": "nofile",
|
|
"type": "soft",
|
|
"value": "65536"
|
|
}
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/security/limits.conf",
|
|
"/etc/security/limits.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_tmpfilesd_config)
|
|
|
|
|
|
def read_ssh_config(config_path):
|
|
"""
|
|
Read the content of provided SSH(d) configuration file.
|
|
|
|
Returns: list of uncommented and non-empty lines read from the configuation
|
|
file.
|
|
|
|
An example return value:
|
|
[
|
|
"Match final all",
|
|
"Include /etc/crypto-policies/back-ends/openssh.config",
|
|
"GSSAPIAuthentication yes",
|
|
"ForwardX11Trusted yes",
|
|
"SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
|
|
"SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
|
|
"SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
|
|
"SendEnv XMODIFIERS"
|
|
]
|
|
"""
|
|
config_lines = []
|
|
|
|
with open(config_path) as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
config_lines.append(line)
|
|
|
|
return config_lines
|
|
|
|
|
|
def read_ssh_configs(tree):
|
|
"""
|
|
Read all SSH configuration files from a predefined list of paths and
|
|
parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/ssh/ssh_config"
|
|
- "/etc/ssh/ssh_config.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_ssh_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/ssh": {
|
|
"ssh_config": [
|
|
"Include /etc/ssh/ssh_config.d/*.conf"
|
|
]
|
|
},
|
|
"/etc/ssh/ssh_config.d": {
|
|
"05-redhat.conf": [
|
|
"Match final all",
|
|
"Include /etc/crypto-policies/back-ends/openssh.config",
|
|
"GSSAPIAuthentication yes",
|
|
"ForwardX11Trusted yes",
|
|
"SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
|
|
"SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
|
|
"SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
|
|
"SendEnv XMODIFIERS"
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/ssh/ssh_config",
|
|
"/etc/ssh/ssh_config.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config)
|
|
|
|
|
|
def read_sshd_configs(tree):
|
|
"""
|
|
Read all SSHd configuration files from a predefined list of paths and
|
|
parse them.
|
|
|
|
The searched paths are:
|
|
- "/etc/ssh/sshd_config"
|
|
- "/etc/ssh/sshd_config.d/*.conf"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by 'read_ssh_config()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/ssh": {
|
|
"sshd_config": [
|
|
"HostKey /etc/ssh/ssh_host_rsa_key",
|
|
"HostKey /etc/ssh/ssh_host_ecdsa_key",
|
|
"HostKey /etc/ssh/ssh_host_ed25519_key",
|
|
"SyslogFacility AUTHPRIV",
|
|
"PermitRootLogin no",
|
|
"AuthorizedKeysFile\t.ssh/authorized_keys",
|
|
"PasswordAuthentication no",
|
|
"ChallengeResponseAuthentication no",
|
|
"GSSAPIAuthentication yes",
|
|
"GSSAPICleanupCredentials no",
|
|
"UsePAM yes",
|
|
"X11Forwarding yes",
|
|
"PrintMotd no",
|
|
"AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
|
|
"AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
|
|
"AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
|
|
"AcceptEnv XMODIFIERS",
|
|
"Subsystem\tsftp\t/usr/libexec/openssh/sftp-server",
|
|
"ClientAliveInterval 420"
|
|
]
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/ssh/sshd_config",
|
|
"/etc/ssh/sshd_config.d/*.conf"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config)
|
|
|
|
|
|
def read_yum_repos(tree):
|
|
"""
|
|
Read all YUM/DNF repo files.
|
|
|
|
The searched paths are:
|
|
- "/etc/yum.repos.d/*.repo"
|
|
|
|
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
|
|
configuration representation as returned by '_read_inifile_to_dict()'.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/yum.repos.d": {
|
|
"google-cloud.repo": {
|
|
"google-cloud-sdk": {
|
|
"baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64",
|
|
"enabled": "1",
|
|
"gpgcheck": "1",
|
|
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
|
|
"name": "Google Cloud SDK",
|
|
"repo_gpgcheck": "0"
|
|
},
|
|
"google-compute-engine": {
|
|
"baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable",
|
|
"enabled": "1",
|
|
"gpgcheck": "1",
|
|
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
|
|
"name": "Google Compute Engine",
|
|
"repo_gpgcheck": "0"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
checked_globs = [
|
|
"/etc/yum.repos.d/*.repo"
|
|
]
|
|
|
|
return _read_glob_paths_with_parser(tree, checked_globs, _read_inifile_to_dict)
|
|
|
|
|
|
def read_sudoers(tree):
|
|
"""
|
|
Read uncommented lines from sudoers configuration file and /etc/sudoers.d
|
|
This functions does not actually do much of a parsing, as sudoers file
|
|
format grammar is a bit too much for our purpose.
|
|
Any #include or #includedir directives are ignored by this function.
|
|
|
|
Returns: dictionary with the keys representing names of read configuration
|
|
files, /etc/sudoers and files from /etc/sudoers.d. Value of each key is
|
|
a list of strings representing uncommented lines read from the configuration
|
|
file.
|
|
|
|
An example return value:
|
|
{
|
|
"/etc/sudoers": [
|
|
"Defaults !visiblepw",
|
|
"Defaults always_set_home",
|
|
"Defaults match_group_by_gid",
|
|
"Defaults always_query_group_plugin",
|
|
"Defaults env_reset",
|
|
"Defaults env_keep = \"COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR LS_COLORS\"",
|
|
"Defaults env_keep += \"MAIL PS1 PS2 QTDIR USERNAME LANG LC_ADDRESS LC_CTYPE\"",
|
|
"Defaults env_keep += \"LC_COLLATE LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES\"",
|
|
"Defaults env_keep += \"LC_MONETARY LC_NAME LC_NUMERIC LC_PAPER LC_TELEPHONE\"",
|
|
"Defaults env_keep += \"LC_TIME LC_ALL LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY\"",
|
|
"Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin",
|
|
"root\tALL=(ALL) \tALL",
|
|
"%wheel\tALL=(ALL)\tALL",
|
|
"ec2-user\tALL=(ALL)\tNOPASSWD: ALL"
|
|
]
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
def _parse_sudoers_file(f):
|
|
lines = []
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
lines.append(line)
|
|
return lines
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/sudoers") as f:
|
|
lines = _parse_sudoers_file(f)
|
|
if lines:
|
|
result["/etc/sudoers"] = lines
|
|
|
|
sudoersd_result = {}
|
|
for file in glob.glob(f"{tree}/etc/sudoers.d/*"):
|
|
with open(file) as f:
|
|
lines = _parse_sudoers_file(f)
|
|
if lines:
|
|
result[os.path.basename(file)] = lines
|
|
if sudoersd_result:
|
|
result["/etc/sudoers.d"] = sudoersd_result
|
|
|
|
return result
|
|
|
|
|
|
def read_udev_rules(tree):
|
|
"""
|
|
Read udev rules defined in /etc/udev/rules.d.
|
|
|
|
Returns: dictionary with the keys representing names of files with udev
|
|
rules from /etc/udev/rules.d. Value of each key is a list of strings
|
|
representing uncommented lines read from the configuration file. If
|
|
the file is empty (e.g. because of masking udev configuration installed
|
|
by an RPM), an emptu list is returned as the respective value.
|
|
|
|
An example return value:
|
|
{
|
|
"80-net-name-slot.rules": []
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"):
|
|
with open(file) as f:
|
|
lines = []
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
lines.append(line)
|
|
# include also empty files in the report
|
|
result[os.path.basename(file)] = lines
|
|
|
|
return result
|
|
|
|
|
|
def _read_inifile_to_dict(config_path):
|
|
"""
|
|
Read INI file from the provided path
|
|
|
|
Returns: a dictionary representing the provided INI file content.
|
|
|
|
An example return value:
|
|
{
|
|
"google-cloud-sdk": {
|
|
"baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64",
|
|
"enabled": "1",
|
|
"gpgcheck": "1",
|
|
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
|
|
"name": "Google Cloud SDK",
|
|
"repo_gpgcheck": "0"
|
|
},
|
|
"google-compute-engine": {
|
|
"baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable",
|
|
"enabled": "1",
|
|
"gpgcheck": "1",
|
|
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
|
|
"name": "Google Compute Engine",
|
|
"repo_gpgcheck": "0"
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(config_path) as f:
|
|
parser = configparser.RawConfigParser()
|
|
# prevent conversion of the opion name to lowercase
|
|
parser.optionxform = lambda option: option
|
|
parser.readfp(f)
|
|
|
|
for section in parser.sections():
|
|
section_config = dict(parser.items(section))
|
|
if section_config:
|
|
result[section] = section_config
|
|
|
|
return result
|
|
|
|
|
|
def read_dnf_conf(tree):
|
|
"""
|
|
Read DNF configuration and defined variable files.
|
|
|
|
Returns: dictionary with at most two keys 'dnf.conf' and 'vars'.
|
|
'dnf.conf' value is a dictionary representing the DNF configuration
|
|
file content.
|
|
'vars' value is a dictionary which keys represent names of files from
|
|
/etc/dnf/vars/ and values are strings representing the file content.
|
|
|
|
An example return value:
|
|
{
|
|
"dnf.conf": {
|
|
"main": {
|
|
"installonly_limit": "3"
|
|
}
|
|
},
|
|
"vars": {
|
|
"releasever": "8.4"
|
|
}
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
dnf_config = _read_inifile_to_dict(f"{tree}/etc/dnf/dnf.conf")
|
|
if dnf_config:
|
|
result["dnf.conf"] = dnf_config
|
|
|
|
dnf_vars = {}
|
|
for file in glob.glob(f"{tree}/etc/dnf/vars/*"):
|
|
with open(file) as f:
|
|
dnf_vars[os.path.basename(file)] = f.read().strip()
|
|
if dnf_vars:
|
|
result["vars"] = dnf_vars
|
|
|
|
return result
|
|
|
|
|
|
def read_dnf_automatic_conf(tree):
|
|
"""
|
|
Read DNF Automatic configuation.
|
|
|
|
Returns: dictionary as returned by '_read_inifile_to_dict()'.
|
|
|
|
An example return value:
|
|
{
|
|
"base": {
|
|
"debuglevel": "1"
|
|
},
|
|
"command_email": {
|
|
"email_from": "root@example.com",
|
|
"email_to": "root"
|
|
},
|
|
"commands": {
|
|
"apply_updates": "yes",
|
|
"download_updates": "yes",
|
|
"network_online_timeout": "60",
|
|
"random_sleep": "0",
|
|
"upgrade_type": "security"
|
|
},
|
|
"email": {
|
|
"email_from": "root@example.com",
|
|
"email_host": "localhost",
|
|
"email_to": "root"
|
|
},
|
|
"emitters": {
|
|
"emit_via": "stdio"
|
|
}
|
|
}
|
|
"""
|
|
return _read_inifile_to_dict(f"{tree}/etc/dnf/automatic.conf")
|
|
|
|
|
|
def read_authselect_conf(tree):
|
|
"""
|
|
Read authselect configuration.
|
|
|
|
Returns: dictionary with two keys 'profile-id' and 'enabled-features'.
|
|
'profile-id' value is a string representing the configured authselect
|
|
profile.
|
|
'enabled-features' value is a list of strings representing enabled features
|
|
of the used authselect profile. In case there are no specific features
|
|
enabled, the list is empty.
|
|
|
|
An example return value:
|
|
{
|
|
"enabled-features": [],
|
|
"profile-id": "sssd"
|
|
}
|
|
"""
|
|
result = {}
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/authselect/authselect.conf") as f:
|
|
# the first line is always the profile ID
|
|
# following lines are listing enabled features
|
|
# lines starting with '#' and empty lines are skipped
|
|
authselect_conf_lines = []
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
authselect_conf_lines.append(line)
|
|
if authselect_conf_lines:
|
|
result["profile-id"] = authselect_conf_lines[0]
|
|
result["enabled-features"] = authselect_conf_lines[1:]
|
|
|
|
return result
|
|
|
|
|
|
def read_resolv_conf(tree):
|
|
"""
|
|
Read /etc/resolv.conf.
|
|
|
|
Returns: a list of uncommented lines from the /etc/resolv.conf.
|
|
|
|
An example return value:
|
|
[
|
|
"search redhat.com",
|
|
"nameserver 192.168.1.1",
|
|
"nameserver 192.168.1.2"
|
|
]
|
|
"""
|
|
result = []
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/resolv.conf") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
if line[0] == "#":
|
|
continue
|
|
result.append(line)
|
|
|
|
return result
|
|
|
|
|
|
def append_filesystem(report, tree, *, is_ostree=False):
|
|
if os.path.exists(f"{tree}/etc/os-release"):
|
|
report["packages"] = rpm_packages(tree, is_ostree)
|
|
if not is_ostree:
|
|
report["rpm-verify"] = rpm_verify(tree)
|
|
|
|
not_installed_docs = rpm_not_installed_docs(tree, is_ostree)
|
|
if not_installed_docs:
|
|
report["rpm_not_installed_docs"] = not_installed_docs
|
|
|
|
with open(f"{tree}/etc/os-release") as f:
|
|
report["os-release"] = parse_environment_vars(f.read())
|
|
|
|
report["services-enabled"] = read_services(tree, "enabled")
|
|
report["services-disabled"] = read_services(tree, "disabled")
|
|
|
|
default_target = read_default_target(tree)
|
|
if default_target:
|
|
report["default-target"] = default_target
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/hostname") as f:
|
|
report["hostname"] = f.read().strip()
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime"))
|
|
|
|
authselect_conf = read_authselect_conf(tree)
|
|
if authselect_conf:
|
|
report["authselect"] = authselect_conf
|
|
|
|
chrony_conf = read_chrony_conf(tree)
|
|
if chrony_conf:
|
|
report["chrony"] = chrony_conf
|
|
|
|
cloud_init_configs = read_cloud_init_configs(tree)
|
|
if cloud_init_configs:
|
|
report["cloud-init"] = cloud_init_configs
|
|
|
|
dnf_conf = read_dnf_conf(tree)
|
|
if dnf_conf:
|
|
report["dnf"] = dnf_conf
|
|
|
|
dnf_automatic = read_dnf_automatic_conf(tree)
|
|
if dnf_automatic:
|
|
report["/etc/dnf/automatic.conf"] = dnf_automatic
|
|
|
|
yum_repos = read_yum_repos(tree)
|
|
if yum_repos:
|
|
report["yum_repos"] = yum_repos
|
|
|
|
dracut_configs = read_dracut_configs(tree)
|
|
if dracut_configs:
|
|
report["dracut"] = dracut_configs
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
report["firewall-enabled"] = read_firewall_zone(tree)
|
|
|
|
firewall_default_zone = read_firewall_default_zone(tree)
|
|
if firewall_default_zone:
|
|
report["firewall-default-zone"] = firewall_default_zone
|
|
|
|
fstab = read_fstab(tree)
|
|
if fstab:
|
|
report["fstab"] = fstab
|
|
|
|
hosts = read_hosts(tree)
|
|
if hosts:
|
|
report["hosts"] = hosts
|
|
|
|
keyboard = read_keyboard_conf(tree)
|
|
if keyboard:
|
|
report["keyboard"] = keyboard
|
|
|
|
security_limits_configs = read_security_limits_configs(tree)
|
|
if security_limits_configs:
|
|
report["security-limits"] = security_limits_configs
|
|
|
|
locale = read_locale(tree)
|
|
if locale:
|
|
report["locale"] = locale
|
|
|
|
logind_configs = read_logind_configs(tree)
|
|
if logind_configs:
|
|
report["systemd-logind"] = logind_configs
|
|
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/etc/machine-id") as f:
|
|
report["machine-id"] = f.readline()
|
|
|
|
modprobe_configs = read_modprobe_configs(tree)
|
|
if modprobe_configs:
|
|
report["modprobe"] = modprobe_configs
|
|
|
|
tmpfilesd_configs = read_tmpfilesd_configs(tree)
|
|
if tmpfilesd_configs:
|
|
report["tmpfiles.d"] = tmpfilesd_configs
|
|
|
|
rhsm = read_rhsm(tree)
|
|
if rhsm:
|
|
report["rhsm"] = rhsm
|
|
|
|
selinux = read_selinux_info(tree, is_ostree)
|
|
if selinux:
|
|
report["selinux"] = selinux
|
|
|
|
ssh_configs = read_ssh_configs(tree)
|
|
if ssh_configs:
|
|
report["ssh_config"] = ssh_configs
|
|
|
|
sshd_configs = read_sshd_configs(tree)
|
|
if sshd_configs:
|
|
report["sshd_config"] = sshd_configs
|
|
|
|
sudoers_conf = read_sudoers(tree)
|
|
if sudoers_conf:
|
|
report["sudoers"] = sudoers_conf
|
|
|
|
sysconfig = read_sysconfig(tree)
|
|
if sysconfig:
|
|
report["sysconfig"] = sysconfig
|
|
|
|
sysctld_configs = read_sysctld_configs(tree)
|
|
if sysctld_configs:
|
|
report["sysctl.d"] = sysctld_configs
|
|
|
|
systemd_service_dropins = read_systemd_service_dropins(tree)
|
|
if systemd_service_dropins:
|
|
report["systemd-service-dropins"] = systemd_service_dropins
|
|
|
|
tuned_profile = read_tuned_profile(tree)
|
|
if tuned_profile:
|
|
report["tuned"] = tuned_profile
|
|
|
|
resolv_conf = read_resolv_conf(tree)
|
|
# add even empty resolv_conf to the report to express that it is empty or non-existent
|
|
report["/etc/resolv.conf"] = resolv_conf
|
|
|
|
udev_rules = read_udev_rules(tree)
|
|
if udev_rules:
|
|
report["/etc/udev/rules.d"] = udev_rules
|
|
|
|
with open(f"{tree}/etc/passwd") as f:
|
|
report["passwd"] = sorted(f.read().strip().split("\n"))
|
|
|
|
with open(f"{tree}/etc/group") as f:
|
|
report["groups"] = sorted(f.read().strip().split("\n"))
|
|
|
|
if is_ostree:
|
|
with open(f"{tree}/usr/lib/passwd") as f:
|
|
report["passwd-system"] = sorted(f.read().strip().split("\n"))
|
|
|
|
with open(f"{tree}/usr/lib/group") as f:
|
|
report["groups-system"] = sorted(f.read().strip().split("\n"))
|
|
|
|
if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0:
|
|
assert "bootmenu" not in report
|
|
with contextlib.suppress(FileNotFoundError):
|
|
with open(f"{tree}/boot/grub2/grubenv") as f:
|
|
report["boot-environment"] = parse_environment_vars(f.read())
|
|
report["bootmenu"] = read_boot_entries(f"{tree}/boot")
|
|
|
|
elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
|
|
assert "bootmenu" not in report
|
|
with open(f"{tree}/grub2/grubenv") as f:
|
|
report["boot-environment"] = parse_environment_vars(f.read())
|
|
report["bootmenu"] = read_boot_entries(tree)
|
|
elif len(glob.glob(f"{tree}/EFI")):
|
|
print("EFI partition", file=sys.stderr)
|
|
|
|
|
|
def volume_group_for_device(device: str) -> str:
|
|
# Find the volume group that belongs to the device specified via `parent`
|
|
vg_name = None
|
|
count = 0
|
|
|
|
cmd = [
|
|
"pvdisplay", "-C", "--noheadings", "-o", "vg_name", device
|
|
]
|
|
|
|
while True:
|
|
res = subprocess.run(cmd,
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
encoding="UTF-8")
|
|
|
|
if res.returncode == 5:
|
|
if count == 10:
|
|
raise RuntimeError("Could not find parent device")
|
|
time.sleep(1*count)
|
|
count += 1
|
|
continue
|
|
|
|
if res.returncode != 0:
|
|
raise RuntimeError(res.stderr.strip())
|
|
|
|
vg_name = res.stdout.strip()
|
|
if vg_name:
|
|
break
|
|
|
|
return vg_name
|
|
|
|
|
|
def ensure_device_file(path: str, major: int, minor: int):
|
|
"""Ensure the device file with the given major, minor exists"""
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
if not os.path.exists(path):
|
|
os.mknod(path, 0o600 | stat.S_IFBLK, os.makedev(major, minor))
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def discover_lvm(dev) -> Dict:
|
|
# find the volume group name for the device file
|
|
vg_name = volume_group_for_device(dev)
|
|
|
|
# activate it
|
|
r = subprocess.run(["vgchange", "-ay", vg_name],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.PIPE,
|
|
check=False)
|
|
if r.returncode != 0:
|
|
raise RuntimeError(r.stderr.strip())
|
|
|
|
try:
|
|
# Find all logical volumes in the volume group
|
|
cmd = [
|
|
"lvdisplay", "-C", "--noheadings",
|
|
"-o", "lv_name,path,lv_kernel_major,lv_kernel_minor",
|
|
"--separator", ";",
|
|
vg_name
|
|
]
|
|
|
|
res = subprocess.run(cmd,
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
encoding="UTF-8")
|
|
|
|
if res.returncode != 0:
|
|
raise RuntimeError(res.stderr.strip())
|
|
|
|
data = res.stdout.strip()
|
|
parsed = list(map(lambda l: l.split(";"), data.split("\n")))
|
|
volumes = OrderedDict()
|
|
|
|
for vol in parsed:
|
|
vol = list(map(lambda v: v.strip(), vol))
|
|
assert len(vol) == 4
|
|
name, voldev, major, minor = vol
|
|
info = {
|
|
"device": voldev
|
|
}
|
|
ensure_device_file(voldev, int(major), int(minor))
|
|
|
|
read_partition(voldev, info)
|
|
volumes[name] = info
|
|
if name.startswith("root"):
|
|
volumes.move_to_end(name, last=False)
|
|
res = {
|
|
"lvm": True,
|
|
"lvm.vg": vg_name,
|
|
"lvm.volumes": volumes
|
|
}
|
|
|
|
yield res
|
|
|
|
finally:
|
|
r = subprocess.run(["vgchange", "-an", vg_name],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.PIPE,
|
|
check=False)
|
|
if r.returncode != 0:
|
|
raise RuntimeError(r.stderr.strip())
|
|
|
|
|
|
def partition_is_lvm(part: Dict) -> bool:
|
|
return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"]
|
|
|
|
|
|
def append_partitions(report, device, loctl):
|
|
partitions = report["partitions"]
|
|
|
|
with contextlib.ExitStack() as cm:
|
|
# open each partition as a loop device
|
|
filesystems = {}
|
|
for part in partitions:
|
|
start, size = part["start"], part["size"]
|
|
dev = cm.enter_context(loop_open(loctl, device, offset=start, size=size))
|
|
read_partition(dev, part)
|
|
if partition_is_lvm(part):
|
|
lvm = cm.enter_context(discover_lvm(dev))
|
|
for vol in lvm["lvm.volumes"].values():
|
|
if vol["fstype"]:
|
|
mntopts = []
|
|
# we cannot recover since the underlying loopback device is mounted
|
|
# read-only but since we are using the it through the device mapper
|
|
# the fact might not be communicated and the kernel attempt a to
|
|
# a recovery of the filesystem, which will lead to a kernel panic
|
|
if vol["fstype"] in ("ext4", "ext3", "xfs"):
|
|
mntopts = ["norecovery"]
|
|
filesystems[vol["uuid"].upper()] = {
|
|
"device": vol["device"],
|
|
"mntops": mntopts
|
|
}
|
|
del vol["device"]
|
|
part.update(lvm)
|
|
elif part["uuid"] and part["fstype"]:
|
|
filesystems[part["uuid"].upper()] = {
|
|
"device": dev
|
|
}
|
|
|
|
# find partition with fstab and read it
|
|
fstab = []
|
|
for fs in filesystems.values():
|
|
dev, opts = fs["device"], fs.get("mntops")
|
|
with mount(dev, opts) as tree:
|
|
if os.path.exists(f"{tree}/etc/fstab"):
|
|
fstab.extend(read_fstab(tree))
|
|
break
|
|
# sort the fstab entries by the mountpoint
|
|
fstab = sorted(fstab, key=operator.itemgetter(1))
|
|
|
|
# mount all partitions to ther respective mount points
|
|
root_tree = ""
|
|
for n, fstab_entry in enumerate(fstab):
|
|
part_uuid = fstab_entry[0].split("=")[1].upper()
|
|
part_device = filesystems[part_uuid]["device"]
|
|
part_mountpoint = fstab_entry[1]
|
|
part_fstype = fstab_entry[2]
|
|
part_options = fstab_entry[3].split(",")
|
|
part_options += filesystems[part_uuid].get("mntops", [])
|
|
|
|
# the first mount point should be root
|
|
if n == 0:
|
|
if part_mountpoint != "/":
|
|
raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
|
|
root_tree = cm.enter_context(mount(part_device, part_options))
|
|
continue
|
|
|
|
cm.enter_context(mount_at(part_device, f"{root_tree}{part_mountpoint}", options=part_options, extra=["-t", part_fstype]))
|
|
|
|
if not root_tree:
|
|
raise RuntimeError("The root filesystem tree is not mounted")
|
|
|
|
append_filesystem(report, root_tree)
|
|
|
|
|
|
def analyse_image(image):
|
|
loctl = loop.LoopControl()
|
|
|
|
imgfmt = read_image_format(image)
|
|
report = {"image-format": imgfmt}
|
|
|
|
with open_image(loctl, image, imgfmt) as (_, device):
|
|
report["bootloader"] = read_bootloader_type(device)
|
|
report.update(read_partition_table(device))
|
|
|
|
if report["partition-table"]:
|
|
append_partitions(report, device, loctl)
|
|
else:
|
|
with mount(device) as tree:
|
|
append_filesystem(report, tree)
|
|
|
|
return report
|
|
|
|
|
|
def append_directory(report, tree):
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
|
|
tree_ro = os.path.join(tmpdir, "root_ro")
|
|
os.makedirs(tree_ro)
|
|
# Make sure that the tools which analyse the directory in-place
|
|
# can not modify its content (e.g. create additional files).
|
|
# mount_at() always mounts the source as read-only!
|
|
with mount_at(tree, tree_ro, ["bind"]) as mountpoint:
|
|
if os.path.lexists(f"{tree}/ostree"):
|
|
os.makedirs(f"{tree}/etc", exist_ok=True)
|
|
with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
|
|
append_filesystem(report, tree_ro, is_ostree=True)
|
|
else:
|
|
append_filesystem(report, tree_ro)
|
|
|
|
|
|
def append_ostree_repo(report, repo):
|
|
ostree = functools.partial(run_ostree, repo=repo)
|
|
|
|
r = ostree("config", "get", "core.mode")
|
|
report["ostree"] = {
|
|
"repo": {
|
|
"core.mode": r.stdout.strip()
|
|
}
|
|
}
|
|
|
|
r = ostree("refs")
|
|
refs = r.stdout.strip().split("\n")
|
|
report["ostree"]["refs"] = refs
|
|
|
|
resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs}
|
|
commit = resolved[refs[0]]
|
|
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
|
|
tree = os.path.join(tmpdir, "tree")
|
|
ostree("checkout", "--force-copy", commit, tree)
|
|
append_directory(report, tree)
|
|
|
|
|
|
def analyse_directory(path):
|
|
report = {}
|
|
|
|
if os.path.exists(os.path.join(path, "compose.json")):
|
|
report["type"] = "ostree/commit"
|
|
repo = os.path.join(path, "repo")
|
|
append_ostree_repo(report, repo)
|
|
elif os.path.isdir(os.path.join(path, "refs")):
|
|
report["type"] = "ostree/repo"
|
|
repo = os.path.join(path, "repo")
|
|
append_ostree_repo(report, repo)
|
|
else:
|
|
append_directory(report, path)
|
|
|
|
return report
|
|
|
|
|
|
def is_tarball(path):
|
|
mtype, encoding = mimetypes.guess_type(path)
|
|
return mtype == "application/x-tar"
|
|
|
|
|
|
def analyse_tarball(path):
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
|
|
tree = os.path.join(tmpdir, "root")
|
|
os.makedirs(tree)
|
|
command = [
|
|
"tar",
|
|
"--selinux",
|
|
"--xattrs",
|
|
"--acls",
|
|
"-x",
|
|
"--auto-compress",
|
|
"-f", path,
|
|
"-C", tree
|
|
]
|
|
subprocess.run(command,
|
|
stdout=sys.stderr,
|
|
check=True)
|
|
# gce image type contains virtual raw disk inside a tarball
|
|
if os.path.isfile(f"{tree}/disk.raw"):
|
|
return analyse_image(f"{tree}/disk.raw")
|
|
else:
|
|
return analyse_directory(tree)
|
|
|
|
|
|
def is_compressed(path):
|
|
_, encoding = mimetypes.guess_type(path)
|
|
return encoding in ["xz", "gzip", "bzip2"]
|
|
|
|
|
|
def analyse_compressed(path):
|
|
_, encoding = mimetypes.guess_type(path)
|
|
|
|
if encoding == "xz":
|
|
command = ["unxz", "--force"]
|
|
elif encoding == "gzip":
|
|
command = ["gunzip", "--force"]
|
|
elif encoding == "bzip2":
|
|
command = ["bunzip2", "--force"]
|
|
else:
|
|
raise ValueError(f"Unsupported compression: {encoding}")
|
|
|
|
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
|
|
subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir],
|
|
check=True)
|
|
|
|
files = os.listdir(tmpdir)
|
|
archive = os.path.join(tmpdir, files[0])
|
|
subprocess.run(command + [archive], check=True)
|
|
|
|
files = os.listdir(tmpdir)
|
|
assert len(files) == 1
|
|
image = os.path.join(tmpdir, files[0])
|
|
return analyse_image(image)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Inspect an image")
|
|
parser.add_argument("target", metavar="TARGET",
|
|
help="The file or directory to analyse",
|
|
type=os.path.abspath)
|
|
|
|
args = parser.parse_args()
|
|
target = args.target
|
|
|
|
if os.path.isdir(target):
|
|
report = analyse_directory(target)
|
|
elif is_tarball(target):
|
|
report = analyse_tarball(target)
|
|
elif is_compressed(target):
|
|
report = analyse_compressed(target)
|
|
else:
|
|
report = analyse_image(target)
|
|
|
|
json.dump(report, sys.stdout, sort_keys=True, indent=2)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|