modules: explicit encodings for open()

This commit is contained in:
Simon de Vlieger 2022-09-09 12:02:33 +02:00
parent 38d2ab685c
commit b07aca5d86
89 changed files with 144 additions and 144 deletions

View file

@ -112,7 +112,7 @@ MEDIA_TYPES = {
def sha256sum(path: str) -> str:
ret = subprocess.run(["sha256sum", path],
stdout=subprocess.PIPE,
encoding="utf-8",
encoding="utf8",
check=True)
return ret.stdout.strip().split(" ")[0]
@ -135,7 +135,7 @@ def blobs_add_file(blobs: str, path: str, mtype: str):
def blobs_add_json(blobs: str, js: str, mtype: str):
js_file = os.path.join(blobs, "temporary.js")
with open(js_file, "w") as f:
with open(js_file, "w", encoding="utf8") as f:
json.dump(js, f)
return blobs_add_file(blobs, js_file, mtype)
@ -246,11 +246,11 @@ def create_oci_dir(tree, output_dir, options):
# index
print("writing index")
with open(os.path.join(output_dir, "index.json"), "w") as f:
with open(os.path.join(output_dir, "index.json"), "w", encoding="utf8") as f:
json.dump(index, f)
# oci-layout tag
with open(os.path.join(output_dir, "oci-layout"), "w") as f:
with open(os.path.join(output_dir, "oci-layout"), "w", encoding="utf8") as f:
json.dump({"imageLayoutVersion": "1.0.0"}, f)

View file

@ -166,7 +166,7 @@ def main(tree, output_dir, options, meta):
stdout=sys.stderr,
check=True)
with open(os.path.join(output_dir, "compose.json"), "r") as f:
with open(os.path.join(output_dir, "compose.json"), "r", encoding="utf8") as f:
compose = json.load(f)
api.metadata({"compose": compose})

View file

@ -171,7 +171,7 @@ def mkfs_ext4(device, uuid, label):
if label:
opts = ["-L", label]
subprocess.run(["mkfs.ext4", "-U", uuid] + opts + [device],
input="y", encoding='utf-8', check=True)
input="y", encoding='utf8', check=True)
def mkfs_xfs(device, uuid, label):
@ -179,7 +179,7 @@ def mkfs_xfs(device, uuid, label):
if label:
opts = ["-L", label]
subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}"] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
def mkfs_btrfs(device, uuid, label):
@ -187,7 +187,7 @@ def mkfs_btrfs(device, uuid, label):
if label:
opts = ["-L", label]
subprocess.run(["mkfs.btrfs", "-U", uuid] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
def mkfs_vfat(device, uuid, label):
@ -195,7 +195,7 @@ def mkfs_vfat(device, uuid, label):
opts = []
if label:
opts = ["-n", label]
subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf-8', check=True)
subprocess.run(["mkfs.vfat", "-i", volid] + opts + [device], encoding='utf8', check=True)
class Filesystem:
@ -336,7 +336,7 @@ class PartitionTable:
subprocess.run(["sfdisk", "-q", target],
input=command,
encoding='utf-8',
encoding='utf8',
check=True)
if sync:
@ -346,7 +346,7 @@ class PartitionTable:
"""Update and fill in missing information from disk"""
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
encoding='utf-8',
encoding='utf8',
check=True)
disk_table = json.loads(r.stdout)["partitiontable"]
disk_parts = disk_table["partitions"]
@ -596,7 +596,7 @@ def install_grub2(image: str, pt: PartitionTable, options):
def parse_blsfile(blsfile):
params = {}
with open(blsfile, "r") as bls:
with open(blsfile, "r", encoding="utf8") as bls:
for line in bls:
key, value = line.split(' ', 1)
params[key] = value.strip()
@ -698,7 +698,7 @@ def main(tree, output_dir, options, loop_client):
if fmt == "raw":
subprocess.run(["cp", image, f"{output_dir}/{filename}"], check=True)
elif fmt == "raw.xz":
with open(f"{output_dir}/{filename}", "w") as f:
with open(f"{output_dir}/{filename}", "w", encoding="utf8") as f:
subprocess.run(
["xz", "--keep", "--stdout", "-0", image],
stdout=f,

View file

@ -68,15 +68,15 @@ def mount(source, dest, *options):
def mkfs_ext4(device, uuid):
subprocess.run(["mkfs.ext4", "-U", uuid, device], input="y", encoding='utf-8', check=True)
subprocess.run(["mkfs.ext4", "-U", uuid, device], input="y", encoding='utf8', check=True)
def mkfs_xfs(device, uuid):
subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}", device], encoding='utf-8', check=True)
subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}", device], encoding='utf8', check=True)
def mkfs_btrfs(device, uuid):
subprocess.run(["mkfs.btrfs", "-U", uuid, device], encoding='utf-8', check=True)
subprocess.run(["mkfs.btrfs", "-U", uuid, device], encoding='utf8', check=True)
def main(tree, output_dir, options, loop_client):

View file

@ -9,7 +9,7 @@ import osbuild.api
def ldconfig():
# ld.so.conf must exist, or `ldconfig` throws a warning
with open("/etc/ld.so.conf", "w", ) as f:
with open("/etc/ld.so.conf", "w", encoding="utf8") as f:
# qemu-img needs `libiscsi`, which is located in /usr/lib64/iscsi
f.write("/usr/lib64/iscsi\n")
f.flush()

View file

