deb-mock/deb_mock/plugins/bind_mount.py
robojerk 5e7f4b0562
Some checks failed
Build Deb-Mock Package / build (push) Successful in 55s
Lint Code / Lint All Code (push) Failing after 3s
Test Deb-Mock Build / test (push) Failing after 53s
Fix sbuild integration and clean up codebase
- Fix environment variable handling in sbuild wrapper
- Remove unsupported --log-dir and --env options from sbuild command
- Clean up unused imports and fix linting issues
- Organize examples directory with official Debian hello package
- Fix YAML formatting (trailing spaces, newlines)
- Remove placeholder example files
- All tests passing (30/30)
- Successfully tested build with official Debian hello package
2025-08-04 04:34:32 +00:00

238 lines
8.8 KiB
Python

"""
BindMount Plugin for Deb-Mock
This plugin allows mounting host directories into chroot environments,
inspired by Fedora's Mock bind_mount plugin but adapted for Debian-based systems.
"""
import logging
import os
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Tuple
from .base import BasePlugin
logger = logging.getLogger(__name__)
class BindMountPlugin(BasePlugin):
"""
Mount host directories into chroot environments.
This plugin allows users to mount host directories into the chroot
environment, which is useful for development workflows, shared
libraries, and other scenarios where host files need to be accessible
within the build environment.
"""
def __init__(self, config, hook_manager):
"""Initialize the BindMount plugin."""
super().__init__(config, hook_manager)
self.mounts = self._get_mounts()
self._log_info(f"Initialized with {len(self.mounts)} mount points")
def _register_hooks(self):
"""Register bind mount hooks."""
self.hook_manager.add_hook("mount_root", self.mount_root)
self.hook_manager.add_hook("postumount", self.postumount)
self._log_debug("Registered mount_root and postumount hooks")
def _get_mounts(self) -> List[Tuple[str, str]]:
"""
Get mount points from configuration.
Returns:
List of (host_path, chroot_path) tuples
"""
plugin_config = self._get_plugin_config()
mounts = []
# Get mounts from configuration
if "mounts" in plugin_config:
for mount_config in plugin_config["mounts"]:
if isinstance(mount_config, dict):
host_path = mount_config.get("host_path")
chroot_path = mount_config.get("chroot_path")
elif isinstance(mount_config, (list, tuple)) and len(mount_config) >= 2:
host_path = mount_config[0]
chroot_path = mount_config[1]
else:
self._log_warning(f"Invalid mount configuration: {mount_config}")
continue
if host_path and chroot_path:
mounts.append((host_path, chroot_path))
# Legacy support for 'dirs' configuration (Mock compatibility)
if "dirs" in plugin_config:
for host_path, chroot_path in plugin_config["dirs"]:
mounts.append((host_path, chroot_path))
return mounts
def mount_root(self, context: Dict[str, Any]) -> None:
"""
Mount bind mounts when chroot is mounted.
Args:
context: Context dictionary with chroot information
"""
if not self.enabled or not self.mounts:
return
chroot_path = context.get("chroot_path")
if not chroot_path:
self._log_warning("No chroot_path in context, skipping bind mounts")
return
self._log_info(f"Setting up {len(self.mounts)} bind mounts")
for host_path, chroot_mount_path in self.mounts:
try:
self._setup_bind_mount(host_path, chroot_mount_path, chroot_path)
except Exception as e:
self._log_error(f"Failed to setup bind mount {host_path} -> {chroot_mount_path}: {e}")
def postumount(self, context: Dict[str, Any]) -> None:
"""
Unmount bind mounts when chroot is unmounted.
Args:
context: Context dictionary with chroot information
"""
if not self.enabled or not self.mounts:
return
chroot_path = context.get("chroot_path")
if not chroot_path:
self._log_warning("No chroot_path in context, skipping bind mount cleanup")
return
self._log_info(f"Cleaning up {len(self.mounts)} bind mounts")
for host_path, chroot_mount_path in self.mounts:
try:
self._cleanup_bind_mount(chroot_mount_path, chroot_path)
except Exception as e:
self._log_error(f"Failed to cleanup bind mount {chroot_mount_path}: {e}")
def _setup_bind_mount(self, host_path: str, chroot_mount_path: str, chroot_path: str) -> None:
"""
Setup a single bind mount.
Args:
host_path: Path on the host to mount
chroot_mount_path: Path in the chroot where to mount
chroot_path: Base chroot path
"""
# Ensure host path exists
if not os.path.exists(host_path):
self._log_warning(f"Host path does not exist: {host_path}")
return
# Create full chroot mount path
full_chroot_path = os.path.join(chroot_path, chroot_mount_path.lstrip("/"))
# Create mount point directory if it doesn't exist
mount_point_dir = os.path.dirname(full_chroot_path)
if not os.path.exists(mount_point_dir):
os.makedirs(mount_point_dir, exist_ok=True)
self._log_debug(f"Created mount point directory: {mount_point_dir}")
# Create mount point if it's a file
if os.path.isfile(host_path) and not os.path.exists(full_chroot_path):
Path(full_chroot_path).touch()
self._log_debug(f"Created file mount point: {full_chroot_path}")
# Perform the bind mount
try:
cmd = ["mount", "--bind", host_path, full_chroot_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
self._log_debug(f"Successfully mounted {host_path} -> {full_chroot_path}")
except subprocess.CalledProcessError as e:
self._log_error(f"Failed to mount {host_path} -> {full_chroot_path}: {e.stderr}")
raise
except FileNotFoundError:
self._log_error("mount command not found - ensure mount is available")
raise
def _cleanup_bind_mount(self, chroot_mount_path: str, chroot_path: str) -> None:
"""
Cleanup a single bind mount.
Args:
chroot_mount_path: Path in the chroot that was mounted
chroot_path: Base chroot path
"""
full_chroot_path = os.path.join(chroot_path, chroot_mount_path.lstrip("/"))
try:
cmd = ["umount", full_chroot_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
self._log_debug(f"Successfully unmounted: {full_chroot_path}")
except subprocess.CalledProcessError:
# Try force unmount if regular unmount fails
try:
cmd = ["umount", "-f", full_chroot_path]
subprocess.run(cmd, capture_output=True, text=True, check=True)
self._log_debug(f"Successfully force unmounted: {full_chroot_path}")
except subprocess.CalledProcessError as e2:
self._log_warning(f"Failed to unmount {full_chroot_path}: {e2.stderr}")
except FileNotFoundError:
self._log_error("umount command not found - ensure umount is available")
def validate_config(self, config: Any) -> bool:
"""
Validate plugin configuration.
Args:
config: Configuration to validate
Returns:
True if configuration is valid, False otherwise
"""
plugin_config = getattr(config, "plugins", {}).get("bind_mount", {})
# Check mounts configuration
if "mounts" in plugin_config:
for mount_config in plugin_config["mounts"]:
if isinstance(mount_config, dict):
if not all(key in mount_config for key in ["host_path", "chroot_path"]):
self._log_error("Mount configuration missing required keys: host_path, chroot_path")
return False
elif isinstance(mount_config, (list, tuple)):
if len(mount_config) < 2:
self._log_error("Mount configuration must have at least 2 elements")
return False
else:
self._log_error(f"Invalid mount configuration format: {mount_config}")
return False
# Check dirs configuration (legacy)
if "dirs" in plugin_config:
for host_path, chroot_path in plugin_config["dirs"]:
if not host_path or not chroot_path:
self._log_error("Invalid dirs configuration: host_path and chroot_path must be non-empty")
return False
return True
def get_plugin_info(self) -> Dict[str, Any]:
"""
Get plugin information.
Returns:
Dictionary with plugin information
"""
info = super().get_plugin_info()
info.update(
{
"mounts": self.mounts,
"mount_count": len(self.mounts),
"hooks": ["mount_root", "postumount"],
}
)
return info