# debian-forge/test/mod/test_util_lvm2.py
#
# 166 lines
# 3.6 KiB
# Python

#
# Test for the util.lvm2 module
#
import json
import os
import subprocess
import time
import uuid
from tempfile import TemporaryDirectory
from typing import List
import pytest
from osbuild import loop
from osbuild.util import lvm2
from osbuild.util.types import PathLike
from ..test import TestBase
def have_lvm() -> bool:
    """Report whether the LVM2 command line tools are usable here.

    Runs ``vgs`` and returns ``True`` only when the command could be
    started and exited with status 0.

    This function is evaluated inside a ``skipif`` decorator while the
    module is being imported, so it must never raise: any failure to
    launch the binary (missing, not executable, ...) is reported as
    ``False`` instead of propagating and aborting test collection.
    """
    try:
        res = subprocess.run(
            ["vgs"],
            # The report itself is irrelevant, only the exit status matters.
            stdout=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # Covers FileNotFoundError as well as PermissionError and friends.
        return False
    return res.returncode == 0
@pytest.fixture(name="tempdir")
def tempdir_fixture():
    """Provide a scratch directory (prefix ``lvm2-``) to a test.

    The directory is handed to the test as the ``tempdir`` fixture and
    is removed again when the ``TemporaryDirectory`` context exits.
    """
    with TemporaryDirectory(prefix="lvm2-") as scratch:
        yield scratch
def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512):
    """Attach the file behind *fd* to a loop device via *ctl*.

    Args:
        ctl: object providing ``loop_for_fd`` (the test in this file
            passes a ``loop.LoopControl``).
        fd: open file descriptor of the backing file.
        offset: byte offset into the file at which the device starts.
        sizelimit: length of the device in sectors. When falsy (``None``
            or ``0``) everything from *offset* up to the end of the file
            is used instead.
        sector_size: bytes per sector, used to convert *sizelimit* to
            bytes.

    Returns:
        Whatever ``ctl.loop_for_fd`` returns; it is called with the size
        in bytes and ``autoclear=True``.

    Raises:
        ValueError: if no *sizelimit* was given and *offset* lies beyond
            the end of the file, which would yield a negative size.
    """
    if sizelimit:
        size_bytes = sizelimit * sector_size
    else:
        size_bytes = os.fstat(fd).st_size - offset
        if size_bytes < 0:
            raise ValueError(
                f"offset {offset} is beyond the end of the backing file"
            )
        print(f"size: {size_bytes}")

    return ctl.loop_for_fd(fd, offset, sizelimit=size_bytes, autoclear=True)
def pvcreate(path: PathLike):
    """Run ``pvcreate`` on *path*.

    Raises ``subprocess.CalledProcessError`` if the command exits with a
    non-zero status.
    """
    subprocess.run(["pvcreate", os.fspath(path)], check=True)
def vgcreate(path: PathLike, vg_name: str):
    """Run ``vgcreate`` to create volume group *vg_name* on *path*.

    Raises ``subprocess.CalledProcessError`` if the command exits with a
    non-zero status.
    """
    device = os.fspath(path)
    subprocess.run(["vgcreate", vg_name, device], check=True)
def lvcreate(vg_name, name: str, size: str):
    """Run ``lvcreate`` to add logical volume *name* to *vg_name*.

    *size* is passed to the ``-l`` option (the test in this file uses
    ``"100%FREE"``); the command is invoked with ``-an``.

    Raises ``subprocess.CalledProcessError`` if the command exits with a
    non-zero status.
    """
    subprocess.run(
        ["lvcreate", "-an", "-l", size, "-n", name, vg_name],
        check=True,
    )
def list_vgs():
    """Return the volume groups reported by ``vgs`` as parsed JSON.

    ``vgs`` is invoked read-only with JSON report format and all ``vg_*``
    columns added; the ``"vg"`` list of the first report is returned.

    Raises:
        RuntimeError: if ``vgs`` exits non-zero; the message carries its
            stripped stderr.
    """
    proc = subprocess.run(
        ["vgs", "--reportformat", "json", "--readonly", "-o", "+vg_all"],
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="UTF-8",
    )

    if proc.returncode != 0:
        raise RuntimeError(f"vgs: {proc.stderr.strip()}")

    report = json.loads(proc.stdout.strip())
    return report["report"][0]["vg"]
def find_vg(lst: List, name: str):
    """Return the first entry of *lst* whose ``"vg_name"`` is *name*.

    Entries are looked at in order; ``None`` is returned when no entry
    matches.
    """
    candidates = (entry for entry in lst if entry["vg_name"] == name)
    return next(candidates, None)
@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation")
@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only")
def test_rename_vg_group(tempdir):
    """Rename a volume group through ``lvm2.Disk`` and check that ``vgs`` sees it.

    The test runs in three phases:

    1. Create a 100 MiB image, attach it to a loop device and use the
       LVM tools to create a PV, a VG with a random name and one LV.
    2. With the loop device detached, open the image with
       ``lvm2.Disk``, rename the VG, set creation host and description
       and flush the metadata.
    3. Attach the image again and poll ``vgs`` until the new VG name
       shows up.
    """
    path = os.path.join(tempdir, "lvm.img")
    ctl = loop.LoopControl()
    vg_name = str(uuid.uuid4())
    new_name = str(uuid.uuid4())

    # Phase 1: build the volume group with the regular LVM tools.
    with open(path, "wb+") as f:
        f.truncate(100 * 1024 * 1024)
        f.flush()
        lo = make_loop(ctl, f.fileno(), 0, None)
        try:
            devname = os.path.join("/dev", lo.devname)

            pvcreate(devname)
            vgcreate(devname, vg_name)
            lvcreate(vg_name, "lv1", r"100%FREE")

            assert find_vg(list_vgs(), vg_name)
        finally:
            # Detach the loop device before the backing file is closed.
            lo.close()

    # Phase 2: rewrite the metadata directly in the image file.
    with lvm2.Disk.open(path) as disk:
        assert disk.metadata
        assert disk.metadata.vg_name == vg_name

        disk.rename_vg(new_name)
        disk.creation_host = "osbuild"
        disk.description = "created via lvm2 and osbuild"
        disk.flush_metadata()

    # Phase 3: the LVM tools must now report the VG under its new name.
    with open(path, "rb") as f:
        lo = make_loop(ctl, f.fileno(), 0, None)
        try:
            vg = None
            # The VG may not be visible right away after attaching the
            # device, so retry a few times with a growing delay.
            for attempt in range(3):
                vg = find_vg(list_vgs(), new_name)
                if vg:
                    break
                time.sleep(0.250 * (attempt + 1))

            if not vg:
                raise RuntimeError(f"Could not find vg {new_name}")
        finally:
            lo.close()