@ -46,7 +46,7 @@ def os_release():
# remove the symlink that systemd-nspawn creates
os.remove("/etc/os-release")
with open("/etc/os-release", "w") as f:
with open("/etc/os-release", "w", encoding="utf8") as f:
f.write('NAME="Red Hat Enterprise Linux"\n')
f.write('VERSION="8.1 (Ootpa)"\n')
f.write('ID="rhel"\n')

View file

@ -45,7 +45,7 @@ def main(tree, options):
product_dir = os.path.join(tree, "etc/anaconda/conf.d")
os.makedirs(product_dir, exist_ok=True)
with open(os.path.join(product_dir, "90-osbuild.conf"), "w") as f:
with open(os.path.join(product_dir, "90-osbuild.conf"), "w", encoding="utf8") as f:
f.write(CONFIG)
for m in modules:
f.write(f" {m}\n")

View file

@ -171,7 +171,7 @@ def replace(target, patterns):
finder = [(re.compile(p), s) for p, s in patterns]
newfile = target + ".replace"
with open(target, "r") as i, open(newfile, "w") as o:
with open(target, "r", encoding="utf8") as i, open(newfile, "w", encoding="utf8") as o:
for line in i:
for p, s in finder:
line = p.sub(s, line)
@ -180,7 +180,7 @@ def replace(target, patterns):
def make_rootfs(tree, image, size, workdir, loop_client):
with open(image, "w") as f:
with open(image, "w", encoding="utf8") as f:
os.ftruncate(f.fileno(), size)
root = os.path.join(workdir, "rootfs")
@ -192,7 +192,7 @@ def make_rootfs(tree, image, size, workdir, loop_client):
"-b", "4096",
"-m", "0",
dev],
input="y", encoding='utf-8', check=True)
input="y", encoding='utf8', check=True)
with mount(dev, root):
print("copying tree")
@ -267,7 +267,7 @@ def make_efi(efi, info, root, loop_client):
# create the image
image = os.path.join(info["imgdir"], "efiboot.img")
with open(image, "w") as f:
with open(image, "w", encoding="utf8") as f:
os.ftruncate(f.fileno(), size)
root = os.path.join(info["workdir"], "mnt")
@ -277,7 +277,7 @@ def make_efi(efi, info, root, loop_client):
subprocess.run(["mkfs.fat",
"-n", "ANACONDA",
dev],
input="y", encoding='utf-8', check=True)
input="y", encoding='utf8', check=True)
with mount(dev, root):
target = os.path.join(root, "EFI", "BOOT")

View file

@ -74,7 +74,7 @@ def main(tree, options):
"osbuild": "devel",
}
with open(f"{tree}/.buildstamp", "w") as f:
with open(f"{tree}/.buildstamp", "w", encoding="utf8") as f:
stamp.write(f)
return 0

View file

@ -154,7 +154,7 @@ def main(tree, options):
# therefore default to 'None' to distinguish these two cases.
leapsectz = options.get("leapsectz", None)
with open(f"{tree}/etc/chrony.conf") as f:
with open(f"{tree}/etc/chrony.conf", encoding="utf8") as f:
chrony_conf = f.read()
# Split to lines and remove ones starting with server, pool or peer.
@ -172,7 +172,7 @@ def main(tree, options):
new_chrony_conf = "\n".join(lines)
with open(f"{tree}/etc/chrony.conf", "w") as f:
with open(f"{tree}/etc/chrony.conf", "w", encoding="utf8") as f:
f.write(new_chrony_conf)
return 0

View file

@ -68,7 +68,7 @@ def main(devices, options):
os.symlink("/proc/self/fd", "/dev/fd")
subprocess.run(command,
encoding='utf-8', check=True,
encoding='utf8', check=True,
input=passphrase)

View file

@ -151,7 +151,7 @@ def main(tree, options):
config_files_dir = f"{tree}/etc/cloud/cloud.cfg.d"
with open(f"{config_files_dir}/{filename}", "w") as f:
with open(f"{config_files_dir}/{filename}", "w", encoding="utf8") as f:
yaml.dump(config, f, default_flow_style=False)
return 0

View file

@ -161,12 +161,12 @@ def main(tree, options):
data = {}
with contextlib.suppress(FileNotFoundError):
with open(path, "r", encoding="utf-8") as f:
with open(path, "r", encoding="utf8") as f:
data = toml.load(f)
merge_config("storage", data, config)
with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf8") as f:
write_comment(f, HEADER)
write_comment(f, comment)

View file

@ -66,7 +66,7 @@ def main(tree, options):
filename = options["filename"]
filepath = os.path.join(tree, "etc", f"cron.{interval}", filename)
with open(filepath, "w", encoding="utf-8") as f:
with open(filepath, "w", encoding="utf8") as f:
cmd = options["simple"]
cmdline = cmd["command"]
comment = cmd.get("comment")

View file

