- Add complete pytest testing framework with conftest.py and test files - Add performance monitoring and benchmarking capabilities - Add plugin system with ccache plugin example - Add comprehensive documentation (API, deployment, testing, etc.) - Add Docker API wrapper for service deployment - Add advanced configuration examples - Remove old wget package file - Update core modules with enhanced functionality
747 lines
27 KiB
Python
747 lines
27 KiB
Python
"""
|
|
Chroot management for deb-mock
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
|
|
from .exceptions import ChrootError
|
|
from .uid_manager import UIDManager
|
|
|
|
|
|
class ChrootManager:
    """Manages chroot environments for deb-mock.

    The manager creates chroots with debootstrap, registers them with schroot
    by writing ``<name>.conf`` files into ``config.chroot_config_dir``, runs
    commands inside them through ``schroot``, and keeps an in-memory record of
    the mounts it has set up for each chroot.

    The mount record lives only on this instance: mounts created by another
    process (or another ``ChrootManager``) are not known to it.
    """

    def __init__(self, config):
        """Store the configuration and create the UID manager.

        Args:
            config: Configuration object; its attributes (``chroot_dir``,
                ``chroot_config_dir``, ``architecture``, ``suite``, ...) are
                read by the methods of this class.
        """
        self.config = config
        # chroot name -> list of (mount_type, mount_path) tuples, in mount order
        self._active_mounts = {}
        self.uid_manager = UIDManager(config)

    def create_chroot(self, chroot_name: str, arch: Optional[str] = None, suite: Optional[str] = None) -> None:
        """Create a new chroot environment.

        Args:
            chroot_name: Name of the chroot to create.
            arch: When given (truthy), overwrites ``config.architecture``.
            suite: When given (truthy), overwrites ``config.suite``.

        Raises:
            ChrootError: Propagated from the creation, mount or user setup steps.
        """
        # NOTE: the overrides are written to the shared config object and stay
        # in effect after this call returns.
        if arch:
            self.config.architecture = arch
        if suite:
            self.config.suite = suite

        # Check if bootstrap chroot is needed (Mock FAQ #2)
        if self.config.use_bootstrap_chroot:
            self._create_bootstrap_chroot(chroot_name)
        else:
            self._create_standard_chroot(chroot_name)

        # Setup advanced mounts after chroot creation
        self._setup_advanced_mounts(chroot_name)

        # Setup UID/GID management
        self._setup_chroot_users(chroot_name)

    def _create_bootstrap_chroot(self, chroot_name: str) -> None:
        """Create a chroot by running debootstrap inside a bootstrap chroot.

        This addresses Mock FAQ #2 about building packages for newer
        distributions on older systems (e.g., building Debian Sid packages on
        Debian Stable).

        The bootstrap chroot is named ``config.bootstrap_chroot_name`` or, when
        that is empty, ``<chroot_name>-bootstrap``; it is created with
        ``_create_standard_chroot`` if its directory does not exist yet.

        Raises:
            ChrootError: If debootstrap exits non-zero or any step fails.
        """
        bootstrap_name = self.config.bootstrap_chroot_name or f"{chroot_name}-bootstrap"
        bootstrap_path = os.path.join(self.config.chroot_dir, bootstrap_name)

        # Create minimal bootstrap chroot first
        if not os.path.exists(bootstrap_path):
            self._create_standard_chroot(bootstrap_name)

        # Use bootstrap chroot to create the final chroot
        try:
            # NOTE(review): the target directory is a fixed path that is
            # resolved inside the bootstrap chroot, while _configure_chroot
            # below uses config.chroot_dir on the host — confirm these are
            # meant to refer to the same location.
            cmd = [
                "/usr/sbin/debootstrap",
                "--arch",
                self.config.architecture,
                self.config.suite,
                f"/var/lib/deb-mock/chroots/{chroot_name}",
                self.config.mirror,
            ]

            # Execute debootstrap within bootstrap chroot
            result = self.execute_in_chroot(bootstrap_name, cmd, capture_output=True)
            if result.returncode != 0:
                raise ChrootError(
                    f"Failed to create chroot using bootstrap: {result.stderr}",
                    chroot_name=chroot_name,
                    operation="bootstrap_debootstrap",
                )

            # Configure the new chroot
            self._configure_chroot(chroot_name)

        except ChrootError:
            # Already carries the specific operation; do not wrap it again.
            raise
        except Exception as e:
            raise ChrootError(
                f"Bootstrap chroot creation failed: {e}",
                chroot_name=chroot_name,
                operation="bootstrap_creation",
            ) from e

    def _create_standard_chroot(self, chroot_name: str) -> None:
        """Create a standard chroot by running debootstrap on the host.

        Raises:
            ChrootError: If the chroot directory already exists, the directory
                cannot be created, debootstrap cannot be started, or
                debootstrap exits with a non-zero status.
        """
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        if os.path.exists(chroot_path):
            raise ChrootError(
                f"Chroot '{chroot_name}' already exists",
                chroot_name=chroot_name,
                operation="create",
            )

        cmd = [
            "/usr/sbin/debootstrap",
            "--arch",
            self.config.architecture,
            self.config.suite,
            chroot_path,
            self.config.mirror,
        ]

        try:
            # Create chroot directory
            os.makedirs(chroot_path, exist_ok=True)

            # Run debootstrap. check=False means a non-zero exit does not
            # raise; only failures to create the directory or to start the
            # process (e.g. debootstrap not installed) surface as OSError.
            result = subprocess.run(cmd, capture_output=True, text=True, check=False)
        except OSError as e:
            raise ChrootError(
                f"Failed to create chroot: {e}",
                chroot_name=chroot_name,
                operation="create",
                chroot_path=chroot_path,
            ) from e

        if result.returncode != 0:
            raise ChrootError(
                f"debootstrap failed: {result.stderr}",
                chroot_name=chroot_name,
                operation="/usr/sbin/debootstrap",
                chroot_path=chroot_path,
            )

        # Configure the chroot
        self._configure_chroot(chroot_name)

    def _configure_chroot(self, chroot_name: str) -> None:
        """Configure a newly created chroot.

        Writes the schroot configuration, then installs
        ``config.chroot_additional_packages`` and runs
        ``config.chroot_setup_cmd`` when those settings are non-empty.
        """
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        # Create schroot configuration
        self._create_schroot_config(chroot_name, chroot_path, self.config.architecture, self.config.suite)

        # Install additional packages if specified
        if self.config.chroot_additional_packages:
            self._install_additional_packages(chroot_name)

        # Run setup commands if specified
        if self.config.chroot_setup_cmd:
            self._run_setup_commands(chroot_name)

    def _install_additional_packages(self, chroot_name: str) -> None:
        """Install ``config.chroot_additional_packages`` inside the chroot.

        Raises:
            ChrootError: If ``apt-get install`` exits non-zero or a step fails.
        """
        try:
            # Update package lists (the exit status of this step is not checked)
            self.execute_in_chroot(chroot_name, ["apt-get", "update"], capture_output=True)

            # Install packages
            cmd = ["apt-get", "install", "-y"] + self.config.chroot_additional_packages
            result = self.execute_in_chroot(chroot_name, cmd, capture_output=True)

            if result.returncode != 0:
                raise ChrootError(
                    f"Failed to install additional packages: {result.stderr}",
                    chroot_name=chroot_name,
                    operation="install_packages",
                )

        except ChrootError:
            raise
        except Exception as e:
            raise ChrootError(
                f"Failed to install additional packages: {e}",
                chroot_name=chroot_name,
                operation="install_packages",
            ) from e

    def _run_setup_commands(self, chroot_name: str) -> None:
        """Run each command string from ``config.chroot_setup_cmd`` in the chroot.

        Each string is split on whitespace and handed to ``execute_in_chroot``,
        which joins the parts with spaces again and runs them through
        ``sh -c``. Processing stops at the first failing command.

        Raises:
            ChrootError: If a command exits non-zero or cannot be run.
        """
        for cmd in self.config.chroot_setup_cmd:
            try:
                result = self.execute_in_chroot(chroot_name, cmd.split(), capture_output=True)

                if result.returncode != 0:
                    raise ChrootError(
                        f"Setup command failed: {result.stderr}",
                        chroot_name=chroot_name,
                        operation="setup_command",
                    )

            except ChrootError:
                raise
            except Exception as e:
                raise ChrootError(
                    f"Failed to run setup command '{cmd}': {e}",
                    chroot_name=chroot_name,
                    operation="setup_command",
                ) from e

    def _create_schroot_config(self, chroot_name: str, chroot_path: str, arch: str, suite: str) -> None:
        """Write ``<chroot_name>.conf`` into ``config.chroot_config_dir``.

        Args:
            chroot_name: Section name and file stem of the configuration.
            chroot_path: Value written as ``directory=``.
            arch: Architecture, used in the description line only.
            suite: Suite, used in the description line only.

        Raises:
            ChrootError: If the file cannot be written.
        """
        config_lines = [
            f"[{chroot_name}]",
            f"description=Deb-Mock chroot for {suite} {arch}",
            f"directory={chroot_path}",
            "root-users=root",
            "users=root",
            "type=directory",
            "profile=desktop",
            "preserve-environment=true",
        ]
        config_content = "\n".join(config_lines) + "\n"

        config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf")

        try:
            with open(config_file, "w", encoding="utf-8") as f:
                f.write(config_content)
        except Exception as e:
            raise ChrootError(f"Failed to create schroot config: {e}") from e

    def _initialize_chroot(self, chroot_path: str, arch: str, suite: str) -> None:
        """Initialize a chroot directory with ``debootstrap --variant=buildd``.

        Unlike ``_create_standard_chroot`` this uses a fixed Debian mirror URL
        rather than ``config.mirror``.

        Raises:
            ChrootError: If debootstrap fails or is not installed.
        """
        cmd = [
            "/usr/sbin/debootstrap",
            "--arch",
            arch,
            "--variant=buildd",
            suite,
            chroot_path,
            "http://deb.debian.org/debian/",
        ]

        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            raise ChrootError(f"debootstrap failed: {e.stderr}") from e
        except FileNotFoundError as e:
            raise ChrootError("debootstrap not found. Please install debootstrap package.") from e

    def _install_build_tools(self, chroot_name: str) -> None:
        """Install essential build tools in the chroot via ``schroot``.

        Runs ``apt-get update`` followed by ``apt-get install -y`` for a fixed
        package list.

        Raises:
            ChrootError: If either apt-get step fails or schroot cannot be run.
        """
        packages = [
            "build-essential",
            "devscripts",
            "debhelper",
            "dh-make",
            "fakeroot",
            "lintian",
            "sbuild",
            "schroot",
        ]

        steps = (
            (["apt-get", "update"], "Failed to update package lists"),
            (["apt-get", "install", "-y"] + packages, "Failed to install build tools"),
        )
        for apt_args, failure in steps:
            cmd = ["schroot", "-c", chroot_name, "--"] + apt_args
            try:
                subprocess.run(cmd, check=True)
            except (subprocess.CalledProcessError, OSError) as e:
                raise ChrootError(f"{failure}: {e}") from e

    def clean_chroot(self, chroot_name: str) -> None:
        """Remove a chroot's schroot configuration and its directory tree.

        Mounts recorded for the chroot are unmounted first. If any of them
        cannot be unmounted nothing is deleted, because removing the tree
        would otherwise descend into the mounted (possibly host) directories.

        Raises:
            ChrootError: If tracked mounts are still active or removal fails.
        """
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf")

        # NOTE(review): only mounts recorded on this instance are handled;
        # mounts made by another process are not detected here.
        self.cleanup_mounts(chroot_name)
        if self._active_mounts.get(chroot_name):
            raise ChrootError(
                f"Failed to clean chroot '{chroot_name}': mounts are still active under {chroot_path}"
            )
        self._active_mounts.pop(chroot_name, None)

        try:
            # Remove schroot configuration
            if os.path.exists(config_file):
                os.remove(config_file)

            # Remove chroot directory
            if os.path.exists(chroot_path):
                shutil.rmtree(chroot_path)

        except Exception as e:
            raise ChrootError(f"Failed to clean chroot '{chroot_name}': {e}") from e

    def list_chroots(self) -> List[str]:
        """List chroots that have both a ``.conf`` file and a chroot directory.

        Returns:
            Chroot names (file stems of the ``*.conf`` files), in the order
            the directory listing yields them.

        Raises:
            ChrootError: If the configuration directory cannot be scanned.
        """
        try:
            return [
                config_file.stem
                for config_file in Path(self.config.chroot_config_dir).glob("*.conf")
                if os.path.exists(os.path.join(self.config.chroot_dir, config_file.stem))
            ]
        except Exception as e:
            raise ChrootError(f"Failed to list chroots: {e}") from e

    def chroot_exists(self, chroot_name: str) -> bool:
        """Return True when both the chroot directory and its ``.conf`` exist."""
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf")

        return os.path.exists(chroot_path) and os.path.exists(config_file)

    def get_chroot_info(self, chroot_name: str) -> dict:
        """Get information about a chroot environment.

        Returns:
            A dict with keys ``name``, ``path``, ``exists``, ``size``,
            ``created`` and ``modified``. ``size`` is ``st_size`` of the chroot
            directory entry itself (not the total size of its contents),
            ``created`` is ``st_ctime`` and ``modified`` is ``st_mtime``. If
            the directory cannot be stat'ed the defaults (``0``/``None``) are
            returned.

        Raises:
            ChrootError: If the chroot does not exist.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        info = {
            "name": chroot_name,
            "path": chroot_path,
            "exists": True,
            "size": 0,
            "created": None,
            "modified": None,
        }

        try:
            stat_result = os.stat(chroot_path)
        except OSError:
            # Best effort: the metadata fields keep their defaults.
            return info

        info["size"] = stat_result.st_size
        info["created"] = stat_result.st_ctime
        info["modified"] = stat_result.st_mtime
        return info

    def update_chroot(self, chroot_name: str) -> None:
        """Run ``apt-get update`` and ``apt-get upgrade -y`` in the chroot.

        Raises:
            ChrootError: If the chroot does not exist, a step fails, or
                schroot cannot be run.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        apt_steps = (
            ["apt-get", "update"],  # Update package lists
            ["apt-get", "upgrade", "-y"],  # Upgrade packages
        )
        try:
            for apt_args in apt_steps:
                subprocess.run(["schroot", "-c", chroot_name, "--"] + apt_args, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            raise ChrootError(f"Failed to update chroot '{chroot_name}': {e}") from e

    def execute_in_chroot(
        self,
        chroot_name: str,
        command: list,
        capture_output: bool = True,
        preserve_env: bool = True,
    ) -> subprocess.CompletedProcess:
        """Execute a command in the chroot environment through schroot.

        The command parts are joined with single spaces and passed to
        ``sh -c`` inside the chroot, so shell syntax in the parts (globs such
        as ``/tmp/*``, ``&&``, redirections) is interpreted by the shell and
        arguments are NOT quoted. Callers must not pass untrusted strings.

        Args:
            chroot_name: Chroot to run in.
            command: Command parts (strings).
            capture_output: Capture stdout/stderr as text when True.
            preserve_env: Forwarded to ``_prepare_chroot_environment``.

        Returns:
            The ``CompletedProcess``; a non-zero exit status does not raise,
            callers must inspect ``returncode``.

        Raises:
            ChrootError: If the chroot does not exist or schroot cannot be
                started.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        # Prepare environment variables (Mock FAQ #1 - Environment preservation)
        env = self._prepare_chroot_environment(preserve_env)

        # Build schroot command
        schroot_cmd = ["schroot", "-c", chroot_name, "--", "sh", "-c", " ".join(command)]

        try:
            return subprocess.run(
                schroot_cmd,
                cwd=chroot_path,
                env=env,
                capture_output=capture_output,
                text=True,
                check=False,
            )
        except OSError as e:
            # check=False never raises CalledProcessError; what can fail here
            # is starting the process (schroot missing, cwd not accessible).
            raise ChrootError(f"Command failed in chroot: {e}") from e

    def _prepare_chroot_environment(self, preserve_env: bool = True) -> dict:
        """Prepare environment variables for chroot execution.

        This addresses Mock FAQ #1 about environment variable preservation.

        Returns:
            A copy of the full process environment when ``preserve_env`` is
            False or ``config.environment_sanitization`` is falsy. Otherwise
            only the basic system variables, ``config.allowed_environment_vars``
            and ``config.preserve_environment`` that are set in the process
            environment.
        """
        env = os.environ.copy()

        # NOTE(review): preserve_env=False yields the complete, unfiltered
        # environment — confirm this is the intended meaning of the flag.
        if not preserve_env or not self.config.environment_sanitization:
            return env

        # Always preserve basic system variables
        basic_vars = ["PATH", "HOME", "USER", "SHELL", "TERM", "LANG", "LC_ALL"]

        # Basic variables, then allowed build-related ones, then user-specified ones
        wanted = [
            *basic_vars,
            *self.config.allowed_environment_vars,
            *self.config.preserve_environment,
        ]
        return {name: env[name] for name in wanted if name in env}

    def copy_to_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None:
        """Copy files from host to chroot (similar to Mock's --copyin).

        Args:
            source_path: File or directory on the host.
            dest_path: Destination path inside the chroot; a leading ``/`` is
                stripped and the rest is joined onto the chroot directory.
            chroot_name: Target chroot.

        Raises:
            ChrootError: If the chroot does not exist or the copy fails.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        full_dest_path = os.path.join(chroot_path, dest_path.lstrip("/"))

        try:
            # Create destination directory if it doesn't exist
            parent_dir = os.path.dirname(full_dest_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            # Copy file or directory
            if os.path.isdir(source_path):
                shutil.copytree(source_path, full_dest_path, dirs_exist_ok=True)
            else:
                shutil.copy2(source_path, full_dest_path)

        except Exception as e:
            raise ChrootError(f"Failed to copy {source_path} to chroot: {e}") from e

    def copy_from_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None:
        """Copy files from chroot to host (similar to Mock's --copyout).

        Args:
            source_path: Path inside the chroot; a leading ``/`` is stripped.
            dest_path: Destination on the host. May be a bare filename, in
                which case it is written to the current working directory.
            chroot_name: Source chroot.

        Raises:
            ChrootError: If the chroot does not exist or the copy fails.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        full_source_path = os.path.join(chroot_path, source_path.lstrip("/"))

        try:
            # Create destination directory if it doesn't exist. dirname() is
            # "" for a bare filename and os.makedirs("") would raise.
            parent_dir = os.path.dirname(dest_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            # Copy file or directory
            if os.path.isdir(full_source_path):
                shutil.copytree(full_source_path, dest_path, dirs_exist_ok=True)
            else:
                shutil.copy2(full_source_path, dest_path)

        except Exception as e:
            raise ChrootError(f"Failed to copy {source_path} from chroot: {e}") from e

    def scrub_chroot(self, chroot_name: str) -> None:
        """Clean up chroot without removing it (similar to Mock's --scrub).

        Runs ``apt-get clean`` and removes the contents of ``/tmp``,
        ``/var/tmp`` and ``/build`` inside the chroot. The ``*`` patterns are
        expanded by the shell that ``execute_in_chroot`` starts. The exit
        status of the individual commands is not checked.

        Raises:
            ChrootError: If the chroot does not exist or a command cannot be run.
        """
        if not self.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        scrub_commands = (
            ["apt-get", "clean"],  # Clean package cache
            ["rm", "-rf", "/tmp/*"],  # Clean temporary files
            ["rm", "-rf", "/var/tmp/*"],
            ["rm", "-rf", "/build/*"],  # Clean build artifacts
        )
        try:
            for command in scrub_commands:
                self.execute_in_chroot(chroot_name, command)
        except ChrootError:
            raise
        except Exception as e:
            raise ChrootError(f"Failed to scrub chroot '{chroot_name}': {e}") from e

    def scrub_all_chroots(self) -> None:
        """Clean up all chroots (similar to Mock's --scrub-all-chroots).

        A failure for one chroot is printed as a warning and does not stop the
        remaining chroots from being scrubbed.
        """
        for chroot_name in self.list_chroots():
            try:
                self.scrub_chroot(chroot_name)
            except Exception as e:
                print(f"Warning: Failed to scrub chroot '{chroot_name}': {e}")

    def _setup_advanced_mounts(self, chroot_name: str) -> None:
        """Setup the configured mount points for the chroot.

        Resets the mount record for ``chroot_name``, then mounts the standard
        system directories enabled in the config, followed by the configured
        bind, tmpfs and overlay mounts.

        Raises:
            ChrootError: If any step raises (a failing ``mount`` command itself
                only produces a warning in the helpers).
        """
        # Initialize mount tracking for this chroot
        self._active_mounts[chroot_name] = []
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        # (config flag, helper) pairs, in mount order
        standard_mounts = (
            ("mount_proc", self._mount_proc),
            ("mount_sys", self._mount_sys),
            ("mount_dev", self._mount_dev),
            ("mount_devpts", self._mount_devpts),
            ("mount_tmp", self._mount_tmp),
        )

        try:
            # Setup standard system mounts
            for flag_name, mount_helper in standard_mounts:
                if getattr(self.config, flag_name):
                    mount_helper(chroot_name, chroot_path)

            # Setup custom bind mounts
            for bind_mount in self.config.bind_mounts:
                self._setup_bind_mount(chroot_name, bind_mount)

            # Setup tmpfs mounts
            for tmpfs_mount in self.config.tmpfs_mounts:
                self._setup_tmpfs_mount(chroot_name, tmpfs_mount)

            # Setup overlay mounts
            for overlay_mount in self.config.overlay_mounts:
                self._setup_overlay_mount(chroot_name, overlay_mount)

        except Exception as e:
            raise ChrootError(
                f"Failed to setup advanced mounts: {e}",
                chroot_name=chroot_name,
                operation="mount_setup",
            ) from e

    def _run_mount(
        self,
        chroot_name: str,
        mount_type: str,
        mount_path: str,
        mount_cmd: List[str],
        failure_label: str,
    ) -> bool:
        """Run one mount command and record it on success.

        A non-zero exit of the command is reported as
        ``Warning: <failure_label>: <error>`` on stdout and is not raised.

        Returns:
            True if the mount succeeded and was recorded, False otherwise.
        """
        try:
            subprocess.run(mount_cmd, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Warning: {failure_label}: {e}")
            return False

        self._active_mounts.setdefault(chroot_name, []).append((mount_type, mount_path))
        return True

    def _mount_proc(self, chroot_name: str, chroot_path: str) -> None:
        """Bind-mount the host's /proc at ``<chroot_path>/proc``."""
        proc_path = os.path.join(chroot_path, "proc")
        if not os.path.exists(proc_path):
            os.makedirs(proc_path, exist_ok=True)

        self._run_mount(
            chroot_name, "proc", proc_path, ["mount", "--bind", "/proc", proc_path], "Failed to mount /proc"
        )

    def _mount_sys(self, chroot_name: str, chroot_path: str) -> None:
        """Bind-mount the host's /sys at ``<chroot_path>/sys``."""
        sys_path = os.path.join(chroot_path, "sys")
        if not os.path.exists(sys_path):
            os.makedirs(sys_path, exist_ok=True)

        self._run_mount(
            chroot_name, "sys", sys_path, ["mount", "--bind", "/sys", sys_path], "Failed to mount /sys"
        )

    def _mount_dev(self, chroot_name: str, chroot_path: str) -> None:
        """Bind-mount the host's /dev at ``<chroot_path>/dev``."""
        dev_path = os.path.join(chroot_path, "dev")
        if not os.path.exists(dev_path):
            os.makedirs(dev_path, exist_ok=True)

        self._run_mount(
            chroot_name, "dev", dev_path, ["mount", "--bind", "/dev", dev_path], "Failed to mount /dev"
        )

    def _mount_devpts(self, chroot_name: str, chroot_path: str) -> None:
        """Mount a devpts filesystem at ``<chroot_path>/dev/pts``."""
        devpts_path = os.path.join(chroot_path, "dev", "pts")
        if not os.path.exists(devpts_path):
            os.makedirs(devpts_path, exist_ok=True)

        self._run_mount(
            chroot_name,
            "devpts",
            devpts_path,
            ["mount", "-t", "devpts", "devpts", devpts_path],
            "Failed to mount /dev/pts",
        )

    def _mount_tmp(self, chroot_name: str, chroot_path: str) -> None:
        """Mount ``<chroot_path>/tmp``.

        Uses a tmpfs of ``config.tmpfs_size`` when ``config.use_tmpfs`` is set,
        otherwise bind-mounts the host's /tmp.
        """
        tmp_path = os.path.join(chroot_path, "tmp")
        if not os.path.exists(tmp_path):
            os.makedirs(tmp_path, exist_ok=True)

        if self.config.use_tmpfs:
            # Use tmpfs for better performance if configured
            mount_type = "tmpfs"
            mount_cmd = ["mount", "-t", "tmpfs", "-o", f"size={self.config.tmpfs_size}", "tmpfs", tmp_path]
        else:
            # Bind mount host /tmp
            mount_type = "tmp"
            mount_cmd = ["mount", "--bind", "/tmp", tmp_path]

        self._run_mount(chroot_name, mount_type, tmp_path, mount_cmd, "Failed to mount /tmp")

    def _setup_bind_mount(self, chroot_name: str, bind_mount: Dict[str, str]) -> None:
        """Setup a custom bind mount.

        Args:
            chroot_name: Chroot to mount into.
            bind_mount: Mapping with keys ``host`` (source path), ``chroot``
                (target path inside the chroot) and optional ``options``
                (passed to ``mount -o``). An entry without ``host`` or
                ``chroot`` is skipped with a warning.
        """
        host_path = bind_mount.get("host")
        chroot_path = bind_mount.get("chroot")
        options = bind_mount.get("options", "")

        if not host_path or not chroot_path:
            print(f"Warning: Invalid bind mount configuration: {bind_mount}")
            return

        # Create chroot mount point
        full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
        os.makedirs(full_chroot_path, exist_ok=True)

        mount_cmd = ["mount", "--bind"]
        if options:
            mount_cmd.extend(["-o", options])
        mount_cmd.extend([host_path, full_chroot_path])

        self._run_mount(
            chroot_name,
            "bind",
            full_chroot_path,
            mount_cmd,
            f"Failed to setup bind mount {host_path} -> {chroot_path}",
        )

    def _setup_tmpfs_mount(self, chroot_name: str, tmpfs_mount: Dict[str, str]) -> None:
        """Setup a tmpfs mount.

        Args:
            chroot_name: Chroot to mount into.
            tmpfs_mount: Mapping with key ``chroot`` (target path inside the
                chroot) and optional ``size`` (default ``100M``) and
                ``options`` (appended to the ``-o`` value). An entry without
                ``chroot`` is skipped with a warning.
        """
        chroot_path = tmpfs_mount.get("chroot")
        size = tmpfs_mount.get("size", "100M")
        options = tmpfs_mount.get("options", "")

        if not chroot_path:
            print(f"Warning: Invalid tmpfs mount configuration: {tmpfs_mount}")
            return

        # Create chroot mount point
        full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
        os.makedirs(full_chroot_path, exist_ok=True)

        mount_options = f"size={size}"
        if options:
            mount_options += f",{options}"
        mount_cmd = ["mount", "-t", "tmpfs", "-o", mount_options, "tmpfs", full_chroot_path]

        self._run_mount(
            chroot_name,
            "tmpfs",
            full_chroot_path,
            mount_cmd,
            f"Failed to setup tmpfs mount {chroot_path}",
        )

    def _setup_overlay_mount(self, chroot_name: str, overlay_mount: Dict[str, str]) -> None:
        """Setup an overlay mount (requires overlayfs support).

        Args:
            chroot_name: Chroot to mount into.
            overlay_mount: Mapping with keys ``lower``, ``upper``, ``work``
                and ``chroot`` (target path inside the chroot). An entry
                missing any of them is skipped with a warning. Only the work
                directory and the mount point are created here.
        """
        lower_dir = overlay_mount.get("lower")
        upper_dir = overlay_mount.get("upper")
        work_dir = overlay_mount.get("work")
        chroot_path = overlay_mount.get("chroot")

        if not all([lower_dir, upper_dir, work_dir, chroot_path]):
            print(f"Warning: Invalid overlay mount configuration: {overlay_mount}")
            return

        # Create chroot mount point
        full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
        os.makedirs(full_chroot_path, exist_ok=True)

        # Create work directory if it doesn't exist
        os.makedirs(work_dir, exist_ok=True)

        mount_cmd = [
            "mount",
            "-t",
            "overlay",
            "overlay",
            "-o",
            f"lowerdir={lower_dir},upperdir={upper_dir},workdir={work_dir}",
            full_chroot_path,
        ]

        self._run_mount(
            chroot_name,
            "overlay",
            full_chroot_path,
            mount_cmd,
            f"Failed to setup overlay mount {chroot_path}",
        )

    def cleanup_mounts(self, chroot_name: str) -> None:
        """Unmount all recorded mounts for a chroot, most recent first.

        Each result is printed. Mounts that could not be unmounted stay in the
        record (in their original order) so that they remain visible through
        ``list_mounts`` and a later call can retry them.
        """
        if chroot_name not in self._active_mounts:
            return

        still_mounted = []
        for mount_type, mount_path in reversed(self._active_mounts[chroot_name]):
            try:
                subprocess.run(["umount", mount_path], check=True)
            except (subprocess.CalledProcessError, OSError) as e:
                print(f"Warning: Failed to unmount {mount_type} {mount_path}: {e}")
                still_mounted.append((mount_type, mount_path))
            else:
                print(f"Unmounted {mount_type}: {mount_path}")

        # Restore mount order for whatever is left; empty when all succeeded
        still_mounted.reverse()
        self._active_mounts[chroot_name] = still_mounted

    def list_mounts(self, chroot_name: str) -> List[Dict[str, str]]:
        """List the mounts recorded for a chroot.

        Returns:
            One dict per recorded mount with keys ``type``, ``path`` and
            ``chroot``; an empty list if nothing is recorded for the chroot.
        """
        return [
            {"type": mount_type, "path": mount_path, "chroot": chroot_name}
            for mount_type, mount_path in self._active_mounts.get(chroot_name, [])
        ]

    def _setup_chroot_users(self, chroot_name: str) -> None:
        """Setup users and permissions in the chroot via the UID manager.

        Creates the chroot user, copies each user named in
        ``config.copy_host_users`` (when that setting exists and is non-empty)
        and finally sets up the chroot permissions.

        Raises:
            ChrootError: If any UID manager call fails.
        """
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

        try:
            # Create the build user
            self.uid_manager.create_chroot_user(chroot_path)

            # Copy host users if configured
            for username in getattr(self.config, "copy_host_users", None) or []:
                self.uid_manager.copy_host_user(chroot_path, username)

            # Setup chroot permissions
            self.uid_manager.setup_chroot_permissions(chroot_path)

        except Exception as e:
            raise ChrootError(
                f"Failed to setup chroot users: {e}",
                chroot_name=chroot_name,
                operation="user_setup",
            ) from e
|