Protect against decoding errors with subprocess text mode

All these are calling subprocess in 'text mode', where it will
try to decode stdout/stderr using the default encoding (utf-8
for us). If it doesn't decode, subprocess will raise an exception
and kobo doesn't handle it, it just passes it along to us, so
things blow up - see https://pagure.io/releng/issue/12474 . To
avoid this, let's set `errors="replace"`, which tells the decoder
to replace invalid data with ? characters. This way we should get
as much of the output as can be read, and no crashes.

We also replace `universal_newlines=True` with `text=True` as
the latter is shorter, clearer, and what Python 3 subprocess
wants us to use, it considers `universal_newlines` to just be
a backwards-compatibility thing - "The universal_newlines argument
is equivalent to text and is provided for backwards compatibility"

Signed-off-by: Adam Williamson <awilliam@redhat.com>
Merges: https://pagure.io/pungi/pull-request/1812
This commit is contained in:
Adam Williamson 2025-01-15 11:00:33 -08:00 committed by Lubomír Sedlář
parent 98e3b3f8c4
commit 2d16a3af00
17 changed files with 132 additions and 70 deletions

View file

@ -16,7 +16,8 @@ def get_full_version():
proc = subprocess.Popen(
["git", "--git-dir=%s/.git" % location, "describe", "--tags"],
stdout=subprocess.PIPE,
universal_newlines=True,
text=True,
errors="replace",
)
output, _ = proc.communicate()
return re.sub(r"-1.fc\d\d?", "", output.strip().replace("pungi-", ""))
@ -24,7 +25,7 @@ def get_full_version():
import subprocess
proc = subprocess.Popen(
["rpm", "-q", "pungi"], stdout=subprocess.PIPE, universal_newlines=True
["rpm", "-q", "pungi"], stdout=subprocess.PIPE, text=True, errors="replace"
)
(output, err) = proc.communicate()
if not err:

View file

@ -104,7 +104,8 @@ class PungiNotifier(object):
workdir=workdir,
return_stdout=False,
show_cmd=True,
universal_newlines=True,
text=True,
errors="replace",
logfile=logfile,
)
if ret != 0:

View file

@ -64,7 +64,8 @@ class Tree(OSTree):
show_cmd=True,
stdout=True,
logfile=log_file,
universal_newlines=True,
text=True,
errors="replace",
)
finally:
os.umask(oldumask)
@ -77,7 +78,8 @@ class Tree(OSTree):
show_cmd=True,
stdout=True,
logfile=log_file,
universal_newlines=True,
text=True,
errors="replace",
)
def _update_ref(self):

View file

@ -139,7 +139,7 @@ class OSTreeContainerThread(WorkerThread):
"--version=%s" % version,
]
_, runroot_script = shortcuts.run(cmd, universal_newlines=True)
_, runroot_script = shortcuts.run(cmd, text=True, errors="replace")
default_packages = ["ostree", "rpm-ostree", "selinux-policy-targeted"]
additional_packages = config.get("runroot_packages", [])

View file

@ -652,7 +652,11 @@ def run_unmount_cmd(cmd, max_retries=10, path=None, logger=None):
"""
for i in range(max_retries):
proc = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
errors="replace",
)
out, err = proc.communicate()
if proc.returncode == 0:
@ -674,7 +678,8 @@ def run_unmount_cmd(cmd, max_retries=10, path=None, logger=None):
c,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
text=True,
errors="replace",
)
out, _ = proc.communicate()
logger.debug(
@ -879,7 +884,7 @@ def git_ls_remote(baseurl, ref, credential_helper=None):
if credential_helper:
cmd.extend(["-c", "credential.useHttpPath=true"])
cmd.extend(["-c", "credential.helper=%s" % credential_helper])
return run(cmd + ["ls-remote", baseurl, ref], universal_newlines=True)
return run(cmd + ["ls-remote", baseurl, ref], text=True, errors="replace")
def get_tz_offset():

View file

@ -227,7 +227,7 @@ def get_checkisomd5_cmd(iso_path, just_print=False):
def get_checkisomd5_data(iso_path, logger=None):
cmd = get_checkisomd5_cmd(iso_path, just_print=True)
retcode, output = run(cmd, universal_newlines=True)
retcode, output = run(cmd, text=True, errors="replace")
items = [line.strip().rsplit(":", 1) for line in output.splitlines()]
items = dict([(k, v.strip()) for k, v in items])
md5 = items.get(iso_path, "")
@ -283,13 +283,13 @@ def get_manifest_cmd(iso_name, xorriso=False, output_file=None):
def get_volume_id(path, xorriso=False):
if xorriso:
cmd = ["xorriso", "-indev", path]
retcode, output = run(cmd, universal_newlines=True)
retcode, output = run(cmd, text=True, errors="replace")
for line in output.splitlines():
if line.startswith("Volume id"):
return line.split("'")[1]
else:
cmd = ["isoinfo", "-d", "-i", path]
retcode, output = run(cmd, universal_newlines=True)
retcode, output = run(cmd, text=True, errors="replace")
for line in output.splitlines():
line = line.strip()
@ -500,7 +500,7 @@ def mount(image, logger=None, use_guestmount=True):
else:
env = {}
cmd = ["mount", "-o", "loop", image, mount_dir]
ret, out = run(cmd, env=env, can_fail=True, universal_newlines=True)
ret, out = run(cmd, env=env, can_fail=True, text=True, errors="replace")
if ret != 0:
# The mount command failed, something is wrong.
# Log the output and raise an exception.

View file

@ -294,7 +294,8 @@ class KojiWrapper(object):
show_cmd=True,
env=env,
buffer_size=-1,
universal_newlines=True,
text=True,
errors="replace",
)
# Look for first line that contains only a number. This is the ID of
@ -430,7 +431,7 @@ class KojiWrapper(object):
while True:
retcode, output = run(
cmd, can_fail=True, logfile=logfile, universal_newlines=True
cmd, can_fail=True, logfile=logfile, text=True, errors="replace"
)
if retcode == 0 or not (
@ -463,7 +464,8 @@ class KojiWrapper(object):
logfile=log_file,
env=env,
buffer_size=-1,
universal_newlines=True,
text=True,
errors="replace",
)
match = re.search(r"Created task: (\d+)", output)
@ -808,7 +810,8 @@ def get_buildroot_rpms(compose, task_id):
# local
retcode, output = run(
"rpm -qa --qf='%{name}-%{version}-%{release}.%{arch}\n'",
universal_newlines=True,
text=True,
errors="replace",
)
for i in output.splitlines():
if not i:

View file

@ -56,7 +56,8 @@ class ScmBase(kobo.log.LoggingBase):
workdir=cwd,
can_fail=True,
stdin_data="",
universal_newlines=True,
text=True,
errors="replace",
)
if retcode != 0:
self.log_error("Output was: %r" % output)