deb-mock/deb_mock/chroot.py
robojerk c51819c836
Some checks failed
Build Deb-Mock Package / build (push) Failing after 1m9s
Lint Code / Lint All Code (push) Failing after 1s
Test Deb-Mock Build / test (push) Failing after 35s
Add comprehensive testing framework, performance monitoring, and plugin system
- Add complete pytest testing framework with conftest.py and test files
- Add performance monitoring and benchmarking capabilities
- Add plugin system with ccache plugin example
- Add comprehensive documentation (API, deployment, testing, etc.)
- Add Docker API wrapper for service deployment
- Add advanced configuration examples
- Remove old wget package file
- Update core modules with enhanced functionality
2025-08-19 20:49:32 -07:00

747 lines
27 KiB
Python

"""
Chroot management for deb-mock
"""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Optional
from .exceptions import ChrootError
from .uid_manager import UIDManager
class ChrootManager:
    """Manages chroot environments for deb-mock.

    Covers creating chroots with debootstrap (directly or through a
    bootstrap chroot), writing their schroot configuration, running commands
    inside them via schroot, copying files in and out, setting up and
    tearing down mounts, and preparing chroot users through ``UIDManager``.
    """
def __init__(self, config):
    """Keep the configuration and build the per-instance helpers.

    Args:
        config: deb-mock configuration object. It is stored on the instance
            and also passed to ``UIDManager``.
    """
    self.config = config
    self.uid_manager = UIDManager(config)
    # chroot name -> list of (mount_type, mount_path) tuples recorded by the
    # mount helpers so the mounts can be listed and undone later.
    self._active_mounts = {}
def create_chroot(self, chroot_name: str, arch: Optional[str] = None, suite: Optional[str] = None) -> None:
    """Create a new chroot environment.

    Args:
        chroot_name: Name of the chroot to create.
        arch: If given (and non-empty), overwrites ``config.architecture``
            before the chroot is built.
        suite: If given (and non-empty), overwrites ``config.suite`` before
            the chroot is built.

    The override is written to the shared config object, so it stays in
    effect for later calls on this manager.
    """
    if arch:
        self.config.architecture = arch
    if suite:
        self.config.suite = suite

    # Check if bootstrap chroot is needed (Mock FAQ #2)
    if self.config.use_bootstrap_chroot:
        self._create_bootstrap_chroot(chroot_name)
    else:
        self._create_standard_chroot(chroot_name)

    # Mounts and users are only set up once the chroot tree exists.
    self._setup_advanced_mounts(chroot_name)
    self._setup_chroot_users(chroot_name)
def _create_bootstrap_chroot(self, chroot_name: str) -> None:
    """
    Create a bootstrap chroot for cross-distribution builds.

    This addresses Mock FAQ #2 about building packages for newer distributions
    on older systems (e.g., building Debian Sid packages on Debian Stable).

    The bootstrap chroot (``config.bootstrap_chroot_name`` or
    ``<chroot_name>-bootstrap``) is created first if its directory is
    missing; debootstrap is then run inside it to build the final chroot.

    Raises:
        ChrootError: If debootstrap exits non-zero inside the bootstrap
            chroot, or any other step fails. Errors that already are
            ``ChrootError`` propagate unchanged so their ``operation`` is kept.
    """
    bootstrap_name = self.config.bootstrap_chroot_name or f"{chroot_name}-bootstrap"
    bootstrap_path = os.path.join(self.config.chroot_dir, bootstrap_name)

    # Create minimal bootstrap chroot first
    if not os.path.exists(bootstrap_path):
        self._create_standard_chroot(bootstrap_name)

    try:
        # NOTE(review): the target path is hard-coded instead of being
        # derived from config.chroot_dir, while _configure_chroot resolves
        # the chroot under config.chroot_dir - confirm the two agree when a
        # non-default chroot_dir is configured.
        cmd = [
            "/usr/sbin/debootstrap",
            "--arch",
            self.config.architecture,
            self.config.suite,
            f"/var/lib/deb-mock/chroots/{chroot_name}",
            self.config.mirror,
        ]

        # Execute debootstrap within bootstrap chroot
        result = self.execute_in_chroot(bootstrap_name, cmd, capture_output=True)
        if result.returncode != 0:
            raise ChrootError(
                f"Failed to create chroot using bootstrap: {result.stderr}",
                chroot_name=chroot_name,
                operation="bootstrap_debootstrap",
            )

        # Configure the new chroot
        self._configure_chroot(chroot_name)
    except ChrootError:
        # Already carries a specific message/operation; do not re-wrap it.
        raise
    except Exception as e:
        raise ChrootError(
            f"Bootstrap chroot creation failed: {e}",
            chroot_name=chroot_name,
            operation="bootstrap_creation",
        ) from e
