debian-forge/tools/tree-diff
Christian Kellner 33283853e9 tree-diff: properly detect missing selinux labels
Detect the case where one file has a SELinux label but the other
file does not have any label at all. Previously this was not
possible to detect because both calls to get the labels were
wrapped in a single try-except block, so any failure in one of the
two calls, such as a missing label, would lead to an early return
from the function with a success value.
2021-01-26 12:09:23 +01:00

194 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import contextlib
import hashlib
import json
import os
def hash_file(fd):
    """Return the SHA-256 digest of everything readable from `fd`.

    The descriptor is read from its current position until `os.read`
    returns an empty result. It is neither rewound nor closed here.

    Returns a string of the form ``"sha256:<hex digest>"``.
    """
    chunk_size = 4096
    digest = hashlib.sha256()

    # iter() with a sentinel stops as soon as a read yields no data.
    for chunk in iter(lambda: os.read(fd, chunk_size), b""):
        digest.update(chunk)

    return f"sha256:{digest.hexdigest()}"
def stat_diff(stat1, stat2, path, differences):
    """Compare mode, uid and gid of two stat results.

    Any mismatch is stored in `differences[path]` under the key "mode",
    "uid" or "gid" as a ``[value1, value2]`` pair.

    Returns False if the modes differ; in that case uid and gid are not
    compared at all. Returns True otherwise, even when uid or gid differ.
    """
    modes = [stat1.st_mode, stat2.st_mode]
    if modes[0] != modes[1]:
        differences.setdefault(path, {})["mode"] = modes
        return False

    # Ownership is only looked at once the modes are known to match.
    for key in ("uid", "gid"):
        attr = f"st_{key}"
        first, second = getattr(stat1, attr), getattr(stat2, attr)
        if first != second:
            differences.setdefault(path, {})[key] = [first, second]

    return True
def selinux_diff(path1, path2, path, differences):
    """Compare the `security.selinux` extended attribute of two files.

    Each label is read independently, without following symlinks. An
    OSError while reading one (for example because the attribute is not
    set) leaves that label as the empty string, so "label vs. no label"
    is reported as a difference.

    On mismatch the pair of labels, stripped of newlines and NUL bytes at
    both ends, is stored in `differences[path]["selinux"]` and False is
    returned. Returns True when both labels are equal.
    """
    labels = []
    for target in (path1, path2):
        label = ""
        with contextlib.suppress(OSError):
            label = os.getxattr(target, b"security.selinux", follow_symlinks=False).decode()
        labels.append(label)

    first, second = labels
    if first == second:
        return True

    differences.setdefault(path, {})["selinux"] = [first.strip('\n\0'), second.strip('\n\0')]
    return False
def content_diff(name, dir_fd1, dir_fd2, path, differences):
    """Compare the contents of the file `name` in two directories.

    `name` is opened read-only relative to `dir_fd1` and `dir_fd2`. If
    either open fails with an OSError the comparison is silently skipped.
    Otherwise both files are hashed with `hash_file`, and if the digests
    differ they are stored in `differences[path]["content"]` as a
    ``[digest1, digest2]`` pair.

    Always returns None.
    """
    try:
        first_fd = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd1)
    except OSError:
        return

    try:
        second_fd = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd2)
    except OSError:
        # Do not leak the descriptor that was already opened.
        os.close(first_fd)
        return

    try:
        digests = [hash_file(first_fd), hash_file(second_fd)]
        if digests[0] != digests[1]:
            differences.setdefault(path, {})["content"] = digests
    finally:
        os.close(first_fd)
        os.close(second_fd)
def symlink_diff(name, dir_fd1, dir_fd2, path, differences):
    """Compare the targets of the symlink `name` in two directories.

    The link is resolved relative to `dir_fd1` and `dir_fd2`. If either
    `os.readlink` call fails with an OSError nothing is recorded. When the
    targets differ they are stored, passed through `os.fsdecode`, in
    `differences[path]["symlink"]` as a ``[target1, target2]`` pair.

    Always returns None.
    """
    try:
        targets = [os.readlink(name, dir_fd=fd) for fd in (dir_fd1, dir_fd2)]
    except OSError:
        return

    if targets[0] == targets[1]:
        return

    differences.setdefault(path, {})["symlink"] = [os.fsdecode(t) for t in targets]
def _list_subtree(dirent, dir_fd, entry_path, target_list):
    """Append `entry_path` and, for a directory, everything below it.

    `dirent` is the scandir entry for `entry_path` inside the directory
    open at `dir_fd`. A directory that cannot be opened is still listed
    itself, but its contents are skipped.
    """
    target_list.append(entry_path)
    if not dirent.is_dir(follow_symlinks=False):
        return

    try:
        child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
    except OSError:
        return

    try:
        list_dir(child_fd, entry_path, target_list)
    finally:
        # Close the descriptor even if listing the subtree fails.
        os.close(child_fd)


def _diff_subdirs(name, dir_fd1, dir_fd2, entry_path, report):
    """Recurse `diff_aux` into the directory `name` present in both trees.

    If the directory cannot be opened on either side the subtree is
    skipped. Both descriptors are closed on every exit path.
    """
    try:
        child_fd1 = os.open(name, os.O_DIRECTORY, dir_fd=dir_fd1)
    except OSError:
        return

    try:
        try:
            child_fd2 = os.open(name, os.O_DIRECTORY, dir_fd=dir_fd2)
        except OSError:
            return

        try:
            diff_aux(child_fd1, child_fd2, entry_path, report)
        finally:
            os.close(child_fd2)
    finally:
        os.close(child_fd1)


