In my opinion it is better to dump everything and then filter out unneeded entries elsewhere.
184 lines
6.8 KiB
Python
Executable file
184 lines
6.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
|
|
|
|
def stat_diff(stat1, stat2, path, differences):
    """Record mode, uid and gid differences between two stat results.

    Every attribute that differs is stored in ``differences[path]`` under the
    key ``"mode"``, ``"uid"`` or ``"gid"`` as a ``[first, second]`` list.  The
    ownership attributes are compared even when the modes differ, so a path
    whose mode *and* owner changed reports all of them instead of only the
    mode.

    Args:
        stat1: stat result of the entry in the first tree.
        stat2: stat result of the entry in the second tree.
        path: key under which differences are recorded.
        differences: mapping of path -> property dict, updated in place.

    Returns:
        ``False`` when ``st_mode`` differs, ``True`` otherwise.  Ownership
        differences never affect the return value; ``diff_aux`` uses a
        ``False`` result to skip the remaining comparisons for the entry.
    """
    compared = (
        ("mode", stat1.st_mode, stat2.st_mode),
        ("uid", stat1.st_uid, stat2.st_uid),
        ("gid", stat1.st_gid, stat2.st_gid),
    )
    for key, first, second in compared:
        if first != second:
            differences.setdefault(path, {})[key] = [first, second]
    return stat1.st_mode == stat2.st_mode
|
|
|
|
|
|
def selinux_diff(path1, path2, path, differences):
    """Compare the ``security.selinux`` extended attribute of two paths.

    The attribute is read without following a final symlink.  When the two
    decoded labels differ they are stored as a ``[first, second]`` list in
    ``differences[path]["selinux"]``.

    Returns:
        ``False`` when a difference was recorded; ``True`` when the labels
        match or when reading either attribute raised ``OSError``.
    """
    try:
        first, second = (
            os.getxattr(candidate, b"security.selinux", follow_symlinks=False).decode()
            for candidate in (path1, path2)
        )
    except OSError:
        # Nothing to compare if either label cannot be read.
        return True

    if first == second:
        return True

    differences.setdefault(path, {})["selinux"] = [os.fsdecode(first), os.fsdecode(second)]
    return False
|
|
|
|
|
|
def content_diff(name, dir_fd1, dir_fd2, size1, size2, path, differences):
    """Record whether a regular file has different content in the two trees.

    Files of different size are reported as different without being read.
    Otherwise both files are opened relative to their directory descriptors
    and compared block by block.  On a mismatch
    ``differences[path]["content"]`` is set to ``"different"``.

    The comparison goes through buffered file objects rather than raw
    ``os.read`` calls: a raw read may return fewer bytes than requested, which
    would misalign the two block streams and report identical files as
    different.  The file objects also close the descriptors on every exit
    path.

    Args:
        name: entry name, relative to both directory descriptors.
        dir_fd1: directory descriptor in the first tree.
        dir_fd2: directory descriptor in the second tree.
        size1: size of the file in the first tree.
        size2: size of the file in the second tree.
        path: key under which a difference is recorded.
        differences: mapping of path -> property dict, updated in place.

    Returns:
        None.  A file that cannot be opened on either side is skipped
        silently (best effort).
    """
    block_size = 4096

    if size1 != size2:
        differences.setdefault(path, {})["content"] = "different"
        return

    try:
        fd1 = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd1)
    except OSError:
        return
    with os.fdopen(fd1, "rb") as file1:
        try:
            fd2 = os.open(name, flags=os.O_RDONLY, dir_fd=dir_fd2)
        except OSError:
            return
        with os.fdopen(fd2, "rb") as file2:
            while True:
                block1 = file1.read(block_size)
                block2 = file2.read(block_size)
                if block1 != block2:
                    differences.setdefault(path, {})["content"] = "different"
                    return
                if not block1:
                    # Both streams hit end-of-file together: identical.
                    return
