diff --git a/test/mod/test_util_lvm2.py b/test/mod/test_util_lvm2.py new file mode 100644 index 00000000..eb37245e --- /dev/null +++ b/test/mod/test_util_lvm2.py @@ -0,0 +1,190 @@ +# +# Test for the util.lvm2 module +# + +import errno +import os +import json +import subprocess +import time +import uuid + +from typing import List +from tempfile import TemporaryDirectory + +import pytest + +from osbuild import loop +from osbuild.util import lvm2 +from osbuild.util.types import PathLike + +from ..test import TestBase + + +def have_lvm() -> bool: + try: + r = subprocess.run( + ["vgs"], + encoding="utf-8", + stdout=subprocess.PIPE, + check=False + ) + except FileNotFoundError: + return False + return r.returncode == 0 + + +@pytest.fixture(name="tempdir") +def tempdir_fixture(): + with TemporaryDirectory(prefix="lvm2-") as tmp: + yield tmp + + +def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512): + lo = loop.Loop(ctl.get_unbound()) + + if not sizelimit: + stat = os.fstat(fd) + sizelimit = stat.st_size - offset + print(f"size: {sizelimit}") + else: + sizelimit *= sector_size + + while True: + try: + lo.set_fd(fd) + except OSError as e: + lo.close() + if e.errno == errno.EBUSY: + continue + raise e + # `set_status` returns EBUSY when the pages from the previously + # bound file have not been fully cleared yet. + try: + lo.set_status(offset=offset, + sizelimit=sizelimit, + autoclear=True) + except BlockingIOError: + lo.clear_fd() + lo.close() + continue + break + + return lo + + +def pvcreate(path: PathLike): + cmd = ["pvcreate", os.fspath(path)] + subprocess.run(cmd, check=True) + + +def vgcreate(path: PathLike, vg_name: str): + cmd = ["vgcreate", vg_name, os.fspath(path)] + subprocess.run(cmd, check=True) + + +def lvcreate(vg_name, name: str, size: str): + cmd = [ + "lvcreate", "-an", + "-l", size, + "-n", name, + vg_name + ] + subprocess.run(cmd, check=True) + + +def list_vgs(): + cmd = [ + "vgs", + "--reportformat", "json", + "--readonly", + "-o", "+vg_all" + ] + + res = subprocess.run(cmd, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + encoding="UTF-8") + + data = res.stdout.strip() + + if res.returncode != 0: + msg = f"vgs: {data}" + raise RuntimeError(msg) + + data = json.loads(data) + + return data["report"][0]["vg"] + + +def find_vg(lst: List, name: str): + for vg in lst: + if vg["vg_name"] == name: + return vg + return None + + +@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation") +@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only") +def test_rename_vg_group(tempdir): + + path = os.path.join(tempdir, "lvm.img") + ctl = loop.LoopControl() + + f = None + lo = None + try: + f = open(path, "wb+") + f.truncate(100 * 1024 * 1024) + f.flush() + lo = make_loop(ctl, f.fileno(), 0, None) + devname = os.path.join("/dev", lo.devname) + + vg_name = str(uuid.uuid4()) + pvcreate(devname) + vgcreate(devname, vg_name) + lvcreate(vg_name, "lv1", r"100%FREE") + + vgs = list_vgs() + vg = find_vg(vgs, vg_name) + assert vg + + finally: + if lo: + lo.close() + if f: + f.close() + + new_name = str(uuid.uuid4()) + with lvm2.Disk.open(path) as disk: + assert disk.metadata + assert disk.metadata.vg_name == vg_name + + disk.rename_vg(new_name) + disk.creation_host = "osbuild" + disk.description = "created via lvm2 and osbuild" + + disk.flush_metadata() + + f = None + lo = None + try: + f = open(path, "rb") + lo = make_loop(ctl, f.fileno(), 0, None) + devname = os.path.join("/dev", lo.devname) + + vg = None + for i in range(3): + vgs = list_vgs() + vg = find_vg(vgs, new_name) + if vg: + break + time.sleep(0.250 * (i+1)) + if not vg: + raise RuntimeError(f"Could not find vg {new_name}") + finally: + if lo: + lo.close() + if f: + f.close()