rmtree: use fork

These changes work around a thread safety issue in our rmtree
implementation, which uses chdir to traverse the directory tree.
Using chdir resolves issues deleting paths longer than PATH_MAX, but
makes the code inherently unsafe in a threaded environment.

Now, the main rmtree function uses fork to perform the actions in a
dedicated process.
To avoid possible locking issues with the logging module, we introduce a
simple proxy logger for the subprocess.

Fixes: https://pagure.io/koji/issue/3755

For historical context see:
https://pagure.io/koji/issue/201
https://pagure.io/koji/issue/2481
https://pagure.io/koji/issue/2714
This commit is contained in:
Mike McLean 2024-02-12 10:26:47 -05:00 committed by Tomas Kopecek
parent c4b50c65c7
commit 4fddafc54d
2 changed files with 456 additions and 136 deletions

View file

@ -25,6 +25,7 @@ import calendar
import datetime
import errno
import hashlib
import json
import logging
import os
import os.path
@ -34,6 +35,7 @@ import shutil
import stat
import struct
import sys
import tempfile
import time
import warnings
from fnmatch import fnmatch
@ -434,7 +436,113 @@ class _RetryRmtree(Exception):
def rmtree(path, logger=None):
"""Delete a directory tree without crossing fs boundaries"""
"""Delete a directory tree without crossing fs boundaries
:param str path: the directory to remove
:param Logger logger: Logger object
"""
# we use the fake logger to avoid issues with logging locks while forking
fd, logfile = tempfile.mkstemp(suffix='.jsonl')
os.close(fd)
pid = os.fork()
if not pid:
# child process
try:
status = 1
with SimpleProxyLogger(logfile) as mylogger:
try:
_rmtree_nofork(path, logger=mylogger)
except Exception as e:
mylogger.error('rmtree failed: %s' % e)
raise
status = 0
finally:
# diediedie
os._exit(status)
# not reached
# parent process
_pid, status = os.waitpid(pid, 0)
logger = logger or logging.getLogger('koji')
try:
SimpleProxyLogger.send(logfile, logger)
except Exception as err:
logger.error("Failed to get rmtree logs -- %s" % err)
if not isSuccess(status):
raise koji.GenericError(parseStatus(status, "rmtree process"))
if os.path.exists(path):
raise koji.GenericError("Failed to remove directory: %s" % path)
class SimpleProxyLogger(object):
"""Save log messages to a file and log them later"""
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
def __init__(self, filename):
self.outfile = koji._open_text_file(filename, mode='wt')
# so we can use as a context manager
def __enter__(self):
return self
def __exit__(self, _type, value, traceback):
self.outfile.close()
# don't eat exceptions
return False
def log(self, level, msg, *args, **kwargs):
# jsonl output
data = [level, msg, args, kwargs]
try:
line = json.dumps(data, indent=None)
except Exception:
try:
data = [logging.ERROR, "Unable to log: %s" % data, (), {}]
line = json.dumps(data, indent=None)
except Exception:
line = '[40, "Invalid log data", [], {}]'
try:
self.outfile.write(line)
self.outfile.write('\n')
except Exception:
pass
def info(self, msg, *args, **kwargs):
self.log(self.INFO, msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
self.log(self.WARNING, msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
self.log(self.ERROR, msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
self.log(self.DEBUG, msg, *args, **kwargs)
@staticmethod
def send(filename, logger):
with koji._open_text_file(filename, mode='rt') as fo:
for line in fo:
try:
level, msg, args, kwargs = json.loads(line)
except Exception:
level = logging.ERROR
msg = "Bad log data: %r"
args = (line,)
logger.log(level, msg, *args, **kwargs)
def _rmtree_nofork(path, logger=None):
"""Delete a directory tree without crossing fs boundaries
This function is not thread safe because it relies on chdir to avoid
forming long paths.
"""
# implemented to avoid forming long paths
# see: https://pagure.io/koji/issue/201
logger = logger or logging.getLogger('koji')
@ -446,6 +554,7 @@ def rmtree(path, logger=None):
if not stat.S_ISDIR(st.st_mode):
raise koji.GenericError("Not a directory: %s" % path)
dev = st.st_dev
new_cwd = os.path.abspath(path)
cwd = os.getcwd()
try:
@ -461,7 +570,7 @@ def rmtree(path, logger=None):
return
raise
try:
_rmtree(dev, logger)
_rmtree(dev, new_cwd, logger)
except _RetryRmtree as e:
# reset and retry
os.chdir(cwd)
@ -479,37 +588,45 @@ def rmtree(path, logger=None):
raise
def _rmtree(dev, logger):
def _rmtree(dev, cwd, logger):
"""Remove all contents of CWD"""
# This implementation avoids forming long paths and recursion. Otherwise
# we will have errors with very deep directory trees.
# - to avoid forming long paths we change directory as we go
# - to avoid recursion we maintain our own stack
dirstack = []
# Each entry in dirstack is a list of subdirs for that level
# Each entry in dirstack contains data for a level of directory traversal
# - path
# - subdirs
# As we descend into the tree, we append a new entry to dirstack
# When we ascend back up after removal, we pop them off
while True:
dirs = _stripcwd(dev, logger)
dirs = _stripcwd(dev, cwd, logger)
# if cwd has no subdirs, walk back up until we find some
while not dirs and dirstack:
_assert_cwd(cwd)
try:
os.chdir('..')
except OSError as e:
_assert_cwd(cwd)
if e.errno in (errno.ENOENT, errno.ESTALE):
# likely in a race with another rmtree
# however, we cannot proceed from here, so we return to the top
raise _RetryRmtree(str(e))
raise
dirs = dirstack.pop()
cwd = os.path.dirname(cwd)
# now that we've ascended back up by one, the first dir entry is
# now that we've ascended back up by one, the last dir entry is
# one we've just cleared, so we should remove it
empty_dir = dirs.pop()
_assert_cwd(cwd)
try:
os.rmdir(empty_dir)
except OSError as e:
_assert_cwd(cwd)
# If this happens, either something else is writing to the dir,
# or there is a bug in our code.
# For now, we ignore this and proceed, but we'll still fail at
@ -524,9 +641,11 @@ def _rmtree(dev, logger):
# otherwise we descend into the next subdir
subdir = dirs[-1]
# note: we do not pop here because we need to remember to remove subdir later
_assert_cwd(cwd)
try:
os.chdir(subdir)
except OSError as e:
_assert_cwd(cwd)
if e.errno == errno.ENOENT:
# likely in a race with another rmtree
# we'll ignore this and continue
@ -535,12 +654,27 @@ def _rmtree(dev, logger):
logger.warning("Subdir disappeared during rmtree %s: %s" % (subdir, e))
continue # with dirstack unchanged
raise
cwd = os.path.join(cwd, subdir)
dirstack.append(dirs)
def _stripcwd(dev, logger):
def _assert_cwd(cwd):
try:
actual = os.getcwd()
except OSError as e:
if e.errno == errno.ENOENT:
# subsequent calls should fail with better handling
return
raise
if cwd != actual:
raise koji.GenericError('CWD changed unexpectedly: %s -> %s' % (cwd, actual))
print('CWD changed unexpectedly: %s -> %s' % (cwd, actual))
def _stripcwd(dev, cwd, logger):
"""Unlink all files in cwd and return list of subdirs"""
dirs = []
_assert_cwd(cwd)
try:
fdirs = os.listdir('.')
except OSError as e:
@ -553,6 +687,7 @@ def _stripcwd(dev, logger):
try:
st = os.lstat(fn)
except OSError as e:
_assert_cwd(cwd)
if e.errno == errno.ENOENT:
continue
raise
@ -562,6 +697,7 @@ def _stripcwd(dev, logger):
if stat.S_ISDIR(st.st_mode):
dirs.append(fn)
else:
_assert_cwd(cwd)
try:
os.unlink(fn)
except OSError: