objectstore: extract remove_tree()

Move remove_tree() into its own module in `osbuild.util.rmrf`. This way
we can use it in other modules as well, without cross-referencing
internal helpers.
This commit is contained in:
David Rheinsberg 2020-04-20 12:15:21 +02:00 committed by Tom Gundersen
parent 4ad4da4658
commit 2cc9160099
4 changed files with 152 additions and 77 deletions

View file

@ -1,14 +1,13 @@
import array
import contextlib
import errno
import fcntl
import hashlib
import os
import shutil
import subprocess
import tempfile
import weakref
from typing import Optional
import osbuild.util.rmrf as rmrf
from . import treesum
@ -54,53 +53,6 @@ def umount(target, lazy=True):
subprocess.run(["umount"] + args + [target], check=True)
def clear_mutable_flag(path):
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
FS_IMMUTABLE_FL = 0x010
fd = -1
try:
fd = os.open(path, os.O_RDONLY)
flags = array.array('L', [0])
fcntl.ioctl(fd, FS_IOC_GETFLAGS, flags, True)
flags[0] &= ~FS_IMMUTABLE_FL
fcntl.ioctl(fd, FS_IOC_SETFLAGS, flags, False)
except OSError:
pass # clearing flags is best effort
finally:
if fd > -1:
os.close(fd)
def remove_tree(path):
def fixperms(p):
clear_mutable_flag(p)
os.chmod(p, 0o777)
def unlink(p):
try:
os.unlink(p)
except IsADirectoryError:
remove_tree(p)
except FileNotFoundError:
pass
def on_error(_fn, p, exc_info):
e = exc_info[0]
if issubclass(e, FileNotFoundError):
pass
elif issubclass(e, PermissionError):
if p != path:
fixperms(os.path.dirname(p))
fixperms(p)
unlink(p)
else:
raise e
shutil.rmtree(path, onerror=on_error)
# pylint: disable=too-many-instance-attributes
class Object:
def __init__(self, store: "ObjectStore"):
@ -211,7 +163,7 @@ class Object:
# manually remove the tree, it might contain
# files with immutable flag set, which will
# throw off standard Python 3 tempdir cleanup
remove_tree(self._tree)
rmrf.rmtree(self._tree)
self._tree = None
if self._workdir:
self._workdir.cleanup()

97
osbuild/util/rmrf.py Normal file
View file

@ -0,0 +1,97 @@
"""Recursive File System Removal
This module implements `rm -rf` as a python function. Its core is the
`rmtree()` function, which takes a file-system path and then recursively
deletes everything it finds on that path, until eventually the path entry
itself is dropped. This is modeled around `shutil.rmtree()`.
This function tries to be as thorough as possible. That is, it tries its best
to modify permission bits and other flags to make sure directory entries can be
removed.
"""
import array
import fcntl
import os
import shutil
__all__ = [
"rmtree",
]
def rmtree(path: str):
"""Recursively Remove from File System
This removes the object at the given path from the file-system. It
recursively iterates through its content and removes them, before removing
the object itself.
This function is modeled around `shutil.rmtree()`, but extends its
functionality with a more aggressive approach. It tries much harder to
unlink file system objects. This includes immutable markers and more.
Note that this function can still fail. In particular, missing permissions
can always prevent this function from succeeding. However, a caller should
never assume that they can intentionally prevent this function from
succeeding. In other words, this function might be extended in any way in
the future, to be more powerful and successful in removing file system
objects.
Parameters
---------
path
A file system path pointing to the object to remove.
Raises
------
Exception
This raises the same exceptions as `shutil.rmtree()` (since that
function is used internally). Consult its documentation for details.
"""
def clear_mutable_flag(path):
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
FS_IMMUTABLE_FL = 0x010
fd = -1
try:
fd = os.open(path, os.O_RDONLY)
flags = array.array('L', [0])
fcntl.ioctl(fd, FS_IOC_GETFLAGS, flags, True)
flags[0] &= ~FS_IMMUTABLE_FL
fcntl.ioctl(fd, FS_IOC_SETFLAGS, flags, False)
except OSError:
pass # clearing flags is best effort
finally:
if fd > -1:
os.close(fd)
def fixperms(p):
clear_mutable_flag(p)
os.chmod(p, 0o777)
def unlink(p):
try:
os.unlink(p)
except IsADirectoryError:
rmtree(p)
except FileNotFoundError:
pass
def on_error(_fn, p, exc_info):
e = exc_info[0]
if issubclass(e, FileNotFoundError):
pass
elif issubclass(e, PermissionError):
if p != path:
fixperms(os.path.dirname(p))
fixperms(p)
unlink(p)
else:
raise e
shutil.rmtree(path, onerror=on_error)

View file

@ -2,25 +2,11 @@ import os
import shutil
import tempfile
import unittest
import subprocess
from pathlib import Path
from osbuild import objectstore
def can_set_immutable():
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
try:
os.makedirs(f"{tmp}/f")
# fist they give it ...
subprocess.run(["chattr", "+i", f"{tmp}/f"], check=True)
# ... then they take it away
subprocess.run(["chattr", "-i", f"{tmp}/f"], check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
return False
return True
class TestObjectStore(unittest.TestCase):
@classmethod
@ -87,18 +73,6 @@ class TestObjectStore(unittest.TestCase):
# there should be no temporary Objects dirs anymore
self.assertEqual(len(os.listdir(object_store.tmp)), 0)
# pylint: disable=no-self-use
@unittest.skipUnless(can_set_immutable(), "Need root permissions")
def test_cleanup_immutable(self):
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
with objectstore.ObjectStore(tmp) as object_store:
tree = object_store.new()
with tree.write() as path:
p = Path(f"{path}/A")
p.touch()
subprocess.run(["chattr", "+i", f"{path}/A"],
check=True)
# pylint: disable=no-self-use
def test_duplicate(self):
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:

52
test/test_util_rmrf.py Normal file
View file

@ -0,0 +1,52 @@
#
# Tests for the `osbuild.util.rmrf` module.
#
import os
import pathlib
import shutil
import subprocess
import tempfile
import unittest
import osbuild.util.rmrf as rmrf
def can_set_immutable():
with tempfile.TemporaryDirectory(dir="/var/tmp") as tmp:
try:
os.makedirs(f"{tmp}/f")
# fist they give it ...
subprocess.run(["chattr", "+i", f"{tmp}/f"], check=True)
# ... then they take it away
subprocess.run(["chattr", "-i", f"{tmp}/f"], check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
return False
return True
class TestUtilLinux(unittest.TestCase):
@unittest.skipUnless(can_set_immutable(), "Need root permissions")
def test_rmtree_immutable(self):
#
# Test the `rmrf.rmtree()` helper and verify it can correctly unlink
# files that are marked immutable.
#
with tempfile.TemporaryDirectory(dir="/var/tmp") as vartmpdir:
os.makedirs(f"{vartmpdir}/dir")
p = pathlib.Path(f"{vartmpdir}/dir/immutable")
p.touch()
subprocess.run(["chattr", "+i", f"{vartmpdir}/dir/immutable"],
check=True)
with self.assertRaises(PermissionError):
shutil.rmtree(f"{vartmpdir}/dir")
rmrf.rmtree(f"{vartmpdir}/dir")
if __name__ == "__main__":
unittest.main()