|
|
|
|
|
|
def symlink_diff(name, dir_fd1, dir_fd2, path, differences):
    """Record a difference when a symlink points to different targets.

    ``name`` is resolved relative to each directory descriptor.  If the two
    link targets differ they are stored as a ``[first, second]`` list in
    ``differences[path]["symlink"]``.  Nothing is recorded when either link
    cannot be read (``OSError``).
    """
    try:
        first, second = [os.readlink(name, dir_fd=fd) for fd in (dir_fd1, dir_fd2)]
    except OSError:
        return

    if first == second:
        return

    differences.setdefault(path, {})["symlink"] = [os.fsdecode(first), os.fsdecode(second)]
|
|
|
|
|
|
def _record_one_sided(dirent, dir_fd, entry_path, target_list):
    """Append an entry that exists in only one tree, plus everything below it."""
    target_list.append(entry_path)
    if not dirent.is_dir(follow_symlinks=False):
        return
    try:
        child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
    except OSError:
        # Best effort: a directory that cannot be opened is listed without
        # its contents.
        return
    try:
        list_dir(child_fd, entry_path, target_list)
    finally:
        os.close(child_fd)


def _diff_subdirectory(name, dir_fd1, dir_fd2, entry_path, report):
    """Open directory *name* in both trees and compare the two recursively."""
    try:
        child_fd1 = os.open(name, os.O_DIRECTORY, dir_fd=dir_fd1)
    except OSError:
        return
    try:
        try:
            child_fd2 = os.open(name, os.O_DIRECTORY, dir_fd=dir_fd2)
        except OSError:
            return
        try:
            diff_aux(child_fd1, child_fd2, entry_path, report)
        finally:
            os.close(child_fd2)
    finally:
        os.close(child_fd1)


def diff_aux(dir_fd1, dir_fd2, path, report):
    """Recursively compare the directories behind two descriptors.

    Entries found only in the first directory are appended to
    ``report["deleted_files"]`` and entries found only in the second one to
    ``report["added_files"]``; for one-sided directories the whole subtree is
    listed.  Entries present in both are compared by SELinux label and by
    stat attributes.  Unless ``stat_diff`` returns ``False``, they are then
    compared by symlink target, file content or, for directories, by
    recursing.  All attribute and content differences go into
    ``report["differences"]``.

    Child directory descriptors are closed in ``finally`` blocks so that an
    exception raised further down the tree does not leak them.

    Args:
        dir_fd1: descriptor of the directory in the first tree.
        dir_fd2: descriptor of the directory in the second tree.
        path: path of this directory as it should appear in the report.
        report: dict with ``"added_files"``, ``"deleted_files"`` (lists) and
            ``"differences"`` (dict), updated in place.

    Raises:
        OSError: if looking up an entry in the second tree fails for a reason
            other than the entry being absent.
    """
    differences = report["differences"]
    common = set()

    with os.scandir(dir_fd1) as it:
        for dirent in it:
            name = dirent.name
            entry_path = os.path.join(path, name)
            try:
                stat2 = os.stat(name, dir_fd=dir_fd2, follow_symlinks=False)
            except FileNotFoundError:
                _record_one_sided(dirent, dir_fd1, entry_path, report["deleted_files"])
                continue

            common.add(name)
            stat1 = dirent.stat(follow_symlinks=False)
            selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", name),
                         os.path.join(f"/proc/self/fd/{dir_fd2}", name),
                         entry_path,
                         differences)
            if not stat_diff(stat1, stat2, entry_path, differences):
                # Modes differ: skip the type-specific comparisons below.
                continue

            if dirent.is_symlink():
                symlink_diff(name, dir_fd1, dir_fd2, entry_path, differences)
            elif dirent.is_file(follow_symlinks=False):
                content_diff(name, dir_fd1, dir_fd2,
                             stat1.st_size, stat2.st_size,
                             entry_path, differences)
            elif dirent.is_dir(follow_symlinks=False):
                _diff_subdirectory(name, dir_fd1, dir_fd2, entry_path, report)

    with os.scandir(dir_fd2) as it:
        for dirent in it:
            if dirent.name not in common:
                _record_one_sided(dirent, dir_fd2,
                                  os.path.join(path, dirent.name),
                                  report["added_files"])