@ -66,7 +66,7 @@ SCHEMA = """
def main(tree, options):
volumes = options["volumes"]
with open(f"{tree}/etc/crypttab", "w") as f:
with open(f"{tree}/etc/crypttab", "w", encoding="utf8") as f:
for volume in volumes:
name = volume["volume"]
uuid = volume.get("uuid")

View file

@ -56,7 +56,7 @@ KillSignal=SIGHUP
UnsetEnvironment=LANG LANGUAGE LC_CTYPE LC_NUMERIC LC_TIME LC_COLLATE LC_MONETARY LC_MESSAGES LC_PAPER LC_NAME LC_ADDRESS LC_TELEPHONE LC_MEASUREMENT LC_IDENTIFICATION
"""
with open(f"{tree}/etc/systemd/system/osbuild-debug-shell.service", "w") as f:
with open(f"{tree}/etc/systemd/system/osbuild-debug-shell.service", "w", encoding="utf8") as f:
f.write(unit)
os.symlink("../osbuild-debug-shell.service",

View file

@ -35,7 +35,7 @@ def main(tree, options):
# Based on `pylorax/discinfo.py`
timestamp = time.time()
with open(os.path.join(tree, ".discinfo"), "w") as f:
with open(os.path.join(tree, ".discinfo"), "w", encoding="utf8") as f:
f.write(f"{timestamp}\n")
f.write(f"{release}\n")
f.write(f"{basearch}\n")

View file

@ -68,7 +68,7 @@ def main(tree, options):
return 0
try:
with open(dnf_automatic_config_path, "r") as f:
with open(dnf_automatic_config_path, "r", encoding="utf8") as f:
dnf_automatic_conf.readfp(f)
except FileNotFoundError:
print(f"Error: DNF automatic configuration file '{dnf_automatic_config_path}' does not exist.")
@ -80,7 +80,7 @@ def main(tree, options):
value = bool_to_yes_no(value)
dnf_automatic_conf.set(config_section, option, value)
with open(dnf_automatic_config_path, "w") as f:
with open(dnf_automatic_config_path, "w", encoding="utf8") as f:
dnf_automatic_conf.write(f)
return 0

View file

@ -102,7 +102,7 @@ def configure_variable(tree, name, value):
"""
vars_directory = "/etc/dnf/vars"
with open(f"{tree}{vars_directory}/{name}", "w", encoding="utf-8") as f:
with open(f"{tree}{vars_directory}/{name}", "w", encoding="utf8") as f:
f.write(value + "\n")
@ -133,7 +133,7 @@ def make_dnf_config(tree, config_options):
dnf_config = iniparse.SafeConfigParser()
try:
with open(dnf_config_path, "r", encoding="utf-8") as f:
with open(dnf_config_path, "r", encoding="utf8") as f:
dnf_config.readfp(f)
except FileNotFoundError:
print(f"Warning: DNF configuration file '{dnf_config_path}' does not exist, will create it.")
@ -142,7 +142,7 @@ def make_dnf_config(tree, config_options):
for section, items in config_options.items():
make_section(dnf_config, section, items)
with open(dnf_config_path, "w", encoding="utf-8") as f:
with open(dnf_config_path, "w", encoding="utf8") as f:
os.fchmod(f.fileno(), 0o644)
dnf_config.write(f)

View file

@ -172,7 +172,7 @@ def main(tree, options):
"reproducible": bool_option_writer
}
with open(f"{config_files_dir}/{filename}", "w") as f:
with open(f"{config_files_dir}/{filename}", "w", encoding="utf8") as f:
for option, value in config.items():
try:
writter_func = SUPPORTED_OPTIONS[option]

View file

@ -63,13 +63,13 @@ Type=oneshot
{execs}"""
os.makedirs(f"{tree}/usr/lib/systemd/system/default.target.wants", exist_ok=True)
with open(f"{tree}/usr/lib/systemd/system/osbuild-first-boot.service", "w") as f:
with open(f"{tree}/usr/lib/systemd/system/osbuild-first-boot.service", "w", encoding="utf8") as f:
f.write(service)
os.symlink("../osbuild-first-boot.service",
f"{tree}/usr/lib/systemd/system/default.target.wants/osbuild-first-boot.service")
os.makedirs(f"{tree}/etc", exist_ok=True)
open(f"{tree}/etc/osbuild-first-boot", 'a').close()
open(f"{tree}/etc/osbuild-first-boot", 'a', encoding="utf8").close()
def main(tree, options):

View file

@ -49,10 +49,10 @@ def main(tree, options):
path_re = re.compile(r"(/.*)+/boot")
for name in glob.glob(f"{tree}/boot/loader/entries/*.conf"):
with open(name) as f:
with open(name, encoding="utf8") as f:
entry = f.read().splitlines(keepends=True)
with open(name, "w") as f:
with open(name, "w", encoding="utf8") as f:
for line in entry:
f.write(path_re.sub(prefix, line))

View file

@ -129,7 +129,7 @@ def main(tree, options):
path = f"{root}/etc/fstab"
with open(path, "w") as f:
with open(path, "w", encoding="utf8") as f:
for filesystem in filesystems:
uuid = filesystem.get("uuid")
path = filesystem["path"]

View file

@ -257,7 +257,7 @@ def main(tree, options):
parser = iniparse.SafeConfigParser()
try:
with open(filepath, "r", encoding="utf-8") as f:
with open(filepath, "r", encoding="utf8") as f:
parser.readfp(f)
except FileNotFoundError:
print(f"Creating new guest-agent configuration file at '{filepath}'.")
@ -266,7 +266,7 @@ def main(tree, options):
for section_id, section_content in config.items():
make_section(parser, section_id, section_content)
with open(filepath, "w", encoding="utf-8") as f:
with open(filepath, "w", encoding="utf8") as f:
parser.write(f)
return 0

View file

@ -67,7 +67,7 @@ def main(tree, options):
new_svcs = [ns for ns in value if ns not in svcs]
new_svcs.extend(svcs)
sys.stdout.write('GREENBOOT_MONITOR_SERVICES="{0}"\n'.format(" ".join(new_svcs)))
with open(config_file, mode="a") as f:
with open(config_file, "a", encoding="utf8") as f:
for key, value in changes.items():
if key == "monitor_services":
f.write('GREENBOOT_MONITOR_SERVICES="{0}"\n'.format(" ".join(value)))

View file

@ -440,7 +440,7 @@ class GrubConfig:
tplt = string.Template(GRUB_CFG_TEMPLATE)
data = tplt.safe_substitute(config)
with open(path, "w") as cfg:
with open(path, "w", encoding="utf8") as cfg:
cfg.write(data)
def write_redirect(self, tree, path):
@ -456,7 +456,7 @@ class GrubConfig:
tplt = string.Template(GRUB_REDIRECT_TEMPLATE)
data = tplt.safe_substitute(config)
with open(os.path.join(tree, path), "w") as cfg:
with open(os.path.join(tree, path), "w", encoding="utf8") as cfg:
cfg.write(data)
def defaults(self):
@ -530,7 +530,7 @@ def main(tree, options):
# Create the configuration file that determines how grub.cfg is generated.
if write_defaults:
os.makedirs(f"{tree}/etc/default", exist_ok=True)
with open(f"{tree}/etc/default/grub", "w") as default:
with open(f"{tree}/etc/default/grub", "w", encoding="utf8") as default:
default.write(config.defaults())
os.makedirs(f"{tree}/boot/grub2", exist_ok=True)
@ -545,7 +545,7 @@ def main(tree, options):
except FileNotFoundError:
pass
with open(grubenv, "w") as env:
with open(grubenv, "w", encoding="utf8") as env:
fs_type, fs_id = fs_spec_decode(root_fs)
data = "# GRUB Environment Block\n"

View file

@ -145,7 +145,7 @@ def main(root, options):
})
config = os.path.join(efidir, "grub.cfg")
with open(config, "w") as cfg:
with open(config, "w", encoding="utf8") as cfg:
cfg.write(data)
if "IA32" in architectures:

