debian-forge-composer/tools/image-info

#!/usr/bin/python3
import argparse
import configparser
import contextlib
import errno
import functools
import glob
import mimetypes
import operator
import json
import os
import platform
import re
import stat
import subprocess
import sys
import time
import tempfile
import xml.etree.ElementTree
import yaml
from collections import OrderedDict
from typing import Dict
from osbuild import loop
def run_ostree(*args, _input=None, _check=True, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
res = subprocess.run(["ostree"] + args,
encoding="utf-8",
stdout=subprocess.PIPE,
input=_input,
check=_check)
return res
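# Illustrative note (hypothetical invocation): keyword arguments become long
# options, so run_ostree("rev-parse", "ref", repo="/some/repo") executes
# `ostree rev-parse ref --repo=/some/repo` and returns the CompletedProcess.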
@contextlib.contextmanager
def loop_create_device(ctl, fd, offset=None, sizelimit=None):
while True:
lo = loop.Loop(ctl.get_unbound())
try:
lo.set_fd(fd)
except OSError as e:
lo.close()
if e.errno == errno.EBUSY:
continue
raise e
try:
lo.set_status(offset=offset, sizelimit=sizelimit, autoclear=True)
except BlockingIOError:
lo.clear_fd()
lo.close()
continue
break
try:
yield lo
finally:
lo.close()
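# Note on the retry loop above: another process can grab the "unbound" loop
# device between get_unbound() and set_fd(), in which case set_fd() fails with
# EBUSY (or set_status() blocks) and the next free device is tried instead.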
@contextlib.contextmanager
def loop_open(ctl, image, *, offset=None, size=None):
with open(image, "rb") as f:
fd = f.fileno()
with loop_create_device(ctl, fd, offset=offset, sizelimit=size) as lo:
yield os.path.join("/dev", lo.devname)
@contextlib.contextmanager
def open_image(ctl, image, fmt):
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
if fmt["type"] != "raw":
target = os.path.join(tmp, "image.raw")
# A bug exists in qemu that causes the conversion to raw to fail
# on aarch64 systems with a LOT of CPUs. A workaround is to use
# a single coroutine to do the conversion. It doesn't slow down
# the conversion by much, but it hangs about half the time without
# the limit set. 😢
# Bug: https://bugs.launchpad.net/qemu/+bug/1805256
if platform.machine() == 'aarch64':
subprocess.run(
["qemu-img", "convert", "-m", "1", "-O", "raw", image, target],
check=True
)
else:
subprocess.run(
["qemu-img", "convert", "-O", "raw", image, target],
check=True
)
else:
target = image
size = os.stat(target).st_size
with loop_open(ctl, target, offset=0, size=size) as dev:
yield target, dev
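# Usage sketch (illustrative; `ctl` is the caller's loop-control object):
#   with open_image(ctl, "disk.qcow2", {"type": "qcow2"}) as (raw, dev):
#       ...  # `raw` is the raw image path, `dev` e.g. "/dev/loop0"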
@contextlib.contextmanager
def mount_at(device, mountpoint, options=None, extra=None):
    opts = ",".join(["ro"] + (options or []))
    subprocess.run(["mount", "-o", opts] + (extra or []) + [device, mountpoint], check=True)
try:
yield mountpoint
finally:
subprocess.run(["umount", "--lazy", mountpoint], check=True)
@contextlib.contextmanager
def mount(device, options=None):
options = options or []
opts = ",".join(["ro"] + options)
with tempfile.TemporaryDirectory() as mountpoint:
subprocess.run(["mount", "-o", opts, device, mountpoint], check=True)
try:
yield mountpoint
finally:
subprocess.run(["umount", "--lazy", mountpoint], check=True)
def parse_environment_vars(s):
r = {}
for line in s.split("\n"):
line = line.strip()
if not line:
continue
if line[0] == '#':
continue
key, value = line.split("=", 1)
r[key] = value.strip('"')
return r
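# Example (hypothetical input): parsing 'NAME="Fedora Linux"\n#comment\nID=fedora'
# yields {"NAME": "Fedora Linux", "ID": "fedora"}; blank lines and comments are
# skipped and surrounding double quotes are stripped from values.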
# Parses output of `systemctl list-unit-files`
def parse_unit_files(s, expected_state):
r = []
for line in s.split("\n")[1:]:
try:
unit, state, *_ = line.split()
        except ValueError:
            # skip lines that do not contain a unit name and state
            # (e.g. blank or summary lines)
            continue
if state != expected_state:
continue
r.append(unit)
return r
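# Example (abridged, hypothetical `systemctl list-unit-files` output):
#   UNIT FILE       STATE    VENDOR PRESET
#   sshd.service    enabled  enabled
#   kdump.service   disabled enabled
# parse_unit_files(output, "enabled") -> ["sshd.service"]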
def subprocess_check_output(argv, parse_fn=None):
try:
output = subprocess.check_output(argv, encoding="utf-8")
except subprocess.CalledProcessError as e:
sys.stderr.write(f"--- Output from {argv}:\n")
sys.stderr.write(e.stdout)
sys.stderr.write("\n--- End of the output\n")
raise
return parse_fn(output) if parse_fn else output
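# Example (illustrative): subprocess_check_output(["cat", "/etc/os-release"],
# parse_environment_vars) returns the parsed key/value dictionary; on a
# non-zero exit status the captured output is dumped to stderr and re-raised.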
def read_image_format(device):
"""
Read image format.
Returns: dictionary with at least one key 'type'. 'type' value is a string
representing the format of the image. In case the type is 'qcow2', the returned
dictionary contains second key 'compat' with a string value representing
the compatibility version of the 'qcow2' image.
An example return value:
{
"compat": "1.1",
"type": "qcow2"
}
"""
qemu = subprocess_check_output(["qemu-img", "info", "--output=json", device], json.loads)
format = qemu["format"]
result = {"type": format}
if format == "qcow2":
result["compat"] = qemu["format-specific"]["data"]["compat"]
return result
def read_partition(device, partition):
"""
Read block device attributes using 'blkid' and extend the passed 'partition'
dictionary.
Returns: the 'partition' dictionary provided as an argument, extended with
'label', 'uuid' and 'fstype' keys and their values.
"""
res = subprocess.run(["blkid", "-c", "/dev/null", "--output", "export",
device],
check=False, encoding="utf-8",
stdout=subprocess.PIPE)
if res.returncode == 0:
blkid = parse_environment_vars(res.stdout)
else:
blkid = {}
partition["label"] = blkid.get("LABEL") # doesn't exist for mbr
partition["uuid"] = blkid.get("UUID")
partition["fstype"] = blkid.get("TYPE")
return partition
def read_partition_table(device):
"""
Read information related to found partitions and partitioning table from
the device.
Returns: dictionary with three keys - 'partition-table', 'partition-table-id'
and 'partitions'.
'partition-table' value is a string with the type of the partition table or 'None'.
'partition-table-id' value is a string with the ID of the partition table or 'None'.
'partitions' value is a list of dictionaries representing found partitions.
An example return value:
{
"partition-table": "gpt",
"partition-table-id": "DA237A6F-F0D4-47DF-BB50-007E00DB347C",
"partitions": [
{
"bootable": false,
"partuuid": "64AF1EC2-0328-406A-8F36-83016E6DD858",
"size": 1048576,
"start": 1048576,
"type": "21686148-6449-6E6F-744E-656564454649",
},
{
"bootable": false,
"partuuid": "D650D523-06F6-4B90-9204-8F998FE9703C",
"size": 6442450944,
"start": 2097152,
"type": "0FC63DAF-8483-4772-8E79-3D69D8477DE4",
}
]
}
"""
partitions = []
info = {"partition-table": None,
"partition-table-id": None,
"partitions": partitions}
try:
sfdisk = subprocess_check_output(["sfdisk", "--json", device], json.loads)
except subprocess.CalledProcessError:
        # This handles the case when the device contains a filesystem
        # but no partition table.
partitions.append(read_partition(device, {}))
return info
ptable = sfdisk["partitiontable"]
assert ptable["unit"] == "sectors"
is_dos = ptable["label"] == "dos"
ssize = ptable.get("sectorsize", 512)
for i, p in enumerate(ptable["partitions"]):
partuuid = p.get("uuid")
if not partuuid and is_dos:
# For dos/mbr partition layouts the partition uuid
# is generated. Normally this would be done by
# udev+blkid, when the partition table is scanned.
# 'sfdisk' prefixes the partition id with '0x' but
# 'blkid' does not; remove it to mimic 'blkid'
table_id = ptable['id'][2:]
partuuid = "%.33s-%02x" % (table_id, i+1)
partitions.append({
"bootable": p.get("bootable", False),
"type": p["type"],
"start": p["start"] * ssize,
"size": p["size"] * ssize,
"partuuid": partuuid
})
info["partitions"] = sorted(info["partitions"], key=operator.itemgetter("partuuid"))
info["partition-table"] = ptable["label"]
info["partition-table-id"] = ptable["id"]
return info
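# Note on the generated MBR partuuids (hypothetical values): for a dos table
# with id "0xdeadbeef", the first partition is reported with partuuid
# "deadbeef-01", the second with "deadbeef-02", mirroring blkid's behaviour.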
def read_bootloader_type(device):
"""
Read bootloader type from the provided device.
Returns: string representing the found bootloader. Function can return two values:
- 'grub'
- 'unknown'
"""
with open(device, "rb") as f:
if b"GRUB" in f.read(512):
return "grub"
else:
return "unknown"
def read_boot_entries(boot_dir):
"""
Read boot entries.
Returns: list of dictionaries representing configured boot entries.
An example return value:
[
{
"grub_arg": "--unrestricted",
"grub_class": "kernel",
"grub_users": "$grub_users",
"id": "rhel-20210429130346-0-rescue-c116920b13f44c59846f90b1057605bc",
"initrd": "/boot/initramfs-0-rescue-c116920b13f44c59846f90b1057605bc.img",
"linux": "/boot/vmlinuz-0-rescue-c116920b13f44c59846f90b1057605bc",
"options": "$kernelopts",
"title": "Red Hat Enterprise Linux (0-rescue-c116920b13f44c59846f90b1057605bc) 8.4 (Ootpa)",
"version": "0-rescue-c116920b13f44c59846f90b1057605bc"
},
{
"grub_arg": "--unrestricted",
"grub_class": "kernel",
"grub_users": "$grub_users",
"id": "rhel-20210429130346-4.18.0-305.el8.x86_64",
"initrd": "/boot/initramfs-4.18.0-305.el8.x86_64.img $tuned_initrd",
"linux": "/boot/vmlinuz-4.18.0-305.el8.x86_64",
"options": "$kernelopts $tuned_params",
"title": "Red Hat Enterprise Linux (4.18.0-305.el8.x86_64) 8.4 (Ootpa)",
"version": "4.18.0-305.el8.x86_64"
}
]
"""
entries = []
for conf in glob.glob(f"{boot_dir}/loader/entries/*.conf"):
with open(conf) as f:
entries.append(dict(line.strip().split(" ", 1) for line in f))
return sorted(entries, key=lambda e: e["title"])
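# Example (hypothetical BLS entry file): an entry consisting of the lines
#   title Fedora Linux
#   linux /boot/vmlinuz-5.14.0
#   options $kernelopts
# is parsed into {"title": "Fedora Linux", "linux": "/boot/vmlinuz-5.14.0",
# "options": "$kernelopts"}.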
def rpm_verify(tree):
"""
Read the output of 'rpm --verify'.
Returns: dictionary with two keys 'changed' and 'missing'.
'changed' value is a dictionary with the keys representing modified files from
installed RPM packages and values representing types of applied modifications.
    'missing' value is a list of strings representing missing files owned by
    installed RPM packages.
An example return value:
{
"changed": {
"/etc/chrony.conf": "S.5....T.",
"/etc/cloud/cloud.cfg": "S.5....T.",
"/etc/nsswitch.conf": "....L....",
"/etc/openldap/ldap.conf": ".......T.",
"/etc/pam.d/fingerprint-auth": "....L....",
"/etc/pam.d/password-auth": "....L....",
"/etc/pam.d/postlogin": "....L....",
"/etc/pam.d/smartcard-auth": "....L....",
"/etc/pam.d/system-auth": "....L....",
"/etc/rhsm/rhsm.conf": "..5....T.",
"/etc/sudoers": "S.5....T.",
"/etc/systemd/logind.conf": "S.5....T."
},
"missing": [
"/etc/udev/rules.d/70-persistent-ipoib.rules",
"/run/cloud-init",
"/run/rpcbind",
"/run/setrans",
"/run/tuned"
]
}
"""
# cannot use `rpm --root` here, because rpm uses passwd from the host to
# verify user and group ownership:
# https://github.com/rpm-software-management/rpm/issues/882
rpm = subprocess.Popen(["chroot", tree, "rpm", "--verify", "--all"],
stdout=subprocess.PIPE, encoding="utf-8")
changed = {}
missing = []
for line in rpm.stdout:
        # format description in rpm(8), under `--verify`: the first 9
        # characters hold the attribute flags; missing files are reported
        # with a literal "missing" prefix instead
        attrs = line[:9]
        if attrs.startswith("missing"):
            missing.append(line[12:].rstrip())
        else:
            changed[line[13:].rstrip()] = attrs
# ignore return value, because it returns non-zero when it found changes
rpm.wait()
return {
"missing": sorted(missing),
"changed": changed
}
def rpm_not_installed_docs(tree, is_ostree):
"""
    Gather information on documentation files that are part of installed RPM
    packages but were not installed.
Returns: list of documentation files, which are normally a part of
the installed RPM packages, but were not installed (e.g. due to using
'--excludedocs' option when executing 'rpm' command).
An example return value:
[
"/usr/share/man/man1/sdiff.1.gz",
"/usr/share/man/man1/seapplet.1.gz",
"/usr/share/man/man1/secon.1.gz",
"/usr/share/man/man1/secret-tool.1.gz",
"/usr/share/man/man1/sed.1.gz",
"/usr/share/man/man1/seq.1.gz"
]
"""
# check not installed Docs (e.g. when RPMs are installed with --excludedocs)
not_installed_docs = []
cmd = ["rpm", "--root", tree, "-qad", "--state"]
if is_ostree:
cmd += ["--dbpath", "/usr/share/rpm"]
output = subprocess_check_output(cmd)
for line in output.splitlines():
if line.startswith("not installed"):
not_installed_docs.append(line.split()[-1])
return sorted(not_installed_docs)
def rpm_packages(tree, is_ostree):
"""
Read NVRs of RPM packages installed on the system.
Returns: sorted list of strings representing RPM packages installed
on the system.
An example return value:
[
"NetworkManager-1.30.0-7.el8.x86_64",
"PackageKit-glib-1.1.12-6.el8.x86_64",
"PackageKit-gtk3-module-1.1.12-6.el8.x86_64",
"abattis-cantarell-fonts-0.0.25-6.el8.noarch",
"acl-2.2.53-1.el8.x86_64",
"adobe-mappings-cmap-20171205-3.el8.noarch",
"adobe-mappings-cmap-deprecated-20171205-3.el8.noarch",
"adobe-mappings-pdf-20180407-1.el8.noarch",
"adwaita-cursor-theme-3.28.0-2.el8.noarch",
"adwaita-icon-theme-3.28.0-2.el8.noarch",
"alsa-lib-1.2.4-5.el8.x86_64"
]
"""
cmd = ["rpm", "--root", tree, "-qa"]
if is_ostree:
cmd += ["--dbpath", "/usr/share/rpm"]
pkgs = subprocess_check_output(cmd, str.split)
    return sorted(pkgs)
@contextlib.contextmanager
def change_root(root):
real_root = os.open("/", os.O_RDONLY)
try:
os.chroot(root)
yield None
finally:
os.fchdir(real_root)
os.chroot(".")
os.close(real_root)
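# Usage sketch (illustrative): `with change_root(tree): ...` chroots into the
# image tree for the duration of the block and then restores the original root
# via the saved directory file descriptor; this needs CAP_SYS_CHROOT.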
def read_services(tree, state):
"""
Read the list of systemd services on the system in the given state.
Returns: alphabetically sorted list of strings representing systemd services
in the given state.
The returned list may be empty.
An example return value:
[
"arp-ethers.service",
"canberra-system-bootup.service",
"canberra-system-shutdown-reboot.service",
"canberra-system-shutdown.service",
"chrony-dnssrv@.timer",
"chrony-wait.service"
]
"""
services_state = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, state)))
# Since systemd v246, some services previously reported as "enabled" /
# "disabled" are now reported as "alias". There is no systemd command, that
# would take an "alias" unit and report its state as enabled/disabled
# and could run on a different tree (with "--root" option).
# To make the produced list of services in the given state consistent on
# pre/post v246 systemd versions, check all "alias" units and append them
# to the list, if their target is also listed in 'services_state'.
if state != "alias":
services_alias = subprocess_check_output(["systemctl", f"--root={tree}", "list-unit-files"], (lambda s: parse_unit_files(s, "alias")))
for alias in services_alias:
# The service may be in one of the following places (output of
# "systemd-analyze unit-paths", it should not change too often).
unit_paths = [
"/etc/systemd/system.control",
"/run/systemd/system.control",
"/run/systemd/transient",
"/run/systemd/generator.early",
"/etc/systemd/system",
"/run/systemd/system",
"/run/systemd/generator",
"/usr/local/lib/systemd/system",
"/usr/lib/systemd/system",
"/run/systemd/generator.late"
]
with change_root(tree):
for path in unit_paths:
unit_path = os.path.join(path, alias)
if os.path.exists(unit_path):
real_unit_path = os.path.realpath(unit_path)
# Skip the alias, if there was a symlink cycle.
# When symbolic link cycles occur, the returned path will
# be one member of the cycle, but no guarantee is made about
# which member that will be.
if os.path.islink(real_unit_path):
continue
# Append the alias unit to the list, if its target is
# already there.
if os.path.basename(real_unit_path) in services_state:
services_state.append(alias)
# deduplicate and sort
services_state = list(set(services_state))
services_state.sort()
return services_state
def read_default_target(tree):
"""
Read the default systemd target.
Returns: string representing the default systemd target.
An example return value:
"multi-user.target"
"""
return subprocess_check_output(["systemctl", f"--root={tree}", "get-default"]).rstrip()
def read_firewall_default_zone(tree):
"""
Read the name of the default firewall zone
Returns: a string with the zone name. If the firewall configuration doesn't
exist, an empty string is returned.
An example return value:
"trusted"
"""
try:
with open(f"{tree}/etc/firewalld/firewalld.conf") as f:
conf = parse_environment_vars(f.read())
return conf["DefaultZone"]
except FileNotFoundError:
return ""
def read_firewall_zone(tree):
"""
Read enabled services from the configuration of the default firewall zone.
Returns: list of strings representing enabled services in the firewall.
The returned list may be empty.
An example return value:
[
"ssh",
"dhcpv6-client",
"cockpit"
]
"""
default = read_firewall_default_zone(tree)
if default == "":
default = "public"
r = []
try:
root = xml.etree.ElementTree.parse(f"{tree}/etc/firewalld/zones/{default}.xml").getroot()
except FileNotFoundError:
root = xml.etree.ElementTree.parse(f"{tree}/usr/lib/firewalld/zones/{default}.xml").getroot()
for element in root.findall("service"):
r.append(element.get("name"))
return r
def read_fstab(tree):
"""
Read the content of /etc/fstab.
    Returns: list of all uncommented lines read from the configuration file,
    each represented as a list of whitespace-separated values.
The returned list may be empty.
An example return value:
[
[
"UUID=6d066eb4-e4c1-4472-91f9-d167097f48d1",
"/",
"xfs",
"defaults",
"0",
"0"
]
]
"""
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/fstab") as f:
result = sorted([line.split() for line in f if line.strip() and not line.startswith("#")])
return result
def read_rhsm(tree):
"""
Read configuration changes possible via org.osbuild.rhsm stage
and in addition also the whole content of /etc/rhsm/rhsm.conf.
    Returns: dictionary with two keys - 'dnf-plugins' and 'rhsm.conf'.
'dnf-plugins' value represents configuration of 'product-id' and
'subscription-manager' DNF plugins.
'rhsm.conf' value is a dictionary representing the content of the RHSM
configuration file.
The returned dictionary may be empty.
An example return value:
{
"dnf-plugins": {
"product-id": {
"enabled": true
},
"subscription-manager": {
"enabled": true
}
},
"rhsm.conf": {
"logging": {
"default_log_level": "INFO"
},
"rhsm": {
"auto_enable_yum_plugins": "1",
"baseurl": "https://cdn.redhat.com",
"ca_cert_dir": "/etc/rhsm/ca/",
"consumercertdir": "/etc/pki/consumer",
"entitlementcertdir": "/etc/pki/entitlement",
"full_refresh_on_yum": "0",
"inotify": "1",
"manage_repos": "0",
"package_profile_on_trans": "0",
"pluginconfdir": "/etc/rhsm/pluginconf.d",
"plugindir": "/usr/share/rhsm-plugins",
"productcertdir": "/etc/pki/product",
"repo_ca_cert": "/etc/rhsm/ca/redhat-uep.pem",
"repomd_gpg_url": "",
"report_package_profile": "1"
},
"rhsmcertd": {
"auto_registration": "1",
"auto_registration_interval": "60",
"autoattachinterval": "1440",
"certcheckinterval": "240",
"disable": "0",
"splay": "1"
},
"server": {
"hostname": "subscription.rhsm.redhat.com",
"insecure": "0",
"no_proxy": "",
"port": "443",
"prefix": "/subscription",
"proxy_hostname": "",
"proxy_password": "",
"proxy_port": "",
"proxy_scheme": "http",
"proxy_user": "",
"ssl_verify_depth": "3"
}
}
}
"""
result = {}
# Check RHSM DNF plugins configuration and allowed options
dnf_plugins_config = {
"product-id": f"{tree}/etc/dnf/plugins/product-id.conf",
"subscription-manager": f"{tree}/etc/dnf/plugins/subscription-manager.conf"
}
for plugin_name, plugin_path in dnf_plugins_config.items():
with contextlib.suppress(FileNotFoundError):
with open(plugin_path) as f:
parser = configparser.ConfigParser()
parser.read_file(f)
# only read "enabled" option from "main" section
with contextlib.suppress(configparser.NoSectionError, configparser.NoOptionError):
# get the value as the first thing, in case it raises an exception
enabled = parser.getboolean("main", "enabled")
try:
dnf_plugins_dict = result["dnf-plugins"]
except KeyError as _:
dnf_plugins_dict = result["dnf-plugins"] = {}
try:
plugin_dict = dnf_plugins_dict[plugin_name]
except KeyError as _:
plugin_dict = dnf_plugins_dict[plugin_name] = {}
plugin_dict["enabled"] = enabled
with contextlib.suppress(FileNotFoundError):
rhsm_conf = {}
with open(f"{tree}/etc/rhsm/rhsm.conf") as f:
parser = configparser.ConfigParser()
parser.read_file(f)
for section in parser.sections():
section_dict = {}
section_dict.update(parser[section])
if section_dict:
rhsm_conf[section] = section_dict
result["rhsm.conf"] = rhsm_conf
return result
def read_sysconfig(tree):
"""
Read selected configuration files from /etc/sysconfig.
Currently supported sysconfig files are:
- 'kernel' - /etc/sysconfig/kernel
- 'network' - /etc/sysconfig/network
- 'network-scripts' - /etc/sysconfig/network-scripts/ifcfg-*
Returns: dictionary with the keys being the supported types of sysconfig
configurations read by the function. Values of 'kernel' and 'network' keys
are a dictionaries containing key/values read from the respective
configuration files. Value of 'network-scripts' key is a dictionary with
the keys corresponding to the suffix of each 'ifcfg-*' configuration file
and their values holding dictionaries with all key/values read from the
configuration file.
The returned dictionary may be empty.
An example return value:
{
"kernel": {
"DEFAULTKERNEL": "kernel",
"UPDATEDEFAULT": "yes"
},
"network": {
"NETWORKING": "yes",
"NOZEROCONF": "yes"
},
"network-scripts": {
"ens3": {
"BOOTPROTO": "dhcp",
"BROWSER_ONLY": "no",
"DEFROUTE": "yes",
"DEVICE": "ens3",
"IPV4_FAILURE_FATAL": "no",
"IPV6INIT": "yes",
"IPV6_AUTOCONF": "yes",
"IPV6_DEFROUTE": "yes",
"IPV6_FAILURE_FATAL": "no",
"NAME": "ens3",
"ONBOOT": "yes",
"PROXY_METHOD": "none",
"TYPE": "Ethernet",
"UUID": "106f1b31-7093-41d6-ae47-1201710d0447"
},
"eth0": {
"BOOTPROTO": "dhcp",
"DEVICE": "eth0",
"IPV6INIT": "no",
"ONBOOT": "yes",
"PEERDNS": "yes",
"TYPE": "Ethernet",
"USERCTL": "yes"
}
}
}
"""
result = {}
sysconfig_paths = {
"kernel": f"{tree}/etc/sysconfig/kernel",
"network": f"{tree}/etc/sysconfig/network"
}
# iterate through supported configs
for name, path in sysconfig_paths.items():
with contextlib.suppress(FileNotFoundError):
with open(path) as f:
# if file exists start with empty array of values
result[name] = parse_environment_vars(f.read())
# iterate through all files in /etc/sysconfig/network-scripts
network_scripts = {}
files = glob.glob(f"{tree}/etc/sysconfig/network-scripts/ifcfg-*")
    for file in files:
        # strip the literal "ifcfg-" prefix (str.lstrip() strips a set of
        # characters, not a prefix, so it could mangle some interface names)
        ifname = os.path.basename(file)[len("ifcfg-"):]
with open(file) as f:
network_scripts[ifname] = parse_environment_vars(f.read())
if network_scripts:
result["network-scripts"] = network_scripts
return result
def read_hosts(tree):
"""
Read non-empty lines of /etc/hosts.
Returns: list of strings for all uncommented lines in the configuration file.
The returned list may be empty.
An example return value:
[
"127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4",
"::1 localhost localhost.localdomain localhost6 localhost6.localdomain6"
]
"""
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/hosts") as f:
for line in f:
line = line.strip()
if line:
result.append(line)
return result
def read_logind_config(config_path):
"""
Read all uncommented key/values from the 'Login" section of system-logind
configuration file.
Returns: dictionary with key/values read from the configuration file.
The returned dictionary may be empty.
An example return value:
{
"NAutoVTs": "0"
}
"""
result = {}
with open(config_path) as f:
parser = configparser.RawConfigParser()
# prevent conversion of the option name to lowercase
parser.optionxform = lambda option: option
parser.read_file(f)
with contextlib.suppress(configparser.NoSectionError):
result.update(parser["Login"])
return result
def read_logind_configs(tree):
"""
Read all systemd-logind *.conf files from a predefined list of paths and
parse them.
The searched paths are:
- "/etc/systemd/logind.conf"
- "/etc/systemd/logind.conf.d/*.conf"
- "/usr/lib/systemd/logind.conf.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_logind_config()'.
An example return value:
{
"/etc/systemd/logind.conf": {
"NAutoVTs": "0"
}
}
"""
checked_globs = [
"/etc/systemd/logind.conf",
"/etc/systemd/logind.conf.d/*.conf",
"/usr/lib/systemd/logind.conf.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_logind_config)
def read_locale(tree):
"""
Read all uncommented key/values set in /etc/locale.conf.
Returns: dictionary with key/values read from the configuration file.
The returned dictionary may be empty.
An example return value:
{
"LANG": "en_US"
}
"""
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/locale.conf") as f:
return parse_environment_vars(f.read())
def read_selinux_info(tree, is_ostree):
"""
Read information related to SELinux.
Returns: dictionary with two keys - 'policy' and 'context-mismatch'.
'policy' value corresponds to the value returned by read_selinux_conf().
'context-mismatch' value corresponds to the value returned by
read_selinux_ctx_mismatch().
The returned dictionary may be empty. Keys with empty values are omitted.
An example return value:
{
"context-mismatch": [
{
"actual": "system_u:object_r:root_t:s0",
"expected": "system_u:object_r:device_t:s0",
"filename": "/dev"
},
{
"actual": "system_u:object_r:root_t:s0",
"expected": "system_u:object_r:default_t:s0",
"filename": "/proc"
}
],
"policy": {
"SELINUX": "permissive",
"SELINUXTYPE": "targeted"
}
}
"""
result = {}
policy = read_selinux_conf(tree)
if policy:
result["policy"] = policy
with contextlib.suppress(subprocess.CalledProcessError):
ctx_mismatch = read_selinux_ctx_mismatch(tree, is_ostree)
if ctx_mismatch:
result["context-mismatch"] = ctx_mismatch
return result
def read_selinux_conf(tree):
"""
Read all uncommented key/values set in /etc/selinux/config.
Returns: dictionary with key/values read from the configuration
file.
An example of returned value:
{
"SELINUX": "enforcing",
"SELINUXTYPE": "targeted"
}
"""
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/selinux/config") as f:
return parse_environment_vars(f.read())
def read_selinux_ctx_mismatch(tree, is_ostree):
"""
Read any mismatch in selinux context of files on the image.
Returns: list of dictionaries as described below. If there
are no mismatches between used and expected selinux context,
then an empty list is returned.
If the checked 'tree' is ostree, then the path '/etc' is
    excluded from the check. This is because it is bind-mounted
from /usr/etc and therefore has incorrect selinux context
for its filesystem path.
An example of returned value:
[
{
"actual": "system_u:object_r:root_t:s0",
"expected": "system_u:object_r:device_t:s0",
"filename": "/dev"
},
{
"actual": "system_u:object_r:root_t:s0",
"expected": "system_u:object_r:default_t:s0",
"filename": "/proc"
}
]
"""
result = []
# The binary policy that should be used is on the image and has name "policy.X"
# where the "X" is a number. There may be more than one policy files.
# In the usual case, the policy with the highest number suffix should be used.
policy_files = glob.glob(f"{tree}/etc/selinux/targeted/policy/policy.*")
policy_files = sorted(policy_files, reverse=True)
if policy_files:
CMD = [
"setfiles",
"-r", f"{tree}",
"-nvF",
"-c", policy_files[0], # take the policy with the highest number
f"{tree}/etc/selinux/targeted/contexts/files/file_contexts",
f"{tree}"
]
if is_ostree:
# exclude /etc from being checked when the tree is ostree, because
# it is just bind-mounted from /usr/etc and has incorrect selinux
# context for /etc path
CMD.extend(["-e", f"{tree}/etc"])
output = subprocess_check_output(CMD)
# output are lines such as:
# Would relabel /tmp/tmpwrozmb47/dev from system_u:object_r:root_t:s0 to system_u:object_r:device_t:s0\n
setfiles_pattern = r"Would\s+relabel\s+(?P<filename>.+)\s+from\s+(?P<actual>.+)\s+to\s+(?P<expected>.+)"
setfiles_re = re.compile(setfiles_pattern)
for line in output.splitlines():
line = line.strip()
if not line:
continue
match = setfiles_re.match(line)
# do not silently ignore changes of 'setfiles' output
if not match:
raise RuntimeError(f"could not match line '{line}' with pattern '{setfiles_pattern}'")
parsed_line = {
"filename": match.group("filename")[len(tree):],
"actual": match.group("actual"),
"expected": match.group("expected")
}
result.append(parsed_line)
# sort the list to make it consistent across runs
result.sort(key=lambda x: x.get("filename"))
return result
def _read_glob_paths_with_parser(tree, glob_paths, parser_func):
"""
Use 'parser_func' to read all files obtained by using all 'glob_paths'
globbing patterns under the 'tree' path.
    The 'glob_paths' argument is a list of string patterns accepted by glob.glob().
The 'parser_func' function is expected to take a single string argument
containing the absolute path to a configuration file which should be parsed.
Its return value can be arbitrary representation of the parsed
configuration.
    Returns: dictionary with the keys corresponding to directories which
    contain configuration files matching the provided glob patterns. Value of
each key is another dictionary with keys representing each filename and
values being the parsed configuration representation as returned by the
provided 'parser_func' function.
An example return value for dracut configuration paths and parser:
{
"/etc/dracut.conf.d": {
"sgdisk.conf": {
"install_items": " sgdisk "
},
},
"/usr/lib/dracut/dracut.conf.d": {
"xen.conf": {
"add_drivers": " xen-netfront xen-blkfront "
}
}
}
"""
result = {}
for glob_path in glob_paths:
glob_path_result = {}
files = glob.glob(f"{tree}{glob_path}")
for file in files:
config = parser_func(file)
if config:
filename = os.path.basename(file)
glob_path_result[filename] = config
if glob_path_result:
checked_path = os.path.dirname(glob_path)
result[checked_path] = glob_path_result
return result
def read_modprobe_config(config_path):
"""
    Read a specific modprobe configuration file and, for now, extract only
    blacklisted kernel modules.
Returns: dictionary with the keys corresponding to specific modprobe
commands and values being the values of these commands.
An example return value:
{
"blacklist": [
"nouveau"
]
}
"""
file_result = {}
BLACKLIST_CMD = "blacklist"
with open(config_path) as f:
# The format of files under modprobe.d: one command per line,
# with blank lines and lines starting with '#' ignored.
# A '\' at the end of a line causes it to continue on the next line.
line_to_be_continued = ""
for line in f:
line = line.strip()
# line is not blank
if line:
# comment, skip it
if line[0] == "#":
continue
# this line continues on the following line
if line[-1] == "\\":
line_to_be_continued += line[:-1]
continue
# this line ends here
else:
# is this line continuation of the previous one?
if line_to_be_continued:
line = line_to_be_continued + line
line_to_be_continued = ""
cmd, cmd_args = line.split(' ', 1)
# we care only about blacklist command for now
                    if cmd == BLACKLIST_CMD:
                        # append to any previously collected entries instead
                        # of resetting the list on every blacklist line
                        file_result.setdefault(BLACKLIST_CMD, []).append(cmd_args)
return file_result
def read_modprobe_configs(tree):
"""
Read all modprobe *.conf files from a predefined list of paths and extract
supported commands. For now, extract only blacklisted kernel modules.
The searched paths are:
- "/etc/modprobe.d/*.conf"
- "/usr/lib/modprobe.d/*.conf"
- "/usr/local/lib/modprobe.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_modprobe_config()'.
An example return value:
{
"/usr/lib/modprobe.d": {
"blacklist-nouveau.conf": {
"blacklist": [
"nouveau"
]
}
}
}
"""
checked_globs = [
"/etc/modprobe.d/*.conf",
"/usr/lib/modprobe.d/*.conf",
"/usr/local/lib/modprobe.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_modprobe_config)
def read_cloud_init_config(config_path):
"""
Read the specific cloud-init configuration file.
Returns: dictionary representing the cloud-init configuration.
An example return value:
{
"cloud_config_modules": [
"mounts",
"locale",
"set-passwords",
"rh_subscription",
"yum-add-repo",
"package-update-upgrade-install",
"timezone",
"puppet",
"chef",
"salt-minion",
"mcollective",
"disable-ec2-metadata",
"runcmd"
],
"cloud_final_modules": [
"rightscale_userdata",
"scripts-per-once",
"scripts-per-boot",
"scripts-per-instance",
"scripts-user",
"ssh-authkey-fingerprints",
"keys-to-console",
"phone-home",
"final-message",
"power-state-change"
],
"cloud_init_modules": [
"disk_setup",
"migrator",
"bootcmd",
"write-files",
"growpart",
"resizefs",
"set_hostname",
"update_hostname",
"update_etc_hosts",
"rsyslog",
"users-groups",
"ssh"
],
"disable_root": 1,
"disable_vmware_customization": false,
"mount_default_fields": [
null,
null,
"auto",
"defaults,nofail,x-systemd.requires=cloud-init.service",
"0",
"2"
],
"resize_rootfs_tmp": "/dev",
"ssh_deletekeys": 1,
"ssh_genkeytypes": null,
"ssh_pwauth": 0,
"syslog_fix_perms": null,
"system_info": {
"default_user": {
"gecos": "Cloud User",
"groups": [
"adm",
"systemd-journal"
],
"lock_passwd": true,
"name": "ec2-user",
"shell": "/bin/bash",
"sudo": [
"ALL=(ALL) NOPASSWD:ALL"
]
},
"distro": "rhel",
"paths": {
"cloud_dir": "/var/lib/cloud",
"templates_dir": "/etc/cloud/templates"
},
"ssh_svcname": "sshd"
},
"users": [
"default"
]
}
"""
result = {}
with contextlib.suppress(FileNotFoundError):
with open(config_path) as f:
config = yaml.safe_load(f)
result.update(config)
return result
def read_cloud_init_configs(tree):
"""
Read all cloud-init *.cfg files from a predefined list of paths and parse them.
The searched paths are:
- "/etc/cloud/cloud.cfg"
- "/etc/cloud/cloud.cfg.d/*.cfg"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_cloud_init_config()'.
An example return value:
    {
        "/etc/cloud/cloud.cfg.d": {
            "ec2.cfg": {
                "default_user": {
                    "name": "ec2-user"
                }
            }
        }
    }
"""
checked_globs = [
"/etc/cloud/cloud.cfg",
"/etc/cloud/cloud.cfg.d/*.cfg"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_cloud_init_config)
def read_dracut_config(config_path):
"""
Read specific dracut configuration file.
Returns: dictionary representing the uncommented configuration options read
from the file.
An example return value:
{
"install_items": " sgdisk "
"add_drivers": " xen-netfront xen-blkfront "
}
"""
result = {}
with open(config_path) as f:
# dracut configuration key/values delimiter is '=' or '+='
for line in f:
line = line.strip()
# A '#' indicates the beginning of a comment; following
# characters, up to the end of the line are not interpreted.
line_comment = line.split("#", 1)
line = line_comment[0]
if line:
key, value = line.split("=", 1)
if key[-1] == "+":
key = key[:-1]
result[key] = value.strip('"')
return result
def read_dracut_configs(tree):
"""
Read all dracut *.conf files from a predefined list of paths and parse them.
The searched paths are:
- "/etc/dracut.conf.d/*.conf"
- "/usr/lib/dracut/dracut.conf.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_dracut_config()'.
An example return value:
{
"/etc/dracut.conf.d": {
"sgdisk.conf": {
"install_items": " sgdisk "
},
},
"/usr/lib/dracut/dracut.conf.d": {
"xen.conf": {
"add_drivers": " xen-netfront xen-blkfront "
}
}
}
"""
checked_globs = [
"/etc/dracut.conf.d/*.conf",
"/usr/lib/dracut/dracut.conf.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_dracut_config)
def read_keyboard_conf(tree):
"""
Read keyboard configuration for vconsole and X11.
Returns: dictionary with at most two keys 'X11' and 'vconsole'.
'vconsole' value is a dictionary representing configuration read from
/etc/vconsole.conf.
'X11' value is a dictionary with at most two keys 'layout' and 'variant',
    whose values are extracted from the X11 keyboard configuration.
An example return value:
{
"X11": {
"layout": "us"
},
"vconsole": {
"FONT": "eurlatgr",
"KEYMAP": "us"
}
}
"""
result = {}
# read virtual console configuration
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/vconsole.conf") as f:
values = parse_environment_vars(f.read())
if values:
result["vconsole"] = values
# read X11 keyboard configuration
with contextlib.suppress(FileNotFoundError):
# Example file content:
#
# Section "InputClass"
# Identifier "system-keyboard"
# MatchIsKeyboard "on"
# Option "XkbLayout" "us,sk"
# Option "XkbVariant" ",qwerty"
# EndSection
x11_config = {}
match_options_dict = {
"layout": r'Section\s+"InputClass"\s+.*Option\s+"XkbLayout"\s+"([\w,-]+)"\s+.*EndSection',
"variant": r'Section\s+"InputClass"\s+.*Option\s+"XkbVariant"\s+"([\w,-]+)"\s+.*EndSection'
}
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf") as f:
config = f.read()
for option, pattern in match_options_dict.items():
match = re.search(pattern, config, re.DOTALL)
if match and match.group(1):
x11_config[option] = match.group(1)
if x11_config:
result["X11"] = x11_config
return result
def read_chrony_conf(tree):
"""
Read specific directives from Chrony configuration. Currently parsed
directives are:
- 'server'
- 'pool'
- 'peer'
- 'leapsectz'
Returns: dictionary with the keys representing parsed directives from Chrony
configuration. Value of each key is a list of strings containing arguments
    provided with each occurrence of the directive in the configuration.
An example return value:
{
"leapsectz": [
"right/UTC"
],
"pool": [
"2.rhel.pool.ntp.org iburst"
],
"server": [
"169.254.169.123 prefer iburst minpoll 4 maxpoll 4"
]
}
"""
result = {}
parsed_directives = ["server", "pool", "peer", "leapsectz"]
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/chrony.conf") as f:
for line in f:
line = line.strip()
if not line:
continue
# skip comments
if line[0] in ["!", ";", "#", "%"]:
continue
split_line = line.split()
if split_line[0] in parsed_directives:
try:
directive_list = result[split_line[0]]
except KeyError:
directive_list = result[split_line[0]] = []
directive_list.append(" ".join(split_line[1:]))
return result
def read_systemd_service_dropin(dropin_dir_path):
"""
Read systemd .service unit drop-in configurations.
Returns: dictionary representing the combined drop-in configurations.
An example return value:
{
"Service": {
"Environment": "NM_CLOUD_SETUP_EC2=yes"
}
}
"""
# read all unit drop-in configurations
config_files = glob.glob(f"{dropin_dir_path}/*.conf")
parser = configparser.RawConfigParser()
    # prevent conversion of the option name to lowercase
parser.optionxform = lambda option: option
parser.read(config_files)
dropin_config = {}
for section in parser.sections():
section_config = {}
section_config.update(parser[section])
if section_config:
dropin_config[section] = section_config
return dropin_config
def read_systemd_service_dropins(tree):
"""
Read all systemd .service unit config files from a predefined list of paths
and parse them.
The searched paths are:
- "/etc/systemd/system/*.service.d"
- "/usr/lib/systemd/system/*.service.d"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_systemd_service_dropin()'.
An example return value:
{
"/etc/systemd/system": {
"nm-cloud-setup.service.d": {
"Service": {
"Environment": "NM_CLOUD_SETUP_EC2=yes"
}
}
}
}
"""
checked_globs = [
"/etc/systemd/system/*.service.d",
"/usr/lib/systemd/system/*.service.d"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_systemd_service_dropin)
def read_tmpfilesd_config(config_path):
"""
Read tmpfiles.d configuration files.
Returns: list of strings representing uncommented lines read from the
configuration file.
An example return value:
[
"x /tmp/.sap*",
"x /tmp/.hdb*lock",
"x /tmp/.trex*lock"
]
"""
file_lines = []
with open(config_path) as f:
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
file_lines.append(line)
return file_lines
def read_tmpfilesd_configs(tree):
"""
Read all tmpfiles.d *.conf files from a predefined list of paths and parse
them.
The searched paths are:
- "/etc/tmpfiles.d/*.conf"
- "/usr/lib/tmpfiles.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_tmpfilesd_config()'.
An example return value:
{
"/etc/tmpfiles.d": {
"sap.conf": [
"x /tmp/.sap*",
"x /tmp/.hdb*lock",
"x /tmp/.trex*lock"
]
}
}
"""
checked_globs = [
"/etc/tmpfiles.d/*.conf",
"/usr/lib/tmpfiles.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_tmpfilesd_config)
def read_tuned_profile(tree):
"""
Read the Tuned active profile and profile mode.
Returns: dictionary with at most two keys 'active_profile' and 'profile_mode'.
Value of each key is a string representing respective tuned configuration
value.
An example return value:
{
"active_profile": "sap-hana",
"profile_mode": "manual"
}
"""
result = {}
config_files = ["active_profile", "profile_mode"]
with contextlib.suppress(FileNotFoundError):
for config_file in config_files:
with open(f"{tree}/etc/tuned/{config_file}") as f:
value = f.read()
value = value.strip()
if value:
result[config_file] = value
return result
def read_sysctld_config(config_path):
"""
Read sysctl configuration file.
Returns: list of strings representing uncommented lines read from the
configuration file.
An example return value:
[
"kernel.pid_max = 4194304",
"vm.max_map_count = 2147483647"
]
"""
values = []
with open(config_path) as f:
for line in f:
line = line.strip()
if not line:
continue
# skip comments
if line[0] in ["#", ";"]:
continue
values.append(line)
return values
def read_sysctld_configs(tree):
"""
Read all sysctl.d *.conf files from a predefined list of paths and parse
them.
The searched paths are:
- "/etc/sysctl.d/*.conf",
- "/usr/lib/sysctl.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_sysctld_config()'.
An example return value:
{
"/etc/sysctl.d": {
"sap.conf": [
"kernel.pid_max = 4194304",
"vm.max_map_count = 2147483647"
]
}
}
"""
checked_globs = [
"/etc/sysctl.d/*.conf",
"/usr/lib/sysctl.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_sysctld_config)
def read_security_limits_config(config_path):
"""
    Read a single security limits configuration file.
    Returns: list of dictionaries, each representing one uncommented
    "<domain> <type> <item> <value>" entry read from the configuration file.
An example return value:
[
{
"domain": "@sapsys",
"item": "nofile",
"type": "hard",
"value": "65536"
},
{
"domain": "@sapsys",
"item": "nofile",
"type": "soft",
"value": "65536"
}
]
"""
values = []
with open(config_path) as f:
for line in f:
line = line.strip()
# the '#' character introduces a comment - after which the rest of the line is ignored
split_line = line.split("#", 1)
line = split_line[0]
if not line:
continue
# Syntax of a line is "<domain> <type> <item> <value>"
domain, limit_type, item, value = line.split()
values.append({
"domain": domain,
"type": limit_type,
"item": item,
"value": value
})
return values
def read_security_limits_configs(tree):
"""
Read all security limits *.conf files from a predefined list of paths and
parse them.
The searched paths are:
- "/etc/security/limits.conf"
- "/etc/security/limits.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_security_limits_config()'.
An example return value:
{
"/etc/security/limits.d": {
"99-sap.conf": [
{
"domain": "@sapsys",
"item": "nofile",
"type": "hard",
"value": "65536"
},
{
"domain": "@sapsys",
"item": "nofile",
"type": "soft",
"value": "65536"
}
]
}
}
"""
checked_globs = [
"/etc/security/limits.conf",
"/etc/security/limits.d/*.conf"
]
    return _read_glob_paths_with_parser(tree, checked_globs, read_security_limits_config)
def read_ssh_config(config_path):
"""
    Read the content of the provided SSH(d) configuration file.
    Returns: list of uncommented and non-empty lines read from the configuration
file.
An example return value:
[
"Match final all",
"Include /etc/crypto-policies/back-ends/openssh.config",
"GSSAPIAuthentication yes",
"ForwardX11Trusted yes",
"SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
"SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
"SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
"SendEnv XMODIFIERS"
]
"""
config_lines = []
with open(config_path) as f:
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
config_lines.append(line)
return config_lines
def read_ssh_configs(tree):
"""
Read all SSH configuration files from a predefined list of paths and
parse them.
The searched paths are:
- "/etc/ssh/ssh_config"
- "/etc/ssh/ssh_config.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_ssh_config()'.
An example return value:
{
"/etc/ssh": {
"ssh_config": [
"Include /etc/ssh/ssh_config.d/*.conf"
]
},
"/etc/ssh/ssh_config.d": {
"05-redhat.conf": [
"Match final all",
"Include /etc/crypto-policies/back-ends/openssh.config",
"GSSAPIAuthentication yes",
"ForwardX11Trusted yes",
"SendEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
"SendEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
"SendEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
"SendEnv XMODIFIERS"
]
}
}
"""
checked_globs = [
"/etc/ssh/ssh_config",
"/etc/ssh/ssh_config.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config)
def read_sshd_configs(tree):
"""
Read all SSHd configuration files from a predefined list of paths and
parse them.
The searched paths are:
- "/etc/ssh/sshd_config"
- "/etc/ssh/sshd_config.d/*.conf"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by 'read_ssh_config()'.
An example return value:
{
"/etc/ssh": {
"sshd_config": [
"HostKey /etc/ssh/ssh_host_rsa_key",
"HostKey /etc/ssh/ssh_host_ecdsa_key",
"HostKey /etc/ssh/ssh_host_ed25519_key",
"SyslogFacility AUTHPRIV",
"PermitRootLogin no",
"AuthorizedKeysFile\t.ssh/authorized_keys",
"PasswordAuthentication no",
"ChallengeResponseAuthentication no",
"GSSAPIAuthentication yes",
"GSSAPICleanupCredentials no",
"UsePAM yes",
"X11Forwarding yes",
"PrintMotd no",
"AcceptEnv LANG LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES",
"AcceptEnv LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT",
"AcceptEnv LC_IDENTIFICATION LC_ALL LANGUAGE",
"AcceptEnv XMODIFIERS",
"Subsystem\tsftp\t/usr/libexec/openssh/sftp-server",
"ClientAliveInterval 420"
]
}
}
"""
checked_globs = [
"/etc/ssh/sshd_config",
"/etc/ssh/sshd_config.d/*.conf"
]
return _read_glob_paths_with_parser(tree, checked_globs, read_ssh_config)
def read_yum_repos(tree):
"""
Read all YUM/DNF repo files.
The searched paths are:
- "/etc/yum.repos.d/*.repo"
Returns: dictionary as returned by '_read_glob_paths_with_parser()' with
configuration representation as returned by '_read_inifile_to_dict()'.
An example return value:
{
"/etc/yum.repos.d": {
"google-cloud.repo": {
"google-cloud-sdk": {
"baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64",
"enabled": "1",
"gpgcheck": "1",
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
"name": "Google Cloud SDK",
"repo_gpgcheck": "0"
},
"google-compute-engine": {
"baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable",
"enabled": "1",
"gpgcheck": "1",
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
"name": "Google Compute Engine",
"repo_gpgcheck": "0"
}
}
}
}
"""
checked_globs = [
"/etc/yum.repos.d/*.repo"
]
return _read_glob_paths_with_parser(tree, checked_globs, _read_inifile_to_dict)
def read_sudoers(tree):
"""
    Read uncommented lines from the sudoers configuration file and /etc/sudoers.d.
    This function does not actually do much parsing, as the sudoers file
    format grammar is a bit too much for our purpose.
Any #include or #includedir directives are ignored by this function.
Returns: dictionary with the keys representing names of read configuration
files, /etc/sudoers and files from /etc/sudoers.d. Value of each key is
a list of strings representing uncommented lines read from the configuration
file.
An example return value:
{
"/etc/sudoers": [
"Defaults !visiblepw",
"Defaults always_set_home",
"Defaults match_group_by_gid",
"Defaults always_query_group_plugin",
"Defaults env_reset",
"Defaults env_keep = \"COLORS DISPLAY HOSTNAME HISTSIZE KDEDIR LS_COLORS\"",
"Defaults env_keep += \"MAIL PS1 PS2 QTDIR USERNAME LANG LC_ADDRESS LC_CTYPE\"",
"Defaults env_keep += \"LC_COLLATE LC_IDENTIFICATION LC_MEASUREMENT LC_MESSAGES\"",
"Defaults env_keep += \"LC_MONETARY LC_NAME LC_NUMERIC LC_PAPER LC_TELEPHONE\"",
"Defaults env_keep += \"LC_TIME LC_ALL LANGUAGE LINGUAS _XKB_CHARSET XAUTHORITY\"",
"Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin",
"root\tALL=(ALL) \tALL",
"%wheel\tALL=(ALL)\tALL",
"ec2-user\tALL=(ALL)\tNOPASSWD: ALL"
]
}
"""
result = {}
def _parse_sudoers_file(f):
lines = []
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
lines.append(line)
return lines
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/sudoers") as f:
lines = _parse_sudoers_file(f)
if lines:
result["/etc/sudoers"] = lines
sudoersd_result = {}
for file in glob.glob(f"{tree}/etc/sudoers.d/*"):
with open(file) as f:
lines = _parse_sudoers_file(f)
            if lines:
                sudoersd_result[os.path.basename(file)] = lines
if sudoersd_result:
result["/etc/sudoers.d"] = sudoersd_result
return result
def read_udev_rules(tree):
"""
Read udev rules defined in /etc/udev/rules.d.
Returns: dictionary with the keys representing names of files with udev
rules from /etc/udev/rules.d. Value of each key is a list of strings
representing uncommented lines read from the configuration file. If
the file is empty (e.g. because of masking udev configuration installed
    by an RPM), an empty list is returned as the respective value.
An example return value:
{
"80-net-name-slot.rules": []
}
"""
result = {}
for file in glob.glob(f"{tree}/etc/udev/rules.d/*.rules"):
with open(file) as f:
lines = []
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
lines.append(line)
# include also empty files in the report
result[os.path.basename(file)] = lines
return result
def _read_inifile_to_dict(config_path):
"""
Read INI file from the provided path
Returns: a dictionary representing the provided INI file content.
An example return value:
{
"google-cloud-sdk": {
"baseurl": "https://packages.cloud.google.com/yum/repos/cloud-sdk-el8-x86_64",
"enabled": "1",
"gpgcheck": "1",
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
"name": "Google Cloud SDK",
"repo_gpgcheck": "0"
},
"google-compute-engine": {
"baseurl": "https://packages.cloud.google.com/yum/repos/google-compute-engine-el8-x86_64-stable",
"enabled": "1",
"gpgcheck": "1",
"gpgkey": "https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg",
"name": "Google Compute Engine",
"repo_gpgcheck": "0"
}
}
"""
result = {}
with contextlib.suppress(FileNotFoundError):
with open(config_path) as f:
parser = configparser.RawConfigParser()
            # prevent conversion of the option name to lowercase
            parser.optionxform = lambda option: option
            parser.read_file(f)
for section in parser.sections():
section_config = dict(parser.items(section))
if section_config:
result[section] = section_config
return result
def read_dnf_conf(tree):
"""
Read DNF configuration and defined variable files.
Returns: dictionary with at most two keys 'dnf.conf' and 'vars'.
'dnf.conf' value is a dictionary representing the DNF configuration
file content.
'vars' value is a dictionary which keys represent names of files from
/etc/dnf/vars/ and values are strings representing the file content.
An example return value:
{
"dnf.conf": {
"main": {
"installonly_limit": "3"
}
},
"vars": {
"releasever": "8.4"
}
}
"""
result = {}
dnf_config = _read_inifile_to_dict(f"{tree}/etc/dnf/dnf.conf")
if dnf_config:
result["dnf.conf"] = dnf_config
dnf_vars = {}
for file in glob.glob(f"{tree}/etc/dnf/vars/*"):
with open(file) as f:
dnf_vars[os.path.basename(file)] = f.read().strip()
if dnf_vars:
result["vars"] = dnf_vars
return result
def read_dnf_automatic_conf(tree):
"""
    Read DNF Automatic configuration.
Returns: dictionary as returned by '_read_inifile_to_dict()'.
An example return value:
{
"base": {
"debuglevel": "1"
},
"command_email": {
"email_from": "root@example.com",
"email_to": "root"
},
"commands": {
"apply_updates": "yes",
"download_updates": "yes",
"network_online_timeout": "60",
"random_sleep": "0",
"upgrade_type": "security"
},
"email": {
"email_from": "root@example.com",
"email_host": "localhost",
"email_to": "root"
},
"emitters": {
"emit_via": "stdio"
}
}
"""
return _read_inifile_to_dict(f"{tree}/etc/dnf/automatic.conf")
def read_authselect_conf(tree):
"""
Read authselect configuration.
Returns: dictionary with two keys 'profile-id' and 'enabled-features'.
'profile-id' value is a string representing the configured authselect
profile.
'enabled-features' value is a list of strings representing enabled features
of the used authselect profile. In case there are no specific features
enabled, the list is empty.
An example return value:
{
"enabled-features": [],
"profile-id": "sssd"
}
"""
result = {}
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/authselect/authselect.conf") as f:
# the first line is always the profile ID
# following lines are listing enabled features
# lines starting with '#' and empty lines are skipped
authselect_conf_lines = []
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
authselect_conf_lines.append(line)
if authselect_conf_lines:
result["profile-id"] = authselect_conf_lines[0]
result["enabled-features"] = authselect_conf_lines[1:]
return result
def read_resolv_conf(tree):
"""
Read /etc/resolv.conf.
Returns: a list of uncommented lines from the /etc/resolv.conf.
An example return value:
[
"search redhat.com",
"nameserver 192.168.1.1",
"nameserver 192.168.1.2"
]
"""
result = []
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/resolv.conf") as f:
for line in f:
line = line.strip()
if not line:
continue
if line[0] == "#":
continue
result.append(line)
return result
def append_filesystem(report, tree, *, is_ostree=False):
if os.path.exists(f"{tree}/etc/os-release"):
report["packages"] = rpm_packages(tree, is_ostree)
if not is_ostree:
report["rpm-verify"] = rpm_verify(tree)
not_installed_docs = rpm_not_installed_docs(tree, is_ostree)
if not_installed_docs:
report["rpm_not_installed_docs"] = not_installed_docs
with open(f"{tree}/etc/os-release") as f:
report["os-release"] = parse_environment_vars(f.read())
report["services-enabled"] = read_services(tree, "enabled")
report["services-disabled"] = read_services(tree, "disabled")
default_target = read_default_target(tree)
if default_target:
report["default-target"] = default_target
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/hostname") as f:
report["hostname"] = f.read().strip()
with contextlib.suppress(FileNotFoundError):
report["timezone"] = os.path.basename(os.readlink(f"{tree}/etc/localtime"))
authselect_conf = read_authselect_conf(tree)
if authselect_conf:
report["authselect"] = authselect_conf
chrony_conf = read_chrony_conf(tree)
if chrony_conf:
report["chrony"] = chrony_conf
cloud_init_configs = read_cloud_init_configs(tree)
if cloud_init_configs:
report["cloud-init"] = cloud_init_configs
dnf_conf = read_dnf_conf(tree)
if dnf_conf:
report["dnf"] = dnf_conf
dnf_automatic = read_dnf_automatic_conf(tree)
if dnf_automatic:
report["/etc/dnf/automatic.conf"] = dnf_automatic
yum_repos = read_yum_repos(tree)
if yum_repos:
report["yum_repos"] = yum_repos
dracut_configs = read_dracut_configs(tree)
if dracut_configs:
report["dracut"] = dracut_configs
with contextlib.suppress(FileNotFoundError):
report["firewall-enabled"] = read_firewall_zone(tree)
firewall_default_zone = read_firewall_default_zone(tree)
if firewall_default_zone:
report["firewall-default-zone"] = firewall_default_zone
fstab = read_fstab(tree)
if fstab:
report["fstab"] = fstab
hosts = read_hosts(tree)
if hosts:
report["hosts"] = hosts
keyboard = read_keyboard_conf(tree)
if keyboard:
report["keyboard"] = keyboard
security_limits_configs = read_security_limits_configs(tree)
if security_limits_configs:
report["security-limits"] = security_limits_configs
locale = read_locale(tree)
if locale:
report["locale"] = locale
logind_configs = read_logind_configs(tree)
if logind_configs:
report["systemd-logind"] = logind_configs
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/etc/machine-id") as f:
report["machine-id"] = f.readline()
modprobe_configs = read_modprobe_configs(tree)
if modprobe_configs:
report["modprobe"] = modprobe_configs
tmpfilesd_configs = read_tmpfilesd_configs(tree)
if tmpfilesd_configs:
report["tmpfiles.d"] = tmpfilesd_configs
rhsm = read_rhsm(tree)
if rhsm:
report["rhsm"] = rhsm
selinux = read_selinux_info(tree, is_ostree)
if selinux:
report["selinux"] = selinux
ssh_configs = read_ssh_configs(tree)
if ssh_configs:
report["ssh_config"] = ssh_configs
sshd_configs = read_sshd_configs(tree)
if sshd_configs:
report["sshd_config"] = sshd_configs
sudoers_conf = read_sudoers(tree)
if sudoers_conf:
report["sudoers"] = sudoers_conf
sysconfig = read_sysconfig(tree)
if sysconfig:
report["sysconfig"] = sysconfig
sysctld_configs = read_sysctld_configs(tree)
if sysctld_configs:
report["sysctl.d"] = sysctld_configs
systemd_service_dropins = read_systemd_service_dropins(tree)
if systemd_service_dropins:
report["systemd-service-dropins"] = systemd_service_dropins
tuned_profile = read_tuned_profile(tree)
if tuned_profile:
report["tuned"] = tuned_profile
resolv_conf = read_resolv_conf(tree)
    # add resolv_conf to the report even when empty, to record that it is empty or non-existent
report["/etc/resolv.conf"] = resolv_conf
udev_rules = read_udev_rules(tree)
if udev_rules:
report["/etc/udev/rules.d"] = udev_rules
with open(f"{tree}/etc/passwd") as f:
report["passwd"] = sorted(f.read().strip().split("\n"))
with open(f"{tree}/etc/group") as f:
report["groups"] = sorted(f.read().strip().split("\n"))
if is_ostree:
with open(f"{tree}/usr/lib/passwd") as f:
report["passwd-system"] = sorted(f.read().strip().split("\n"))
with open(f"{tree}/usr/lib/group") as f:
report["groups-system"] = sorted(f.read().strip().split("\n"))
if os.path.exists(f"{tree}/boot") and len(os.listdir(f"{tree}/boot")) > 0:
assert "bootmenu" not in report
with contextlib.suppress(FileNotFoundError):
with open(f"{tree}/boot/grub2/grubenv") as f:
report["boot-environment"] = parse_environment_vars(f.read())
report["bootmenu"] = read_boot_entries(f"{tree}/boot")
elif len(glob.glob(f"{tree}/vmlinuz-*")) > 0:
assert "bootmenu" not in report
with open(f"{tree}/grub2/grubenv") as f:
report["boot-environment"] = parse_environment_vars(f.read())
report["bootmenu"] = read_boot_entries(tree)
elif len(glob.glob(f"{tree}/EFI")):
print("EFI partition", file=sys.stderr)
def volume_group_for_device(device: str) -> str:
    # Find the volume group that the given device (a physical volume) belongs to
vg_name = None
count = 0
cmd = [
"pvdisplay", "-C", "--noheadings", "-o", "vg_name", device
]
while True:
res = subprocess.run(cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="UTF-8")
if res.returncode == 5:
if count == 10:
raise RuntimeError("Could not find parent device")
time.sleep(1*count)
count += 1
continue
if res.returncode != 0:
raise RuntimeError(res.stderr.strip())
vg_name = res.stdout.strip()
if vg_name:
break
return vg_name
def ensure_device_file(path: str, major: int, minor: int):
"""Ensure the device file with the given major, minor exists"""
os.makedirs(os.path.dirname(path), exist_ok=True)
if not os.path.exists(path):
os.mknod(path, 0o600 | stat.S_IFBLK, os.makedev(major, minor))
@contextlib.contextmanager
def discover_lvm(dev) -> Dict:
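    """
    Activate the volume group found on `dev`, collect its logical volumes
    (device path plus the partition information from read_partition()) and
    yield them as a dict with the keys "lvm", "lvm.vg" and "lvm.volumes".
    The volume group is deactivated again on exit.
    """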
# find the volume group name for the device file
vg_name = volume_group_for_device(dev)
# activate it
r = subprocess.run(["vgchange", "-ay", vg_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
check=False)
if r.returncode != 0:
raise RuntimeError(r.stderr.strip())
try:
# Find all logical volumes in the volume group
cmd = [
"lvdisplay", "-C", "--noheadings",
"-o", "lv_name,path,lv_kernel_major,lv_kernel_minor",
"--separator", ";",
vg_name
]
res = subprocess.run(cmd,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="UTF-8")
if res.returncode != 0:
raise RuntimeError(res.stderr.strip())
data = res.stdout.strip()
        parsed = [line.split(";") for line in data.split("\n")]
volumes = OrderedDict()
for vol in parsed:
            vol = [v.strip() for v in vol]
assert len(vol) == 4
name, voldev, major, minor = vol
info = {
"device": voldev
}
ensure_device_file(voldev, int(major), int(minor))
read_partition(voldev, info)
volumes[name] = info
if name.startswith("root"):
volumes.move_to_end(name, last=False)
res = {
"lvm": True,
"lvm.vg": vg_name,
"lvm.volumes": volumes
}
yield res
finally:
r = subprocess.run(["vgchange", "-an", vg_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
check=False)
if r.returncode != 0:
raise RuntimeError(r.stderr.strip())
def partition_is_lvm(part: Dict) -> bool:
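    # matches the GPT partition type GUID and the MBR partition type (0x8E)
    # used for Linux LVM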
return part["type"].upper() in ["E6D6D379-F507-44C2-A23C-238F2A3DF928", "8E"]
def append_partitions(report, device, loctl):
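    """
    Open every partition in report["partitions"] as a loop device, read its
    metadata (including the logical volumes of LVM partitions), mount the
    filesystems according to the image's own fstab and append the report of
    the assembled filesystem tree.
    """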
partitions = report["partitions"]
with contextlib.ExitStack() as cm:
# open each partition as a loop device
filesystems = {}
for part in partitions:
start, size = part["start"], part["size"]
dev = cm.enter_context(loop_open(loctl, device, offset=start, size=size))
read_partition(dev, part)
if partition_is_lvm(part):
lvm = cm.enter_context(discover_lvm(dev))
for vol in lvm["lvm.volumes"].values():
if vol["fstype"]:
mntopts = []
                        # we cannot let the kernel recover the filesystem: the
                        # underlying loopback device is mounted read-only, but
                        # because we access it through the device mapper this
                        # fact might not be communicated, and an attempted
                        # recovery would lead to a kernel panic
if vol["fstype"] in ("ext4", "ext3", "xfs"):
mntopts = ["norecovery"]
filesystems[vol["uuid"].upper()] = {
"device": vol["device"],
"mntops": mntopts
}
del vol["device"]
part.update(lvm)
elif part["uuid"] and part["fstype"]:
filesystems[part["uuid"].upper()] = {
"device": dev
}
        # find the partition that contains /etc/fstab and read it
fstab = []
for fs in filesystems.values():
dev, opts = fs["device"], fs.get("mntops")
with mount(dev, opts) as tree:
if os.path.exists(f"{tree}/etc/fstab"):
fstab.extend(read_fstab(tree))
break
# sort the fstab entries by the mountpoint
fstab = sorted(fstab, key=operator.itemgetter(1))
        # mount all partitions at their respective mount points
root_tree = ""
for n, fstab_entry in enumerate(fstab):
part_uuid = fstab_entry[0].split("=")[1].upper()
part_device = filesystems[part_uuid]["device"]
part_mountpoint = fstab_entry[1]
part_fstype = fstab_entry[2]
part_options = fstab_entry[3].split(",")
part_options += filesystems[part_uuid].get("mntops", [])
# the first mount point should be root
if n == 0:
if part_mountpoint != "/":
raise RuntimeError("The first mountpoint in sorted fstab entries is not '/'")
root_tree = cm.enter_context(mount(part_device, part_options))
continue
cm.enter_context(mount_at(part_device, f"{root_tree}{part_mountpoint}", options=part_options, extra=["-t", part_fstype]))
if not root_tree:
raise RuntimeError("The root filesystem tree is not mounted")
append_filesystem(report, root_tree)
def analyse_image(image):
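    """
    Analyse a disk image: record the image format and bootloader type, read
    the partition table and inspect either the individual partitions or,
    for partitionless images, the single filesystem.
    """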
loctl = loop.LoopControl()
imgfmt = read_image_format(image)
report = {"image-format": imgfmt}
with open_image(loctl, image, imgfmt) as (_, device):
report["bootloader"] = read_bootloader_type(device)
report.update(read_partition_table(device))
if report["partition-table"]:
append_partitions(report, device, loctl)
else:
with mount(device) as tree:
append_filesystem(report, tree)
return report
def append_directory(report, tree):
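    """
    Analyse a filesystem tree that already exists as a directory. The tree
    is bind-mounted read-only first so the inspection cannot modify it;
    ostree checkouts additionally get /usr/etc bind-mounted over /etc.
    """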
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
tree_ro = os.path.join(tmpdir, "root_ro")
os.makedirs(tree_ro)
# Make sure that the tools which analyse the directory in-place
# can not modify its content (e.g. create additional files).
# mount_at() always mounts the source as read-only!
with mount_at(tree, tree_ro, ["bind"]) as mountpoint:
if os.path.lexists(f"{tree}/ostree"):
os.makedirs(f"{tree}/etc", exist_ok=True)
with mount_at(f"{tree}/usr/etc", f"{tree}/etc", extra=["--bind"]):
append_filesystem(report, tree_ro, is_ostree=True)
else:
append_filesystem(report, tree_ro)
def append_ostree_repo(report, repo):
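    """
    Record the mode and refs of the ostree repository at `repo`, then check
    out the commit of the first ref and analyse the checked-out tree.
    """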
ostree = functools.partial(run_ostree, repo=repo)
r = ostree("config", "get", "core.mode")
report["ostree"] = {
"repo": {
"core.mode": r.stdout.strip()
}
}
r = ostree("refs")
refs = r.stdout.strip().split("\n")
report["ostree"]["refs"] = refs
resolved = {r: ostree("rev-parse", r).stdout.strip() for r in refs}
commit = resolved[refs[0]]
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
tree = os.path.join(tmpdir, "tree")
ostree("checkout", "--force-copy", commit, tree)
append_directory(report, tree)
def analyse_directory(path):
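    """
    Analyse a directory: an ostree commit (compose.json next to a repo/
    subdirectory), a bare ostree repository (a refs/ subdirectory), or a
    plain filesystem tree.
    """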
report = {}
if os.path.exists(os.path.join(path, "compose.json")):
report["type"] = "ostree/commit"
repo = os.path.join(path, "repo")
append_ostree_repo(report, repo)
elif os.path.isdir(os.path.join(path, "refs")):
report["type"] = "ostree/repo"
        repo = path
append_ostree_repo(report, repo)
else:
append_directory(report, path)
return report
def is_tarball(path):
mtype, encoding = mimetypes.guess_type(path)
return mtype == "application/x-tar"
def analyse_tarball(path):
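    """
    Extract the (possibly compressed) tarball into a temporary directory,
    preserving SELinux labels, extended attributes and ACLs, and analyse
    the extracted tree as a directory.
    """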
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
tree = os.path.join(tmpdir, "root")
os.makedirs(tree)
command = [
"tar",
"--selinux",
"--xattrs",
"--acls",
"-x",
"--auto-compress",
"-f", path,
"-C", tree
]
subprocess.run(command,
stdout=sys.stderr,
check=True)
return analyse_directory(tree)
def is_compressed(path):
_, encoding = mimetypes.guess_type(path)
return encoding in ["xz", "gzip", "bzip2"]
def analyse_compressed(path):
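    """
    Copy the compressed image into a temporary directory, decompress it
    with the tool matching its encoding (xz, gzip or bzip2) and analyse the
    resulting image.
    """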
_, encoding = mimetypes.guess_type(path)
if encoding == "xz":
command = ["unxz", "--force"]
elif encoding == "gzip":
command = ["gunzip", "--force"]
elif encoding == "bzip2":
command = ["bunzip2", "--force"]
else:
raise ValueError(f"Unsupported compression: {encoding}")
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmpdir:
subprocess.run(["cp", "--reflink=auto", "-a", path, tmpdir],
check=True)
files = os.listdir(tmpdir)
archive = os.path.join(tmpdir, files[0])
subprocess.run(command + [archive], check=True)
files = os.listdir(tmpdir)
assert len(files) == 1
image = os.path.join(tmpdir, files[0])
return analyse_image(image)
def main():
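    """
    Analyse TARGET, which may be a directory, a tarball, a compressed image
    or a disk image, and print the report as JSON to stdout.

    A typical (illustrative) invocation:

        tools/image-info output/disk.qcow2 > image-info.json
    """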
parser = argparse.ArgumentParser(description="Inspect an image")
parser.add_argument("target", metavar="TARGET",
help="The file or directory to analyse",
type=os.path.abspath)
args = parser.parse_args()
target = args.target
if os.path.isdir(target):
report = analyse_directory(target)
elif is_tarball(target):
report = analyse_tarball(target)
elif is_compressed(target):
report = analyse_compressed(target)
else:
report = analyse_image(target)
json.dump(report, sys.stdout, sort_keys=True, indent=2)
if __name__ == "__main__":
main()