""" Core DebMock class for orchestrating the build process """ import os import threading import concurrent.futures from pathlib import Path from typing import Any, Dict, List, Optional import time from .cache import CacheManager from .chroot import ChrootManager from .config import Config from .exceptions import ChrootError from .metadata import MetadataManager from .sbuild import SbuildWrapper from .plugin import PluginManager, HookStages from .performance import PerformanceMonitor, PerformanceOptimizer, PerformanceReporter class DebMock: """Main DebMock class for orchestrating package builds""" def __init__(self, config: Config): self.config = config self.chroot_manager = ChrootManager(config) self.sbuild_wrapper = SbuildWrapper(config) self.metadata_manager = MetadataManager(config) self.cache_manager = CacheManager(config) self.plugin_manager = PluginManager(config) # Validate configuration self.config.validate() # Setup caches self._setup_caches() # Initialize plugins self.plugin_manager.init_plugins(self) # Initialize performance monitoring self.performance_monitor = PerformanceMonitor(config) self.performance_optimizer = PerformanceOptimizer(config) self.performance_reporter = PerformanceReporter(config) # Parallel build support self._build_lock = threading.Lock() self._active_builds = {} def _setup_caches(self) -> None: """Setup cache directories and ccache""" try: # Setup ccache if enabled if self.config.use_ccache: self.cache_manager.setup_ccache() except Exception as e: # Log warning but continue print(f"Warning: Failed to setup caches: {e}") def build(self, source_package: str, **kwargs) -> Dict[str, Any]: """Build a Debian source package in an isolated environment""" # Create build profile for performance tracking build_id = f"build_{int(time.time() * 1000)}" profile_id = self.performance_monitor.create_build_profile( build_id, source_package, self.config.architecture, self.config.suite ) # Call pre-build hooks self.plugin_manager.call_hooks(HookStages.PREBUILD, source_package, **kwargs) # Ensure chroot exists chroot_name = kwargs.get("chroot_name", self.config.chroot_name) chroot_path = self.config.get_chroot_path() if not self.chroot_manager.chroot_exists(chroot_name): with self.performance_monitor.monitor_operation("chroot_creation") as op_id: self.chroot_manager.create_chroot(chroot_name) # Add chroot creation metrics to profile self.performance_monitor.add_phase_metrics(profile_id, "chroot_creation", self.performance_monitor._active_operations[op_id]) # Try to restore from cache first if not self.chroot_manager.chroot_exists(chroot_name): if not self.cache_manager.restore_root_cache(chroot_path): self.chroot_manager.create_chroot(chroot_name) # Setup build environment with self.performance_monitor.monitor_operation("build_env_setup") as op_id: build_env = self.config.setup_build_environment() # Add build environment setup metrics to profile self.performance_monitor.add_phase_metrics(profile_id, "build_env_setup", self.performance_monitor._active_operations[op_id]) # Call build start hook self.plugin_manager.call_hooks(HookStages.BUILD_START, source_package, chroot_name, **kwargs) # Build the package with self.performance_monitor.monitor_operation("package_build") as op_id: build_result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs) # Add package build metrics to profile self.performance_monitor.add_phase_metrics(profile_id, "package_build", self.performance_monitor._active_operations[op_id]) # Call build end hook 
        self.plugin_manager.call_hooks(HookStages.BUILD_END, build_result, source_package, chroot_name, **kwargs)

        # Create cache after successful build
        if build_result.get("success", False):
            with self.performance_monitor.monitor_operation("cache_creation") as op_id:
                self.cache_manager.create_root_cache(chroot_path)
                # Add cache creation metrics to profile
                self.performance_monitor.add_phase_metrics(
                    profile_id,
                    "cache_creation",
                    self.performance_monitor._active_operations[op_id],
                )

        # Capture and store metadata
        with self.performance_monitor.monitor_operation("metadata_capture") as op_id:
            metadata = self._capture_build_metadata(build_result, source_package)
            self.metadata_manager.store_metadata(metadata)
            # Add metadata capture metrics to profile
            self.performance_monitor.add_phase_metrics(
                profile_id,
                "metadata_capture",
                self.performance_monitor._active_operations[op_id],
            )

        # Clean up chroot if not keeping it
        if not kwargs.get("keep_chroot", self.config.keep_chroot):
            with self.performance_monitor.monitor_operation("chroot_cleanup") as op_id:
                self.chroot_manager.clean_chroot(chroot_name)
                # Add chroot cleanup metrics to profile
                self.performance_monitor.add_phase_metrics(
                    profile_id,
                    "chroot_cleanup",
                    self.performance_monitor._active_operations[op_id],
                )

        # Call post-build hooks
        self.plugin_manager.call_hooks(HookStages.POSTBUILD, build_result, source_package, **kwargs)

        # Finalize build profile and generate optimization suggestions
        build_profile = self.performance_monitor.finalize_build_profile(profile_id)
        if build_profile and self.config.performance_auto_optimization:
            analysis = self.performance_optimizer.analyze_build_performance(build_profile)
            if analysis["automatic_tunings"]:
                self.performance_optimizer.apply_automatic_tunings(analysis["automatic_tunings"])

        return build_result

    def build_parallel(self, source_packages: List[str], max_workers: Optional[int] = None, **kwargs) -> List[Dict[str, Any]]:
        """
        Build multiple packages in parallel using multiple chroots

        Args:
            source_packages: List of source packages to build
            max_workers: Maximum number of parallel builds (default: config.parallel_builds)
            **kwargs: Additional build options

        Returns:
            List of build results in the same order as source_packages
        """
        if max_workers is None:
            max_workers = getattr(self.config, "parallel_builds", 2)

        # Limit max_workers to available system resources
        max_workers = min(max_workers, os.cpu_count() or 2)

        print(f"Building {len(source_packages)} packages with {max_workers} parallel workers")

        # Create unique chroot names for parallel builds
        chroot_names = [f"{self.config.chroot_name}-parallel-{i}" for i in range(len(source_packages))]

        # Prepare build tasks
        build_tasks = []
        for i, (source_package, chroot_name) in enumerate(zip(source_packages, chroot_names)):
            task_kwargs = kwargs.copy()
            task_kwargs["chroot_name"] = chroot_name
            task_kwargs["package_index"] = i
            build_tasks.append((source_package, task_kwargs))

        # Execute builds in parallel
        results = [None] * len(source_packages)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all build tasks
            future_to_index = {
                executor.submit(self._build_single_parallel, source_pkg, **task_kwargs): i
                for i, (source_pkg, task_kwargs) in enumerate(build_tasks)
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    result = future.result()
                    results[index] = result
                    print(f"✅ Package {index + 1}/{len(source_packages)} completed: {result.get('package_name', 'unknown')}")
                except Exception as e:
                    results[index] = {
                        "success": False,
                        "error": str(e),
                        "package_name": source_packages[index] if index < len(source_packages) else "unknown",
                    }
                    print(f"❌ Package {index + 1}/{len(source_packages)} failed: {e}")

        # Clean up parallel chroots
        for chroot_name in chroot_names:
            try:
                self.chroot_manager.clean_chroot(chroot_name)
            except Exception as e:
                print(f"Warning: Failed to clean chroot {chroot_name}: {e}")

        return results

    def _build_single_parallel(self, source_package: str, **kwargs) -> Dict[str, Any]:
        """Build a single package for parallel execution"""
        chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
        package_index = kwargs.get("package_index", 0)

        print(f"🔄 Starting parallel build {package_index + 1}: {source_package}")

        try:
            # Ensure chroot exists for this parallel build
            chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
            if not self.chroot_manager.chroot_exists(chroot_name):
                if not self.cache_manager.restore_root_cache(chroot_path):
                    self.chroot_manager.create_chroot(chroot_name)

            # Check build dependencies
            deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)
            if not deps_check["satisfied"]:
                if deps_check["missing"]:
                    self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)

            # Setup build environment
            build_env = self.config.setup_build_environment()

            # Build the package
            build_result = self.sbuild_wrapper.build_package(
                source_package, chroot_name, build_env=build_env, **kwargs
            )

            # Create cache after successful build
            if build_result.get("success", False):
                self.cache_manager.create_root_cache(chroot_path)

            # Capture and store metadata
            metadata = self._capture_build_metadata(build_result, source_package)
            self.metadata_manager.store_metadata(metadata)

            return build_result

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "package_name": source_package,
                "chroot_name": chroot_name,
            }

    def build_chain(self, source_packages: List[str], **kwargs) -> List[Dict[str, Any]]:
        """Build a chain of packages that depend on each other (similar to Mock's --chain)"""
        results = []
        chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
        chroot_path = self.config.get_chroot_path()

        # Try to restore from cache first
        if not self.chroot_manager.chroot_exists(chroot_name):
            if not self.cache_manager.restore_root_cache(chroot_path):
                self.chroot_manager.create_chroot(chroot_name)

        # Setup build environment
        build_env = self.config.setup_build_environment()

        for i, source_package in enumerate(source_packages):
            try:
                # Build the package
                result = self.sbuild_wrapper.build_package(source_package, chroot_name, build_env=build_env, **kwargs)

                # Store result
                results.append(result)

                # If build failed, stop the chain
                if not result.get("success", False):
                    print(f"Chain build failed at package {i + 1}: {source_package}")
                    break

                # Install the built package for dependency resolution
                if result.get("success", False) and kwargs.get("install_built", True):
                    self._install_built_package(result, chroot_name)

            except Exception as e:
                error_result = {
                    "success": False,
                    "error": str(e),
                    "package": source_package,
                    "chain_position": i,
                }
                results.append(error_result)
                break

        return results

    def _install_built_package(self, build_result: Dict[str, Any], chroot_name: str) -> None:
        """Install a built package in the chroot for dependency resolution"""
        try:
            # Extract .deb files from build result
            deb_files = build_result.get("artifacts", {}).get("deb_files", [])

            for deb_file in deb_files:
                if deb_file.endswith(".deb"):
                    # Copy .deb to chroot and install
                    self.chroot_manager.copy_in(deb_file, chroot_name, "/tmp/")

                    # Install the package
                    install_cmd = ["dpkg", "-i", f"/tmp/{os.path.basename(deb_file)}"]
                    self.chroot_manager.execute_in_chroot(chroot_name, install_cmd)

                    # Fix any broken dependencies
                    fix_cmd = ["apt-get", "install", "-f", "-y"]
                    self.chroot_manager.execute_in_chroot(chroot_name, fix_cmd)

        except Exception as e:
            print(f"Warning: Failed to install built package: {e}")

    def init_chroot(self, chroot_name: str, arch: Optional[str] = None, suite: Optional[str] = None) -> None:
        """Initialize a new chroot environment"""
        self.chroot_manager.create_chroot(chroot_name, arch, suite)

        # Create cache after successful chroot creation
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        self.cache_manager.create_root_cache(chroot_path)

    def clean_chroot(self, chroot_name: str) -> None:
        """Clean up a chroot environment"""
        self.chroot_manager.clean_chroot(chroot_name)

    def list_chroots(self) -> list:
        """List available chroot environments"""
        return self.chroot_manager.list_chroots()

    def update_chroot(self, chroot_name: str) -> None:
        """Update packages in a chroot environment"""
        self.chroot_manager.update_chroot(chroot_name)

        # Update cache after successful update
        chroot_path = os.path.join(self.config.chroot_dir, chroot_name)
        self.cache_manager.create_root_cache(chroot_path)

    def get_chroot_info(self, chroot_name: str) -> dict:
        """Get information about a chroot environment"""
        return self.chroot_manager.get_chroot_info(chroot_name)

    def shell(self, chroot_name: Optional[str] = None) -> None:
        """Open a shell in the chroot environment (similar to Mock's --shell)"""
        if chroot_name is None:
            chroot_name = self.config.chroot_name

        if not self.chroot_manager.chroot_exists(chroot_name):
            raise ChrootError(f"Chroot '{chroot_name}' does not exist")

        # Execute shell in chroot
        self.chroot_manager.execute_in_chroot(chroot_name, ["/bin/bash"], capture_output=False)

    def copyout(self, source_path: str, dest_path: str, chroot_name: Optional[str] = None) -> None:
        """Copy files from chroot to host (similar to Mock's --copyout)"""
        if chroot_name is None:
            chroot_name = self.config.chroot_name
        self.chroot_manager.copy_from_chroot(source_path, dest_path, chroot_name)

    def copyin(self, source_path: str, dest_path: str, chroot_name: Optional[str] = None) -> None:
        """Copy files from host to chroot (similar to Mock's --copyin)"""
        if chroot_name is None:
            chroot_name = self.config.chroot_name
        self.chroot_manager.copy_to_chroot(source_path, dest_path, chroot_name)

    def cleanup_caches(self) -> Dict[str, int]:
        """Clean up old cache files (similar to Mock's cache management)"""
        return self.cache_manager.cleanup_old_caches()

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        return self.cache_manager.get_cache_stats()

    def _capture_build_metadata(self, build_result: Dict[str, Any], source_package: str) -> Dict[str, Any]:
        """Capture comprehensive build metadata"""
        metadata = {
            "source_package": source_package,
            "build_result": build_result,
            "config": self.config.to_dict(),
            "artifacts": build_result.get("artifacts", []),
            "build_metadata": build_result.get("metadata", {}),
            "timestamp": self._get_timestamp(),
            "build_success": build_result.get("success", False),
            "cache_info": self.get_cache_stats(),
        }

        # Add artifact details
        metadata["artifact_details"] = self._get_artifact_details(build_result.get("artifacts", []))

        return metadata

    def _get_timestamp(self) -> str:
        """Get current timestamp"""
        from datetime import datetime

        return datetime.now().isoformat()

    def _get_artifact_details(self, artifacts: list) -> list:
        """Get detailed information about build artifacts"""
        details = []
        for artifact_path in artifacts:
            if os.path.exists(artifact_path):
                stat = os.stat(artifact_path)
                details.append(
                    {
                        "path": artifact_path,
                        "name": os.path.basename(artifact_path),
                        "size": stat.st_size,
                        "modified": stat.st_mtime,
                        "type": self._get_artifact_type(artifact_path),
                    }
                )
        return details

    def _get_artifact_type(self, artifact_path: str) -> str:
        """Determine the type of build artifact"""
        ext = Path(artifact_path).suffix.lower()
        if ext == ".deb":
            return "deb_package"
        elif ext == ".changes":
            return "changes_file"
        elif ext == ".buildinfo":
            return "buildinfo_file"
        elif ext == ".dsc":
            return "source_package"
        else:
            return "other"

    def verify_reproducible_build(self, source_package: str, **kwargs) -> Dict[str, Any]:
        """Verify that a build is reproducible by building twice and comparing results"""
        # First build
        result1 = self.build(source_package, **kwargs)

        # Hash the first build's artifacts now, before the second build overwrites them
        first_hashes = {
            artifact: self._get_file_hash(artifact)
            for artifact in result1.get("artifacts", [])
            if os.path.exists(artifact)
        }

        # Clean chroot for second build
        chroot_name = kwargs.get("chroot_name", self.config.chroot_name)
        if self.chroot_manager.chroot_exists(chroot_name):
            self.chroot_manager.clean_chroot(chroot_name)

        # Second build
        result2 = self.build(source_package, **kwargs)

        # Compare results
        comparison = self._compare_build_results(result1, result2, first_hashes)

        return {
            "reproducible": comparison["identical"],
            "first_build": result1,
            "second_build": result2,
            "comparison": comparison,
        }

    def _compare_build_results(
        self,
        result1: Dict[str, Any],
        result2: Dict[str, Any],
        first_hashes: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """Compare two build results for reproducibility"""
        comparison = {"identical": True, "differences": [], "artifact_comparison": {}}
        first_hashes = first_hashes or {}

        # Compare artifacts
        artifacts1 = set(result1.get("artifacts", []))
        artifacts2 = set(result2.get("artifacts", []))

        if artifacts1 != artifacts2:
            comparison["identical"] = False
            comparison["differences"].append("Different artifacts produced")

        # Compare individual artifacts
        common_artifacts = artifacts1.intersection(artifacts2)
        for artifact in common_artifacts:
            if os.path.exists(artifact):
                # Compare the hash captured after the first build with the hash
                # of the artifact currently on disk (i.e. from the second build)
                hash2 = self._get_file_hash(artifact)
                hash1 = first_hashes.get(artifact, hash2)
                comparison["artifact_comparison"][artifact] = {
                    "identical": hash1 == hash2,
                    "hash1": hash1,
                    "hash2": hash2,
                }

                if hash1 != hash2:
                    comparison["identical"] = False
                    comparison["differences"].append(f"Artifact {artifact} differs")

        return comparison

    def _get_file_hash(self, file_path: str) -> str:
        """Get SHA256 hash of a file"""
        import hashlib

        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def get_build_history(self) -> list:
        """Get build history from metadata store"""
        return self.metadata_manager.get_build_history()

    def get_build_info(self, build_id: str) -> Optional[Dict[str, Any]]:
        """Get information about a specific build"""
        return self.metadata_manager.get_build_info(build_id)

    def install_dependencies(self, source_package: str) -> Dict[str, Any]:
        """Install build dependencies for a source package"""
        chroot_name = self.config.chroot_name

        # Ensure chroot exists
        if not self.chroot_manager.chroot_exists(chroot_name):
            self.chroot_manager.create_chroot(chroot_name)

        # Check and install dependencies
        deps_check = self.sbuild_wrapper.check_dependencies(source_package, chroot_name)

        if deps_check["missing"]:
            result = self.sbuild_wrapper.install_build_dependencies(deps_check["missing"], chroot_name)
            return {
                "success": True,
                "installed": deps_check["missing"],
                "details": result,
            }
        else:
            return {
                "success": True,
"installed": [], "message": "All dependencies already satisfied", } def install_packages(self, packages: List[str]) -> Dict[str, Any]: """Install packages in the chroot environment""" chroot_name = self.config.chroot_name # Ensure chroot exists if not self.chroot_manager.chroot_exists(chroot_name): self.chroot_manager.create_chroot(chroot_name) # Install packages using APT result = self.chroot_manager.execute_in_chroot( chroot_name, f"{self.config.apt_install_command} {' '.join(packages)}", ) return { "success": result.returncode == 0, "installed": packages, "output": result.stdout, "error": result.stderr if result.returncode != 0 else None, } def update_packages(self, packages: List[str] = None) -> Dict[str, Any]: """Update packages in the chroot environment""" chroot_name = self.config.chroot_name # Ensure chroot exists if not self.chroot_manager.chroot_exists(chroot_name): self.chroot_manager.create_chroot(chroot_name) if packages: # Update specific packages cmd = f"{self.config.apt_command} install --only-upgrade {' '.join(packages)}" else: # Update all packages cmd = f"{self.config.apt_command} update && {self.config.apt_command} upgrade -y" result = self.chroot_manager.execute_in_chroot(chroot_name, cmd) return { "success": result.returncode == 0, "updated": packages if packages else "all", "output": result.stdout, "error": result.stderr if result.returncode != 0 else None, } def remove_packages(self, packages: List[str]) -> Dict[str, Any]: """Remove packages from the chroot environment""" chroot_name = self.config.chroot_name # Ensure chroot exists if not self.chroot_manager.chroot_exists(chroot_name): self.chroot_manager.create_chroot(chroot_name) # Remove packages using APT cmd = f"{self.config.apt_command} remove -y {' '.join(packages)}" result = self.chroot_manager.execute_in_chroot(chroot_name, cmd) return { "success": result.returncode == 0, "removed": packages, "output": result.stdout, "error": result.stderr if result.returncode != 0 else None, } def execute_apt_command(self, command: str) -> Dict[str, Any]: """Execute APT command in the chroot environment""" chroot_name = self.config.chroot_name # Ensure chroot exists if not self.chroot_manager.chroot_exists(chroot_name): self.chroot_manager.create_chroot(chroot_name) # Execute APT command cmd = f"{self.config.apt_command} {command}" result = self.chroot_manager.execute_in_chroot(chroot_name, cmd) return { "success": result.returncode == 0, "command": command, "output": result.stdout, "error": result.stderr if result.returncode != 0 else None, }