View file

@ -423,7 +423,7 @@ class GrubConfig:
data += "\n"
with open(path, "w") as cfg:
with open(path, "w", encoding="utf8") as cfg:
print(data)
cfg.write(data)
@ -478,13 +478,13 @@ def main(tree, options):
# Create the configuration file that determines how grub.cfg is generated.
if write_defaults:
os.makedirs(f"{tree}/etc/default", exist_ok=True)
with open(f"{tree}/etc/default/grub", "w") as default:
with open(f"{tree}/etc/default/grub", "w", encoding="utf8") as default:
default.write(config.defaults())
os.makedirs(f"{tree}/boot/grub2", exist_ok=True)
grubenv = f"{tree}/boot/grub2/grubenv"
with open(grubenv, "w") as env:
with open(grubenv, "w", encoding="utf8") as env:
data = (
"# GRUB Environment Block\n"
)

View file

@ -52,7 +52,7 @@ def main(inputs, output, options):
source = parse_input(inputs)
target = os.path.join(output, path.lstrip("/"))
with open(target, "w") as f:
with open(target, "w", encoding="utf8") as f:
cmd = ["gunzip", "--stdout", source]
subprocess.run(cmd, stdout=f,
check=True)

View file

@ -53,7 +53,7 @@ def main(inputs, output, options):
source = parse_input(inputs)
target = os.path.join(output, filename)
with open(target, "w") as f:
with open(target, "w", encoding="utf8") as f:
cmd = [
"gzip", "--no-name", "--stdout", "-1", source
]

View file

@ -44,7 +44,7 @@ def main(tree, options):
# itself will be sourced this the 'ignition_network_kcmdline'
# that is also in the "ignition_firstboot" variable can be
# overwritten with the contents of `network`
with open(f"{tree}/boot/ignition.firstboot", "w") as f:
with open(f"{tree}/boot/ignition.firstboot", "w", encoding="utf8") as f:
if network:
netstr = " ".join(network)
f.write(f"ignition_network_kcmdline={netstr}")

View file

@ -228,7 +228,7 @@ def main(tree, inputs, options):
})
config = os.path.join(isolinux, "isolinux.cfg")
with open(config, "w") as cfg:
with open(config, "w", encoding="utf8") as cfg:
cfg.write(data)
# link the kernel

View file

@ -52,7 +52,7 @@ def main(tree, options):
base = os.path.join(tree, "etc/kernel")
os.makedirs(base, exist_ok=True)
with open(f"{base}/cmdline", "w") as f:
with open(f"{base}/cmdline", "w", encoding="utf8") as f:
f.write(" ".join(filter(len, params)))
return 0

View file

