In all invocations of `subprocess.run`, stderr and stdout were combined in a shared pipe, but lvm sometimes emits notices and informational messages on stderr, which can interfere with the data we are interested in on stdout. Separate the two.
167 lines
3.6 KiB
Python
167 lines
3.6 KiB
Python
#
|
|
# Test for the util.lvm2 module
|
|
#
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import time
|
|
import uuid
|
|
|
|
from typing import List
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import pytest
|
|
|
|
from osbuild import loop
|
|
from osbuild.util import lvm2
|
|
from osbuild.util.types import PathLike
|
|
|
|
from ..test import TestBase
|
|
|
|
|
|
def have_lvm() -> bool:
    """Report whether the `vgs` command can be run successfully.

    Returns:
        True if `vgs` was found and exited with status 0; False if the
        executable is missing or it exited with a non-zero status.
    """
    try:
        proc = subprocess.run(["vgs"],
                              encoding="utf-8",
                              stdout=subprocess.PIPE,
                              check=False)
    except FileNotFoundError:
        # No `vgs` executable could be found.
        return False
    else:
        return proc.returncode == 0
|
|
|
|
|
|
@pytest.fixture(name="tempdir")
def tempdir_fixture():
    """Provide a temporary directory (prefix ``lvm2-``) to a test.

    The directory is yielded to the test and cleaned up by
    `TemporaryDirectory` when the fixture is torn down.
    """
    scratch = TemporaryDirectory(prefix="lvm2-")
    with scratch as tmp_path:
        yield tmp_path
|
|
|
|
|
|
def make_loop(ctl, fd: int, offset, sizelimit, sector_size=512):
    """Create a loop device for `fd` through the loop controller `ctl`.

    Args:
        ctl: object providing `loop_for_fd` (a `loop.LoopControl` in the test).
        fd: open file descriptor backing the loop device.
        offset: offset passed through to `loop_for_fd`.
        sizelimit: size limit, multiplied by `sector_size` before use; if
            falsy (``None`` or ``0``), the size of the file behind `fd`
            minus `offset` is used instead.
        sector_size: multiplier applied to a non-falsy `sizelimit`.

    Returns:
        Whatever `ctl.loop_for_fd` returns; the device is requested with
        ``autoclear=True``.
    """
    if sizelimit:
        # An explicit limit was given: scale it by the sector size.
        sizelimit = sizelimit * sector_size
    else:
        # No limit given: cover everything from `offset` to the end of the file.
        sizelimit = os.fstat(fd).st_size - offset
        print(f"size: {sizelimit}")

    return ctl.loop_for_fd(fd, offset, sizelimit=sizelimit, autoclear=True)
|
|
|
|
|
|
def pvcreate(path: PathLike):
    """Run `pvcreate` on `path`.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    subprocess.run(["pvcreate", os.fspath(path)], check=True)
|
|
|
|
|
|
def vgcreate(path: PathLike, vg_name: str):
    """Run `vgcreate` to create volume group `vg_name` on `path`.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    device = os.fspath(path)
    subprocess.run(["vgcreate", vg_name, device], check=True)
|
|
|
|
|
|
def lvcreate(vg_name, name: str, size: str):
    """Run `lvcreate -an` to create logical volume `name` in `vg_name`.

    Args:
        vg_name: volume group passed as the positional argument.
        name: value for the ``-n`` option.
        size: value for the ``-l`` option (the test passes ``100%FREE``).

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    argv = ["lvcreate", "-an"]
    argv += ["-l", size]
    argv += ["-n", name]
    argv.append(vg_name)
    subprocess.run(argv, check=True)
|
|
|
|
|
|
def list_vgs():
    """Query `vgs` in JSON report format and return its volume-group data.

    stdout and stderr are captured on separate pipes so that only stdout
    is fed to the JSON parser.

    Returns:
        The ``"vg"`` entry of the first element of the ``"report"`` array
        in the parsed JSON output.

    Raises:
        RuntimeError: if `vgs` exits non-zero; the message carries its
            stripped stderr.
    """
    res = subprocess.run(
        ["vgs", "--reportformat", "json", "--readonly", "-o", "+vg_all"],
        check=False,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="UTF-8",
    )

    if res.returncode != 0:
        raise RuntimeError(f"vgs: {res.stderr.strip()}")

    report = json.loads(res.stdout.strip())
    return report["report"][0]["vg"]
|
|
|
|
|
|
def find_vg(lst: List, name: str):
    """Return the first entry of `lst` whose ``"vg_name"`` equals `name`.

    Returns:
        The matching entry itself (not a copy), or ``None`` if no entry
        matches.
    """
    matches = (entry for entry in lst if entry["vg_name"] == name)
    return next(matches, None)
|
|
|
|
|
|
@pytest.mark.skipif(not have_lvm(), reason="require lvm2 installation")
@pytest.mark.skipif(not TestBase.can_bind_mount(), reason="root only")
def test_rename_vg_group(tempdir):
    """Rename a volume group with `lvm2.Disk` and verify `vgs` reports the new name.

    The test runs in three phases:

    1. Create a 100 MiB image file, attach it to a loop device and use the
       lvm command line tools to create a PV, a VG with a random name and
       one LV on it.
    2. With the loop device detached, open the image via `lvm2.Disk`,
       check the recorded VG name, rename the VG and flush the metadata.
    3. Re-attach the image to a loop device and check that `vgs` lists a
       VG with the new name.
    """
    path = os.path.join(tempdir, "lvm.img")
    ctl = loop.LoopControl()

    # Phase 1: build the image with the real lvm tools.
    vg_name = str(uuid.uuid4())
    # `with` guarantees the image file is closed even if closing the loop
    # device raises; the loop device is still closed before the file.
    with open(path, "wb+") as f:
        f.truncate(100 * 1024 * 1024)
        f.flush()

        lo = None
        try:
            lo = make_loop(ctl, f.fileno(), 0, None)
            devname = os.path.join("/dev", lo.devname)

            pvcreate(devname)
            vgcreate(devname, vg_name)
            lvcreate(vg_name, "lv1", r"100%FREE")

            vgs = list_vgs()
            vg = find_vg(vgs, vg_name)
            assert vg
        finally:
            if lo:
                lo.close()

    # Phase 2: rename the VG directly in the image file.
    new_name = str(uuid.uuid4())
    with lvm2.Disk.open(path) as disk:
        assert disk.metadata
        assert disk.metadata.vg_name == vg_name

        disk.rename_vg(new_name)
        disk.creation_host = "osbuild"
        disk.description = "created via lvm2 and osbuild"

        disk.flush_metadata()

    # Phase 3: re-attach the image and look for the renamed VG.
    with open(path, "rb") as f:
        lo = None
        try:
            lo = make_loop(ctl, f.fileno(), 0, None)

            # Up to three attempts with a growing delay between them.
            # NOTE(review): presumably lvm may need a moment to notice the
            # freshly attached device - confirm.
            for i in range(3):
                if find_vg(list_vgs(), new_name):
                    break
                time.sleep(0.250 * (i + 1))
            else:
                raise RuntimeError(f"Could not find vg {new_name}")
        finally:
            if lo:
                lo.close()
|