test: add rename check for lvm2 module
Check we can create and successfully rename a lvm2 volume group.
This commit is contained in:
parent
0e31e628d7
commit
6ea5ce1836
1 changed files with 190 additions and 0 deletions
190
test/mod/test_util_lvm2.py
Normal file
190
test/mod/test_util_lvm2.py
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
#
|
||||
# Test for the util.lvm2 module
|
||||
#
|
||||
|
||||
import errno
|
||||
import os
|
||||
import json
|
||||
import subprocess
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from typing import List
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
|
||||
from osbuild import loop
|
||||
from osbuild.util import lvm2
|
||||
from osbuild.util.types import PathLike
|
||||
|
||||
from ..test import TestBase
|
||||
|
||||
|
||||
def have_lvm() -> bool:
|
||||
try:
|
||||
r = subprocess.run(
|
||||
["vgs"],
|
||||
encoding="utf-8",
|
||||
stdout=subprocess.PIPE,
|
||||
check=False
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
return r.returncode == 0
|
||||
|
||||
|
||||
@pytest.fixture(name="tempdir")
|
||||
def tempdir_fixture():
|
||||
with TemporaryDirectory(prefix="lvm2-") as tmp:
|
||||
yield tmp
|
||||
|
||||
|
||||
def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512):
|
||||
lo = loop.Loop(ctl.get_unbound())
|
||||
|
||||
if not sizelimit:
|
||||
stat = os.fstat(fd)
|
||||
sizelimit = stat.st_size - offset
|
||||
print(f"size: {sizelimit}")
|
||||
else:
|
||||
sizelimit *= sector_size
|
||||
|
||||
while True:
|
||||
try:
|
||||
lo.set_fd(fd)
|
||||
except OSError as e:
|
||||
lo.close()
|
||||
if e.errno == errno.EBUSY:
|
||||
continue
|
||||
raise e
|
||||
# `set_status` returns EBUSY when the pages from the previously
|
||||
# bound file have not been fully cleared yet.
|
||||
try:
|
||||
lo.set_status(offset=offset,
|
||||
sizelimit=sizelimit,
|
||||
autoclear=True)
|
||||
except BlockingIOError:
|
||||
lo.clear_fd()
|
||||
lo.close()
|
||||
continue
|
||||
break
|
||||
|
||||
return lo
|
||||
|
||||
|
||||
def pvcreate(path: PathLike):
|
||||
cmd = ["pvcreate", os.fspath(path)]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def vgcreate(path: PathLike, vg_name: str):
|
||||
cmd = ["vgcreate", vg_name, os.fspath(path)]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def lvcreate(vg_name, name: str, size: str):
|
||||
cmd = [
|
||||
"lvcreate", "-an",
|
||||
"-l", size,
|
||||
"-n", name,
|
||||
vg_name
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def list_vgs():
|
||||
cmd = [
|
||||
"vgs",
|
||||
"--reportformat", "json",
|
||||
"--readonly",
|
||||
"-o", "+vg_all"
|
||||
]
|
||||
|
||||
res = subprocess.run(cmd,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
encoding="UTF-8")
|
||||
|
||||
data = res.stdout.strip()
|
||||
|
||||
if res.returncode != 0:
|
||||
msg = f"vgs: {data}"
|
||||
raise RuntimeError(msg)
|
||||
|
||||
data = json.loads(data)
|
||||
|
||||
return data["report"][0]["vg"]
|
||||
|
||||
|
||||
def find_vg(lst: List, name: str):
|
||||
for vg in lst:
|
||||
if vg["vg_name"] == name:
|
||||
return vg
|
||||
return None
|
||||
|
||||
|
||||
@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation")
|
||||
@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only")
|
||||
def test_rename_vg_group(tempdir):
|
||||
|
||||
path = os.path.join(tempdir, "lvm.img")
|
||||
ctl = loop.LoopControl()
|
||||
|
||||
f = None
|
||||
lo = None
|
||||
try:
|
||||
f = open(path, "wb+")
|
||||
f.truncate(100 * 1024 * 1024)
|
||||
f.flush()
|
||||
lo = make_loop(ctl, f.fileno(), 0, None)
|
||||
devname = os.path.join("/dev", lo.devname)
|
||||
|
||||
vg_name = str(uuid.uuid4())
|
||||
pvcreate(devname)
|
||||
vgcreate(devname, vg_name)
|
||||
lvcreate(vg_name, "lv1", r"100%FREE")
|
||||
|
||||
vgs = list_vgs()
|
||||
vg = find_vg(vgs, vg_name)
|
||||
assert vg
|
||||
|
||||
finally:
|
||||
if lo:
|
||||
lo.close()
|
||||
if f:
|
||||
f.close()
|
||||
|
||||
new_name = str(uuid.uuid4())
|
||||
with lvm2.Disk.open(path) as disk:
|
||||
assert disk.metadata
|
||||
assert disk.metadata.vg_name == vg_name
|
||||
|
||||
disk.rename_vg(new_name)
|
||||
disk.creation_host = "osbuild"
|
||||
disk.description = "created via lvm2 and osbuild"
|
||||
|
||||
disk.flush_metadata()
|
||||
|
||||
f = None
|
||||
lo = None
|
||||
try:
|
||||
f = open(path, "rb")
|
||||
lo = make_loop(ctl, f.fileno(), 0, None)
|
||||
devname = os.path.join("/dev", lo.devname)
|
||||
|
||||
vg = None
|
||||
for i in range(3):
|
||||
vgs = list_vgs()
|
||||
vg = find_vg(vgs, new_name)
|
||||
if vg:
|
||||
break
|
||||
time.sleep(0.250 * (i+1))
|
||||
if not vg:
|
||||
raise RuntimeError(f"Could not find vg {new_name}")
|
||||
finally:
|
||||
if lo:
|
||||
lo.close()
|
||||
if f:
|
||||
f.close()
|
||||
Loading…
Add table
Add a link
Reference in a new issue