@ -70,7 +70,7 @@ Section "InputClass"
EndSection
"""
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf", "w") as f:
with open(f"{tree}/etc/X11/xorg.conf.d/00-keyboard.conf", "w", encoding="utf8") as f:
f.write(file_content)

View file

@ -218,13 +218,13 @@ def main(tree, options):
base = os.path.dirname(target)
os.makedirs(base, exist_ok=True)
with open(target, "w") as f:
with open(target, "w", encoding="utf8") as f:
if config:
f.write("\n".join(config))
f.write("\n")
print(f"created kickstarted at: {path}\n")
with open(target, "r") as f:
with open(target, "r", encoding="utf8") as f:
print(f.read())
return 0

View file

@ -172,7 +172,7 @@ def main(devices, options):
command += ["--pbkdf-parallel", str(parallelism)]
subprocess.run(command + [path],
encoding='utf-8', check=True,
encoding='utf8', check=True,
input=passphrase)

View file

@ -51,7 +51,7 @@ def main(devices, options):
]
subprocess.run(command + [path],
encoding='utf-8', check=True,
encoding='utf8', check=True,
input=passphrase)

View file

@ -83,11 +83,11 @@ def main(devices, options):
print(f"LVM2: using vg name '{vg_name}'")
subprocess.run(["pvcreate", path],
encoding='utf-8',
encoding='utf8',
check=True)
subprocess.run(["vgcreate", vg_name, path],
encoding='utf-8',
encoding='utf8',
check=True)
for volume in volumes:
@ -104,7 +104,7 @@ def main(devices, options):
cmd += ["-n", name, vg_name]
subprocess.run(cmd, encoding='utf-8', check=True)
subprocess.run(cmd, encoding='utf8', check=True)
if __name__ == '__main__':

View file

@ -56,7 +56,7 @@ def main(devices, options):
opts = ["-L", label]
subprocess.run(["mkfs.btrfs", "-U", uuid] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
if __name__ == '__main__':

View file

@ -56,7 +56,7 @@ def main(devices, options):
opts = ["-L", label]
subprocess.run(["mkfs.ext4", "-U", uuid] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
if __name__ == '__main__':

View file

@ -69,7 +69,7 @@ def main(devices, options):
opts = ["-F", str(fatsize)]
subprocess.run(["mkfs.fat", "-I", "-i", volid] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
if __name__ == '__main__':

View file

@ -56,7 +56,7 @@ def main(devices, options):
opts = ["-L", label]
subprocess.run(["mkfs.xfs", "-m", f"uuid={uuid}"] + opts + [device],
encoding='utf-8', check=True)
encoding='utf8', check=True)
if __name__ == '__main__':

View file

@ -102,7 +102,7 @@ def main(tree, options):
else:
raise ValueError()
with open(f"{config_dir}/{config_file}", "w") as f:
with open(f"{config_dir}/{config_file}", "w", encoding="utf8") as f:
f.writelines(lines)
return 0

View file

@ -77,7 +77,7 @@ pid {pid};
daemon {daemon};
"""
with open(target, "w") as f:
with open(target, "w", encoding="utf8") as f:
f.write(content)
return 0

View file

@ -191,7 +191,7 @@ def main(tree, options):
else:
raise ValueError(f"Invalid section type: {type(items)}")
with open(cfgfile, "w") as f:
with open(cfgfile, "w", encoding="utf8") as f:
os.fchmod(f.fileno(), 0o600)
cfg.write(f, space_around_delimiters=False)

View file

@ -210,7 +210,7 @@ def main(tree, options):
val = str(value)
config.set(name, option, val)
with open(cfgfile, "w") as f:
with open(cfgfile, "w", encoding="utf8") as f:
# need restrictive permissions
os.fchmod(f.fileno(), 0o600)
config.write(f, space_around_delimiters=False)

View file

@ -179,7 +179,7 @@ XATTRS_WANT = r"^(user.|security\.ima|security\.capability)"
def sha256sum(path: str) -> str:
ret = subprocess.run(["sha256sum", path],
stdout=subprocess.PIPE,
encoding="utf-8",
encoding="utf8",
check=True)
return ret.stdout.strip().split(" ")[0]
@ -202,7 +202,7 @@ def blobs_add_file(blobs: str, path: str, mtype: str):
def blobs_add_json(blobs: str, js: str, mtype: str):
js_file = os.path.join(blobs, "temporary.js")
with open(js_file, "w", encoding="utf-8") as f:
with open(js_file, "w", encoding="utf8") as f:
json.dump(js, f)
return blobs_add_file(blobs, js_file, mtype)
@ -333,12 +333,12 @@ def create_oci_dir(inputs, output_dir, options, create_time):
# index
print("writing index")
index_path = os.path.join(output_dir, "index.json")
with open(index_path, "w", encoding="utf-8") as f:
with open(index_path, "w", encoding="utf8") as f:
json.dump(index, f)
# oci-layout tag
layout_path = os.path.join(output_dir, "oci-layout")
with open(layout_path, "w", encoding="utf-8") as f:
with open(layout_path, "w", encoding="utf8") as f:
json.dump({"imageLayoutVersion": "1.0.0"}, f)

View file

@ -131,7 +131,7 @@ def main(tree, options):
cmd.append(datastream)
res = subprocess.run(cmd, encoding="utf-8", stdout=sys.stderr, check=False)
res = subprocess.run(cmd, encoding="utf8", stdout=sys.stderr, check=False)
# oscap return values are:
# 0 → success

View file

@ -146,7 +146,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)
@ -177,7 +177,7 @@ def populate_var(sysroot):
res = subprocess.run(["systemd-tmpfiles", "--create", "--boot",
"--root=" + sysroot,
"--prefix=/var/" + target],
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
check=False)
@ -271,7 +271,7 @@ def main(tree, inputs, options):
if not os.path.isfile(cfgfile):
continue
with open(cfgfile, 'r') as f:
with open(cfgfile, 'r', encoding="utf8") as f:
cfg = selinux.parse_config(f)
se_policy = selinux.config_get_policy(cfg)

View file

@ -104,7 +104,7 @@ def main(inputs, output_dir, options, meta):
stdout=sys.stderr,
check=True)
with open(os.path.join(output_dir, "compose.json"), "r") as f:
with open(os.path.join(output_dir, "compose.json"), "r", encoding="utf8") as f:
compose = json.load(f)
api.metadata({"compose": compose})

View file

@ -56,7 +56,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -86,7 +86,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -57,7 +57,7 @@ def populate_var(sysroot):
res = subprocess.run(["systemd-tmpfiles", "--create", "--boot",
"--root=" + sysroot,
"--prefix=/var/" + target],
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
check=False)

View file

@ -21,7 +21,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -28,7 +28,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -48,7 +48,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -54,7 +54,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -71,7 +71,7 @@ def ostree(*args, _input=None, **kwargs):
args = list(args) + [f'--{k}={v}' for k, v in kwargs.items()]
print("ostree " + " ".join(args), file=sys.stderr)
subprocess.run(["ostree"] + args,
encoding="utf-8",
encoding="utf8",
stdout=sys.stderr,
input=_input,
check=True)

