Instead of having custom code that basically duplicates the functionality of `LoopControl.loop_for_fd` use that instead. Additionally, the version used in the test had a bug where it did not re-create the Loop device in the main loop when it was close due to an error, leading errors in subsequent usages of the device that would often manifest in CI runs: fcntl.ioctl(self.fd, self.LOOP_SET_FD, fd) ValueError: file descriptor cannot be a negative integer (-1)
167 lines
3.6 KiB
Python
167 lines
3.6 KiB
Python
#
|
|
# Test for the util.lvm2 module
|
|
#
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import time
|
|
import uuid
|
|
|
|
from typing import List
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import pytest
|
|
|
|
from osbuild import loop
|
|
from osbuild.util import lvm2
|
|
from osbuild.util.types import PathLike
|
|
|
|
from ..test import TestBase
|
|
|
|
|
|
def have_lvm() -> bool:
|
|
try:
|
|
r = subprocess.run(
|
|
["vgs"],
|
|
encoding="utf-8",
|
|
stdout=subprocess.PIPE,
|
|
check=False
|
|
)
|
|
except FileNotFoundError:
|
|
return False
|
|
return r.returncode == 0
|
|
|
|
|
|
@pytest.fixture(name="tempdir")
|
|
def tempdir_fixture():
|
|
with TemporaryDirectory(prefix="lvm2-") as tmp:
|
|
yield tmp
|
|
|
|
|
|
def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512):
|
|
if not sizelimit:
|
|
stat = os.fstat(fd)
|
|
sizelimit = stat.st_size - offset
|
|
print(f"size: {sizelimit}")
|
|
else:
|
|
sizelimit *= sector_size
|
|
|
|
return ctl.loop_for_fd(fd, offset, sizelimit=sizelimit, autoclear=True)
|
|
|
|
|
|
def pvcreate(path: PathLike):
|
|
cmd = ["pvcreate", os.fspath(path)]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
|
|
def vgcreate(path: PathLike, vg_name: str):
|
|
cmd = ["vgcreate", vg_name, os.fspath(path)]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
|
|
def lvcreate(vg_name, name: str, size: str):
|
|
cmd = [
|
|
"lvcreate", "-an",
|
|
"-l", size,
|
|
"-n", name,
|
|
vg_name
|
|
]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
|
|
def list_vgs():
|
|
cmd = [
|
|
"vgs",
|
|
"--reportformat", "json",
|
|
"--readonly",
|
|
"-o", "+vg_all"
|
|
]
|
|
|
|
res = subprocess.run(cmd,
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
encoding="UTF-8")
|
|
|
|
data = res.stdout.strip()
|
|
|
|
if res.returncode != 0:
|
|
msg = f"vgs: {data}"
|
|
raise RuntimeError(msg)
|
|
|
|
data = json.loads(data)
|
|
|
|
return data["report"][0]["vg"]
|
|
|
|
|
|
def find_vg(lst: List, name: str):
|
|
for vg in lst:
|
|
if vg["vg_name"] == name:
|
|
return vg
|
|
return None
|
|
|
|
|
|
@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation")
|
|
@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only")
|
|
def test_rename_vg_group(tempdir):
|
|
|
|
path = os.path.join(tempdir, "lvm.img")
|
|
ctl = loop.LoopControl()
|
|
|
|
f = None
|
|
lo = None
|
|
try:
|
|
f = open(path, "wb+")
|
|
f.truncate(100 * 1024 * 1024)
|
|
f.flush()
|
|
lo = make_loop(ctl, f.fileno(), 0, None)
|
|
devname = os.path.join("/dev", lo.devname)
|
|
|
|
vg_name = str(uuid.uuid4())
|
|
pvcreate(devname)
|
|
vgcreate(devname, vg_name)
|
|
lvcreate(vg_name, "lv1", r"100%FREE")
|
|
|
|
vgs = list_vgs()
|
|
vg = find_vg(vgs, vg_name)
|
|
assert vg
|
|
|
|
finally:
|
|
if lo:
|
|
lo.close()
|
|
if f:
|
|
f.close()
|
|
|
|
new_name = str(uuid.uuid4())
|
|
with lvm2.Disk.open(path) as disk:
|
|
assert disk.metadata
|
|
assert disk.metadata.vg_name == vg_name
|
|
|
|
disk.rename_vg(new_name)
|
|
disk.creation_host = "osbuild"
|
|
disk.description = "created via lvm2 and osbuild"
|
|
|
|
disk.flush_metadata()
|
|
|
|
f = None
|
|
lo = None
|
|
try:
|
|
f = open(path, "rb")
|
|
lo = make_loop(ctl, f.fileno(), 0, None)
|
|
devname = os.path.join("/dev", lo.devname)
|
|
|
|
vg = None
|
|
for i in range(3):
|
|
vgs = list_vgs()
|
|
vg = find_vg(vgs, new_name)
|
|
if vg:
|
|
break
|
|
time.sleep(0.250 * (i+1))
|
|
if not vg:
|
|
raise RuntimeError(f"Could not find vg {new_name}")
|
|
finally:
|
|
if lo:
|
|
lo.close()
|
|
if f:
|
|
f.close()
|