|
|
|
|
|
|
def diff(dir_fd1, dir_fd2, report):
    """Compare two directory trees, starting with the root directories.

    The SELinux label and stat attributes of the two roots are compared and
    recorded under the path ``"/"``; ``diff_aux`` then walks the contents.

    Args:
        dir_fd1: descriptor of the first tree's root directory.
        dir_fd2: descriptor of the second tree's root directory.
        report: dict with ``"added_files"``, ``"deleted_files"`` and
            ``"differences"``, updated in place.
    """
    stat1 = os.stat(".", dir_fd=dir_fd1, follow_symlinks=False)
    stat2 = os.stat(".", dir_fd=dir_fd2, follow_symlinks=False)
    # selinux_diff() reads the xattr without following a final symlink, and
    # /proc/self/fd/<n> is itself a symlink.  The trailing "." makes the path
    # resolve to the directory, so the directory's label is compared rather
    # than the label of the procfs link.
    selinux_diff(os.path.join(f"/proc/self/fd/{dir_fd1}", "."),
                 os.path.join(f"/proc/self/fd/{dir_fd2}", "."),
                 "/",
                 report["differences"])
    stat_diff(stat1, stat2, "/", report["differences"])
    diff_aux(dir_fd1, dir_fd2, "/", report)
|
|
|
|
|
|
def list_dir(dir_fd, path, target_list):
    """Append every entry below a directory to *target_list*, recursively.

    Each entry is appended as ``os.path.join(path, entry_name)``; a
    directory's own path is appended before the paths of its contents.
    Symlinks to directories are listed but not descended into.  A
    subdirectory that cannot be opened is listed without its contents.

    The child descriptor is closed in a ``finally`` block so that an exception
    raised while listing a subtree does not leak it.

    Args:
        dir_fd: descriptor of the directory to list.
        path: path prefix used for the reported entries.
        target_list: list the entry paths are appended to.
    """
    with os.scandir(dir_fd) as it:
        for dirent in it:
            entry_path = os.path.join(path, dirent.name)
            target_list.append(entry_path)
            if not dirent.is_dir(follow_symlinks=False):
                continue
            try:
                child_fd = os.open(dirent.name, os.O_DIRECTORY, dir_fd=dir_fd)
            except OSError:
                continue
            try:
                list_dir(child_fd, entry_path, target_list)
            finally:
                os.close(child_fd)
|
|
|
|
|
|
def main():
    """Parse the two directory arguments, diff the trees and print the report.

    The report is printed to standard output as JSON with an indent of 2 and
    has the keys ``"added_files"``, ``"deleted_files"`` and ``"differences"``.
    """
    parser = argparse.ArgumentParser(description="Recursively compare file system trees")
    positional = (
        ("dir1", "DIRECTORY1", "first directory to compare"),
        ("dir2", "DIRECTORY2", "second directory to compare"),
    )
    for dest, metavar, help_text in positional:
        parser.add_argument(dest, metavar=metavar, help=help_text)
    args = parser.parse_args()

    report = {
        "added_files": [],
        "deleted_files": [],
        "differences": {},
    }

    dir_fd1 = os.open(args.dir1, os.O_DIRECTORY)
    dir_fd2 = os.open(args.dir2, os.O_DIRECTORY)
    diff(dir_fd1, dir_fd2, report)
    os.close(dir_fd2)
    os.close(dir_fd1)

    print(json.dumps(report, indent=2))
|
|
|
|
|
|
# Script entry point: run the comparison only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|