def _create_standard_chroot(self, chroot_name: str) -> None:
    """Create a standard chroot using debootstrap.

    Args:
        chroot_name: Name of the chroot; it is created at
            ``<config.chroot_dir>/<chroot_name>``.

    Raises:
        ChrootError: If the chroot directory already exists, the directory
            cannot be created, debootstrap cannot be started, or debootstrap
            exits with a non-zero status.
    """
    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

    if os.path.exists(chroot_path):
        raise ChrootError(
            f"Chroot '{chroot_name}' already exists",
            chroot_name=chroot_name,
            operation="create",
        )

    cmd = [
        "/usr/sbin/debootstrap",
        "--arch",
        self.config.architecture,
        self.config.suite,
        chroot_path,
        self.config.mirror,
    ]

    try:
        os.makedirs(chroot_path, exist_ok=True)
        # check=False: the exit status is inspected below, so run() never
        # raises CalledProcessError here. What it can raise is OSError,
        # e.g. FileNotFoundError when debootstrap is not installed.
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    except OSError as e:
        raise ChrootError(
            f"Failed to create chroot: {e}",
            chroot_name=chroot_name,
            operation="create",
            chroot_path=chroot_path,
        ) from e

    if result.returncode != 0:
        raise ChrootError(
            f"debootstrap failed: {result.stderr}",
            chroot_name=chroot_name,
            operation="/usr/sbin/debootstrap",
            chroot_path=chroot_path,
        )

    # Configure the chroot
    self._configure_chroot(chroot_name)
def _configure_chroot(self, chroot_name: str) -> None:
    """Apply post-creation configuration to a chroot.

    Writes the schroot configuration, then installs
    ``config.chroot_additional_packages`` and runs
    ``config.chroot_setup_cmd`` when those settings are non-empty.
    """
    cfg = self.config

    # schroot needs the on-disk location plus arch/suite for its description.
    self._create_schroot_config(
        chroot_name,
        os.path.join(cfg.chroot_dir, chroot_name),
        cfg.architecture,
        cfg.suite,
    )

    if cfg.chroot_additional_packages:
        self._install_additional_packages(chroot_name)

    if cfg.chroot_setup_cmd:
        self._run_setup_commands(chroot_name)
def _install_additional_packages(self, chroot_name: str) -> None:
    """Install ``config.chroot_additional_packages`` in the chroot.

    Runs ``apt-get update`` followed by ``apt-get install -y <packages>``
    through ``execute_in_chroot``.

    Raises:
        ChrootError: If the install command exits non-zero or a command
            cannot be executed. A ``ChrootError`` raised by
            ``execute_in_chroot`` propagates unchanged.
    """
    try:
        # Refresh package lists; the exit status of this step is not checked.
        self.execute_in_chroot(chroot_name, ["apt-get", "update"], capture_output=True)

        # Install packages
        cmd = ["apt-get", "install", "-y", *self.config.chroot_additional_packages]
        result = self.execute_in_chroot(chroot_name, cmd, capture_output=True)
    except ChrootError:
        # Already descriptive; wrapping again would only duplicate the prefix.
        raise
    except Exception as e:
        raise ChrootError(
            f"Failed to install additional packages: {e}",
            chroot_name=chroot_name,
            operation="install_packages",
        ) from e

    if result.returncode != 0:
        raise ChrootError(
            f"Failed to install additional packages: {result.stderr}",
            chroot_name=chroot_name,
            operation="install_packages",
        )