View file

@ -70,7 +70,7 @@ def main(tree, options):
if not os.path.isfile(cfgfile):
continue
with open(cfgfile, 'r') as f:
with open(cfgfile, 'r', encoding="utf8") as f:
cfg = selinux.parse_config(f)
se_policy = selinux.config_get_policy(cfg)

View file

@ -75,7 +75,7 @@ LocalFileSigLevel = Optional
os.makedirs(os.path.join(tree, "etc"), exist_ok=True)
cfgpath = os.path.join(tree, "etc", "pacman.conf")
with open(cfgpath, "w", encoding="utf-8") as cfgfile:
with open(cfgpath, "w", encoding="utf8") as cfgfile:
cfgfile.write(cfg)
return 0

View file

@ -35,7 +35,7 @@ def main(tree, options):
os.makedirs(os.path.join(tree, *filepath.parts[:-1]), exist_ok=True)
mirrorpath = os.path.join(tree, *filepath.parts)
with open(mirrorpath, "w", encoding="utf-8") as cfgfile:
with open(mirrorpath, "w", encoding="utf8") as cfgfile:
cfgfile.write("\n".join(f"Server = {m}\n" for m in mirrors))
return 0

View file

@ -103,7 +103,7 @@ def main(tree, options):
cfg_line = f'{cfg_item["domain"]} {cfg_item["type"]} {cfg_item["item"]} {cfg_item["value"]}\n'
cfg_lines.append(cfg_line)
with open(f"{limitsd_config_dir}/{filename}", "w") as f:
with open(f"{limitsd_config_dir}/{filename}", "w", encoding="utf8") as f:
f.writelines(cfg_lines)
return 0

View file

@ -138,12 +138,12 @@ class PartitionTable:
subprocess.run(["parted", "-a", "none", "-s",
target, "--"] + commands,
encoding='utf-8',
encoding='utf8',
check=True)
subprocess.run(["parted", "-a", "none", "-s",
target, "--", "print"],
encoding='utf-8',
encoding='utf8',
check=True)

View file

@ -70,7 +70,7 @@ def main(tree, options):
sys.stdout.write(f"{key} = {value}\n")
continue
sys.stdout.write(line)
with open(f"{tree}/etc/security/pwquality.conf", mode="a") as f:
with open(f"{tree}/etc/security/pwquality.conf", mode="a", encoding="utf8") as f:
for key, value in changes.items():
f.write(f"{key} = {value}\n")

View file

@ -56,7 +56,7 @@ def main(tree, options):
data += ["nameserver " + str(ns)]
os.makedirs(os.path.join(tree, "etc"), exist_ok=True)
with open(fullpath, "w") as f:
with open(fullpath, "w", encoding="utf8") as f:
f.write("\n".join(data) + "\n")
return 0

View file

@ -111,7 +111,7 @@ def configure_plugins(tree, path, plugins_options):
plugin_conf = iniparse.SafeConfigParser()
try:
with open(plugin_conf_path, "r") as f:
with open(plugin_conf_path, "r", encoding="utf8") as f:
plugin_conf.readfp(f)
except FileNotFoundError:
print(f"Error: {plugin} configuration file '{plugin_conf_path}' does not exist.")
@ -129,7 +129,7 @@ def configure_plugins(tree, path, plugins_options):
print(f"Error: unknown property {option} specified for {plugin} plugin.")
return 1
with open(plugin_conf_path, "w") as f:
with open(plugin_conf_path, "w", encoding="utf8") as f:
plugin_conf.write(f)
return 0
@ -144,7 +144,7 @@ def configure_rhsm(tree, rhsm_configuration_options):
rhsm_conf = iniparse.SafeConfigParser()
try:
with open(rhsm_config_path, "r") as f:
with open(rhsm_config_path, "r", encoding="utf8") as f:
rhsm_conf.readfp(f)
except FileNotFoundError:
print(f"Error: RHSM configuration file '{rhsm_config_path}' does not exist.")
@ -159,7 +159,7 @@ def configure_rhsm(tree, rhsm_configuration_options):
value = str(value)
rhsm_conf.set(config_section, option, value)
with open(rhsm_config_path, "w") as f:
with open(rhsm_config_path, "w", encoding="utf8") as f:
rhsm_conf.write(f)
return 0

View file

@ -28,7 +28,7 @@ def main(tree, options):
os.makedirs(path, exist_ok=True)
with open(file, "x", encoding="utf-8") as f:
with open(file, "x", encoding="utf8") as f:
json.dump(options["facts"], f)
os.makedirs(os.path.join(tree, "etc/rhsm/facts"), exist_ok=True)

View file

