"""
BindMount Plugin for Deb-Mock

This plugin allows mounting host directories into chroot environments,
inspired by Fedora's Mock bind_mount plugin but adapted for Debian-based
systems.
"""

import logging
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Tuple

from .base import BasePlugin

logger = logging.getLogger(__name__)


class BindMountPlugin(BasePlugin):
    """
    Mount host directories into chroot environments.

    This plugin allows users to mount host directories (or single files)
    into the chroot environment, which is useful for development workflows,
    shared libraries, and other scenarios where host files need to be
    accessible within the build environment.

    Mounts are read once from the plugin configuration at construction time
    and stored in ``self.mounts`` as ``(host_path, chroot_path)`` tuples.
    """

    def __init__(self, config, hook_manager):
        """
        Initialize the BindMount plugin.

        Args:
            config: Configuration object forwarded to ``BasePlugin``
            hook_manager: Hook manager forwarded to ``BasePlugin``
        """
        super().__init__(config, hook_manager)
        # Resolved after the base initializer so the plugin configuration
        # helpers it provides are available.
        self.mounts = self._get_mounts()
        self._log_info(f"Initialized with {len(self.mounts)} mount points")

    def _register_hooks(self):
        """Register the ``mount_root`` and ``postumount`` hooks."""
        self.hook_manager.add_hook("mount_root", self.mount_root)
        self.hook_manager.add_hook("postumount", self.postumount)
        self._log_debug("Registered mount_root and postumount hooks")

    def _get_mounts(self) -> List[Tuple[str, str]]:
        """
        Get mount points from configuration.

        Two configuration keys are understood:

        * ``mounts``: a list whose entries are either dicts with
          ``host_path`` / ``chroot_path`` keys, or sequences whose first two
          elements are the host path and the chroot path.
        * ``dirs``: legacy (Mock-compatible) list of
          ``(host_path, chroot_path)`` pairs.

        Malformed entries are logged and skipped rather than raising, so a
        single bad entry cannot prevent the plugin from being constructed.
        Entries with an empty host or chroot path are dropped.

        Returns:
            List of (host_path, chroot_path) tuples
        """
        plugin_config = self._get_plugin_config() or {}
        mounts: List[Tuple[str, str]] = []

        # ``or []`` tolerates a key that is present but set to None
        # (e.g. an empty ``mounts:`` entry in a YAML file).
        for mount_config in plugin_config.get("mounts") or []:
            if isinstance(mount_config, dict):
                host_path = mount_config.get("host_path")
                chroot_path = mount_config.get("chroot_path")
            elif isinstance(mount_config, (list, tuple)) and len(mount_config) >= 2:
                host_path, chroot_path = mount_config[0], mount_config[1]
            else:
                self._log_warning(f"Invalid mount configuration: {mount_config}")
                continue

            if host_path and chroot_path:
                mounts.append((host_path, chroot_path))

        # Legacy support for 'dirs' configuration (Mock compatibility)
        for dir_config in plugin_config.get("dirs") or []:
            if not isinstance(dir_config, (list, tuple)) or len(dir_config) != 2:
                self._log_warning(f"Invalid mount configuration: {dir_config}")
                continue

            host_path, chroot_path = dir_config
            # An empty chroot path would resolve to the chroot root itself,
            # so such entries are never turned into mounts.
            if host_path and chroot_path:
                mounts.append((host_path, chroot_path))
            else:
                self._log_warning(f"Invalid mount configuration: {dir_config}")

        return mounts

    def mount_root(self, context: Dict[str, Any]) -> None:
        """
        Mount bind mounts when chroot is mounted.

        Each mount is attempted independently: a failure is logged and the
        remaining mounts are still processed.

        Args:
            context: Context dictionary with chroot information; the base
                chroot directory is read from its ``chroot_path`` key
        """
        if not self.enabled or not self.mounts:
            return

        chroot_path = context.get("chroot_path")
        if not chroot_path:
            self._log_warning("No chroot_path in context, skipping bind mounts")
            return

        self._log_info(f"Setting up {len(self.mounts)} bind mounts")

        for host_path, chroot_mount_path in self.mounts:
            try:
                self._setup_bind_mount(host_path, chroot_mount_path, chroot_path)
            except Exception as e:
                # Best effort: one failing mount must not block the others.
                self._log_error(f"Failed to setup bind mount {host_path} -> {chroot_mount_path}: {e}")

    def postumount(self, context: Dict[str, Any]) -> None:
        """
        Unmount bind mounts when chroot is unmounted.

        Mounts are released in the reverse of the order they were set up so
        that a mount nested below another one (``/mnt/sub`` below ``/mnt``)
        is unmounted before its parent.

        Args:
            context: Context dictionary with chroot information; the base
                chroot directory is read from its ``chroot_path`` key
        """
        if not self.enabled or not self.mounts:
            return

        chroot_path = context.get("chroot_path")
        if not chroot_path:
            self._log_warning("No chroot_path in context, skipping bind mount cleanup")
            return

        self._log_info(f"Cleaning up {len(self.mounts)} bind mounts")

        for _host_path, chroot_mount_path in reversed(self.mounts):
            try:
                self._cleanup_bind_mount(chroot_mount_path, chroot_path)
            except Exception as e:
                # Best effort: keep releasing the remaining mounts.
                self._log_error(f"Failed to cleanup bind mount {chroot_mount_path}: {e}")

    def _setup_bind_mount(self, host_path: str, chroot_mount_path: str, chroot_path: str) -> None:
        """
        Setup a single bind mount.

        The mount target is created inside the chroot when missing: a
        directory when ``host_path`` is a directory, otherwise an empty file
        (``mount --bind`` needs an existing target of the matching kind).
        A missing ``host_path`` is logged and skipped without raising.

        Args:
            host_path: Path on the host to mount
            chroot_mount_path: Path in the chroot where to mount
            chroot_path: Base chroot path

        Raises:
            subprocess.CalledProcessError: If the ``mount`` command fails
            FileNotFoundError: If the ``mount`` command is not available
            OSError: If the mount target cannot be created
        """
        # Ensure host path exists
        if not os.path.exists(host_path):
            self._log_warning(f"Host path does not exist: {host_path}")
            return

        # Strip the leading slash so os.path.join keeps the chroot prefix
        # instead of treating the mount path as absolute.
        full_chroot_path = os.path.join(chroot_path, chroot_mount_path.lstrip("/"))

        if os.path.isdir(host_path):
            # Directory source: the target itself must be a directory.
            if not os.path.isdir(full_chroot_path):
                os.makedirs(full_chroot_path, exist_ok=True)
                self._log_debug(f"Created mount point directory: {full_chroot_path}")
        else:
            # File-like source (regular file, device node, socket, ...):
            # the target must be an existing file inside an existing parent.
            mount_point_dir = os.path.dirname(full_chroot_path)
            if not os.path.exists(mount_point_dir):
                os.makedirs(mount_point_dir, exist_ok=True)
                self._log_debug(f"Created mount point directory: {mount_point_dir}")

            if not os.path.exists(full_chroot_path):
                Path(full_chroot_path).touch()
                self._log_debug(f"Created file mount point: {full_chroot_path}")

        # Perform the bind mount
        try:
            cmd = ["mount", "--bind", host_path, full_chroot_path]
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            self._log_debug(f"Successfully mounted {host_path} -> {full_chroot_path}")
        except subprocess.CalledProcessError as e:
            self._log_error(f"Failed to mount {host_path} -> {full_chroot_path}: {e.stderr}")
            raise
        except FileNotFoundError:
            self._log_error("mount command not found - ensure mount is available")
            raise

    def _cleanup_bind_mount(self, chroot_mount_path: str, chroot_path: str) -> None:
        """
        Cleanup a single bind mount.

        A regular ``umount`` is tried first and, if it fails, ``umount -f``.
        Failures are logged but never raised.

        Args:
            chroot_mount_path: Path in the chroot that was mounted
            chroot_path: Base chroot path
        """
        full_chroot_path = os.path.join(chroot_path, chroot_mount_path.lstrip("/"))

        attempts = (
            (["umount", full_chroot_path], "unmounted"),
            (["umount", "-f", full_chroot_path], "force unmounted"),
        )
        last_stderr = None

        for cmd, outcome in attempts:
            try:
                subprocess.run(cmd, capture_output=True, text=True, check=True)
            except subprocess.CalledProcessError as e:
                # Remember why it failed and fall through to the next attempt.
                last_stderr = e.stderr
                continue
            except FileNotFoundError:
                self._log_error("umount command not found - ensure umount is available")
                return

            self._log_debug(f"Successfully {outcome}: {full_chroot_path}")
            return

        self._log_warning(f"Failed to unmount {full_chroot_path}: {last_stderr}")

    def validate_config(self, config: Any) -> bool:
        """
        Validate plugin configuration.

        Reads the ``bind_mount`` entry of ``config.plugins`` and checks the
        shape of its ``mounts`` and legacy ``dirs`` lists. Malformed entries
        make the method return False; they never raise.

        Args:
            config: Configuration to validate

        Returns:
            True if configuration is valid, False otherwise
        """
        # ``or {}`` covers both a missing attribute/key and an explicit None.
        plugins = getattr(config, "plugins", None) or {}
        plugin_config = plugins.get("bind_mount") or {}

        # Check mounts configuration
        for mount_config in plugin_config.get("mounts") or []:
            if isinstance(mount_config, dict):
                if not all(key in mount_config for key in ["host_path", "chroot_path"]):
                    self._log_error("Mount configuration missing required keys: host_path, chroot_path")
                    return False
            elif isinstance(mount_config, (list, tuple)):
                if len(mount_config) < 2:
                    self._log_error("Mount configuration must have at least 2 elements")
                    return False
            else:
                self._log_error(f"Invalid mount configuration format: {mount_config}")
                return False

        # Check dirs configuration (legacy)
        for dir_config in plugin_config.get("dirs") or []:
            if not isinstance(dir_config, (list, tuple)) or len(dir_config) != 2:
                self._log_error(f"Invalid mount configuration format: {dir_config}")
                return False

            host_path, chroot_path = dir_config
            if not host_path or not chroot_path:
                self._log_error("Invalid dirs configuration: host_path and chroot_path must be non-empty")
                return False

        return True

    def get_plugin_info(self) -> Dict[str, Any]:
        """
        Get plugin information.

        Extends the dictionary returned by the base class with the
        configured mounts, their count, and the hook names this plugin uses.

        Returns:
            Dictionary with plugin information
        """
        info = super().get_plugin_info()
        info.update(
            {
                # Copy so callers cannot mutate the plugin's own mount list.
                "mounts": list(self.mounts),
                "mount_count": len(self.mounts),
                "hooks": ["mount_root", "postumount"],
            }
        )
        return info