import contextlib import os import subprocess import sys def skipcpio(fd): cpio_end = b"TRAILER!!!" cpio_len = len(cpio_end) pos = 0 while True: os.lseek(fd, pos, os.SEEK_SET) data = os.read(fd, 2 * cpio_len) if data == b'': # end of file, cpio_end not found, cat it all pos = 0 break r = data.find(cpio_end) if r != -1: pos += r + cpio_len break # found the end! pos += cpio_len os.lseek(fd, pos, os.SEEK_SET) if pos == 0: return pos # skip zeros n = 2 * cpio_len while True: data = os.read(fd, n) if data == b'': os.lseek(fd, pos, os.SEEK_SET) return 0 for i, x in enumerate(data): if x != 0: pos += i os.lseek(fd, pos, os.SEEK_SET) return pos pos += len(data) return pos def read_header(fd, n=6): pos = os.lseek(fd, 0, os.SEEK_CUR) hdr = os.read(fd, n) pos = os.lseek(fd, pos, os.SEEK_SET) return hdr def is_kmod(f: str): return f.endswith(".ko") or f.endswith(".ko.xz") class Initrd: def __init__(self, path: str): name = os.path.basename(path) if name == "initrd": # in the unlikely event that we actually have a fully compliant BLS entry # a la /6a9857a393724b7a981ebb5b8495b9ea/3.8.0-2.fc19.x86_64/initrd name = os.path.basename(os.path.dirname(path)) self.path = path self.name = name self.early_cpio = False self.compression = None self._cat = None self._cache = {} self.init() def init(self): with self.open() as image: hdr = read_header(image) if hdr.startswith(b'\x71\xc7') or hdr == b'070701': cmd = "cpio --extract --quiet --to-stdout -- 'early_cpio'" data = self.run(cmd, image) self.early_cpio = data == '1' if self.early_cpio: skipcpio(image) hdr = read_header(image) if hdr.startswith(b'\x1f\x8b'): cat = "zcat --" compression = "gzip" elif hdr.startswith(b'BZh'): cat = "bzcat --" compression = "bz" elif hdr.startswith(b'\x71\xc7') or hdr == b'070701': cat = "cat --" compression = "ascii" elif hdr.startswith(b'\x02\x21'): cat = "lz4 -d -c" compression = "lz4" elif hdr.startswith(b'\x89LZO\0'): cat = "lzop -d -c" compression = "lzop" elif hdr.startswith(b'\x28\xB5\x2F\xFD'): cat = "zstd -d -c" compression = "zstd" else: cat = "xzcat --single-stream --" compression = "xz" self._cat = cat self.compression = compression def read_file_list(self): cmd = f"{self._cat} | cpio -it --no-absolute-filename" with self.open() as image: data = self.run(cmd, image) filelist = data.split('\n') return filelist def read_modules(self): libdirs = ["lib64/dracut", "lib/dracut", "usr/lib64/dracut", "usr/lib/dracut"] paths = [f"{d}/modules.txt" for d in libdirs] cmd = f"{self._cat} | " cmd += "cpio --extract --quiet --to-stdout -- " cmd += " ".join(paths) with self.open() as image: data = self.run(cmd, image) return data.split("\n") def get_cached(self, name, fn): if name not in self._cache: self._cache[name] = fn() return self._cache[name] @property def filelist(self): return self.get_cached("filelist", self.read_file_list) @property def modules(self): return self.get_cached("modules", self.read_modules) @property def kmods(self): files = self.filelist mods = sorted(os.path.basename(f) for f in files if is_kmod(f)) return mods def as_dict(self): return { "early_cpio": self.early_cpio, "compression": self.compression, "kmods": self.kmods, "modules": self.modules } @contextlib.contextmanager def open(self, skip_cpio=True): fd = -1 try: fd = os.open(self.path, os.O_RDONLY) if self.early_cpio and skip_cpio: skipcpio(fd) yield fd finally: if fd != -1: os.close(fd) @staticmethod def run(cmd, image): argv = ["/bin/sh", "-c", cmd] output = subprocess.check_output(argv, encoding=None, stdin=image, stderr=subprocess.DEVNULL) return output.strip().decode('utf8') def read_initrd(path): initrd = Initrd(path) return { initrd.name: initrd.as_dict() } def main(): import json # pylint: disable=import-outside-toplevel data = read_initrd(sys.argv[1]) json.dump(data, sys.stdout, indent=2) if __name__ == "__main__": main()