diff --git a/test/initrd.py b/test/initrd.py new file mode 100644 index 00000000..cb37817c --- /dev/null +++ b/test/initrd.py @@ -0,0 +1,189 @@ +import contextlib +import os +import subprocess +import sys + + +def skipcpio(fd): + cpio_end = b"TRAILER!!!" + cpio_len = len(cpio_end) + pos = 0 + while True: + os.lseek(fd, pos, os.SEEK_SET) + data = os.read(fd, 2*cpio_len) + if data == b'': + # end of file, cpio_end not found, cat it all + pos = 0 + break + r = data.find(cpio_end) + if r != -1: + pos += r + cpio_len + break # found the end! + pos += cpio_len + os.lseek(fd, pos, os.SEEK_SET) + if pos == 0: + return pos + # skip zeros + n = 2*cpio_len + while True: + data = os.read(fd, n) + if data == b'': + os.lseek(fd, pos, os.SEEK_SET) + return 0 + for i, x in enumerate(data): + if x != 0: + pos += i + os.lseek(fd, pos, os.SEEK_SET) + return pos + pos += len(data) + return pos + + +def read_header(fd, n=6): + pos = os.lseek(fd, 0, os.SEEK_CUR) + hdr = os.read(fd, n) + pos = os.lseek(fd, pos, os.SEEK_SET) + return hdr + + +def is_kmod(f: str): + return f.endswith(".ko") or f.endswith(".ko.xz") + + +class Initrd: + def __init__(self, path: str): + name = os.path.basename(path) + if name == "initrd": + # in the unlikely event that we actually have a fully compliant BLS entry + # a la /6a9857a393724b7a981ebb5b8495b9ea/3.8.0-2.fc19.x86_64/initrd + name = os.path.basename(os.path.dirname(path)) + self.path = path + self.name = name + + self.early_cpio = False + self.compression = None + self._cat = None + + self._cache = {} + self.init() + + def init(self): + with self.open() as image: + hdr = read_header(image) + if hdr.startswith(b'\x71\xc7') or hdr == b'070701': + cmd = f"cpio --extract --quiet --to-stdout -- 'early_cpio'" + data = self.run(cmd, image) + self.early_cpio = data == '1' + + if self.early_cpio: + skipcpio(image) + hdr = read_header(image) + if hdr.startswith(b'\x1f\x8b'): + cat = "zcat --" + compression = "gzip" + elif hdr.startswith(b'BZh'): + cat = "bzcat --" + compression = "bz" + elif hdr.startswith(b'\x71\xc7') or hdr == b'070701': + cat = "cat --" + compression = "ascii" + elif hdr.startswith(b'\x02\x21'): + cat = "lz4 -d -c" + compression = "lz4" + elif hdr.startswith(b'\x89LZO\0'): + cat = "lzop -d -c" + compression = "lzop" + elif hdr.startswith(b'\x28\xB5\x2F\xFD'): + cat = "zstd -d -c" + compression = "zstd" + else: + cat = "xzcat --single-stream --" + compression = "xz" + + self._cat = cat + self.compression = compression + + def read_file_list(self): + cmd = f"{self._cat} | cpio -it --no-absolute-filename" + with self.open() as image: + data = self.run(cmd, image) + filelist = data.split('\n') + return filelist + + def read_modules(self): + libdirs = ["lib64/dracut", "lib/dracut", + "usr/lib64/dracut", "usr/lib/dracut"] + paths = [f"{d}/modules.txt" for d in libdirs] + cmd = f"{self._cat} | " + cmd += "cpio --extract --quiet --to-stdout -- " + cmd += " ".join(paths) + + with self.open() as image: + data = self.run(cmd, image) + return data.split("\n") + + def get_cached(self, name, fn): + if name not in self._cache: + self._cache[name] = fn() + return self._cache[name] + + @property + def filelist(self): + return self.get_cached("filelist", self.read_file_list) + + @property + def modules(self): + return self.get_cached("modules", self.read_modules) + + @property + def kmods(self): + files = self.filelist + mods = sorted(os.path.basename(f) for f in files if is_kmod(f)) + return mods + + def as_dict(self): + return { + "early_cpio": self.early_cpio, + "compression": self.compression, + "kmods": self.kmods, + "modules": self.modules + } + + @contextlib.contextmanager + def open(self, skip_cpio=True): + fd = -1 + try: + fd = os.open(self.path, os.O_RDONLY) + if self.early_cpio and skip_cpio: + skipcpio(fd) + yield fd + finally: + if fd != -1: + os.close(fd) + + @staticmethod + def run(cmd, image): + argv = ["/bin/sh", "-c", cmd] + output = subprocess.check_output(argv, + encoding=None, + stdin=image, + stderr=subprocess.DEVNULL) + return output.strip().decode('utf-8') + + +def read_initrd(path): + initrd = Initrd(path) + return { + initrd.name: initrd.as_dict() + } + + +def main(): + import json # pylint: disable=import-outside-toplevel + + data = read_initrd(sys.argv[1]) + json.dump(data, sys.stdout, indent=2) + + +if __name__ == "__main__": + main()