def diff_aux(dir_fd1, dir_fd2, path, report):
    """Recursively compare the directories open at `dir_fd1` and `dir_fd2`.

    `path` is the path of these directories as it should appear in the
    report. `report` is updated in place:

    - ``report["deleted_files"]`` receives entries (and, for directories,
      their whole subtree) that exist only under `dir_fd1`.
    - ``report["added_files"]`` receives entries (and subtrees) that exist
      only under `dir_fd2`.
    - ``report["differences"]`` receives per-path SELinux label, stat,
      symlink target and file content differences for entries present in
      both trees.

    Directories that cannot be opened are skipped.
    """
    entries1 = set()

    with os.scandir(dir_fd1) as it:
        for dirent in it:
            entry_path = os.path.join(path, dirent.name)

            try:
                stat2 = os.stat(dirent.name, dir_fd=dir_fd2, follow_symlinks=False)
            except FileNotFoundError:
                # Present in the first tree only.
                _list_subtree(dirent, dir_fd1, entry_path, report["deleted_files"])
                continue

            entries1.add(dirent.name)
            stat1 = dirent.stat(follow_symlinks=False)

            # os.getxattr has no dir_fd parameter, so address the entries
            # through the /proc/self/fd view of the open directories.
            selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", dirent.name),
                         os.path.join(f"/proc/self/fd/{dir_fd2}", dirent.name),
                         entry_path,
                         report["differences"])

            # A mode mismatch makes any further comparison meaningless.
            if not stat_diff(stat1, stat2, entry_path, report["differences"]):
                continue

            if dirent.is_symlink():
                symlink_diff(dirent.name, dir_fd1, dir_fd2, entry_path, report["differences"])
            elif dirent.is_file(follow_symlinks=False):
                content_diff(dirent.name, dir_fd1, dir_fd2, entry_path, report["differences"])
            elif dirent.is_dir(follow_symlinks=False):
                _diff_subdirs(dirent.name, dir_fd1, dir_fd2, entry_path, report)

    with os.scandir(dir_fd2) as it:
        for dirent in it:
            # Anything not seen in the first pass exists in the second tree only.
            if dirent.name not in entries1:
                _list_subtree(dirent, dir_fd2, os.path.join(path, dirent.name), report["added_files"])
def diff(dir_fd1, dir_fd2, report):
    """Compare the trees rooted at the directories open at the two fds.

    The roots themselves are compared first (SELinux label and stat data,
    reported under the path "/"), then `diff_aux` walks their contents.
    Results are accumulated in `report`.
    """
    root_stats = [os.stat(".", dir_fd=fd, follow_symlinks=False) for fd in (dir_fd1, dir_fd2)]
    proc_paths = [f"/proc/self/fd/{fd}" for fd in (dir_fd1, dir_fd2)]

    selinux_diff(proc_paths[0], proc_paths[1], "/", report["differences"])
    # The result is ignored: the roots are walked even if their modes differ.
    stat_diff(root_stats[0], root_stats[1], "/", report["differences"])

    diff_aux(dir_fd1, dir_fd2, "/", report)
def list_dir(dir_fd, path, target_list):
    """Recursively append the paths of all entries below `dir_fd`.

    `dir_fd` is an open directory descriptor and `path` the prefix under
    which its entries are reported. Every entry is appended to
    `target_list` as ``os.path.join(path, name)``. Directories (symlinks
    are not followed) are descended into; a directory that cannot be
    opened is listed but its contents are skipped.

    The caller keeps ownership of `dir_fd`. Descriptors opened here are
    closed again even when listing a subdirectory raises.
    """
    with os.scandir(dir_fd) as entries:
        for dirent in entries:
            entry_path = os.path.join(path, dirent.name)
            target_list.append(entry_path)

            if not dirent.is_dir(follow_symlinks=False):
                continue

            try:
                child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
            except OSError:
                # Best effort: an unreadable directory is skipped.
                continue

            try:
                list_dir(child_fd, entry_path, target_list)
            finally:
                # Without this the descriptor would leak on an exception.
                os.close(child_fd)
def main():
    """Command line entry point.

    Takes two directories as positional arguments, compares them with
    `diff` and prints the resulting report as indented JSON on stdout.
    """
    parser = argparse.ArgumentParser(description="Recursively compare file system trees")
    parser.add_argument("dir1", metavar="DIRECTORY1", help="first directory to compare")
    parser.add_argument("dir2", metavar="DIRECTORY2", help="second directory to compare")
    options = parser.parse_args()

    # Key order is kept as is because it determines the JSON output order.
    report = {
        "added_files": [],
        "deleted_files": [],
        "differences": {},
    }

    first_fd = os.open(options.dir1, os.O_DIRECTORY)
    second_fd = os.open(options.dir2, os.O_DIRECTORY)
    diff(first_fd, second_fd, report)
    os.close(second_fd)
    os.close(first_fd)

    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()