@ -204,7 +204,7 @@ def generate_package_metadata(tree, rpm_args):
]
res = subprocess.run(cmd, stdout=subprocess.PIPE,
check=True, encoding="utf-8")
check=True, encoding="utf8")
raw = res.stdout.strip()
jsdata = '{"packages": [' + raw[:-1] + "]}"
@ -271,7 +271,7 @@ def create_machine_id_if_needed(tree):
return False
os.makedirs(f"{tree}/etc", exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf8") as f:
# create a fake machine ID to improve reproducibility
f.write("ffffffffffffffffffffffffffffffff")
os.fchmod(f.fileno(), 0o400)
@ -323,7 +323,7 @@ def main(tree, inputs, options):
if options.get("ostree_booted", False):
os.makedirs(f"{tree}/run", exist_ok=True)
ostree_booted = f"{tree}/{OSTREE_BOOTED_MARKER}"
with open(ostree_booted, "w", encoding="utf-8") as f:
with open(ostree_booted, "w", encoding="utf8") as f:
f.write("")
extra_args = []

View file

@ -57,7 +57,7 @@ def main(tree, options):
path = os.path.join(tree, filename)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf8") as f:
for k, v in options["macros"].items():
value = make_value(k, v)
line = f"%{k} {value}\n"

View file

@ -39,7 +39,7 @@ def main(tree, options):
selinux_type_key = "SELINUXTYPE"
selinux_config_lines = []
with open(f"{tree}{selinux_config_file}") as f:
with open(f"{tree}{selinux_config_file}", encoding="utf8") as f:
selinux_config_lines = f.readlines()
for idx, line in enumerate(selinux_config_lines):
@ -53,7 +53,7 @@ def main(tree, options):
elif line_key == selinux_type_key and policy_type:
selinux_config_lines[idx] = f"{selinux_type_key}={policy_type}\n"
with open(f"{tree}{selinux_config_file}", "w") as f:
with open(f"{tree}{selinux_config_file}", "w", encoding="utf8") as f:
f.writelines(selinux_config_lines)
return 0

View file

@ -165,7 +165,7 @@ class PartitionTable:
subprocess.run(["sfdisk", "-q", "--no-tell-kernel", target],
input=command,
encoding='utf-8',
encoding='utf8',
check=True)
if sync:
@ -175,7 +175,7 @@ class PartitionTable:
"""Update and fill in missing information from disk"""
r = subprocess.run(["sfdisk", "--json", target],
stdout=subprocess.PIPE,
encoding='utf-8',
encoding='utf8',
check=True)
disk_table = json.loads(r.stdout)["partitiontable"]
disk_parts = disk_table["partitions"]
@ -214,7 +214,7 @@ def main(devices, options):
pt.write_to(device)
subprocess.run(["sfdisk", "--json", device],
encoding='utf-8',
encoding='utf8',
check=False)

View file

@ -162,7 +162,7 @@ class PartitionTable:
command += cmd
subprocess.run(command,
encoding='utf-8',
encoding='utf8',
check=True)
@ -189,7 +189,7 @@ def main(devices, options):
pt.write_to(device)
subprocess.run(["sgdisk", "-p", device],
encoding='utf-8',
encoding='utf8',
check=False)

View file

@ -101,7 +101,7 @@ def main(tree, options):
sys.stdout.write(f"{key} {entry['value']}\n")
continue
sys.stdout.write(line)
with open(f"{tree}/etc/ssh/sshd_config", mode="a") as f:
with open(f"{tree}/etc/ssh/sshd_config", mode="a", encoding="utf8") as f:
for entry in changes.values():
f.write(f"{entry['key']} {entry['value']}\n")

View file

@ -168,7 +168,7 @@ def configure_kernel(tree, kernel_options):
if not kernel_options:
return
with open(f"{tree}/etc/sysconfig/kernel", 'w') as kernel_file:
with open(f"{tree}/etc/sysconfig/kernel", 'w', encoding="utf8") as kernel_file:
for option, value in kernel_options.items():
if option == "update_default":
kernel_file.write(f"UPDATEDEFAULT={bool_to_string(value)}\n")
@ -183,7 +183,7 @@ def configure_network(tree, network_options):
if not network_options:
return
with open(f"{tree}/etc/sysconfig/network", 'w') as network_file:
with open(f"{tree}/etc/sysconfig/network", 'w', encoding="utf8") as network_file:
for option, value in network_options.items():
if option == "networking":
network_file.write(f"NETWORKING={bool_to_string(value)}\n")
@ -230,7 +230,7 @@ def configure_network_scripts_ifcfg(tree, network_scripts_ifcfg_options):
"config.")
if lines:
with open(f"{tree}/etc/sysconfig/network-scripts/ifcfg-{ifname}", 'w') as ifcfg_file:
with open(f"{tree}/etc/sysconfig/network-scripts/ifcfg-{ifname}", 'w', encoding="utf8") as ifcfg_file:
ifcfg_file.writelines(lines)

View file

@ -94,7 +94,7 @@ def main(tree, options):
cfg_line = f"{key} = {value}\n" if value else f"{key}\n"
cfg_lines.append(cfg_line)
with open(f"{sysctld_config_dir}/{filename}", "w") as f:
with open(f"{sysctld_config_dir}/{filename}", "w", encoding="utf8") as f:
f.writelines(cfg_lines)
return 0

View file

@ -73,7 +73,7 @@ def main(tree, options):
for option, value in opts.items():
config.set(section, option, str(value))
with open(f"{dropins_dir}/{dropin_file}", "w") as f:
with open(f"{dropins_dir}/{dropin_file}", "w", encoding="utf8") as f:
config.write(f, space_around_delimiters=False)
return 0

View file

@ -79,7 +79,7 @@ def main(tree, options):
for option, value in opts.items():
config.set(section, option, str(value))
with open(f"{unit_dropins_dir}/{dropin_file}", "w") as f:
with open(f"{unit_dropins_dir}/{dropin_file}", "w", encoding="utf8") as f:
config.write(f, space_around_delimiters=False)
return 0

View file