def _run_setup_commands(self, chroot_name: str) -> None:
    """Run every entry of ``config.chroot_setup_cmd`` inside the chroot.

    Each entry is a command string; it is split on whitespace and handed to
    ``execute_in_chroot``. Processing stops at the first failure.

    Raises:
        ChrootError: If a command exits non-zero or cannot be executed. The
            message names the offending command string.
    """
    setup_commands = self.config.chroot_setup_cmd
    for cmd in setup_commands:
        try:
            argv = cmd.split()
            result = self.execute_in_chroot(chroot_name, argv, capture_output=True)
            failed = result.returncode != 0
            if failed:
                raise ChrootError(
                    f"Setup command failed: {result.stderr}",
                    chroot_name=chroot_name,
                    operation="setup_command",
                )
        except Exception as e:
            # Also catches the ChrootError raised just above, so the final
            # message always names the command that failed.
            raise ChrootError(
                f"Failed to run setup command '{cmd}': {e}",
                chroot_name=chroot_name,
                operation="setup_command",
            )
def _create_schroot_config(self, chroot_name: str, chroot_path: str, arch: str, suite: str) -> None:
    """Create the schroot configuration file for a chroot.

    The file is written to ``<config.chroot_config_dir>/<chroot_name>.conf``
    and replaces any existing file of that name.

    Args:
        chroot_name: Section name and basename of the config file.
        chroot_path: Value written to the ``directory=`` key.
        arch: Architecture, used in the description line.
        suite: Suite, used in the description line.

    Raises:
        ChrootError: If the file cannot be written.
    """
    config_content = (
        f"[{chroot_name}]\n"
        f"description=Deb-Mock chroot for {suite} {arch}\n"
        f"directory={chroot_path}\n"
        "root-users=root\n"
        "users=root\n"
        "type=directory\n"
        "profile=desktop\n"
        "preserve-environment=true\n"
    )

    config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf")

    try:
        # Explicit encoding so the file content does not depend on the locale.
        with open(config_file, "w", encoding="utf-8") as f:
            f.write(config_content)
    except Exception as e:
        raise ChrootError(f"Failed to create schroot config: {e}") from e
def _initialize_chroot(self, chroot_path: str, arch: str, suite: str) -> None:
    """Populate ``chroot_path`` by running debootstrap with the buildd variant.

    The Debian mirror URL is fixed in this helper rather than read from the
    configuration.

    Raises:
        ChrootError: If debootstrap exits non-zero or the debootstrap
            executable is not found.
    """
    debootstrap_argv = [
        "/usr/sbin/debootstrap",
        "--arch",
        arch,
        "--variant=buildd",
        suite,
        chroot_path,
        "http://deb.debian.org/debian/",
    ]

    try:
        subprocess.run(debootstrap_argv, capture_output=True, text=True, check=True)
    except FileNotFoundError:
        raise ChrootError("debootstrap not found. Please install debootstrap package.")
    except subprocess.CalledProcessError as e:
        raise ChrootError(f"debootstrap failed: {e.stderr}")
def _install_build_tools(self, chroot_name: str) -> None:
    """Install a fixed set of essential build packages in the chroot.

    Invokes ``schroot -c <chroot_name> -- apt-get ...`` directly: first
    ``update``, then ``install -y`` with the package list below.

    Raises:
        ChrootError: If either apt-get invocation exits non-zero.
    """
    # Common prefix that makes schroot run the rest inside the named chroot.
    in_chroot = ["schroot", "-c", chroot_name, "--"]
    build_packages = [
        "build-essential",
        "devscripts",
        "debhelper",
        "dh-make",
        "fakeroot",
        "lintian",
        "sbuild",
        "schroot",
    ]

    try:
        subprocess.run(in_chroot + ["apt-get", "update"], check=True)
    except subprocess.CalledProcessError as e:
        raise ChrootError(f"Failed to update package lists: {e}")

    try:
        subprocess.run(in_chroot + ["apt-get", "install", "-y"] + build_packages, check=True)
    except subprocess.CalledProcessError as e:
        raise ChrootError(f"Failed to install build tools: {e}")
def clean_chroot(self, chroot_name: str) -> None:
    """Remove a chroot's schroot configuration and its directory tree.

    Mounts tracked for the chroot are unmounted first via
    ``cleanup_mounts``. If the kernel mount table still lists a mount point
    below the chroot afterwards, nothing is deleted: a recursive delete
    would otherwise descend into bind-mounted host directories such as
    ``/dev`` or ``/tmp``.

    Raises:
        ChrootError: If something is still mounted below the chroot, or if
            removing the configuration file or directory fails.
    """
    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
    config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf")

    def _mounts_below(root):
        """Return mount points strictly below *root* per /proc/self/mounts."""
        prefix = os.path.realpath(root).rstrip(os.sep) + os.sep
        found = []
        try:
            with open("/proc/self/mounts", encoding="utf-8", errors="replace") as table:
                for entry in table:
                    fields = entry.split()
                    if len(fields) < 2:
                        continue
                    # Spaces in mount points are escaped as \040 in this file.
                    target = fields[1].replace("\\040", " ")
                    if target.startswith(prefix):
                        found.append(target)
        except OSError:
            # No readable mount table on this host; nothing to verify.
            pass
        return found

    try:
        # Unmount before deleting so rmtree never walks into host filesystems.
        self.cleanup_mounts(chroot_name)
        leftover = _mounts_below(chroot_path)
        if leftover:
            raise ChrootError(
                f"Failed to clean chroot '{chroot_name}': "
                f"still mounted: {', '.join(leftover)}"
            )

        # Remove schroot configuration
        if os.path.exists(config_file):
            os.remove(config_file)

        # Remove chroot directory
        if os.path.exists(chroot_path):
            shutil.rmtree(chroot_path)
    except ChrootError:
        raise
    except Exception as e:
        raise ChrootError(f"Failed to clean chroot '{chroot_name}': {e}") from e
def list_chroots(self) -> List[str]:
    """Return the names of all available chroot environments.

    A chroot counts as available when ``<name>.conf`` exists in
    ``config.chroot_config_dir`` and a path named ``<name>`` exists in
    ``config.chroot_dir``.

    Raises:
        ChrootError: If scanning fails for any reason.
    """
    try:
        return [
            conf.stem
            for conf in Path(self.config.chroot_config_dir).glob("*.conf")
            if os.path.exists(os.path.join(self.config.chroot_dir, conf.stem))
        ]
    except Exception as e:
        raise ChrootError(f"Failed to list chroots: {e}")
def chroot_exists(self, chroot_name: str) -> bool:
    """Return True when both the chroot path and its schroot config exist."""
    cfg = self.config
    root_dir = os.path.join(cfg.chroot_dir, chroot_name)
    schroot_conf = os.path.join(cfg.chroot_config_dir, f"{chroot_name}.conf")
    if not os.path.exists(root_dir):
        return False
    return os.path.exists(schroot_conf)
def get_chroot_info(self, chroot_name: str) -> dict:
    """Return basic information about a chroot environment.

    The result has the keys ``name``, ``path``, ``exists``, ``size``,
    ``created`` and ``modified``. ``size``, ``created`` and ``modified`` are
    the ``st_size``, ``st_ctime`` and ``st_mtime`` of the chroot directory
    itself as reported by ``os.stat`` (not a recursive total). If the stat
    call fails they keep their defaults of ``0``, ``None`` and ``None``.

    Raises:
        ChrootError: If the chroot does not exist.
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

    size, created, modified = 0, None, None
    try:
        st = os.stat(chroot_path)
        size, created, modified = st.st_size, st.st_ctime, st.st_mtime
    except Exception:
        # Best effort: the stat details are optional extras.
        pass

    return {
        "name": chroot_name,
        "path": chroot_path,
        "exists": True,
        "size": size,
        "created": created,
        "modified": modified,
    }
def update_chroot(self, chroot_name: str) -> None:
    """Refresh package lists and upgrade packages inside a chroot.

    Runs ``apt-get update`` and then ``apt-get upgrade -y`` through
    ``schroot -c <chroot_name> --``.

    Raises:
        ChrootError: If the chroot does not exist or either apt-get step
            exits non-zero.
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    apt_in_chroot = ["schroot", "-c", chroot_name, "--", "apt-get"]
    try:
        # Package lists first, then the upgrade; stop at the first failure.
        for apt_args in (["update"], ["upgrade", "-y"]):
            subprocess.run(apt_in_chroot + apt_args, check=True)
    except subprocess.CalledProcessError as e:
        raise ChrootError(f"Failed to update chroot '{chroot_name}': {e}")
