167 lines
3.6 KiB
Python
167 lines
3.6 KiB
Python
#
# Test for the util.lvm2 module
#
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import time
|
|
import uuid
|
|
|
|
from typing import List
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import pytest
|
|
|
|
from osbuild import loop
|
|
from osbuild.util import lvm2
|
|
from osbuild.util.types import PathLike
|
|
|
|
from ..test import TestBase
|
|
|
|
|
|
def have_lvm() -> bool:
    """Report whether the LVM2 command line tools are usable on this host.

    Runs ``vgs`` without arguments and treats a zero exit status as
    "usable".  The result is consumed by a ``skipif`` marker, i.e. this
    function is called while the test module is being imported, so it must
    never raise: any failure to launch the binary (missing, not
    executable, ...) simply counts as "LVM is not available".

    Returns:
        ``True`` if ``vgs`` could be started and exited with status 0,
        ``False`` otherwise.
    """
    try:
        r = subprocess.run(
            ["vgs"],
            encoding="utf8",
            stdout=subprocess.PIPE,
            check=False,
        )
    except OSError:
        # FileNotFoundError, PermissionError, ... -- whatever prevented
        # the binary from being executed, the tools cannot be used.
        return False
    return r.returncode == 0
|
|
|
|
|
|
@pytest.fixture(name="tempdir")
def tempdir_fixture():
    """Provide a scratch directory (prefix ``lvm2-``) to a test.

    The directory is created before the test body runs and removed again
    when the fixture is torn down.
    """
    scratch = TemporaryDirectory(prefix="lvm2-")
    with scratch as location:
        yield location
|
|
|
|
|
|
def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512):
    """Attach the file behind *fd* to a loop device obtained from *ctl*.

    Args:
        ctl: object providing ``loop_for_fd(fd, offset, sizelimit=...,
            autoclear=...)``.
        fd: open file descriptor of the backing file.
        offset: passed through unchanged to ``loop_for_fd``.
        sizelimit: size expressed in sectors.  When falsy (``None`` or
            ``0``) the size of the file minus *offset* is used instead.
        sector_size: number of bytes per sector used to scale *sizelimit*.

    Returns:
        Whatever ``ctl.loop_for_fd`` returns.
    """
    if sizelimit:
        # Caller gave a size in sectors; convert it to bytes.
        limit = sizelimit * sector_size
    else:
        # No explicit size: cover everything from offset to end of file.
        limit = os.fstat(fd).st_size - offset
        print(f"size: {limit}")

    return ctl.loop_for_fd(fd, offset, sizelimit=limit, autoclear=True)
|
|
|
|
|
|
def pvcreate(path: PathLike):
    """Run ``pvcreate`` on the device or file at *path*.

    Raises:
        subprocess.CalledProcessError: if ``pvcreate`` exits non-zero.
    """
    target = os.fspath(path)
    subprocess.run(["pvcreate", target], check=True)
|
|
|
|
|
|
def vgcreate(path: PathLike, vg_name: str):
    """Run ``vgcreate`` to create volume group *vg_name* on *path*.

    Note the argument order on the command line: the group name comes
    first, followed by the device path.

    Raises:
        subprocess.CalledProcessError: if ``vgcreate`` exits non-zero.
    """
    device = os.fspath(path)
    subprocess.run(["vgcreate", vg_name, device], check=True)
|
|
|
|
|
|
def lvcreate(vg_name, name: str, size: str):
    """Run ``lvcreate`` to add logical volume *name* to *vg_name*.

    Args:
        vg_name: volume group that receives the new logical volume.
        name: value handed to ``-n``.
        size: value handed to ``-l`` (the test in this file passes
            ``100%FREE``).

    The command is always invoked with ``-an``.

    Raises:
        subprocess.CalledProcessError: if ``lvcreate`` exits non-zero.
    """
    argv = ["lvcreate", "-an"]
    argv += ["-l", size]
    argv += ["-n", name]
    argv.append(vg_name)
    subprocess.run(argv, check=True)
|
|
|
|
|
|
def list_vgs():
    """Return the volume groups reported by ``vgs`` as parsed JSON.

    ``vgs`` is invoked with ``--reportformat json --readonly -o +vg_all``
    and its standard output is decoded as JSON.

    Returns:
        The ``"vg"`` entry of the first element of the top-level
        ``"report"`` list.

    Raises:
        RuntimeError: if ``vgs`` exits non-zero; the message carries the
            command's stripped standard error.
    """
    proc = subprocess.run(
        ["vgs", "--reportformat", "json", "--readonly", "-o", "+vg_all"],
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="UTF-8",
    )

    # Fail before attempting to parse anything.
    if proc.returncode != 0:
        raise RuntimeError(f"vgs: {proc.stderr.strip()}")

    report = json.loads(proc.stdout.strip())
    return report["report"][0]["vg"]
|
|
|
|
|
|
def find_vg(lst: List, name: str):
    """Return the first entry of *lst* whose ``"vg_name"`` equals *name*.

    Args:
        lst: iterable of mappings, each with a ``"vg_name"`` key.
        name: volume group name to look for.

    Returns:
        The matching entry, or ``None`` if no entry matches.
    """
    matches = (entry for entry in lst if entry["vg_name"] == name)
    return next(matches, None)
|
|
|
|
|
|
@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation")
@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only")
def test_rename_vg_group(tempdir):
    """Rename a volume group through ``lvm2.Disk`` and check ``vgs`` sees it.

    The test runs in three phases:

    1. Create a 100 MiB image file, attach it to a loop device and use the
       LVM command line tools to create a PV, a VG with a random name and
       a single LV (``lv1``) on it.
    2. After the loop handle has been closed, open the image with
       ``lvm2.Disk``, rename the VG, set the creation host and the
       description, and flush the metadata.
    3. Attach the image to a loop device again and poll ``vgs`` until the
       new VG name is listed.

    Raises:
        RuntimeError: if the renamed VG is not listed after three attempts.
    """
    path = os.path.join(tempdir, "lvm.img")
    ctl = loop.LoopControl()

    # Phase 1: build an LVM layout on a fresh image using the real tools.
    # The ``with`` block guarantees the image is closed even if closing
    # the loop device fails.
    with open(path, "wb+") as f:
        f.truncate(100 * 1024 * 1024)
        f.flush()
        lo = make_loop(ctl, f.fileno(), 0, None)
        try:
            devname = os.path.join("/dev", lo.devname)

            vg_name = str(uuid.uuid4())
            pvcreate(devname)
            vgcreate(devname, vg_name)
            lvcreate(vg_name, "lv1", r"100%FREE")

            vgs = list_vgs()
            vg = find_vg(vgs, vg_name)
            assert vg
        finally:
            lo.close()

    # Phase 2: modify the metadata directly in the image file.
    new_name = str(uuid.uuid4())
    with lvm2.Disk.open(path) as disk:
        assert disk.metadata
        assert disk.metadata.vg_name == vg_name

        disk.rename_vg(new_name)
        disk.creation_host = "osbuild"
        disk.description = "created via lvm2 and osbuild"

        disk.flush_metadata()

    # Phase 3: attach the image again and let LVM confirm the new name.
    with open(path, "rb") as f:
        lo = make_loop(ctl, f.fileno(), 0, None)
        try:
            # Retry with a growing delay; presumably the freshly attached
            # device is not always visible to ``vgs`` right away.
            for i in range(3):
                vg = find_vg(list_vgs(), new_name)
                if vg:
                    break
                time.sleep(0.250 * (i + 1))
            else:
                raise RuntimeError(f"Could not find vg {new_name}")
        finally:
            lo.close()
|