test: add initrd module to inspec initrds
Add a new module that contains a utilities to inspect an initrd. It contains a `Initrd` class that loads a given initrd and can then be used get the list of files via Initrd.filelist, the dracut modules via `Initrd.modules` and the kernel modules via the `Initrd.kmods` properties. Another top level function `read_initrd` returns a dict that contains the name of the initrd as key and as value another dictionary with `modules`, `kmods` as well as the `compression` & `early_cpio` metadata. The initrd.py can also use as a script that will print the dict from `read_initrd` as JSON. The implementation is a heavily based on dracut's `lsinitrd` bash script and additionally contains a python port of the skipcpio.c utility.
This commit is contained in:
parent
6064793a54
commit
0fdbfa3c93
1 changed files with 189 additions and 0 deletions
189
test/initrd.py
Normal file
189
test/initrd.py
Normal file
|
|
@ -0,0 +1,189 @@
|
|||
import contextlib
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def skipcpio(fd):
|
||||
cpio_end = b"TRAILER!!!"
|
||||
cpio_len = len(cpio_end)
|
||||
pos = 0
|
||||
while True:
|
||||
os.lseek(fd, pos, os.SEEK_SET)
|
||||
data = os.read(fd, 2*cpio_len)
|
||||
if data == b'':
|
||||
# end of file, cpio_end not found, cat it all
|
||||
pos = 0
|
||||
break
|
||||
r = data.find(cpio_end)
|
||||
if r != -1:
|
||||
pos += r + cpio_len
|
||||
break # found the end!
|
||||
pos += cpio_len
|
||||
os.lseek(fd, pos, os.SEEK_SET)
|
||||
if pos == 0:
|
||||
return pos
|
||||
# skip zeros
|
||||
n = 2*cpio_len
|
||||
while True:
|
||||
data = os.read(fd, n)
|
||||
if data == b'':
|
||||
os.lseek(fd, pos, os.SEEK_SET)
|
||||
return 0
|
||||
for i, x in enumerate(data):
|
||||
if x != 0:
|
||||
pos += i
|
||||
os.lseek(fd, pos, os.SEEK_SET)
|
||||
return pos
|
||||
pos += len(data)
|
||||
return pos
|
||||
|
||||
|
||||
def read_header(fd, n=6):
|
||||
pos = os.lseek(fd, 0, os.SEEK_CUR)
|
||||
hdr = os.read(fd, n)
|
||||
pos = os.lseek(fd, pos, os.SEEK_SET)
|
||||
return hdr
|
||||
|
||||
|
||||
def is_kmod(f: str):
|
||||
return f.endswith(".ko") or f.endswith(".ko.xz")
|
||||
|
||||
|
||||
class Initrd:
|
||||
def __init__(self, path: str):
|
||||
name = os.path.basename(path)
|
||||
if name == "initrd":
|
||||
# in the unlikely event that we actually have a fully compliant BLS entry
|
||||
# a la /6a9857a393724b7a981ebb5b8495b9ea/3.8.0-2.fc19.x86_64/initrd
|
||||
name = os.path.basename(os.path.dirname(path))
|
||||
self.path = path
|
||||
self.name = name
|
||||
|
||||
self.early_cpio = False
|
||||
self.compression = None
|
||||
self._cat = None
|
||||
|
||||
self._cache = {}
|
||||
self.init()
|
||||
|
||||
def init(self):
|
||||
with self.open() as image:
|
||||
hdr = read_header(image)
|
||||
if hdr.startswith(b'\x71\xc7') or hdr == b'070701':
|
||||
cmd = f"cpio --extract --quiet --to-stdout -- 'early_cpio'"
|
||||
data = self.run(cmd, image)
|
||||
self.early_cpio = data == '1'
|
||||
|
||||
if self.early_cpio:
|
||||
skipcpio(image)
|
||||
hdr = read_header(image)
|
||||
if hdr.startswith(b'\x1f\x8b'):
|
||||
cat = "zcat --"
|
||||
compression = "gzip"
|
||||
elif hdr.startswith(b'BZh'):
|
||||
cat = "bzcat --"
|
||||
compression = "bz"
|
||||
elif hdr.startswith(b'\x71\xc7') or hdr == b'070701':
|
||||
cat = "cat --"
|
||||
compression = "ascii"
|
||||
elif hdr.startswith(b'\x02\x21'):
|
||||
cat = "lz4 -d -c"
|
||||
compression = "lz4"
|
||||
elif hdr.startswith(b'\x89LZO\0'):
|
||||
cat = "lzop -d -c"
|
||||
compression = "lzop"
|
||||
elif hdr.startswith(b'\x28\xB5\x2F\xFD'):
|
||||
cat = "zstd -d -c"
|
||||
compression = "zstd"
|
||||
else:
|
||||
cat = "xzcat --single-stream --"
|
||||
compression = "xz"
|
||||
|
||||
self._cat = cat
|
||||
self.compression = compression
|
||||
|
||||
def read_file_list(self):
|
||||
cmd = f"{self._cat} | cpio -it --no-absolute-filename"
|
||||
with self.open() as image:
|
||||
data = self.run(cmd, image)
|
||||
filelist = data.split('\n')
|
||||
return filelist
|
||||
|
||||
def read_modules(self):
|
||||
libdirs = ["lib64/dracut", "lib/dracut",
|
||||
"usr/lib64/dracut", "usr/lib/dracut"]
|
||||
paths = [f"{d}/modules.txt" for d in libdirs]
|
||||
cmd = f"{self._cat} | "
|
||||
cmd += "cpio --extract --quiet --to-stdout -- "
|
||||
cmd += " ".join(paths)
|
||||
|
||||
with self.open() as image:
|
||||
data = self.run(cmd, image)
|
||||
return data.split("\n")
|
||||
|
||||
def get_cached(self, name, fn):
|
||||
if name not in self._cache:
|
||||
self._cache[name] = fn()
|
||||
return self._cache[name]
|
||||
|
||||
@property
|
||||
def filelist(self):
|
||||
return self.get_cached("filelist", self.read_file_list)
|
||||
|
||||
@property
|
||||
def modules(self):
|
||||
return self.get_cached("modules", self.read_modules)
|
||||
|
||||
@property
|
||||
def kmods(self):
|
||||
files = self.filelist
|
||||
mods = sorted(os.path.basename(f) for f in files if is_kmod(f))
|
||||
return mods
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
"early_cpio": self.early_cpio,
|
||||
"compression": self.compression,
|
||||
"kmods": self.kmods,
|
||||
"modules": self.modules
|
||||
}
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open(self, skip_cpio=True):
|
||||
fd = -1
|
||||
try:
|
||||
fd = os.open(self.path, os.O_RDONLY)
|
||||
if self.early_cpio and skip_cpio:
|
||||
skipcpio(fd)
|
||||
yield fd
|
||||
finally:
|
||||
if fd != -1:
|
||||
os.close(fd)
|
||||
|
||||
@staticmethod
|
||||
def run(cmd, image):
|
||||
argv = ["/bin/sh", "-c", cmd]
|
||||
output = subprocess.check_output(argv,
|
||||
encoding=None,
|
||||
stdin=image,
|
||||
stderr=subprocess.DEVNULL)
|
||||
return output.strip().decode('utf-8')
|
||||
|
||||
|
||||
def read_initrd(path):
|
||||
initrd = Initrd(path)
|
||||
return {
|
||||
initrd.name: initrd.as_dict()
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
import json # pylint: disable=import-outside-toplevel
|
||||
|
||||
data = read_initrd(sys.argv[1])
|
||||
json.dump(data, sys.stdout, indent=2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue