""" Chroot management for deb-mock """ import os import shutil import subprocess import tempfile from pathlib import Path from typing import List, Dict, Optional from .exceptions import ChrootError from .uid_manager import UIDManager class ChrootManager: """Manages chroot environments for deb-mock""" def __init__(self, config): self.config = config self._active_mounts = {} # Track active mounts per chroot self.uid_manager = UIDManager(config) def create_chroot(self, chroot_name: str, arch: str = None, suite: str = None) -> None: """Create a new chroot environment""" if arch: self.config.architecture = arch if suite: self.config.suite = suite # Check if bootstrap chroot is needed (Mock FAQ #2) if self.config.use_bootstrap_chroot: self._create_bootstrap_chroot(chroot_name) else: self._create_standard_chroot(chroot_name) # Setup advanced mounts after chroot creation self._setup_advanced_mounts(chroot_name) # Setup UID/GID management self._setup_chroot_users(chroot_name) def _create_bootstrap_chroot(self, chroot_name: str) -> None: """ Create a bootstrap chroot for cross-distribution builds. This addresses Mock FAQ #2 about building packages for newer distributions on older systems (e.g., building Debian Sid packages on Debian Stable). """ bootstrap_name = self.config.bootstrap_chroot_name or f"{chroot_name}-bootstrap" bootstrap_path = os.path.join(self.config.chroot_dir, bootstrap_name) # Create minimal bootstrap chroot first if not os.path.exists(bootstrap_path): self._create_standard_chroot(bootstrap_name) # Use bootstrap chroot to create the final chroot try: # Create final chroot using debootstrap from within bootstrap cmd = [ "/usr/sbin/debootstrap", "--arch", self.config.architecture, self.config.suite, f"/var/lib/deb-mock/chroots/{chroot_name}", self.config.mirror, ] # Execute debootstrap within bootstrap chroot result = self.execute_in_chroot(bootstrap_name, cmd, capture_output=True) if result.returncode != 0: raise ChrootError( f"Failed to create chroot using bootstrap: {result.stderr}", chroot_name=chroot_name, operation="bootstrap_debootstrap", ) # Configure the new chroot self._configure_chroot(chroot_name) except Exception as e: raise ChrootError( f"Bootstrap chroot creation failed: {e}", chroot_name=chroot_name, operation="bootstrap_creation", ) def _create_standard_chroot(self, chroot_name: str) -> None: """Create a standard chroot using debootstrap""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) if os.path.exists(chroot_path): raise ChrootError( f"Chroot '{chroot_name}' already exists", chroot_name=chroot_name, operation="create", ) try: # Create chroot directory os.makedirs(chroot_path, exist_ok=True) # Run debootstrap cmd = [ "/usr/sbin/debootstrap", "--arch", self.config.architecture, self.config.suite, chroot_path, self.config.mirror, ] result = subprocess.run(cmd, capture_output=True, text=True, check=False) if result.returncode != 0: raise ChrootError( f"debootstrap failed: {result.stderr}", chroot_name=chroot_name, operation="/usr/sbin/debootstrap", chroot_path=chroot_path, ) # Configure the chroot self._configure_chroot(chroot_name) except subprocess.CalledProcessError as e: raise ChrootError( f"Failed to create chroot: {e}", chroot_name=chroot_name, operation="create", chroot_path=chroot_path, ) def _configure_chroot(self, chroot_name: str) -> None: """Configure a newly created chroot""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) # Create schroot configuration self._create_schroot_config(chroot_name, chroot_path, self.config.architecture, self.config.suite) # Install additional packages if specified if self.config.chroot_additional_packages: self._install_additional_packages(chroot_name) # Run setup commands if specified if self.config.chroot_setup_cmd: self._run_setup_commands(chroot_name) def _install_additional_packages(self, chroot_name: str) -> None: """Install additional packages in the chroot""" try: # Update package lists self.execute_in_chroot(chroot_name, ["apt-get", "update"], capture_output=True) # Install packages cmd = ["apt-get", "install", "-y"] + self.config.chroot_additional_packages result = self.execute_in_chroot(chroot_name, cmd, capture_output=True) if result.returncode != 0: raise ChrootError( f"Failed to install additional packages: {result.stderr}", chroot_name=chroot_name, operation="install_packages", ) except Exception as e: raise ChrootError( f"Failed to install additional packages: {e}", chroot_name=chroot_name, operation="install_packages", ) def _run_setup_commands(self, chroot_name: str) -> None: """Run setup commands in the chroot""" for cmd in self.config.chroot_setup_cmd: try: result = self.execute_in_chroot(chroot_name, cmd.split(), capture_output=True) if result.returncode != 0: raise ChrootError( f"Setup command failed: {result.stderr}", chroot_name=chroot_name, operation="setup_command", ) except Exception as e: raise ChrootError( f"Failed to run setup command '{cmd}': {e}", chroot_name=chroot_name, operation="setup_command", ) def _create_schroot_config(self, chroot_name: str, chroot_path: str, arch: str, suite: str) -> None: """Create schroot configuration file""" config_content = f"""[{chroot_name}] description=Deb-Mock chroot for {suite} {arch} directory={chroot_path} root-users=root users=root type=directory profile=desktop preserve-environment=true """ config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf") try: with open(config_file, "w") as f: f.write(config_content) except Exception as e: raise ChrootError(f"Failed to create schroot config: {e}") def _initialize_chroot(self, chroot_path: str, arch: str, suite: str) -> None: """Initialize chroot using debootstrap""" cmd = [ "/usr/sbin/debootstrap", "--arch", arch, "--variant=buildd", suite, chroot_path, "http://deb.debian.org/debian/", ] try: subprocess.run(cmd, capture_output=True, text=True, check=True) except subprocess.CalledProcessError as e: raise ChrootError(f"debootstrap failed: {e.stderr}") except FileNotFoundError: raise ChrootError("debootstrap not found. Please install debootstrap package.") def _install_build_tools(self, chroot_name: str) -> None: """Install essential build tools in the chroot""" packages = [ "build-essential", "devscripts", "debhelper", "dh-make", "fakeroot", "lintian", "sbuild", "schroot", ] cmd = ["schroot", "-c", chroot_name, "--", "apt-get", "update"] try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: raise ChrootError(f"Failed to update package lists: {e}") cmd = [ "schroot", "-c", chroot_name, "--", "apt-get", "install", "-y", ] + packages try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: raise ChrootError(f"Failed to install build tools: {e}") def clean_chroot(self, chroot_name: str) -> None: """Clean up a chroot environment""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf") try: # Remove schroot configuration if os.path.exists(config_file): os.remove(config_file) # Remove chroot directory if os.path.exists(chroot_path): shutil.rmtree(chroot_path) except Exception as e: raise ChrootError(f"Failed to clean chroot '{chroot_name}': {e}") def list_chroots(self) -> List[str]: """List available chroot environments""" chroots = [] try: # List chroot configurations for config_file in Path(self.config.chroot_config_dir).glob("*.conf"): chroot_name = config_file.stem chroot_path = os.path.join(self.config.chroot_dir, chroot_name) if os.path.exists(chroot_path): chroots.append(chroot_name) except Exception as e: raise ChrootError(f"Failed to list chroots: {e}") return chroots def chroot_exists(self, chroot_name: str) -> bool: """Check if a chroot environment exists""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) config_file = os.path.join(self.config.chroot_config_dir, f"{chroot_name}.conf") return os.path.exists(chroot_path) and os.path.exists(config_file) def get_chroot_info(self, chroot_name: str) -> dict: """Get information about a chroot environment""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") chroot_path = os.path.join(self.config.chroot_dir, chroot_name) info = { "name": chroot_name, "path": chroot_path, "exists": True, "size": 0, "created": None, "modified": None, } try: stat = os.stat(chroot_path) info["size"] = stat.st_size info["created"] = stat.st_ctime info["modified"] = stat.st_mtime except Exception: pass return info def update_chroot(self, chroot_name: str) -> None: """Update packages in a chroot environment""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") try: # Update package lists cmd = ["schroot", "-c", chroot_name, "--", "apt-get", "update"] subprocess.run(cmd, check=True) # Upgrade packages cmd = ["schroot", "-c", chroot_name, "--", "apt-get", "upgrade", "-y"] subprocess.run(cmd, check=True) except subprocess.CalledProcessError as e: raise ChrootError(f"Failed to update chroot '{chroot_name}': {e}") def execute_in_chroot( self, chroot_name: str, command: list, capture_output: bool = True, preserve_env: bool = True, ) -> subprocess.CompletedProcess: """Execute a command in the chroot environment""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") chroot_path = os.path.join(self.config.chroot_dir, chroot_name) # Prepare environment variables (Mock FAQ #1 - Environment preservation) env = self._prepare_chroot_environment(preserve_env) # Build schroot command schroot_cmd = [ "schroot", "-c", chroot_name, "--", "sh", "-c", " ".join(command), ] try: if capture_output: result = subprocess.run( schroot_cmd, cwd=chroot_path, env=env, capture_output=True, text=True, check=False, ) else: result = subprocess.run(schroot_cmd, cwd=chroot_path, env=env, check=False) return result except subprocess.CalledProcessError as e: raise ChrootError(f"Command failed in chroot: {e}") def _prepare_chroot_environment(self, preserve_env: bool = True) -> dict: """ Prepare environment variables for chroot execution. This addresses Mock FAQ #1 about environment variable preservation. """ env = os.environ.copy() if not preserve_env or not self.config.environment_sanitization: return env # Filter environment variables based on allowed list filtered_env = {} # Always preserve basic system variables basic_vars = ["PATH", "HOME", "USER", "SHELL", "TERM", "LANG", "LC_ALL"] for var in basic_vars: if var in env: filtered_env[var] = env[var] # Preserve allowed build-related variables for var in self.config.allowed_environment_vars: if var in env: filtered_env[var] = env[var] # Preserve user-specified variables for var in self.config.preserve_environment: if var in env: filtered_env[var] = env[var] return filtered_env def copy_to_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None: """Copy files from host to chroot (similar to Mock's --copyin)""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") chroot_path = os.path.join(self.config.chroot_dir, chroot_name) full_dest_path = os.path.join(chroot_path, dest_path.lstrip("/")) try: # Create destination directory if it doesn't exist os.makedirs(os.path.dirname(full_dest_path), exist_ok=True) # Copy file or directory if os.path.isdir(source_path): shutil.copytree(source_path, full_dest_path, dirs_exist_ok=True) else: shutil.copy2(source_path, full_dest_path) except Exception as e: raise ChrootError(f"Failed to copy {source_path} to chroot: {e}") def copy_from_chroot(self, source_path: str, dest_path: str, chroot_name: str) -> None: """Copy files from chroot to host (similar to Mock's --copyout)""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") chroot_path = os.path.join(self.config.chroot_dir, chroot_name) full_source_path = os.path.join(chroot_path, source_path.lstrip("/")) try: # Create destination directory if it doesn't exist os.makedirs(os.path.dirname(dest_path), exist_ok=True) # Copy file or directory if os.path.isdir(full_source_path): shutil.copytree(full_source_path, dest_path, dirs_exist_ok=True) else: shutil.copy2(full_source_path, dest_path) except Exception as e: raise ChrootError(f"Failed to copy {source_path} from chroot: {e}") def scrub_chroot(self, chroot_name: str) -> None: """Clean up chroot without removing it (similar to Mock's --scrub)""" if not self.chroot_exists(chroot_name): raise ChrootError(f"Chroot '{chroot_name}' does not exist") try: # Clean package cache self.execute_in_chroot(chroot_name, ["apt-get", "clean"]) # Clean temporary files self.execute_in_chroot(chroot_name, ["rm", "-rf", "/tmp/*"]) self.execute_in_chroot(chroot_name, ["rm", "-rf", "/var/tmp/*"]) # Clean build artifacts self.execute_in_chroot(chroot_name, ["rm", "-rf", "/build/*"]) except Exception as e: raise ChrootError(f"Failed to scrub chroot '{chroot_name}': {e}") def scrub_all_chroots(self) -> None: """Clean up all chroots (similar to Mock's --scrub-all-chroots)""" chroots = self.list_chroots() for chroot_name in chroots: try: self.scrub_chroot(chroot_name) except Exception as e: print(f"Warning: Failed to scrub chroot '{chroot_name}': {e}") def _setup_advanced_mounts(self, chroot_name: str) -> None: """Setup advanced mount points for the chroot""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) # Initialize mount tracking for this chroot self._active_mounts[chroot_name] = [] try: # Setup standard system mounts if self.config.mount_proc: self._mount_proc(chroot_name, chroot_path) if self.config.mount_sys: self._mount_sys(chroot_name, chroot_path) if self.config.mount_dev: self._mount_dev(chroot_name, chroot_path) if self.config.mount_devpts: self._mount_devpts(chroot_name, chroot_path) if self.config.mount_tmp: self._mount_tmp(chroot_name, chroot_path) # Setup custom bind mounts for bind_mount in self.config.bind_mounts: self._setup_bind_mount(chroot_name, bind_mount) # Setup tmpfs mounts for tmpfs_mount in self.config.tmpfs_mounts: self._setup_tmpfs_mount(chroot_name, tmpfs_mount) # Setup overlay mounts for overlay_mount in self.config.overlay_mounts: self._setup_overlay_mount(chroot_name, overlay_mount) except Exception as e: raise ChrootError( f"Failed to setup advanced mounts: {e}", chroot_name=chroot_name, operation="mount_setup" ) def _mount_proc(self, chroot_name: str, chroot_path: str) -> None: """Mount /proc in the chroot""" proc_path = os.path.join(chroot_path, "proc") if not os.path.exists(proc_path): os.makedirs(proc_path, exist_ok=True) try: subprocess.run(["mount", "--bind", "/proc", proc_path], check=True) self._active_mounts[chroot_name].append(("proc", proc_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to mount /proc: {e}") def _mount_sys(self, chroot_name: str, chroot_path: str) -> None: """Mount /sys in the chroot""" sys_path = os.path.join(chroot_path, "sys") if not os.path.exists(sys_path): os.makedirs(sys_path, exist_ok=True) try: subprocess.run(["mount", "--bind", "/sys", sys_path], check=True) self._active_mounts[chroot_name].append(("sys", sys_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to mount /sys: {e}") def _mount_dev(self, chroot_name: str, chroot_path: str) -> None: """Mount /dev in the chroot""" dev_path = os.path.join(chroot_path, "dev") if not os.path.exists(dev_path): os.makedirs(dev_path, exist_ok=True) try: subprocess.run(["mount", "--bind", "/dev", dev_path], check=True) self._active_mounts[chroot_name].append(("dev", dev_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to mount /dev: {e}") def _mount_devpts(self, chroot_name: str, chroot_path: str) -> None: """Mount /dev/pts in the chroot""" devpts_path = os.path.join(chroot_path, "dev", "pts") if not os.path.exists(devpts_path): os.makedirs(devpts_path, exist_ok=True) try: subprocess.run(["mount", "-t", "devpts", "devpts", devpts_path], check=True) self._active_mounts[chroot_name].append(("devpts", devpts_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to mount /dev/pts: {e}") def _mount_tmp(self, chroot_name: str, chroot_path: str) -> None: """Mount /tmp in the chroot""" tmp_path = os.path.join(chroot_path, "tmp") if not os.path.exists(tmp_path): os.makedirs(tmp_path, exist_ok=True) try: # Use tmpfs for better performance if configured if self.config.use_tmpfs: subprocess.run([ "mount", "-t", "tmpfs", "-o", f"size={self.config.tmpfs_size}", "tmpfs", tmp_path ], check=True) self._active_mounts[chroot_name].append(("tmpfs", tmp_path)) else: # Bind mount host /tmp subprocess.run(["mount", "--bind", "/tmp", tmp_path], check=True) self._active_mounts[chroot_name].append(("tmp", tmp_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to mount /tmp: {e}") def _setup_bind_mount(self, chroot_name: str, bind_mount: Dict[str, str]) -> None: """Setup a custom bind mount""" host_path = bind_mount.get("host") chroot_path = bind_mount.get("chroot") options = bind_mount.get("options", "") if not host_path or not chroot_path: print(f"Warning: Invalid bind mount configuration: {bind_mount}") return # Create chroot mount point full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/")) os.makedirs(full_chroot_path, exist_ok=True) try: mount_cmd = ["mount", "--bind"] if options: mount_cmd.extend(["-o", options]) mount_cmd.extend([host_path, full_chroot_path]) subprocess.run(mount_cmd, check=True) self._active_mounts[chroot_name].append(("bind", full_chroot_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to setup bind mount {host_path} -> {chroot_path}: {e}") def _setup_tmpfs_mount(self, chroot_name: str, tmpfs_mount: Dict[str, str]) -> None: """Setup a tmpfs mount""" chroot_path = tmpfs_mount.get("chroot") size = tmpfs_mount.get("size", "100M") options = tmpfs_mount.get("options", "") if not chroot_path: print(f"Warning: Invalid tmpfs mount configuration: {tmpfs_mount}") return # Create chroot mount point full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/")) os.makedirs(full_chroot_path, exist_ok=True) try: mount_cmd = ["mount", "-t", "tmpfs", "-o", f"size={size}"] if options: mount_cmd[-1] += f",{options}" mount_cmd.extend(["tmpfs", full_chroot_path]) subprocess.run(mount_cmd, check=True) self._active_mounts[chroot_name].append(("tmpfs", full_chroot_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to setup tmpfs mount {chroot_path}: {e}") def _setup_overlay_mount(self, chroot_name: str, overlay_mount: Dict[str, str]) -> None: """Setup an overlay mount (requires overlayfs support)""" lower_dir = overlay_mount.get("lower") upper_dir = overlay_mount.get("upper") work_dir = overlay_mount.get("work") chroot_path = overlay_mount.get("chroot") if not all([lower_dir, upper_dir, work_dir, chroot_path]): print(f"Warning: Invalid overlay mount configuration: {overlay_mount}") return # Create chroot mount point full_chroot_path = os.path.join(self.config.chroot_dir, chroot_name, chroot_path.lstrip("/")) os.makedirs(full_chroot_path, exist_ok=True) try: # Create work directory if it doesn't exist os.makedirs(work_dir, exist_ok=True) mount_cmd = [ "mount", "-t", "overlay", "overlay", "-o", f"lowerdir={lower_dir},upperdir={upper_dir},workdir={work_dir}", full_chroot_path ] subprocess.run(mount_cmd, check=True) self._active_mounts[chroot_name].append(("overlay", full_chroot_path)) except subprocess.CalledProcessError as e: print(f"Warning: Failed to setup overlay mount {chroot_path}: {e}") def cleanup_mounts(self, chroot_name: str) -> None: """Clean up all mounts for a chroot""" if chroot_name not in self._active_mounts: return for mount_type, mount_path in reversed(self._active_mounts[chroot_name]): try: subprocess.run(["umount", mount_path], check=True) print(f"Unmounted {mount_type}: {mount_path}") except subprocess.CalledProcessError as e: print(f"Warning: Failed to unmount {mount_type} {mount_path}: {e}") # Clear the mount list self._active_mounts[chroot_name] = [] def list_mounts(self, chroot_name: str) -> List[Dict[str, str]]: """List all active mounts for a chroot""" if chroot_name not in self._active_mounts: return [] mounts = [] for mount_type, mount_path in self._active_mounts[chroot_name]: mounts.append({ "type": mount_type, "path": mount_path, "chroot": chroot_name }) return mounts def _setup_chroot_users(self, chroot_name: str) -> None: """Setup users and permissions in the chroot""" chroot_path = os.path.join(self.config.chroot_dir, chroot_name) try: # Create the build user self.uid_manager.create_chroot_user(chroot_path) # Copy host users if configured if hasattr(self.config, 'copy_host_users'): for username in self.config.copy_host_users: self.uid_manager.copy_host_user(chroot_path, username) # Setup chroot permissions self.uid_manager.setup_chroot_permissions(chroot_path) except Exception as e: raise ChrootError( f"Failed to setup chroot users: {e}", chroot_name=chroot_name, operation="user_setup" )