def execute_in_chroot(
    self,
    chroot_name: str,
    command: list,
    capture_output: bool = True,
    preserve_env: bool = True,
) -> subprocess.CompletedProcess:
    """Execute a command in the chroot environment.

    The command is run as ``schroot -c <chroot_name> -- sh -c "<command>"``
    with the chroot directory as working directory.

    Args:
        chroot_name: Name of an existing chroot.
        command: Argument list. The items are joined with single spaces and
            interpreted by ``sh -c``, so shell syntax in them (for example
            the globs passed by ``scrub_chroot``) is expanded by that shell.
        capture_output: When true, stdout/stderr are captured as text on the
            returned object; otherwise they go to the inherited streams.
        preserve_env: Forwarded to ``_prepare_chroot_environment``.

    Returns:
        The ``subprocess.CompletedProcess``. A non-zero exit status is NOT
        turned into an exception; callers must check ``returncode``.

    Raises:
        ChrootError: If the chroot does not exist or the schroot process
            cannot be started (e.g. schroot is not installed).
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

    # Prepare environment variables (Mock FAQ #1 - Environment preservation)
    env = self._prepare_chroot_environment(preserve_env)

    # Build schroot command
    schroot_cmd = [
        "schroot",
        "-c",
        chroot_name,
        "--",
        "sh",
        "-c",
        " ".join(command),
    ]

    try:
        # check=False means CalledProcessError is never raised here; the
        # only failure mode of run() itself is an OSError at spawn time.
        return subprocess.run(
            schroot_cmd,
            cwd=chroot_path,
            env=env,
            capture_output=capture_output,
            text=capture_output,
            check=False,
        )
    except OSError as e:
        raise ChrootError(f"Command failed in chroot: {e}") from e
def _prepare_chroot_environment(self, preserve_env: bool = True) -> dict:
    """
    Build the environment mapping used for commands run in a chroot.

    This addresses Mock FAQ #1 about environment variable preservation.

    When ``preserve_env`` is false, or ``config.environment_sanitization``
    is disabled, a full copy of the host environment is returned. Otherwise
    only variables that are set on the host and named in one of these lists
    are kept: a fixed set of basic system variables,
    ``config.allowed_environment_vars`` and ``config.preserve_environment``.
    """
    host_env = os.environ.copy()

    sanitize = preserve_env and self.config.environment_sanitization
    if not sanitize:
        return host_env

    # Basic system variables first, then build-related ones, then the
    # user-specified extras.
    wanted = [
        "PATH", "HOME", "USER", "SHELL", "TERM", "LANG", "LC_ALL",
        *self.config.allowed_environment_vars,
        *self.config.preserve_environment,
    ]
    return {name: host_env[name] for name in wanted if name in host_env}
def copy_to_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None:
    """Copy a file or directory from the host into a chroot (like Mock's --copyin).

    Args:
        source_path: Host path of the file or directory to copy.
        dest_path: Destination path inside the chroot; leading slashes are
            stripped and the rest is resolved below the chroot directory.
        chroot_name: Name of an existing chroot.

    Raises:
        ChrootError: If the chroot does not exist or the copy fails.
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    inside_chroot = dest_path.lstrip("/")
    target = os.path.join(self.config.chroot_dir, chroot_name, inside_chroot)

    try:
        # Make sure the parent of the target exists before copying.
        os.makedirs(os.path.dirname(target), exist_ok=True)

        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target)
        else:
            shutil.copytree(source_path, target, dirs_exist_ok=True)
    except Exception as e:
        raise ChrootError(f"Failed to copy {source_path} to chroot: {e}")