@ -42,7 +42,7 @@ StandardOutput=file:/dev/vport0p1
ExecStart={script}
ExecStopPost=/usr/bin/systemctl poweroff
"""
with open(f"{tree}/etc/systemd/system/osbuild-test.service", "w") as f:
with open(f"{tree}/etc/systemd/system/osbuild-test.service", "w", encoding="utf8") as f:
f.write(unit)
os.symlink("../osbuild-test.service",

View file

@ -109,7 +109,7 @@ def main(tree, options):
cfg_line += "\n"
cfg_lines.append(cfg_line)
with open(f"{tmpfilesd_config_dir}/{filename}", "w") as f:
with open(f"{tmpfilesd_config_dir}/{filename}", "w", encoding="utf8") as f:
f.writelines(cfg_lines)
return 0

View file

@ -90,11 +90,11 @@ def main(tree, options):
raise ValueError(f"TuneD profile '{profile}' does not exist")
# Set the active profile
with open(f"{tree}{active_profile_file}", "w") as f:
with open(f"{tree}{active_profile_file}", "w", encoding="utf8") as f:
f.write(" ".join(profiles) + "\n")
# Mode needs to be set to "manual" if set explicitly
with open(f"{tree}{profile_mode_file}", "w") as f:
with open(f"{tree}{profile_mode_file}", "w", encoding="utf8") as f:
f.write("manual\n")
return 0

View file

@ -304,7 +304,7 @@ def main(tree, options):
rules = options["rules"]
path = os.path.join(tree, filename.lstrip("/"))
with open(path, "w", encoding="utf-8") as f:
with open(path, "w", encoding="utf8") as f:
for rule in rules:
if isinstance(rule, dict):
comment = rule.get("comment")

View file

@ -74,7 +74,7 @@ SCHEMA = """
def getpwnam(root, name):
"""Similar to pwd.getpwnam, but takes a @root parameter"""
with open(f"{root}/etc/passwd") as f:
with open(f"{root}/etc/passwd", encoding="utf8") as f:
for line in f:
passwd = line.split(":")
if passwd[0] == name:
@ -131,7 +131,7 @@ def add_ssh_key(root, user, key):
os.mkdir(ssh_dir, 0o700)
os.chown(ssh_dir, int(uid), int(gid))
with open(authorized_keys, "a") as f:
with open(authorized_keys, "a", encoding="utf8") as f:
f.write(f"{key}\n")
os.chown(authorized_keys, int(uid), int(gid))

View file

@ -79,8 +79,8 @@ Vagrant.configure("2") do |config|
end
end
"""
open(f"{tree}/Vagrantfile", "w").write(vagrantfile)
with open(f"{tree}/metadata.json", "w") as fp:
open(f"{tree}/Vagrantfile", "w", encoding="utf8").write(vagrantfile)
with open(f"{tree}/metadata.json", "w", encoding="utf8") as fp:
json.dump(metadata, fp)

View file

@ -73,7 +73,7 @@ def main(tree, options):
sys.stdout.write(f"{key}={entry['value']}\n")
continue
sys.stdout.write(line)
with open(f"{tree}/etc/waagent.conf", mode="a") as f:
with open(f"{tree}/etc/waagent.conf", mode="a", encoding="utf8") as f:
for entry in changes.values():
f.write(f"{entry['key']}={entry['value']}\n")

View file

@ -53,7 +53,7 @@ def main(inputs, output, options):
source = parse_input(inputs)
target = os.path.join(output, filename)
with open(target, "w") as f:
with open(target, "w", encoding="utf8") as f:
env = {
"XZ_OPT": "--threads 0"

View file

@ -62,7 +62,7 @@ def configure_plugins(tree, plugins_options):
plugin_conf = iniparse.SafeConfigParser()
try:
with open(plugin_conf_path, "r") as f:
with open(plugin_conf_path, "r", encoding="utf8") as f:
plugin_conf.readfp(f)
except FileNotFoundError:
print(f"Warning: {plugin} configuration file '{plugin_conf_path}' does not exist, will create it.")
@ -83,7 +83,7 @@ def configure_plugins(tree, plugins_options):
print(f"Error: unknown property {option} specified for {plugin} plugin.")
return 1
with open(plugin_conf_path, "w") as f:
with open(plugin_conf_path, "w", encoding="utf8") as f:
plugin_conf.write(f)
return 0
@ -98,7 +98,7 @@ def main(tree, options):
if config_options:
try:
with open(yum_config_path, "r") as f:
with open(yum_config_path, "r", encoding="utf8") as f:
yum_config.readfp(f)
except FileNotFoundError:
print(f"Warning: YUM configuration file '{yum_config_path}' does not exist, will create it.")
@ -110,7 +110,7 @@ def main(tree, options):
for option, value in config_options.items():
yum_config.set("main", option, value)
with open(yum_config_path, "w") as f:
with open(yum_config_path, "w", encoding="utf8") as f:
yum_config.write(f)
if configure_plugins(tree, plugins_options):

View file

@ -181,7 +181,7 @@ def main(tree, options):
parser.set(repo_id, key, option_value_to_str(value))
# ensure that we won't overwrite an existing file
with open(f"{yum_repos_dir}/{filename}", "x") as f:
with open(f"{yum_repos_dir}/{filename}", "x", encoding="utf8") as f:
parser.write(f, space_around_delimiters=False)
return 0

View file

@ -34,7 +34,7 @@ def main(tree, options):
new_zipl_conf = "\n".join(config) + "\n"
os.makedirs(f"{tree}/etc", exist_ok=True)
with open(f"{tree}/etc/zipl.conf", "w") as f:
with open(f"{tree}/etc/zipl.conf", "w", encoding="utf8") as f:
f.write(new_zipl_conf)
return 0

View file

@ -55,7 +55,7 @@ SCHEMA_2 = r"""
def parse_blsfile(blsfile):
params = {}
with open(blsfile, "r") as bls:
with open(blsfile, "r", encoding="utf8") as bls:
for line in bls:
key, value = line.split(' ', 1)
params[key] = value.strip()