#!/usr/bin/env python3 import argparse import contextlib import hashlib import json import os def hash_file(fd): BLOCK_SIZE = 4096 hasher = hashlib.sha256() buf = os.read(fd, BLOCK_SIZE) while len(buf) > 0: hasher.update(buf) buf = os.read(fd, BLOCK_SIZE) return f"sha256:{hasher.hexdigest()}" def stat_diff(stat1, stat2, path, differences): if stat1.st_mode != stat2.st_mode: props = differences.setdefault(path, {}) props["mode"] = [stat1.st_mode, stat2.st_mode] return False if stat1.st_uid != stat2.st_uid: props = differences.setdefault(path, {}) props["uid"] = [stat1.st_uid, stat2.st_uid] if stat1.st_gid != stat2.st_gid: props = differences.setdefault(path, {}) props["gid"] = [stat1.st_gid, stat2.st_gid] return True def selinux_diff(path1, path2, path, differences): label1, label2 = "", "" with contextlib.suppress(OSError): label1 = os.getxattr(path1, b"security.selinux", follow_symlinks=False).decode() with contextlib.suppress(OSError): label2 = os.getxattr(path2, b"security.selinux", follow_symlinks=False).decode() if label1 != label2: props = differences.setdefault(path, {}) props["selinux"] = [label1.strip('\n\0'), label2.strip('\n\0')] return False return True def content_diff(name, dir_fd1, dir_fd2, path, differences): try: fd1 = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd1) except OSError: return try: fd2 = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd2) except OSError: os.close(fd1) return try: hash1 = hash_file(fd1) hash2 = hash_file(fd2) if hash1 != hash2: props = differences.setdefault(path, {}) props["content"] = [hash1, hash2] finally: os.close(fd1) os.close(fd2) def symlink_diff(name, dir_fd1, dir_fd2, path, differences): try: target1 = os.readlink(name, dir_fd=dir_fd1) target2 = os.readlink(name, dir_fd=dir_fd2) except OSError: return if target1 != target2: props = differences.setdefault(path, {}) props["symlink"] = [os.fsdecode(target1), os.fsdecode(target2)] # pylint: disable=too-many-branches def diff_aux(dir_fd1, dir_fd2, path, report): entries1 = set() with os.scandir(f"/proc/self/fd/{dir_fd1}") as it: for dirent in it: try: stat2 = os.stat(dirent.name, dir_fd=dir_fd2, follow_symlinks=False) except FileNotFoundError: report["deleted_files"] += [os.path.join(path, dirent.name)] if dirent.is_dir(follow_symlinks=False): try: child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd1) except OSError: continue list_dir(child_fd, os.path.join(path, dirent.name), report["deleted_files"]) os.close(child_fd) continue entries1.add(dirent.name) stat1 = dirent.stat(follow_symlinks=False) selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", dirent.name), os.path.join(f"/proc/self/fd/{dir_fd2}", dirent.name), os.path.join(path, dirent.name), report["differences"]) if not stat_diff(stat1, stat2, os.path.join(path, dirent.name), report["differences"]): continue if dirent.is_symlink(): symlink_diff(dirent.name, dir_fd1, dir_fd2, os.path.join(path, dirent.name), report["differences"]) elif dirent.is_file(follow_symlinks=False): content_diff(dirent.name, dir_fd1, dir_fd2, os.path.join(path, dirent.name), report["differences"]) elif dirent.is_dir(follow_symlinks=False): try: child_fd1 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd1) except OSError: continue try: child_fd2 = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd2) except OSError: os.close(child_fd1) continue diff_aux(child_fd1, child_fd2, os.path.join(path, dirent.name), report) os.close(child_fd2) os.close(child_fd1) with os.scandir(f"/proc/self/fd/{dir_fd2}") as it: for dirent in it: if dirent.name not in entries1: report["added_files"] += [os.path.join(path, dirent.name)] if dirent.is_dir(follow_symlinks=False): try: child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd2) except OSError: continue list_dir(child_fd, os.path.join(path, dirent.name), report["added_files"]) os.close(child_fd) def diff(dir_fd1, dir_fd2, report): stat1 = os.stat(".", dir_fd=dir_fd1, follow_symlinks=False) stat2 = os.stat(".", dir_fd=dir_fd2, follow_symlinks=False) selinux_diff(f"/proc/self/fd/{dir_fd1}", f"/proc/self/fd/{dir_fd2}", "/", report["differences"]) stat_diff(stat1, stat2, "/", report["differences"]) diff_aux(dir_fd1, dir_fd2, "/", report) def list_dir(dir_fd, path, target_list): with os.scandir(f"/proc/self/fd/{dir_fd}") as it: for dirent in it: p = os.path.join(path, dirent.name) target_list.append(p) if dirent.is_dir(follow_symlinks=False): try: child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd) except OSError: continue list_dir(child_fd, p, target_list) os.close(child_fd) def main(): parser = argparse.ArgumentParser(description="Recursively compare file system trees") parser.add_argument("dir1", metavar="DIRECTORY1", help="first directory to compare") parser.add_argument("dir2", metavar="DIRECTORY2", help="second directory to compare") args = parser.parse_args() report = {} report["added_files"] = [] report["deleted_files"] = [] report["differences"] = {} dir_fd1 = os.open(args.dir1, os.O_DIRECTORY) dir_fd2 = os.open(args.dir2, os.O_DIRECTORY) diff(dir_fd1, dir_fd2, report) os.close(dir_fd2) os.close(dir_fd1) report["added_files"].sort() report["deleted_files"].sort() print(json.dumps(report, indent=2, sort_keys=True)) if __name__ == "__main__": main()