def copy_from_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None:
    """Copy files from chroot to host (similar to Mock's --copyout).

    Args:
        source_path: Path inside the chroot; leading slashes are stripped
            and the rest is resolved below the chroot directory.
        dest_path: Host destination. May be a bare file name, in which case
            it is created relative to the current working directory.
        chroot_name: Name of an existing chroot.

    Raises:
        ChrootError: If the chroot does not exist or the copy fails.
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
    full_source_path = os.path.join(chroot_path, source_path.lstrip("/"))

    try:
        # dirname() is "" for a bare file name, and makedirs("") raises, so
        # only create the parent when there actually is one.
        dest_dir = os.path.dirname(dest_path)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)

        # Copy file or directory
        if os.path.isdir(full_source_path):
            shutil.copytree(full_source_path, dest_path, dirs_exist_ok=True)
        else:
            shutil.copy2(full_source_path, dest_path)
    except Exception as e:
        raise ChrootError(f"Failed to copy {source_path} from chroot: {e}") from e
def scrub_chroot(self, chroot_name: str) -> None:
    """Clean a chroot's caches and scratch files without removing it (like Mock's --scrub).

    Runs, in order, ``apt-get clean`` and ``rm -rf`` on the contents of
    ``/tmp``, ``/var/tmp`` and ``/build`` inside the chroot. The exit status
    of the individual commands is not inspected.

    Raises:
        ChrootError: If the chroot does not exist or a command cannot be run.
    """
    if not self.chroot_exists(chroot_name):
        raise ChrootError(f"Chroot '{chroot_name}' does not exist")

    scrub_steps = (
        ["apt-get", "clean"],        # package cache
        ["rm", "-rf", "/tmp/*"],     # temporary files
        ["rm", "-rf", "/var/tmp/*"],
        ["rm", "-rf", "/build/*"],   # build artifacts
    )
    try:
        for argv in scrub_steps:
            self.execute_in_chroot(chroot_name, argv)
    except Exception as e:
        raise ChrootError(f"Failed to scrub chroot '{chroot_name}': {e}")
def scrub_all_chroots(self) -> None:
    """Scrub every chroot returned by ``list_chroots`` (like Mock's --scrub-all-chroots).

    A failure for one chroot is reported as a printed warning and does not
    stop the remaining chroots from being processed.
    """
    for chroot_name in self.list_chroots():
        try:
            self.scrub_chroot(chroot_name)
        except Exception as e:
            # Best effort: report and carry on with the next chroot.
            print(f"Warning: Failed to scrub chroot '{chroot_name}': {e}")
def _setup_advanced_mounts(self, chroot_name: str) -> None:
    """Set up the configured mount points for a chroot.

    Resets the mount tracking list for ``chroot_name``, then applies the
    standard system mounts enabled in the config (proc, sys, dev, devpts,
    tmp, in that order), followed by ``config.bind_mounts``,
    ``config.tmpfs_mounts`` and ``config.overlay_mounts``.

    Raises:
        ChrootError: If any step raises an exception.
    """
    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)

    # Start with an empty tracking list for this chroot.
    self._active_mounts[chroot_name] = []

    try:
        # Standard system mounts: (config switch, helper), in mount order.
        system_mounts = (
            ("mount_proc", self._mount_proc),
            ("mount_sys", self._mount_sys),
            ("mount_dev", self._mount_dev),
            ("mount_devpts", self._mount_devpts),
            ("mount_tmp", self._mount_tmp),
        )
        for switch, mounter in system_mounts:
            if getattr(self.config, switch):
                mounter(chroot_name, chroot_path)

        # Custom mounts take the chroot name plus their own config dict.
        for bind_mount in self.config.bind_mounts:
            self._setup_bind_mount(chroot_name, bind_mount)

        for tmpfs_mount in self.config.tmpfs_mounts:
            self._setup_tmpfs_mount(chroot_name, tmpfs_mount)

        for overlay_mount in self.config.overlay_mounts:
            self._setup_overlay_mount(chroot_name, overlay_mount)
    except Exception as e:
        raise ChrootError(
            f"Failed to setup advanced mounts: {e}",
            chroot_name=chroot_name,
            operation="mount_setup"
        )
def _mount_proc(self, chroot_name: str, chroot_path: str) -> None:
    """Bind-mount the host's /proc at ``<chroot_path>/proc``.

    The mount point is created if missing. On success the mount is recorded
    for ``chroot_name``; a failing mount command only prints a warning.
    """
    target = os.path.join(chroot_path, "proc")
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)

    try:
        subprocess.run(["mount", "--bind", "/proc", target], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to mount /proc: {e}")
    else:
        self._active_mounts[chroot_name].append(("proc", target))
def _mount_sys(self, chroot_name: str, chroot_path: str) -> None:
    """Bind-mount the host's /sys at ``<chroot_path>/sys``.

    The mount point is created if missing. On success the mount is recorded
    for ``chroot_name``; a failing mount command only prints a warning.
    """
    target = os.path.join(chroot_path, "sys")
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)

    try:
        subprocess.run(["mount", "--bind", "/sys", target], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to mount /sys: {e}")
    else:
        self._active_mounts[chroot_name].append(("sys", target))
def _mount_dev(self, chroot_name: str, chroot_path: str) -> None:
    """Bind-mount the host's /dev at ``<chroot_path>/dev``.

    The mount point is created if missing. On success the mount is recorded
    for ``chroot_name``; a failing mount command only prints a warning.
    """
    target = os.path.join(chroot_path, "dev")
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)

    try:
        subprocess.run(["mount", "--bind", "/dev", target], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to mount /dev: {e}")
    else:
        self._active_mounts[chroot_name].append(("dev", target))
def _mount_devpts(self, chroot_name: str, chroot_path: str) -> None:
    """Mount a devpts filesystem at ``<chroot_path>/dev/pts``.

    The mount point is created if missing. On success the mount is recorded
    for ``chroot_name``; a failing mount command only prints a warning.
    """
    target = os.path.join(chroot_path, "dev", "pts")
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)

    try:
        subprocess.run(["mount", "-t", "devpts", "devpts", target], check=True)
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to mount /dev/pts: {e}")
    else:
        self._active_mounts[chroot_name].append(("devpts", target))
def _mount_tmp(self, chroot_name: str, chroot_path: str) -> None:
    """Provide /tmp inside the chroot.

    With ``config.use_tmpfs`` set, a tmpfs of size ``config.tmpfs_size`` is
    mounted at ``<chroot_path>/tmp`` and recorded as ``"tmpfs"``; otherwise
    the host's /tmp is bind-mounted there and recorded as ``"tmp"``. A
    failing mount command only prints a warning.
    """
    tmp_path = os.path.join(chroot_path, "tmp")
    if not os.path.exists(tmp_path):
        os.makedirs(tmp_path, exist_ok=True)

    try:
        if self.config.use_tmpfs:
            # tmpfs for better performance when configured
            label = "tmpfs"
            mount_argv = [
                "mount", "-t", "tmpfs", "-o", f"size={self.config.tmpfs_size}",
                "tmpfs", tmp_path
            ]
        else:
            # Share the host's /tmp with the chroot
            label = "tmp"
            mount_argv = ["mount", "--bind", "/tmp", tmp_path]

        subprocess.run(mount_argv, check=True)
        self._active_mounts[chroot_name].append((label, tmp_path))
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to mount /tmp: {e}")
def _setup_bind_mount(self, chroot_name: str, bind_mount: Dict[str, str]) -> None:
    """Bind-mount a host path into the chroot.

    Args:
        chroot_name: Chroot to mount into.
        bind_mount: Mapping with ``host`` (host path), ``chroot`` (path
            inside the chroot) and optional ``options`` (passed to
            ``mount -o``).

    An entry without ``host`` or ``chroot`` is skipped with a warning, and a
    failing mount command also only prints a warning.
    """
    host_path = bind_mount.get("host")
    chroot_path = bind_mount.get("chroot")
    options = bind_mount.get("options", "")

    if not (host_path and chroot_path):
        print(f"Warning: Invalid bind mount configuration: {bind_mount}")
        return

    # The mount point lives below the chroot's directory on the host.
    full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
    os.makedirs(full_chroot_path, exist_ok=True)

    try:
        option_args = ["-o", options] if options else []
        mount_cmd = ["mount", "--bind", *option_args, host_path, full_chroot_path]
        subprocess.run(mount_cmd, check=True)
        self._active_mounts[chroot_name].append(("bind", full_chroot_path))
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to setup bind mount {host_path} -> {chroot_path}: {e}")
def _setup_tmpfs_mount(self, chroot_name: str, tmpfs_mount: Dict[str, str]) -> None:
    """Mount a tmpfs inside the chroot.

    Args:
        chroot_name: Chroot to mount into.
        tmpfs_mount: Mapping with ``chroot`` (path inside the chroot),
            optional ``size`` (defaults to ``"100M"``) and optional
            ``options`` (appended to the ``size=`` mount option).

    An entry without ``chroot`` is skipped with a warning, and a failing
    mount command also only prints a warning.
    """
    chroot_path = tmpfs_mount.get("chroot")
    size = tmpfs_mount.get("size", "100M")
    options = tmpfs_mount.get("options", "")

    if not chroot_path:
        print(f"Warning: Invalid tmpfs mount configuration: {tmpfs_mount}")
        return

    # The mount point lives below the chroot's directory on the host.
    full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
    os.makedirs(full_chroot_path, exist_ok=True)

    try:
        # Single comma-separated -o value: size first, then any extras.
        mount_options = f"size={size}"
        if options:
            mount_options += f",{options}"
        mount_cmd = ["mount", "-t", "tmpfs", "-o", mount_options, "tmpfs", full_chroot_path]
        subprocess.run(mount_cmd, check=True)
        self._active_mounts[chroot_name].append(("tmpfs", full_chroot_path))
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to setup tmpfs mount {chroot_path}: {e}")
def _setup_overlay_mount(self, chroot_name: str, overlay_mount: Dict[str, str]) -> None:
    """Mount an overlay filesystem inside the chroot (requires overlayfs support).

    Args:
        chroot_name: Chroot to mount into.
        overlay_mount: Mapping with ``lower``, ``upper`` and ``work``
            directories plus ``chroot`` (path inside the chroot). All four
            are required.

    The work directory and the mount point are created if missing. An
    incomplete entry is skipped with a warning, and a failing mount command
    also only prints a warning.
    """
    lower_dir = overlay_mount.get("lower")
    upper_dir = overlay_mount.get("upper")
    work_dir = overlay_mount.get("work")
    chroot_path = overlay_mount.get("chroot")

    required = (lower_dir, upper_dir, work_dir, chroot_path)
    if not all(required):
        print(f"Warning: Invalid overlay mount configuration: {overlay_mount}")
        return

    # The mount point lives below the chroot's directory on the host.
    full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/"))
    os.makedirs(full_chroot_path, exist_ok=True)

    try:
        os.makedirs(work_dir, exist_ok=True)

        overlay_options = f"lowerdir={lower_dir},upperdir={upper_dir},workdir={work_dir}"
        subprocess.run(
            ["mount", "-t", "overlay", "overlay", "-o", overlay_options, full_chroot_path],
            check=True,
        )
        self._active_mounts[chroot_name].append(("overlay", full_chroot_path))
    except subprocess.CalledProcessError as e:
        print(f"Warning: Failed to setup overlay mount {chroot_path}: {e}")
def cleanup_mounts(self, chroot_name: str) -> None:
    """Unmount all tracked mounts for a chroot.

    Mounts are released in reverse order of creation so nested mount points
    (e.g. ``dev/pts`` inside ``dev``) go first. Entries whose unmount fails
    stay in the tracking list, in their original order, so they remain
    visible through ``list_mounts`` and can be retried; a warning is printed
    for each of them. Does nothing for a chroot with no tracking entry.
    """
    if chroot_name not in self._active_mounts:
        return

    still_mounted = []
    for mount_type, mount_path in reversed(self._active_mounts[chroot_name]):
        try:
            subprocess.run(["umount", mount_path], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing/unrunnable umount binary.
            print(f"Warning: Failed to unmount {mount_type} {mount_path}: {e}")
            still_mounted.append((mount_type, mount_path))
        else:
            print(f"Unmounted {mount_type}: {mount_path}")

    # Collected in reverse; restore creation order for the next attempt.
    still_mounted.reverse()
    self._active_mounts[chroot_name] = still_mounted
def list_mounts(self, chroot_name: str) -> List[Dict[str, str]]:
    """Return the tracked mounts of a chroot as a list of dicts.

    Each dict has the keys ``type``, ``path`` and ``chroot``. A chroot with
    no tracking entry yields an empty list.
    """
    tracked = self._active_mounts.get(chroot_name, [])
    return [
        {"type": mount_type, "path": mount_path, "chroot": chroot_name}
        for mount_type, mount_path in tracked
    ]
def _setup_chroot_users(self, chroot_name: str) -> None:
    """Set up users and permissions in the chroot through the UID manager.

    Creates the chroot user, copies each host user named in
    ``config.copy_host_users`` (when the config defines that attribute) and
    finally applies the chroot permissions.

    Raises:
        ChrootError: If any of the UID manager calls fails.
    """
    chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
    users = self.uid_manager

    try:
        users.create_chroot_user(chroot_path)

        # Older configs may not define copy_host_users at all.
        for username in getattr(self.config, "copy_host_users", ()):
            users.copy_host_user(chroot_path, username)

        users.setup_chroot_permissions(chroot_path)
    except Exception as e:
        raise ChrootError(
            f"Failed to setup chroot users: {e}",
            chroot_name=chroot_name,
            operation="user_setup"
        )